消息场景下,经常会发生消息发送失败、消息堆积超过预期等消息未被正常消费的场景。为应对这些场景,TDMQ Pulsar 版提供了消息重试和死信机制。
自动重试
自动重试 Topic 是一种为了确保消息被正常消费而设计的 Topic 。当某些消息第一次被消费者消费后,没有得到正常的回应,则会进入重试 Topic 中,当重试达到一定次数后,停止重试,投递到死信 Topic 中。
相关概念
重试 Topic:一个重试 Topic 对应一个订阅名(一个订阅者组的唯一标识),以 Topic 形式存在于 TDMQ Pulsar 版中。当您在控制台新建订阅,并打开自动创建重试&死信队列
,系统会自动创建重试 Topic,该 Topic 会自主实现消息重试的机制。
该 Topic 命名为:
2.9.2 版本集群:[Topic 名称]-[订阅名]-RETRY
2.7.2 版本集群:[订阅名]-RETRY
2.6.1 版本集群:[订阅名]-retry
实现原理
当消费失败,调用consumer.reconsumeLater
接口之后,客户端内部检查消息对应的重试次数,如果达到指定的最大重试次数,消费被投递到死信队列(投递到死信队列的消息不会自动消费,如果需要,用户自己创建额外的消费者进行消费);如果没有达到最大重试次数,消费被投递到重试队列。重试间隔是通过延迟消息实现的,投递到重试队列的实际上是一个延迟消息,延迟时间就是用户在reconsumeLater
中指定的时间。
说明:
仅共享模式(包括 Key 共享)支持自动重试和死信机制。
如果订阅模式是 Exclusive 或 Failover,指定的重试的时间间隔无效,会立即重试。本质是因为重试间隔是通过延迟消息功能实现的,但是Exclusive 或 Failover 模式下面不支持延迟消息。
注意客户端版本需要与集群版本保持一致,客户端才能准确识别自动创建出的重试、死信队列。
当使用 Token 访问重试/死信队列时,需要为消费者所使用角色赋予生产消息权限。
这里以 Java 语言客户端为例,在 topic1
创建了一个 sub1
的订阅,客户端使用 sub1
订阅名订阅了 topic1
并开启了 enableRetry
,如下所示:
Consumer consumer = client.newConsumer()
.topic("persistent://1******30/my-ns/topic1")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.subscriptionName("sub1")
.subscribe();
此时,topic1
对 sub1
的订阅就形成了带有重试机制的投递模式,sub1
会自动订阅之前在新建订阅时自动创建的重试 Topic 中(可以在控制台 Topic 列表中找到)。当 topic1
中的消息投递第一次未收到消费端 ACK 时,这条消息就会被自动投递到重试 Topic ,并且由于 consumer 自动订阅了这个主题,后续这条消息会在一定的 重试规则 下重新被消费。当达到最大重试次数后仍失败,消息会被投递到对应的死信队列,等待人工处理。 说明:
如果是 client 端自动创建的订阅,可以通过控制台上的 Topic 管理 > 更多 > 查看订阅进入消费管理页面手动重建重试和死信队列。 自定义参数设置
TDMQ Pulsar 版会默认配置一套重试和死信参数,具体如下:
指定重试次数为16次(失败16次后,第17次会投递到死信队列)
指定重试队列为[Topic 名称]-[订阅名]-RETRY
指定死信队列为[Topic 名称]-[订阅名]-DLQ
指定重试次数为16次(失败16次后,第17次会投递到死信队列)
指定重试队列为 [订阅名]-RETRY
指定死信队列为 [订阅名]-DLQ
指定重试次数为16次(失败16次后,第17次会投递到死信队列)
指定重试队列为[订阅名]-retry
指定死信队列为[订阅名]-dlq
如果希望自定义配置这些参数,可以使用 deadLetterPolicy
API 进行配置,代码如下:
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://pulsar-****")
.subscriptionName("sub1")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.retryLetterTopic("persistent://my-property/my-ns/sub1-retry")
.deadLetterTopic("persistent://my-property/my-ns/sub1-dlq")
.build())
.subscribe();
重试规则
重试规则由 reconsumerLater
API 实现,有三种模式:
consumer.reconsumeLater(msg, 1000L, TimeUnit.MILLISECONDS);
consumer.reconsumeLater(msg, 1);
consumer.reconsumeLater(msg);
第一种:指定任意延迟时间。第二个参数填写延迟时间,第三个参数指定时间单位。延迟时间和延时消息的取值范围一致,范围在1 - 864000(单位:秒)。
第二种:指定任意延迟等级(仅限存量腾讯云版SDK的用户使用)。实现效果和第一种基本一致,更方便统一管理分布式系统中的延时时长,延迟等级说明如下:
1.1 reconsumeLater(msg, 1)
中的第二个参数即为消息等级。
1.2 默认MESSAGE_DELAYLEVEL = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
,这个常数决定了每级对应的延时时间,例如1级对应1s,3级对应10s。如果默认值不符合实际业务需求,用户可以重新自定义。
第三种:等级递增(仅限存量腾讯云版 SDK 的用户使用)。实现的效果不同于以上两种,为退避式的重试,即第一次失败后重试间隔为1秒,第二次失败后重试间隔为5秒,以此类推,次数越多,间隔时间越长。具体时间间隔同样由第二种中介绍的 MESSAGE_DELAYLEVEL
决定。
这种重试机制往往在业务场景中有更实际的应用,如果消费失败,一般服务不会立刻恢复,使用这种渐进式的重试方式更为合理。
注意:
如果您使用的是 Pulsar 社区的 SDK,则不支持延迟等级和等级递增两种模式。
重试消息的消息属性
一条重试消息会给消息带上如下 property。
{
REAL_TOPIC="persistent://my-property/my-ns/test,
ORIGIN_MESSAGE_ID=314:28:-1,
RETRY_TOPIC="persistent://my-property/my-ns/my-subscription-retry,
RECONSUMETIMES=16
}
REAL_TOPIC
:原 Topic。
ORIGIN_MESSAGE_ID
:最初生产的消息 ID。
RETRY_TOPIC
:重试 Topic。
RECONSUMETIMES
:代表该消息重试的次数。
重试次数的获取方法
msg.getProperties().get("RECONSUMETIMES")
注意:
通过 msg.getRedeliveryCount() 接口获取到的是 negativeAcknowledge 重试方式下的重试次数。
重试消息的消息 ID 流转
消息 ID 流转过程如下所示,您可以借助此规则对相关日志进行分析。
原始消费: msgid=1:1:0:1
第一次重试: msgid=2:1:-1
第二次重试: msgid=2:2:-1
第三次重试: msgid=2:3:-1
.......
第16次重试: msgid=2:16:0:1
第17次写入死信队列: msgid=3:1:-1
完整代码示例
重试(-RETRY)Topic 需要在 Consumer 中首先开启该功能(enableRetry(true)),默认为关闭状态。之后需要调用 reconsumeLater() 的接口消息才会被发送到重试 Topic 中。
死信(-DLQ)Topic 需要调用 consumer.reconsumeLater(),执行 reconsumeLater 之后原 topic 的那条消息会被 ack,消息转存到 retry topic,重试到达上限后消息转存至死信。Pulsar Client 会自动订阅 retry topic,但是进入死信就不会自动订阅,需要用户自己来订阅。
以下为借助 TDMQ Pulsar 版实现完整消息重试机制的代码示例,供开发者参考。
订阅主题
Consumer<byte[]> consumer1 = client.newConsumer()
.topic("persistent://pulsar-****")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.subscribe();
执行消费
while (true) {
Message msg = consumer.receive();
try {
System.out.printf("Message received: %s", new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.reconsumeLater(msg, 1000L, TimeUnit.MILLISECONDS);
}
}
主动重试
当客户端消费某条消息失败,如果想重新消费到这条消息,消费者可以调用 negativeAcknowledge 接口。消息会在一段时间后重新被获取到,重新获取消息的间隔时间,可以通过 consumer 配置 negativeAckRedeliveryDelay 指定。
实现机制简述
客户端会缓存 negativeAcknowledge 的消息 id,客户端内部定期扫描 negativeAcknowledge 消息的列表(扫描间隔 negativeAckRedeliveryDelay * 1/3)。当有到达 negativeAckRedeliveryDelay 指定时间的消息之后,客户端通知服务端重新投递对应需要重试的消息,服务端接受到客户端的重新投递请求之后,重新推送对应的消息到客户端。
注意:
1. 实际重新接收到消息的时间可能会比 negativeAckRedeliveryDelay 指定的时间多 1/3,这里和客户端的实现逻辑相关。
2. 这种方式下,并没有产生新消息。
以下为主动重试的 Java 代码示例:
Consumer<byte[]> consumer = client.newConsumer()
.topic("persistent://pulsar-****")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.negativeAckRedeliveryDelay(1, TimeUnit.MINUTES)
.subscribe();
while (true) {
Message msg = consumer.receive();
try {
System.out.printf("Message received: %s", new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
}
注意事项
1. negativeAcknowledge 的重试方式下,对应需要重试的消息在服务端看了依然是 unack 的消息。
2. negativeAcknowledge的重试方式下,默认没有最大重试次数。但是,可以通过并且配置最大重试次数和死信队列的方式实现。这种方式下,当一条消息重试了指定次数之后,会被投递到死信队列中。
Consumer<byte[]> consumer = client.newConsumer()
.topic("persistent://pulsar-****")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.negativeAckRedeliveryDelay(1, TimeUnit.MINUTES)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(5)
.deadLetterTopic("persistent://my-property/my-ns/sub1-dlq")
.build())
.subscribe();
while (true) {
Message msg = consumer.receive();
try {
System.out.printf("Message received: %s", new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
}
3. negativeAcknowledge 的重试方式下,重试次数可以从 msg.getRedeliveryCount() 中获取。但是要注意,如果一个订阅下的消费者全部离线,那么消息的重试次数会被重置到 0(典型的场景:一个订阅下只有一个消费者,那么消费者重启之后,消息的重试次数都会重置为 0 )。
本页内容是否解决了您的问题?