tencent cloud

文档反馈

顺序消息

最后更新时间:2024-01-17 16:52:42
    顺序消息是消息队列 RocketMQ 提供的一种高级消息类型,对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发送的消息先消费,后发送的消息后消费。
    顺序消息适用于对消息发送和消费顺序有严格要求的情况。

    使用场景

    顺序消息和普通消息的对比如下:
    消息类型
    消费顺序
    性能
    适用场景
    普通消息
    无顺序
    适用于对吞吐量要求高,且对生产和消费顺序无要求
    顺序消息
    指定的 Topic 内的消息遵循先入先出(FIFO)规则
    一般
    吞吐量要求一般,但是要求特定的 Topic 严格地按照 FIFO 原则进行消息发布和消费的场景
    对应到具体的业务场景,顺序消息可以被用在以下场景中:
    订单创建场景:在一些电商系统中,同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息必须严格按照先后顺序来进行生产或者消费,否则消费中传递订单状态会发生紊乱,影响业务的正常进行。因此,该订单的消息必须按照一定的顺序在客户端和消息队列中进行生产和消费,同时消息之间有先后的依赖关系,后一条消息需要依赖于前一条消息的处理结果。
    日志同步场景:在有序事件处理或者数据实时增量同步的场景中,顺序消息也能发挥较大的作用,如同步 mysql 的 binlog 日志时,需要保证数据库的操作是有顺序的。
    金融场景:在一些撮合交易的场景下,例如某些证券交易,在价格相同的情况下,先出价者优先处理,则需要按照FIFO的方式生产和消费顺序消息。

    实现原理

    在 RocketMQ 中支持顺序消息的原理如下图所示。我们可以按照某一个标准对消息进行分区(例如图中的ShardingKey),同一个ShardingKey 的消息会被分配到同一个队列中,并按照顺序被消费。
    
    
    
    顺序消息的代码如下所示:
    public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
    try {
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    producer.start();
    
    String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
    for (int i = 0; i < 100; i++) {
    int orderId = i % 10;
    Message msg =
    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    Integer id = (Integer) arg;
    int index = id % mqs.size();
    return mqs.get(index);
    }
    }, orderId);
    
    System.out.printf("%s%n", sendResult);
    }
    
    producer.shutdown();
    } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    这里的区别主要是调用了 SendResult send(Message msg, MessageQueueSelector selector, Object arg) 方法,MessageQueueSelector 是队列选择器,arg 是一个 Java Object 对象,可以传入作为消息发送分区的分类标准。
    MessageQueueSelector 的接口如下:
    public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
    }
    其中 mqs 是可以发送的队列,msg 是消息,arg 是上述 send 接口中传入的 Object 对象,返回的是该消息需要发送到的队列。上述例子里,是以 orderId 作为分区分类标准,对所有队列个数取余,来对将相同 orderId 的消息发送到同一个队列中。
    生产环境中建议选择最细粒度的分区键进行拆分,例如,将订单ID、用户ID作为分区键关键字,可实现同一终端用户的消息按照顺序处理,不同用户的消息无需保证顺序。
    注意:
    为了保证消息的高可用,目前TDMQ RocketMQ版不支持单队列的 “全局顺序消息”(已经创建了全局顺序消息的用户可以正常使用);如果您想保证全局的顺序性,您可以通过使用一致的 ShardingKey 来实现。
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持