Overview
TDMQ for CKafka is a distributed stream processing platform designed for building real-time data pipelines and streaming applications. It boasts high throughput, low latency, scalability, and fault tolerance.
Common Go Kafka clients include:
Sarama: a Kafka library developed by Shopify that provides producers, consumers, and partition consumers. It performs well and has active community support.
Confluent-Kafka-Go: a Kafka library developed by Confluent that provides easy-to-use high-level APIs. Because it is built on the librdkafka C library, it performs very well, but it can be somewhat complex to install and use.
This document describes the key parameters, practical tutorials, and FAQs for the Confluent Go client.
Producer Practice
Version Selection
When you use the Confluent Go SDK, specify the address of the Kafka cluster with the bootstrap.servers configuration parameter. The api.version.request parameter (enabled by default in librdkafka) makes the client query the broker's supported API versions at startup, so the Confluent Go SDK detects the broker version automatically instead of requiring you to set it.
```go
config := &kafka.ConfigMap{
	"bootstrap.servers":   "localhost:9092", // broker address list (host:port)
	"api.version.request": true,             // query broker API versions at startup
}
```
Producer Parameters and Optimization
Producer Parameters
Confluent Go is built on librdkafka, so the configuration you pass to the Confluent Go client when writing to Kafka is handed to librdkafka. The key producer parameters and their default values are as follows:
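```go
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Key producer parameters, shown with their librdkafka default values.
	config := &kafka.ConfigMap{
		"bootstrap.servers":            "localhost:9092",
		"acks":                         -1,         // -1 (all): wait for all in-sync replicas
		"client.id":                    "rdkafka",  // client identifier sent to the broker
		"compression.type":             "none",     // none, gzip, snappy, lz4 or zstd
		"compression.level":            -1,         // -1: codec-dependent default level
		"batch.num.messages":           10000,      // max messages per batch
		"batch.size":                   1000000,    // max bytes per batch
		"queue.buffering.max.ms":       5,          // batching delay (alias: linger.ms)
		"queue.buffering.max.messages": 100000,     // max messages in the local queue
		"queue.buffering.max.kbytes":   1048576,    // max total size of the local queue, in KB
		"message.send.max.retries":     2147483647, // retries for a failed send
		"retry.backoff.ms":             100,        // delay between retries
		"socket.timeout.ms":            60000,      // network request timeout
	}
	producer, err := kafka.NewProducer(config)
	if err != nil {
		panic(fmt.Sprintf("Failed to create producer: %s", err))
	}
	producer.Close()
}
```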
Parameter Description and Optimization
acks Parameter Optimization
The acks parameter controls how the broker acknowledges messages sent by the producer. It accepts 0, 1, or -1 (also written all). With 0, the broker sends no acknowledgement; with 1, the leader acknowledges once it has written the message; with -1, the default, the leader acknowledges only after the message has been written by the leader and all in-sync follower replicas. In cross-availability-zone deployments and for topics with many replicas, the acks value has a significant effect on both reliability and throughput.
For online business messages where throughput requirements are modest, set acks to -1 so that a message is acknowledged only after all in-sync replicas have received it, which improves reliability.
For big data scenarios such as log collection or offline computing, which need high throughput (the volume of data written to Kafka per second), you can set acks to 1 to increase throughput.
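To make the three acks settings concrete, the following stdlib-only Go sketch models how many replica writes the partition leader waits for before acknowledging a produce request. The helper acksWait and the in-sync replica count passed to it are illustrative assumptions, not part of the Confluent Go API, and the model ignores broker-side limits and failure handling.

```go
package main

import "fmt"

// acksWait is a hypothetical helper modelling acks semantics: it returns how
// many replica writes the partition leader waits for before acknowledging a
// produce request. inSyncReplicas counts the leader plus every in-sync follower.
func acksWait(acks, inSyncReplicas int) (int, error) {
	switch acks {
	case 0:
		return 0, nil // no acknowledgement is sent: highest throughput, weakest guarantee
	case 1:
		return 1, nil // only the leader's own write is awaited
	case -1:
		return inSyncReplicas, nil // leader and all in-sync followers: strongest guarantee
	default:
		return 0, fmt.Errorf("acks must be 0, 1 or -1, got %d", acks)
	}
}

func main() {
	// A topic with three in-sync replicas, for example spread across availability zones.
	for _, acks := range []int{0, 1, -1} {
		n, err := acksWait(acks, 3)
		if err != nil {
			panic(err)
		}
		fmt.Printf("acks=%d: leader waits for %d replica write(s)\n", acks, n)
	}
}
```

This also explains the cross-zone remark: with acks=-1, every produce request waits for the slowest in-sync follower, which may sit in another availability zone.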
Buffering (Batching) Parameter Optimization
For the same volume of data, sending it in one request rather than in many reduces per-request computation and network overhead, which improves overall write throughput.
You can therefore tune the batching parameters to optimize the client's sending throughput. Confluent Kafka Go buffers messages for 5 ms by default (queue.buffering.max.ms, alias linger.ms) before sending a batch. If your messages are small, you can increase queue.buffering.max.ms to a suitable value so that more messages accumulate in each batch.
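The effect of queue.buffering.max.ms can be estimated with a simplified, single-partition model: a batch closes when the linger time expires, batch.num.messages is reached, or batch.size bytes are filled, whichever comes first. The Go sketch below is a back-of-the-envelope estimate under that assumption, not librdkafka's actual batching logic (which, for example, can also group batches for several partitions into one request); batchEstimate and minInt are hypothetical helpers.

```go
package main

import (
	"fmt"
	"math"
)

// minInt returns the smallest of its arguments.
func minInt(first int, rest ...int) int {
	m := first
	for _, v := range rest {
		if v < m {
			m = v
		}
	}
	return m
}

// batchEstimate is a hypothetical, single-partition model: a batch closes when
// lingerMs (queue.buffering.max.ms) expires, batchNumMessages
// (batch.num.messages) is reached, or batchSizeBytes (batch.size) is filled.
// It returns the messages per batch and the resulting batches per second.
func batchEstimate(msgsPerSec float64, msgBytes int, lingerMs float64,
	batchNumMessages, batchSizeBytes int) (int, float64) {
	byLinger := int(math.Floor(msgsPerSec * lingerMs / 1000)) // arrivals during the linger window
	bySize := batchSizeBytes / msgBytes                       // messages that fit in batch.size
	perBatch := minInt(byLinger, batchNumMessages, bySize)
	if perBatch < 1 {
		perBatch = 1 // at low rates each message effectively travels alone
	}
	return perBatch, msgsPerSec / float64(perBatch)
}

func main() {
	// 10,000 small (200-byte) messages per second, default batch limits.
	for _, linger := range []float64{5, 50} {
		n, rate := batchEstimate(10000, 200, linger, 10000, 1000000)
		fmt.Printf("linger %v ms: %d messages per batch, %.0f batches/s\n", linger, n, rate)
	}
}
```

In this model, raising the linger time from 5 ms to 50 ms cuts the batch rate for small messages from 200 to 20 per second, at the cost of up to roughly 45 ms of extra queueing delay per message.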
Compression Parameter Optimization
The Confluent Kafka Go client supports the following values for compression.type:
none: no compression.
gzip: GZIP compression.
snappy: Snappy compression.
lz4: LZ4 compression.
zstd: ZSTD compression.
To use compression in the producer client, set the compression.type parameter when creating the producer; for example, set compression.type to lz4 to compress with LZ4. Compression and decompression run on the client, trading CPU for bandwidth. However, the broker incurs extra computation to validate compressed messages, and with gzip this server-side cost can be significant. The extra load can reduce the broker's message processing capacity and, in turn, its throughput, so compression may not pay off in some cases. In such cases, the following approach is recommended:
1. On the producer side, compress the message data yourself to produce a compressed payload (messageCompression), and store the compression method in the message key, for example:
{"Compression","CompressionLZ4"}
2. On the producer side, send messageCompression as an ordinary message (for example, with compression.type left as none).
3. On the consumer side, read the message key to determine the compression method, and decompress the payload independently.
Creating Producer Instance
If your application needs higher throughput, use an asynchronous producer to speed up sending, and rely on batch sending to reduce network overhead and I/O. If it needs higher reliability, use a synchronous producer to make sure each message is delivered successfully. The acks mechanism and the transaction mechanism can also be used to guarantee message reliability and consistency. For specific parameter tuning, see Producer Parameters and Optimization.
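```go
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers":  "localhost:9092",
		"acks":               "1",
		"compression.type":   "none",
		"batch.num.messages": "1000",
	})
	if err != nil {
		fmt.Printf("Failed to create producer: %s\n", err)
		return
	}
	defer p.Close()

	// Delivery reports arrive asynchronously on p.Events(). Drain them so that
	// failures are visible; Flush also counts unread reports as outstanding.
	go func() {
		for e := range p.Events() {
			if m, ok := e.(*kafka.Message); ok {
				if m.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
				} else {
					fmt.Printf("Delivered to %v\n", m.TopicPartition)
				}
			}
		}
	}()

	topic := "test-topic"
	for i := 0; i < 10; i++ {
		msg := &kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(fmt.Sprintf("hello world %d", i)),
		}
		// Produce is asynchronous: it enqueues the message and returns immediately.
		if err := p.Produce(msg, nil); err != nil {
			fmt.Printf("Failed to enqueue message: %v\n", err)
		}
	}
	// Wait up to 15 s for outstanding messages to be delivered.
	if remaining := p.Flush(15 * 1000); remaining > 0 {
		fmt.Printf("%d message(s) not delivered before timeout\n", remaining)
	}
}
```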
Consumer Practice
Consumer Parameters and Optimization
Consumer Parameters
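```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":       "localhost:9092",
		"group.id":                "test-group",
		"auto.offset.reset":       "earliest", // start from the earliest offset if none is committed
		"fetch.min.bytes":         1,          // minimum bytes the broker returns per fetch
		"fetch.max.bytes":         52428800,   // maximum bytes per fetch response (50 MiB)
		"fetch.wait.max.ms":       500,        // max broker wait when fetch.min.bytes is not reached
		"enable.auto.commit":      true,       // commit offsets in the background
		"auto.commit.interval.ms": 5000,       // auto-commit interval
		"max.poll.interval.ms":    300000,     // max time between two Poll calls
		"session.timeout.ms":      45000,      // consumer group session timeout
		"heartbeat.interval.ms":   3000,       // consumer group heartbeat interval
	})
	if err != nil {
		fmt.Printf("Failed to create consumer: %s\n", err)
		return
	}
	defer c.Close()

	if err := c.SubscribeTopics([]string{"test-topic"}, nil); err != nil {
		fmt.Printf("Failed to subscribe: %s\n", err)
		return
	}

	// Stop cleanly on Ctrl+C or SIGTERM so that Close can leave the group.
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	run := true
	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			switch e := c.Poll(100).(type) {
			case *kafka.Message:
				// enable.auto.commit is true, so no explicit commit is needed here.
				fmt.Printf("Received message: %s\n", string(e.Value))
			case kafka.Error:
				fmt.Printf("Error: %v\n", e)
			}
		}
	}
}
```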
Parameter Description and Optimization
1. max.poll.interval.ms is a Kafka consumer configuration parameter that specifies the maximum allowed delay between two consecutive poll operations by the consumer. Its main purpose is to check the consumer's liveness, that is, whether the consumer is still active. If the consumer does not poll within max.poll.interval.ms, it is considered to have failed and a consumer group rebalance is triggered. Adjust this parameter to match your actual consumption speed: if it is too low, the consumer may trigger rebalances frequently, increasing the load on Kafka; if it is too high, Kafka may not detect consumer problems promptly, which delays message consumption.
2. In general consumption, the main issues are frequent rebalancing and blocked consumption threads. Tune the following parameters:
2.1 session.timeout.ms: For versions earlier than v0.10.2, increase this value so that it is greater than the time needed to consume one batch of data, but no more than 30 s; 25 s is recommended. For v0.10.2 and later, use the default value of 10 s (note that librdkafka 1.9.0 and later default to 45 s, the value used in the sample above).
2.2 heartbeat.interval.ms: The default is 3 s. Keep it below one third of session.timeout.ms.
2.3 max.poll.interval.ms: The default is 5 minutes. If there are many partitions and consumers, increase it appropriately. It should be greater than the time needed to process one batch, that is, max.poll.records / (messages consumed per second per thread × number of consumption threads) seconds, converted to milliseconds.
Note:
If you want to process messages synchronously, that is, pull a message, process it, and only then pull the next one, make the following changes:
Increase the maximum processing time (MaxProcessingTime; for the Confluent Go client, the corresponding limit is max.poll.interval.ms) as needed.
Monitor processing times that exceed this limit, and sample and log the overrun durations.
3. For automatic offset commits, do not set auto.commit.interval.ms below 1,000 ms: overly frequent commit requests can drive up broker CPU usage and affect the read and write operations of other services.
Creating Consumer Instance
Confluent Go creates consumers with a subscription model and supports two ways of committing offsets: auto-commit and manual commit.
Auto-Commit Offsets
Auto-commit offsets: after pulling messages, the consumer commits offsets automatically without manual intervention. This method is easy to use, but because offsets are committed on a timer rather than after processing, a crash can lead to duplicate consumption or message loss.
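```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":       "localhost:9092",
		"group.id":                "test-group",
		"auto.offset.reset":       "earliest",
		"enable.auto.commit":      true, // offsets are committed in the background
		"auto.commit.interval.ms": 5000, // every 5 s
		"max.poll.interval.ms":    300000,
		"session.timeout.ms":      10000,
		"heartbeat.interval.ms":   3000,
	})
	if err != nil {
		fmt.Printf("Failed to create consumer: %s\n", err)
		return
	}
	defer c.Close()

	if err := c.SubscribeTopics([]string{"test-topic"}, nil); err != nil {
		fmt.Printf("Failed to subscribe: %s\n", err)
		return
	}

	// Stop cleanly on Ctrl+C or SIGTERM so that Close can leave the group.
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	run := true
	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			switch e := c.Poll(100).(type) {
			case *kafka.Message:
				// No explicit commit: the client commits offsets automatically.
				fmt.Printf("Received message: %s\n", string(e.Value))
			case kafka.Error:
				fmt.Printf("Error: %v\n", e)
			}
		}
	}
}
```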
Manual-Commit Offsets
Manual-commit offsets: after processing messages, the consumer commits offsets explicitly. The advantage is precise control over when offsets are committed, which avoids duplicate consumption or message loss. Note, however, that committing too frequently (for example, after every message) can cause high broker CPU usage; as message volume grows, this CPU load becomes significant and can affect other broker functions. It is therefore recommended to commit offsets once every certain number of messages.
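```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":     "localhost:9092",
		"group.id":              "test-group",
		"auto.offset.reset":     "earliest",
		"enable.auto.commit":    false, // offsets are committed explicitly below
		"max.poll.interval.ms":  300000,
		"session.timeout.ms":    10000,
		"heartbeat.interval.ms": 3000,
	})
	if err != nil {
		fmt.Printf("Failed to create consumer: %s\n", err)
		return
	}
	defer c.Close()

	if err := c.SubscribeTopics([]string{"test-topic"}, nil); err != nil {
		fmt.Printf("Failed to subscribe: %s\n", err)
		return
	}

	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	// Commit once per commitEvery messages, not after each one, to limit broker load.
	const commitEvery = 100
	processed := 0
	commit := func() {
		if _, err := c.Commit(); err != nil { // commits offsets of all assigned partitions
			fmt.Printf("Commit failed: %v\n", err)
		}
	}
	run := true
	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			switch e := c.Poll(100).(type) {
			case *kafka.Message:
				fmt.Printf("Received message: %s\n", string(e.Value)) // process first
				processed++
				if processed%commitEvery == 0 {
					commit()
				}
			case kafka.Error:
				fmt.Printf("Error: %v\n", e)
			}
		}
	}
	if processed%commitEvery != 0 {
		commit() // commit messages processed since the last commit
	}
}
```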