<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>2.7.2</version></dependency>
package com.tencent.cloud.tdmq.pulsar;import org.apache.pulsar.client.api.AuthenticationFactory;import org.apache.pulsar.client.api.PulsarClient;import org.apache.pulsar.client.api.PulsarClientException;public class Constant {/*** 服务接入地址,位于【集群管理】页面接入地址*/private static final String SERVICE_URL = "http://pulsar-xxx.tdmq-pulsar.ap-sh.public.tencenttdmq.com:8080";/*** 要使用的命名空间授权的角密钥,位于【角色管理】页面*/private static final String AUTHENTICATION = "eyJrZXlJZC......";/*** 初始化pulsar客户端** @return pulsar客户端*/public static PulsarClient initPulsarClient() throws PulsarClientException {// 一个Pulsar client对应一个客户端链接// 原则上一个进程一个client,尽量避免重复创建,消耗资源// 关于客户端和生产消费者的实践教程,可以参考官方文档 https://www.tencentcloud.com/document/product/1179/58090?from_cn_redirect=1PulsarClient pulsarClient = PulsarClient.builder()// 服务接入地址.serviceUrl(SERVICE_URL)// 授权角色密钥.authentication(AuthenticationFactory.token(AUTHENTICATION)).build();System.out.println(">> pulsar client created.");return pulsarClient;}}
参数 | 说明 |
SERVICE_URL | |
AUTHENTICATION | 角色的密钥,角色密钥可以在角色管理中复制。 |
package com.tencent.cloud.tdmq.pulsar.simple;import com.tencent.cloud.tdmq.pulsar.Constant;import org.apache.pulsar.client.api.MessageId;import org.apache.pulsar.client.api.Producer;import org.apache.pulsar.client.api.PulsarClient;import org.apache.pulsar.client.api.PulsarClientException;import java.nio.charset.StandardCharsets;/*** 同步发送消息*/public class SimpleProducer {public static void main(String[] args) throws PulsarClientException, InterruptedException {// 初始化pulsar客户端PulsarClient pulsarClient = Constant.initPulsarClient();// 构建生产者Producer<byte[]> producer = pulsarClient.newProducer()// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称.topic("persistent://pulsar-xxx/sdk_java/topic1").create();System.out.println(">> pulsar producer created.");for (int i = 0; i < 10; i++) {String value = "my-sync-message-" + i;// 发送消息MessageId msgId = producer.newMessage().key("key" + i).value(value.getBytes(StandardCharsets.UTF_8)).send();System.out.println("deliver msg " + msgId + ",value:" + value);Thread.sleep(500);}// 关闭生产者producer.close();// 关闭客户端pulsarClient.close();}}
persistent://clusterid/namespace/Topic
,clusterid/namespace/topic
的部分可以从控制台上 Topic管理 页面直接复制。package com.tencent.cloud.tdmq.pulsar.simple;import com.tencent.cloud.tdmq.pulsar.Constant;import org.apache.pulsar.client.api.Consumer;import org.apache.pulsar.client.api.Message;import org.apache.pulsar.client.api.MessageId;import org.apache.pulsar.client.api.PulsarClient;import org.apache.pulsar.client.api.PulsarClientException;import org.apache.pulsar.client.api.SubscriptionInitialPosition;import org.apache.pulsar.client.api.SubscriptionType;/*** 消费者*/public class SimpleConsumer {public static void main(String[] args) throws PulsarClientException {// 初始化pulsar客户端PulsarClient pulsarClient = Constant.initPulsarClient();// 构建消费者Consumer<byte[]> consumer = pulsarClient.newConsumer()// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制.topic("persistent://pulsar-xxx/sdk_java/topic1")// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名.subscriptionName("sub1_topic1")// 声明消费模式为exclusive(独占)模式.subscriptionType(SubscriptionType.Exclusive)// 配置从最早开始消费,否则可能会消费不到历史消息.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();System.out.println(">> pulsar consumer created.");for (int i = 0; i < 10; i++) {// 接收当前offset对应的一条消息Message<byte[]> msg = consumer.receive();MessageId msgId = msg.getMessageId();String value = new String(msg.getValue());System.out.println("receive msg " + msgId + ",value:" + value);// 接收到之后必须要ack,否则offset会一直停留在当前消息,导致消息积压consumer.acknowledge(msg);}// 关闭消费者consumer.close();// 关闭客户端pulsarClient.close();}}
persistent://clusterid/namespace/Topic
,clusterid/namespace/topic
的部分可以从控制台上Topic管理页面直接复制。
本页内容是否解决了您的问题?