tencent cloud

文档反馈

Spring Boot Starter 接入

最后更新时间:2024-01-03 14:27:38

    操作场景

    本文以 Spring Boot Starter 接入为例介绍实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

    前提条件

    操作步骤

    步骤1:添加依赖

    在项目中引入 Pulsar Starter 相关依赖。
    <dependency>
    <groupId>io.github.majusko</groupId>
    <artifactId>pulsar-java-spring-boot-starter</artifactId>
    <version>1.0.7</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
    <dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.11</version>
    </dependency>
    

    步骤2:准备配置

    在配置文件中 添加 Pulsar 相关配置信息。
    pulsar:
    # 命名空间名称
    namespace: namespace_java
    # 服务接入地址
    service-url: http://pulsar-xxx.tdmq.ap-gz.public.tencenttdmq.com:8080
    # 授权角色密钥
    token-auth-value: eyJrZXlJZC....
    # 集群名称
    tenant: pulsar-xxx
    
    参数
    说明
    namespace
    命名空间名称,在控制台 命名空间 管理页面中复制。
    service-url
    集群接入地址,可以在控制台 集群管理 页面查看并复制。
    
    
    
    token-auth-value
    角色密钥,在 角色管理 页面复制密钥列复制。
    
    
    
    tenant
    集群 ID,在控制台 集群管理 页面中获取。

    步骤3:生产消息

    1. 生产者配置。
    @Configuration
    public class ProducerConfiguration {
    
    @Bean
    public ProducerFactory producerFactory() {
    return new ProducerFactory()
    // topic1
    .addProducer("topic1")
    // topic2
    .addProducer("topic2");
    }
    }
    
    2. 注入生产者。
    @Autowired
    private PulsarTemplate<byte[]> defaultProducer;
    
    3. 发送消息。
    // 发送消息
    defaultProducer.send("topic2", ("Hello pulsar client, this is a order message.").getBytes(StandardCharsets.UTF_8));
    
    注意:
    发送消息的 Topic 是在生产者配置中已经声明的 Topic。
    PulsarTemplate 类型应与发送消息的类型一致。
    发送消息到指定 Topic 时,消息类型需要与生产者工厂配置中的 Topic 绑定的消息类型对应。

    步骤4:消费消息

    消费者配置。
    @PulsarConsumer(topic = "topic1", // 订阅topic名称
    subscriptionName = "sub_topic1", // 订阅名称
    serialization = Serialization.JSON, // 序列化方式
    subscriptionType = SubscriptionType.Shared, // 订阅模式,默认为独占模式
    consumerName = "firstTopicConsumer", // 消费者名称
    maxRedeliverCount = 3, // 最大重试次数
    deadLetterTopic = "sub_topic1-DLQ" // 死信topic名称
    )
    public void topicConsume(byte[] msg) {
    // TODO process your message
    System.out.println("Received a new message. content: [" + new String(msg) + "]");
    // 如果消费失败,请抛出异常,这样消息会进入重试队列,之后可以重新消费,直到达到最大重试次数之后,进入死信队列。前提是要创建重试和死信topic
    }
    

    步骤5:查询消息

    登录控制台,进入 消息查询 页面,可查看 Demo 运行后的消息轨迹。
    
    
    
    消息轨迹如下:
    
    
    
    说明:
    以上是基于 Springboot Starter 方式对 Pulsar 简单使用的配置。详细使用可参见 DemoStarter 文档
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持