Switch to the yum source configuration directory /etc/yum.repos.d/.

cd /etc/yum.repos.d/
Create the yum source configuration file confluent.repo.

[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.1/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.1
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1
Install librdkafka-devel.

yum install librdkafka-devel
Point the build at your OpenSSL installation and install node-rdkafka.

export CPPFLAGS=-I/usr/local/opt/openssl/include
export LDFLAGS=-L/usr/local/opt/openssl/lib
npm install --unsafe-perm node-rdkafka
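To confirm that node-rdkafka was built with SASL support before going further, you can print the build information it exposes. This is an optional check, not part of the original steps; it only uses the Kafka.features and Kafka.librdkafkaVersion properties that the producer and consumer samples below also log.

// check-build.js: optional sanity check of the node-rdkafka build
const Kafka = require('node-rdkafka');

// Version of librdkafka the binding was built against
console.log('librdkafka version:', Kafka.librdkafkaVersion);

// Feature list; it should include "sasl" for the SASL_PLAINTEXT access used below
console.log('features:', Kafka.features);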
Create the configuration file setting.js.
module.exports = {
    'sasl_plain_username': 'ckafka-xxxxxxx#ckafkademo',
    'sasl_plain_password': 'ckafkademo123',
    'bootstrap_servers': ["xxx.ckafka.tencentcloudmq.com:6018"],
    'topic_name': 'xxx',
    'group_id': 'xxx'
}
| Parameter | Description |
| --- | --- |
| sasl_plain_username | Username in the format of instance ID + # + username. The instance ID can be obtained in Basic Info on the instance details page in the CKafka console, and the username is set when the user is created in ACL Policy Management > User Management. |
| sasl_plain_password | User password, which is set when the user is created in ACL Policy Management > User Management on the instance details page in the CKafka console. |
| bootstrap_servers | SASL access point, which can be obtained in Basic Info > Access Mode on the instance details page in the CKafka console. |
| topic_name | Topic name, which can be created and obtained in Topic Management on the instance details page in the CKafka console. |
| group_id | Consumer group ID, which can be customized as needed. |
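Both producer.js and consumer.js below pass the same SASL connection properties from setting.js to node-rdkafka. The following sketch is a hypothetical common.js helper (not part of the original sample) that shows the mapping in one place; the sample files inline it instead.

// common.js (hypothetical): shared SASL_PLAINTEXT properties built from setting.js
const config = require('./setting');

module.exports = function baseProps() {
    return {
        'bootstrap.servers': config['bootstrap_servers'],  // SASL access point
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanisms': 'PLAIN',
        'sasl.username': config['sasl_plain_username'],    // instance ID + # + username
        'sasl.password': config['sasl_plain_password']
    };
};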
Write the message production program producer.js.
const Kafka = require('node-rdkafka');
const config = require('./setting');

console.log("features:" + Kafka.features);
console.log(Kafka.librdkafkaVersion);

var producer = new Kafka.Producer({
    'api.version.request': 'true',
    'bootstrap.servers': config['bootstrap_servers'],
    'dr_cb': true,
    'dr_msg_cb': true,
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': config['sasl_plain_username'],
    'sasl.password': config['sasl_plain_password']
});

var connected = false;

producer.setPollInterval(100);
producer.connect();

producer.on('ready', function() {
    connected = true;
    console.log("connect ok");
});

function produce() {
    try {
        // produce(topic, partition, message, key, timestamp)
        producer.produce(
            config['topic_name'],
            null,
            Buffer.from('Hello CKafka SASL'),
            null,
            Date.now()
        );
    } catch (err) {
        console.error('Error occurred when sending message(s)');
        console.error(err);
    }
}

producer.on("disconnected", function() {
    connected = false;
    producer.connect();
});

producer.on('event.log', function(event) {
    console.log("event.log", event);
});

producer.on("error", function(error) {
    console.log("error:" + error);
});

producer.on('delivery-report', function(err, report) {
    console.log("delivery-report: producer ok");
});

// Any errors we encounter, including connection errors
producer.on('event.error', function(err) {
    console.error('event.error:' + err);
});

setInterval(produce, 1000, "Interval");
Run producer.js to send messages.

node producer.js
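producer.js keeps sending a message every second until the process is stopped. If you stop it programmatically, flushing before disconnecting avoids dropping messages that are still buffered in librdkafka; the sketch below assumes the standard node-rdkafka flush() and disconnect() methods and could be appended to producer.js.

// Hypothetical graceful shutdown on Ctrl+C
process.on('SIGINT', function() {
    // Wait up to 5 seconds for buffered messages to be delivered
    producer.flush(5000, function(err) {
        if (err) {
            console.error('flush failed: ' + err);
        }
        producer.disconnect();
        process.exit(err ? 1 : 0);
    });
});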
Write the message consumption program consumer.js.
const Kafka = require('node-rdkafka');
const config = require('./setting');

console.log(Kafka.features);
console.log(Kafka.librdkafkaVersion);
console.log(config);

var consumer = new Kafka.KafkaConsumer({
    'api.version.request': 'true',
    'bootstrap.servers': config['bootstrap_servers'],
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'PLAIN',
    'message.max.bytes': 32000,
    'fetch.message.max.bytes': 32000,
    'max.partition.fetch.bytes': 32000,
    'sasl.username': config['sasl_plain_username'],
    'sasl.password': config['sasl_plain_password'],
    'group.id': config['group_id']
});

consumer.connect();

consumer.on('ready', function() {
    console.log("connect ok");
    consumer.subscribe([config['topic_name']]);
    consumer.consume();
});

consumer.on('data', function(data) {
    console.log(data);
});

consumer.on('event.log', function(event) {
    console.log("event.log", event);
});

consumer.on('error', function(error) {
    console.log("error:" + error);
});

consumer.on('event', function(event) {
    console.log("event:" + event);
});
Run consumer.js to consume messages.

node consumer.js
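The consumer above uses librdkafka's default automatic offset commits. If you need to commit only after a message has actually been processed, auto-commit can be disabled and offsets committed per message; this is a sketch under the assumption that the node-rdkafka commitMessage() API is used, not part of the original sample.

// Variant of consumer.js: add 'enable.auto.commit': false to the KafkaConsumer
// configuration above, then commit each message explicitly after processing it.
consumer.on('data', function(data) {
    // Process the message first
    console.log(data.value.toString());

    // Commit the offset of this message only after successful processing
    consumer.commitMessage(data);
});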