permission. For more information, see Access Policy Templates.

Parameter | Description |
LinkType | Currently, SASL_PLAINTEXT is supported. |
hosts | Service address. The private network port is 9095, and the public network port is 9096. |
topic | Log topic ID. Example: 76c63473-c496-466b-XXXX-XXXXXXXXXXXX |
username | Logset ID. Example: 0f8e4b82-8adb-47b1-XXXX-XXXXXXXXXXXX |
password | Password in the format of ${SecurityId}#${SecurityKey}. Example: XXXXXXXXXXXXXX#YYYYYYYY |
Region | Network Type | Port Number | Service Entry |
Guangzhou | Private network | 9095 | |
| Public network | 9096 | |
# Filebeat Kafka output: ships events to a log topic over SASL username/password.
output.kafka:
  enabled: true
  # TODO: Service address. The public network port is 9096, and the private network port is 9095.
  hosts: ["${region}"]
  # TODO: Topic ID
  topic: "${topicID}"
  # NOTE(review): the version value is empty in the source page; presumably the
  # Kafka protocol version belongs here - TODO confirm before use.
  version: ""
  # Configure the compression method. Valid values: `gzip`, `snappy`, `lz4`.
  compression: "${compress}"
  # Logset ID is used as the SASL username.
  username: "${logsetID}"
  # Password format: ${SecurityId}#${SecurityKey}
  password: "${SecurityId}#${SecurityKey}"
