| 参数 | 说明 |
| --- | --- |
| 链接类型 | 当前支持 SASL_PLAINTEXT |
| hosts | 配置为服务入口地址,详见下方"地域/网络类型/端口号/服务入口"对照表 |
| topic | 配置为日志主题 ID。例如:76c63473-c496-466b-XXXX-XXXXXXXXXXXX |
| username | 配置为日志集 ID。例如:0f8e4b82-8adb-47b1-XXXX-XXXXXXXXXXXX |
| password | 格式为 ${SecurityId}#${SecurityKey}。例如:XXXXXXXXXXXXXX#YYYYYYYY |
| 地域 | 网络类型 | 端口号 | 服务入口 |
| --- | --- | --- | --- |
| 广州 | 内网 | 9095 | gz-producer.cls.tencentyun.com:9095 |
| 广州 | 外网 | 9096 | gz-producer.cls.tencentcs.com:9096 |
# Filebeat Kafka output: ship logs to Tencent CLS over the Kafka protocol.
output.kafka:
  enabled: true
  # TODO service endpoint; public-network port 9096, private-network port 9095
  hosts: ["${region}-producer.cls.tencentyun.com:9095"]
  topic: "${topicID}"  # TODO log topic ID
  version: "0.11.0.2"
  # Compression codec: gzip, snappy or lz4, e.g. "lz4"
  compression: "${compress}"
  username: "${logsetID}"                   # logset ID
  password: "${SecurityId}#${SecurityKey}"  # format: ${SecurityId}#${SecurityKey}
