tencent cloud

文档反馈

公网 SASL_PLAINTEXT 方式接入

最后更新时间:2024-05-31 14:40:49

    操作场景

    该任务以 Node.js 客户端为例,指导您使用公网 SASL_PLAINTEXT 方式接入消息队列 CKafka 版并收发消息。

    前提条件

    操作步骤

    步骤1:安装 C++ 依赖库

    1. 执行以下命令切换到 yum 源配置目录 /etc/yum.repos.d/
    cd /etc/yum.repos.d/
    2. 创建 yum 源配置文件 confluent.repo。
    [Confluent.dist]
    name=Confluent repository (dist)
    baseurl=https://packages.confluent.io/rpm/5.1/7
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
    [Confluent]
    name=Confluent repository
    baseurl=https://packages.confluent.io/rpm/5.1
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
    3. 执行以下命令安装 C++ 依赖库。
    yum install librdkafka-devel

    步骤2:安装 Node.js 依赖库

    1. 执行以下命令为预处理器指定 OpenSSL 头文件路径。
    export CPPFLAGS=-I/usr/local/opt/openssl/include
    2. 执行以下命令为连接器指定 OpenSSL 库路径。
    export LDFLAGS=-L/usr/local/opt/openssl/lib
    3. 执行以下命令安装 Node.js 依赖库。
    npm install i --unsafe-perm node-rdkafka

    步骤3:准备配置

    创建消息队列 CKafka 版配置文件 setting.js。
    module.exports = {
    'sasl_plain_username': 'ckafka-xxxxxxx#ckafkademo',
    'sasl_plain_password': 'ckafkademo123',
    'bootstrap_servers': ["xxx.ckafka.tencentcloudmq.com:6018"],
    'topic_name': 'xxx',
    'group_id': 'xxx'
    }
    参数
    描述
    sasl_plain_username
    用户名,格式为 实例 ID + # + 用户名。实例 ID 在 CKafka 控制台 的实例详情页面的基本信息获取,用户在 ACL 策略管理下的用户管理创建用户时设置。
    sasl_plain_password
    用户密码,在 CKafka 控制台实例详情页面 ACL 策略管理下的用户管理创建用户时设置。
    bootstrap_servers
    SASL 接入点,在 CKafka 控制台的实例详情页面的基本信息 > 接入方式获取。
    
    topic_name
    Topic名称,在 CKafka 控制台实例详情页面的 topic 管理创建和获取。
    
    group_id
    消费者的组 ID,根据业务需求自定义

    步骤4:发送消息

    1. 编写生产消息程序 producer.js。
    const Kafka = require('node-rdkafka');
    const config = require('./setting');
    console.log("features:" + Kafka.features);
    console.log(Kafka.librdkafkaVersion);
    
    var producer = new Kafka.Producer({
    'api.version.request': 'true',
    'bootstrap.servers': config['bootstrap_servers'],
    'dr_cb': true,
    'dr_msg_cb': true,
    'security.protocol' : 'SASL_PLAINTEXT',
    'sasl.mechanisms' : 'PLAIN',
    'sasl.username' : config['sasl_plain_username'],
    'sasl.password' : config['sasl_plain_password']
    });
    
    var connected = false
    
    producer.setPollInterval(100);
    
    producer.connect();
    
    producer.on('ready', function() {
    connected = true
    console.log("connect ok")
    
    });
    
    function produce() {
    try {
    producer.produce(
    config['topic_name'],
    new Buffer('Hello CKafka SASL'),
    null,
    Date.now()
    );
    } catch (err) {
    console.error('Error occurred when sending message(s)');
    console.error(err);
    }
    }
    
    producer.on("disconnected", function() {
    connected = false;
    producer.connect();
    })
    
    producer.on('event.log', function(event) {
    console.log("event.log", event);
    });
    
    producer.on("error", function(error) {
    console.log("error:" + error);
    });
    
    producer.on('delivery-report', function(err, report) {
    console.log("delivery-report: producer ok");
    });
    // Any errors we encounter, including connection errors
    producer.on('event.error', function(err) {
    console.error('event.error:' + err);
    })
    
    setInterval(produce,1000,"Interval");
    2. 执行以下命令发送消息。
    node producer.js
    3. 查看运行结果。
    
    4. CKafka 控制台 topic 管理页面,选择对应的 Topic,单击更多 > 消息查询,查看刚发送的消息。
    

    步骤5:订阅消息

    1. 创建消费消息程序 consumer.js。
    consumer.on('event.log', function(event) {
    console.log("event.log", event);
    });
    
    consumer.on('error', function(error) {
    console.log("error:" + error);
    });
    
    consumer.on('event', function(event) {
    console.log("event:" + event);
    });const Kafka = require('node-rdkafka');
    const config = require('./setting');
    console.log(Kafka.features);
    console.log(Kafka.librdkafkaVersion);
    console.log(config)
    
    var consumer = new Kafka.KafkaConsumer({
    'api.version.request': 'true',
    'bootstrap.servers': config['bootstrap_servers'],
    'security.protocol' : 'SASL_PLAINTEXT',
    'sasl.mechanisms' : 'PLAIN',
    'message.max.bytes': 32000,
    'fetch.message.max.bytes': 32000,
    'max.partition.fetch.bytes': 32000,
    'sasl.username' : config['sasl_plain_username'],
    'sasl.password' : config['sasl_plain_password'],
    'group.id' : config['group_id']
    });
    
    consumer.connect();
    
    consumer.on('ready', function() {
    console.log("connect ok");
    consumer.subscribe([config['topic_name']]);
    consumer.consume();
    })
    
    consumer.on('data', function(data) {
    console.log(data);
    });
    2. 执行以下命令消费消息。
    node consumer.js
    3. 查看运行结果。
    
    4. CKafka 控制台 Consumer Group 页面,选择对应的消费组名称,在主题名称输入 Topic 名称,单击查询详情,查看消费详情。
    
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持