tencent cloud

All product documents
TDMQ for RocketMQ
Use of C++ SDK
Last updated: 2024-01-17 16:56:06
Use of C++ SDK
Last updated: 2024-01-17 16:56:06

Overview

This document describes how to use an open-source SDK to send and receive messages with the SDK for C++ serving as example, for you to better understand the complete procedure involved in message sending and receiving.
Note:
The C++ client is used as an example. For clients of other languages, see the SDK Documentation.

Prerequisites

You have created and prepared the required resources.
You have installed a compiler suite supporting C++11.
You have installed Bazel version 5.2.0 or CMake version 3.13 and later.
If you use CMake for compilation, gRPC version 1.46.3 is recommended due to incompatibilities between higher versions and the SDK.

Directions:

Step 1: Installing the SDK for C++

Note:
TDMQ for RocketMQ 5.x series does not currently support TLS. A patch must be applied.

Step 2: Producing Messages

#include <algorithm>
#include <atomic>
#include <iostream>
#include <memory>
#include <random>
#include <string>
#include <system_error>

#include "rocketmq/CredentialsProvider.h"
#include "rocketmq/Logger.h"
#include "rocketmq/Message.h"
#include "rocketmq/Producer.h"

using namespace ROCKETMQ_NAMESPACE;

const std::string &alphaNumeric() {
static std::string alpha_numeric("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
return alpha_numeric;
}

std::string randomString(std::string::size_type len) {
std::string result;
result.reserve(len);
std::random_device rd;
std::mt19937 generator(rd());
std::string source(alphaNumeric());
std::string::size_type generated = 0;
while (generated < len) {
std::shuffle(source.begin(), source.end(), generator);
std::string::size_type delta = std::min({len - generated, source.length()});
result.append(source.substr(0, delta));
generated += delta;
}
return result;
}

static const std::string topic = "xxx";
// Enter the access address provided by Tencent Cloud
static const std::string access_point = "rmq-xxx.rocketmq.xxxtencenttdmq.com:8081";
// Add the configured ak and sk
static const std::string access_key = "xxx";
static const std::string access_secret = "xxx";
static const uint32_t total = 32;
static const int32_t message_body_size = 128;


int main(int argc, char *argv[]) {
CredentialsProviderPtr credentials_provider;
if (!access_key.empty() && !access_secret.empty()) {
credentials_provider = std::make_shared<StaticCredentialsProvider>(access_key, access_secret);
}

// In most case, you don't need to create too many producers, singletion pattern is recommended.
auto producer = Producer::newBuilder()
.withConfiguration(Configuration::newBuilder()
.withEndpoints(access_point)
.withCredentialsProvider(credentials_provider)
.withSsl(false)
.build())
.withTopics({topic})
.build();

std::atomic_bool stopped;
std::atomic_long count(0);

auto stats_lambda = [&] {
while (!stopped.load(std::memory_order_relaxed)) {
long cnt = count.load(std::memory_order_relaxed);
while (count.compare_exchange_weak(cnt, 0)) {
break;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "QPS: " << cnt << std::endl;
}
};

std::thread stats_thread(stats_lambda);

std::string body = randomString(message_body_size);

try {
for (std::size_t i = 0; i < total; ++i) {
auto message = Message::newBuilder()
.withTopic(topic)
.withTag("TagA")
.withKeys({"Key-" + std::to_string(i)})
.withBody(body)
.build();
std::error_code ec;
SendReceipt send_receipt = producer.send(std::move(message), ec);
if (ec) {
std::cerr << "Failed to publish message to " << topic << ". Cause: " << ec.message() << std::endl;
} else {
std::cout << "Publish message to " << topic << " OK. Message-ID: " << send_receipt.message_id
<< std::endl;
count++;
}
}
} catch (...) {
std::cerr << "Ah...No!!!" << std::endl;
}
stopped.store(true, std::memory_order_relaxed);
if (stats_thread.joinable()) {
stats_thread.join();
}

return EXIT_SUCCESS;
}

Step 3: Consuming Messages

TDMQ for RocketMQ 5.x series of Tencent Cloud supports two consumption modes: Push Consumer and Simple Consumer.
The following code is an example based on Push Consumer:
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

#include "rocketmq/Logger.h"
#include "rocketmq/PushConsumer.h"

using namespace ROCKETMQ_NAMESPACE;

static const std::string topic = "xxx";
// Enter the access address provided by Tencent Cloud
static const std::string access_point = "rmq-xxx.rocketmq.xxxtencenttdmq.com:8081";
// Add the configured ak and sk
static const std::string access_key = "xxx";
static const std::string access_secret = "xxx";
static const std::string group = "group-xxx";

int main(int argc, char *argv[]) {
auto &logger = getLogger();
logger.setConsoleLevel(Level::Info);
logger.setLevel(Level::Info);
logger.init();

std::string tag = "*";

auto listener = [](const Message &message) {
std::cout << "Received a message[topic=" << message.topic() << ", MsgId=" << message.id() << "]" << std::endl;
return ConsumeResult::SUCCESS;
};

CredentialsProviderPtr credentials_provider;
if (!access_key.empty() && !access_secret.empty()) {
credentials_provider = std::make_shared<StaticCredentialsProvider>(access_key, access_secret);
}

// In most case, you don't need to create too many consumers, singletion pattern is recommended.
auto push_consumer = PushConsumer::newBuilder()
.withGroup(group)
.withConfiguration(Configuration::newBuilder()
.withEndpoints(access_point)
.withRequestTimeout(std::chrono::seconds(3))
.withCredentialsProvider(credentials_provider)
.withSsl(false)
.build())
.withConsumeThreads(4)
.withListener(listener)
.subscribe(topic, tag)
.build();

std::this_thread::sleep_for(std::chrono::minutes(30));

return EXIT_SUCCESS;
}

Step 4: Viewing Message Details

After the message is sent, you will receive a message ID (messageID). Developers can query the recently sent messages on the Message Query page, as shown in the following figure. Information such as details and traces for specific messages is also available. For details, see Message Query.


Was this page helpful?
You can also Contact Sales or Submit a Ticket for help.
Yes
No

Feedback

Contact Us

Contact our sales team or business advisors to help your business.

Technical Support

Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

7x24 Phone Support
Hong Kong, China
+852 800 906 020 (Toll Free)
United States
+1 844 606 0804 (Toll Free)
United Kingdom
+44 808 196 4551 (Toll Free)
Canada
+1 888 605 7930 (Toll Free)
Australia
+61 1300 986 386 (Toll Free)
EdgeOne hotline
+852 300 80699
More local hotlines coming soon