<dependency><groupId>io.github.majusko</groupId><artifactId>pulsar-java-spring-boot-starter</artifactId><version>1.0.7</version></dependency><!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core --><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.4.11</version></dependency>
pulsar:# 命名空间名称namespace: namespace_java# 服务接入地址service-url: http://pulsar-xxx.tdmq.ap-gz.public.tencenttdmq.com:8080# 授权角色密钥token-auth-value: eyJrZXlJZC....# 集群名称tenant: pulsar-xxx
@Configurationpublic class ProducerConfiguration {@Beanpublic ProducerFactory producerFactory() {return new ProducerFactory()// topic1.addProducer("topic1")// topic2.addProducer("topic2");}}
@Autowiredprivate PulsarTemplate<byte[]> defaultProducer;
// 发送消息defaultProducer.send("topic2", ("Hello pulsar client, this is a order message.").getBytes(StandardCharsets.UTF_8));
@PulsarConsumer(topic = "topic1", // 订阅topic名称subscriptionName = "sub_topic1", // 订阅名称serialization = Serialization.JSON, // 序列化方式subscriptionType = SubscriptionType.Shared, // 订阅模式,默认为独占模式consumerName = "firstTopicConsumer", // 消费者名称maxRedeliverCount = 3, // 最大重试次数deadLetterTopic = "sub_topic1-DLQ" // 死信topic名称)public void topicConsume(byte[] msg) {// TODO process your messageSystem.out.println("Received a new message. content: [" + new String(msg) + "]");// 如果消费失败,请抛出异常,这样消息会进入重试队列,之后可以重新消费,直到达到最大重试次数之后,进入死信队列。前提是要创建重试和死信topic}
本页内容是否解决了您的问题?