Hello World!
使用Pika库连接到Rabbitmq。
本次要实现的功能:生产者生直接发送消息到队列中,消费者消费队列中的数据。
逻辑结构如下:
Rabbitmq的通信协议有很多,这里使用AMQP 0-9-1,其中python对应的库为Pika 1.0.0
pip安装此库:
python -m pip install pika --upgrade
生产者:
在发送数据之前需要先建立连接
- import pika
-
- connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
- channel = connection.channel()
如果要带用户名和密码,以及virtualHost需要这么写:
- credentials = pika.PlainCredentials('userName', 'password')
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='ipaddress', port=5672, virtual_host="/virtualHost", credentials=credentials))
- channel = connection.channel()
发消息前要确保队列是存在的,声明一下队列
channel.queue_declare(queue="hello_python")
Rabbitmq其实是不能直接连接队列的,他需要一个交换机,通过交换机去连队列。但在写代码时,将exchange设置为空字符串,他就会走默认的交换机,将routing_key设置为队列的名字,就变成了,直连队列的效果。
代码如下:
channel.basic_publish(exchange='', routing_key='hello_python', body='Hello World')
Rabbitmq其实是不能直接连接队列的,他需要一个交换机,通过交换机去连队列。但在写代码时,将exchange设置为空字符串,他就会走默认的交换机,将routing_key设置为队列的名字,就变成了,直连队列的效果。
代码如下:
channel.basic_publish(exchange='', routing_key='hello_python', body='Hello World')
最后要关闭连接:
connection.close()
下面是消费者:
接收生产者发送的消息,并将消息打印到屏幕上。
首先还是创建连接,以及使用queue_declare确保队列存在
- credentials = pika.PlainCredentials('userName', 'password')
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='ipaddress', port=5672, virtual_host="/virtualHost", credentials=credentials))
- channel = connection.channel()
- channel.queue_declare(queue="hello_python")
定义一个回调函数用于消息的接收,如下:
- def receiverCallBak(ch, method, properties, body):
- print("Recived %r" % body)
- pass
告诉channel,接收到数据后,调用这个回调函数:
channel.basic_consume(queue="hello_python", auto_ack=True, on_message_callback=receiverCallBak)
最后进入循环接收数据:
channel.start_consuming()
程序运行截图如下:
整体代码如下:
- import pika
-
- def producer():
- credentials = pika.PlainCredentials('userName', 'password')
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host="/hehe", credentials=credentials))
- channel = connection.channel()
- channel.queue_declare(queue="hello_python")
- channel.basic_publish(exchange='', routing_key='hello_python', body='Hello World')
- connection.close()
- print("send msg over")
- pass
-
- def receiverCallBak(ch, method, properties, body):
- print("Recived %r" % body)
- pass
-
- def receiver():
- credentials = pika.PlainCredentials('userName', 'password')
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host="/hehe", credentials=credentials))
- channel = connection.channel()
- channel.queue_declare(queue="hello_python")
- channel.basic_consume(queue="hello_python", auto_ack=True, on_message_callback=receiverCallBak)
- channel.start_consuming()
- print("Receiver set finished")
- pass
-
-
-
- if __name__ == '__main__':
-
- for i in range(1, 10):
- producer()
- pass
-
- receiver()
- pass