python -m pip install pika --upgrade
import pika
messageProducer.py
# messageProducer.py
# Publishes one persistent message per routing key to a direct exchange.
import pika

# Use the username and password to create a login credential object
credentials = pika.PlainCredentials('rolename', 'eyJr***')

# Create a connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='amqp-xx.rabbitmq.x.com',
        port=5672,
        virtual_host='amqp-xxx|Vhostname',
        credentials=credentials,
    )
)

# Close the connection even if declaring the exchange or publishing fails,
# so a failed run does not leave the connection open.
try:
    # Establish a channel
    channel = connection.channel()

    # Declare the exchange
    channel.exchange_declare(exchange='direct_exchange', exchange_type="direct")

    routingKeys = ['aaa.bbb.ccc', 'aaa.bbb.ddd', 'aaa.ccc.zzz', "xxx.yyy.zzz"]
    for routingKey in routingKeys:
        # Send a message to the specified exchange.
        # If you send a message without specifying the exchange, you need to
        # specify the message queue. If the exchange is specified, the
        # `routing_key` parameter indicates the routing key; otherwise, it
        # indicates the message queue name.
        channel.basic_publish(
            exchange='direct_exchange',
            routing_key=routingKey,
            body=(routingKey + 'This is a new direct message.').encode(),
            properties=pika.BasicProperties(
                delivery_mode=2,  # Set message persistence
            ),
        )
        print('send success msg to rabbitmq')
finally:
    connection.close()
Parameter | Description |
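rolename | Username passed to `pika.PlainCredentials`; replace it with your own role name. |
eyJr*** | Password (masked here) passed to `pika.PlainCredentials` together with the username; replace it with your own. |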
host | Cluster access address, which can be obtained from Access Address in the Operation column on the Cluster page. |
port | Cluster access port, which can be obtained from Access Address in the Operation column on the Cluster page. |
virtual_host | Vhost name in the format "cluster ID, a vertical bar, then the vhost name" (for example, `amqp-xxx|Vhostname`), which can be copied on the Vhost page in the console. |
direct_exchange | Exchange name, which can be obtained from the exchange list in the console. |
routingKeys | Message routing rule, which can be obtained in the Binding Key column in the binding list in the console. |
messageConsumer.py
# messageConsumer.py
# Consumes messages from a queue bound to a direct exchange, acknowledging
# each message manually.
import os
import sys

import pika


def main():
    """Connect to RabbitMQ, bind ``route_queue1`` and consume until interrupted.

    Declares the queue, binds it to ``direct_exchange`` for each routing key,
    limits delivery to one unacknowledged message at a time, and then blocks
    in ``channel.start_consuming()``.
    """
    # Use the username and password to create a login credential object
    credentials = pika.PlainCredentials('rolename', 'eyJr***')

    # Create a connection
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
            host='amqp-xx.rabbitmq.x.com',
            port=5672,
            virtual_host='amqp-xxx|Vhostname',
            credentials=credentials,
        )
    )

    # Establish a channel
    channel = connection.channel()

    # Declare the message queue
    channel.queue_declare(queue='route_queue1', exclusive=True, durable=True)

    # Bind the message queue to the exchange and specify the routing key
    routing_keys = ['aaa.bbb.ccc', 'aaa.bbb.ddd']
    for routingKey in routing_keys:
        channel.queue_bind(
            exchange='direct_exchange',
            queue="route_queue1",
            routing_key=routingKey,
        )

    # Set that only one unacknowledged message can be received
    channel.basic_qos(prefetch_count=1)

    # Message consumption logic
    def callback(ch, method, properties, body):
        """Print the received message body and acknowledge it."""
        print(" [Consumer1(Direct 'aaa.bbb.ccc'/'aaa.bbb.ddd')] Received (%r)" % body)
        # Manually return the ACK
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # Create a consumer to consume messages in the message queue
    channel.basic_consume(
        queue='route_queue1',
        on_message_callback=callback,
        auto_ack=False,  # Set to manual acknowledgment
    )

    print(" [Consumer1(Direct 'aaa.bbb.ccc'/'aaa.bbb.ddd')] Waiting for messages. To exit press CTRL+C")
    channel.start_consuming()


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        # Try a normal exit first; if SystemExit is intercepted, force the
        # process to terminate.
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
Parameter | Description |
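rolename | Username passed to `pika.PlainCredentials`; replace it with your own role name. |
eyJr*** | Password (masked here) passed to `pika.PlainCredentials` together with the username; replace it with your own. |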
host | Cluster access address, which can be obtained from Access Address in the Operation column on the Cluster page. |
port | Cluster access port, which can be obtained from Access Address in the Operation column on the Cluster page. |
virtual_host | Vhost name in the format "cluster ID, a vertical bar, then the vhost name" (for example, `amqp-xxx|Vhostname`), which can be copied on the Vhost page in the console. |
direct_exchange | Exchange name, which can be obtained from the exchange list in the console. |
route_queue1 | Queue name, which can be obtained from the queue list in the console. |
routingKey | Message routing rule, which can be obtained in the Binding Key column in the binding list in the console. |
Was this page helpful?