操作场景
本文以调用 C++ SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
前提条件
操作步骤
1. 准备环境。
1.2 在项目中引入 Pulsar C++ client 相关头文件及动态库。
2. 创建客户端。
// 客户端配置信息
ClientConfiguration config;
// 设置授权角色密钥
AuthenticationPtr auth = pulsar::AuthToken::createWithToken(AUTHENTICATION);
config.setAuth(auth);
// 创建客户端
Client client(SERVICE_URL, config);
|
SERVICE_URL | 集群接入地址,可以在控制台 集群管理 页面查看并复制。 |
AUTHENTICATION | |
3. 创建生产者。
// 生产者配置
ProducerConfiguration producerConf;
producerConf.setBlockIfQueueFull(true);
producerConf.setSendTimeout(5000);
// 生产者
Producer producer;
// 创建生产者
Result result = client.createProducer(
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
"persistent://pulsar-xxx/sdk_cpp/topic1",
producerConf,
producer);
if (result != ResultOk) {
std::cout << "Error creating producer: " << result << std::endl;
return -1;
}
说明:
Topic 名称需要填入完整路径,即 persistent://clusterid/namespace/Topic
,clusterid/namespace/topic
的部分可以从控制台上 Topic管理 页面直接复制。 3. 发送消息。
// 消息内容
std::string content = "hello cpp client, this is a msg";
// 构建消息对象
Message msg = MessageBuilder().setContent(content)
.setPartitionKey("mykey") // 业务key
.setProperty("x", "1") // 设置消息参数
.build();
// 发送消息
Result result = producer.send(msg);
if (result != ResultOk) {
// 发送失败
std::cout << "The message " << content << " could not be sent, received code: " << result << std::endl;
} else {
// 发送成功
std::cout << "The message " << content << " sent successfully" << std::endl;
}
4. 创建消费者。
// 消费者配置信息
ConsumerConfiguration consumerConfiguration;
consumerConfiguration.setSubscriptionInitialPosition(pulsar::InitialPositionEarliest);
// 消费者
Consumer consumer;
// 订阅topic
Result result = client.subscribe(
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
"persistent://pulsar-xxx/sdk_cpp/topic1",
// 订阅名称
"sub_topic1",
consumerConfiguration,
consumer);
if (result != ResultOk) {
std::cout << "Failed to subscribe: " << result << std::endl;
return -1;
}
说明:
Topic 名称需要填入完整路径,即 persistent://clusterid/namespace/Topic
,clusterid/namespace/topic
的部分可以从控制台上 Topic管理 页面直接复制。
subscriptionName 需要写入订阅名,可在消费管理界面查看。
5. 消费消息。
Message msg;
// 获取消息
consumer.receive(msg);
// 模拟业务
std::cout << "Received: " << msg << " with payload '" << msg.getDataAsString() << "'" << std::endl;
// 回复ack
consumer.acknowledge(msg);
// 消费失败回复nack, 消息将会重新投递
// consumer.negativeAcknowledge(msg);
本页内容是否解决了您的问题?