tencent cloud

文档反馈

Java SDK

最后更新时间:2024-12-02 17:10:17

    操作场景

    本文以调用 Java SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

    前提条件

    操作步骤

    步骤1:安装 Java 依赖库

    1. Java 项目中引入相关依赖,以 Maven 工程为例,在 pom.xml 添加以下依赖:
    <dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.7.2</version>
    </dependency>
    
    说明:
    建议使用 2.7.2 及以上版本。
    如果在客户端中使用批量收发消息功能(BatchReceive),则使用 2.7.4 及以上版本的 SDK。

    步骤2:修改配置参数

    修改 Constant.java 参数。
    package com.tencent.cloud.tdmq.pulsar;
    
    import org.apache.pulsar.client.api.AuthenticationFactory;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;
    
    
    public class Constant {
    
    
    /**
    * 服务接入地址,位于【集群管理】页面接入地址
    */
    private static final String SERVICE_URL = "http://pulsar-xxx.tdmq-pulsar.ap-sh.public.tencenttdmq.com:8080";
    
    /**
    * 要使用的命名空间授权的角密钥,位于【角色管理】页面
    */
    private static final String AUTHENTICATION = "eyJrZXlJZC......";
    
    
    /**
    * 初始化pulsar客户端
    *
    * @return pulsar客户端
    */
    public static PulsarClient initPulsarClient() throws PulsarClientException {
    // 一个Pulsar client对应一个客户端链接
    // 原则上一个进程一个client,尽量避免重复创建,消耗资源
    // 关于客户端和生产消费者的实践教程,可以参考官方文档 https://www.tencentcloud.com/document/product/1179/58090?from_cn_redirect=1
    PulsarClient pulsarClient = PulsarClient.builder()
    // 服务接入地址
    .serviceUrl(SERVICE_URL)
    // 授权角色密钥
    .authentication(AuthenticationFactory.token(AUTHENTICATION)).build();
    System.out.println(">> pulsar client created.");
    return pulsarClient;
    }
    }
    
    参数
    说明
    SERVICE_URL
    集群接入地址,可以在控制台 集群管理 页面查看并复制。
    
    
    
    AUTHENTICATION
    角色的密钥,角色密钥可以在角色管理中复制。
    
    
    

    步骤3. 生产消息

    创建并编译运行 SimpleProducer.java。
    package com.tencent.cloud.tdmq.pulsar.simple;
    
    import com.tencent.cloud.tdmq.pulsar.Constant;
    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;
    
    import java.nio.charset.StandardCharsets;
    
    /**
    * 同步发送消息
    */
    public class SimpleProducer {
    
    public static void main(String[] args) throws PulsarClientException, InterruptedException {
    
    // 初始化pulsar客户端
    PulsarClient pulsarClient = Constant.initPulsarClient();
    // 构建生产者
    Producer<byte[]> producer = pulsarClient.newProducer()
    // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
    .topic("persistent://pulsar-xxx/sdk_java/topic1").create();
    System.out.println(">> pulsar producer created.");
    for (int i = 0; i < 10; i++) {
    String value = "my-sync-message-" + i;
    // 发送消息
    MessageId msgId = producer.newMessage().key("key" + i).value(value.getBytes(StandardCharsets.UTF_8)).send();
    System.out.println("deliver msg " + msgId + ",value:" + value);
    
    Thread.sleep(500);
    }
    // 关闭生产者
    producer.close();
    // 关闭客户端
    pulsarClient.close();
    }
    }
    
    
    topic:填写创建好的 topic 名称,需要填入完整路径,即 persistent://clusterid/namespace/Topicclusterid/namespace/topic 的部分可以从控制台上 Topic管理 页面直接复制。
    

    步骤4:消费消息

    创建并编译运行 SimpleConsumer.java。
    package com.tencent.cloud.tdmq.pulsar.simple;
    
    import com.tencent.cloud.tdmq.pulsar.Constant;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.apache.pulsar.client.api.SubscriptionInitialPosition;
    import org.apache.pulsar.client.api.SubscriptionType;
    
    /**
    * 消费者
    */
    public class SimpleConsumer {
    
    
    public static void main(String[] args) throws PulsarClientException {
    // 初始化pulsar客户端
    PulsarClient pulsarClient = Constant.initPulsarClient();
    // 构建消费者
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
    // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
    .topic("persistent://pulsar-xxx/sdk_java/topic1")
    // 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
    .subscriptionName("sub1_topic1")
    // 声明消费模式为exclusive(独占)模式
    .subscriptionType(SubscriptionType.Exclusive)
    // 配置从最早开始消费,否则可能会消费不到历史消息
    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
    System.out.println(">> pulsar consumer created.");
    for (int i = 0; i < 10; i++) {
    // 接收当前offset对应的一条消息
    Message<byte[]> msg = consumer.receive();
    MessageId msgId = msg.getMessageId();
    String value = new String(msg.getValue());
    System.out.println("receive msg " + msgId + ",value:" + value);
    // 接收到之后必须要ack,否则offset会一直停留在当前消息,导致消息积压
    consumer.acknowledge(msg);
    }
    // 关闭消费者
    consumer.close();
    // 关闭客户端
    pulsarClient.close();
    }
    }
    
    
    Topic 名称需要填入完整路径,即 persistent://clusterid/namespace/Topicclusterid/namespace/topic 的部分可以从控制台上Topic管理页面直接复制。
    
    
    
    subscriptionName 需要写入订阅名,可在消费管理界面查看。

    步骤5:查看消费情况

    进入 消息查询 页面,可查看消息详情。
    说明:
    消息轨迹的查询只支持单条消息,如果用户在 Producer 侧开启了 Batch 功能,则在消息查询中,同一个 Batch 的消息只可以查询到 Batch 中的第一条消息。
    
    
    
    消息轨迹如下:
    
    
    
    说明:
    上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 DemoPulsar 官方文档
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持