tencent cloud

Feedback

tRpc Go SDK

Last updated: 2024-07-04 16:00:59

    Overview

    TDMQ for CKafka is a distributed stream processing platform used to build real-time data pipelines and streaming applications. It offers high throughput, low latency, scalability, and fault tolerance.
    This document describes the key parameters, practical tutorials and FAQs of the tRpc-Go-Kafka client.

    Optimization Practice

    tRpc-GO-Kafka encapsulates the open-source Kafka SDK, using features such as the tRPC-Go interceptor to integrate into the tRPC-Go ecosystem. Therefore, see Sarama Go for practical tutorial:

    FAQs

    Producer Issues

    1. When messages are produced via CKafka, the error Message contents does not match its CRC occurs.
    err:type:framework, code:141, msg:kafka client transport SendMessage: kafka server: Message contents does not match its CRC.
    By default, the plugin enables gzip compression. Add the parameter compression=none to the target to disable compression.
    target: kafka://ip1:port1?compression=none
    2. How to configure sequential production for the same user?
    Add the partitioner parameter to the client, with options including random (default), roundrobin, and hash (partitioning by key).
    target: kafka://ip1:port1?clientid=xxx&partitioner=hash
    3. How to execute asynchronous production?
    Add the async=1 parameter to the client
    target: kafka://ip1:port1,ip2:port2?clientid=xxx&async=1
    4. How to write data callback via asynchronous production?
    You need to rewrite callback functions for the success/failure of asynchronous production writing data in the code, for example:
    import (
    "git.code.oa.com/vicenteli/trpc-database/kafka"
    )
    
    func init() {
    // Rewrite the default callback for failures of asynchronous production writing data.
    kafka.AsyncProducerErrorCallback = func(err error, topic string, key, value []byte, headers []sarama.RecordHeader) {
    // do something if async producer occurred error.
    }
    
    // Rewrite the default callback for successes of asynchronous production writing data.
    kafka.AsyncProducerSuccCallback = func(topic string, key, value []byte, headers []sarama.RecordHeader) {
    // do something if async producer succeed.
    }
    }

    Consumer Issues

    1. What will happen if Handle is set to return non-nil during consumption?
    It will sleep for 3 s and then retry consumption. This is not recommended, because returning an error will lead to infinite retries of consumption. Retry logic for failures should be set by business side.
    2. When consuming messages with CKafka, the error client has run out of available brokers to talk to occurs.
    kafka server transport: consume fail:kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
    First, check if the brokers are reachable, then check the supported Kafka client version, and try adding parameters in the configuration file address, for example, version=0.10.2.0
    address: ip1:port1?topics=topic1&group=my-group&version=0.10.2.0
    3. When multiple producers are producing messages, will the failure of some producers in establishing a connection make the normal producers time out?
    Update the version to v0.2.18. For lower versions, when a producer is created, plugins lock first, then establish a connection, and unlock thereafter. If there are some abnormal producers which take a long time to establish connection, it will cause other normal production requests to fail in locking when accessing producers, result in timeout eventually. This behavior has been fixed in v0.2.18.
    4. When consuming messages, you receive a prompt: The provider group protocol type is incompatible with the other members .
    kafka server transport: consume fail:kafka server: The provider group protocol type is incompatible with the other members.
    For the same consumer group, the client re-grouping strategy is different. You can modify the strategy parameter, valid values including sticky (by default), range, and roundrobin.
    address: ip1:port1?topics=topic12&group=my-group&strategy=range
    5. How to inject custom configuration (remote configuration)?
    If you need to specify your configuration in code, first configure trpc_go.yaml with fake_address, then use the kafka.RegisterAddrConfig method for injection. Configure trpc_go.yaml as follows:
    address: fake_address
    Before the service is started, inject custom configuration:
    func main() {
    s := trpc.NewServer()
    // Use custom addr, which should be injected before the server starts
    cfg := kafka.GetDefaultConfig()
    cfg.Brokers = []string{"127.0.0.1:9092"}
    cfg.Topics = []string{"test_topic"}
    kafka.RegisterAddrConfig("fake_address", cfg)
    kafka.RegisterKafkaConsumerService(s.Service("fake_address"), &Consumer{})
    
    s.Serve()
    }
    6. How to access the underlying Sarama's context information?
    By using kafka.GetRawSaramaContext, you can access the underlying Sarama ConsumerGroupSession and ConsumerGroupClaim. However, exposing these two APIs here is just to facilitate users to monitor logs. Only their read methods should be used, as calling any write methods here is undefined behavior, which may cause unknown results.
    // RawSaramaContext stores Sarama ConsumerGroupSession and ConsumerGroupClaim
    // Export this structure to facilitate user monitoring. The content provided is for read-only purposes, as calling any write methods is undefined behavior.
    type RawSaramaContext struct {
    Session sarama.ConsumerGroupSession
    Claim sarama.ConsumerGroupClaim
    }
    Instance:
    func (Consumer) Handle(ctx context.Context, msg *sarama.ConsumerMessage) error {
    if rawContext, ok := kafka.GetRawSaramaContext(ctx); ok {
    log.Infof("InitialOffset: %d", rawContext.Claim.InitialOffset())
    }
    // ...
    return nil
    }
    
    
    

    Overview

    TDMQ for CKafka is a distributed stream processing platform used to build real-time data pipelines and streaming applications. It offers high throughput, low latency, scalability, and fault tolerance.
    This document describes the key parameters and best practices for the tRpc-Go-Kafka client, as well as FAQs.

    Optimization Practice

    tRPC-GO-Kafka encapsulates the open-source Kafka SDK, using features such as the tRPC-Go interceptor to integrate into the tRPC-Go ecosystem. Therefore, for best practices, refer to Sarama Go:

    FAQs

    Producer Issues

    1. When CKafka is used to produce messages, the error Message contents does not match its CRC occurs.
    err:type:framework, code:141, msg:kafka client transport SendMessage: kafka server: Message contents does not match its CRC.
    By default, the plugin enables gzip compression. To disable compression, add the compression=none parameter to the target.
    target: kafka://ip1:port1?compression=none
    2. How to configure sequential production for the same user?
    Add the partitioner parameter to the client, with options including random (default), roundrobin, and hash (partitioning by key).
    target: kafka://ip1:port1?clientid=xxx&partitioner=hash
    3. How to execute asynchronous production?
    Add the async=1 parameter to the client
    target: kafka://ip1:port1,ip2:port2?clientid=xxx&async=1
    4. How to write data callback via asynchronous production?
    You need to rewrite callback functions for the success/failure of asynchronous production writing data in the code, for example:
    import (
    "git.code.oa.com/vicenteli/trpc-database/kafka"
    )
    
    func init() {
    // Rewrite the default callback for failures of asynchronous production writing data.
    kafka.AsyncProducerErrorCallback = func(err error, topic string, key, value []byte, headers []sarama.RecordHeader) {
    // do something if async producer occurred error.
    }
    
    // Rewrite the default callback for successes of asynchronous production writing data.
    kafka.AsyncProducerSuccCallback = func(topic string, key, value []byte, headers []sarama.RecordHeader) {
    // do something if async producer succeed.
    }
    }

    Consumer Issues

    1. What will happen if Handle is set to return non-nil during consumption?
    It will sleep for 3 s and then retry consumption. This is not recommended, because returning an error will lead to infinite retries of consumption. Retry logic for failures should be set by business side.
    2. When messages are consumed with CKafka, the error client has run out of available brokers to talk to occurs.
    kafka server transport: consume fail:kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
    First, check if the brokers are reachable, then check the supported Kafka client version, and try adding parameters in the configuration file address, for example, version=0.10.2.0.
    address: ip1:port1?topics=topic1&group=my-group&version=0.10.2.0
    3. When multiple producers are producing messages, will the failure of some producers in establishing a connection make the normal producers time out?
    Update the version to v0.2.18. For lower versions, when creating a producer, plugins lock first, then establish a connection, and unlock thereafter. If there are some abnormal producers which take a long time to establish connection, it will cause other normal production requests to fail in locking when accessing producers, result in timeout eventually. This behavior has been fixed in v0.2.18.
    4. When messages are consumed, you receive a prompt: The provider group protocol type is incompatible with the other members .
    kafka server transport: consume fail:kafka server: The provider group protocol type is incompatible with the other members.
    For the same consumer group, the client re-grouping strategy is different. You can modify the strategy parameter, valid values including: sticky (default), range, roundrobin.
    address: ip1:port1?topics=topic12&group=my-group&strategy=range
    5. How to inject your custom configuration (remote configuration)?
    If you need to specify your configuration in code, first configure trpc_go.yaml with fake_address, then use the kafka.RegisterAddrConfig method for injection. Configure trpc_go.yaml as follows:
    address: fake_address
    Before the service is started, inject custom configuration:
    func main() {
    s := trpc.NewServer()
    // Use custom addr, which should be injected before the server is started.
    cfg := kafka.GetDefaultConfig()
    cfg.Brokers = []string{"127.0.0.1:9092"}
    cfg.Topics = []string{"test_topic"}
    kafka.RegisterAddrConfig("fake_address", cfg)
    kafka.RegisterKafkaConsumerService(s.Service("fake_address"), &Consumer{})
    
    s.Serve()
    }
    6. How to access the underlying Sarama's context information?
    By using kafka.GetRawSaramaContext, you can access the underlying Sarama ConsumerGroupSession and ConsumerGroupClaim. However, exposing these two APIs here is just to facilitate users to monitor logs. Only their read methods should be used, as calling any write methods here is undefined behavior, which may cause unknown results.
    // RawSaramaContext stores sarama ConsumerGroupSession and ConsumerGroupClaim
    // Export this structure to facilitate user monitoring. The content provided is for read-only purposes, as calling any write methods is undefined behavior.
    type RawSaramaContext struct {
    Session sarama.ConsumerGroupSession
    Claim sarama.ConsumerGroupClaim
    }
    Instance:
    func (Consumer) Handle(ctx context.Context, msg *sarama.ConsumerMessage) error {
    if rawContext, ok := kafka.GetRawSaramaContext(ctx); ok {
    log.Infof("InitialOffset: %d", rawContext.Claim.InitialOffset())
    }
    // ...
    return nil
    }
    
    
    
    
    Contact Us

    Contact our sales team or business advisors to help your business.

    Technical Support

    Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

    7x24 Phone Support