Go to the /etc/yum.repos.d/ directory:

cd /etc/yum.repos.d/

Create a Confluent repository file there (for example, confluent.repo) with the following content:
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.1/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.1
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1
yum install librdkafka-devel
# On macOS, point the node-rdkafka build at Homebrew's OpenSSL before installing:
export CPPFLAGS=-I/usr/local/opt/openssl/include
export LDFLAGS=-L/usr/local/opt/openssl/lib
npm install --unsafe-perm node-rdkafka
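node-rdkafka compiles librdkafka as a native addon during installation. As an optional check (a minimal sketch; the file name check-features.js is just an example, not part of the original steps), you can print the compiled-in feature list and confirm it includes sasl, which the SASL_PLAINTEXT examples below rely on:

// check-features.js - verify the native addon built with SASL support
const Kafka = require('node-rdkafka');

console.log('librdkafka version:', Kafka.librdkafkaVersion);
// Expect the feature list to include 'sasl' (and 'ssl' if OpenSSL was found at build time).
console.log('features:', Kafka.features);

Run it with node check-features.js.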
module.exports = {
    'sasl_plain_username': 'your_user_name',
    'sasl_plain_password': 'your_user_password',
    'bootstrap_servers': ["xxx.xx.xx.xx:port"],
    'topic_name': 'xxx',
    'group_id': 'xxx'
}
| Parameter | Description |
| --- | --- |
| bootstrap_servers | Access address, obtained on the elastic Topic's basic information page in the console. |
| sasl_plain_username | Username, obtained on the elastic Topic's basic information page in the console. |
| sasl_plain_password | Password, obtained on the elastic Topic's basic information page in the console. |
| topic_name | Topic name, obtained on the elastic Topic's basic information page in the console. |
| group_id | Consumer group name, obtained from the elastic Topic's subscription list in the console. |
const Kafka = require('node-rdkafka');
const config = require('./setting');
console.log("features:" + Kafka.features);
console.log(Kafka.librdkafkaVersion);

var producer = new Kafka.Producer({
    'api.version.request': 'true',
    'bootstrap.servers': config['bootstrap_servers'],
    'dr_cb': true,
    'dr_msg_cb': true,
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': config['sasl_plain_username'],
    'sasl.password': config['sasl_plain_password']
});

var connected = false;

producer.setPollInterval(100);
producer.connect();

producer.on('ready', function() {
    connected = true;
    console.log("connect ok");
});

function produce() {
    try {
        producer.produce(
            config['topic_name'],
            null,                             // partition: null lets librdkafka's partitioner choose
            Buffer.from('Hello CKafka SASL'), // message payload must be a Buffer
            null,                             // optional message key
            Date.now()                        // timestamp
        );
    } catch (err) {
        console.error('Error occurred when sending message(s)');
        console.error(err);
    }
}

producer.on("disconnected", function() {
    connected = false;
    producer.connect();
});

producer.on('event.log', function(event) {
    console.log("event.log", event);
});

producer.on("error", function(error) {
    console.log("error:" + error);
});

producer.on('delivery-report', function(err, report) {
    console.log("delivery-report: producer ok");
});

// Any errors we encounter, including connection errors
producer.on('event.error', function(err) {
    console.error('event.error:' + err);
});

setInterval(produce, 1000, "Interval");
node producer.js
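The script above keeps sending one message per second until you stop it. If you want to exit cleanly, a minimal sketch (an optional addition, not part of the original example) is to keep the interval handle, flush outstanding messages, and disconnect on Ctrl+C, reusing the producer and produce function from producer.js:

// Optional graceful shutdown: replace the final setInterval call in producer.js with:
var timer = setInterval(produce, 1000, "Interval");

process.on('SIGINT', function() {
    clearInterval(timer);
    // Give in-flight messages up to 5 seconds to be delivered, then disconnect.
    producer.flush(5000, function(err) {
        if (err) {
            console.error('flush failed:', err);
        }
        producer.disconnect();
    });
});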
const Kafka = require('node-rdkafka');
const config = require('./setting');
console.log(Kafka.features);
console.log(Kafka.librdkafkaVersion);
console.log(config);

var consumer = new Kafka.KafkaConsumer({
    'api.version.request': 'true',
    'bootstrap.servers': config['bootstrap_servers'],
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'PLAIN',
    'message.max.bytes': 32000,
    'fetch.message.max.bytes': 32000,
    'max.partition.fetch.bytes': 32000,
    'sasl.username': config['sasl_plain_username'],
    'sasl.password': config['sasl_plain_password'],
    'group.id': config['group_id']
});

consumer.connect();

consumer.on('ready', function() {
    console.log("connect ok");
    consumer.subscribe([config['topic_name']]);
    consumer.consume();
});

consumer.on('data', function(data) {
    console.log(data);
});

consumer.on('event.log', function(event) {
    console.log("event.log", event);
});

consumer.on('error', function(error) {
    console.log("error:" + error);
});

consumer.on('event', function(event) {
    console.log("event:" + event);
});
node consumer.js
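Each 'data' event delivers a message object whose value field is a Buffer. If you prefer readable log output over the raw object dump, a small sketch of an alternative handler (a drop-in replacement for the consumer.on('data', ...) callback in consumer.js above) is:

consumer.on('data', function(data) {
    // topic, partition and offset identify the message; value holds the payload bytes
    console.log('received message on ' + data.topic + '[' + data.partition + ']@' + data.offset + ': ' +
        (data.value ? data.value.toString() : null));
});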