```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:4242");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(0), Integer.toString(0)));
producer.close();
```
```java
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
// ip (the message key) and msg (the payload) are Strings assumed to be
// defined elsewhere in the surrounding example.
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
producer.close();
```
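The config above plugs in a custom partitioner class, `example.producer.SimplePartitioner`, whose source is not shown here. As a rough sketch of what such a partitioner typically does — deterministically mapping a String key (for `page_visits`, an IP address) onto one of N partitions — the core logic might look like the following. The class name, the use of `hashCode()`, and the sample key are all illustrative assumptions, not the actual class:

```java
// Hypothetical sketch of the logic behind a partitioner like
// example.producer.SimplePartitioner: map a String key onto a
// partition index so that the same key always lands on the same partition.
public class SimplePartitionerSketch {

    // Returns a partition index in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so a negative hashCode() cannot
        // produce a negative partition index.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        String ip = "10.1.2.3"; // example key
        int partition = partitionFor(ip, 4);
        System.out.println(partition >= 0 && partition < 4);   // in range
        System.out.println(partition == partitionFor(ip, 4));  // deterministic
    }
}
```

Keying by IP means all page visits from one client are routed to one partition, which preserves per-client ordering.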
| Type | Introduced | Automatic offset commit | Self-managed offsets | Automatic error handling | Automatic rebalancing | Automatic leader discovery | Pros/cons |
| --- | --- | --- | --- | --- | --- | --- | --- |
| High Level Consumer | Before 0.9 | Yes | No | Yes | Yes | Yes | Prone to herd effect and split brain |
| Simple Consumer | Before 0.9 | No | Yes | No | No | No | Caller must handle many failure cases itself |
| New Consumer | After 0.9 | Yes | Yes | Yes | Yes | Yes | Mature; recommended for current versions |
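The "self-managed offsets" column is the key trade-off in the table: managing offsets yourself means advancing the committed position only after a record has actually been processed, which yields at-least-once delivery (a crash between processing and committing causes a redelivery rather than a loss). The Kafka-free sketch below illustrates that ordering with plain collections; all names are illustrative and no Kafka client is involved:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Broker-free illustration of self-managed offsets: track, per partition,
// the offset of the next record to process, and only advance ("commit")
// it after processing succeeds.
public class ManualOffsetSketch {
    // partition -> next offset to consume (the "committed" position)
    private final Map<Integer, Long> committed = new HashMap<>();

    long position(int partition) {
        return committed.getOrDefault(partition, 0L);
    }

    // Process one record, then commit its offset.
    void consume(int partition, long offset, String value, List<String> out) {
        if (offset < position(partition)) {
            return; // already processed; skip duplicates on redelivery
        }
        out.add(value);                       // 1. process the record
        committed.put(partition, offset + 1); // 2. then commit the offset
    }

    public static void main(String[] args) {
        ManualOffsetSketch c = new ManualOffsetSketch();
        List<String> out = new ArrayList<>();
        c.consume(0, 0, "a", out);
        c.consume(0, 1, "b", out);
        c.consume(0, 0, "a", out);          // redelivered duplicate, skipped
        System.out.println(out);            // [a, b]
        System.out.println(c.position(0));  // 2
    }
}
```

Committing before processing would invert the guarantee to at-most-once: a crash after the commit but before processing silently drops the record.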
```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// The main change in the config is that the ZooKeeper address has been
// replaced by the broker list (bootstrap.servers).
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Creating a consumer is much simpler than with the old consumer.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
    }
}
```
```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

// The old consumer needs ZooKeeper.
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "test");
props.put("auto.commit.enable", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
ConsumerConfig config = new ConsumerConfig(props);

// A connector must be created first ...
ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);

// ... then the message streams.
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("foo", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        connector.createMessageStreams(topicCountMap);

// Read the data.
KafkaStream<byte[], byte[]> stream = streams.get("foo").get(0);
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
MessageAndMetadata<byte[], byte[]> msg = null;
while (iterator.hasNext()) {
    msg = iterator.next();
    System.out.println("group " + props.get("group.id")
            + ", partition " + msg.partition() + ", "
            + new String(msg.message()));
}
```