pip install 'pulsar-client==3.1.0'
import pulsar

# Create a client
client = pulsar.Client(
    authentication=pulsar.AuthenticationToken(
        # Authorized role token
        AUTHENTICATION),
    # Service access address
    service_url=SERVICE_URL)
Parameter | Description |
SERVICE_URL | Service access address |
AUTHENTICATION | Authorized role token |
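One way to supply the two parameters without hardcoding them is to read them from the environment before creating the client. The sketch below is only an illustration: the helper `load_client_settings` and the variable names `PULSAR_SERVICE_URL` and `PULSAR_TOKEN` are assumptions made for this example, not names defined by the SDK or the console.

```python
import os


def load_client_settings(env=os.environ):
    """Return (service_url, token) read from a mapping of environment variables.

    The variable names below are hypothetical; use whatever your deployment defines.
    """
    required = ("PULSAR_SERVICE_URL", "PULSAR_TOKEN")
    missing = [name for name in required if not env.get(name)]
    if missing:
        # Fail early with a clear message instead of a connection error later.
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return env["PULSAR_SERVICE_URL"], env["PULSAR_TOKEN"]
```

The returned pair can then be passed as `SERVICE_URL` and `AUTHENTICATION` when creating the client.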
# Create a producer
producer = client.create_producer(
    # Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page.
    topic='pulsar-xxx/sdk_python/topic1')

Enter the complete topic path in the format of persistent://clusterid/namespace/Topic, where the clusterid/namespace/topic part can be copied directly from the Topic page in the console.

# Send the message
producer.send(
    # Message content
    'Hello python client, this is a msg.'.encode('utf-8'),
    # Message parameter
    properties={'k': 'v'},
    # Business key
    partition_key='yourKey')
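The complete topic path format described above can be assembled from its three parts with a small helper. This is a minimal sketch; `build_topic` is a hypothetical name introduced here and is not part of pulsar-client.

```python
def build_topic(cluster_id, namespace, topic):
    """Join the parts shown on the Topic page into a complete topic path."""
    for part in (cluster_id, namespace, topic):
        # Each part must be non-empty and must not contain a path separator.
        if not part or "/" in part:
            raise ValueError("invalid topic component: {!r}".format(part))
    return "persistent://{}/{}/{}".format(cluster_id, namespace, topic)
```

For example, `build_topic('pulsar-xxx', 'sdk_python', 'topic1')` yields the path for the sample topic used on this page, which can be passed as the `topic` argument.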
# Callback invoked when an async send completes
def send_callback(send_result, msg_id):
    print('Message published: result:{} msg_id:{}'.format(send_result, msg_id))

# Send the message
producer.send_async(
    # Message content
    'Hello python client, this is a async msg.'.encode('utf-8'),
    # Async callback
    callback=send_callback,
    # Message configuration
    properties={'k': 'v'},
    # Business key
    partition_key='yourKey')
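When many messages are sent asynchronously, printing each result is less useful than tallying them. The sketch below shows one possible way to do that with a callback object. `SendTracker` is a hypothetical class written for this example; it assumes the callback may be invoked from a thread other than the caller's, so it guards its counters with a lock, and it takes the "success" result value as a constructor argument rather than importing it.

```python
import threading


class SendTracker:
    """Tallies async send results (hypothetical helper, not part of pulsar-client)."""

    def __init__(self, ok_result):
        self._ok = ok_result          # value that the client reports on success
        self._lock = threading.Lock()
        self.succeeded = 0
        self.failed = []              # (send_result, msg_id) pairs for failures

    def callback(self, send_result, msg_id):
        # Same signature as send_callback above, so it can be passed as callback=.
        with self._lock:
            if send_result == self._ok:
                self.succeeded += 1
            else:
                self.failed.append((send_result, msg_id))
```

A likely usage, assuming the client exposes its success value as `pulsar.Result.Ok`, is `tracker = SendTracker(pulsar.Result.Ok)` followed by `producer.send_async(data, callback=tracker.callback)`, then `producer.flush()` before inspecting `tracker.succeeded` and `tracker.failed`.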
# Subscribe to the message
consumer = client.subscribe(
    # Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page.
    topic='pulsar-xxx/sdk_python/topic1',
    # Subscription name
    subscription_name='sub_topic1')

Enter the complete topic path in the format of persistent://clusterid/namespace/Topic, where the clusterid/namespace/topic part can be copied directly from the Topic page in the console. Enter the subscription name in the subscription_name parameter. The name can be viewed on the Consumption Management page.

# Obtain the message
msg = consumer.receive()
try:
    # Simulate the business processing logic
    print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
    # Return `ack` as the acknowledgement if the consumption is successful
    consumer.acknowledge(msg)
except Exception:
    # If the consumption fails, the message will be delivered again.
    consumer.negative_acknowledge(msg)
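The receive, acknowledge, and negative-acknowledge steps above handle a single message. A consumer usually repeats them in a loop, which might be sketched as follows. `consume` and `handler` are hypothetical names for this example; the function only calls `receive()`, `acknowledge()`, and `negative_acknowledge()` on the consumer object, as in the snippet above.

```python
def consume(consumer, handler, max_messages):
    """Process up to max_messages messages and return how many were acknowledged."""
    acked = 0
    for _ in range(max_messages):
        msg = consumer.receive()
        try:
            # Business processing logic supplied by the caller.
            handler(msg)
        except Exception:
            # Processing failed: ask for the message to be delivered again.
            consumer.negative_acknowledge(msg)
        else:
            # Processing succeeded: acknowledge the message.
            consumer.acknowledge(msg)
            acked += 1
    return acked
```

Placing `acknowledge` in the `else` branch keeps an error raised by the acknowledgement itself from being treated as a processing failure. When the loop finishes, calling `client.close()` releases the connection.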