Upload the nodejskafkademo in the downloaded demo to the Linux server.
Log in to the Linux server and enter the nodejskafkademo directory.
Switch to the yum source configuration directory /etc/yum.repos.d/:
cd /etc/yum.repos.d/
Create the yum source configuration file confluent.repo with the following content:
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.1/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.1
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1
# Install the librdkafka development package (headers and libraries) with yum.
yum install librdkafka-devel
# Point the preprocessor and linker at the OpenSSL headers and libraries.
# NOTE(review): presumably needed so the node-rdkafka native build can find OpenSSL;
# adjust the paths if OpenSSL is installed elsewhere on your server.
export CPPFLAGS=-I/usr/local/opt/openssl/include
export LDFLAGS=-L/usr/local/opt/openssl/lib
# Install the node-rdkafka client library.
# `i` is an alias of `install`; passing both makes npm install a package named "i" as well.
npm install --unsafe-perm node-rdkafka
Create the configuration file setting.js
// Connection settings loaded by producer.js and consumer.js via require('./setting').
// Replace every placeholder value with the values of your own instance.
const settings = {
  // List of broker addresses; passed to the client as 'bootstrap.servers'.
  bootstrap_servers: ["xxx.xx.xxx:xxxx"],
  // Topic that the producer writes to and the consumer subscribes to.
  topic_name: 'xxx',
  // Consumer group ID; passed to the consumer as 'group.id'.
  group_id: 'xxx',
};

module.exports = settings;
| Parameter | Description |
| --- | --- |
| bootstrap_servers | Accessed network address, which can be copied from the Network column in the Access Mode section in Basic Info on the instance details page in the console. |
| topic_name | Topic name, which can be copied from the Topic Management page in the console. |
| group_id | Consumer group ID, which you can customize. After the demo runs successfully, you can see the consumer group on the Consumer Group page. |
Write the message production program producer.js
/**
 * Demo producer: connects to the brokers listed in ./setting and sends one
 * message to the configured topic every second, logging delivery reports.
 */
const Kafka = require('node-rdkafka');
const config = require('./setting');

console.log("features:" + Kafka.features);
console.log(Kafka.librdkafkaVersion);

const producer = new Kafka.Producer({
  'api.version.request': 'true',
  // To set the entry service, obtain the corresponding service address in the console.
  'bootstrap.servers': config['bootstrap_servers'],
  // Enable delivery reports so that the 'delivery-report' event below fires.
  'dr_cb': true,
  'dr_msg_cb': true,
  // Number of retries upon request error. We recommend that you set the parameter
  // to a value greater than 0 to enable retries and guarantee that messages are
  // not lost to the greatest extent possible.
  // NOTE: the demo ships with retries disabled ('0'); raise it for real workloads.
  'retries': '0',
  // Interval between a request failure and the next retry
  "retry.backoff.ms": 100,
  // Network request timeout of the producer
  'socket.timeout.ms': 6000,
});

// Tracks whether the producer is currently connected; produce() skips sending while false.
let connected = false;

// Poll every 100 ms so that delivery reports and other events are dispatched.
producer.setPollInterval(100);
producer.connect();

producer.on('ready', function () {
  connected = true;
  console.log("connect ok");
});

// Reconnect automatically when the connection is lost.
producer.on("disconnected", function () {
  connected = false;
  producer.connect();
});

producer.on('event.log', function (event) {
  console.log("event.log", event);
});

producer.on("error", function (error) {
  console.log("error:" + error);
});

/**
 * Sends a single demo message to the configured topic.
 * Does nothing while the producer is not connected. Errors thrown by
 * producer.produce() are logged, not propagated, so the send timer keeps running.
 */
function produce() {
  if (!connected) {
    return;
  }
  try {
    producer.produce(
      config['topic_name'],
      // Partition: null leaves partition selection to the client.
      null,
      // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
      Buffer.from('Hello CKafka Default'),
      // Message key: none.
      null,
      // Message timestamp.
      Date.now()
    );
  } catch (err) {
    console.error('Error occurred when sending message(s)');
    console.error(err);
  }
}

producer.on('delivery-report', function (err, report) {
  // Only report success when the delivery report carries no error.
  if (err) {
    console.error('delivery-report: producer failed');
    console.error(err);
    return;
  }
  console.log("delivery-report: producer ok");
});

producer.on('event.error', function (err) {
  console.error('event.error:' + err);
});

// Attempt to send one message per second.
setInterval(produce, 1000);
# Run the producer script with Node.js.
node producer.js
Write the message consumption program consumer.js
/**
 * Demo consumer: connects to the brokers listed in ./setting, subscribes to the
 * configured topic as part of the configured consumer group, and logs every
 * message it receives.
 */
const Kafka = require('node-rdkafka');
const config = require('./setting');

console.log(Kafka.features);
console.log(Kafka.librdkafkaVersion);
console.log(config);

const consumer = new Kafka.KafkaConsumer({
  'api.version.request': 'true',
  // To set the entry service, obtain the corresponding service address in the console.
  'bootstrap.servers': config['bootstrap_servers'],
  'group.id': config['group_id'],
  // Consumer timeout period when the Kafka consumer grouping mechanism is used.
  // If the broker does not receive the heartbeat of the consumer within this period,
  // the consumer will be considered to have failed and the broker will initiate
  // rebalancing again.
  'session.timeout.ms': 10000,
  // Client request timeout period. If no response is received after this time
  // elapses, the request will time out and fail.
  'metadata.request.timeout.ms': 305000,
  // Set the internal retry interval of the client
  'reconnect.backoff.max.ms': 3000,
});

consumer.connect();

// Once connected, subscribe to the topic and start consuming; each message
// is then delivered through the 'data' event.
consumer.on('ready', function () {
  console.log("connect ok");
  consumer.subscribe([config['topic_name']]);
  consumer.consume();
});

consumer.on('data', function (data) {
  console.log(data);
});

consumer.on('event.log', function (event) {
  console.log("event.log", event);
});

consumer.on('error', function (error) {
  console.log("error:" + error);
});

// Log client-level errors as well, matching the handler in producer.js.
consumer.on('event.error', function (err) {
  console.error('event.error:' + err);
});

consumer.on('event', function (event) {
  console.log("event:" + event);
});
# Run the consumer script with Node.js.
node consumer.js
문제 해결에 도움이 되었나요?