In messaging scenarios, it is common to encounter message sending failures and message heap exceeding expectations, leading to messages not being consumed normally. To deal with these scenarios, TDMQ for Pulsar provides message retry and dead letter mechanisms.
Automatic Retry
An automatic retry topic is designed to ensure that messages are consumed normally. If no normal response is received after some messages are consumed by the consumer for the first time, messages will enter the retry topic. And after a specified number of failed retries, they will stop retrying and will be delivered to the dead letter topic.
When messages enter the dead letter queue, it indicates that TDMQ for Apache Pulsar can no longer automatically process them. At this point, manual intervention is typically required. You can create a dedicated client to subscribe to the dead letter Topic and handle the previously failed messages.
Relevant concepts
Retry Topic : Each retry topic corresponds to one subscription name (the unique identifier of a subscriber group) and exists in TDMQ for Pulsar in the form of a topic. When you create a subscription in the console and enable automatically create retry & dead-letter queues
, a retry topic will be automatically created, which can independently implement the message retry mechanism.
This topic is named as follows:
For clusters on v2.9.2: [ Topic name]-[ subscription name]-RETRY
For clusters on v2.7.2: [ Subscription name]-retry
For clusters on v2.6.1: [ Subscription name]-retry
How It Works
When you create a consumer that subscribes to a Topic in shared mode using a specific subscription name and enable the enableRetry
attribute, it will automatically subscribe to the retry queue associated with that subscription name.
When consumption fails, calling consumer.reconsumeLater
API will cause the client to check the retry count of the message internally. If the specified maximum retry count is reached, the message will be delivered to the dead letter queue. (Messages in the dead letter queue cannot be consumed automatically. If consumption is needed, users need to create extra consumers.) If the maximum retry count is not reached, the message will be delivered to the retry queue. The retry interval is implemented via the delayed message. The message in the retry queue is essentially a delayed message, and the delay time is specified by users through reconsumeLater
API.
Note:
Only the shared mode (including Key sharing) supports the automatic retry and dead letter mechanisms.
If the subscription mode is Exclusive or Failover, the specified retry interval is invalid, and there will be an immediate retry. This is because the retry interval is implemented via the delayed message, which is not supported under Exclusive or Failover mode.
Note that the client version must match the cluster version for the client to accurately recognize the automatically created retry and dead letter queues.
During the use of tokens to access the retry/dead letter queue, roles used by consumers need to be granted permission to produce messages.
Here, a Java client is used as an example. The sub1
subscription is created in topic1
, and the client uses the sub1
subscription name to subscribe to topic1
and enables enableRetry
as shown below:
Consumer<byte[]> consumer = client.newConsumer()
.topic("persistent://1******30/my-ns/topic1")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.subscriptionName("sub1")
.subscribe();
At this point, the sub1
subscription to topic1
forms a delivery model with the retry mechanism, and sub1
will automatically subscribe to the retry letter topic created automatically during subscription creation (which can be found in the topic list in the console). If no ack is received from the consumer after a message in topic1
is delivered for the first time, the message will be automatically delivered to the retry letter topic, and as the consumer has subscribed to this topic automatically, the message will subsequently be consumed again according to particular retry rules. If it still fails to be consumed after the specified maximum number of retries, it will be delivered to the dead letter topic and wait for manual processing. Note:
If the subscription is automatically created by the client, you can click Topic Management > More > View Subscription in the console to enter the Subscription Management page and manually recreate the retry letter and dead letter topics.
Custom parameter settings
A set of retry and dead letter parameters are configured for TDMQ for Apache Pulsar by default as follows:
Specify the number of retries as 16 (after 16 failed retries, the message will be delivered to the dead letter topic at the 17th time)
Specify the retry letter topic as [topic name]-[subscription name]-RETRY
Specify the dead letter topic as [topic name]-[subscription name]-DLQ
Specify the number of retries as 16 (after 16 failed retries, the message will be delivered to the dead letter topic at the 17th time)
Specify the retry letter topic as [subscription name]-RETRY
Specify the dead letter topic as [subscription name]-DLQ
Specify the number of retries as 16 (after 16 failed retries, the message will be delivered to the dead letter topic at the 17th time)
Specify the retry letter topic as [subscription name]-retry
Specify the dead letter topic as [subscription name]-dlq
You can call the deadLetterPolicy
API to customize these parameters. Below is the sample code:
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://pulsar-****")
.subscriptionName("sub1")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.retryLetterTopic("persistent://my-property/my-ns/sub1-retry")
.deadLetterTopic("persistent://my-property/my-ns/sub1-dlq")
.build())
.subscribe();
Retry rules
The retry rules are implemented through the reconsumerLater
API in three modes:
consumer.reconsumeLater(msg, 1000L, TimeUnit.MILLISECONDS);
consumer.reconsumeLater(msg, 1);
consumer.reconsumeLater(msg);
Mode 1: specify any delay time. Enter the delay time in the second parameter and specify the time unit in the third parameter. The delay time is in the same value range as delayed message, which is 1–864,000 seconds.
Mode 2: specify any delay level (for existing users of the Tencent Cloud SDK only) . Its implementation effect is basically the same as that of mode 1, but it allows you to manage the delay time in distributed systems more easily. The delay level is as described below:
1.1 The second parameter in reconsumeLater(msg, 1)
is the message level.
1.2 By default, the MESSAGE_DELAYLEVEL = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
constant determines the respective delay time of each level; for example, level 1 corresponds to 1s and level 3 to 10s. You can customize the default value if it cannot meet your actual business needs.
Mode 3: increase with level (for existing users of the Tencent Cloud SDK only) . Different from the implementation effect of the above two modes, this mode adopts backoff retry; that is, the retry interval is 1s after the first failure, 5s after the second failure, and so on. The more the failures, the longer the interval. The specific retry interval is also determined by the MESSAGE_DELAYLEVEL
parameter as described in mode 2.
This retry mechanism often has more practical applications in business scenarios. If consumption fails, the service generally will not be recovered immediately, therefore using such progressive retry mechanism makes more sense.
Note:
If you use the SDK provided by the Pulsar community, the delay level and increase with level modes are not supported.
Attributes of retry message
A retry message carries the following attributes:
{
REAL_TOPIC="persistent://my-property/my-ns/test,
ORIGIN_MESSAGE_ID=314:28:-1,
RETRY_TOPIC="persistent://my-property/my-ns/my-subscription-retry,
RECONSUMETIMES=16
}
REAL_TOPIC
: original topic
ORIGIN_MESSAGE_ID
: ID of the initially produced message
RETRY_TOPIC
: retry letter topic
RECONSUMETIMES
: number of retries performed for the message
Method to Obtain Retry Count
msg.getProperties().get("RECONSUMETIMES")
Note:
The number of retries obtained through the msg.getRedeliveryCount() API corresponds to the number under the negativeAcknowledge retry mode.
Flow of retry message ID
The message ID flow process is as shown below, and you can analyze relevant logs based on it.
Original consumption: msgid=1:1:0:1
1st retry: msgid=2:1:-1
2nd retry: msgid=2:2:-1
3rd retry: msgid=2:3:-1
.......
16th retry: msgid=2:16:0:1
17th write to the dead letter topic: msgid=3:1:-1
Complete sample code
The retry letter (-RETRY) topic must be enabled first in the consumer ( enableRetry(true)
), which is disabled by default. Then, the reconsumeLater()
API needs to be called before messages can be sent to the retry letter topic.
The dead letter topic (-DLQ) must call consumer.reconsumeLater()
. After reconsumeLater()
is executed, the message of the original topic will be acked and transferred to the retry letter topic. After the retry reaches the upper limit, the message will then be transferred to the dead letter topic. The Pulsar client subscribes to the retry topic automatically, but not to the dead letter topic, which users must subscribe to on their own.
Below is the sample code for implementing the complete message retry mechanism in TDMQ for Apache Pulsar:
Subscribe to a topic
Consumer<byte[]> consumer1 = client.newConsumer()
.topic("persistent://pulsar-****")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.subscribe();
Executing consumption
while (true) {
Message msg = consumer.receive();
try {
System.out.printf("Message received: %s", new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e){
consumer.reconsumeLater(msg, 1000L, TimeUnit.MILLISECONDS);
}
}
Active Retry
If the client fails to consume a message and wants to reconsume it, the consumer can call the negativeAcknowledge API. The message will be obtained again after a period of time. The interval for reobtaining the message can be specified by consumers through the configuration of negativeAckRedeliveryDelay API.
Brief Description of the Implementation Mechanism
The clients cache the message ID of negativeAcknowledge and the list of negativeAcknowledge messages (the scan interval is negativeAckRedeliveryDelay x 1/3) is regularly scanned for the clients. When messages reach the time specified by negativeAckRedeliveryDelay, the client notifies the server to resubmit the messages that need to be retried. Upon receiving the client's redelivery request, the server re-pushes the corresponding messages to the client.
Note:
1. The actual time to receive the message again may be 1/3 more than the time specified by negativeAckRedeliveryDelay. This is related to the client's implementation logic.
2. In this method, no new messages are generated.
Below is the sample code in Java for active retry:
Consumer<byte[]> consumer = client.newConsumer()
.topic("persistent://pulsar-****")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.negativeAckRedeliveryDelay(1, TimeUnit.MINUTES)
.subscribe();
while (true) {
Message msg = consumer.receive();
try {
System.out.printf("Message received: %s", new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e){
consumer.negativeAcknowledge(msg);
}
}
Precautions
1. In the retry method of negativeAcknowledge, the messages that need to be retried are still unacked in the server's view.
2. In the retry method of negativeAcknowledge, there is no default maximum retry count. However, it can be achieved by configuring the maximum retry count and the dead letter queue. In this method, after a message has been retried for the specified number of times, it will be delivered to the dead letter queue.
Consumer<byte[]> consumer = client.newConsumer()
.topic("persistent://pulsar-****")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.negativeAckRedeliveryDelay(1, TimeUnit.MINUTES)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(5)
.deadLetterTopic("persistent://my-property/my-ns/sub1-dlq")
.build())
.subscribe();
while (true) {
Message msg = consumer.receive();
try {
System.out.printf("Message received: %s", new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
}
3. In the retry method of negativeAcknowledge, the retry count can be obtained from msg.getRedeliveryCount(). However, note that if all consumers under a subscription are offline, the message retry count will be reset to 0 (typical scenario: if there is only one consumer under a subscription, after the consumer restarts, the retry count of the messages will be reset to 0).
Was this page helpful?