<dependency><groupId>io.github.majusko</groupId><artifactId>pulsar-java-spring-boot-starter</artifactId><version>1.0.7</version></dependency><!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core --><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.4.11</version></dependency>
server:port: 8081pulsar:# 命名空间名称namespace: namespace_java# 服务接入地址service-url: http://pulsar-w7eognxxx.tdmq.ap-gz.public.tencenttdmq.com:8080# 授权角色密钥token-auth-value: eyJrZXlJZC....# 集群名称tenant: pulsar-w7eognxxx
package com.tencent.cloud.tdmq.pulsar.config;import io.github.majusko.pulsar.producer.ProducerFactory;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/*** 生产者相关配置* 1.topic要提前在控制台中完成创建* 2.消息类型需要实现Serializable接口* 3.如果一个topic不能绑定不同的数据类型*/@Configurationpublic class ProducerConfiguration {@Beanpublic ProducerFactory producerFactory() {return new ProducerFactory()// topic1 生产者.addProducer("topic1")// topic2 生产者.addProducer("topic2");}}
package com.tencent.cloud.tdmq.pulsar.service;import io.github.majusko.pulsar.producer.PulsarTemplate;import org.apache.pulsar.client.api.MessageId;import org.apache.pulsar.client.api.PulsarClientException;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.nio.charset.StandardCharsets;import java.util.concurrent.CompletableFuture;@Servicepublic class MyProducer {/*** 1.发送消息的topic是在生产者配置中已经声明的topic* 2.PulsarTemplate类型应于发送消息的类型一致* 3.发送消息到指定topic时,消息类型需要与生产者工厂配置中的topic绑定的消息类型对应*/@Autowiredprivate PulsarTemplate<byte[]> defaultProducer;public void syncSendMessage() throws PulsarClientException {defaultProducer.send("topic1", "Hello pulsar client.".getBytes(StandardCharsets.UTF_8));}public void asyncSendMessage() {String msg = "Hello pulsar client.";CompletableFuture<MessageId> completableFuture = defaultProducer.sendAsync("topic1", msg.getBytes(StandardCharsets.UTF_8));// 通过异步回调得知消息发送成功与否completableFuture.whenComplete(((messageId, throwable) -> {if( null != throwable ) {System.out.println("delivery failed, value: " + msg );// 此处可以添加延时重试的逻辑} else {System.out.println("delivered msg " + messageId + ", value:" + msg);}}));}/*** 顺序消息需要使用顺序类型topic来完成,顺序类型的topic支撑全局顺序和局部顺序两种类型,根据实际情况选择合适的类型即可*/public void sendOrderMessage() throws PulsarClientException {for (int i = 0; i < 5; i++) {defaultProducer.send("topic2", ("Hello pulsar client, this is a order message" + i + ".").getBytes(StandardCharsets.UTF_8));}}}
package com.tencent.cloud.tdmq.pulsar.service;import io.github.majusko.pulsar.annotation.PulsarConsumer;import io.github.majusko.pulsar.constant.Serialization;import org.apache.pulsar.client.api.SubscriptionType;import org.springframework.stereotype.Service;/*** 消费者配置*/@Servicepublic class MyConsumer {@PulsarConsumer(topic = "topic1", // 订阅topic名称subscriptionName = "sub_topic1", // 订阅名称serialization = Serialization.JSON, // 序列化方式subscriptionType = SubscriptionType.Shared, // 订阅模式,默认为独占模式consumerName = "firstTopicConsumer", // 消费者名称maxRedeliverCount = 3, // 最大重试次数deadLetterTopic = "sub_topic1-DLQ" // 死信topic名称)public void firstTopicConsume(byte[] msg) {// TODO process your messageSystem.out.println("Received a new message. content: [" + new String(msg) + "]");// 如果消费失败,请抛出异常,这样消息会进入重试队列,之后可以重新消费,直到达到最大重试次数之后,进入死信队列。前提是要创建重试和死信topic}/*** 顺序类型的消息可借助顺序类型的topic来完成,支持全局顺序和局部顺序两种类型*/@PulsarConsumer(topic = "topic2", subscriptionName = "sub_topic2")public void orderTopicConsumer(byte[] msg) {// TODO process your messageSystem.out.println("Received a order message. content: [" + new String(msg) + "]");}/*** 监听死信topic,处理死信消息*/@PulsarConsumer(topic = "sub_topic1-DLQ", subscriptionName = "dead_sub")public void deadTopicConsumer(byte[] msg) {// TODO process your messageSystem.out.println("Received a dead message. content: [" + new String(msg) + "]");}}
本页内容是否解决了您的问题?