CKafka is compatible with Kafka producer/consumer APIs of version 0.9 and higher (currently purchasable versions include 2.4.1, 2.8.1, and 3.2.3). If you are migrating from a legacy self-built Kafka (for example, version 0.8), you need to modify your API usage accordingly. This article compares Kafka 0.8 with higher versions on both the producer and consumer sides and describes how to adapt your code.
Kafka Producer
Overview
The Producer API was rewritten in Kafka 0.8.1. The new client is the officially recommended version: it performs better, offers more features, and is the version the community continues to maintain.
Comparison between New and Old Versions of Producer API
New version Producer API Demo
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:4242");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(0), Integer.toString(0)));
producer.close();
Old version Producer API Demo
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
// example.producer.SimplePartitioner is a user-defined partitioner class.
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
String ip = "192.168.2.3";            // message key (placeholder)
String msg = "site visit from " + ip; // message value (placeholder)
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
producer.close();
As you can see, the new and old versions are used in essentially the same way; only some configuration parameters differ, so the migration cost is low.
Compatibility Note
Producers written against the Kafka 0.8.x Producer API can connect to CKafka smoothly without modification. Nevertheless, the new Kafka Producer API is recommended.
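To make the parameter differences concrete, here is a minimal sketch of the configuration changes involved in moving from the old Producer to the new one; the broker address, topic, key, and value are placeholders:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
// Old: metadata.broker.list        -> New: bootstrap.servers
props.put("bootstrap.servers", "broker1:9092");
// Old: request.required.acks ("1") -> New: acks ("0", "1", or "all")
props.put("acks", "1");
// Old: serializer.class (one encoder for the whole message)
// New: separate serializers for key and value
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Old: partitioner.class -> New: partitioner.class (optional; by default
// the new producer hashes the record key)

Producer<String, String> producer = new KafkaProducer<String, String>(props);
// Old: KeyedMessage -> New: ProducerRecord
producer.send(new ProducerRecord<String, String>("page_visits", "key", "value"));
producer.close();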
Kafka Consumer
Overview
Open-source Apache Kafka 0.8 provides two consumer APIs, namely:
High Level Consumer API (configuration details are shielded from the user)
Simple Consumer API (configuration details can be specified manually)
Kafka 0.9.x introduced the New Consumer, which combines the capabilities of the two Old Consumer APIs (version 0.8) and reduces the load on ZooKeeper.
The following sections therefore describe how to convert a version 0.8 Consumer into a version 0.9 New Consumer.
Comparison between New and Old Versions of Consumer API
Version 0.8 Consumer API
If you only need the data and do not care about offset handling, the High Level API meets general consumption requirements. The High Level Consumer API is built around the logical concept of the Consumer Group: it shields Offset management from the user and provides Broker exception handling and consumer load balancing (Rebalance), which lets developers get started with the Consumer client quickly.
Pay attention to the following points when using the High Level Consumer (a thread-sizing sketch follows this list):
If the number of consumer threads is greater than the number of Partitions, some threads will never receive data.
If the number of Partitions is greater than the number of threads, some threads will consume multiple Partitions.
Changes in Partitions or consumers trigger a Rebalance.
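As a sketch of the first two points, assuming a hypothetical topic foo with 3 partitions (the ZooKeeper address and group name are placeholders), the consumer thread pool can be sized to the partition count so that no thread sits idle:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "test");
ConsumerConfig config = new ConsumerConfig(props);
ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);

int partitionCount = 3; // assumed; should match the topic's real partition count
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("foo", partitionCount); // one stream per partition
Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        connector.createMessageStreams(topicCountMap);

// One thread per stream: more threads than Partitions would leave threads idle;
// fewer would make some threads consume several Partitions.
ExecutorService executor = Executors.newFixedThreadPool(partitionCount);
for (final KafkaStream<byte[], byte[]> stream : streams.get("foo")) {
    executor.submit(new Runnable() {
        public void run() {
            for (MessageAndMetadata<byte[], byte[]> msg : stream) {
                System.out.println("partition " + msg.partition() + ": " + new String(msg.message()));
            }
        }
    });
}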
If you care about message offsets and want to re-consume messages, skip ahead, consume only specific Partitions, or otherwise obtain stronger consumption semantics, the Low Level (Simple) Consumer API is recommended. However, you must then handle Offsets and Broker exceptions yourself.
When using the Low Level Consumer, pay attention to the following points (a minimal fetch sketch follows this list):
Manually track and maintain Offsets to control consumption progress.
Find the Leader of the target Partition of the Topic yourself, and handle Partition changes.
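The following minimal sketch shows this bookkeeping with the 0.8 Simple Consumer API. It assumes the Leader for partition 0 of a hypothetical topic page_visits is already known to be broker1:9092; Leader lookup (TopicMetadataRequest) and error handling are omitted:
import java.nio.ByteBuffer;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

// Connect directly to the partition Leader.
SimpleConsumer consumer =
        new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "demo-client");
long offset = 0L; // the application must track and persist this itself
FetchRequest req = new FetchRequestBuilder()
        .clientId("demo-client")
        .addFetch("page_visits", 0, offset, 100000) // topic, partition, offset, fetchSize
        .build();
FetchResponse resp = consumer.fetch(req);
// In real code, check resp.hasError() and handle Leader changes here.
for (MessageAndOffset messageAndOffset : resp.messageSet("page_visits", 0)) {
    ByteBuffer payload = messageAndOffset.message().payload();
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println(new String(bytes));
    offset = messageAndOffset.nextOffset(); // advance the manually maintained Offset
}
consumer.close();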
Version 0.9 New Consumer API
Kafka 0.9.x introduced the New Consumer, which combines the capabilities of the two Old Consumer APIs: it offers high-level coordination (consumer groups) as well as lower-level access, so consumers can build custom consumption policies. The New Consumer also simplifies the consumer client and introduces a central Coordinator, which resolves the Herd Effect and Split Brain problems caused by each consumer connecting to ZooKeeper separately and reduces the load on ZooKeeper.
Advantages (see the sketch after this list):
Introduction of a Coordinator
The 0.8 High Level Consumer suffers from the Herd Effect and Split Brain problems. Moving failure detection and Rebalance logic into a highly available central Coordinator solves both issues and greatly reduces the load on ZooKeeper.
Manual Partition assignment
For example, to keep partition-local state valid, the mapping of consumers to Partitions must stay fixed; in other scenarios, a Consumer needs to be tied to the Brokers of a particular region.
Manual Offset management
You can manage Offsets as needed to implement semantics such as repeated consumption and skipping ahead.
User-specified callbacks triggered on Rebalance
Non-blocking Consumer API
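A minimal sketch of manual Partition assignment, manual Offset management, and a Rebalance callback with the New Consumer API (topic name, partition, and offset are placeholders):
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// (1) Allocate Partitions yourself: assign() bypasses group rebalancing,
// and seek() jumps to an arbitrary Offset before polling.
KafkaConsumer<String, String> assignedConsumer = new KafkaConsumer<String, String>(props);
TopicPartition tp = new TopicPartition("foo", 0);
assignedConsumer.assign(Collections.singletonList(tp));
assignedConsumer.seek(tp, 42L);

// (2) Or subscribe with a user-specified Rebalance callback.
// Note: assign() and subscribe() cannot be mixed on one consumer instance.
KafkaConsumer<String, String> subscribedConsumer = new KafkaConsumer<String, String>(props);
subscribedConsumer.subscribe(Arrays.asList("foo"), new ConsumerRebalanceListener() {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // e.g. commit or persist Offsets before losing ownership
    }
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // e.g. restore Offsets with seek() after the Rebalance
    }
});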
Differences in Functions between New and Old Versions of Consumer API
| Consumer API | Kafka Version | Automatic Offset Management | Manual Offset/Partition Control | Automatic Rebalance (Load Balancing) | Broker Exception Handling | Consumer Group Support | Remarks |
| --- | --- | --- | --- | --- | --- | --- | --- |
| High Level Consumer | Before 0.9 | Supported | Not supported | Supported | Supported | Supported | Herd Effect and Split Brain issues |
| Simple Consumer | Before 0.9 | Not supported | Supported | Not supported | Not supported | Not supported | Multiple exception cases must be handled manually |
| New Consumer | 0.9 and later | Supported | Supported | Supported | Supported | Supported | Mature; recommended for current versions |
Switch Old Consumer to New Consumer
New Consumer
// The main change in the config: the ZooKeeper address is replaced by the Kafka broker address.
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Compared with the old consumer, creating a consumer is simpler: no connector is needed.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
    }
}
Old Consumer (High Level)
// The old consumer requires ZooKeeper.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "test");
props.put("auto.commit.enable", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
ConsumerConfig config = new ConsumerConfig(props);

// A connector must be created first.
ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);

// Create the message streams: one stream (consumer thread) for topic "foo".
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("foo", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        connector.createMessageStreams(topicCountMap);

// Read data from the stream.
KafkaStream<byte[], byte[]> stream = streams.get("foo").get(0);
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
while (iterator.hasNext()) {
    MessageAndMetadata<byte[], byte[]> msg = iterator.next();
    System.out.println("group " + props.get("group.id")
            + ", partition " + msg.partition()
            + ", " + new String(msg.message()));
}
As you can see, writing a New Consumer is simpler. The main change is replacing the ZooKeeper address with the Kafka broker address. The New Consumer also adds configuration parameters for interacting with the Coordinator; in general, the defaults are sufficient.
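For reference, here is a sketch of the Coordinator-related settings; the values shown are commonly used ones, and the exact defaults depend on your client version:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// How long the Coordinator waits without a heartbeat before evicting the
// consumer from the group and triggering a Rebalance.
props.put("session.timeout.ms", "30000");
// How often the consumer sends heartbeats to the Coordinator; must be
// well below session.timeout.ms.
props.put("heartbeat.interval.ms", "3000");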
Compatibility Note
CKafka behaves consistently with the higher versions of open-source Kafka. It supports the rewritten New Consumer API and shields the Consumer client's interaction with ZooKeeper (ZooKeeper is no longer exposed to users). The New Consumer addresses the Herd Effect and Split Brain problems that arose from direct interaction with ZooKeeper, and it integrates the capabilities of the original Old Consumer, making consumption more reliable.