tencent cloud

피드백

VPC Access Through SASL_SCRAM

마지막 업데이트 시간:2024-11-07 15:40:43

    Overview

    This document describes how to access CKafka to receive/send messages with the SDK for Java through SASL_SCRAM in a VPC.
    Note:
    SASL_SCRAM_256 (Supported only by instances of versions 1.1.1, 2.4.1, and 2.8.1. For existing instances, upgrade the broker minor version or submit a ticket for application.)
    SASL_SCRAM_512 (Supported only by instances of versions 1.1.1, 2.4.1, and 2.8.1. For existing instances, upgrade the broker minor version or submit a ticket for application.)

    Prerequisites

    You have installed JDK 1.8 or later.
    You have installed Maven 2.5 or later.
    You have downloaded the demo.

    Directions

    Step 1. Create resources in the console

    1. Create an access point.
    1.1 On the Instance List page in the CKafka console, click the target instance ID to enter the instance details page.
    1.2 In Basic Info > Access Mode, click Add a routing policy. In the pop-up window, configure the following items:
    Route Type: Select "Public domain name access".
    Access Mode: Select "SASL_SCRAM".
    
    2. Create a role.
    In ACL Policy Management > User Management, create a role and set the password.
    
    3. Create a topic.
    Create a topic on the Topic Management tab as instructed in Creating Topic.

    Step 2. Add the configuration file

    1. Add the following Java dependent library information to the pom.xml file:
    <dependencies>
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.5</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.6.4</version>
    </dependency>
    </dependencies>
    2. Create a JAAS configuration file named ckafka_client_jaas.conf and modify it with the user created in ACL Policy Management > User Management.
    KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="yourinstance#yourusername"
    password="yourpassword";
    };
    Note:
    Set username to a value in the format of instance ID + # + configured username, and password to a configured password.
    3. Create a CKafka configuration file named kafka.properties.
    ## Configure the accessed network by copying the information in the **Network** column in the **Access Mode** section on the instance details page in the console.
    bootstrap.servers=xx.xx.xx.xx:xxxx
    ## Configure the topic by copying the information on the **Topic Management** page in the console
    topic=XXX
    ## Configure the consumer group as needed
    group.id=XXX
    ## SASL Configuration
    java.security.auth.login.config.plain=/xxxx/ckafka_client_jaas.conf
    Parameter
    Description
    bootstrap.servers
    Accessed network, which can be copied in the Network column in the Access Mode section on the instance details page in the console.
    
    topic
    Topic name, which can be copied in Topic Management on the instance details page in the console.
    
    
    
    group.id
    You can customize it. After the demo runs successfully, you can see the consumer on the Consumer Group page.
    java.security.auth.login.config.plain
    Enter the path of the JAAS configuration file ckafka_client_jaas.conf.
    4. Create the configuration file loading program CKafkaConfigurer.java.
    public class CKafkaConfigurer {
    
    private static Properties properties;
    
    public static void configureSaslPlain() {
    // If you have used the `-D` parameter or another method to set the path, do not set it again here.
    if (null == System.getProperty("java.security.auth.login.config")) {
    // Replace `XXX` with your own path
    System.setProperty("java.security.auth.login.config",
    getCKafkaProperties().getProperty("java.security.auth.login.config.plain"));
    }
    }
    
    public synchronized static Properties getCKafkaProperties() {
    if (null != properties) {
    return properties;
    }
    // Get the content of the configuration file `kafka.properties`.
    Properties kafkaProperties = new Properties();
    try {
    kafkaProperties.load(CKafkaProducerDemo.class.getClassLoader().getResourceAsStream("kafka.properties"));
    } catch (Exception e){
    System.out.println("getCKafkaProperties error");
    }
    properties = kafkaProperties;
    return kafkaProperties;
    }
    }

    Step 3. Send messages

    1. Create a message sending program named KafkaSaslProducerDemo.java.
    public class KafkaSaslProducerDemo {
    
    public static void main(String[] args) {
    // Set the path of the JAAS configuration file.
    CKafkaConfigurer.configureSaslPlain();
    
    // Load `kafka.properties`.
    Properties kafkaProperties = CKafkaConfigurer.getCKafkaProperties();
    
    Properties props = new Properties();
    // Set the access point. Get the access point of the corresponding topic in the console.
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
    kafkaProperties.getProperty("bootstrap.servers"));
    
    //
    // Access through SASL_SCRAM
    //
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    // Select `PLAIN` for the SASL mechanism
    props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
    
    // Set the method for serializing Kafka messages.
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
    // Set the maximum request wait time.
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
    // Set the number of retries for the client.
    props.put(ProducerConfig.RETRIES_CONFIG, 5);
    // Set the internal retry interval for the client.
    props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
    // If `ack` is 0, the producer will not wait for the acknowledgment from the broker, and the retry configuration will not take effect. If traffic throttling is triggered, the connection will be closed.
    // If `ack` is 1, the broker leader will directly return `ack` without waiting for acknowledgment from all broker followers.
    // If `ack` is `all`, the broker leader will return `ack` only after receiving acknowledgment from all broker followers.
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    // Construct a producer object. Note: The producer object is thread-safe. Generally, one producer object is enough for a process.
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    
    // Construct a CKafka message.
    String topic = kafkaProperties.getProperty("topic"); // Topic of the message. Enter the topic you created in the console.
    String value = "this is ckafka msg value"; // Message content
    
    try {
    // Obtaining the future objects in batches can accelerate the speed. Do not include too many objects in a batch.
    List<Future<RecordMetadata>> futures = new ArrayList<>(128);
    for (int i = 0; i < 100; i++) {
    // Send the message and obtain a future object.
    ProducerRecord<String, String> kafkaMessage = new ProducerRecord<>(topic,
    value + ": " + i);
    Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
    futures.add(metadataFuture);
    
    }
    producer.flush();
    for (Future<RecordMetadata> future : futures) {
    // Sync the obtained future object.
    RecordMetadata recordMetadata = future.get();
    System.out.println("Produce ok:" + recordMetadata.toString());
    }
    
    } catch (Exception e){
    // If the sending still fails after the internal retries in the client, the system needs to report and handle the error.
    System.out.println("error occurred");
    }
    }
    }
    2. Compile and run KafkaSaslProducerDemo.java to send the message.
    3. View the execution result (output).
    Produce ok:ckafka-topic-demo-0@198
    Produce ok:ckafka-topic-demo-0@199
    4. On the Topic Management tab page on the instance details page in the CKafka console, select the target topic and click More > Message Query to view the message just sent.
    
    
    

    Step 4. Consume messages

    1. Create a program named KafkaSaslConsumerDemo.java for a consumer to subscribe to messages.
    public class KafkaSaslConsumerDemo {
    
    public static void main(String[] args) {
    // Set the path of the JAAS configuration file.
    CKafkaConfigurer.configureSaslPlain();
    
    // Load `kafka.properties`.
    Properties kafkaProperties = CKafkaConfigurer.getCKafkaProperties();
    
    Properties props = new Properties();
    // Set the access point. Obtain the access point of the corresponding topic in the console.
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
    kafkaProperties.getProperty("bootstrap.servers"));
    
    //
    // Access through SASL_SCRAM
    //
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    // Select `PLAIN` for the SASL mechanism
    props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
    
    // Set the consumer timeout period.
    // If the consumer does not return a heartbeat message within the interval, the broker will determine that the consumer is not alive, and then remove the consumer from the consumer group and trigger rebalancing. The default value is 30s.
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
    // Set the maximum time interval between two polls.
    // Before v0.10.1.0, these two concepts were mixed and were both represented by `session.timeout.ms`.
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30000);
    // Set the maximum number of messages that can be polled at a time.
    // Do not set this parameter to an excessively large value. If polled messages are not all consumed before the next poll starts, load balancing is triggered and lagging occurs.
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
    // Set the method for deserializing messages.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
    // Set the consumer group for the current consumer instance. You need to apply for a consumer group in the console first.
    // The instances in the same consumer group consume messages in load balancing mode.
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
    // Construct a consumer object. This generates a consumer instance.
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    // Set one or more topics to which the consumer group subscribes.
    // We recommend that you configure consumer instances with the same `GROUP_ID_CONFIG` value to subscribe to the same topics.
    List<String> subscribedTopics = new ArrayList<String>();
    // If you want to subscribe to multiple topics, add the topics here.
    // You must create these topics in the console in advance.
    String topicStr = kafkaProperties.getProperty("topic");
    String[] topics = topicStr.split(",");
    for (String topic : topics) {
    subscribedTopics.add(topic.trim());
    }
    consumer.subscribe(subscribedTopics);
    
    // Consume messages in loop
    while (true){
    try {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    // All messages must be consumed before the next poll, and the total duration cannot exceed the timeout interval specified by `SESSION_TIMEOUT_MS_CONFIG`.
    for (ConsumerRecord<String, String> record : records) {
    System.out.println(
    String.format("Consume partition:%d offset:%d", record.partition(),
    record.offset()));
    }
    } catch (Exception e){
    System.out.println("consumer error!");
    }
    }
    }
    }
    2. Compile and run KafkaSaslConsumerDemo.java to consume the message.
    3. View the execution result.
    Consume partition:0 offset:298
    Consume partition:0 offset:299
    4. ‌On the Consumer Group page in the Ckafka console, click the triangle icon on the left of the target consumer group name, enter the topic name in the search box, and click View Details to view the consumption details.
    
    
    
    문의하기

    고객의 업무에 전용 서비스를 제공해드립니다.

    기술 지원

    더 많은 도움이 필요하시면, 티켓을 통해 연락 바랍니다. 티켓 서비스는 연중무휴 24시간 제공됩니다.

    연중무휴 24시간 전화 지원