import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerExample {

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "test-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Build the Kafka producer configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); // bootstrap.servers: cluster address list in host1:port1,host2:port2 form; the producer uses it to discover the cluster and open connections.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Key producer parameters, set here to their documented defaults
        props.put(ProducerConfig.ACKS_CONFIG, "1"); // acks: acknowledgment level. 0 = don't wait; 1 = wait for the leader replica; all or -1 = wait for all in-sync replicas. (Since Kafka 3.0 the client default is "all".)
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // batch.size: batch size in bytes. The producer groups records into batches to improve throughput. Default 16384 bytes.
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // buffer.memory: memory (bytes) for buffering records awaiting send. Default 33554432, i.e. 32 MB.
        props.put(ProducerConfig.CLIENT_ID_CONFIG, ""); // client.id: client identifier, useful for tracing request origins in broker logs.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); // compression.type: message compression. Default none; options are none, gzip, snappy, lz4, zstd.
        props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 540000); // connections.max.idle.ms: connections idle longer than this are closed. Default 540 s.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // delivery.timeout.ms: upper bound on total delivery time; unacknowledged records past it are reported as failed. Default 120 s.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false); // enable.idempotence: if true, each record is written exactly once per partition even across retries. (Default is true since Kafka 3.0.)
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ""); // interceptor.classes: interceptors invoked before and after each send.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0); // linger.ms: how long to wait so more records can be grouped into one batch.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000); // max.block.ms: max time the producer blocks waiting for metadata or buffer space.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // max.in.flight.requests.per.connection: max unacknowledged requests per connection.
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576); // max.request.size: maximum request size in bytes.
        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 300000); // metadata.max.age.ms: metadata older than this is refreshed.
        props.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, ""); // metric.reporters: classes used to report client metrics.
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner"); // partitioner.class: decides which partition each record is sent to.
        props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, 32768); // receive.buffer.bytes: TCP receive buffer size in bytes.
        props.put(ProducerConfig.SEND_BUFFER_CONFIG, 131072); // send.buffer.bytes: TCP send buffer size in bytes.
        props.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 1000); // reconnect.backoff.max.ms: maximum reconnect backoff in ms.
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 50); // reconnect.backoff.ms: base reconnect backoff in ms.
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); // request.timeout.ms: per-request timeout in ms.
        props.put(ProducerConfig.RETRIES_CONFIG, 2147483647); // retries: number of retries on transient send failures.
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100); // retry.backoff.ms: wait between retries in ms.
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000); // transaction.timeout.ms: transaction timeout in ms.
        // props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-tx-id"); // transactional.id: setting it enables transactions. Commented out here: this example is non-transactional, and Properties.put() throws NullPointerException for a null value.
        props.put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, "use_all_dns_ips"); // client.dns.lookup: DNS lookup strategy; options are use_all_dns_ips and resolve_canonical_bootstrap_servers_only (the old "default" option was removed in Kafka 3.0).

        // Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Send messages
        for (int i = 0; i < 100; i++) {
            String key = "key-" + i;
            String value = "value-" + i;

            // Build the record
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);

            // Send asynchronously with a completion callback
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Send succeeded: key=" + key + ", value=" + value);
                    } else {
                        System.err.println("Send failed: " + exception.getMessage());
                    }
                }
            });
        }

        // Close the producer (flushes any pending records)
        producer.close();
    }
}
// Parameters relevant to the transactional scenario
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // enable.idempotence: must be true when using transactions; the producer then writes each record exactly once per partition even across network errors and retries.
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // max.in.flight.requests.per.connection: max unacknowledged requests per connection; do not exceed 5 in transactional scenarios.
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // transactional.id: setting it enables transactions (it must be a non-null string; Properties.put() rejects null).
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000); // transaction.timeout.ms: transaction timeout in ms; can be extended for long-running transactions.
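The snippet above only sets the configuration. As a minimal sketch of the corresponding send flow using the standard transactional API (the transactional.id "demo-tx-id" and the topic name here are illustrative placeholders, not values from the original example):

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerTransactionExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // required for transactions
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-tx-id"); // placeholder; must be stable and unique per producer instance

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // registers the transactional.id and fences older producer instances
        try {
            producer.beginTransaction();
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("test-topic", "tx-key-" + i, "tx-value-" + i));
            }
            producer.commitTransaction(); // all records in the transaction become visible atomically
        } catch (KafkaException e) {
            // For fatal errors such as ProducerFencedException the producer should be closed instead
            producer.abortTransaction(); // discards the whole batch for read_committed consumers
        } finally {
            producer.close();
        }
    }
}

Within the try block all sends belong to one transaction: commitTransaction() exposes them atomically, while abortTransaction() hides them from consumers reading with isolation.level=read_committed.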
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); // compression.type: message compression. Default none (no compression); options are none, gzip, snappy, lz4, zstd.
{"Compression","lz4"}
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerSyncExample {

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "test-topic";

    public static void main(String[] args) {
        // Build the Kafka producer configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Producer parameters
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);

        // Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Send messages synchronously
        for (int i = 0; i < 10; i++) {
            String key = "sync-key-" + i;
            String value = "sync-value-" + i;

            // Build the record
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
            try {
                // send() returns a Future; get() blocks until the broker acknowledges
                RecordMetadata metadata = producer.send(record).get();
                System.out.println("Sync send succeeded: key=" + key + ", value=" + value
                        + ", partition=" + metadata.partition() + ", offset=" + metadata.offset());
            } catch (InterruptedException | ExecutionException e) {
                System.err.println("Sync send failed: " + e.getMessage());
            }
        }

        // Close the producer
        producer.close();
    }
}
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerAsyncExample {

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "test-topic";

    public static void main(String[] args) {
        // Build the Kafka producer configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Producer parameters
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);

        // Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Send messages asynchronously
        for (int i = 0; i < 10; i++) {
            String key = "async-key-" + i;
            String value = "async-value-" + i;

            // Build the record
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);

            // send() returns immediately; the callback fires when the broker responds
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Async send succeeded: key=" + key + ", value=" + value);
                    } else {
                        System.err.println("Async send failed: " + exception.getMessage());
                    }
                }
            });
        }

        // Close the producer (waits for in-flight sends to complete)
        producer.close();
    }
}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // "bootstrap.servers": Kafka cluster address list; no default
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // "key.deserializer": key deserializer; no default
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // "value.deserializer": value deserializer; no default
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // "group.id": consumer group id; no default
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // "auto.offset.reset": where to start when no committed offset exists; "latest" (the default) starts from the newest messages
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // "enable.auto.commit": whether to commit offsets automatically; default "true"
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // "auto.commit.interval.ms": auto-commit interval in ms; default "5000"
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); // "session.timeout.ms": group-membership session timeout in ms (10000 was the default in older clients; 45000 since Kafka 3.0)
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500"); // "max.poll.records": maximum records returned by one poll; default "500"
        properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // "max.poll.interval.ms": maximum allowed gap between two polls in ms; default "300000"
        properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1"); // "fetch.min.bytes": minimum data the server returns; if set above 1, the server waits until that much accumulates; default "1"
        properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800"); // "fetch.max.bytes": maximum data per fetch in bytes; default "52428800"
        properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500"); // "fetch.max.wait.ms": maximum time the server waits to satisfy fetch.min.bytes; default "500"
        properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000"); // "heartbeat.interval.ms": heartbeat interval in ms; default "3000"
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-client-id"); // "client.id": client identifier; auto-generated if unset
        properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"); // "request.timeout.ms": client request timeout in ms; default "30000"

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("test-topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerAutoCommitExample {

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "test-topic";
    private static final String GROUP_ID = "test-group";

    public static void main(String[] args) {
        // Build the Kafka consumer configuration
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Enable automatic offset commits
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // auto-commit interval: 5000 ms

        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList(TOPIC));

        // Consume messages
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } finally {
            // Close the consumer
            consumer.close();
        }
    }
}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerManualCommitExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("test-topic"));

        int count = 0;
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                    count++;
                    // Commit synchronously every 10 records
                    if (count % 10 == 0) {
                        consumer.commitSync();
                        System.out.println("Committed offsets.");
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}
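commitSync() blocks until the broker acknowledges the offsets. The standard client also offers a non-blocking commitAsync(); a common pattern, sketched below under the same configuration as the example above, is to commit asynchronously inside the loop and fall back to one final synchronous commit on shutdown:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerAsyncCommitExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("test-topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
                // Non-blocking commit; failures are reported to the callback but not retried automatically
                consumer.commitAsync((offsets, exception) -> {
                    if (exception != null) {
                        System.err.println("Async commit failed: " + exception.getMessage());
                    }
                });
            }
        } finally {
            try {
                consumer.commitSync(); // one final blocking commit so acknowledged progress is not lost
            } finally {
                consumer.close();
            }
        }
    }
}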
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerAssignAndSeekApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");
        props.put("group.id", "test-group"); // required for commitSync() below, even with manual assignment

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        String topic = "my-topic";
        TopicPartition partition0 = new TopicPartition(topic, 0);
        TopicPartition partition1 = new TopicPartition(topic, 1);
        consumer.assign(Arrays.asList(partition0, partition1));

        // Set the starting offsets to consume from
        long startPosition0 = 10L;
        long startPosition1 = 20L;
        consumer.seek(partition0, startPosition0);
        consumer.seek(partition1, startPosition1);

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
                consumer.commitSync(); // commit offsets manually
            }
        } finally {
            consumer.close();
        }
    }
}
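seek() needs a concrete offset. When the starting point is known only as a point in time, the client's offsetsForTimes() API can translate a timestamp into an offset first. A minimal sketch (the topic name and the one-hour lookback are illustrative assumptions, not values from the examples above):

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class KafkaConsumerSeekByTimeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition partition0 = new TopicPartition("my-topic", 0);
        consumer.assign(Collections.singletonList(partition0));

        // Ask the broker for the earliest offset whose timestamp is >= one hour ago
        long oneHourAgo = System.currentTimeMillis() - 60 * 60 * 1000L;
        Map<TopicPartition, OffsetAndTimestamp> offsets =
                consumer.offsetsForTimes(Collections.singletonMap(partition0, oneHourAgo));
        OffsetAndTimestamp target = offsets.get(partition0);
        if (target != null) {
            consumer.seek(partition0, target.offset());
        } else {
            consumer.seekToEnd(Collections.singletonList(partition0)); // no record at or after that time
        }

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        records.forEach(r -> System.out.printf("offset = %d, value = %s%n", r.offset(), r.value()));
        consumer.close();
    }
}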