librocketmq
needs to be installed in advance.pip install rocketmq-client-python
from rocketmq.client import Producer, Message# Initialize the producer and set group information. group1producer = Producer(groupName)# Set the service addressproducer.set_name_server_address(nameserver)# Set permissions (role name and key)producer.set_session_credentials(accessKey, # Role keysecretKey, # Role name'')# Start the producerproducer.start()# Assemble the message. The topic name can be copied from the Topic tab on the console.msg = Message(topicName)# Set keysmsg.set_keys(TAGS)# Set tagsmsg.set_tags(KEYS)# Message contentmsg.set_body('This is a new message.')# Send synchronous messagesret = producer.send_sync(msg)print(ret.status, ret.msg_id, ret.offset)# Release resourcesproducer.shutdown()
Parameter | Description |
groupName | Producer group name, which is obtained from the Group tab of cluster management on the console. |
nameserver | Cluster access address in the basic information of the cluster. Select either the private network or public network access address as needed. |
secretKey | Role name, which can be copied from SecretKey on the Cluster Permission page. |
accessKey | Role key, which can be copied from AccessKey on the Cluster Permission page. |
topicName | Topic name, which can be copied from the Topic tab on the console. |
TAGS | Used to set the message tag. |
KEYS | Used to configure the message business key. |
import timefrom rocketmq.client import PushConsumer, ConsumeStatus# Callback for message processing.def callback(msg):# Simulate the business.print('Received message. messageId: ', msg.id, ' body: ', msg.body)# Return `CONSUME_SUCCESS` if the consumption is successful.return ConsumeStatus.CONSUME_SUCCESS# Return the status of the message upon successful consumption.# return ConsumeStatus.RECONSUME_LATER# Initialize the consumer and set the consumer group information.consumer = PushConsumer(groupName)# Set the service addressconsumer.set_name_server_address(nameserver)# Set permissions (role name and key)consumer.set_session_credentials(accessKey, # Role keysecretKey, # Role name'')# Subscribe to a topic.consumer.subscribe(topicName, callback, TAGS)print(' [Consumer] Waiting for messages.')# Start the consumer.consumer.start()while True:time.sleep(3600)# Release resourcesconsumer.shutdown()
Parameter | Description |
groupName | Consumer group name, which can be copied from the Group tab on the console. |
nameserver | The same as the producer address. |
secretKey | The same as the method of acquiring produced messages. |
accessKey | The same as the method of acquiring produced messages. |
topicName | Topic name, which can be copied from the Topic tab on the console. |
TAGS | Set the tag of subscribed messages, which is set to * by default, indicating the subscription to all messages. |
Was this page helpful?