VPC Access

Last updated: 2024-01-09 15:00:32

    Overview

    This document describes how to access CKafka to receive/send messages with the SDK for Java in a VPC.

    Prerequisites

    Directions

    Step 1. Prepare configurations

    1. Upload the javakafkademo in the downloaded demo to the Linux server.
    2. Log in to the Linux server, enter the javakafkademo directory, and configure related parameters.
    2.1 Add the following dependencies to the pom.xml file:
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.2</version>
    </dependency>
    
    2.2 Create a Kafka configuration file named kafka.properties.
    ## Configure the accessed network by copying the information in the **Network** column in the **Access Mode** section on the **instance details** page in the console
    bootstrap.servers=xx.xx.xx.xx:xxxx
    ## Configure the topic by copying the information on the **Topic Management** page in the console
    topic=XXX
    ## Configure the consumer group as needed
    group.id=XXX
    
    Parameter            Description
    bootstrap.servers    Accessed network, which can be copied from the Network column in the Access Mode section on the instance details page in the console.
    topic                Topic name, which can be copied from the Topic Management page in the console.
    group.id             Consumer group ID, which you can customize. After the demo runs successfully, you can see the consumer on the Consumer Group page.
    3. Create a configuration file loading program named CKafkaConfigurer.java.
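    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    public class CKafkaConfigurer {

        private static Properties properties;

        public static synchronized Properties getCKafkaProperties() {
            if (null != properties) {
                return properties;
            }
            //Obtain the content of the configuration file `kafka.properties` from the classpath
            Properties kafkaProperties = new Properties();
            try (InputStream in = CKafkaConfigurer.class.getClassLoader().getResourceAsStream("kafka.properties")) {
                if (in == null) {
                    throw new IllegalStateException("kafka.properties not found on the classpath");
                }
                kafkaProperties.load(in);
            } catch (IOException e) {
                //Fail fast instead of caching an empty configuration
                throw new IllegalStateException("getCKafkaProperties error", e);
            }
            properties = kafkaProperties;
            return kafkaProperties;
        }
    }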
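
The three keys in `kafka.properties` are all read by the demo programs, so it can help to check them before creating a client. The following is a minimal sketch, not part of the CKafka demo: the class `KafkaPropertiesCheck` and its method `missingKeys` are hypothetical names, and the address and topic in `main` are placeholder values.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the CKafka demo.
final class KafkaPropertiesCheck {

    // Keys that the demo programs read from kafka.properties.
    static final String[] REQUIRED_KEYS = {"bootstrap.servers", "topic", "group.id"};

    // Returns the required keys that are absent or blank in the given properties.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Inline stand-in for the file content; the address and topic are placeholders.
        String content = "bootstrap.servers=10.0.0.1:9092\ntopic=demo-topic\n";
        Properties props = new Properties();
        props.load(new StringReader(content));
        // prints "Missing keys: [group.id]"
        System.out.println("Missing keys: " + missingKeys(props));
    }
}
```

In the demo, a check like this could be applied to the object returned by `CKafkaConfigurer.getCKafkaProperties()` before the producer or consumer is constructed.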
    

    Step 2. Send messages

    1. Write a message production program named CKafkaProducerDemo.java.
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class CKafkaProducerDemo {

        public static void main(String[] args) {
            //Load `kafka.properties`
            Properties kafkaProperties = CKafkaConfigurer.getCKafkaProperties();

            Properties properties = new Properties();
            //Set the access point. Obtain the access point of the topic via the console.
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

            //Set the method for serializing Kafka messages. `StringSerializer` is used in this demo.
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            //Set the maximum time that `send()` can block, for example while waiting for metadata or buffer space
            properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
            //Set the number of retries for the client
            properties.put(ProducerConfig.RETRIES_CONFIG, 5);
            //Set the wait time before the client tries to reconnect to a broker
            properties.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
            //Construct a producer object
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

            //Construct a Kafka message
            String topic = kafkaProperties.getProperty("topic"); //Topic of the message. Enter the topic you created in the console.
            String value = "this is ckafka msg value"; //Message content.

            try {
                //Batch obtaining future objects can speed up the process, but the batch size should not be too large.
                List<Future<RecordMetadata>> futureList = new ArrayList<>(128);
                for (int i = 0; i < 10; i++) {
                    //Send the message and obtain a future object
                    ProducerRecord<String, String> kafkaMsg = new ProducerRecord<>(topic,
                            value + ": " + i);
                    Future<RecordMetadata> metadataFuture = producer.send(kafkaMsg);
                    futureList.add(metadataFuture);
                }
                producer.flush();
                for (Future<RecordMetadata> future : futureList) {
                    //Wait for the result of each future object
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("produce send ok: " + recordMetadata.toString());
                }
            } catch (Exception e) {
                //If the sending still fails after client internal retries, the system needs to report and handle the error
                System.out.println("error occurred: " + e);
            } finally {
                //Release the network and buffer resources held by the producer
                producer.close();
            }
        }
    }
    
    2. Compile and run CKafkaProducerDemo.java to send the message.
    3. View the execution result.
    produce send ok: ckafka-topic-demo-0@198
    produce send ok: ckafka-topic-demo-0@199
    
    4. On the Topic Management tab page on the instance details page in the CKafka console, select the topic, and click More > Message Query to view the message just sent.
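
The producer demo collects the `Future` objects returned by `send()` and then waits on each one without a time limit. As a hedged illustration of bounding that wait, the sketch below uses only `java.util.concurrent`, with `CompletableFuture` standing in for the futures a Kafka producer would return. The class `FutureBatch`, the method `countFailures`, and the timeout value are assumptions made for this example, not part of the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration; not part of the CKafka demo.
final class FutureBatch {

    // Waits for each future in order, up to timeoutMs per future (not in total),
    // and returns how many did not complete successfully.
    static <T> int countFailures(List<Future<T>> futures, long timeoutMs) {
        int failures = 0;
        for (int i = 0; i < futures.size(); i++) {
            try {
                futures.get(i).get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and treat the remaining futures as failed.
                Thread.currentThread().interrupt();
                return failures + (futures.size() - i);
            } catch (ExecutionException | TimeoutException | CancellationException e) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<Future<String>> futures = new ArrayList<>();
        // Stand-in for a send that succeeded.
        futures.add(CompletableFuture.completedFuture("ok-0"));
        // Stand-in for a send that failed.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("simulated send failure"));
        futures.add(failed);
        // Stand-in for a send that never completes, so the wait times out.
        futures.add(new CompletableFuture<>());
        // prints "failures: 2"
        System.out.println("failures: " + countFailures(futures, 100));
    }
}
```

Because the method is generic over `java.util.concurrent.Future`, the same approach should apply to the `List<Future<RecordMetadata>>` built in `CKafkaProducerDemo`. How to handle the failed sends (retry, log, or alert) is left to the application.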
    
    
    

    Step 3. Consume messages

    1. Create a program named CKafkaConsumerDemo.java for a consumer to subscribe to messages.
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class CKafkaConsumerDemo {

        public static void main(String[] args) {
            //Load `kafka.properties`
            Properties kafkaProperties = CKafkaConfigurer.getCKafkaProperties();

            Properties props = new Properties();
            //Set the access point. Obtain the access point of the topic via the console.
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
            //Set the session timeout
            //If the broker receives no heartbeat from the consumer within this interval, it determines that the consumer is not alive, removes the consumer from the consumer group, and triggers rebalancing. This demo uses 30s.
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
            //Set the maximum number of messages that can be polled at a time
            //Do not set this parameter to an excessively large value. If polled messages are not processed in time before the next poll, rebalancing is triggered and consumption lags.
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
            //Set the method for deserializing messages
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            //The instances in the same consumer group consume messages in load balancing mode
            props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
            //Create a consumer object, which means generating a consumer instance
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            //Set one or more topics to which the consumer group subscribes
            //You are advised to configure consumer instances with the same `GROUP_ID_CONFIG` value to subscribe to the same topics
            List<String> subscribedTopics = new ArrayList<>();
            //If you want to subscribe to multiple topics, separate them with commas in the `topic` value of `kafka.properties`
            //You must create the topics in the console in advance.
            String topicStr = kafkaProperties.getProperty("topic");
            String[] topics = topicStr.split(",");
            for (String topic : topics) {
                subscribedTopics.add(topic.trim());
            }
            consumer.subscribe(subscribedTopics);

            //Consume messages in a loop
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    //All messages should be processed before the next poll. With kafka-clients 0.10.1 and later, the time between two polls must not exceed `max.poll.interval.ms`; otherwise the consumer is removed from the group and rebalancing is triggered
                    //You are advised to create a separate thread to consume messages and then return the result in async mode
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(
                                String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                    }
                } catch (Exception e) {
                    System.out.println("consumer error! " + e);
                }
            }
        }
    }
    
    2. Compile and run CKafkaConsumerDemo.java to consume messages.
    3. View the execution result.
    Consume partition:0 offset:298
    Consume partition:0 offset:299
    4. On the Consumer Group tab page in the CKafka console, select the consumer group name, enter the topic name, and click View Details to view the consumption details.
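
The consumer demo builds its subscription list by splitting the `topic` value on commas and trimming each entry. The sketch below isolates that step and, as an assumption that goes beyond the demo, also skips blank and duplicate entries and tolerates a missing value. The class `TopicList` and the method `parse` are hypothetical names, and the topic names are placeholders.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the CKafka demo.
final class TopicList {

    // Splits a comma-separated topic string into trimmed, non-empty, unique names,
    // keeping the order in which they first appear.
    static List<String> parse(String raw) {
        List<String> topics = new ArrayList<>();
        if (raw == null) {
            return topics;
        }
        for (String part : raw.split(",")) {
            String topic = part.trim();
            if (!topic.isEmpty() && !topics.contains(topic)) {
                topics.add(topic);
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        // prints "[topic-a, topic-b]"
        System.out.println(parse(" topic-a, topic-b,,topic-a "));
    }
}
```

The resulting list could be passed to `consumer.subscribe(...)` in place of the loop in `CKafkaConsumerDemo`. An empty result would most likely indicate a configuration problem and is worth reporting before subscribing.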
    
    
    