SDK for Java

Last updated: 2024-12-02 17:10:17

    Scenarios

    This document uses the SDK for Java as an example to describe how to send and receive messages with an open-source SDK, and helps you better understand the message sending and receiving process.

    Prerequisites

    Directions

    Step 1: Installing Java Dependencies

    1. Add the following dependency to the pom.xml file of your Java project. This document uses a Maven project as an example.
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client</artifactId>
        <version>2.7.2</version>
    </dependency>
    
    Note:
    SDK 2.7.2 or later is recommended.
    SDK 2.7.4 or later is recommended if you use the client's batch message receiving feature (BatchReceive).

    Step 2: Modifying Configuration Parameters

    Modify the parameters in Constant.java.
    package com.tencent.cloud.tdmq.pulsar;

    import org.apache.pulsar.client.api.AuthenticationFactory;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;

    public class Constant {

        /**
         * Service access address, shown on the [Cluster Management] page
         */
        private static final String SERVICE_URL = "http://pulsar-xxx.tdmq-pulsar.ap-sh.public.tencenttdmq.com:8080";

        /**
         * Token of the role authorized for the namespace to be used, shown on the [Role Management] page
         */
        private static final String AUTHENTICATION = "eyJrZXlJZC......";

        /**
         * Initialize Pulsar client
         *
         * @return Pulsar client
         */
        public static PulsarClient initPulsarClient() throws PulsarClientException {
            // One Pulsar client corresponds to one client connection
            // In principle, one process corresponds to one client. Avoid creating clients repeatedly, as this wastes resources
            // For practice tutorials on clients and producers/consumers, refer to the official documentation https://www.tencentcloud.com/document/product/1179/58090?from_cn_redirect=1
            PulsarClient pulsarClient = PulsarClient.builder()
                    // Service access address
                    .serviceUrl(SERVICE_URL)
                    // Role token used for authentication
                    .authentication(AuthenticationFactory.token(AUTHENTICATION))
                    .build();
            System.out.println(">> pulsar client created.");
            return pulsarClient;
        }
    }
    
    PulsarClient pulsarClient = PulsarClient.builder()
            // Service access address
            .serviceUrl(SERVICE_URL)
            // Role token
            .authentication(AuthenticationFactory.token(AUTHENTICATION))
            .build();
    
    Parameter        Description
    SERVICE_URL      Cluster access address, which can be viewed and copied on the Cluster page in the console.
    AUTHENTICATION   Role token (secret key), which can be copied from the Role Management page.

    Step 3: Producing Messages

    Create, compile, and run SimpleProducer.java.
    package com.tencent.cloud.tdmq.pulsar.simple;

    import com.tencent.cloud.tdmq.pulsar.Constant;
    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;

    import java.nio.charset.StandardCharsets;

    /**
     * Synchronously send messages
     */
    public class SimpleProducer {

        public static void main(String[] args) throws PulsarClientException, InterruptedException {
            // Initialize Pulsar client
            PulsarClient pulsarClient = Constant.initPulsarClient();
            // Construct a producer
            Producer<byte[]> producer = pulsarClient.newProducer()
                    // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`
                    .topic("persistent://pulsar-xxx/sdk_java/topic1")
                    .create();
            System.out.println(">> pulsar producer created.");
            for (int i = 0; i < 10; i++) {
                String value = "my-sync-message-" + i;
                // Send the message synchronously; send() blocks until the broker confirms it
                MessageId msgId = producer.newMessage()
                        .key("key" + i)
                        .value(value.getBytes(StandardCharsets.UTF_8))
                        .send();
                System.out.println("deliver msg " + msgId + ",value:" + value);

                Thread.sleep(500);
            }
            // Close the producer
            producer.close();
            // Close the client
            pulsarClient.close();
        }
    }
    
    
    Topic: Enter the full path of the created topic, namely persistent://clusterid/namespace/topic. The clusterid/namespace/topic portion can be copied directly from the Topic page in the console.
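Because the full path has a fixed three-part shape, it can be assembled and checked in code before it is passed to `.topic(...)`. The following is a minimal sketch; the TopicPaths class and its method names are hypothetical helpers, not part of the Pulsar SDK, and the check covers only the persistent://clusterid/namespace/topic form described above.

```java
// Hypothetical helper (not part of the Pulsar SDK) for the topic path format in this document.
final class TopicPaths {
    private static final String PREFIX = "persistent://";

    private TopicPaths() {}

    // Builds persistent://clusterId/namespace/topic from its three parts.
    static String persistent(String clusterId, String namespace, String topic) {
        return PREFIX + part(clusterId, "clusterId") + "/"
                + part(namespace, "namespace") + "/" + part(topic, "topic");
    }

    // Returns true only for paths with the prefix and exactly three non-empty segments.
    static boolean isFullPath(String path) {
        if (path == null || !path.startsWith(PREFIX)) {
            return false;
        }
        // Limit -1 keeps trailing empty segments so "a/b/" is rejected.
        String[] parts = path.substring(PREFIX.length()).split("/", -1);
        if (parts.length != 3) {
            return false;
        }
        for (String p : parts) {
            if (p.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    // Rejects null, empty, or slash-containing segments.
    private static String part(String value, String name) {
        if (value == null || value.isEmpty() || value.contains("/")) {
            throw new IllegalArgumentException("Invalid " + name + ": " + value);
        }
        return value;
    }
}
```

For example, `TopicPaths.persistent("pulsar-xxx", "sdk_java", "topic1")` yields the same string used in SimpleProducer.java.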
    

    Step 4: Consuming Messages

    Create, compile, and run SimpleConsumer.java.
    package com.tencent.cloud.tdmq.pulsar.simple;

    import com.tencent.cloud.tdmq.pulsar.Constant;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.apache.pulsar.client.api.SubscriptionInitialPosition;
    import org.apache.pulsar.client.api.SubscriptionType;

    import java.nio.charset.StandardCharsets;

    /**
     * Consumer
     */
    public class SimpleConsumer {

        public static void main(String[] args) throws PulsarClientException {
            // Initialize Pulsar client
            PulsarClient pulsarClient = Constant.initPulsarClient();
            // Construct a consumer
            Consumer<byte[]> consumer = pulsarClient.newConsumer()
                    // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, copied from [Topic Management]
                    .topic("persistent://pulsar-xxx/sdk_java/topic1")
                    // You need to create a subscription on the topic details page in the console and enter the subscription name here
                    .subscriptionName("sub1_topic1")
                    // Declare the exclusive mode as the consumption mode
                    .subscriptionType(SubscriptionType.Exclusive)
                    // Configure consumption starting at the earliest offset; otherwise, historical messages may not be consumed
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe();
            System.out.println(">> pulsar consumer created.");
            for (int i = 0; i < 10; i++) {
                // Receive a message corresponding to the current offset
                Message<byte[]> msg = consumer.receive();
                MessageId msgId = msg.getMessageId();
                // Decode with the same charset the producer used (UTF-8)
                String value = new String(msg.getValue(), StandardCharsets.UTF_8);
                System.out.println("receive msg " + msgId + ",value:" + value);
                // Messages must be acknowledged after being received, otherwise the offset will stay at the current message, causing message backlog
                consumer.acknowledge(msg);
            }
            // Close the consumer
            consumer.close();
            // Close the client
            pulsarClient.close();
        }
    }
    
    You need to enter the complete path of the topic, i.e., persistent://clusterid/namespace/topic, where the clusterid/namespace/topic part can be copied directly from the Topic page in the console.
    
    You need to enter the subscription name in the subscriptionName parameter, which can be viewed on the Consumption Management page.
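The producer in Step 3 encodes each payload with `StandardCharsets.UTF_8`, so the consumer should decode with the same charset rather than rely on the platform default. The following is a minimal sketch of keeping both directions in one place; PayloadCodec is a hypothetical helper name, not an SDK class.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of the Pulsar SDK): one charset for both send and receive.
final class PayloadCodec {
    private PayloadCodec() {}

    // Used on the producer side before passing bytes to value(...).
    static byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Used on the consumer side on the bytes returned by msg.getValue().
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```

Routing both sides through the same helper means non-ASCII message bodies survive the round trip regardless of the JVM's default charset.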

    Step 5: Viewing Consumption Details

    Go to the Message Query page to view message details.
    Note:
    Message trace query supports only individual messages. If batching is enabled on the producer side, only the first message of each batch can be found in the message query.
    
    The message trace is as follows:
    
    Note:
    The above is a brief introduction to publishing and subscribing to messages. For more operations, see the Demo or the official Pulsar documentation.