Overview
TDMQ for CKafka is a distributed stream processing platform used to build real-time data pipelines and streaming applications. It offers high throughput, low latency, scalability, and fault tolerance.
This document describes the key parameters, best practices, and FAQs for the tRPC-Go-Kafka client.
Optimization Practice
tRPC-Go-Kafka encapsulates the open-source Kafka SDK and uses tRPC-Go features such as interceptors to integrate with the tRPC-Go ecosystem. For best practices, therefore, see the Sarama Go documentation.
FAQs
Producer Issues
1. When messages are produced via CKafka, the error "Message contents does not match its CRC" occurs.
err:type:framework, code:141, msg:kafka client transport SendMessage: kafka server: Message contents does not match its CRC.
By default, the plugin enables gzip compression. To disable compression, add the compression=none parameter to the target.
target: kafka://ip1:port1?compression=none
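2. How do I configure ordered production for the same user?
Add the partitioner parameter to the client target. Valid values are random (default), roundrobin, and hash (partitioning by key). With hash, messages that share a key (for example, a user ID) are sent to the same partition, and Kafka preserves message order within a partition.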
target: kafka://ip1:port1?clientid=xxx&partitioner=hash
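To illustrate why key-based partitioning keeps one user's messages in order, here is a minimal sketch of hashing a key to a partition index. The function name partitionFor and the choice of FNV-1a are assumptions made for illustration; the hash actually used is decided by the underlying client library.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message key to a partition index by hashing the key.
// Illustrative only: FNV-1a is an assumption, not necessarily the hash the
// client library uses. numPartitions must be positive.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error.
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	// The same key always lands on the same partition, so messages for
	// one user stay in production order within that partition.
	for _, user := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("%s -> partition %d\n", user, partitionFor(user, 4))
	}
}
```

Because the mapping depends only on the key and the partition count, ordering per user holds as long as the number of partitions does not change.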
3. How do I produce messages asynchronously?
Add the async=1 parameter to the client target.
target: kafka://ip1:port1,ip2:port2?clientid=xxx&async=1
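If the target is assembled in code rather than written by hand, a small helper can keep the query parameters consistent. This is a sketch using only the Go standard library; buildTarget is a hypothetical helper, not part of the plugin, and it only produces a string in the same shape as the targets shown in this document.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// buildTarget assembles a client target of the form
// kafka://host1:port1,host2:port2?key=value&...
// Hypothetical helper for illustration; it is not part of the plugin.
func buildTarget(brokers []string, params map[string]string) string {
	q := url.Values{}
	for k, v := range params {
		q.Set(k, v)
	}
	target := "kafka://" + strings.Join(brokers, ",")
	if len(q) > 0 {
		target += "?" + q.Encode() // Encode sorts parameters by key.
	}
	return target
}

func main() {
	fmt.Println(buildTarget(
		[]string{"ip1:port1", "ip2:port2"},
		map[string]string{"clientid": "xxx", "async": "1"},
	))
}
```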
4. How do I handle write results for asynchronous production?
Override the success and failure callbacks for asynchronous production in your code, for example:
func init() {
	// Called when an asynchronously produced message fails to be written.
	kafka.AsyncProducerErrorCallback = func(err error, topic string, key, value []byte, headers []sarama.RecordHeader) {
		// Handle the failure here, for example by logging or alerting.
	}
	// Called when an asynchronously produced message is written successfully.
	kafka.AsyncProducerSuccCallback = func(topic string, key, value []byte, headers []sarama.RecordHeader) {
	}
}
Consumer Issues
1. What happens if Handle returns a non-nil error during consumption?
The plugin sleeps for 3 s and then retries consumption. Returning an error is not recommended, because it leads to infinite consumption retries. The business side should implement its own retry logic for failures.
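One way to keep retry logic on the business side is to wrap the processing step in a bounded retry loop and return nil once the attempts are exhausted. The following is a minimal sketch under that assumption; handleWithRetry, errDownstream, and the attempt and backoff values are hypothetical and only illustrate the pattern.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errDownstream is a placeholder error used by the example below.
var errDownstream = errors.New("downstream unavailable")

// handleWithRetry runs process up to maxAttempts times, waiting backoff
// between attempts. It always returns nil, so a caller that returns this
// value from its message handler does not trigger unbounded redelivery.
func handleWithRetry(process func() error, maxAttempts int, backoff time.Duration) error {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = process(); err == nil {
			return nil
		}
		if attempt < maxAttempts {
			time.Sleep(backoff)
		}
	}
	// Retries exhausted: record the failure (log, metric, or dead-letter
	// store) and give up on this message instead of returning the error.
	fmt.Printf("giving up after %d attempts: %v\n", maxAttempts, err)
	return nil
}

func main() {
	calls := 0
	err := handleWithRetry(func() error {
		calls++
		return errDownstream
	}, 3, 10*time.Millisecond)
	fmt.Println("attempts:", calls, "returned:", err)
}
```

Returning nil after the final attempt means the message is skipped, so the failure should be recorded somewhere durable if it must not be lost.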
2. When messages are consumed with CKafka, the error "client has run out of available brokers to talk to" occurs.
kafka server transport: consume fail:kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
First, check whether the brokers are reachable. Then check which Kafka client versions are supported, and try adding a version parameter to the address in the configuration file, for example, version=0.10.2.0:
address: ip1:port1?topics=topic1&group=my-group&version=0.10.2.0
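The reachability check can be done with a plain TCP dial from the host that runs the client. This is a minimal sketch using only the Go standard library; unreachableBrokers is a hypothetical helper and the broker address in main is a placeholder. A successful dial only shows that the port accepts connections, not that the Kafka protocol version matches.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// unreachableBrokers returns the broker addresses that do not accept a
// TCP connection within the given timeout.
func unreachableBrokers(brokers []string, timeout time.Duration) []string {
	var failed []string
	for _, addr := range brokers {
		conn, err := net.DialTimeout("tcp", addr, timeout)
		if err != nil {
			failed = append(failed, addr)
			continue
		}
		conn.Close()
	}
	return failed
}

func main() {
	// Placeholder address: replace with the brokers from your configuration.
	brokers := []string{"127.0.0.1:9092"}
	for _, addr := range unreachableBrokers(brokers, 2*time.Second) {
		fmt.Println("cannot reach broker:", addr)
	}
}
```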
3. When multiple producers are producing messages, can connection failures in some producers cause the healthy producers to time out?
Upgrade to v0.2.18 or later. In earlier versions, when creating a producer, the plugin acquired a lock, established the connection, and then released the lock. If an abnormal producer took a long time to connect, other healthy production requests failed to acquire the lock when accessing their producers and eventually timed out. This behavior was fixed in v0.2.18.
4. When messages are consumed, the error "The provider group protocol type is incompatible with the other members" occurs.
kafka server transport: consume fail:kafka server: The provider group protocol type is incompatible with the other members.
This error occurs when clients in the same consumer group use different rebalance strategies. Set the strategy parameter so that all members of the group use the same one. Valid values are sticky (default), range, and roundrobin.
address: ip1:port1?topics=topic12&group=my-group&strategy=range
5. How do I inject custom configuration (remote configuration)?
If you need to specify the configuration in code, first configure fake_address in trpc_go.yaml, then inject the configuration with the kafka.RegisterAddrConfig method. Configure trpc_go.yaml as follows:
Before the service starts, inject the custom configuration:
func main() {
	s := trpc.NewServer()
	// Start from the plugin defaults and override only what is needed.
	cfg := kafka.GetDefaultConfig()
	cfg.Brokers = []string{"127.0.0.1:9092"}
	cfg.Topics = []string{"test_topic"}
	// Register the configuration under the address used in trpc_go.yaml.
	kafka.RegisterAddrConfig("fake_address", cfg)
	kafka.RegisterKafkaConsumerService(s.Service("fake_address"), &Consumer{})
	s.Serve()
}
6. How do I access the underlying Sarama context information?
kafka.GetRawSaramaContext gives you access to the underlying Sarama ConsumerGroupSession and ConsumerGroupClaim. These two interfaces are exposed only to help with monitoring and logging. Use only their read methods; calling any write method here is undefined behavior and may produce unpredictable results.
type RawSaramaContext struct {
	Session sarama.ConsumerGroupSession
	Claim   sarama.ConsumerGroupClaim
}
Example:
func (Consumer) Handle(ctx context.Context, msg *sarama.ConsumerMessage) error {
	// Read-only access: log the initial offset of the current claim.
	if rawContext, ok := kafka.GetRawSaramaContext(ctx); ok {
		log.Infof("InitialOffset: %d", rawContext.Claim.InitialOffset())
	}
	return nil
}