TDMQ for Apache Pulsar
SDK for Java
Last updated: 2024-12-02 17:10:17

Scenarios

This document uses the SDK for Java as an example to describe how to send and receive messages with the open-source SDK, and helps you better understand the message sending and receiving process.

Prerequisites

Directions

Step 1: Installing Java Dependencies

1. Add the following dependency to the pom.xml file of your Java project. This document uses a Maven project as an example.
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.7.2</version>
</dependency>

Note:
SDK 2.7.2 or later is recommended.
SDK 2.7.4 or later is recommended if you use the batch message sending and receiving feature (BatchReceive) of the client.

Step 2: Modifying Configuration Parameters

Modify the parameters in Constant.java.
package com.tencent.cloud.tdmq.pulsar;

import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class Constant {

    /**
     * Service access address, which can be copied from the [Cluster Management] page
     */
    private static final String SERVICE_URL = "http://pulsar-xxx.tdmq-pulsar.ap-sh.public.tencenttdmq.com:8080";

    /**
     * Authorized role token of the namespace to be used, which can be copied from the [Role Management] page
     */
    private static final String AUTHENTICATION = "eyJrZXlJZC......";

    /**
     * Initialize the Pulsar client
     *
     * @return Pulsar client
     */
    public static PulsarClient initPulsarClient() throws PulsarClientException {
        // One Pulsar client corresponds to one client connection
        // In principle, one process corresponds to one client. Avoid creating clients repeatedly, as this consumes resources
        // For practice tutorials on clients and producers/consumers, refer to the official documentation https://cloud.tencent.com/document/product/1179/58090
        PulsarClient pulsarClient = PulsarClient.builder()
                // Service access address
                .serviceUrl(SERVICE_URL)
                // Authorize with the role token
                .authentication(AuthenticationFactory.token(AUTHENTICATION)).build();
        System.out.println(">> pulsar client created.");
        return pulsarClient;
    }
}

PulsarClient pulsarClient = PulsarClient.builder()
        // Service access address
        .serviceUrl(SERVICE_URL)
        // Role token
        .authentication(AuthenticationFactory.token(AUTHENTICATION)).build();

Parameter        Description
SERVICE_URL      Cluster access address, which can be viewed and copied on the Cluster page in the console.
AUTHENTICATION   Role token (secret key), which can be copied from the Role Management page in the console.
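Constant.java hard-codes both parameters. As a minimal sketch, assuming you prefer to keep the role token out of source control, the two values could be read from environment variables instead. The class ClientSettings and the variable names PULSAR_SERVICE_URL and PULSAR_TOKEN are hypothetical and are not part of the Pulsar SDK or the TDMQ demo.

```java
import java.util.Map;

/**
 * Hypothetical settings holder (not part of the Pulsar SDK).
 * Reads the two values that Constant.java hard-codes from a map of
 * environment variables. The variable names are assumptions.
 */
final class ClientSettings {

    final String serviceUrl;
    final String token;

    private ClientSettings(String serviceUrl, String token) {
        this.serviceUrl = serviceUrl;
        this.token = token;
    }

    /** Builds the settings from the given environment, failing fast on missing values. */
    static ClientSettings from(Map<String, String> env) {
        return new ClientSettings(
                require(env, "PULSAR_SERVICE_URL"),
                require(env, "PULSAR_TOKEN"));
    }

    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable " + name);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        try {
            ClientSettings settings = from(System.getenv());
            // Print only the address; the token is a secret and should not be logged
            System.out.println("serviceUrl=" + settings.serviceUrl);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With such a holder, initPulsarClient() would pass settings.serviceUrl to serviceUrl(...) and settings.token to AuthenticationFactory.token(...) in place of the two constants.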


Step 3: Producing Messages

Create, compile, and run SimpleProducer.java.
package com.tencent.cloud.tdmq.pulsar.simple;

import com.tencent.cloud.tdmq.pulsar.Constant;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

import java.nio.charset.StandardCharsets;

/**
 * Synchronously send messages
 */
public class SimpleProducer {

    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        // Initialize the Pulsar client
        PulsarClient pulsarClient = Constant.initPulsarClient();
        // Construct a producer
        Producer<byte[]> producer = pulsarClient.newProducer()
                // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`
                .topic("persistent://pulsar-xxx/sdk_java/topic1").create();
        System.out.println(">> pulsar producer created.");
        for (int i = 0; i < 10; i++) {
            String value = "my-sync-message-" + i;
            // Send the message and block until the broker returns its message ID
            MessageId msgId = producer.newMessage().key("key" + i).value(value.getBytes(StandardCharsets.UTF_8)).send();
            System.out.println("deliver msg " + msgId + ",value:" + value);

            Thread.sleep(500);
        }
        // Close the producer
        producer.close();
        // Close the client
        pulsarClient.close();
    }
}


Topic: Enter the full path of the created topic, namely persistent://clusterid/namespace/Topic. The clusterid/namespace/topic part can be copied directly from the Topic page in the console.
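The full-path format can be illustrated with a small helper. This is only a sketch: the class TopicPath is hypothetical and not part of the Pulsar SDK, and the rule that each part must be non-empty and free of slashes and whitespace is an assumption made for the example, not a documented TDMQ constraint.

```java
import java.util.regex.Pattern;

/**
 * Hypothetical helper (not part of the Pulsar SDK) that assembles the full
 * topic path in the persistent://clusterid/namespace/topic format.
 */
final class TopicPath {

    // Assumption: each part is non-empty and contains no '/' or whitespace
    private static final Pattern PART = Pattern.compile("[^/\\s]+");

    private TopicPath() {
    }

    static String persistent(String clusterId, String namespace, String topic) {
        return "persistent://" + check("clusterId", clusterId)
                + "/" + check("namespace", namespace)
                + "/" + check("topic", topic);
    }

    private static String check(String name, String value) {
        if (value == null || !PART.matcher(value).matches()) {
            throw new IllegalArgumentException("Invalid " + name + ": " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        // Same placeholder values as the topic used in SimpleProducer.java
        System.out.println(persistent("pulsar-xxx", "sdk_java", "topic1"));
        // prints "persistent://pulsar-xxx/sdk_java/topic1"
    }
}
```

The returned string is what the producer and consumer builders receive in their topic(...) call.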


Step 4: Consuming Messages

Create, compile, and run SimpleConsumer.java.
package com.tencent.cloud.tdmq.pulsar.simple;

import com.tencent.cloud.tdmq.pulsar.Constant;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;

import java.nio.charset.StandardCharsets;

/**
 * Consumer
 */
public class SimpleConsumer {

    public static void main(String[] args) throws PulsarClientException {
        // Initialize the Pulsar client
        PulsarClient pulsarClient = Constant.initPulsarClient();
        // Construct a consumer
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, copied from [Topic Management]
                .topic("persistent://pulsar-xxx/sdk_java/topic1")
                // You need to create a subscription on the topic details page in the console and enter the subscription name here
                .subscriptionName("sub1_topic1")
                // Declare the exclusive mode as the consumption mode
                .subscriptionType(SubscriptionType.Exclusive)
                // Configure consumption starting at the earliest offset; otherwise, historical messages may not be consumed
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        System.out.println(">> pulsar consumer created.");
        for (int i = 0; i < 10; i++) {
            // Receive the message at the current offset (blocks until one is available)
            Message<byte[]> msg = consumer.receive();
            MessageId msgId = msg.getMessageId();
            // Decode with the same charset the producer used to encode the payload
            String value = new String(msg.getValue(), StandardCharsets.UTF_8);
            System.out.println("receive msg " + msgId + ",value:" + value);
            // Messages must be acknowledged after being received; otherwise the offset stays at the current message, causing message backlog
            consumer.acknowledge(msg);
        }
        // Close the consumer
        consumer.close();
        // Close the client
        pulsarClient.close();
    }
}

You need to enter the complete path of the topic, i.e., persistent://clusterid/namespace/Topic, where the clusterid/namespace/topic part can be copied directly from the Topic page in the console.

You need to enter the subscription name in the subscriptionName parameter, which can be viewed on the Consumption Management page.
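The producer in Step 3 encodes each message body with StandardCharsets.UTF_8, so decoding with the same explicit charset on the consumer side avoids depending on the platform default encoding. The following sketch shows the round trip with the JDK alone; the class PayloadCodec is hypothetical and only illustrates the idea.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper (not part of the Pulsar SDK) that keeps the
 * encoding and decoding of message bodies symmetric.
 */
final class PayloadCodec {

    private PayloadCodec() {
    }

    /** Encodes a message body the same way SimpleProducer.java does. */
    static byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a received body with an explicit charset instead of the platform default. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Non-ASCII characters are where a mismatched default charset would show up
        String original = "my-sync-message-0 \u6d88\u606f";
        byte[] payload = encode(original);
        System.out.println(decode(payload).equals(original)); // prints "true"
    }
}
```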

Step 5: Viewing Consumption Details

Go to the Message Query page to view message details.
Note:
Message trace query supports only individual messages. If the Batch feature is enabled on the producer side, only the first message in each batch can be found through message query.

The message trace is as follows:

Note:
The above provides a brief introduction to message publishing and subscribing methods. For more operations, see Demo or Pulsar Official Documentation.