tencent cloud

Feedback

Spring Boot Starter

Last updated: 2024-12-02 17:10:17

    Overview

    This document describes how to use Spring Boot Starter to send and receive messages and helps you better understand the message sending and receiving processes.

    Prerequisites

    Directions

    Step 1. Add dependencies

    Import Pulsar Starter dependencies to the project.
    <dependency>
    <groupId>io.github.majusko</groupId>
    <artifactId>pulsar-java-spring-boot-starter</artifactId>
    <version>1.0.7</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
    <dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.11</version>
    </dependency>
    

    Step 2. Prepare configurations

    Add the Pulsar configuration information to the application.yml configuration file.
    server:
    port: 8081
    pulsar:
    # Namespace name
    namespace: namespace_java
    # Service access URL
    service-url: http://pulsar-w7eognxxx.tdmq.ap-gz.public.tencenttdmq.com:8080
    # Authorization role secret key
    token-auth-value: eyJrZXlJZC....
    # Cluster name
    tenant: pulsar-w7eognxxx
    
    Parameter
    Description
    namespace
    Namespace name, which can be copied on the Namespace page in the console.
    service-url
    Cluster access address, which can be viewed and copied on the Cluster page in the console.
    
    
    
    token-auth-value
    Role token, which can be copied in the Token column on the Role Management page.
    
    
    
    tenant
    Cluster ID, which can be obtained on the Cluster page in the console.

    Step 3. Produce messages

    Configure the producer in ProducerConfiguration.java.
    package com.tencent.cloud.tdmq.pulsar.config;
    
    import io.github.majusko.pulsar.producer.ProducerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    
    /**
    * Producer-related configurations
    * 1. The topic should be created in the console in advance.
    * 2. The message type should implement the Serializable API.
    * 3. A topic cannot bind to different data types.
    */
    @Configuration
    public class ProducerConfiguration {
    
    @Bean
    public ProducerFactory producerFactory() {
    return new ProducerFactory()
    // Producer for topic1
    .addProducer("topic1")
    // Producer for topic2
    .addProducer("topic2");
    }
    }
    
    Compile and run the message production program MyProducer.java.
    package com.tencent.cloud.tdmq.pulsar.service;
    
    import io.github.majusko.pulsar.producer.PulsarTemplate;
    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.CompletableFuture;
    
    @Service
    public class MyProducer {
    
    /**
    * 1. The topic for sending messages should be a topic that has already been declared in the producer configuration.
    * 2. The PulsarTemplate type should match the type of the messages being sent.
    * 3. When sending messages to a specific topic, the message type should correspond to the type bound to that topic in the producer factory configuration.
    */
    
    @Autowired
    private PulsarTemplate<byte[]> defaultProducer;
    
    public void syncSendMessage() throws PulsarClientException {
    defaultProducer.send("topic1", "Hello pulsar client.".getBytes(StandardCharsets.UTF_8));
    }
    
    public void asyncSendMessage() {
    String msg = "Hello pulsar client.";
    CompletableFuture<MessageId> completableFuture = defaultProducer.sendAsync("topic1", msg.getBytes(StandardCharsets.UTF_8));
    // Use asynchronous callbacks to determine whether the message was sent successfully.
    completableFuture.whenComplete(((messageId, throwable) -> {
    if( null != throwable ) {
    System.out.println("delivery failed, value: " + msg );
    // Add logic for delayed retries here.
    } else {
    System.out.println("delivered msg " + messageId + ", value:" + msg);
    }
    }));
    }
    
    /**
    * Sequential messages should be implemented using sequential-type topics, which support both global and partial ordering. Select the appropriate type based on specific requirements.
    */
    public void sendOrderMessage() throws PulsarClientException {
    for (int i = 0; i < 5; i++) {
    defaultProducer.send("topic2", ("Hello pulsar client, this is a order message" + i + ".").getBytes(StandardCharsets.UTF_8));
    }
    }
    }
    
    
    
    Note:
    The topic that sends messages is the one declared in the producer configuration.
    The type of PulsarTemplate must be the same as that of the sent message.
    When you send a message to the specified topic, the message type must be the same as that bound to the topic in the producer factory configuration.

    Step 4. Consume messages

    Compile and run the message consumption program MyConsumer.java.
    package com.tencent.cloud.tdmq.pulsar.service;
    
    import io.github.majusko.pulsar.annotation.PulsarConsumer;
    import io.github.majusko.pulsar.constant.Serialization;
    import org.apache.pulsar.client.api.SubscriptionType;
    import org.springframework.stereotype.Service;
    
    /**
    * Consumer configurations
    */
    @Service
    public class MyConsumer {
    
    @PulsarConsumer(topic = "topic1", // Subscription topic name
    subscriptionName = "sub_topic1", // Subscription name
    serialization = Serialization.JSON, // Serialization method
    subscriptionType = SubscriptionType.Shared, // Subscription mode, which is exclusive mode by default
    consumerName = "firstTopicConsumer", // Consumer name
    maxRedeliverCount = 3, // Maximum retry count
    deadLetterTopic = "sub_topic1-DLQ" // Dead letter topic name
    )
    public void firstTopicConsume(byte[] msg) {
    // TODO process your message
    System.out.println("Received a new message. content: [" + new String(msg) + "]");
    // If the consumption fails, throw an exception so that the message will enter the retry queue. It can then be consumed again until the maximum retry count is reached, after which it will enter the dead letter queue. The prerequisite is to create the retry and dead letter topics.
    }
    
    
    /**
    * Sequential messages can be handled using sequential-type topics, which support both global and partial ordering.
    */
    @PulsarConsumer(topic = "topic2", subscriptionName = "sub_topic2")
    public void orderTopicConsumer(byte[] msg) {
    // TODO process your message
    System.out.println("Received a order message. content: [" + new String(msg) + "]");
    }
    
    
    /**
    * Listen to the dead letter topic and process dead letter messages.
    */
    @PulsarConsumer(topic = "sub_topic1-DLQ", subscriptionName = "dead_sub")
    public void deadTopicConsumer(byte[] msg) {
    // TODO process your message
    System.out.println("Received a dead message. content: [" + new String(msg) + "]");
    }
    }
    Note:
    The above configurations demonstrate the simple use of Pulsar based on the Spring Boot Starter approach. For more details, see Demo, Starter Github, or Starter Gitee.

    Step 5. Query messages

    Log in to the console and enter the Message Query page to view the message trace after running the demo.
    
    The message trace is as follows:
    
    
    Note:
    The above is a simple configuration for using TDMQ for Apache Pulsar through Spring Boot Starter. For more information, see Demo or Spring Boot Starter for Apache Pulsar.

    Step 6: Viewing Consumption Details

    Go to the Message Query page to view the message details.
    
    The message details are as follows:
    
    Contact Us

    Contact our sales team or business advisors to help your business.

    Technical Support

    Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

    7x24 Phone Support