auto.offset.reset
parameter.auto.offset.reset
?auto.offset.reset
parameter defines where to start consumption if the offset of the partition to be consumed cannot be obtained. For example, it specifies how the offset will be initialized if no offset is configured for the broker (such as upon initial consumption or when the offset expired for more than seven days) or how it will be reset if the error OFFSET_OUT_OF_RANGE occurs.auto.offset.reset
parameter has the following valid values:earliest
: Reset to the minimum offset in the partition.latest
: Reset to the maximum offset in the partition. This is the default value.none
: Report an OffsetOutOfRangeException
exception without resetting the offset.OFFSET_OUT_OF_RANGE
occur? LogStartOffset
and LogEndOffset
of partition 1 in topicA
are 100 and 300 respectively, but the offset committed by the client is less than 100 or greater than 300, then the broker will return this error, and the offset will be reset.auto.offset.reset=none
Descriptioncatch
.auto.offset.reset
is set to none
, automatic offset reset can be avoided; however, as the automatic reset mechanism is disabled, when a new partition is added, the client doesn't know where to start consuming the new partition, and an exception will occur. In this case, you need to manually set a consumer group offset and start consuming.auto.offset.reset
to none
for the consumer, you need to capture the exception NoOffsetForPartitionException
and set the offset in catch
on your own. You can select one of the following methods based on your actual business needs:package com.tencent.tcb.operation.ckafka.plain;import com.google.common.collect.Lists;import com.tencent.tcb.operation.ckafka.JavaKafkaConfigurer;import java.time.Instant;import java.time.temporal.ChronoUnit;import java.util.ArrayList;import java.util.Collection;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Map.Entry;import java.util.Properties;import org.apache.kafka.clients.CommonClientConfigs;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;import org.apache.kafka.clients.consumer.OffsetAndTimestamp;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.common.PartitionInfo;import org.apache.kafka.common.TopicPartition;import org.apache.kafka.common.config.SaslConfigs;public class KafkaPlainConsumerDemo {public static void main(String args[]) {// Set the path of the JAAS configuration file.JavaKafkaConfigurer.configureSaslPlain();// Load `kafka.properties`.Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();Properties props = new Properties();// Set the access point. Obtain the access point of the corresponding topic in the console.props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));// Set the access protocol.props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");// Set the PLAIN mechanism.props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");// Set the maximum interval between two polls.// If the consumer does not return a heartbeat message within the interval, the broker will determine that the consumer is not alive, and then remove the consumer from the consumer group and trigger rebalancing. The default value is 30s.props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);// Set the maximum number of messages that can be polled at a time.// Do not set this parameter to an excessively large value. If polled messages are not all consumed before the next poll starts, load balancing is triggered and lagging occurs.props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);// Set the method for deserializing messages.props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");// Set the consumer group for the current consumer instance. You need to apply for a consumer group in the console first.// The instances in the same consumer group consume messages in load balancing mode.props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));// Consumption offset Note If `auto.offset.reset` is set to `none`, the consumer group will report an error for failing to find the offset in its first consumption. Therefore, you need to manually set the offset in `catch`.props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");// Construct a consumer object. This generates a consumer instance.KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);// Set one or more topics to which the consumer group subscribes.// We recommend that you configure consumer instances with the same `GROUP_ID_CONFIG` value to subscribe to the same topics.List<String> subscribedTopics = new ArrayList<String>();// If you want to subscribe to multiple topics, add the topics here.// You must create the topics in the console in advance.String topicStr = kafkaProperties.getProperty("topic");String[] topics = topicStr.split(",");for (String topic : topics) {subscribedTopics.add(topic.trim());}consumer.subscribe(subscribedTopics);// Consume messages in loop.while (true){try {ConsumerRecords<String, String> records = consumer.poll(1000);// All messages must be consumed before the next poll, and the total duration cannot exceed the timeout interval specified by `SESSION_TIMEOUT_MS_CONFIG`. We recommend that you create a separate thread to consume messages and then return the result in async mode.for (ConsumerRecord<String, String> record : records) {System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));}} catch (NoOffsetForPartitionException e) {System.out.println(e.getMessage());// If you set `auto.offset.reset` to `none`, you need to capture the exception and set the offset on your own. You can select one of the following methods based on your actual business needs:// Sample 1. Specify the offset. You need to maintain the offset, which is convenient for retries.Map<Integer, Long> partitionBeginOffsetMap = getPartitionOffset(consumer, topicStr, true);Map<Integer, Long> partitionEndOffsetMap = getPartitionOffset(consumer, topicStr, false);consumer.seek(new TopicPartition(topicStr, 0), 0);// Sample 2. Specify to start consumption from the beginningconsumer.seekToBeginning(Lists.newArrayList(new TopicPartition(topicStr, 0)));// Sample 3. Use the nearest available offsetconsumer.seekToEnd(Lists.newArrayList(new TopicPartition(topicStr, 0)));// Sample 4. Obtain and set the offset based on the timestamp. For example, reset the offset to 10 minutes ago.Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();Long value = Instant.now().minus(300, ChronoUnit.SECONDS).toEpochMilli();timestampsToSearch.put(new TopicPartition(topicStr, 0), value);Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(timestampsToSearch);for (Entry<TopicPartition, OffsetAndTimestamp> entry : topicPartitionOffsetAndTimestampMap.entrySet()) {TopicPartition topicPartition = entry.getKey();OffsetAndTimestamp entryValue = entry.getValue();consumer.seek(topicPartition, entryValue.offset()); // Specify the offset. You need to maintain the offset, which is convenient for retries}}}}/*** Get the earliest and nearest offsets of the topic* @param consumer* @param topicStr* @param beginOrEnd true begin; false end* @return*/private static Map<Integer, Long> getPartitionOffset(KafkaConsumer<String, String> consumer, String topicStr,boolean beginOrEnd) {Collection<PartitionInfo> partitionInfos = consumer.partitionsFor(topicStr);List<TopicPartition> tp = new ArrayList<>();Map<Integer, Long> map = new HashMap<>();partitionInfos.forEach(str -> tp.add(new TopicPartition(topicStr, str.partition())));Map<TopicPartition, Long> topicPartitionLongMap;if (beginOrEnd) {topicPartitionLongMap = consumer.beginningOffsets(tp);} else {topicPartitionLongMap = consumer.endOffsets(tp);}topicPartitionLongMap.forEach((key, beginOffset) -> {int partition = key.partition();map.put(partition, beginOffset);});return map;}}
Was this page helpful?