操作场景
本文以调用 Python SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
前提条件
操作步骤
1. 准备环境。
pip install 'pulsar-client==3.1.0'
2. 创建客户端。
client = pulsar.Client(
authentication=pulsar.AuthenticationToken(
AUTHENTICATION),
service_url=SERVICE_URL)
|
SERVICE_URL | 集群接入地址,可以在控制台 集群管理 页面查看并复制。 |
AUTHENTICATION | |
3. 创建生产者。
producer = client.create_producer(
topic='pulsar-xxx/sdk_python/topic1'
)
说明:
Topic 名称需要填入完整路径,即 persistent://clusterid/namespace/Topic
,clusterid/namespace/topic
的部分可以从控制台上 Topic管理 页面直接复制。 4. 发送消息。
producer.send(
'Hello python client, this is a msg.'.encode('utf-8'),
properties={'k': 'v'},
partition_key='yourKey'
)
还可以使用异步方式发送消息。
def send_callback(send_result, msg_id):
print('Message published: result:{} msg_id:{}'.format(send_result, msg_id))
producer.send_async(
'Hello python client, this is a async msg.'.encode('utf-8'),
callback=send_callback,
properties={'k': 'v'},
partition_key='yourKey'
)
5. 创建消费者。
consumer = client.subscribe(
topic='pulsar-xxx/sdk_python/topic1',
subscription_name='sub_topic1'
)
说明:
Topic 名称需要填入完整路径,即 persistent://clusterid/namespace/Topic
,clusterid/namespace/topic
的部分可以从控制台上 Topic管理 页面直接复制。 subscriptionName 需要写入订阅名,可在消费管理界面查看。
6. 消费消息。
msg = consumer.receive()
try:
print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
consumer.acknowledge(msg)
except:
consumer.negative_acknowledge(msg)
本页内容是否解决了您的问题?