tencent cloud

All product documents
TDMQ for Apache Pulsar
Subscription Mode
Last updated: 2024-06-28 11:29:49
Subscription Mode
Last updated: 2024-06-28 11:29:49
In order to meet the needs of different use cases, TDMQ for Apache Pulsar supports four subscription modes: exclusive, shared, failover, and key_shared.




Exclusive Mode

Exclusive mode (default): A subscription can only be associated with one consumer. Only this consumer can receive all messages in the topic, and if it fails, consumption will stop.
In the exclusive subscription mode, only one consumer in a subscription can consume messages in the topic. If multiple consumers subscribe to the topic, an error will be reported. This mode is suitable for globally sequential consumption scenarios.



// Construct a consumer
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page.
.topic("persistent://pulsar-xxx/sdk_java/topic1")
// You need to create a subscription on the topic details page in the console and enter the subscription name here
.subscriptionName("sub_topic1")
// Declare the exclusive mode to be the consumption mode
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
If multiple consumers are started, an error will be reported.



Shared Mode

Messages are distributed to different consumers through a customizable round robin mechanism, with each message going to only one consumer. When a consumer is disconnected, any messages delivered to it but not acknowledged will be redistributed to other active consumers.



// Construct a consumer
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page.
.topic("persistent://pulsar-xxx/sdk_java/topic1")
// You need to create a subscription on the topic details page in the console and enter the subscription name here
.subscriptionName("sub_topic1")
// Declare the shared mode to be the consumption mode
.subscriptionType(SubscriptionType.Shared)
.subscribe();
There can be multiple consumers in the shared mode.



Failover Mode

In this mode, when there are multiple consumers, they will be sorted lexicographically, and the first consumer is initialized to be the only one who can receive messages. When the first consumer is disconnected, all the unacknowledged and upcoming messages will be distributed to the next consumer in the queue.



// Construct a consumer
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page.
.topic("persistent://pulsar-xxx/sdk_java/topic1")
// You need to create a subscription on the topic details page in the console and enter the subscription name here
.subscriptionName("sub_topic1")
// Declare the failover mode to be the consumption mode
.subscriptionType(SubscriptionType.Failover)
.subscribe();
There can be multiple consumers in the failover mode.



Key_Shared mMode

If there are multiple consumers, messages will be distributed by key, and messages with the same key will only be distributed to the same consumer.



Note:
The key_shared mode has certain use limits. It is continuously iterated in the community due to its complex engineering implementation. Therefore, it doesn't have the same level of stability as exclusive, failover, and shared modes. We recommend that you first select the other three modes if they can meet your business needs.
Pro clusters can guarantee the sequential delivery of messages with the same key, while virtual clusters cannot.

Suggestions for the key_shared mode

When to use the key_shared mode

Choose the shared mode for general production/consumption scenarios.
If you want messages with the same key to be distributed to the same consumer, you cannot use the shared mode. You have two options:
Choose the key_shared mode.
Use a multi-partition topic + failover mode.

Where to use the key_shared mode

There are a lot of message keys with even message distribution.
Consumption is fast with no message heap.
If the above two conditions cannot be met in the production process, we recommend that you use the combo of multi-partition topic and failover mode.

Sample code

Sample key_shared subscription

By default, Pulsar enables the batch feature when producing messages, and batch messages are parsed on the consumer side. Therefore, a batch of messages on the broker side is treated as one entry. Since messages with different keys may be packaged into the same batch, the key_shared mode will become ineffective in this case because it achieves sequential subscription based on the same message key. There are two ways to avoid this when creating a producer:
1. Disable the batch feature.
// Construct a producer
Producer<byte[]> producer pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();
// Set the key when sending messages
MessageId msgId = producer.newMessage()
// Message content
.value(value.getBytes(StandardCharsets.UTF_8))
// Set the key here. Messages with the same key will only be distributed to the same consumer.
.key("youKey1")
.send();
2. Use the key_based batch type.
// Construct a producer
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(true)
.batcherBuilder(BatcherBuilder.KEY_BASED)
.create();
// Set the key when sending messages
MessageId msgId = producer.newMessage()
// Message content
.value(value.getBytes(StandardCharsets.UTF_8))
// Set the key here. Messages with the same key will only be distributed to the same consumer.
.key("youKey1")
.send();
Sample code for the consumer:
// Construct a consumer Consumer<byte[]> consumer = pulsarClient.newConsumer() // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page. .topic("persistent://pulsar-xxx/sdk_java/topic1") // You need to create a subscription on the topic details page in the console and enter the subscription name here .subscriptionName("sub_topic1") // Declare the key_shared mode to be the consumption mode .subscriptionType(SubscriptionType.Key_Shared) .subscribe();
There can be multiple consumers in the key_shared mode.



Sample "multi-partition topic + failover" subscription

Note:
In this mode, each partition will be assigned to only one consumer instance at a time. When there are more consumers than partitions, the excessive consumers cannot consume messages. This problem can be solved by adding more partitions.
Try to ensure an even key distribution when designing keys.
The delayed message is not supported in failover mode.
1. Sample code for the producer.
// Construct a producer
Producer<byte[]> producer pulsarClient.newProducer()
.topic(topic)
.enableBatching(false) // Disable the batch feature
.create();
// Set the key when sending messages
MessageId msgId = producer.newMessage()
// Message content
.value(value.getBytes(StandardCharsets.UTF_8))
// Set the key here. Messages with the same key will be sent to the same partition.
.key("youKey1")
.send();
2. Sample code for the consumer.
// Construct a consumer
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page.
.topic("persistent://pulsar-xxx/sdk_java/topic1")
// You need to create a subscription on the topic details page in the console and enter the subscription name here
.subscriptionName("sub_topic1")
// Declare the failover mode to be the consumption mode
.subscriptionType(SubscriptionType.Failover)
.subscribe();

Enabling sequence guarantee

TDMQ for Apache Pulsar 2.9.2 clusters support the sequential message delivery by key. To enable the sequence guarantee feature, you need to specify keySharedPolicy when creating the consumer instance.
// Construct a consumer
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page.
.topic("persistent://pulsar-xxx/sdk_java/topic1")
// You need to create a subscription on the topic details page in the console and enter the subscription name here
.subscriptionName("sub_topic1")
// Declare the key_shared mode to be the consumption mode
.subscriptionType(SubscriptionType.Key_Shared)
// Set to require sequence
.keySharedPolicy(KeySharedPolicy.autoSplitHashRange().setAllowOutOfOrderDelivery(false))
.subscribe();
Note:
The sequence guarantee feature is not supported for clusters on v2.7.2, which may lead to message push congestion and subsequent consumption failure.
If the sequence guarantee feature is enabled, the consumption may slow down and messages may be heaped after the consumer is restarted. This is because the feature requires the restarted consumer to consume messages sequentially, starting with earlier received messages (with all the consumption acknowledged) and moving on to any later received messages.

Was this page helpful?
You can also Contact Sales or Submit a Ticket for help.
Yes
No

Feedback

Contact Us

Contact our sales team or business advisors to help your business.

Technical Support

Open a ticket if you're looking for further assistance. Our Ticket is 7x24 available.

7x24 Phone Support
Hong Kong, China
+852 800 906 020 (Toll Free)
United States
+1 844 606 0804 (Toll Free)
United Kingdom
+44 808 196 4551 (Toll Free)
Canada
+1 888 605 7930 (Toll Free)
Australia
+61 1300 986 386 (Toll Free)
EdgeOne hotline
+852 300 80699
More local hotlines coming soon