# Logstash Kafka output: publishes events to a log topic using SASL/PLAIN over
# a plaintext connection.
output {
  kafka {
    # Topic ID of the target log topic.
    topic_id => "${topicID}"
    # Service address in host:port form.
    bootstrap_servers => "${region}:${port}"
    sasl_mechanism => "PLAIN"
    security_protocol => "SASL_PLAINTEXT"
    compression_type => "${compress}"
    # A JAAS entry must start with the login module class; for SASL/PLAIN that is
    # PlainLoginModule. Username is the logset ID, password is ${SecurityId}#${SecurityKey}.
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${SecurityId}#${SecurityKey}';"
  }
}
import ("fmt""")func main(){config := sarama.NewConfig()config.Net.SASL.Mechanism = "PLAIN"config.Net.SASL.Version = int16(1)config.Net.SASL.Enable = trueconfig.Net.SASL.User = "${logsetID}" // TODO: Logset IDconfig.Net.SASL.Password = "${SecurityId}#${SecurityKey}" // TODO: Format: ${SecurityId}#${SecurityKey}config.Producer.Return.Successes = trueconfig.Producer.RequiredAcks = ${acks} // TODO: Select the acks value according to the use caseconfig.Version = sarama.V1_1_0_0config.Producer.Compression = ${compress} // TODO: Configuration compression mode// TODO: Service address. The public network port is 9096, and the private network port is 9095.producer, err := sarama.NewSyncProducer([]string{"${region}"}, config)if err != nil{panic(err)}msg := &sarama.ProducerMessage{Topic: "${topicID}", // TODO: Topic IDValue: sarama.StringEncoder("goland sdk sender demo"),}// Send the messagesfor i := 0; i <= 5; i++ {partition, offset, err := producer.SendMessage(msg)if err != nil{panic(err)}fmt.Printf("send response; partition:%d, offset:%d\\n", partition, offset)}_ = producer.Close()}
from kafka import KafkaProducer

# Demo: send five messages to a log topic with kafka-python over SASL/PLAIN.
# All ${...} tokens are placeholders that must be replaced with real values.
if __name__ == '__main__':
    produce = KafkaProducer(
        # TODO: Service address. The public network port is 9096, and the private network port is 9095.
        bootstrap_servers=["${region}"],
        security_protocol='SASL_PLAINTEXT',
        sasl_mechanism='PLAIN',
        # TODO: Logset ID
        sasl_plain_username='${logsetID}',
        # TODO: Format: ${SecurityId}#${SecurityKey}
        sasl_plain_password='${SecurityId}#${SecurityKey}',
        api_version=(0, 11, 0),
        # TODO: Configuration compression mode
        compression_type="${compress_type}",
    )
    try:
        for i in range(5):
            # TODO: Topic ID of the sent message
            future = produce.send(topic="${topicID}", value=b'python sdk sender demo')
            # Block until the send completes (or 10 s elapse) so failures surface here.
            result = future.get(timeout=10)
            print(result)
    finally:
        # Release the producer's connections even if a send fails.
        produce.close()
import org.apache.kafka.clients.producer.*;import java.util.Properties;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class ProducerDemo {public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {// 0. Set parametersProperties props = new Properties();// TODO: In useprops.put("bootstrap.servers", "${region}");// TODO: Set the following according to the actual business scenarioprops.put("acks", ${acks});props.put("retries", ${retries});props.put("batch.size", ${batch.size});props.put("", ${});props.put("buffer.memory", ${buffer.memory});props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "${compress_type}"); // TODO: configuration compression modeprops.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");// TODO: The user name is logsetID, and the password is the combination of securityID and securityKEY: securityID#securityKEY.props.put("sasl.jaas.config"," required username='${logsetID}' password='${SecurityId}#${SecurityKey}';");// 1. Create a producer object.Producer<String, String> producer = new KafkaProducer<String, String>(props);// 2. Call the send method.Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>("${topicID}", ${message}));RecordMetadata recordMetadata = meta.get(${timeout}, TimeUnit.MILLISECONDS);System.out.println("offset = " + recordMetadata.offset());// 3. Close the producer.producer.close();}}
// - master#include <iostream>#include <librdkafka/rdkafka.h>#include <string>#include <unistd.h>#define BOOTSTRAP_SERVER "${region}${port}"#define USERNAME "${logsetID}"#define PASSWORD "${SecurityId}#${SecurityKey}"#define TOPIC "${topicID}"#define ACKS "${acks}"#define COMPRESS_TYPE "${compress_type}"static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {if (rkmessage->err) {fprintf(stdout, "%% Message delivery failed : %s\\n", rd_kafka_err2str(rkmessage->err));} else {fprintf(stdout, "%% Message delivery successful %zu:%d\\n", rkmessage->len, rkmessage->partition);}}int main(int argc, char **argv) {// 1. Initialize the configuration.rd_kafka_conf_t *conf = rd_kafka_conf_new();rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);char errstr[512];if (rd_kafka_conf_set(conf, "bootstrap.servers", BOOTSTRAP_SERVER, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}if (rd_kafka_conf_set(conf, "acks", ACKS, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}if (rd_kafka_conf_set(conf, "compression.codec", COMPRESS_TYPE, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}// Set the authentication method.if (rd_kafka_conf_set(conf, "security.protocol", "sasl_plaintext", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}if (rd_kafka_conf_set(conf, "sasl.username", USERNAME, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}if (rd_kafka_conf_set(conf, "sasl.password", PASSWORD, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) 
{rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}// 2. Create a handler.rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));if (!rk) {rd_kafka_conf_destroy(conf);fprintf(stdout, "create produce handler failed: %s\\n", errstr);return -1;}// 3. Send data.std::string value = "test lib kafka ---- ";for (int i = 0; i < 100; ++i) {retry:rd_kafka_resp_err_t err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(TOPIC),RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),RD_KAFKA_V_VALUE((void *) value.c_str(), value.size()),RD_KAFKA_V_OPAQUE(nullptr), RD_KAFKA_V_END);if (err) {fprintf(stdout, "Failed to produce to topic : %s, error : %s", TOPIC, rd_kafka_err2str(err));if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {rd_kafka_poll(rk, 1000);goto retry;}} else {fprintf(stdout, "send message to topic successful : %s\\n", TOPIC);}rd_kafka_poll(rk, 0);}std::cout << "message flush final" << std::endl;rd_kafka_flush(rk, 10 * 1000);if (rd_kafka_outq_len(rk) > 0) {fprintf(stdout, "%d message were not deliverer\\n", rd_kafka_outq_len(rk));}rd_kafka_destroy(rk);return 0;}
/*
 * This demo only provides the easiest way of using the feature. The specific production needs to be
 * implemented in combination with the call method.
 * During use, the TODO items in the demo need to be replaced with actual values.
 *
 * Notes:
 * 1. This demo is verified based on Confluent.Kafka 1.8.2.
 * 2. The maximum value of `MessageMaxBytes` cannot exceed 5 MB.
 * 3. This demo adopts the sync mode for production. You can change to the async mode during use based
 *    on your business scenario.
 * 4. You can adjust other parameters during use as instructed in the client configuration reference
 *    (the link is missing from this page).
 *
 * Confluent.Kafka reference: (the link is missing from this page)
 */
using System;
using Confluent.Kafka;

namespace Producer
{
    class Producer
    {
        // Builds a SASL/PLAIN producer, queues 100 demo messages to the log topic,
        // and waits up to 10 seconds for them to be flushed.
        private static void Main(string[] args)
        {
            var config = new ProducerConfig
            {
                // TODO: Domain name (the reference link is missing from this page).
                // The private network port is 9095, and the public network port is 9096.
                BootstrapServers = "${domain}:${port}",
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = "${logsetID}", // TODO: Logset ID of the topic
                SaslPassword = "${SecurityId}#${SecurityKey}", // TODO: UIN key of the topic
                SecurityProtocol = SecurityProtocol.SaslPlaintext,
                Acks = Acks.None, // TODO: Assign a value based on the actual use case. Valid values: `Acks.None`, `Acks.Leader`, `Acks.All`.
                MessageMaxBytes = 5242880 // TODO: The maximum size of the request message, which cannot exceed 5 MB.
            };

            // Delivery handler: prints the outcome of each message.
            Action<DeliveryReport<Null, string>> handler =
                r => Console.WriteLine(!r.Error.IsError
                    ? $"Delivered message to {r.TopicPartitionOffset}"
                    : $"Delivery Error: {r.Error.Reason}");

            using (var produce = new ProducerBuilder<Null, string>(config).Build())
            {
                try
                {
                    // TODO: Test verification code
                    for (var i = 0; i < 100; i++)
                    {
                        // TODO: Replace the log topic ID
                        produce.Produce("${topicID}", new Message<Null, string> { Value = "C# demo value" }, handler);
                    }

                    produce.Flush(TimeSpan.FromSeconds(10));
                }
                catch (ProduceException<Null, string> pe)
                {
                    Console.WriteLine($"send message receiver error : {pe.Error.Reason}");
                }
            }
        }
    }
}
Was this page helpful?