To suit different use cases, TDMQ for Apache Pulsar supports four subscription modes: exclusive, shared, failover, and key_shared.
Exclusive Mode
Exclusive mode is the default. A subscription can be associated with only one consumer, which receives all messages in the topic. If that consumer fails, consumption stops.
If additional consumers try to subscribe with the same subscription name, an error is reported. This mode is suitable for scenarios that require globally sequential consumption.
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://pulsar-xxx/sdk_java/topic1")
.subscriptionName("sub_topic1")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
If multiple consumers are started, an error will be reported.
Shared Mode
Messages are distributed across consumers through a customizable round-robin mechanism, and each message is delivered to only one consumer. When a consumer disconnects, any messages delivered to it but not yet acknowledged are redistributed to the other active consumers.
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://pulsar-xxx/sdk_java/topic1")
.subscriptionName("sub_topic1")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
There can be multiple consumers in the shared mode.
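The dispatch behavior described for the shared mode can be pictured with a small in-memory model. This is only a sketch under simplifying assumptions: the class `SharedDispatchSketch` and its methods are hypothetical, plain round robin stands in for the customizable mechanism, and none of this is the broker's actual implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of shared-mode dispatch; not the Pulsar broker implementation.
class SharedDispatchSketch {

    // Hands each message to exactly one consumer, cycling through the consumers in order.
    static Map<String, List<String>> dispatch(List<String> consumers, List<String> messages) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            String target = consumers.get(i % consumers.size());
            received.get(target).add(messages.get(i));
        }
        return received;
    }

    // Re-dispatches the unacknowledged messages of a disconnected consumer to the remaining consumers.
    static Map<String, List<String>> redistribute(List<String> consumers, String disconnected,
                                                  List<String> unacked) {
        List<String> remaining = new ArrayList<>(consumers);
        remaining.remove(disconnected);
        return dispatch(remaining, unacked);
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("c1", "c2", "c3");
        System.out.println(dispatch(consumers, List.of("m0", "m1", "m2", "m3", "m4")));
        // c2 disconnects before acknowledging m1 and m4, so they go to c1 and c3.
        System.out.println(redistribute(consumers, "c2", List.of("m1", "m4")));
    }
}
```

Because each message goes to whichever consumer is next in the rotation, messages with the same key can land on different consumers, which is why this mode gives no ordering guarantee.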
Failover Mode
In this mode, when there are multiple consumers, they are sorted lexicographically by name, and the first one is initially the only consumer that receives messages. When it disconnects, all unacknowledged and subsequent messages are delivered to the next consumer in the queue.
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://pulsar-xxx/sdk_java/topic1")
.subscriptionName("sub_topic1")
.subscriptionType(SubscriptionType.Failover)
.subscribe();
There can be multiple consumers in the failover mode.
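The selection rule described for the failover mode can be sketched as a sorted set of consumer names in which the first entry is the active consumer. The class `FailoverSketch` is hypothetical and models only the description above; it assumes consumers are ordered purely by name and that the active consumer is re-evaluated on every connect or disconnect, which may differ from what a real broker does.

```java
import java.util.Optional;
import java.util.TreeSet;

// Hypothetical model of failover selection; it follows the description in this document,
// not the broker source code.
class FailoverSketch {
    // TreeSet keeps the names in lexicographic (natural String) order.
    private final TreeSet<String> consumers = new TreeSet<>();

    void connect(String consumerName) {
        consumers.add(consumerName);
    }

    void disconnect(String consumerName) {
        consumers.remove(consumerName);
    }

    // The first name in sorted order is the only consumer that receives messages.
    Optional<String> active() {
        return consumers.isEmpty() ? Optional.empty() : Optional.of(consumers.first());
    }

    public static void main(String[] args) {
        FailoverSketch subscription = new FailoverSketch();
        subscription.connect("consumer-b");
        subscription.connect("consumer-a");
        subscription.connect("consumer-c");
        System.out.println(subscription.active()); // consumer-a sorts first
        subscription.disconnect("consumer-a");
        System.out.println(subscription.active()); // consumer-b takes over
    }
}
```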
Key_Shared Mode
If there are multiple consumers, messages will be distributed by key, and messages with the same key will only be distributed to the same consumer.
Note:
The key_shared mode has certain use limits. Because its implementation is complex, the community is still iterating on it, so it is not as stable as the exclusive, failover, and shared modes. We recommend that you choose one of the other three modes if they can meet your business needs.
Pro clusters can guarantee the sequential delivery of messages with the same key, while virtual clusters cannot.
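To illustrate how distribution by key keeps all messages with the same key on one consumer, the sketch below hashes a key into a fixed slot range and splits that range evenly among the consumers. Everything here is an assumption made for illustration: the class `KeySharedSketch`, the slot count, and the use of `String.hashCode()` are stand-ins, and the real broker uses its own hash function and range assignment.

```java
import java.util.List;

// Hypothetical model of key-based routing; not the broker's hashing or range assignment.
class KeySharedSketch {
    // Number of hash slots; an arbitrary value chosen for this sketch.
    static final int SLOTS = 65536;

    // Maps a key to a slot, then maps the slot to a consumer by splitting the slot range evenly.
    static String consumerFor(String key, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        int slot = Math.floorMod(key.hashCode(), SLOTS);
        int index = (int) ((long) slot * consumers.size() / SLOTS);
        return consumers.get(index);
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("c1", "c2", "c3");
        // The same key always resolves to the same consumer while the consumer list is unchanged.
        System.out.println(consumerFor("order-1", consumers));
        System.out.println(consumerFor("order-1", consumers));
        System.out.println(consumerFor("order-2", consumers));
    }
}
```

The mapping depends on the consumer list, so in this model a key can move to a different consumer when consumers join or leave. It also shows why a small or skewed key set leads to uneven load: consumers whose slot ranges receive no keys stay idle.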
Suggestions for the key_shared mode
When to use the key_shared mode
Choose the shared mode for general production/consumption scenarios.
If you want messages with the same key to be distributed to the same consumer, you cannot use the shared mode. You have two options:
Choose the key_shared mode.
Use a multi-partition topic + failover mode.
Where to use the key_shared mode
There are many message keys, and messages are evenly distributed across them.
Consumption is fast, with no message backlog.
If these two conditions cannot both be met in production, we recommend that you use a multi-partition topic with the failover mode instead.
Sample code
Sample key_shared subscription
By default, Pulsar enables batching when producing messages, and batches are unpacked on the consumer side, so the broker treats a whole batch as a single entry. Because messages with different keys may be packed into the same batch, the key_shared mode becomes ineffective in this case: it relies on the message key to route messages and keep them in order. There are two ways to avoid this when creating a producer:
1. Disable the batch feature.
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();
MessageId msgId = producer.newMessage()
.value(value.getBytes(StandardCharsets.UTF_8))
.key("youKey1")
.send();
2. Use the key_based batch type.
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(true)
.batcherBuilder(BatcherBuilder.KEY_BASED)
.create();
MessageId msgId = producer.newMessage()
.value(value.getBytes(StandardCharsets.UTF_8))
.key("youKey1")
.send();
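The difference between the two batching approaches can be shown with a small model in which each message is represented only by its key. This is a rough sketch, not the client's batching code: the class `BatchingSketch` and its methods are hypothetical, and the per-key variant ignores batch size and time limits for brevity.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of producer-side batching; each message is represented by its key only.
class BatchingSketch {

    // Packs messages in arrival order, so one batch may contain several different keys.
    static List<List<String>> batchInOrder(List<String> keys, int maxBatchSize) {
        List<List<String>> batches = new ArrayList<>();
        for (int i = 0; i < keys.size(); i += maxBatchSize) {
            int end = Math.min(i + maxBatchSize, keys.size());
            batches.add(new ArrayList<>(keys.subList(i, end)));
        }
        return batches;
    }

    // Groups messages by key, so every batch contains exactly one key.
    static List<List<String>> batchByKey(List<String> keys) {
        Map<String, List<String>> perKey = new LinkedHashMap<>();
        for (String key : keys) {
            perKey.computeIfAbsent(key, k -> new ArrayList<>()).add(key);
        }
        return new ArrayList<>(perKey.values());
    }

    // True when the broker could route the whole batch by a single key.
    static boolean hasSingleKey(List<String> batch) {
        return new HashSet<>(batch).size() <= 1;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("k1", "k2", "k1", "k2");
        System.out.println(batchInOrder(keys, 2)); // batches mix k1 and k2
        System.out.println(batchByKey(keys));      // one batch per key
    }
}
```

Since the broker sees a batch as one entry, a batch that mixes keys cannot be split between consumers, which is the failure case the two producer settings above avoid.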
Sample code for the consumer:
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://pulsar-xxx/sdk_java/topic1")
.subscriptionName("sub_topic1")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
There can be multiple consumers in the key_shared mode.
Sample "multi-partition topic + failover" subscription
Note:
In this mode, each partition is assigned to only one consumer instance at a time. When there are more consumers than partitions, the extra consumers receive no messages. You can solve this by adding more partitions.
Try to ensure an even key distribution when designing keys.
Delayed messages are not supported in failover mode.
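The effect of having more consumers than partitions can be sketched with a simple assignment model. The class `PartitionFailoverSketch` is hypothetical, and the modulo-based assignment is an assumption made for illustration; the only property taken from the note above is that each partition has exactly one active consumer at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of "multi-partition topic + failover"; the assignment rule is an assumption.
class PartitionFailoverSketch {

    // Assigns every partition to exactly one consumer: partition p goes to consumer (p mod consumer count).
    static Map<Integer, String> assign(int partitions, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        Map<Integer, String> owner = new TreeMap<>();
        for (int p = 0; p < partitions; p++) {
            owner.put(p, consumers.get(p % consumers.size()));
        }
        return owner;
    }

    // Returns the consumers that own no partition and therefore receive no messages.
    static List<String> idleConsumers(int partitions, List<String> consumers) {
        List<String> idle = new ArrayList<>(consumers);
        idle.removeAll(assign(partitions, consumers).values());
        return idle;
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("a", "b", "c");
        System.out.println(assign(2, consumers));        // two partitions, three consumers
        System.out.println(idleConsumers(2, consumers)); // one consumer is left without a partition
        System.out.println(idleConsumers(4, consumers)); // adding partitions puts every consumer to work
    }
}
```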
1. Sample code for the producer.
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();
MessageId msgId = producer.newMessage()
.value(value.getBytes(StandardCharsets.UTF_8))
.key("youKey1")
.send();
2. Sample code for the consumer.
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://pulsar-xxx/sdk_java/topic1")
.subscriptionName("sub_topic1")
.subscriptionType(SubscriptionType.Failover)
.subscribe();
Enabling sequence guarantee
TDMQ for Apache Pulsar 2.9.2 clusters support sequential message delivery by key. To enable the sequence guarantee feature, specify keySharedPolicy when creating the consumer instance.
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://pulsar-xxx/sdk_java/topic1")
.subscriptionName("sub_topic1")
.subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(KeySharedPolicy.autoSplitHashRange().setAllowOutOfOrderDelivery(false))
.subscribe();
Note:
Clusters on v2.7.2 do not support the sequence guarantee feature. Enabling it on such a cluster may cause message push congestion and subsequent consumption failure.
If the sequence guarantee feature is enabled, consumption may slow down and messages may accumulate after a consumer is restarted. This is because the feature requires the restarted consumer to consume messages in order: earlier messages must all be consumed and acknowledged before any later messages are delivered.