<dependency>
    <groupId>io.github.majusko</groupId>
    <artifactId>pulsar-java-spring-boot-starter</artifactId>
    <version>1.0.7</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.11</version>
</dependency>
pulsar:
  # Namespace name
  namespace: namespace_java
  # Service access address
  service-url: http://pulsar-xxx.tdmq.ap-gz.public.tencenttdmq.com:8080
  # Role token
  token-auth-value: eyJrZXlJZC....
  # Cluster name
  tenant: pulsar-xxx
Parameter | Description |
namespace | Namespace name |
service-url | Service access address |
token-auth-value | Role token |
tenant | Cluster name |
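Pulsar addresses a topic by a fully qualified name of the form persistent://tenant/namespace/topic. The sketch below shows how the tenant and namespace values from the configuration would combine with a short topic name such as topic1. TopicNames and fullyQualified are hypothetical helpers written for illustration, not part of the starter, and the sketch assumes the starter resolves short topic names in this standard way.

```java
public final class TopicNames {
    private TopicNames() {}

    /** Builds persistent://<tenant>/<namespace>/<topic> from the configured values. */
    public static String fullyQualified(String tenant, String namespace, String topic) {
        if (tenant.isEmpty() || namespace.isEmpty() || topic.isEmpty()) {
            throw new IllegalArgumentException("tenant, namespace and topic must be non-empty");
        }
        return "persistent://" + tenant + "/" + namespace + "/" + topic;
    }

    public static void main(String[] args) {
        // Values taken from the sample configuration above.
        System.out.println(fullyQualified("pulsar-xxx", "namespace_java", "topic1"));
        // prints persistent://pulsar-xxx/namespace_java/topic1
    }
}
```

Seeing the full name can help when comparing the short names used in code with the topics listed in the console.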
@Configuration
public class ProducerConfiguration {
    @Bean
    public ProducerFactory producerFactory() {
        return new ProducerFactory()
                // topic1
                .addProducer("topic1")
                // topic2
                .addProducer("topic2");
    }
}
@Autowired
private PulsarTemplate<byte[]> defaultProducer;
// Send the messages
defaultProducer.send("topic2", ("Hello pulsar client, this is a order message.").getBytes(StandardCharsets.UTF_8));
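The payload sent above is a UTF-8 encoded byte array, so whoever reads it should decode with the same charset rather than the platform default. A minimal sketch of that round trip using only the JDK follows; PayloadCodec is a hypothetical helper name, not part of the starter.

```java
import java.nio.charset.StandardCharsets;

public final class PayloadCodec {
    private PayloadCodec() {}

    /** Encodes text the same way the send call above does. */
    public static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a received payload with an explicit charset instead of the platform default. */
    public static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("Hello pulsar client");
        System.out.println(decode(payload)); // prints Hello pulsar client
    }
}
```

Fixing the charset on both sides keeps non-ASCII content intact even when producer and consumer run on machines with different default encodings.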
The generic type of PulsarTemplate must be the same as the type of the sent message.

@PulsarConsumer(
        topic = "topic1",                           // Name of the subscribed topic
        subscriptionName = "sub_topic1",            // Subscription name
        serialization = Serialization.JSON,         // Serialization method
        subscriptionType = SubscriptionType.Shared, // Subscription mode, which is exclusive mode by default
        consumerName = "firstTopicConsumer",        // Consumer name
        maxRedeliverCount = 3,                      // Maximum number of retries
        deadLetterTopic = "sub_topic1-DLQ"          // Dead letter topic name
)
public void topicConsume(byte[] msg) {
    // TODO process your message
    // Decode with UTF-8 to match the charset used when the message was sent
    System.out.println("Received a new message. content: [" + new String(msg, StandardCharsets.UTF_8) + "]");
    // If consumption fails, throw an exception so that the message enters the retry letter topic
    // and can be consumed again. Once the maximum number of retries is reached, the message
    // enters the dead letter topic. You need to create the retry and dead letter queues for this process.
}
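The retry and dead letter behavior described in the consumer comments can be pictured with a small in-memory model. This is only a sketch of the idea and not how the Pulsar client is implemented; RedeliverySketch, Result, and deliver are hypothetical names, and the model assumes that the first delivery does not count toward maxRedeliverCount, which may differ between client versions.

```java
import java.util.function.Consumer;

public final class RedeliverySketch {
    private RedeliverySketch() {}

    /** Outcome of processing one message in this toy model. */
    public static final class Result {
        public final int deliveries;       // total number of handler invocations
        public final boolean deadLettered; // true if the message ended up in the dead letter topic

        Result(int deliveries, boolean deadLettered) {
            this.deliveries = deliveries;
            this.deadLettered = deadLettered;
        }
    }

    /** Redelivers msg after each failure until it succeeds or the retry budget is spent. */
    public static Result deliver(byte[] msg, int maxRedeliverCount, Consumer<byte[]> handler) {
        int redeliveries = 0;
        int deliveries = 0;
        while (true) {
            deliveries++;
            try {
                handler.accept(msg);
                return new Result(deliveries, false); // processed successfully
            } catch (RuntimeException e) {
                if (redeliveries >= maxRedeliverCount) {
                    return new Result(deliveries, true); // budget spent: dead letter
                }
                redeliveries++; // assumption: only redeliveries count against the limit
            }
        }
    }

    public static void main(String[] args) {
        Result r = deliver(new byte[0], 3, m -> { throw new IllegalStateException("always fails"); });
        System.out.println(r.deliveries + " deliveries, deadLettered=" + r.deadLettered);
    }
}
```

In this model a handler that always throws is invoked once plus maxRedeliverCount more times before the message is routed to the dead letter topic, while a handler that eventually succeeds stops the loop early.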