pom.xml
file. This document uses a Maven project as an example.<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>2.7.2</version></dependency>
BatchReceive
) of the client.package com.tencent.cloud.tdmq.pulsar;import org.apache.pulsar.client.api.AuthenticationFactory;import org.apache.pulsar.client.api.PulsarClient;import org.apache.pulsar.client.api.PulsarClientException;public class Constant {/*** Service access address, located on the [Cluster Management] page access address*/private static final String SERVICE_URL = "http://pulsar-xxx.tdmq-pulsar.ap-sh.public.tencenttdmq.com:8080";/*** Authorized role token of the namespace to be used, located on the [Role Management] page*/private static final String AUTHENTICATION = "eyJrZXlJZC......";/*** Initialize Pulsar client** @return Pulsar client*/public static PulsarClient initPulsarClient() throws PulsarClientException {// One Pulsar client corresponds to one client connection// In principle, one process corresponds to one client. Try to avoid repeated creation, as this may consume resources// For practice tutorials on clients and producers/consumers, refer to the official documentation https://www.tencentcloud.com/document/product/1179/58090?from_cn_redirect=1PulsarClient pulsarClient = PulsarClient.builder()// Service access address.serviceUrl(SERVICE_URL)// Authorize the role token.authentication(AuthenticationFactory.token(AUTHENTICATION)).build();System.out.println(">> pulsar client created.");return pulsarClient;}}
PulsarClient pulsarClient = PulsarClient.builder()// Service access address.serviceUrl(SERVICE_URL)// Role token.authentication(AuthenticationFactory.token(AUTHENTICATION)).build();
Parameter | Description |
SERVICE_URL | |
AUTHENTICATION | |
package com.tencent.cloud.tdmq.pulsar.simple;import com.tencent.cloud.tdmq.pulsar.Constant;import org.apache.pulsar.client.api.MessageId;import org.apache.pulsar.client.api.Producer;import org.apache.pulsar.client.api.PulsarClient;import org.apache.pulsar.client.api.PulsarClientException;import java.nio.charset.StandardCharsets;/*** Synchronously send messages*/public class SimpleProducer {public static void main(String[] args) throws PulsarClientException, InterruptedException {// Initialize Pulsar clientPulsarClient pulsarClient = Constant.initPulsarClient();// Construct a producerProducer<byte[]> producer = pulsarClient.newProducer()// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`.topic("persistent://pulsar-xxx/sdk_java/topic1").create();System.out.println(">> pulsar producer created.");for (int i = 0; i < 10; i++) {String value = "my-sync-message-" + i;// Send the messageMessageId msgId = producer.newMessage().key("key" + i).value(value.getBytes(StandardCharsets.UTF_8)).send();System.out.println("deliver msg " + msgId + ",value:" + value);Thread.sleep(500);}// Disable the producerproducer.close();// Close the clientpulsarClient.close();}}
persistent://clusterid/namespace/Topic
. The clusterid/namespace/topic
portion can be directly copied from the Topic page in the console.package com.tencent.cloud.tdmq.pulsar.simple;import com.tencent.cloud.tdmq.pulsar.Constant;import org.apache.pulsar.client.api.Consumer;import org.apache.pulsar.client.api.Message;import org.apache.pulsar.client.api.MessageId;import org.apache.pulsar.client.api.PulsarClient;import org.apache.pulsar.client.api.PulsarClientException;import org.apache.pulsar.client.api.SubscriptionInitialPosition;import org.apache.pulsar.client.api.SubscriptionType;/*** Consumer*/public class SimpleConsumer {public static void main(String[] args) throws PulsarClientException {// Initialize Pulsar clientPulsarClient pulsarClient = Constant.initPulsarClient();// Construct a consumerConsumer<byte[]> consumer = pulsarClient.newConsumer()// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, copied from [Topic Management].topic("persistent://pulsar-xxx/sdk_java/topic1")// You need to create a subscription on the topic details page in the console and enter the subscription name here.subscriptionName("sub1_topic1")// Declare the exclusive mode as the consumption mode.subscriptionType(SubscriptionType.Exclusive)// Configure consumption starting at the earliest offset; otherwise, historical messages may not be consumed.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();System.out.println(">> pulsar consumer created.");for (int i = 0; i < 10; i++) {// Receive a message corresponding to the current offsetMessage<byte[]> msg = consumer.receive();MessageId msgId = msg.getMessageId();String value = new String(msg.getValue());System.out.println("receive msg " + msgId + ",value:" + value);// Messages must be acknowledged after being received, otherwise the offset will stay at the current message, causing message backlogconsumer.acknowledge(msg);}// Close the consumerconsumer.close();// Close the clientpulsarClient.close();}}
persistent://clusterid/namespace/Topic
, where the clusterid/namespace/topic
part can be copied directly from the Topic page in the console.
Was this page helpful?