<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>2.7.2</version></dependency>
PulsarClient pulsarClient = PulsarClient.builder()// 服务接入地址.serviceUrl(SERVICE_URL)// 授权角色密钥.authentication(AuthenticationFactory.token(AUTHENTICATION)).build();
// 构建byte[]类型的生产者Producer<byte[]> producer = pulsarClient.newProducer()// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称.topic("persistent://pulsar-xxx/sdk_java/topic1").create();
persistent://clusterid/namespace/Topic
,clusterid/namespace/topic
的部分可以从控制台上 Topic管理 页面直接复制。//发送消息MessageId msgId = producer.newMessage()// 消息内容.value("this is a new message.".getBytes(StandardCharsets.UTF_8))// 业务key.key("youKey")// 业务相关参数.property("mykey", "myvalue").send();
// 关闭生产者producer.close();// 关闭客户端pulsarClient.close();
// 构建byte[]类型(默认类型)的消费者Consumer<byte[]> consumer = pulsarClient.newConsumer()// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制.topic("persistent://pulsar-xxx/sdk_java/topic1")// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名.subscriptionName("sub_topic1")// 声明消费模式为exclusive(独占)模式.subscriptionType(SubscriptionType.Exclusive)// 配置从最早开始消费,否则可能会消费不到历史消息.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)// 订阅.subscribe();
persistent://clusterid/namespace/Topic
,clusterid/namespace/topic
的部分可以从控制台上 Topic管理 页面直接复制。
// 接收当前offset对应的一条消息Message<byte[]> msg = consumer.receive();MessageId msgId = msg.getMessageId();String value = new String(msg.getValue());System.out.println("receive msg " + msgId + ",value:" + value);// 接收到之后必须要ack,否则offset会一直停留在当前消息,导致消息积压consumer.acknowledge(msg);
// 消息监听器MessageListener<byte[]> myMessageListener = (consumer, msg) -> {try {System.out.println("Message received: " + new String(msg.getData()));// 回复ackconsumer.acknowledge(msg);} catch (Exception e) {// 消费失败,回复nackconsumer.negativeAcknowledge(msg);}};pulsarClient.newConsumer()// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制.topic("persistent://pulsar-mmqwr5xx9n7g/sdk_java/topic1")// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名.subscriptionName("sub_topic1")// 声明消费模式为exclusive(独占)模式.subscriptionType(SubscriptionType.Exclusive)// 设置监听器.messageListener(myMessageListener)// 配置从最早开始消费,否则可能会消费不到历史消息.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
本页内容是否解决了您的问题?