操作背景
为了适用不同场景的需求,Pulsar 支持四种订阅模式:Exclusive、Shared、Failover、Key_Shared。
1. Exclusive 独占模式(默认模式):一个 Subscription 只能与一个 Consumer 关联,只有这个 Consumer 可以接收到 Topic 的全部消息,如果该 Consumer 出现故障了就会停止消费。
2. 共享模式(Shared):消息通过 round robin 轮询机制(也可以自定义)分发给不同的消费者,并且每个消息仅会被分发给一个消费者。当消费者断开连接,所有被发送给他,但没有被确认的消息将被重新安排,分发给其它存活的消费者。
3. 灾备模式(Failover):当存在多个 consumer 时,将会按字典顺序排序,第一个 consumer 被初始化为唯一接受消息的消费者。当第一个 consumer 断开时,所有的消息(未被确认和后续进入的)将会被分发给队列中的下一个 consumer。
4. KEY 共享模式(Key_Shared):当存在多个 Consumer 时,将根据消息的 Key 进行分发,Key 相同的消息只会被分发到同一个消费者。
本文以官网提供的 Demo 为例,介绍共享模式(Shared)订阅模式功能与使用方式。
操作步骤
步骤1. 在控制台新建 Pulsar 资源
2. 在左侧导航栏选择 Topic 管理页签,当前集群和命名空间分别勾选创建好的集群和命名空间。
3. 单击新建,输入 Topic 名称和说明,其他选项可保持默认,单击保存,创建一个 topic。 4. 单击操作列的新增订阅,为刚新建好的 Topic 创建俩个订阅关系。
5. 单击操作列的更多 > 查看订阅/消费者,可看到刚创建好的订阅。
步骤2. 下载 Demo 并配置相关参数
2. 修改 Constant.java 参数。
|
SERVICE_URL | 集群接入地址,可以在控制台集群管理页面查看并复制。 |
AUTHENTICATION | 角色的密钥,角色密钥可以在角色管理中复制。 |
步骤3. 生产消息
1. 进入/simple目录下,修改 SimpleProducer.java 参数。
topic:填写创建好的topic3名称,需要填入完整路径,即persistent://clusterid/namespace/Topic,clusterid/namespace/topic 的部分可以从控制台上 Topic管理页面直接复制。
2. 编译并运行 SimpleProducer.java 程序发送消息。运行结果如下,可以看到生产者已经向 topic 中生产了10条消息。
步骤4. 消费消息
1. 进入/submodle目录下,修改SharedConsumer1.java 和 SharedConsumer1.java 程序参数。
topic:填写创建好的 topic 名称,需要填入完整路径,即 persistent://clusterid/namespace/Topic,clusterid/namespace/topic
的部分可以从控制台上 Topic 管理页面直接复制。
.subscriptionName:填写 topic 的订阅名称,可在 Topic 消费者界面查看。SharedConsumer1 中填写 sub1 订阅名称,SharedConsumer2 中填写sub2 订阅名称。
2. 编译并运行消息重试程序 SharedConsumer1.java 和 SharedConsumer2.java 。运行结果如下:
本页内容是否解决了您的问题?