Route Type: Public domain name access, Access Mode: SASL_PLAINTEXT
Add the following dependencies to the pom.xml file:

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.6.4</version>
    </dependency>
</dependencies>
```
Create the JAAS configuration file ckafka_client_jaas.conf and modify it with the user created on the User Management tab page.

```
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="yourinstance#yourusername"
    password="yourpassword";
};
```
Set username to a value in the format of instance ID + # + configured username, and password to the configured password.

Create the Kafka configuration file kafka.properties.

```properties
## Configure the accessed network by copying the information in the Network column in the Access Mode section on the instance details page in the console
bootstrap.servers=ckafka-xxxxxxx
## Configure the topic by copying the information on the Topic Management page in the console
topic=XXX
## Configure the consumer group as needed
group.id=XXX
## SASL configuration
java.security.auth.login.config.plain=/xxxx/ckafka_client_jaas.conf
```
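The username format used in ckafka_client_jaas.conf (instance ID + # + configured username) can be illustrated with a minimal sketch. `JaasConfigSketch` and its methods are hypothetical helpers written for this illustration, not part of the Kafka client or any CKafka SDK, and the sketch assumes the password contains no double quotes that would need escaping.

```java
// Sketch only: JaasConfigSketch is a hypothetical helper, not part of any SDK.
final class JaasConfigSketch {

    // Builds the SASL username in the "instance ID + # + configured username" format.
    static String buildUsername(String instanceId, String configuredUsername) {
        return instanceId + "#" + configuredUsername;
    }

    // Renders a KafkaClient JAAS entry with the same layout as ckafka_client_jaas.conf.
    // Assumption: the password contains no double quotes that would need escaping.
    static String buildJaasEntry(String instanceId, String configuredUsername, String password) {
        return "KafkaClient {\n"
                + "    org.apache.kafka.common.security.plain.PlainLoginModule required\n"
                + "    username=\"" + buildUsername(instanceId, configuredUsername) + "\"\n"
                + "    password=\"" + password + "\";\n"
                + "};\n";
    }

    public static void main(String[] args) {
        // Placeholder values from the document, not real credentials.
        System.out.print(buildJaasEntry("yourinstance", "yourusername", "yourpassword"));
    }
}
```

Running `main` with the placeholder values prints an entry matching the sample file, which makes it easy to check that the `#` separator sits between the instance ID and the username.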
| Parameter | Description |
| --- | --- |
| bootstrap.servers | Accessed network, which can be copied from the Network column in the Access Mode section on the instance details page in the console. |
| topic | Topic name, which can be copied from Topic Management on the instance details page in the console. |
| group.id | You can customize it. After the demo runs successfully, you can see the consumer on the Consumer Group page. |
| java.security.auth.login.config.plain | Enter the path of the JAAS configuration file ckafka_client_jaas.conf. |
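Before running the demos, it can help to confirm that all four parameters in the table are set. The following is a minimal sketch using only the JDK; `KafkaPropertiesCheck`, `parse`, and `missingKeys` are hypothetical names introduced here, and the sample text stands in for a real kafka.properties file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Sketch only: KafkaPropertiesCheck is a hypothetical helper for this illustration.
final class KafkaPropertiesCheck {

    // The four parameters described in the table.
    static final String[] REQUIRED_KEYS = {
        "bootstrap.servers", "topic", "group.id", "java.security.auth.login.config.plain"
    };

    // Parses properties text; a real program would load the kafka.properties resource instead.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the required keys that are absent or blank, in table order.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // The JAAS path is deliberately left out to show how a gap is reported.
        String sample = "bootstrap.servers=ckafka-xxxxxxx\ntopic=XXX\ngroup.id=XXX\n";
        System.out.println("Missing keys: " + missingKeys(parse(sample)));
    }
}
```

Failing fast on a missing key is a design choice for the sketch: a missing `java.security.auth.login.config.plain` value would otherwise surface later as a less obvious error when the JAAS path is set.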
Create the configuration file loading program CKafkaConfigurer.java.

```java
import java.util.Properties;

public class CKafkaConfigurer {

    private static Properties properties;

    public static void configureSaslPlain() {
        // If you have used the `-D` parameter or another method to set the path, do not set it again here
        if (null == System.getProperty("java.security.auth.login.config")) {
            // The path is read from `kafka.properties`; replace `/xxxx` there with your own path
            System.setProperty("java.security.auth.login.config",
                    getCKafkaProperties().getProperty("java.security.auth.login.config.plain"));
        }
    }

    public synchronized static Properties getCKafkaProperties() {
        if (null != properties) {
            return properties;
        }
        // Obtain the content of the configuration file `kafka.properties`
        Properties kafkaProperties = new Properties();
        try {
            kafkaProperties.load(
                    CKafkaConfigurer.class.getClassLoader().getResourceAsStream("kafka.properties"));
        } catch (Exception e) {
            System.out.println("getCKafkaProperties error");
        }
        properties = kafkaProperties;
        return kafkaProperties;
    }
}
```
Create the message sending program KafkaSaslProducerDemo.java.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;

public class KafkaSaslProducerDemo {

    public static void main(String[] args) {
        // Set the path to the JAAS configuration file
        CKafkaConfigurer.configureSaslPlain();

        // Load `kafka.properties`
        Properties kafkaProperties = CKafkaConfigurer.getCKafkaProperties();

        Properties props = new Properties();
        // Set the access point. Obtain the access point of the topic via the console
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

        // Public network access through SASL_PLAINTEXT
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        // Select `PLAIN` for the SASL mechanism
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        // Set the method for serializing Kafka messages
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Set the maximum time that `send()` may block, for example while waiting for metadata
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        // Set the number of retries for the client
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        // Set the wait time before the client tries to reconnect to a broker
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
        // If `acks` is 0, the producer will not wait for acknowledgment from the broker, and the retry
        // configuration will not take effect. If traffic throttling is triggered, the connection will be closed.
        // If `acks` is 1, the broker leader will directly return an acknowledgment without waiting for
        // acknowledgment from the broker followers
        // If `acks` is `all`, the broker leader will return an acknowledgment only after receiving
        // acknowledgment from all in-sync broker followers
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        // Create a producer object. Note: a producer object is thread-safe, and generally one producer
        // object is sufficient for a process.
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Create a Kafka message
        String topic = kafkaProperties.getProperty("topic"); // Topic of the message. Enter the topic you created in the console.
        String value = "this is ckafka msg value"; // Content of the message.

        try {
            // Obtaining future objects in batches can speed up the process, but the batch size should not be too large
            List<Future<RecordMetadata>> futures = new ArrayList<>(128);
            for (int i = 0; i < 100; i++) {
                // Send the message and obtain a future object
                ProducerRecord<String, String> kafkaMessage = new ProducerRecord<>(topic, value + ": " + i);
                Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
                futures.add(metadataFuture);
            }
            producer.flush();
            for (Future<RecordMetadata> future : futures) {
                // Block until the result of the future object is available
                RecordMetadata recordMetadata = future.get();
                System.out.println("Produce ok:" + recordMetadata.toString());
            }
        } catch (Exception e) {
            // If the sending still fails after client internal retries, the system needs to report and handle the error
            System.out.println("error occurred: " + e);
        } finally {
            // Release the producer's network and buffer resources
            producer.close();
        }
    }
}
```
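The producer demo stops at the first failed future. As a hedged alternative, the sketch below waits on every future with a timeout and counts the failures, so one failed send does not hide the results of the others. It uses only the JDK: `SendResultCollector` and `countFailures` are hypothetical names, and `CompletableFuture<String>` stands in for the `Future<RecordMetadata>` objects returned by `producer.send`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: SendResultCollector is a hypothetical helper for this illustration.
final class SendResultCollector {

    // Waits up to timeoutMs for each future and returns how many failed or timed out.
    static <T> int countFailures(List<Future<T>> futures, long timeoutMs) {
        int failures = 0;
        for (Future<T> future : futures) {
            try {
                future.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (ExecutionException | TimeoutException e) {
                // The send failed after internal retries, or no result arrived in time.
                failures++;
            } catch (InterruptedException e) {
                // Restore the interrupt flag, count this future as failed, and stop waiting.
                Thread.currentThread().interrupt();
                failures++;
                break;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Stand-ins for send results: one success and one simulated failure.
        List<Future<String>> futures = new ArrayList<>();
        futures.add(CompletableFuture.completedFuture("ckafka-topic-demo-0@198"));
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("simulated send failure"));
        futures.add(failed);

        System.out.println("Failed sends: " + countFailures(futures, 1000));
    }
}
```

In a real producer the same loop would run over the `futures` list after `producer.flush()`, and the failure count could feed whatever reporting the application already uses.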
Compile and run KafkaSaslProducerDemo.java to send messages. The output is similar to the following:

```
Produce ok:ckafka-topic-demo-0@198
Produce ok:ckafka-topic-demo-0@199
```
Create the program KafkaSaslConsumerDemo.java for a single consumer to subscribe to messages.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;

public class KafkaSaslConsumerDemo {

    public static void main(String[] args) {
        // Set the path to the JAAS configuration file
        CKafkaConfigurer.configureSaslPlain();

        // Load `kafka.properties`
        Properties kafkaProperties = CKafkaConfigurer.getCKafkaProperties();

        Properties props = new Properties();
        // Set the access point. Obtain the access point of the topic via the console.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

        // Public network access through SASL_PLAINTEXT
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        // Select `PLAIN` for the SASL mechanism
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        // Set the consumer session timeout (30s in this demo)
        // If the broker receives no heartbeat from the consumer within this interval, it determines that the
        // consumer is not alive, removes it from the consumer group, and triggers rebalancing.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // Set the maximum time interval between two polls
        // Before v0.10.1.0, these two concepts were mixed and both represented by `session.timeout.ms`
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30000);
        // Set the maximum number of messages that can be polled at a time
        // Do not set this parameter to an excessively large value. If polled messages are not all consumed
        // before the next poll is due, rebalancing is triggered and consumption lags.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        // Set the method for deserializing messages
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Set the consumer group of the current consumer instance after you apply for one in the console
        // The instances in the same consumer group consume messages in load balancing mode
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));

        // Create a consumer object, which means generating a consumer instance
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Set one or more topics to which the consumer group subscribes
        // You are advised to subscribe to the same topics if the values of GROUP_ID_CONFIG are the same
        List<String> subscribedTopics = new ArrayList<>();
        // If you want to subscribe to multiple topics, separate them with commas in `kafka.properties`
        // You must create the topics in the console in advance
        String topicStr = kafkaProperties.getProperty("topic");
        String[] topics = topicStr.split(",");
        for (String topic : topics) {
            subscribedTopics.add(topic.trim());
        }
        consumer.subscribe(subscribedTopics);

        // Consume messages in a loop
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                // All messages must be consumed before the next poll, and the total duration cannot exceed
                // the interval specified by `MAX_POLL_INTERVAL_MS_CONFIG`
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("Consume partition:%d offset:%d",
                            record.partition(), record.offset()));
                }
            } catch (Exception e) {
                System.out.println("consumer error!");
            }
        }
    }
}
```
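The consumer comments warn that a batch must be fully processed before the next poll is due. A rough way to reason about this is to divide the poll interval by the batch size. The sketch below does that arithmetic with the demo's values (`max.poll.interval.ms` = 30000, `max.poll.records` = 30); `PollBudget` and its methods are hypothetical names, and the calculation is an average-case estimate, not a guarantee.

```java
// Sketch only: PollBudget is a hypothetical helper for this illustration.
final class PollBudget {

    // Average processing time available per record before the poll interval elapses.
    static long perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        if (maxPollRecords <= 0) {
            throw new IllegalArgumentException("maxPollRecords must be positive");
        }
        return maxPollIntervalMs / maxPollRecords;
    }

    // True if a full batch is expected to finish before the next poll is due.
    static boolean fitsWithinPollInterval(long avgProcessingMs, long maxPollIntervalMs, int maxPollRecords) {
        return avgProcessingMs * maxPollRecords < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long intervalMs = 30000; // MAX_POLL_INTERVAL_MS_CONFIG in the demo
        int records = 30;        // MAX_POLL_RECORDS_CONFIG in the demo

        System.out.println("Per-record budget (ms): " + perRecordBudgetMs(intervalMs, records));
        System.out.println("200 ms per record fits: " + fitsWithinPollInterval(200, intervalMs, records));
        System.out.println("1500 ms per record fits: " + fitsWithinPollInterval(1500, intervalMs, records));
    }
}
```

With the demo settings the budget works out to 1000 ms per record on average. If processing is slower than that, lowering `max.poll.records` or raising `max.poll.interval.ms` are the two knobs the demo already exposes.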
Compile and run KafkaSaslConsumerDemo.java to consume messages. The output is similar to the following:

```
Consume partition:0 offset:298
Consume partition:0 offset:299
```