

// 构建消费者Consumer<byte[]> consumer = pulsarClient.newConsumer()// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制.topic("persistent://pulsar-xxx/sdk_java/topic1")// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名.subscriptionName("sub_topic1")// 声明消费模式为exclusive(独占)模式.subscriptionType(SubscriptionType.Exclusive).subscribe();


// 构建消费者Consumer<byte[]> consumer = pulsarClient.newConsumer()// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制.topic("persistent://pulsar-xxx/sdk_java/topic1")// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名.subscriptionName("sub_topic1")// 声明消费模式为 Shared(共享)模式.subscriptionType(SubscriptionType.Shared).subscribe();


// 构建消费者Consumer<byte[]> consumer = pulsarClient.newConsumer()// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制.topic("persistent://pulsar-xxx/sdk_java/topic1")// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名.subscriptionName("sub_topic1")// 声明消费模式为灾备模式.subscriptionType(SubscriptionType.Failover).subscribe();


// 构建生产者Producer<byte[]> producer pulsarClient.newProducer().topic(topic).enableBatching(false).create();// 发送消息时设置keyMessageId msgId = producer.newMessage()// 消息内容.value(value.getBytes(StandardCharsets.UTF_8))// 在此处设置key,key相同的消息只会被分发到同一个消费者。.key("youKey1").send();
// 构建生产者Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(true).batcherBuilder(BatcherBuilder.KEY_BASED).create();// 发送消息时设置keyMessageId msgId = producer.newMessage()// 消息内容.value(value.getBytes(StandardCharsets.UTF_8))// 在此处设置key,key相同的消息只会被分发到同一个消费者。.key("youKey1").send();
// 构建消费者 Consumer<byte[]> consumer = pulsarClient.newConsumer() // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制 .topic("persistent://pulsar-xxx/sdk_java/topic1") // 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名 .subscriptionName("sub_topic1") // 声明消费模式为 Key_Shared(Key 共享)模式 .subscriptionType(SubscriptionType.Key_Shared) .subscribe();

// 构建生产者Producer<byte[]> producer pulsarClient.newProducer().topic(topic).enableBatching(false) // 禁用batch.create();// 发送消息时设置keyMessageId msgId = producer.newMessage()// 消息内容.value(value.getBytes(StandardCharsets.UTF_8))// 在此处设置key,key相同的消息会发送到同一个分区中.key("youKey1").send();
// 构建消费者Consumer<byte[]> consumer = pulsarClient.newConsumer()// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制.topic("persistent://pulsar-xxx/sdk_java/topic1")// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名.subscriptionName("sub_topic1")// 声明消费模式为Failover模式.subscriptionType(SubscriptionType.Failover).subscribe();
// 构建消费者Consumer<byte[]> consumer = pulsarClient.newConsumer()// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制.topic("persistent://pulsar-xxx/sdk_java/topic1")// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名.subscriptionName("sub_topic1")// 声明消费模式为 Key_Shared(Key 共享)模式.subscriptionType(SubscriptionType.Key_Shared)// 设置为不允许乱序.keySharedPolicy(KeySharedPolicy.autoSplitHashRange().setAllowOutOfOrderDelivery(false)).subscribe();
文档反馈