# Logstash Kafka output: ship logs to Tencent CLS over SASL/PLAIN.
output {
  kafka {
    topic_id => "${topicID}"
    # Service endpoint; public-network port 9096, private-network port 9095.
    bootstrap_servers => "${region}-producer.cls.tencentyun.com:${port}"
    sasl_mechanism => "PLAIN"
    security_protocol => "SASL_PLAINTEXT"
    compression_type => "${compress}"
    # Username is the logset ID; password is "${securityID}#${securityKEY}".
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${securityID}#${securityKEY}';"
  }
}
import ("fmt""github.com/Shopify/sarama")func main() {config := sarama.NewConfig()config.Net.SASL.Mechanism = "PLAIN"config.Net.SASL.Version = int16(1)config.Net.SASL.Enable = trueconfig.Net.SASL.User = "${logsetID}" // TODO 日志集 IDconfig.Net.SASL.Password = "${SecurityId}#${SecurityKey}" // TODO 格式为 ${SecurityId}#${SecurityKey}config.Producer.Return.Successes = trueconfig.Producer.RequiredAcks = ${acks} // TODO 根据使用场景选择acks的值config.Version = sarama.V1_1_0_0config.Producer.Compression = ${compress} // TODO 配置压缩方式// TODO 服务地址;外网端口9096,内网端口9095producer, err := sarama.NewSyncProducer([]string{"${region}-producer.cls.tencentyun.com:9095"}, config)if err != nil {panic(err)}msg := &sarama.ProducerMessage{Topic: "${topicID}", // TODO topicIDValue: sarama.StringEncoder("goland sdk sender demo"),}// 发送消息for i := 0; i <= 5; i++ {partition, offset, err := producer.SendMessage(msg)if err != nil {panic(err)}fmt.Printf("send response; partition:%d, offset:%d\\n", partition, offset)}_ = producer.Close()}
# Demo: produce logs to Tencent CLS over the Kafka protocol with SASL/PLAIN
# authentication, using kafka-python.
from kafka import KafkaProducer

if __name__ == '__main__':
    produce = KafkaProducer(
        # TODO service endpoint; public-network port 9096, private-network port 9095
        bootstrap_servers=["${region}-producer.cls.tencentyun.com:9095"],
        security_protocol='SASL_PLAINTEXT',
        sasl_mechanism='PLAIN',
        # TODO logset ID
        sasl_plain_username='${logsetID}',
        # TODO format: ${SecurityId}#${SecurityKey}
        sasl_plain_password='${SecurityId}#${SecurityKey}',
        api_version=(0, 11, 0),
        # TODO compression type
        compression_type="${compress_type}",
    )
    for i in range(0, 5):
        # Send a demo message and wait for the broker ack. TODO replace topic ID.
        future = produce.send(topic="${topicID}", value=b'python sdk sender demo')
        result = future.get(timeout=10)
        print(result)
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.2</version>
    </dependency>
</dependencies>
import org.apache.kafka.clients.producer.*;import java.util.Properties;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class ProducerDemo {public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {// 0.配置一系列参数Properties props = new Properties();// TODO 使用时props.put("bootstrap.servers", "${region}-producer.cls.tencentyun.com:9095");// TODO 以下值根据业务场景设置props.put("acks", ${acks});props.put("retries", ${retries});props.put("batch.size", ${batch.size});props.put("linger.ms", ${linger.ms});props.put("buffer.memory", ${buffer.memory});props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "${compress_type}"); // TODO 配置压缩方式props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");// TODO 用户名为logsetID;密码为securityID和securityKEY的组合 securityID#securityKEYprops.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${SecurityId}#${SecurityKey}';");// 1.创建一个生产者对象Producer<String, String> producer = new KafkaProducer<String, String>(props);// 2.调用send方法Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>("${topicID}", ${message}));RecordMetadata recordMetadata = meta.get(${timeout}, TimeUnit.MILLISECONDS);System.out.println("offset = " + recordMetadata.offset());// 3.关闭生产者producer.close();}}
// https://github.com/edenhill/librdkafka - master#include <iostream>#include <librdkafka/rdkafka.h>#include <string>#include <unistd.h>#define BOOTSTRAP_SERVER "${region}-producer.cls.tencentyun.com:${port}"#define USERNAME "${logsetID}"#define PASSWORD "${SecurityId}#${SecurityKey}"#define TOPIC "${topicID}"#define ACKS "${acks}"#define COMPRESS_TYPE "${compress_type}"static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {if (rkmessage->err) {fprintf(stdout, "%% Message delivery failed : %s\\n", rd_kafka_err2str(rkmessage->err));} else {fprintf(stdout, "%% Message delivery successful %zu:%d\\n", rkmessage->len, rkmessage->partition);}}int main(int argc, char **argv) {// 1. 初始化配置rd_kafka_conf_t *conf = rd_kafka_conf_new();rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);char errstr[512];if (rd_kafka_conf_set(conf, "bootstrap.servers", BOOTSTRAP_SERVER, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}if (rd_kafka_conf_set(conf, "acks", ACKS, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}if (rd_kafka_conf_set(conf, "compression.codec", COMPRESS_TYPE, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}// 设置认证方式if (rd_kafka_conf_set(conf, "security.protocol", "sasl_plaintext", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}if (rd_kafka_conf_set(conf, "sasl.username", USERNAME, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}if (rd_kafka_conf_set(conf, "sasl.password", PASSWORD, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) 
{rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}// 2. 创建 handlerrd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));if (!rk) {rd_kafka_conf_destroy(conf);fprintf(stdout, "create produce handler failed: %s\\n", errstr);return -1;}// 3. 发送数据std::string value = "test lib kafka ---- ";for (int i = 0; i < 100; ++i) {retry:rd_kafka_resp_err_t err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(TOPIC),RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),RD_KAFKA_V_VALUE((void *) value.c_str(), value.size()),RD_KAFKA_V_OPAQUE(nullptr), RD_KAFKA_V_END);if (err) {fprintf(stdout, "Failed to produce to topic : %s, error : %s", TOPIC, rd_kafka_err2str(err));if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {rd_kafka_poll(rk, 1000);goto retry;}} else {fprintf(stdout, "send message to topic successful : %s\\n", TOPIC);}rd_kafka_poll(rk, 0);}std::cout << "message flush final" << std::endl;rd_kafka_flush(rk, 10 * 1000);if (rd_kafka_outq_len(rk) > 0) {fprintf(stdout, "%d message were not deliverer\\n", rd_kafka_outq_len(rk));}rd_kafka_destroy(rk);return 0;}
/*
 * This demo shows only the simplest usage; wire it into your caller for real use.
 * Replace the TODO items before running.
 *
 * Notes:
 * 1. Verified against Confluent.Kafka 1.8.2.
 * 2. MessageMaxBytes must not exceed 5 MB.
 * 3. The demo produces synchronously; switch to the async API if your workload needs it.
 * 4. Other settings can be tuned per the reference:
 *    https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.ProducerConfig.html
 *
 * Confluent.Kafka reference:
 * https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.html
 */
using Confluent.Kafka;

namespace Producer
{
    class Producer
    {
        private static void Main(string[] args)
        {
            var config = new ProducerConfig
            {
                // TODO see https://www.tencentcloud.com/document/product/614/18940 for the domain;
                // private-network port 9095, public-network port 9096
                BootstrapServers = "${domain}:${port}",
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = "${logsetID}", // TODO ID of the logset that owns the topic
                SaslPassword = "${SecurityId}#${SecurityKey}", // TODO key pair of the uin that owns the topic
                SecurityProtocol = SecurityProtocol.SaslPlaintext,
                Acks = Acks.None, // TODO set per your scenario: Acks.None, Acks.Leader or Acks.All
                MessageMaxBytes = 5242880 // TODO max request size; must not exceed 5 MB
            };

            // Delivery handler: prints the partition/offset on success, the reason on failure.
            Action<DeliveryReport<Null, string>> handler = r =>
                Console.WriteLine(!r.Error.IsError
                    ? $"Delivered message to {r.TopicPartitionOffset}"
                    : $"Delivery Error: {r.Error.Reason}");

            using (var produce = new ProducerBuilder<Null, string>(config).Build())
            {
                try
                {
                    // Demo loop: produce 100 messages, then flush.
                    for (var i = 0; i < 100; i++)
                    {
                        // TODO replace with your log topic ID
                        produce.Produce("${topicID}", new Message<Null, string> { Value = "C# demo value" }, handler);
                    }
                    produce.Flush(TimeSpan.FromSeconds(10));
                }
                catch (ProduceException<Null, string> pe)
                {
                    Console.WriteLine($"send message receiver error : {pe.Error.Reason}");
                }
            }
        }
    }
}
本页内容是否解决了您的问题?