librocketmq
first.librocketmq
2.0.0 or later as instructed in Install librocketmq.rocketmq-client-python
.pip install rocketmq-client-python
from rocketmq.client import Producer, Message# Initialize the producer and set the producer group information. Be sure to use the full name of the group, such as `rocketmq-xxx|namespace_python%group1`.producer = Producer(groupName)# Set the service addressproducer.set_name_server_address(nameserver)# Set permissions (role name and token)producer.set_session_credentials(accessKey, # Role tokensecretKey, # Role name'')# Start the producerproducer.start()# Assemble messages. The topic name can be copied on the **Topic** page in the console.msg = Message(topicName)# Set keysmsg.set_keys(TAGS)# Set tagsmsg.set_tags(KEYS)# Message contentmsg.set_body('This is a new message.')# Send messages in sync moderet = producer.send_sync(msg)print(ret.status, ret.msg_id, ret.offset)# Release resourcesproducer.shutdown()
Parameter | Description |
groupName | Producer group name, which can be obtained under the Group tab on the cluster details page in the console. |
nameserver | Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. |
secretKey | |
accessKey | |
topicName | Topic name, which can be copied on the Topic page in the console. |
TAGS | A parameter used to set the message tag. |
KEYS | A parameter used to set the message key. |
import timefrom rocketmq.client import PushConsumer, ConsumeStatus# Message processing callbackdef callback(msg):# Simulate the business processing logicprint('Received message. messageId: ', msg.id, ' body: ', msg.body)# Return CONSUME_SUCCESS if the consumption is successfulreturn ConsumeStatus.CONSUME_SUCCESS# Return the consumption status if the consumption is successful# return ConsumeStatus.RECONSUME_LATER# Initialize the consumer and set the consumer group informationconsumer = PushConsumer(groupName)# Set the service addressconsumer.set_name_server_address(nameserver)# Set permissions (role name and token)consumer.set_session_credentials(accessKey, # Role tokensecretKey, # Role name'')# Subscribe to a topicconsumer.subscribe(topicName, callback, TAGS)print(' [Consumer] Waiting for messages.')# Start the consumerconsumer.start()while True:time.sleep(3600)# Release resourcesconsumer.shutdown()
Parameter | Description |
groupName | Consumer group name, which can be copied under the Group tab on the cluster details page. |
nameserver | Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. |
secretKey | |
accessKey | |
topicName | Topic name, which can be copied on the Topic page in the console. |
TAGS | A parameter used to set the tag of messages that are subscribed to. The default value is set to * , which means subscribing to all messages. |
Was this page helpful?