tencent cloud

文档反馈

发送与接收过滤消息

最后更新时间:2023-03-28 10:15:40

    操作场景

    本文以调用 Java SDK 为例介绍通过开源 SDK 实现过滤消息收发的操作过程。目前支持两种,分别是Tag 和 SQL 方式。

    前提条件

    完成资源创建与准备(如果是全局顺序消息,需要创建单队列 topic)
    下载 Demo 或者前往 GitHub 项目
    参考普通消息的发送和接收

    TAG使用步骤

    创建生产者消费者的基础代码不在赘述,可以参考普通消息的发送和接收即可。
    对于生产消息,主要是在构造消息体的时候,带上 TAG。
    对于消费消息,主要是订阅的时候,带上单个 TAG 或者 * 或者多个 TAG 表达式。

    步骤1:生产消息

    发送消息

    发送代码和简单的消息没有区别主要是在构造消息体的时候,带上 TAG,仅允许一个。
    int totalMessagesToSend = 5;
    for (int i = 0; i < totalMessagesToSend; i++) {
    Message msg = new Message(TOPIC_NAME, "Tag1", "Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));
    // 发送消息
    SendResult sendResult = producer.send(message);
    System.out.println("sendResult = " + sendResult);
    }

    步骤2:消费消息

    订阅消息

    // 订阅topic 订阅所有的TAG
    pushConsumer.subscribe(topic_name, "*");
    
    //订阅指定的TAG
    //pushConsumer.subscribe(TOPIC_NAME, "Tag1");
    
    //订阅多个TAG
    //pushConsumer.subscribe(TOPIC_NAME, "Tag1||Tag2");
    // 注册回调实现类来处理从broker拉取回来的消息
    pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    // 消息处理逻辑
    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    // 标记该消息已经被成功消费, 根据消费情况,返回处理状态
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    // 启动消费者实例
    pushConsumer.start();
    参数
    说明
    topic_name
    在控制台集群管理中 Topic 页签中复制具体 Topic 名称。
    "*"
    订阅表达式如果为 null 或*表达式表示订阅全部,同时支持 "tag1 || tag2 || tag3" 标识订阅多个类型的 tag。
    说明:
    上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 GitHub DemoRocketMQ 官方文档

    SQL 使用步骤

    创建生产者消费者的基础代码不在赘述,可以参考普通消息的发送和接收即可。
    对于生产消息,主要是在构造消息体的时候,带上用户自定义的 properties。
    对于消费消息,主要是订阅的时候,带上对应的 SQL 表达式。

    步骤1:生产消息

    发送代码和简单的消息没有区别 主要是在构造消息体的时候,带上自定义属性,允许多个。
    int totalMessagesToSend = 5;
    for (int i = 0; i < totalMessagesToSend; i++) {
    Message msg = new Message(TOPIC_NAME,"Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));
    msg.putUserProperty("key1","value1");
    // 发送消息
    SendResult sendResult = producer.send(message);
    System.out.println("sendResult = " + sendResult);
    }

    步骤2:消费消息

    对于消费消息,主要是订阅的时候,带上对应的 SQL 表达式,其他的和普通的没有区别。
    pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("True"));
    
    // 订阅topic 订阅单个key的sql,最常用
    //pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("key1 IS NOT NULL AND key1='value1'"));
    
    //订阅多个属性
    //pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("key1 IS NOT NULL AND key2 IS NOT NULL AND key1='value1' AND key2='value2'"));
    // 注册回调实现类来处理从broker拉取回来的消息
    pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    // 消息处理逻辑
    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    // 标记该消息已经被成功消费, 根据消费情况,返回处理状态
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    // 启动消费者实例
    pushConsumer.start();
    说明:
    上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 GitHub DemoRocketMQ 官方文档
    
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持