// Minimal connection config: "api.version.request" makes the client
// query the broker for its supported protocol versions instead of
// assuming a fixed one.
// NOTE(review): "bootstrap.servers" has no port here — confirm whether
// the default (9092) is intended.
config := &kafka.ConfigMap{
	"bootstrap.servers":   "localhost",
	"api.version.request": true,
}
// Example: create a confluent-kafka-go Producer with the main
// batching/reliability tuning options spelled out.
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	config := &kafka.ConfigMap{
		"bootstrap.servers":            "localhost:9092",
		"acks":                         -1,         // Ack mode; -1 (all in-sync replicas) is the default.
		"client.id":                    "rdkafka",  // Client ID reported to the broker.
		"compression.type":             "none",     // Message compression codec.
		"compression.level":            -1,         // Codec-specific level; -1 selects the codec default.
		"batch.num.messages":           10000,      // Up to 10,000 messages may be batched into one MessageSet.
		"batch.size":                   1000000,    // A MessageSet batch is capped at 1,000,000 bytes by default.
		"queue.buffering.max.ms":       5,          // Linger time before transmitting a batch; 5 ms by default.
		"queue.buffering.max.messages": 100000,     // Producer-side buffer holds at most 100,000 messages.
		"queue.buffering.max.kbytes":   1048576,    // Producer-side buffer size limit in KiB.
		"message.send.max.retries":     2147483647, // Retry count; 2,147,483,647 by default.
		"retry.backoff.ms":             100,        // Interval between retries; 100 ms by default.
		"socket.timeout.ms":            60000,      // Network request timeout; 60 s by default.
	}

	producer, err := kafka.NewProducer(config)
	if err != nil {
		panic(fmt.Sprintf("Failed to create producer: %s", err))
	}

	// Use producer to send messages and perform other operations...

	// Flush queued messages (up to 15 s) before closing; Close alone
	// does not drain the internal queue, so buffered batches would be lost.
	producer.Flush(15 * 1000)
	producer.Close()
}
{"Compression","CompressionLZ4"}
// Example: produce ten messages to "test-topic", then flush and close.
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Configure Kafka Producer.
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers":  "localhost:9092",
		"acks":               "1",
		"compression.type":   "none",
		"batch.num.messages": "1000",
	})
	if err != nil {
		fmt.Printf("Failed to create producer: %s\n", err)
		return
	}

	// Send messages. The topic is the same for every message, so it is
	// declared once outside the loop.
	topic := "test-topic"
	for i := 0; i < 10; i++ {
		value := fmt.Sprintf("hello world %d", i)
		message := &kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(value),
		}
		// Produce is asynchronous; report enqueue failures (e.g. full
		// local queue) instead of silently dropping them.
		if err := p.Produce(message, nil); err != nil {
			fmt.Printf("Failed to enqueue message: %s\n", err)
		}
	}

	// Wait up to 15 s for outstanding deliveries, then close the producer.
	p.Flush(15 * 1000)
	p.Close()
}
// Example: consume from "test-topic" with fetch/session tuning options
// spelled out, committing each message's offset manually.
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Configure Kafka Consumer.
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":       "localhost:9092",
		"group.id":                "test-group",
		"auto.offset.reset":       "earliest",
		"fetch.min.bytes":         1,        // Minimum bytes per fetch request.
		"fetch.max.bytes":         52428800, // Maximum bytes per fetch request.
		"fetch.wait.max.ms":       "500",    // Wait up to 500 ms (default) when no new messages are available.
		"enable.auto.commit":      true,     // Automatic offset commit; true by default.
		"auto.commit.interval.ms": 5000,     // Auto-commit interval; 5 s by default.
		"max.poll.interval.ms":    300000,   // Maximum delay between two polls; 5 min by default.
		"session.timeout.ms":      45000,    // Group session timeout; 45 s by default.
		"heartbeat.interval.ms":   3000,     // Heartbeat interval; 3 s by default.
	})
	if err != nil {
		fmt.Printf("Failed to create consumer: %s\n", err)
		return
	}
	// The poll loop below never exits; defer still documents ownership
	// and releases the consumer on any future early return.
	defer c.Close()

	// Subscribe to topics.
	if err := c.SubscribeTopics([]string{"test-topic"}, nil); err != nil {
		fmt.Printf("Failed to subscribe: %s\n", err)
		return
	}

	// Manually commit the offset after each processed message.
	// NOTE(review): enable.auto.commit is also true above, so offsets are
	// committed twice — confirm whether auto-commit should be disabled.
	for {
		ev := c.Poll(100)
		if ev == nil {
			continue
		}
		switch e := ev.(type) {
		case *kafka.Message:
			fmt.Printf("Received message: %s\n", string(e.Value))
			if _, err := c.CommitMessage(e); err != nil {
				fmt.Printf("Commit failed: %s\n", err)
			}
		case kafka.Error:
			fmt.Printf("Error: %v\n", e)
		}
	}
}
// Example: consume from "test-topic" relying on automatic offset commit.
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Configure Kafka consumer.
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":       "localhost:9092",
		"group.id":                "test-group",
		"auto.offset.reset":       "earliest",
		"enable.auto.commit":      true,   // Auto-commit offsets.
		"auto.commit.interval.ms": 5000,   // Commit every 5 s.
		"max.poll.interval.ms":    300000, // Maximum delay between two polls: 5 min.
		"session.timeout.ms":      10000,  // Broker session timeout: 10 s.
		"heartbeat.interval.ms":   3000,   // Heartbeat interval: 3 s.
	})
	if err != nil {
		fmt.Printf("Failed to create consumer: %s\n", err)
		return
	}
	// The poll loop below never exits; defer still documents ownership
	// and releases the consumer on any future early return.
	defer c.Close()

	// Subscribe to topics.
	if err := c.SubscribeTopics([]string{"test-topic"}, nil); err != nil {
		fmt.Printf("Failed to subscribe: %s\n", err)
		return
	}

	// Poll loop; offsets are committed automatically in the background.
	for {
		ev := c.Poll(100)
		if ev == nil {
			continue
		}
		switch e := ev.(type) {
		case *kafka.Message:
			fmt.Printf("Received message: %s\n", string(e.Value))
		case kafka.Error:
			fmt.Printf("Error: %v\n", e)
		}
	}
}
// Example: consume from "test-topic" with auto-commit disabled,
// committing each message's offset manually after processing.
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Configure Kafka Consumer; enable.auto.commit=false so offsets are
	// only advanced by the explicit CommitMessage call below.
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":     "localhost:9092",
		"group.id":              "test-group",
		"auto.offset.reset":     "earliest",
		"enable.auto.commit":    false,
		"max.poll.interval.ms":  300000,
		"session.timeout.ms":    10000,
		"heartbeat.interval.ms": 3000,
	})
	if err != nil {
		fmt.Printf("Failed to create consumer: %s\n", err)
		return
	}
	// The poll loop below never exits; defer still documents ownership
	// and releases the consumer on any future early return.
	defer c.Close()

	// Subscribe to topics.
	if err := c.SubscribeTopics([]string{"test-topic"}, nil); err != nil {
		fmt.Printf("Failed to subscribe: %s\n", err)
		return
	}

	// Manually commit the offset after each processed message.
	for {
		ev := c.Poll(100)
		if ev == nil {
			continue
		}
		switch e := ev.(type) {
		case *kafka.Message:
			fmt.Printf("Received message: %s\n", string(e.Value))
			if _, err := c.CommitMessage(e); err != nil {
				fmt.Printf("Commit failed: %s\n", err)
			}
		case kafka.Error:
			fmt.Printf("Error: %v\n", e)
		}
	}
}
Feedback