tencent cloud

Feedback

SDK for Java

Last updated: 2024-06-28 11:33:56

    Scenarios

    This document describes how to use open-source SDK to send and receive messages by using the SDK for Java as an example and helps you better understand the message sending and receiving processes.

    Prerequisites

    Directions

    1. Introduce dependencies in a Java project and add the following dependencies to the pom.xml file. This document uses a Maven project as an example.
    <dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.7.2</version>
    </dependency>
    
    Note:
    SDK 2.7.2 or later is recommended.
    SDK 2.7.4 or later is recommended if you use the batch message sending and receiving feature (BatchReceive) of the client.
    2. Create a Pulsar client.
    PulsarClient pulsarClient = PulsarClient.builder()
    // Service access address
    .serviceUrl(SERVICE_URL)
    // Role token
    .authentication(AuthenticationFactory.token(AUTHENTICATION)).build();
    
    Parameter
    Description
    SERVICE_URL
    Cluster access address, which can be viewed and copied on the Cluster page in the console.
    
    
    
    AUTHENTICATION
    Role token, which can be copied in the **Token** column on the Role Management page.
    
    
    
    3. Create a producer.
    // Create a producer of the `byte[]` type
    Producer<byte[]> producer = pulsarClient.newProducer()
    // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`
    .topic("persistent://pulsar-xxx/sdk_java/topic1").create();
    
    Note:
    You need to enter the complete path of the topic name, i.e., persistent://clusterid/namespace/Topic, where the clusterid/namespace/topic part can be copied directly from the Topic page in the console.
    4. Send the message.
    // Send the message
    MessageId msgId = producer.newMessage()
    // Message content
    .value("this is a new message.".getBytes(StandardCharsets.UTF_8))
    // Business key
    .key("youKey")
    // Business parameter
    .property("mykey", "myvalue").send();
    
    5. Release the resources.
    // Disable the producer
    producer.close();
    // Disable the client
    pulsarClient.close();
    
    6. Create a consumer.
    // Create a consumer of the `byte[]` type (default type)
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
    // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page.
    .topic("persistent://pulsar-xxx/sdk_java/topic1")
    // You need to create a subscription on the topic details page in the console and enter the subscription name here
    .subscriptionName("sub_topic1")
    // Declare the exclusive mode as the consumption mode
    .subscriptionType(SubscriptionType.Exclusive)
    // Configure consumption starting at the earliest offset; otherwise, historical messages may not be consumed
    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
    // Subscription
    .subscribe();
    
    Note:
    You need to enter the complete path of the topic name, i.e., persistent://clusterid/namespace/Topic, where the clusterid/namespace/topic part can be copied directly from the Topic page in the console.
    
    
    You need to enter the subscription name in the subscriptionName parameter, which can be viewed on the Consumption Management page.
    7. Consume the message.
    // Receive a message corresponding to the current offset
    Message<byte[]> msg = consumer.receive();
    MessageId msgId = msg.getMessageId();
    String value = new String(msg.getValue());
    System.out.println("receive msg " + msgId + ",value:" + value);
    // Messages must be acknowledged after being received; otherwise, the offset will be held in the position of the current message, causing message heap.
    consumer.acknowledge(msg);
    
    8. Use the listener for consumption.
    // Message listener
    MessageListener<byte[]> myMessageListener = (consumer, msg) -> {
    try {
    System.out.println("Message received: " + new String(msg.getData()));
    // Return `ack` as the acknowledgement
    consumer.acknowledge(msg);
    } catch (Exception e){
    // Return `nack` if the consumption fails
    consumer.negativeAcknowledge(msg);
    }
    };
    pulsarClient.newConsumer()
    // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page.
    .topic("persistent://pulsar-mmqwr5xx9n7g/sdk_java/topic1")
    // You need to create a subscription on the topic details page in the console and enter the subscription name here
    .subscriptionName("sub_topic1")
    // Declare the exclusive mode as the consumption mode
    .subscriptionType(SubscriptionType.Exclusive)
    // Set the listener
    .messageListener(myMessageListener)
    // Configure consumption starting at the earliest offset; otherwise, historical messages may not be consumed
    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
    .subscribe();
    
    9. Log in to the TDMQ for Apache Pulsar console, click Topic > Topic Name to enter the Consumption Management page, and click the triangle below a subscription name to view the production and consumption records.
    
    img
    
    
    Note:
    The above is a brief introduction to the way of publishing and subscribing to messages. For more operations, see Demo or Pulsar Java client.
    Contact Us

    Contact our sales team or business advisors to help your business.

    Technical Support

    Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

    7x24 Phone Support