tencent cloud

文档反馈

消息重试与死信机制

最后更新时间:2024-12-02 17:03:07
    消息场景下,经常会发生消息发送失败、消息堆积超过预期等消息未被正常消费的场景。为应对这些场景,TDMQ Pulsar 版提供了消息重试和死信机制。

    自动重试

    自动重试 Topic 是一种为了确保消息被正常消费而设计的 Topic 。当某些消息第一次被消费者消费后,没有得到正常的回应,则会进入重试 Topic 中,当重试达到一定次数后,停止重试,投递到死信 Topic 中。
    当消息进入到死信队列中,表示 TDMQ Pulsar 版已经无法自动处理这批消息,一般这时就需要人为介入来处理这批消息。您可以通过编写专门的客户端来订阅死信 Topic,处理这批之前处理失败的消息。

    相关概念

    重试 Topic:一个重试 Topic 对应一个订阅名(一个订阅者组的唯一标识),以 Topic 形式存在于 TDMQ Pulsar 版中。当您在控制台新建订阅,并打开自动创建重试&死信队列,系统会自动创建重试 Topic,该 Topic 会自主实现消息重试的机制。
    该 Topic 命名为:
    2.9.2 版本集群:[Topic 名称]-[订阅名]-RETRY
    2.7.2 版本集群:[订阅名]-RETRY
    2.6.1 版本集群:[订阅名]-retry

    实现原理

    您创建的消费者使用某个订阅名以共享模式订阅了一个 Topic 后,如果开启了 enableRetry 属性,就会自动订阅这个订阅名对应的重试队列。
    当消费失败,调用consumer.reconsumeLater接口之后,客户端内部检查消息对应的重试次数,如果达到指定的最大重试次数,消费被投递到死信队列(投递到死信队列的消息不会自动消费,如果需要,用户自己创建额外的消费者进行消费);如果没有达到最大重试次数,消费被投递到重试队列。重试间隔是通过延迟消息实现的,投递到重试队列的实际上是一个延迟消息,延迟时间就是用户在reconsumeLater中指定的时间。
    说明:
    仅共享模式(包括 Key 共享)支持自动重试和死信机制。
    如果订阅模式是 Exclusive 或 Failover,指定的重试的时间间隔无效,会立即重试。本质是因为重试间隔是通过延迟消息功能实现的,但是Exclusive 或 Failover 模式下面不支持延迟消息。
    注意客户端版本需要与集群版本保持一致,客户端才能准确识别自动创建出的重试、死信队列。
    当使用 Token 访问重试/死信队列时,需要为消费者所使用角色赋予生产消息权限。
    这里以 Java 语言客户端为例,在 topic1 创建了一个 sub1 的订阅,客户端使用 sub1 订阅名订阅了 topic1 并开启了 enableRetry,如下所示:
    Consumer consumer = client.newConsumer()
    .topic("persistent://1******30/my-ns/topic1")
    .subscriptionType(SubscriptionType.Shared)//仅共享消费模式支持重试和死信
    .enableRetry(true)
    .subscriptionName("sub1")
    .subscribe();
    此时,topic1sub1 的订阅就形成了带有重试机制的投递模式,sub1 会自动订阅之前在新建订阅时自动创建的重试 Topic 中(可以在控制台 Topic 列表中找到)。当 topic1 中的消息投递第一次未收到消费端 ACK 时,这条消息就会被自动投递到重试 Topic ,并且由于 consumer 自动订阅了这个主题,后续这条消息会在一定的 重试规则 下重新被消费。当达到最大重试次数后仍失败,消息会被投递到对应的死信队列,等待人工处理。
    说明:
    如果是 client 端自动创建的订阅,可以通过控制台上的 Topic 管理 > 更多 > 查看订阅进入消费管理页面手动重建重试和死信队列。
    
    
    

    自定义参数设置

    TDMQ Pulsar 版会默认配置一套重试和死信参数,具体如下:
    2.9.2 版本集群
    2.7.2 版本集群
    2.6.1 版本集群
    指定重试次数为16次(失败16次后,第17次会投递到死信队列)
    指定重试队列为[Topic 名称]-[订阅名]-RETRY
    指定死信队列为[Topic 名称]-[订阅名]-DLQ
    指定重试次数为16次(失败16次后,第17次会投递到死信队列)
    指定重试队列为 [订阅名]-RETRY
    指定死信队列为 [订阅名]-DLQ
    指定重试次数为16次(失败16次后,第17次会投递到死信队列)
    指定重试队列为[订阅名]-retry
    指定死信队列为[订阅名]-dlq
    如果希望自定义配置这些参数,可以使用 deadLetterPolicy API 进行配置,代码如下:
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
    .topic("persistent://pulsar-****")
    .subscriptionName("sub1")
    .subscriptionType(SubscriptionType.Shared)
    .enableRetry(true)//开启重试消费
    .deadLetterPolicy(DeadLetterPolicy.builder()
    .maxRedeliverCount(maxRedeliveryCount)//可以指定最大重试次数
    .retryLetterTopic("persistent://my-property/my-ns/sub1-retry")//可以指定重试队列
    .deadLetterTopic("persistent://my-property/my-ns/sub1-dlq")//可以指定死信队列
    .build())
    .subscribe();

    重试规则

    重试规则由 reconsumerLater API 实现,有三种模式:
    //指定任意延迟时间
    consumer.reconsumeLater(msg, 1000L, TimeUnit.MILLISECONDS);
    //指定延迟等级
    consumer.reconsumeLater(msg, 1);
    //等级递增
    consumer.reconsumeLater(msg);
    第一种:指定任意延迟时间。第二个参数填写延迟时间,第三个参数指定时间单位。延迟时间和延时消息的取值范围一致,范围在1 - 864000(单位:秒)。
    第二种:指定任意延迟等级(仅限存量腾讯云版SDK的用户使用)。实现效果和第一种基本一致,更方便统一管理分布式系统中的延时时长,延迟等级说明如下:
    1.1 reconsumeLater(msg, 1)中的第二个参数即为消息等级。
    1.2 默认MESSAGE_DELAYLEVEL = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h",这个常数决定了每级对应的延时时间,例如1级对应1s,3级对应10s。如果默认值不符合实际业务需求,用户可以重新自定义。
    第三种:等级递增(仅限存量腾讯云版 SDK 的用户使用)。实现的效果不同于以上两种,为退避式的重试,即第一次失败后重试间隔为1秒,第二次失败后重试间隔为5秒,以此类推,次数越多,间隔时间越长。具体时间间隔同样由第二种中介绍的 MESSAGE_DELAYLEVEL 决定。
    这种重试机制往往在业务场景中有更实际的应用,如果消费失败,一般服务不会立刻恢复,使用这种渐进式的重试方式更为合理。
    注意:
    如果您使用的是 Pulsar 社区的 SDK,则不支持延迟等级和等级递增两种模式。

    重试消息的消息属性

    一条重试消息会给消息带上如下 property。
    {
    REAL_TOPIC="persistent://my-property/my-ns/test,
    ORIGIN_MESSAGE_ID=314:28:-1,
    RETRY_TOPIC="persistent://my-property/my-ns/my-subscription-retry,
    RECONSUMETIMES=16
    }
    REAL_TOPIC:原 Topic。
    ORIGIN_MESSAGE_ID:最初生产的消息 ID。
    RETRY_TOPIC:重试 Topic。
    RECONSUMETIMES:代表该消息重试的次数。

    重试次数的获取方法

    msg.getProperties().get("RECONSUMETIMES")
    注意:
    通过 msg.getRedeliveryCount() 接口获取到的是 negativeAcknowledge 重试方式下的重试次数。

    重试消息的消息 ID 流转

    消息 ID 流转过程如下所示,您可以借助此规则对相关日志进行分析。
    原始消费: msgid=1:1:0:1
    第一次重试: msgid=2:1:-1
    第二次重试: msgid=2:2:-1
    第三次重试: msgid=2:3:-1
    .......
    第16次重试: msgid=2:16:0:1
    第17次写入死信队列: msgid=3:1:-1

    完整代码示例

    重试(-RETRY)Topic 需要在 Consumer 中首先开启该功能(enableRetry(true)),默认为关闭状态。之后需要调用 reconsumeLater() 的接口消息才会被发送到重试 Topic 中。
    死信(-DLQ)Topic 需要调用  consumer.reconsumeLater(),执行 reconsumeLater 之后原 topic 的那条消息会被 ack,消息转存到 retry topic,重试到达上限后消息转存至死信。Pulsar Client 会自动订阅 retry topic,但是进入死信就不会自动订阅,需要用户自己来订阅。
    以下为借助 TDMQ Pulsar 版实现完整消息重试机制的代码示例,供开发者参考。
    订阅主题
    Consumer<byte[]> consumer1 = client.newConsumer()
    .topic("persistent://pulsar-****")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Shared)
    .enableRetry(true)//开启重试消费
    //.deadLetterPolicy(DeadLetterPolicy.builder()
    // .maxRedeliverCount(maxRedeliveryCount)
    // .retryLetterTopic("persistent://my-property/my-ns/my-subscription-retry")//可以指定重试队列
    // .deadLetterTopic("persistent://my-property/my-ns/my-subscription-dlq")//可以指定死信队列
    // .build())
    .subscribe();
    执行消费
    while (true) {
    Message msg = consumer.receive();
    try {
    // Do something with the message
    System.out.printf("Message received: %s", new String(msg.getData()));
    // Acknowledge the message so that it can be deleted by the message broker
    consumer.acknowledge(msg);
    } catch (Exception e) {
    // select reconsume policy
    consumer.reconsumeLater(msg, 1000L, TimeUnit.MILLISECONDS);
    //consumer.reconsumeLater(msg, 1);
    //consumer.reconsumeLater(msg);
    }
    }

    主动重试

    当客户端消费某条消息失败,如果想重新消费到这条消息,消费者可以调用 negativeAcknowledge 接口。消息会在一段时间后重新被获取到,重新获取消息的间隔时间,可以通过 consumer 配置 negativeAckRedeliveryDelay 指定。

    实现机制简述

    客户端会缓存 negativeAcknowledge 的消息 id,客户端内部定期扫描 negativeAcknowledge 消息的列表(扫描间隔 negativeAckRedeliveryDelay * 1/3)。当有到达 negativeAckRedeliveryDelay 指定时间的消息之后,客户端通知服务端重新投递对应需要重试的消息,服务端接受到客户端的重新投递请求之后,重新推送对应的消息到客户端。
    注意:
    1. 实际重新接收到消息的时间可能会比 negativeAckRedeliveryDelay 指定的时间多 1/3,这里和客户端的实现逻辑相关。
    2. 这种方式下,并没有产生新消息。
    以下为主动重试的 Java 代码示例:
    Consumer<byte[]> consumer = client.newConsumer()
    .topic("persistent://pulsar-****")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Shared)
    // 默认1min
    .negativeAckRedeliveryDelay(1, TimeUnit.MINUTES)
    .subscribe();
    while (true) {
    Message msg = consumer.receive();
    try {
    // Do something with the message
    System.out.printf("Message received: %s", new String(msg.getData()));
    // Acknowledge the message so that it can be deleted by the message broker
    consumer.acknowledge(msg);
    } catch (Exception e) {
    // Message failed to process, redeliver later
    consumer.negativeAcknowledge(msg);
    }
    }

    注意事项

    1. negativeAcknowledge 的重试方式下,对应需要重试的消息在服务端看了依然是 unack 的消息。
    2. negativeAcknowledge的重试方式下,默认没有最大重试次数。但是,可以通过并且配置最大重试次数和死信队列的方式实现。这种方式下,当一条消息重试了指定次数之后,会被投递到死信队列中。
    Consumer<byte[]> consumer = client.newConsumer()
    .topic("persistent://pulsar-****")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Shared)
    // 默认1min
    .negativeAckRedeliveryDelay(1, TimeUnit.MINUTES)
    .deadLetterPolicy(DeadLetterPolicy.builder()
    .maxRedeliverCount(5)//可以指定最大重试次数
    .deadLetterTopic("persistent://my-property/my-ns/sub1-dlq")//可以指定死信队列
    .build())
    .subscribe();
    
    
    while (true) {
    Message msg = consumer.receive();
    try {
    // Do something with the message
    System.out.printf("Message received: %s", new String(msg.getData()));
    // Acknowledge the message so that it can be deleted by the message broker
    consumer.acknowledge(msg);
    } catch (Exception e) {
    // Message failed to process, redeliver later
    consumer.negativeAcknowledge(msg);
    }
    }
    3. negativeAcknowledge 的重试方式下,重试次数可以从 msg.getRedeliveryCount() 中获取。但是要注意,如果一个订阅下的消费者全部离线,那么消息的重试次数会被重置到 0(典型的场景:一个订阅下只有一个消费者,那么消费者重启之后,消息的重试次数都会重置为 0 )。
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持