package com.tencent.tcb.operation.ckafka.plain;
import com.google.common.collect.Lists;
import com.tencent.tcb.operation.ckafka.JavaKafkaConfigurer;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;
public class KafkaPlainConsumerDemo {
public static void main(String args[]) {
JavaKafkaConfigurer.configureSaslPlain();
Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
List<String> subscribedTopics = new ArrayList<String>();
String topicStr = kafkaProperties.getProperty("topic");
String[] topics = topicStr.split(",");
for (String topic : topics) {
subscribedTopics.add(topic.trim());
}
consumer.subscribe(subscribedTopics);
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println(
String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
}
} catch (NoOffsetForPartitionException e) {
System.out.println(e.getMessage());
Map<Integer, Long> partitionBeginOffsetMap = getPartitionOffset(consumer, topicStr, true);
Map<Integer, Long> partitionEndOffsetMap = getPartitionOffset(consumer, topicStr, false);
consumer.seek(new TopicPartition(topicStr, 0), 0);
consumer.seekToBeginning(Lists.newArrayList(new TopicPartition(topicStr, 0)));
consumer.seekToEnd(Lists.newArrayList(new TopicPartition(topicStr, 0)));
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
Long value = Instant.now().minus(300, ChronoUnit.SECONDS).toEpochMilli();
timestampsToSearch.put(new TopicPartition(topicStr, 0), value);
Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer
.offsetsForTimes(timestampsToSearch);
for (Entry<TopicPartition, OffsetAndTimestamp> entry : topicPartitionOffsetAndTimestampMap
.entrySet()) {
TopicPartition topicPartition = entry.getKey();
OffsetAndTimestamp entryValue = entry.getValue();
consumer.seek(topicPartition, entryValue.offset());
}
}
}
}
private static Map<Integer, Long> getPartitionOffset(KafkaConsumer<String, String> consumer, String topicStr,
boolean beginOrEnd) {
Collection<PartitionInfo> partitionInfos = consumer.partitionsFor(topicStr);
List<TopicPartition> tp = new ArrayList<>();
Map<Integer, Long> map = new HashMap<>();
partitionInfos.forEach(str -> tp.add(new TopicPartition(topicStr, str.partition())));
Map<TopicPartition, Long> topicPartitionLongMap;
if (beginOrEnd) {
topicPartitionLongMap = consumer.beginningOffsets(tp);
} else {
topicPartitionLongMap = consumer.endOffsets(tp);
}
topicPartitionLongMap.forEach((key, beginOffset) -> {
int partition = key.partition();
map.put(partition, beginOffset);
});
return map;
}
}
本页内容是否解决了您的问题?