tencent cloud

All product documents
TDMQ for Apache Pulsar
Spring Boot Starter
Last updated: 2024-12-02 17:10:17
Spring Boot Starter
Last updated: 2024-12-02 17:10:17

Overview

This document describes how to use Spring Boot Starter to send and receive messages and helps you better understand the message sending and receiving processes.

Prerequisites

Directions

Step 1. Add dependencies

Import Pulsar Starter dependencies to the project.
<dependency>
<groupId>io.github.majusko</groupId>
<artifactId>pulsar-java-spring-boot-starter</artifactId>
<version>1.0.7</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.11</version>
</dependency>


Step 2. Prepare configurations

Add the Pulsar configuration information to the application.yml configuration file.
server:
port: 8081
pulsar:
# Namespace name
namespace: namespace_java
# Service access URL
service-url: http://pulsar-w7eognxxx.tdmq.ap-gz.public.tencenttdmq.com:8080
# Authorization role secret key
token-auth-value: eyJrZXlJZC....
# Cluster name
tenant: pulsar-w7eognxxx

Parameter
Description
namespace
Namespace name, which can be copied on the Namespace page in the console.
service-url
Cluster access address, which can be viewed and copied on the Cluster page in the console.



token-auth-value
Role token, which can be copied in the Token column on the Role Management page.



tenant
Cluster ID, which can be obtained on the Cluster page in the console.

Step 3. Produce messages

Configure the producer in ProducerConfiguration.java.
package com.tencent.cloud.tdmq.pulsar.config;

import io.github.majusko.pulsar.producer.ProducerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
* Producer-related configurations
* 1. The topic should be created in the console in advance.
* 2. The message type should implement the Serializable API.
* 3. A topic cannot bind to different data types.
*/
@Configuration
public class ProducerConfiguration {

@Bean
public ProducerFactory producerFactory() {
return new ProducerFactory()
// Producer for topic1
.addProducer("topic1")
// Producer for topic2
.addProducer("topic2");
}
}

Compile and run the message production program MyProducer.java.
package com.tencent.cloud.tdmq.pulsar.service;

import io.github.majusko.pulsar.producer.PulsarTemplate;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

@Service
public class MyProducer {

/**
* 1. The topic for sending messages should be a topic that has already been declared in the producer configuration.
* 2. The PulsarTemplate type should match the type of the messages being sent.
* 3. When sending messages to a specific topic, the message type should correspond to the type bound to that topic in the producer factory configuration.
*/

@Autowired
private PulsarTemplate<byte[]> defaultProducer;

public void syncSendMessage() throws PulsarClientException {
defaultProducer.send("topic1", "Hello pulsar client.".getBytes(StandardCharsets.UTF_8));
}

public void asyncSendMessage() {
String msg = "Hello pulsar client.";
CompletableFuture<MessageId> completableFuture = defaultProducer.sendAsync("topic1", msg.getBytes(StandardCharsets.UTF_8));
// Use asynchronous callbacks to determine whether the message was sent successfully.
completableFuture.whenComplete(((messageId, throwable) -> {
if( null != throwable ) {
System.out.println("delivery failed, value: " + msg );
// Add logic for delayed retries here.
} else {
System.out.println("delivered msg " + messageId + ", value:" + msg);
}
}));
}

/**
* Sequential messages should be implemented using sequential-type topics, which support both global and partial ordering. Select the appropriate type based on specific requirements.
*/
public void sendOrderMessage() throws PulsarClientException {
for (int i = 0; i < 5; i++) {
defaultProducer.send("topic2", ("Hello pulsar client, this is a order message" + i + ".").getBytes(StandardCharsets.UTF_8));
}
}
}



Note:
The topic that sends messages is the one declared in the producer configuration.
The type of PulsarTemplate must be the same as that of the sent message.
When you send a message to the specified topic, the message type must be the same as that bound to the topic in the producer factory configuration.

Step 4. Consume messages

Compile and run the message consumption program MyConsumer.java.
package com.tencent.cloud.tdmq.pulsar.service;

import io.github.majusko.pulsar.annotation.PulsarConsumer;
import io.github.majusko.pulsar.constant.Serialization;
import org.apache.pulsar.client.api.SubscriptionType;
import org.springframework.stereotype.Service;

/**
* Consumer configurations
*/
@Service
public class MyConsumer {

@PulsarConsumer(topic = "topic1", // Subscription topic name
subscriptionName = "sub_topic1", // Subscription name
serialization = Serialization.JSON, // Serialization method
subscriptionType = SubscriptionType.Shared, // Subscription mode, which is exclusive mode by default
consumerName = "firstTopicConsumer", // Consumer name
maxRedeliverCount = 3, // Maximum retry count
deadLetterTopic = "sub_topic1-DLQ" // Dead letter topic name
)
public void firstTopicConsume(byte[] msg) {
// TODO process your message
System.out.println("Received a new message. content: [" + new String(msg) + "]");
// If the consumption fails, throw an exception so that the message will enter the retry queue. It can then be consumed again until the maximum retry count is reached, after which it will enter the dead letter queue. The prerequisite is to create the retry and dead letter topics.
}


/**
* Sequential messages can be handled using sequential-type topics, which support both global and partial ordering.
*/
@PulsarConsumer(topic = "topic2", subscriptionName = "sub_topic2")
public void orderTopicConsumer(byte[] msg) {
// TODO process your message
System.out.println("Received a order message. content: [" + new String(msg) + "]");
}


/**
* Listen to the dead letter topic and process dead letter messages.
*/
@PulsarConsumer(topic = "sub_topic1-DLQ", subscriptionName = "dead_sub")
public void deadTopicConsumer(byte[] msg) {
// TODO process your message
System.out.println("Received a dead message. content: [" + new String(msg) + "]");
}
}
Note:
The above configurations demonstrate the simple use of Pulsar based on the Spring Boot Starter approach. For more details, see Demo, Starter Github, or Starter Gitee.

Step 5. Query messages

Log in to the console and enter the Message Query page to view the message trace after running the demo.

The message trace is as follows:


Note:
The above is a simple configuration for using TDMQ for Apache Pulsar through Spring Boot Starter. For more information, see Demo or Spring Boot Starter for Apache Pulsar.

Step 6: Viewing Consumption Details

Go to the Message Query page to view the message details.

The message details are as follows:

Was this page helpful?
You can also Contact Sales or Submit a Ticket for help.
Yes
No

Feedback

Contact Us

Contact our sales team or business advisors to help your business.

Technical Support

Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

7x24 Phone Support
Hong Kong, China
+852 800 906 020 (Toll Free)
United States
+1 844 606 0804 (Toll Free)
United Kingdom
+44 808 196 4551 (Toll Free)
Canada
+1 888 605 7930 (Toll Free)
Australia
+61 1300 986 386 (Toll Free)
EdgeOne hotline
+852 300 80699
More local hotlines coming soon