关键词搜索

源码搜索 ×
×

Python&Rabbitmq文档阅读笔记-生产者数据直接送入队列消费者消费

发布2021-03-16浏览691次

详情内容

Hello World!

使用Pika库连接到Rabbitmq

本次要实现的功能:生产者生直接发送消息到队列中,消费者消费队列中的数据。

逻辑结构如下:

Rabbitmq的通信协议有很多,这里使用AMQP 0-9-1,其中python对应的库为Pika 1.0.0

pip安装此库:

python -m pip install pika --upgrade

生产者:

在发送数据之前需要先建立连接

  1. import pika
  2. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  3. channel = connection.channel()

如果要带用户名和密码,以及virtualHost需要这么写:

  1. credentials = pika.PlainCredentials('userName', 'password')
  2. connection = pika.BlockingConnection(pika.ConnectionParameters(host='ipaddress', port=5672, virtual_host="/virtualHost", credentials=credentials))
  3. channel = connection.channel()

发消息前要确保队列是存在的,声明一下队列

channel.queue_declare(queue="hello_python")

Rabbitmq其实是不能直接连接队列的,他需要一个交换机,通过交换机去连队列。但在写代码时,将exchange设置为空字符串,他就会走默认的交换机,将routing_key设置为队列的名字,就变成了,直连队列的效果。

代码如下:

channel.basic_publish(exchange='', routing_key='hello_python', body='Hello World')

Rabbitmq其实是不能直接连接队列的,他需要一个交换机,通过交换机去连队列。但在写代码时,将exchange设置为空字符串,他就会走默认的交换机,将routing_key设置为队列的名字,就变成了,直连队列的效果。

代码如下:

channel.basic_publish(exchange='', routing_key='hello_python', body='Hello World')

最后要关闭连接:

connection.close()

下面是消费者:

接收生产者发送的消息,并将消息打印到屏幕上。

首先还是创建连接,以及使用queue_declare确保队列存在

  1. credentials = pika.PlainCredentials('userName', 'password')
  2. connection = pika.BlockingConnection(pika.ConnectionParameters(host='ipaddress', port=5672, virtual_host="/virtualHost", credentials=credentials))
  3. channel = connection.channel()
  4. channel.queue_declare(queue="hello_python")

定义一个回调函数用于消息的接收,如下:

  1. def receiverCallBak(ch, method, properties, body):
  2. print("Recived %r" % body)
  3. pass

告诉channel,接收到数据后,调用这个回调函数:

channel.basic_consume(queue="hello_python", auto_ack=True, on_message_callback=receiverCallBak)

最后进入循环接收数据:

channel.start_consuming()

程序运行截图如下:

整体代码如下:

  1. import pika
  2. def producer():
  3. credentials = pika.PlainCredentials('userName', 'password')
  4. connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host="/hehe", credentials=credentials))
  5. channel = connection.channel()
  6. channel.queue_declare(queue="hello_python")
  7. channel.basic_publish(exchange='', routing_key='hello_python', body='Hello World')
  8. connection.close()
  9. print("send msg over")
  10. pass
  11. def receiverCallBak(ch, method, properties, body):
  12. print("Recived %r" % body)
  13. pass
  14. def receiver():
  15. credentials = pika.PlainCredentials('userName', 'password')
  16. connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host="/hehe", credentials=credentials))
  17. channel = connection.channel()
  18. channel.queue_declare(queue="hello_python")
  19. channel.basic_consume(queue="hello_python", auto_ack=True, on_message_callback=receiverCallBak)
  20. channel.start_consuming()
  21. print("Receiver set finished")
  22. pass
  23. if __name__ == '__main__':
  24. for i in range(1, 10):
  25. producer()
  26. pass
  27. receiver()
  28. pass

 

相关技术文章

最新源码

下载排行榜

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载