pip install rocketmq-client-python
from rocketmq.client import Producer, Message# 初始化生产者,并设置生产组信息,group1producer = Producer(groupName)# 设置服务地址producer.set_name_server_address(nameserver)# 设置权限(角色名和密钥)producer.set_session_credentials(accessKey, # 角色密钥secretKey, # 角色名称'')# 启动生产者producer.start()# 组装消息 topic名称,在控制台 topic 页面复制。msg = Message(topicName)# 设置keysmsg.set_keys(TAGS)# 设置tagsmsg.set_tags(KEYS)# 消息内容msg.set_body('This is a new message.')# 发送同步消息ret = producer.send_sync(msg)print(ret.status, ret.msg_id, ret.offset)# 资源释放producer.shutdown()
参数 | 说明 |
groupName | 生产者组名称。在控制台集群管理中 Group tab 中获取。 |
nameserver | 集群接入地址,在集群基本信息中,根据使用需求,使用不同的内网/公网接入地址 |
secretKey | 角色名称,在 集群权限 页面复制 SecretKey 复制。 |
accessKey | 角色密钥,在 集群权限 页面复制 AccessKey 复制。 |
topicName | Topic 的名称,在控制台 Topic 页面复制。 |
TAGS | 用来设置消息的 TAG。 |
KEYS | 设置消息业务 key。 |
import timefrom rocketmq.client import PushConsumer, ConsumeStatus# 消息处理回调def callback(msg):# 模拟业务print('Received message. messageId: ', msg.id, ' body: ', msg.body)# 消费成功回复CONSUME_SUCCESSreturn ConsumeStatus.CONSUME_SUCCESS# 消费成功回复消息状态# return ConsumeStatus.RECONSUME_LATER# 初始化消费者,并设置消费者组信息consumer = PushConsumer(groupName)# 设置服务地址consumer.set_name_server_address(nameserver)# 设置权限(角色名和密钥)consumer.set_session_credentials(accessKey, # 角色密钥secretKey, # 角色名称'')# 订阅topicconsumer.subscribe(topicName, callback, TAGS)print(' [Consumer] Waiting for messages.')# 启动消费者consumer.start()while True:time.sleep(3600)# 资源释放consumer.shutdown()
参数 | 说明 |
groupName | 消费者 Group 的名称,在控制台 Group 页面复制。 |
nameserver | 同生产地址。 |
secretKey | 同生产消息的获取方式。 |
accessKey | 同生产消息的获取方式。 |
topicName | Topic 的名称,在控制台 Topic 页面复制。 |
TAGS | 设置订阅消息的tag,默认为"*",表示订阅所有消息。 |
本页内容是否解决了您的问题?