// Client configuration informationClientConfiguration config;// Set the role tokenAuthenticationPtr auth = pulsar::AuthToken::createWithToken(AUTHENTICATION);config.setAuth(auth);// Create a clientClient client(SERVICE_URL, config);
Parameter | Description |
SERVICE_URL | |
AUTHENTICATION | |
// Producer configurationsProducerConfiguration producerConf;producerConf.setBlockIfQueueFull(true);producerConf.setSendTimeout(5000);// ProducerProducer producer;// Create a producerResult result = client.createProducer(// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`"persistent://pulsar-xxx/sdk_cpp/topic1",producerConf,producer);if (result != ResultOk) {std::cout << "Error creating producer: " << result << std::endl;return -1;}
persistent://clusterid/namespace/Topic
, where the clusterid/namespace/topic
part can be copied directly from the Topic page in the console.// Message contentstd::string content = "hello cpp client, this is a msg";// Create a message objectMessage msg = MessageBuilder().setContent(content).setPartitionKey("mykey") // Business key.setProperty("x", "1") // Set message parameters.build();// Send the messageResult result = producer.send(msg);if (result != ResultOk) {// The message failed to be sentstd::cout << "The message " << content << " could not be sent, received code: " << result << std::endl;} else {// The message was successfully sentstd::cout << "The message " << content << " sent successfully" << std::endl;}
// Consumer configuration informationConsumerConfiguration consumerConfiguration;consumerConfiguration.setSubscriptionInitialPosition(pulsar::InitialPositionEarliest);// ConsumerConsumer consumer;// Subscribe to a topicResult result = client.subscribe(// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`"persistent://pulsar-xxx/sdk_cpp/topic1",// Subscription name"sub_topic1",consumerConfiguration,consumer);if (result != ResultOk) {std::cout << "Failed to subscribe: " << result << std::endl;return -1;}
persistent://clusterid/namespace/Topic
, where the clusterid/namespace/topic
part can be copied directly from the Topic page in the console.
subscriptionName
parameter, which can be viewed on the Consumption Management page.Message msg;// Obtain the messageconsumer.receive(msg);// Simulate the business processing logicstd::cout << "Received: " << msg << " with payload '" << msg.getDataAsString() << "'" << std::endl;// Return `ack` as the acknowledgementconsumer.acknowledge(msg);// Return `nack` if the consumption fails, and the message will be delivered again// consumer.negativeAcknowledge(msg);
Was this page helpful?