kafkastore.bootstrap.servers=PLAINTEXT://xxxx
kafkastore.topic=schemas
debug=true
kafkastore.bootstrap.servers=SASL_PLAINTEXT://ckafka-xxxx.ap-xxx.ckafka.tencentcloudmq.com:50004
kafkastore.security.protocol=SASL_PLAINTEXT
kafkastore.sasl.mechanism=PLAIN
kafkastore.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='ckafka-xxxx#xxxx' password='xxxx';
kafkastore.topic=schemas
debug=true
<blockquote class="rno-document-tips rno-document-tips-explain"> <div class="rno-document-tips-body"> <i class="rno-document-tip-icon"></i> <div class="rno-document-tip-title">说明</div> <div class="rno-document-tip-desc"><p>bootstrap.servers:接入网络,在 <a href="https://console.tencentcloud.com/ckafka">CKafka 控制台</a> 的实例详情页面<strong>接入方式</strong>模块的网络列复制。<br> <img src="https://main.qcloudimg.com/raw/6b12eca18662d26a334d55b743c825ef.png" alt=""></p></div> </div></blockquote>
bin/schema-registry-start etc/schema-registry/schema-registry.properties
{"type": "record","name": "User","fields": [{"name": "id", "type": "int"},{"name": "name", "type": "string"},{"name": "age", "type": "int"}]}
# Register the Avro "User" schema under the subject "test" via the Schema Registry REST API.
# The schema itself is passed as a JSON-escaped string inside the "schema" field.
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"id\", \"type\": \"int\"}, {\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}"}' \
http://127.0.0.1:8081/subjects/test/versions
package schemaTest;

import java.util.Properties;
import java.util.Random;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Example producer that sends 100 Avro-encoded {@code User} records to the
 * {@code test} topic, one per second, using Confluent's
 * {@code KafkaAvroSerializer} together with a schema registry.
 */
public class SchemaProduce {

    /** Avro schema (JSON) of the {@code User} record: int id, string name, int age. */
    public static final String USER_SCHEMA = "{\"type\": \"record\", \"name\": \"User\", "
            + "\"fields\": [{\"name\": \"id\", \"type\": \"int\"}, "
            + "{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}";

    /**
     * Builds the producer configuration, parses {@link #USER_SCHEMA} and sends
     * records with ids 1 through 100 to the {@code test} topic.
     *
     * @param args unused
     * @throws Exception if sending fails or the sleep between sends is interrupted
     */
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Access address of the CKafka instance.
        props.put("bootstrap.servers", "xx.xx.xx.xx:xxxx");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Use the KafkaAvroSerializer implemented by Confluent.
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // Address of the schema registry service, used to obtain the schema.
        props.put("schema.registry.url", "http://127.0.0.1:8081");

        Schema schema = new Schema.Parser().parse(USER_SCHEMA);
        Random rand = new Random();

        // try-with-resources closes the producer even if send() or sleep() throws.
        try (Producer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            for (int id = 1; id <= 100; id++) {
                String name = "name" + id;
                int age = rand.nextInt(40) + 1; // random age in the range 1..40

                GenericRecord user = new GenericData.Record(schema);
                user.put("id", id);
                user.put("name", name);
                user.put("age", age);

                ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("test", user);
                producer.send(record);
                Thread.sleep(1000);
            }
        }
    }
}
package schemaTest;

import java.util.Collections;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Example consumer that subscribes to the {@code test} topic, decodes each
 * value with Confluent's {@code KafkaAvroDeserializer} and prints the
 * {@code id}, {@code name} and {@code age} fields along with the record's
 * partition and offset.
 *
 * NOTE(review): this class carries the same name as the producer example in
 * the same package; it presumably should be named SchemaConsumer. The name is
 * left unchanged here so existing references keep working.
 */
public class SchemaProduce {

    /**
     * Builds the consumer configuration and polls the {@code test} topic in an
     * endless loop, printing every record received.
     *
     * @param args unused
     * @throws Exception if consuming or deserialization fails
     */
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Access address of the CKafka instance.
        props.put("bootstrap.servers", "xx.xx.xx.xx:xxxx");
        props.put("group.id", "schema");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Use the KafkaAvroDeserializer implemented by Confluent.
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // Address of the schema registry service, used to obtain the schema.
        props.put("schema.registry.url", "http://127.0.0.1:8081");

        // try-with-resources closes the consumer when the loop exits via an exception.
        try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                // NOTE(review): poll(long) is deprecated in newer Kafka clients in favor of
                // poll(Duration); kept as-is because the client version is not visible here.
                ConsumerRecords<String, GenericRecord> records = consumer.poll(10);
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    GenericRecord user = record.value();
                    System.out.println("value = [user.id = " + user.get("id") + ", "
                            + "user.name = " + user.get("name") + ", "
                            + "user.age = " + user.get("age") + "], "
                            + "partition = " + record.partition() + ", "
                            + "offset = " + record.offset());
                }
            }
        }
    }
}
本页内容是否解决了您的问题?