<dependency><groupId>io.github.majusko</groupId><artifactId>pulsar-java-spring-boot-starter</artifactId><version>1.0.7</version></dependency><!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core --><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.4.11</version></dependency>
server:port: 8081pulsar:# Namespace namenamespace: namespace_java# Service access URLservice-url: http://pulsar-w7eognxxx.tdmq.ap-gz.public.tencenttdmq.com:8080# Authorization role secret keytoken-auth-value: eyJrZXlJZC....# Cluster nametenant: pulsar-w7eognxxx
Parameter | Description |
namespace | |
service-url | |
token-auth-value | |
tenant |
package com.tencent.cloud.tdmq.pulsar.config;import io.github.majusko.pulsar.producer.ProducerFactory;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/*** Producer-related configurations* 1. The topic should be created in the console in advance.* 2. The message type should implement the Serializable API.* 3. A topic cannot bind to different data types.*/@Configurationpublic class ProducerConfiguration {@Beanpublic ProducerFactory producerFactory() {return new ProducerFactory()// Producer for topic1.addProducer("topic1")// Producer for topic2.addProducer("topic2");}}
package com.tencent.cloud.tdmq.pulsar.service;import io.github.majusko.pulsar.producer.PulsarTemplate;import org.apache.pulsar.client.api.MessageId;import org.apache.pulsar.client.api.PulsarClientException;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.nio.charset.StandardCharsets;import java.util.concurrent.CompletableFuture;@Servicepublic class MyProducer {/*** 1. The topic for sending messages should be a topic that has already been declared in the producer configuration.* 2. The PulsarTemplate type should match the type of the messages being sent.* 3. When sending messages to a specific topic, the message type should correspond to the type bound to that topic in the producer factory configuration.*/@Autowiredprivate PulsarTemplate<byte[]> defaultProducer;public void syncSendMessage() throws PulsarClientException {defaultProducer.send("topic1", "Hello pulsar client.".getBytes(StandardCharsets.UTF_8));}public void asyncSendMessage() {String msg = "Hello pulsar client.";CompletableFuture<MessageId> completableFuture = defaultProducer.sendAsync("topic1", msg.getBytes(StandardCharsets.UTF_8));// Use asynchronous callbacks to determine whether the message was sent successfully.completableFuture.whenComplete(((messageId, throwable) -> {if( null != throwable ) {System.out.println("delivery failed, value: " + msg );// Add logic for delayed retries here.} else {System.out.println("delivered msg " + messageId + ", value:" + msg);}}));}/*** Sequential messages should be implemented using sequential-type topics, which support both global and partial ordering. Select the appropriate type based on specific requirements.*/public void sendOrderMessage() throws PulsarClientException {for (int i = 0; i < 5; i++) {defaultProducer.send("topic2", ("Hello pulsar client, this is a order message" + i + ".").getBytes(StandardCharsets.UTF_8));}}}
PulsarTemplate
must be the same as that of the sent message.package com.tencent.cloud.tdmq.pulsar.service;import io.github.majusko.pulsar.annotation.PulsarConsumer;import io.github.majusko.pulsar.constant.Serialization;import org.apache.pulsar.client.api.SubscriptionType;import org.springframework.stereotype.Service;/*** Consumer configurations*/@Servicepublic class MyConsumer {@PulsarConsumer(topic = "topic1", // Subscription topic namesubscriptionName = "sub_topic1", // Subscription nameserialization = Serialization.JSON, // Serialization methodsubscriptionType = SubscriptionType.Shared, // Subscription mode, which is exclusive mode by defaultconsumerName = "firstTopicConsumer", // Consumer namemaxRedeliverCount = 3, // Maximum retry countdeadLetterTopic = "sub_topic1-DLQ" // Dead letter topic name)public void firstTopicConsume(byte[] msg) {// TODO process your messageSystem.out.println("Received a new message. content: [" + new String(msg) + "]");// If the consumption fails, throw an exception so that the message will enter the retry queue. It can then be consumed again until the maximum retry count is reached, after which it will enter the dead letter queue. The prerequisite is to create the retry and dead letter topics.}/*** Sequential messages can be handled using sequential-type topics, which support both global and partial ordering.*/@PulsarConsumer(topic = "topic2", subscriptionName = "sub_topic2")public void orderTopicConsumer(byte[] msg) {// TODO process your messageSystem.out.println("Received a order message. content: [" + new String(msg) + "]");}/*** Listen to the dead letter topic and process dead letter messages.*/@PulsarConsumer(topic = "sub_topic1-DLQ", subscriptionName = "dead_sub")public void deadTopicConsumer(byte[] msg) {// TODO process your messageSystem.out.println("Received a dead message. content: [" + new String(msg) + "]");}}
Was this page helpful?