pip install rocketmq-client-python
from rocketmq.client import Producer, Message# 初始化生产者,并设置生产组信息,组名称使用全称,例:rocketmq-xxx|namespace_python%group1producer = Producer(groupName)# 设置服务地址producer.set_name_server_address(nameserver)# 设置权限(角色名和密钥)producer.set_session_credentials(accessKey, # 角色密钥secretKey, # 角色名称'')# 启动生产者producer.start()# 组装消息 topic名称,在控制台 topic 页面复制。msg = Message(topicName)# 设置keysmsg.set_keys(TAGS)# 设置tagsmsg.set_tags(KEYS)# 消息内容msg.set_body('This is a new message.')# 发送同步消息ret = producer.send_sync(msg)print(ret.status, ret.msg_id, ret.offset)# 资源释放producer.shutdown()
import timefrom rocketmq.client import PushConsumer, ConsumeStatus# 消息处理回调def callback(msg):# 模拟业务print('Received message. messageId: ', msg.id, ' body: ', msg.body)# 消费成功回复CONSUME_SUCCESSreturn ConsumeStatus.CONSUME_SUCCESS# 消费成功回复消息状态# return ConsumeStatus.RECONSUME_LATER# 初始化消费者,并设置消费者组信息consumer = PushConsumer(groupName)# 设置服务地址consumer.set_name_server_address(nameserver)# 设置权限(角色名和密钥)consumer.set_session_credentials(accessKey, # 角色密钥secretKey, # 角色名称'')# 订阅topicconsumer.subscribe(topicName, callback, TAGS)print(' [Consumer] Waiting for messages.')# 启动消费者consumer.start()while True:time.sleep(3600)# 资源释放consumer.shutdown()
本页内容是否解决了您的问题?