tencent cloud

文档反馈

Spring Boot Starter 接入

最后更新时间:2024-12-02 17:10:17

    操作场景

    本文以 Spring Boot Starter 接入为例介绍实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

    前提条件

    操作步骤

    步骤1:添加依赖

    在项目中引入 Pulsar Starter 相关依赖。
    <dependency>
    <groupId>io.github.majusko</groupId>
    <artifactId>pulsar-java-spring-boot-starter</artifactId>
    <version>1.0.7</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
    <dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.11</version>
    </dependency>
    

    步骤2:准备配置

    在配置文件 application.yml 中添加 Pulsar 相关配置信息。
    server:
    port: 8081
    pulsar:
    # 命名空间名称
    namespace: namespace_java
    # 服务接入地址
    service-url: http://pulsar-w7eognxxx.tdmq.ap-gz.public.tencenttdmq.com:8080
    # 授权角色密钥
    token-auth-value: eyJrZXlJZC....
    # 集群名称
    tenant: pulsar-w7eognxxx
    
    参数
    说明
    namespace
    命名空间名称,在控制台 命名空间 管理页面中复制。
    service-url
    集群接入地址,可以在控制台 集群管理 页面查看并复制。
    
    
    
    token-auth-value
    角色密钥,在 角色管理 页面复制密钥列复制。
    
    
    
    tenant
    集群 ID,在控制台 集群管理 页面中获取。

    步骤3:生产消息

    在 ProducerConfiguration.java 中配置生产者
    package com.tencent.cloud.tdmq.pulsar.config;
    
    import io.github.majusko.pulsar.producer.ProducerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    
    /**
    * 生产者相关配置
    * 1.topic要提前在控制台中完成创建
    * 2.消息类型需要实现Serializable接口
    * 3.如果一个topic不能绑定不同的数据类型
    */
    @Configuration
    public class ProducerConfiguration {
    
    @Bean
    public ProducerFactory producerFactory() {
    return new ProducerFactory()
    // topic1 生产者
    .addProducer("topic1")
    // topic2 生产者
    .addProducer("topic2");
    }
    }
    
    编译并运行生产消息程序 MyProducer.java。
    package com.tencent.cloud.tdmq.pulsar.service;
    
    import io.github.majusko.pulsar.producer.PulsarTemplate;
    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.CompletableFuture;
    
    @Service
    public class MyProducer {
    
    /**
    * 1.发送消息的topic是在生产者配置中已经声明的topic
    * 2.PulsarTemplate类型应于发送消息的类型一致
    * 3.发送消息到指定topic时,消息类型需要与生产者工厂配置中的topic绑定的消息类型对应
    */
    
    @Autowired
    private PulsarTemplate<byte[]> defaultProducer;
    
    public void syncSendMessage() throws PulsarClientException {
    defaultProducer.send("topic1", "Hello pulsar client.".getBytes(StandardCharsets.UTF_8));
    }
    
    public void asyncSendMessage() {
    String msg = "Hello pulsar client.";
    CompletableFuture<MessageId> completableFuture = defaultProducer.sendAsync("topic1", msg.getBytes(StandardCharsets.UTF_8));
    // 通过异步回调得知消息发送成功与否
    completableFuture.whenComplete(((messageId, throwable) -> {
    if( null != throwable ) {
    System.out.println("delivery failed, value: " + msg );
    // 此处可以添加延时重试的逻辑
    } else {
    System.out.println("delivered msg " + messageId + ", value:" + msg);
    }
    }));
    }
    
    /**
    * 顺序消息需要使用顺序类型topic来完成,顺序类型的topic支撑全局顺序和局部顺序两种类型,根据实际情况选择合适的类型即可
    */
    public void sendOrderMessage() throws PulsarClientException {
    for (int i = 0; i < 5; i++) {
    defaultProducer.send("topic2", ("Hello pulsar client, this is a order message" + i + ".").getBytes(StandardCharsets.UTF_8));
    }
    }
    }
    
    
    注意:
    发送消息的 Topic 是在生产者配置中已经声明的 Topic。
    PulsarTemplate 类型应与发送消息的类型一致。
    发送消息到指定 Topic 时,消息类型需要与生产者工厂配置中的 Topic 绑定的消息类型对应。

    步骤4:消费消息

    编译并运行消费消息程序 MyConsumer.java。
    package com.tencent.cloud.tdmq.pulsar.service;
    
    import io.github.majusko.pulsar.annotation.PulsarConsumer;
    import io.github.majusko.pulsar.constant.Serialization;
    import org.apache.pulsar.client.api.SubscriptionType;
    import org.springframework.stereotype.Service;
    
    /**
    * 消费者配置
    */
    @Service
    public class MyConsumer {
    
    @PulsarConsumer(topic = "topic1", // 订阅topic名称
    subscriptionName = "sub_topic1", // 订阅名称
    serialization = Serialization.JSON, // 序列化方式
    subscriptionType = SubscriptionType.Shared, // 订阅模式,默认为独占模式
    consumerName = "firstTopicConsumer", // 消费者名称
    maxRedeliverCount = 3, // 最大重试次数
    deadLetterTopic = "sub_topic1-DLQ" // 死信topic名称
    )
    public void firstTopicConsume(byte[] msg) {
    // TODO process your message
    System.out.println("Received a new message. content: [" + new String(msg) + "]");
    // 如果消费失败,请抛出异常,这样消息会进入重试队列,之后可以重新消费,直到达到最大重试次数之后,进入死信队列。前提是要创建重试和死信topic
    }
    
    
    /**
    * 顺序类型的消息可借助顺序类型的topic来完成,支持全局顺序和局部顺序两种类型
    */
    @PulsarConsumer(topic = "topic2", subscriptionName = "sub_topic2")
    public void orderTopicConsumer(byte[] msg) {
    // TODO process your message
    System.out.println("Received a order message. content: [" + new String(msg) + "]");
    }
    
    
    /**
    * 监听死信topic,处理死信消息
    */
    @PulsarConsumer(topic = "sub_topic1-DLQ", subscriptionName = "dead_sub")
    public void deadTopicConsumer(byte[] msg) {
    // TODO process your message
    System.out.println("Received a dead message. content: [" + new String(msg) + "]");
    }
    }
    说明:
    以上是基于 Springboot Starter 方式对 Pulsar 简单使用的配置。详细使用可参见 DemoStarter GithubStarter Gitee

    步骤5:查询消息

    登录控制台,进入 消息查询 页面,可查看 Demo 运行后的消息轨迹。
    
    
    
    消息轨迹如下:
    
    
    
    说明:
    以上是基于 Springboot Starter 方式对 Pulsar 简单使用的配置。详细使用可参见 DemoStarter 文档

    步骤6:查看消费情况

    进入 消息查询 页面,可查看消息详情。
    
    
    
    消息详情如下:
    
    
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持