This document provides a practical guide to producing and consuming messages in TDMQ for CKafka, helping you reduce the possibility of errors during message production and consumption.
Message Production
Recommendations for Topic Use
Configuration requirements: It is recommended to use an integral multiple of the number of nodes as the number of replicas to reduce the data skew problem. The minimum number of in-sync replicas (min.insync.replicas) should be 2, and it must not be equal to the number of topic replicas; otherwise, the failure of a single replica will make the topic unable to accept produced messages.
Creation method: You can choose whether to enable CKafka's automatic topic creation switch. After it is enabled, a topic with 3 partitions and 2 replicas will be created automatically when a message is produced to or consumed from a topic that does not yet exist.
Estimating Number of Partitions
To achieve data distribution that is as balanced as possible, it is recommended that the number of partitions be an integral multiple of the number of nodes. Also, based on the estimated traffic, plan for about 1 MB/s per partition. For example, for a topic with a throughput of 100 MB/s, it is recommended to set the number of partitions to 100.
Retry upon Failure
In a distributed environment, messages may occasionally fail to be sent due to network and other issues. This may be because the message was actually sent successfully but the acknowledgment (ACK) failed, or because the message indeed failed to be sent.
You can set the following retry parameters based on your business needs:
| Parameter | Description |
| --- | --- |
| retries | Number of retries. The default value is 3. For applications that have zero tolerance for data loss, consider setting it to Integer.MAX_VALUE (the maximum valid value). |
| retry.backoff.ms | Retry interval. We recommend setting it to 1000 (ms). |
By doing so, you will be able to deal with cases where the broker's leader partition cannot respond to produce requests immediately.
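For reference, a producer configuration fragment with these retry parameters (Java client) might look like this:
Properties properties = new Properties();
properties.setProperty("retries", "3"); // or Integer.toString(Integer.MAX_VALUE) for zero tolerance for data loss
properties.setProperty("retry.backoff.ms", "1000"); // wait 1s between retries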
Asynchronous Sending
The send API is asynchronous. If you want to receive the result of sending, use the callback provided by the send method.
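The following is a minimal sketch of asynchronous sending with a callback in the Java client; the address, topic name, and serializers are placeholders:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "msg-key", "msg-value");
// send() returns immediately; the callback is invoked once the result is known.
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Sending failed: log the error and retry or alert as needed.
        exception.printStackTrace();
    } else {
        // Sending succeeded: metadata carries the target partition and offset.
        System.out.printf("sent to partition %d, offset %d%n", metadata.partition(), metadata.offset());
    }
});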
One Producer Corresponding to One App
A producer is thread-safe and can send messages to any topic. Generally, we recommend that one application correspond to one producer.
Acks
Kafka's ACK mechanism refers to the producer's mechanism for acknowledging message sending. It is configured through the acks parameter on Kafka 0.10.x and request.required.acks on 0.8.x. The setting of acks directly affects both the throughput of the Kafka cluster and the reliability of messages.
The Acks parameters are described as follows:
| Parameter | Description |
| --- | --- |
| acks=0 | No response from the server is required. In this case, the performance is high, but the risk of data loss is great. |
| acks=1 | A response will be returned after the primary server node writes successfully. In this case, the performance is moderate, and the risk of data loss is also moderate. A failure of the primary node may cause data loss. |
| acks=all | A response will be returned only after the primary server node writes successfully and the nodes in the ISR sync successfully. The performance is poor, but the risk of data loss is small. Only a failure of both the primary and secondary nodes will cause data loss. |
We generally recommend acks=1, and acks=all for important services.
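As an illustration, the corresponding producer configuration fragment might look like this:
Properties properties = new Properties();
properties.setProperty("acks", "1");   // general services
// properties.setProperty("acks", "all"); // important services with higher reliability requirements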
Batch
A TDMQ for CKafka topic generally has multiple partitions. Before the producer client sends a message to the server, it needs to determine the topic and the partition to which the message should be sent. When sending multiple messages to the same partition, the producer client will package the messages into a batch and then send the batch to the server. Additional overheads will be incurred when the batch is processed. In general, a small batch will cause the producer client to generate a large number of requests, which will cause the messages to queue up on the client and the server, lead to an increase in the corresponding device's CPU utilization, and thus increase the delays in message sending and consumption. A suitable batch size can reduce the number of requests sent to the server by the client during message sending, thereby improving the throughput and reducing the delay in message sending.
The batch parameters are described as follows:
| Parameter | Description |
| --- | --- |
| batch.size | Size of cached messages sent to each partition (which is the sum of bytes of the messages rather than the number of messages). Once the set value is reached, a network request will be triggered, and the producer client will send the messages to the server in a batch. |
| linger.ms | Maximum amount of time each message is cached. After this time elapses, the producer client will ignore the limit of batch.size and immediately send the messages to the server. |
| buffer.memory | Once the total size of all cached messages exceeds this value, messages will be sent to the server, and the limits of batch.size and linger.ms will be ignored. The default value of buffer.memory is 32 MB, which ensures sufficient performance for a single producer. |
Note:
If you start multiple producers in the same JVM, it is likely that each producer will use 32 MB of cache space. In this case, OOM (Out of Memory) errors may occur, and you need to consider the value of buffer.memory in order to avoid OOM errors.
The timing of when the producer client sends batched messages to the server is determined by both batch.size and linger.ms, and you can adjust both values based on your specific business needs. To enhance sending performance and ensure service stability, it is recommended to set batch.size=16384 and linger.ms=1000.
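A producer configuration fragment with the recommended batching values:
Properties properties = new Properties();
properties.setProperty("batch.size", "16384");       // 16 KB batch per partition
properties.setProperty("linger.ms", "1000");         // wait up to 1s for a batch to fill
properties.setProperty("buffer.memory", "33554432"); // 32 MB total producer buffer (default)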
Key and Value
Each message in TDMQ for CKafka has two fields: key (message identifier) and value (message content).
For ease of tracking, set a unique key for each message. This allows you to track the message and print its sending and consumption logs to learn about its production and consumption status.
If you want to send a large number of messages, we recommend using the sticky partitioning policy instead of setting keys.
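For illustration, a message carrying a unique business ID as its key might be sent as follows (the topic name and ID are placeholders):
String orderId = "ORDER-20240101-0001"; // assumed unique business ID used as the message key
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", orderId, "order payload");
producer.send(record);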
Sticky Partitioning
Only messages sent to the same partition will be placed in the same batch, so one factor that determines how a batch will be formed is the partitioning policy set by the Kafka producer. The Kafka producer allows you to choose a partition that suits your business by setting the partitioner implementation class. If a key is specified for a message, the default policy of the Kafka producer is to hash the message key and then select a partition based on the result of hashing to ensure that messages with the same key are sent to the same partition.
If no key is specified for a message, the default policy of the Kafka producer below v2.4 is to cycle through all partitions in the topic and send the message to each partition in a round-robin manner. However, this default policy has poor batching performance and may produce a large number of small batches, which increases actual delays. As it was inefficient in partitioning messages without a key, Kafka 2.4 introduced the sticky partitioning policy.
The sticky partitioning policy mainly addresses the problem of small batches caused by spreading messages without a key across different partitions. The main practice is as follows: after the batch for one partition is completed, a different partition is selected at random and used for subsequent messages for as long as possible. With this policy, messages are sent to the same partition over the short term, but over the whole run they are distributed evenly across partitions, which helps avoid partition skew while reducing delays and improving the overall service performance.
If a Kafka producer client is used on v2.4 or later, the default partitioning policy is sticky partitioning. If you use an older producer client, you can implement a partitioning policy on your own based on how the sticky partitioning policy works and then make it take effect through the partitioner.class parameter.
For more information on how to implement the sticky partitioning policy, see the following Java implementation, which switches from one partition to another at a fixed time interval.
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class MyStickyPartitioner implements Partitioner {

    // Time when the current sticky partition was last switched.
    private long lastPartitionChangeTimeMillis = 0L;
    // Partition currently used for messages without a key.
    private int currentPartition = -1;
    // Interval (in ms) after which the sticky partition is switched.
    private long partitionChangeTimeGap = 100L;

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null) {
            // Messages without a key stick to one partition and switch periodically.
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            int availablePartitionSize = availablePartitions.size();

            if (availablePartitionSize > 0) {
                handlePartitionChange(availablePartitionSize);
                return availablePartitions.get(currentPartition).partition();
            } else {
                handlePartitionChange(numPartitions);
                return currentPartition;
            }
        } else {
            // Messages with a key are hashed so that the same key always goes to the same partition.
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private void handlePartitionChange(int partitionNum) {
        long currentTimeMillis = System.currentTimeMillis();
        // Switch to a randomly chosen partition once the time gap has elapsed
        // or the current partition index is no longer valid.
        if (currentTimeMillis - lastPartitionChangeTimeMillis >= partitionChangeTimeGap
                || currentPartition < 0 || currentPartition >= partitionNum) {
            lastPartitionChangeTimeMillis = currentTimeMillis;
            currentPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNum;
        }
    }

    @Override
    public void close() {}
}
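To make the custom partitioner take effect, set it through the partitioner.class parameter when building the producer. A minimal configuration sketch (the address and serializers are placeholders):
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("partitioner.class", MyStickyPartitioner.class.getName());
Producer<String, String> producer = new KafkaProducer<>(properties);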
Order in Partition
Within a single partition, messages are stored in the order in which they are sent. Each topic is divided into a number of partitions. If messages are distributed to different partitions, the cross-partition message order cannot be ensured.
If you want messages to be consumed in the sending order, you can specify keys for such messages on the producer. If such messages are sent with the same key, CKafka will select a partition for their storage based on the hash of the key. As a partition can be listened on and consumed by only one consumer, messages will be consumed in the sending order.
Practice Tutorial for CKafka in Ordered Message Scenarios
Ordered Message Scenarios
In CKafka, the main method to ensure message order relies on its partition design and the use of message keys. Sequential message scenarios on the client side can be divided into two categories: global order and partition order. The practice tutorial for CKafka in these scenarios is as follows:
1. Global order: To ensure global order, you need to set the Topic partition to 1 in the CKafka console. The number of replicas can be specified by customers based on their specific use cases and availability requirements to balance costs, with a recommended setting of 2.
Note:
Global order has a throughput limit due to the single partition. Therefore, the overall throughput will not be very high. For single-partition throughput metrics, see Use Limits.
2. Partition order: To ensure partition order, you can estimate the business traffic of the topic in the CKafka console, divide it by the single-partition traffic, and round the value to obtain the number of partitions. To avoid data skew, the number of partitions should preferably be rounded to a multiple of the number of nodes, ensuring a reasonable final number of partitions. For single-partition throughput metrics, see Use Limits. When sending Kafka messages, you need to specify a key. Kafka then calculates a hash of the key to ensure that messages with the same key are sent to the same partition, thereby ensuring that these messages are ordered within the partition.
Note:
Business keys should be distributed as much as possible. If messages are produced with the same key, the partition order will degenerate into global order, thereby reducing overall write throughput.
Practice Tutorial for Parameters
Ordered messages must be both sequential and non-duplicate. When producers use the default parameter settings, scenarios such as network jitter, Kafka broker node changes, and partition leader elections may cause message duplication and disorder. Therefore, in ordered message scenarios, the Kafka producer parameters should be set specifically. The key settings are as follows:
enable.idempotence
enable.idempotence indicates whether to enable the idempotence feature. In ordered message scenarios, it is recommended to enable idempotence to address issues such as partition message disorder and duplication, that is, set enable.idempotence to true on the producer. Note that this feature requires Kafka broker version 0.11 or above. Since Kafka 3.0, producers default to enable.idempotence=true and acks=all; for Kafka versions from 0.11 (inclusive) to 3.0 (exclusive), idempotence is disabled by default. Therefore, in ordered message scenarios, it is recommended to explicitly set this parameter to ensure that idempotence is enabled.
acks
Once idempotence is enabled, acks should be explicitly set to all. If it is not set to all, it will fail parameter validation and an error will occur.
max.in.flight.requests.per.connection
By default, Kafka producers try to send records as quickly as possible. max.in.flight.requests.per.connection indicates the maximum number of unacknowledged requests that can be in flight on a single connection; the default value is 5. For Kafka versions from 0.11 (inclusive) to 1.1 (exclusive), the Kafka broker implements no optimization for this aspect, so max.in.flight.requests.per.connection should be set to 1. For Kafka 1.1 or above, throughput is optimized in the idempotence scenario: the broker maintains a queue to verify the order of messages across up to 5 concurrent batches, so max.in.flight.requests.per.connection can be set up to 5 (but no more than 5).
Therefore, it is recommended:
Kafka 0.11 or above and below 1.1: Explicitly set max.in.flight.requests.per.connection to 1.
Kafka 1.1 or above: Explicitly set max.in.flight.requests.per.connection to a value from 1 to 5 (including 5). The recommended value is 5.
retries
In ordered message scenarios, it is recommended to specify the retries parameter. The retries parameter has different default values in different versions. For Kafka 2.0 or below, the default value is 0. For Kafka 2.1 or above, the default value is Integer.MAX_VALUE, that is, 2147483647. It is recommended to explicitly set it to Integer.MAX_VALUE in ordered message scenarios.
Summary
In ordered message scenarios, the producer parameters to be enabled are as follows:
Kafka 0.11 or above and below 1.1:
// create Producer properties
Properties properties = new Properties();
properties.setProperty("enable.idempotence", "true");
properties.setProperty("acks", "all");
properties.setProperty("max.in.flight.requests.per.connection", "1");
properties.setProperty("retries", Integer.toString(Integer.MAX_VALUE));
Kafka 1.1 or above:
// create Producer properties
Properties properties = new Properties();
properties.setProperty("enable.idempotence", "true");
properties.setProperty("acks", "all");
properties.setProperty("max.in.flight.requests.per.connection", "5");
properties.setProperty("retries", Integer.toString(Integer.MAX_VALUE));
Data Skew
Kafka broker data skew problems are often caused by uneven partition distribution or uneven key distribution in producer-sent data, leading to several types of issues:
1. The overall traffic has not reached the instance limit, but individual nodes are throttled;
2. Some nodes become overloaded too quickly, leading to low overall Kafka utilization and reduced total throughput.
To avoid such issues, you can optimize in the following ways:
1. Use a reasonable number of partitions, ensuring that the number of partitions is an integral multiple of the number of nodes.
2. Use a reasonable partitioning policy, for example RoundRobin, Range, Sticky, or a custom partitioning policy, to deliver messages evenly (see the sketch after this list).
3. Check whether keys are used for sending. If so, try to design a policy that distributes keys more evenly across partitions.
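For example, on Kafka client v2.4 or later, a sketch of switching keyless messages to the built-in round-robin partitioner (verify the class name against your client version):
Properties properties = new Properties();
properties.setProperty("partitioner.class", "org.apache.kafka.clients.producer.RoundRobinPartitioner");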
Message Consumption
Basic Message Consumption Process
1. Poll the data.
2. Execute the consumption logic.
3. Poll the data again.
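A minimal sketch of this loop with the Java client (the topic name is a placeholder):
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
    // 1. Poll the data.
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    // 2. Execute the consumption logic.
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());
    }
    // 3. The next iteration polls the data again.
}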
Load Balancing
Each consumer group can contain multiple consumers that share the same group.id value. The consumers in the same consumer group jointly consume the subscribed topics.
For example, if consumer group A subscribes to topic A and enables three consumer instances C1, C2, and C3, then each message sent to topic A will be eventually delivered to only one of the three instances. By default, CKafka will evenly distribute messages to the consumer instances to achieve consumption load balancing.
The internal principle of CKafka load balancing is to evenly allocate the partitions of the subscribed topics to each consumer. Therefore, the number of consumers should not exceed the number of partitions; otherwise, some consumer instances will not be allocated any partition and will be idle. Try to ensure that the total number of partitions is evenly divisible by the number of consumers. Apart from the first startup, a rebalance is triggered whenever consumer instances are restarted, added, or removed, or the number of partitions changes.
Frequent Rebalancing
If rebalance is triggered frequently, there may be several possible causes:
1. The consumer takes a long time to process messages.
2. The consumption of a particular abnormal message causes the consumer to block or fail.
3. Heartbeat timeout triggers a rebalance.
4. For client versions before v0.10.2: The consumer did not have an independent thread to maintain the heartbeat; instead, heartbeat maintenance was coupled with the poll API. As a result, if consumption is blocked for too long, the consumer heartbeat times out and a rebalance is triggered. For client versions v0.10.2 and later: If consumption takes too long and poll is not called within a certain period (set by max.poll.interval.ms, with a default value of 5 minutes), the client actively leaves the consumer group, triggering a rebalance.
This can be solved by methods such as optimizing consumption processing to increase consumption speed and adjusting parameters:
1. The consumer side needs to be consistent with the broker version.
2. Adjust parameter values according to the following instructions:
session.timeout.ms: For versions before v0.10.2, increase this parameter value appropriately, making it greater than the time it takes to consume a batch of data without exceeding 30s; the recommended value is 25s. For v0.10.2 and later versions, keep the default value of 10s.
max.poll.records: Reduce this parameter value. The recommended value is much less than <the number of messages consumed per second per thread> x <the number of consumption threads> x <max.poll.interval.ms in seconds>.
max.poll.interval.ms: This value should be greater than <max.poll.records> / (<the number of messages consumed per second per thread> x <the number of consumption threads>).
3. Increase the client's consumption speed as much as possible, handle consumption logic in a separate thread, and monitor for any time-consuming operations.
4. Reduce the number of topics that a group subscribes to. It's best for a group not to subscribe to more than 5 topics, ideally subscribing to only one topic.
Subscription Relationship
We recommend that all consumer instances in the same consumer group subscribe to the same topic so as to facilitate troubleshooting.
A Consumer Group Subscribes to Multiple Topics
A consumer group can subscribe to multiple topics, and the messages in such topics will be consumed evenly by the consumers in the consumer group. For example, if consumer group A subscribes to topics A, B, and C, then messages in the three topics will be consumed evenly by the consumers in consumer group A.
Below is the sample code to make a consumer group subscribe to multiple topics:
String topicStr = kafkaProperties.getProperty("topic");
String[] topics = topicStr.split(",");
// Collect the comma-separated topic names and subscribe to all of them.
List<String> subscribedTopics = new ArrayList<>();
for (String topic : topics) {
    subscribedTopics.add(topic.trim());
}
consumer.subscribe(subscribedTopics);
A Topic is Subscribed to by Multiple Consumer Groups
A topic can be subscribed to by multiple consumer groups, and the consumers in such consumer groups independently consume all messages in the topic. For example, if both consumer groups A and B subscribe to topic A, each message sent to topic A will be delivered to both the consumer instances in consumer group A and the consumer instances in consumer group B, and the two processes are independent of each other.
One Consumer Group Corresponding to One Application
We recommend that one consumer group correspond to one application, i.e., different applications correspond to different sets of code. If you need to use different sets of code in the same application, prepare multiple kafka.properties files, for example, kafka1.properties and kafka2.properties.
Consumer Offset
Each topic has multiple partitions, and each partition counts the total number of current messages, which is called the MaxOffset.
Consumers in TDMQ for CKafka consume each message in a partition in order, recording the number of messages consumed, known as the ConsumerOffset.
The number of remaining unconsumed messages (also known as message backlog) = MaxOffset - ConsumerOffset.
Offset Commit
There are two parameters related to TDMQ for CKafka consumers:
enable.auto.commit: The default value is true.
auto.commit.interval.ms: The default value is 5,000, i.e., 5 s.
As a result of the combination of the two parameters, before the client polls the data, it will always check the time of the last committed offset first, and if the time defined by the auto.commit.interval.ms parameter has elapsed, the client will start an offset commit.
Therefore, if enable.auto.commit is set to true, it is always necessary to ensure that the data polled last time has been consumed before data polling; otherwise, the offset may be skipped.
If you want to control offset commits by yourself, set enable.auto.commit to false and commit offsets manually by calling the commit method (commitSync or commitAsync in the Java client).
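A minimal sketch of manual offset commits; the topic name and handleMessage method are placeholders, and other required consumer properties are omitted:
Properties properties = new Properties();
properties.setProperty("enable.auto.commit", "false"); // disable automatic commits
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        handleMessage(record); // business logic; assumed method
    }
    consumer.commitSync(); // commit only after the polled batch has been fully consumed
}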
Note:
Try to avoid committing offsets too frequently; otherwise, it will cause high CPU usage on the broker and affect normal services. Examples of overly frequent commits include setting auto.commit.interval.ms to 100 ms for automatic commits, or manually committing an offset for every message consumed in high-throughput scenarios.
Offset Reset
The ConsumerOffset will be reset in the following two scenarios:
The server has no committed offsets (for example, when the client is started for the first time).
A message is pulled from an invalid offset (for example, the MaxOffset in a partition is 10, but the client pulls a message from offset 11).
For a Java client, you can configure a resetting policy through auto.offset.reset. There are three policies:
latest: Consumption will start from the maximum offset.
earliest: Consumption will start from the minimum offset.
none: No resetting will be performed.
Note:
We recommend setting the resetting policy to latest instead of earliest, so as to avoid starting consumption from the beginning when the offset is invalid, which may cause a large amount of repeated consumption.
If you manage the offset by yourself, you can set the policy to none.
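For example, a Java client configuration fragment:
Properties properties = new Properties();
properties.setProperty("auto.offset.reset", "latest"); // or "earliest" / "none" depending on your needs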
Message Pull
In the consumption process, the client pulls messages from the server. When the client pulls large messages, you should control the pulling speed and pay attention to the following parameters:
max.poll.records: Set it to 1 if a single message exceeds 1 MB in size.
max.partition.fetch.bytes: Set it to a value slightly greater than the size of a single message.
fetch.max.bytes: Set it to a value slightly greater than the size of a single message.
When messages are consumed over the public network, a disconnection may often occur due to the bandwidth limit of the public network. In this case, you should control the pulling speed and pay attention to the following parameters:
fetch.max.bytes: It is recommended to set it to half of the public network bandwidth (note that the unit of this parameter is bytes, while the unit of the public network bandwidth is bits).
max.partition.fetch.bytes: We recommend you to set it to half of the public network bandwidth (note that the unit of this parameter is bytes, while the unit of the public network bandwidth is bits).
Pulling Large Messages
In the consumption process, the client pulls messages from the server. When pulling large messages, you should control the pulling speed and change the configuration:
max.poll.records: The maximum number of messages polled each time. If a single message exceeds 1 MB, it is recommended to set it to 1.
fetch.max.bytes: Set it to a value slightly greater than the size of a single message.
max.partition.fetch.bytes: Set it to a value slightly greater than the size of a single message.
The essence of pulling large messages is to fetch them one by one.
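For example, a consumer configuration fragment for messages of up to about 2 MB each (the sizes are illustrative):
Properties properties = new Properties();
properties.setProperty("max.poll.records", "1");
properties.setProperty("fetch.max.bytes", "2097152");           // slightly greater than the largest message
properties.setProperty("max.partition.fetch.bytes", "2097152"); // slightly greater than the largest message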
Duplicate Messages and Consumption Idempotency
TDMQ for CKafka consumption uses the at-least-once semantics, that is, a message is delivered at least once, which guarantees that messages will never be lost. However, messages may be delivered more than once. Network issues and client restarts may cause a few duplicate messages. If the application consumer is sensitive to duplicate messages (such as orders and transactions), idempotency should be implemented for the messages.
Taking a database application as an example, the common practice is as follows:
When sending a message, pass in the key as the unique ID.
When consuming a message, determine whether the key has already been consumed; if so, ignore it; otherwise, consume it once.
Of course, if the application itself is not sensitive to a small number of duplicate messages, idempotency is not necessary.
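A simplified sketch of consumption idempotency keyed on the message key; the in-memory set here is only illustrative, and a production system would typically use a database table or cache with a unique constraint:
Set<String> consumedKeys = ConcurrentHashMap.newKeySet(); // illustrative deduplication store
for (ConsumerRecord<String, String> record : records) {
    String messageKey = record.key(); // the unique business ID set by the producer
    if (consumedKeys.add(messageKey)) {
        handleMessage(record); // first time this key is seen: consume it (assumed method)
    }
    // If add() returns false, the key has already been consumed and the message is ignored.
}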
Consumption Failure
In TDMQ for CKafka, messages are consumed from partitions one by one in sequence. If the consumer fails to execute the consumption logic after getting a message, for example, when dirty data is stored on the application server, the message will fail to be processed, and human intervention will be required. There are two methods for dealing with this situation:
The consumer will keep trying to execute the consumption logic upon failure. This method may cause the consumer thread to be jammed by the current message and lead to message heap.
As TDMQ for CKafka is not designed to process failed messages, in practice, it will typically print failed messages or store them in a service (such as a dedicated topic created for storing failed messages), so that you can regularly check failed messages, analyze the causes of failures, and process accordingly.
Consumption Delay
In the consumption process, the client pulls messages from the server. In general, if the client can consume messages timely, there will be no significant delays. If a high delay is detected, you should first check whether messages heap up and speed up consumption accordingly.
Consumption Heap
The common causes of message heap include the following:
Consumption is slower than production. In this case, you should speed up consumption.
Consumption is jammed.
After a message is received, the consumption end will execute the consumption logic and generally make some remote calls. If it waits for the results at the same time, the consumption process may be jammed.
Try to ensure that the consumer does not jam the consumption threads. If it needs to wait for call results, we recommend setting a wait timeout period, so that the consumption is treated as failed after the timeout elapses.
Speeding up Consumption
Increase the number of consumer instances to improve parallel processing capabilities. If the ratio of consumers to partitions has reached 1:1, you can increase the number of partitions. (Note: In scenarios where Flink automatically maintains partitions, newly added partitions cannot be detected automatically; code modifications and a restart may be required after partitions are added.) You can increase the number of instances directly within a process (ensuring that each instance corresponds to one thread), or deploy multiple consumer processes.
Note:
Do not add more instances once the number of instances reaches the number of partitions; otherwise, the extra consumer instances will be idle.
Increase the number of consumer threads.
1. Define a thread pool.
2. Poll the data.
3. Submit the data to the thread pool for concurrent processing.
4. Poll the data again after a successful concurrent processing result is returned.
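A simplified sketch of this pattern (the pool size and handleMessage method are illustrative):
ExecutorService workerPool = Executors.newFixedThreadPool(8); // 1. Define a thread pool.
while (true) {
    // 2. Poll the data.
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    List<Future<?>> futures = new ArrayList<>();
    for (ConsumerRecord<String, String> record : records) {
        // 3. Submit the data to the thread pool for concurrent processing.
        futures.add(workerPool.submit(() -> handleMessage(record)));
    }
    for (Future<?> future : futures) {
        try {
            future.get(); // 4. Wait until the whole batch has been processed before polling again.
        } catch (Exception e) {
            // Handle the failed record, e.g., log it or send it to a dedicated retry topic.
        }
    }
}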
Socket Buffers
The default value of the receive.buffer.bytes parameter in Kafka 0.10.x is 64 KB, while the default value of the socket.receive.buffer.bytes parameter in Kafka 0.8.x is 100 KB.
Both the default values are too small for high-throughput environments, especially when the bandwidth-delay product of the network between the broker and the consumer is greater than that of the local area network (LAN).
For high-bandwidth networks (such as 10 Gbps or higher) with a latency of 1 ms or more, it is recommended to set the socket buffer size to 8 or 16 MB.
Even if you don't have enough memory, you should consider setting this parameter to at least 1 MB. You can also set it to -1, so that the underlying operating system will adjust the buffer size based on the actual network conditions.
However, for consumers that need to catch up quickly on hot partitions at startup, the automatic adjustment may not be fast enough.
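For example, a consumer configuration fragment (the buffer size is illustrative):
Properties properties = new Properties();
properties.setProperty("receive.buffer.bytes", "8388608"); // 8 MB; set to -1 to let the OS adjust it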
Message Broadcasting
CKafka currently does not support the semantics of message broadcasting. You can simulate broadcasting by creating different consumer groups.
Message Filtering
CKafka does not have semantics for message filtering. In practice, you can adopt the following two methods:
If there are not many categories to filter, use multiple topics to achieve the purpose of filtering.
If there are many categories to filter, it is best to filter on the client's business layer.
In practice, choose a method according to your specific business circumstances, or use a combination of the two methods mentioned above.
No Consumption in Some Partitions
During the consumption process, the consumer may be online while the offsets of some partitions do not progress. Possible causes are as follows:
1. An exception message is reported, which could be an oversized message or a format exception, causing the consumer to convert it into a business offset when pulling messages.
2. When the public network is used with limited bandwidth, pulling large messages immediately fills up the bandwidth, resulting in a failure to pull messages within the timeout period.
3. The consumer hangs and stops pulling messages.
Solution:
Shut down the consumer, then either reset the offset in the CKafka console to skip the exception messages or optimize the consumption code, and restart the consumer afterwards.