tencent cloud

文档反馈

Java SDK

最后更新时间:2024-07-04 16:00:59

    背景

    TDMQ CKafka 是一个分布式流处理平台,用于构建实时数据管道和流式应用程序。它提供了高吞吐量、低延迟、可伸缩性和容错性等特性。
    Kafka Clients:Kafka 自带的客户端,通过 Java 实现,是 Kafka 生产和消费标准协议的客户端。
    本文着重介绍上述 Java 客户端的关键参数,实践教程以及常见问题。

    生产者实践

    版本选择

    Kafka 客户端和集群之间的兼容性非常重要,通常情况下,较新版本的客户端可以兼容较旧版本的集群,但反之则不一定成立。一般情况下,CKafka实例 Broker 在部署后是明确的,因此可以直接根据 Broker 的版本选择相匹配的客户端的版本。
    Java 生态中,广泛使用 Spring Kafka,其中 Spring Kafka 版本和 Kafka Broker 版本的对应关系,可以参见 Spring 官方网址的版本对应关系
    生产者参数与调优

    生产者参数

    在使用 Kafka Client 客户端写入 Kafka 时候,需要配置如下关键参数,相关的参数和默认值如下:
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    public class KafkaProducerExample {
    
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "test-topic";
    
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    // 创建Kafka生产者配置
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); //Kafka集群的地址列表,格式为host1:port1,host2:port2。生产者会使用这个列表来找到集群并建立连接。
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
    // 设置生产者关键参数以及默认值
    props.put(ProducerConfig.ACKS_CONFIG, "1");//acks,默认值为1,消息确认的级别。0表示不等待确认;1表示等待Leader副本写入;all或者-1表示等待所有副本写入。
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);//batch.size,批量发送的大小,单位为字节。生产者会将多个消息打包成一个批次发送,以提高性能。默认大小16384字节。
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);//buffer.memory,生产者用于缓存待发送消息的内存大小,单位为字节。默认33554432,也就是32MB
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "");//client.id,客户端ID。这个ID可以用于在服务端日志中识别消息来源。
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");//compression.type消息压缩类型。默认none不压缩,可选值为none、gzip、snappy、lz4、zstd。
    props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 540000);//connections.max.idle.ms连接的最大空闲时间,单位为毫秒。超过这个时间的空闲连接会被关闭。默认540s
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);//delivery.timeout.ms消息的最大投递时间,单位为毫秒。超过这个时间的未确认消息会被认为发送失败。默认120s
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);//enable.idempotence,是否启用幂等性。如果启用,生产者会确保每个消息只被发送一次,即使在网络错误或重试的情况下。
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "");//interceptor.classes拦截器类列表。生产者会在发送消息前后调用这些拦截器。
    props.put(ProducerConfig.LINGER_MS_CONFIG, 0);//linger.ms延迟发送的时间,单位为毫秒。生产者会等待一段时间以便将更多消息打包成一个批次发送。
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);//max.block.ms,生产者在获取元数据或缓存空间时的最大阻塞时间,单位为毫秒。
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);//max.in.flight.requests.per.connection,每个连接上的最大未确认请求数。
    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);//max.request.size,请求的最大大小,单位为字节
    props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 300000);//metadata.max.age.ms元数据的最大寿命,单位为毫秒。超过这个时间的元数据会被刷新。
    props.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "");//metric.reporters度量报告器类列表。生产者会使用这些报告器来报告度量信息。
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner");//partitioner.class分区器类。生产者会使用这个分区器来决定每个消息发送到哪个分区。
    props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, 32768);//receive.buffer.bytes接收缓冲区的大小,单位为字节。
    props.put(ProducerConfig.SEND_BUFFER_CONFIG, 131072);//send.buffer.bytes发送缓冲区的大小,单位为字节。
    props.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 1000);//reconnect.backoff.max.ms重连最大间隔时间,单位为毫秒。
    props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 50);//reconnect.backoff.ms重连间隔时间,单位毫秒
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);//request.timeout.ms请求的超时时间,单位为毫秒。
    props.put(ProducerConfig.RETRIES_CONFIG, 2147483647);//retries发送失败时的重试次数。
    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);//retry.backoff.ms重试的间隔时间,单位为毫秒。
    props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);//transaction.timeout.ms事务的超时时间,单位为毫秒
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);//transactional.id事务ID。如果设置了这个参数,生产者会启用事务功能。
    props.put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, "default");//client.dns.lookupDNS查找策略。可选值为default、use_all_dns_ips、resolve_canonical_bootstrap_servers_only。
    
    // 创建生产者
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    
    // 发送消息
    for (int i = 0; i < 100; i++) {
    String key = "key-" + i;
    String value = "value-" + i;
    
    // 创建消息记录
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
    
    // 发送消息
    producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception == null) {
    System.out.println("消息发送成功:key=" + key + ", value=" + value);
    } else {
    System.err.println("消息发送失败:" + exception.getMessage());
    }
    }
    });
    }
    
    // 关闭生产者
    producer.close();
    }
    }

    参数说明调优

    关于 acks 参数优化

    acks 参数用于控制生产者发送消息时的确认机制。该参数的默认值为1,表示消息发送给 Leader Broker 后,Leader 确认消息写入后即返回。acks参数还有以下可选值:
    0: 不等待任何确认,直接返回。
    1: 等待 Leader 副本确认写入后返回。
    -1或者 all: 等待 Leader 副本以及相关的 Follower 副本确认写入后返回。
    由上可知,在跨可用区场景,以及副本数较多的 Topic,acks 参数的取值会影响消息的可靠性和吞吐量。因此:
    在一些在线业务消息的场景下,吞吐量要求不大,可以将 acks 参数设置为-1,确保消息被所有副本接收和确认后才返回,从而提高消息的可靠性,但是会牺牲写入吞吐和性能,时延会增加。
    在日志采集等大数据或者离线计算的场景下,要求高吞吐(即每秒写入 Kafka 的数据量)的情况下,可以将 acks 设置为1,提高吞吐量。

    关于 Batch 相关参数优化

    默认情况下,传输同等数据量的情况下,多次请求和一次请求的网络传输,一次请求传输能有效减少相关计算和网络资源,提高整体写入的吞吐量。
    因此,可以通过这个参数设置优化客户端发送消息的吞吐能力。在高吞吐场景下,可以配合计算和设置:
    batch.size:默认16K。
    linger.ms:默认为0,可以适当增加耗时,如设置100ms,尽可能聚合更多消息批量发送消息。
    buffer.memory:默认32MB,对于大流量 Producer,在堆内存充足情况可以设置更大,如设置256MB。

    关于事务参数优化

    
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);//enable.idempotence,是否启用幂等性。如果启用,生产者会确保每个消息只被发送一次,即使在网络错误或重试的情况下。 //是否需要幂等,在事务场景下需要设置为true
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);//max.in.flight.requests.per.connection,每个连接上的最大未确认请求数。在事务场景不要超过5
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);//transactional.id事务ID。如果设置了这个参数,生产者会启用事务功能。
    props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);//transaction.timeout.ms事务的超时时间,单位为毫秒,可以适当延长事务时间。
    需要强调,事务因为要保障消息的 exactly once 语义,因此会额外付出更多的计算资源。
    对于事务场景,可是适当增加事务超时时间,容忍高吞吐场景下,写入延时带来的抖动。

    关于压缩参数优化

    Kafka Java Client 支持如下压缩参数:
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");//compression.type消息压缩类型。默认none不压缩,可选值为none、gzip、snappy、lz4、zstd。
    
    目前支持以下几种压缩配置:
    none:不使用压缩算法。
    gzip:使用 GZIP 压缩算法。
    snappy:使用 Snappy 压缩算法。
    lz4:使用 LZ4 压缩算法。
    zstd:使用 ZSTD 压缩算法。
    要在 Kafka Java 客户端中使用压缩消息,需要在创建生产者时设置 compression.type 参数。例如,要使用 LZ4 压缩算法,可以将compression.type 设置为lz4。
    Kafka 压缩消息是一种用计算换带宽的优化方式,虽然 Kafka 压缩消息的压缩和解压缩,发生在客户端,但是由于Broker 针对压缩消息存在校验行为会付出额外的计算成本,尤其是 gzip 压缩,服务端对该压缩消息校验的计算成本会非常大,在某种程度上可能会出现得不偿失的情况,因为计算的增加导致 Broker CPU 利用率很高,降低了其他请求的处理能力,导致整体性能更低。这种情况建议可以使用如下方式规避 Broker 的校验:
    1. 在 Producer 端对消息数据独立压缩,生成压缩包数据:messageCompression,同时在消息的 key 存储压缩方式:
    {"Compression","lz4"}
    2. 在 Producer 端将 messageCompression 当成正常消息发送。
    3. 在 Consumer 段读取消息 key,获取使用的压缩方式,独立进行解压缩。

    创建生产者实例

    如果应用程序需要更高的吞吐量,可以使用异步发送,以提高消息的发送速度。同时,可以使用批量发送消息的方式,以减少网络开销和 IO 消耗。如果应用程序需要更高的可靠性,可以使用同步发送,以确保消息发送成功。同时,可以使用 ACK 确认机制和事务机制,以确保消息的可靠性和一致性。具体的参数调优参考生产者参数与调优。

    同步发送

    在 Kafka Java Client 客户端中,同步发送的示例代码如下:
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    public class KafkaProducerSyncExample {
    
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "test-topic";
    
    public static void main(String[] args) {
    // 创建Kafka生产者配置
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
    // 设置生产者参数
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, 3);
    
    // 创建生产者
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    
    // 同步发送消息
    for (int i = 0; i < 10; i++) {
    String key = "sync-key-" + i;
    String value = "sync-value-" + i;
    
    // 创建消息记录
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
    
    try {
    // 发送消息并等待结果
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("同步发送成功:key=" + key + ", value=" + value);
    } catch (InterruptedException | ExecutionException e) {
    System.err.println("同步发送失败:" + e.getMessage());
    }
    }
    
    // 关闭生产者
    producer.close();
    }
    }

    异步发送

    异步发送:异步发送消息时不会阻塞当前线程,生产者的吞吐量较高,但是需要通过回调函数来处理消息结果,示例如下:
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProducerAsyncExample {
    
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "test-topic";
    
    public static void main(String[] args) {
    // 创建Kafka生产者配置
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
    // 设置生产者参数
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, 3);
    
    // 创建生产者
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    
    // 异步发送消息
    for (int i = 0; i < 10; i++) {
    String key = "async-key-" + i;
    String value = "async-value-" + i;
    
    // 创建消息记录
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
    
    // 发送消息并设置回调函数
    producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception == null) {
    System.out.println("异步发送成功:key=" + key + ", value=" + value);
    } else {
    System.err.println("异步发送失败:" + exception.getMessage());
    }
    }
    });
    }
    
    // 关闭生产者
    producer.close();
    }
    }
    
    

    消费者实践

    消费者参数

    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class KafkaConsumerDemo {
    
    public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // "bootstrap.servers",Kafka集群的地址,没有默认值
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // "key.deserializer",消息键的反序列化方式,没有默认值
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // "value.deserializer",消息值的反序列化方式,没有默认值
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // "group.id",消费者组ID,没有默认值
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // "auto.offset.reset",位点不存在时的处理方式,"latest"表示从最新的消息开始消费,默认值为"latest"
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // "enable.auto.commit",是否自动提交位点,默认值为"true"
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // "auto.commit.interval.ms",自动提交位点的间隔时间,单位为毫秒,默认值为"5000"
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); // "session.timeout.ms",消费者组成员的会话超时时间,单位为毫秒,默认值为"10000"
    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500"); // "max.poll.records",单次poll的最大消息数,默认值为"500"
    properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // "max.poll.interval.ms",两次poll操作间的最大允许间隔时间,单位为毫秒,默认值为"300000"
    properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1"); // "fetch.min.bytes",服务器返回的最小数据,如果设置大于1,服务器会等待直到累计的数据量大于这个值,默认值为"1"
    properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800"); // "fetch.max.bytes",服务器返回的最大数据量,单位为字节,默认值为"52428800"
    properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500"); // "fetch.max.wait.ms",服务器等待满足fetch.min.bytes条件的最大时间,单位为毫秒,默认值为"500"
    properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000"); // "heartbeat.interval.ms",心跳间隔时间,单位为毫秒,默认值为"3000"
    properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-client-id"); // "client.id",客户端ID,没有默认值
    properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"); // "request.timeout.ms",客户端请求超时时间,单位为毫秒,默认值为"30000"
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singletonList("test-topic"));
    
    try {
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    }
    } finally {
    consumer.close();
    }
    }
    }

    参数调优

    1. 在使用 Kafka 消费者时,我们可以通过调整一些参数来优化性能。以下是一些常见的参数调优方案:
    fetch.min.bytes:如果不确定消息最低大小,这个参数建议设置为1,如果明确消息最小值,可以设置该值为最小消息大小。
    max.poll.records:这个参数可以根据应用的处理能力进行调整。如果您的应用可以处理更多的记录,可以将这个参数设置为更大的值,以减少 poll操作的次数。
    auto.commit.interval.ms:这个参数可以根据您的应用的需求进行调整,一般自动提交位点场景,建议保持默认值5000ms。注意,过于频繁的位点提交会影响性能,额外占用 Broker 的计算资源。
    client.id:可以为每个消费者设置一个唯一的 ID,以便在监控和日志中区分不同的消费者。
    以上是一些常见的参数调优方案,但具体的最佳设置可能会根据您的应用的特性和需求有所不同。在调优参数时,请记住始终进行性能测试,以确保您的设置可以达到预期的效果。
    2. 对于 rebalance 时间频繁和消费线程阻塞问题,参考以下说明参数优化:
    session.timeout.ms:v0.10.2之前的版本可适当提高该参数值,需要大于消费一批数据的时间,但不要超过30s,建议设置为25s;而v0.10.2及其之后的版本,保持默认值10s即可。
    max.poll.records:降低该参数值,建议远远小于<单个线程每秒消费的条数> * <消费线程的个数> *<max.poll.interval.ms>的积。
    max.poll.interval.ms:该值要大于<max.poll.records> / (<单个线程每秒消费的条数> * <消费线程的个数>)的值。

    创建消费者实例

    Kafka Java Client 提供订阅的模型创建消费者,其中在提交位点方面,提供手动提交位点和自动提交位点两种方式。

    自动提交位点

    自动提交位点:消费者在拉取消息后会自动提交位点,无需手动操作。这种方式的优点是简单易用,但是可能会导致消息重复消费或丢失。注意,自动提交位点时间间隔 auto.commit.interval.ms 不要设置太短,否则容易导致 Broker CPU 偏高,影响其他请求处理。
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class KafkaConsumerAutoCommitExample {
    
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "test-topic";
    private static final String GROUP_ID = "test-group";
    
    public static void main(String[] args) {
    // 创建Kafka消费者配置
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
    // 开启自动提交位点
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // 自动提交间隔,单位:5000毫秒
    
    // 创建消费者
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
    // 订阅主题
    consumer.subscribe(Collections.singletonList(TOPIC));
    
    // 消费消息
    try {
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
    System.out.printf("消费消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
    record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    }
    } finally {
    // 关闭消费者
    consumer.close();
    }
    }
    }
    
    

    手动提交位点

    手动提交位点:消费者在处理完消息后需要手动提交位点。这种方式的优点是可以精确控制位点的提交,避免消息重复消费或丢失。但是需要注意,手动提交位点如果太频繁会导致 Broker CPU 很高,影响性能,随着消息量增加,CPU 消费会很高,影响正常 Broker 的其他功能,因此建议间隔一定消息提交位点。
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class KafkaConsumerManualCommitExample {
    
    public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singletonList("test-topic"));
    
    int count = 0;
    try {
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    count++;
    if (count % 10 == 0) {
    consumer.commitSync();
    System.out.println("Committed offsets.");
    }
    }
    }
    } finally {
    consumer.close();
    }
    }
    }

    Assign 消费

    Kafka Java Client 的 assign 消费模式允许消费者直接指定订阅的分区,而不是通过订阅主题来自动分配分区。这种模式适用于需要手动控制消费分区的场景,例如:为了实现特定的负载均衡策略,或者在某些情况下跳过某些分区。一般流程为使用 assign 方法来手动指定消费者消费的分区,通过seek 方法来设置开始消费的位点,然后执行消费逻辑,使用示例如下:
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class KafkaConsumerAssignAndSeekApp {
    public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("enable.auto.commit", "false");
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
    String topic = "my-topic";
    TopicPartition partition0 = new TopicPartition(topic, 0);
    TopicPartition partition1 = new TopicPartition(topic, 1);
    consumer.assign(Arrays.asList(partition0, partition1));
    
    // 设置消费的起始位点
    long startPosition0 = 10L;
    long startPosition1 = 20L;
    consumer.seek(partition0, startPosition0);
    consumer.seek(partition1, startPosition1);
    
    try {
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    consumer.commitSync(); // 手动提交位点
    }
    } finally {
    consumer.close();
    }
    }
    }

    Kafka Java Client 生产消费常见问题

    Kafka Java Producer 无法成功发送消息
    首先排查 Kafka 集群的 IP 和端口能够正常连接,若不能请先解决通信问题。
    其次检查是否正确配置接入点,版本是否和 Broker 版本匹配,可以参考最佳实践的发送 demo。
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持