TDMQ for CKafka
Integrating Legacy Self-Built Kafka
Last updated: 2025-04-09 10:26:29
CKafka is compatible with the production and consumption APIs of Kafka 0.9 and later (currently purchasable versions include 2.4.1, 2.8.1, and 3.2.3). To integrate a legacy self-built Kafka cluster (for example, version 0.8), you need to modify the client API calls accordingly. This document compares Kafka 0.8 with later versions on both the producer and consumer sides and describes how to make the changes.

Kafka Producer

Overview

Kafka 0.8.1 rewrote the Producer API. The rewritten client is the officially recommended version: it performs better, offers more features, and is the version the community continues to maintain.

Comparison between New and Old Versions of Producer API

New version Producer API Demo
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
// Replace with your broker address.
props.put("bootstrap.servers", "localhost:4242");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(0), Integer.toString(0)));
producer.close();
Old version Producer API Demo
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
// The message key and value; example data shown here.
String ip = "192.168.2.3";
String msg = "this is a message";
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
producer.close();
As you can see, the new and old versions are used in much the same way; they differ only in some parameter names, so migration requires little effort.
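For reference, the main parameter renamings visible in the two demos can be captured in a small lookup table. This is an illustrative sketch, not an official migration tool; the class and method names are hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ProducerConfigMigration {
    // Old (0.8) producer config keys mapped to their new Producer API equivalents,
    // taken from the demos above.
    static final Map<String, String> OLD_TO_NEW = new LinkedHashMap<>();
    static {
        OLD_TO_NEW.put("metadata.broker.list", "bootstrap.servers");
        OLD_TO_NEW.put("serializer.class", "key.serializer / value.serializer");
        OLD_TO_NEW.put("request.required.acks", "acks");
    }

    // Returns the new key name, or the key unchanged if it was not renamed.
    public static String migrateKey(String oldKey) {
        return OLD_TO_NEW.getOrDefault(oldKey, oldKey);
    }

    public static void main(String[] args) {
        System.out.println(migrateKey("metadata.broker.list")); // bootstrap.servers
        System.out.println(migrateKey("request.required.acks")); // acks
    }
}
```

Note that the serializer split is a real semantic change: the old producer configured one serializer for the message, while the new API configures key and value serializers separately.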

Compatibility Note

Producer APIs of Kafka 0.8.x can connect to CKafka smoothly without modification. Nevertheless, we recommend using the new Kafka Producer API.

Kafka Consumer

Overview

Open-source Apache Kafka 0.8 provides two consumer APIs:
High Level Consumer API (hides most configuration details)
Simple Consumer API (allows you to specify details such as offsets and partitions)
Kafka 0.9.x introduced the New Consumer, which combines the characteristics of the two 0.8 (Old Consumer) APIs and reduces the load on ZooKeeper.
The following sections describe how to migrate a 0.8 Consumer to the 0.9 New Consumer.

Comparison between New and Old Versions of Consumer API

Version 0.8 Consumer API

High Level Consumer API (see Demo)
If you only need the data and do not care about offset handling, the High Level API meets general consumption requirements. It is organized around the logical concept of the Consumer Group: it hides offset management and handles broker failures and consumer load balancing (rebalance) automatically, so developers can get started with the Consumer client quickly.
Pay attention to the following points when using the High Level Consumer:
If the number of consumer threads is greater than the number of partitions, some threads will receive no data.
If the number of partitions is greater than the number of threads, some threads will consume multiple partitions.
Changes to partitions or consumers trigger a rebalance.
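The thread-to-partition relationship described above can be illustrated with a small range-assignment sketch. This is a plain-Java simplification of the kind of assignment the High Level Consumer performs internally, not its actual implementation:

```java
import java.util.ArrayList;
import java.util.List;

public class RangeAssignment {
    // Assigns numPartitions partitions to numThreads threads, range-style:
    // each thread gets a contiguous block, and any remainder goes to the first threads.
    public static List<List<Integer>> assign(int numPartitions, int numThreads) {
        List<List<Integer>> result = new ArrayList<>();
        int base = numPartitions / numThreads;
        int extra = numPartitions % numThreads;
        int next = 0;
        for (int t = 0; t < numThreads; t++) {
            List<Integer> owned = new ArrayList<>();
            int count = base + (t < extra ? 1 : 0);
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned); // empty when threads outnumber partitions
        }
        return result;
    }

    public static void main(String[] args) {
        // 2 partitions, 3 threads: the last thread receives no data.
        System.out.println(assign(2, 3)); // [[0], [1], []]
        // 5 partitions, 2 threads: some threads consume multiple partitions.
        System.out.println(assign(5, 2)); // [[0, 1, 2], [3, 4]]
    }
}
```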
Low Level Consumer API (see Demo)
If you care about message offsets and want features such as re-consumption or skip reads, or want to consume specific partitions with stronger consumption semantics, use the Low Level Consumer API (SimpleConsumer). In return, you must handle offsets and broker failures yourself.
When using the Low Level Consumer, pay attention to the following points:
You must track and maintain offsets manually to control consumption progress.
You must find the leader of each topic partition and handle partition changes yourself.
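A minimal sketch of the offset bookkeeping this implies is shown below. It is plain Java with no Kafka dependency; the class and method names are hypothetical, meant only to show the state a Low Level Consumer must maintain itself:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetTracker {
    // partition -> next offset to fetch; with the Low Level API this state is yours to keep.
    private final Map<Integer, Long> nextOffsets = new HashMap<>();

    public long nextOffset(int partition) {
        return nextOffsets.getOrDefault(partition, 0L);
    }

    // Advance only after a batch from the partition has been processed successfully.
    public void advance(int partition, int messagesProcessed) {
        nextOffsets.put(partition, nextOffset(partition) + messagesProcessed);
    }

    // Rewind to re-consume, e.g. after an error or for deliberate replay.
    public void rewindTo(int partition, long offset) {
        nextOffsets.put(partition, offset);
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        tracker.advance(0, 10);                     // processed offsets 0..9
        System.out.println(tracker.nextOffset(0));  // 10
        tracker.rewindTo(0, 5);                     // replay from offset 5
        System.out.println(tracker.nextOffset(0));  // 5
    }
}
```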

Version 0.9 New Consumer API

Kafka 0.9.x introduced the New Consumer, which combines the characteristics of the two Old Consumer APIs: it offers both high-level coordination and low-level access, so consumers can build custom consumption policies. The New Consumer also simplifies the consumer client. It introduces a central Coordinator, which resolves the Herd Effect and Split Brain problems caused by each consumer connecting to ZooKeeper separately and reduces the load on ZooKeeper.
Advantages:
Central Coordinator
The 0.8 High Level Consumer suffers from the Herd Effect and Split Brain problems. Moving failure detection and rebalance logic into a highly available central Coordinator solves both issues and greatly reduces the load on ZooKeeper.
Manual partition assignment
This keeps each consumer's partition mapping (and any per-partition local state) stable, and in some scenarios lets you bind consumers to brokers in a specific region.
Manual offset management
You can manage offsets as needed to implement semantics such as repeated or skip consumption.
User-specified callback triggered after a rebalance
Non-blocking Consumer API
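To make the rebalance-callback idea concrete, the following plain-Java sketch simulates a coordinator reassigning partitions across consumers and firing a user callback afterwards. The interface and method names are hypothetical; in the real New Consumer this hook is a rebalance listener passed when subscribing:

```java
import java.util.ArrayList;
import java.util.List;

public class RebalanceSketch {
    // Hypothetical stand-in for the user-specified rebalance callback.
    interface RebalanceListener {
        void onPartitionsAssigned(String consumer, List<Integer> partitions);
    }

    // The coordinator reassigns all partitions across the live consumers
    // (round-robin here) and fires the callback once the rebalance is done.
    static void rebalance(List<String> consumers, int numPartitions, RebalanceListener listener) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int i = 0; i < consumers.size(); i++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(p % consumers.size()).add(p);
        }
        for (int i = 0; i < consumers.size(); i++) {
            listener.onPartitionsAssigned(consumers.get(i), assignment.get(i));
        }
    }

    public static void main(String[] args) {
        List<String> consumers = new ArrayList<>();
        consumers.add("c1");
        consumers.add("c2");
        rebalance(consumers, 4, (c, parts) -> System.out.println(c + " -> " + parts));
        // c1 -> [0, 2]
        // c2 -> [1, 3]
    }
}
```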

Differences in Functions between New and Old Versions of Consumer API

| Type | Introduced In | Automatic Offset Saving | Manual Offset Management | Automatic Exception Handling | Automatic Rebalance Handling | Automatic Leader Discovery | Pros and Cons |
|---|---|---|---|---|---|---|---|
| High Level Consumer | Before 0.9 | Supported | Not supported | Supported | Supported | Supported | Prone to Herd Effect and Split Brain |
| Simple Consumer | Before 0.9 | Not supported | Supported | Not supported | Not supported | Not supported | Requires handling many exception cases manually |
| New Consumer | 0.9 and later | Supported | Supported | Supported | Supported | Supported | Mature; recommended for current versions |

Switch Old Consumer to New Consumer

New Consumer
// The main configuration change: the ZooKeeper address is replaced by the Kafka broker address.
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Creating a consumer is simpler than with the old consumer.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
Old Consumer (High Level)
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

// The old consumer connects to ZooKeeper.
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "test");
props.put("auto.commit.enable", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
ConsumerConfig config = new ConsumerConfig(props);
// A connector must be created first.
ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);
// Create the message streams: one stream for topic "foo".
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("foo", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        connector.createMessageStreams(topicCountMap);
// Read the data.
KafkaStream<byte[], byte[]> stream = streams.get("foo").get(0);
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
MessageAndMetadata<byte[], byte[]> msg = null;
while (iterator.hasNext()) {
    msg = iterator.next();
    System.out.println("group " + props.get("group.id")
            + ", partition " + msg.partition() + ", "
            + new String(msg.message()));
}
As you can see, the New Consumer is simpler to write: the main change is passing Kafka broker addresses instead of ZooKeeper addresses. The New Consumer also adds configuration parameters for interacting with the Coordinator; the defaults are usually sufficient.
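The configuration renamings between the two consumer demos can again be summarized in a small lookup table. This is illustrative only; when migrating a real client, verify each key against the official configuration reference:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConsumerConfigMigration {
    // Old (0.8) consumer config keys mapped to their New Consumer equivalents,
    // taken from the demos above.
    static final Map<String, String> OLD_TO_NEW = new LinkedHashMap<>();
    static {
        OLD_TO_NEW.put("zookeeper.connect", "bootstrap.servers (Kafka brokers, not ZooKeeper)");
        OLD_TO_NEW.put("auto.commit.enable", "enable.auto.commit");
        OLD_TO_NEW.put("auto.commit.interval.ms", "auto.commit.interval.ms");
        // The key survives, but its values change: "smallest"/"largest"
        // become "earliest"/"latest" in the New Consumer.
        OLD_TO_NEW.put("auto.offset.reset", "auto.offset.reset");
    }

    public static void main(String[] args) {
        for (Map.Entry<String, String> e : OLD_TO_NEW.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```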

Compatibility Note

CKafka is consistent with the later open-source Kafka versions. It supports the rewritten New Consumer API and shields the Consumer client from ZooKeeper interaction (ZooKeeper is no longer exposed to users). The New Consumer resolves the Herd Effect and Split Brain problems that arose from direct interaction with ZooKeeper and integrates the characteristics of the original Old Consumer, making the consumption process more reliable.
