TDMQ for CKafka
Using SDK to Receive/Send Message (Recommended)
Last updated: 2024-01-09 14:45:02

Overview

This document describes how to access CKafka to receive/send messages with the SDK for Java in a VPC.
For clients in other languages, see SDK Documentation.

Prerequisites

Directions

Step 1. Prepare configurations

1. Upload the downloaded demo to a Linux server in the same VPC, log in to the server, and go to the VPC directory under javakafkademo.
2. Modify kafka.properties in the resources directory under the VPC project.
## Configure the access network by copying the information from the **Network** column in the **Access Mode** section on the instance details page in the console.
bootstrap.servers=xx.xx.xx.xx:xxxx
## Configure the topic by copying the information on the **Topic Management** page in the console
topic=XXX
## Configure the consumer group as needed
group.id=XXX
| Parameter | Description |
|-----------|-------------|
| bootstrap.servers | Access network, which can be copied from the **Network** column in the **Access Mode** section on the instance details page in the console. |
| topic | Topic name, which can be copied from the **Topic Management** page in the console. |
| group.id | Consumer group name, which you can customize. After the demo runs successfully, you can see the consumer on the **Consumer Group** page. |
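Both demos below read these values through a helper class named CKafkaConfigurer. If you adapt the code outside the demo project, a minimal sketch of such a loader, assuming kafka.properties is on the classpath, could look like the following (the actual CKafkaConfigurer class shipped with javakafkademo may differ):

import java.util.Properties;

public class CKafkaConfigurer {

    private static volatile Properties properties;

    //Load `kafka.properties` from the classpath once and cache it
    public static Properties getCKafkaProperties() {
        if (properties == null) {
            synchronized (CKafkaConfigurer.class) {
                if (properties == null) {
                    Properties props = new Properties();
                    try {
                        props.load(CKafkaConfigurer.class.getClassLoader()
                                .getResourceAsStream("kafka.properties"));
                    } catch (Exception e) {
                        throw new RuntimeException("failed to load kafka.properties", e);
                    }
                    properties = props;
                }
            }
        }
        return properties;
    }
}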

Step 2. Send messages

1. Compile and run the message production program CKafkaProducerDemo.java.
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CKafkaProducerDemo {

    public static void main(String[] args) {
        //Load `kafka.properties`
        Properties kafkaProperties = CKafkaConfigurer.getCKafkaProperties();

        Properties properties = new Properties();
        //Set the access point. Obtain the access point of the corresponding topic in the console
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        //Set the serializers for Kafka message keys and values. `StringSerializer` is used in this demo
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        //Set the maximum time to wait for a request
        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        //Set the number of retries within the client
        properties.put(ProducerConfig.RETRIES_CONFIG, 5);
        //Set the interval between reconnection attempts within the client
        properties.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
        //Construct a producer object
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        //Construct a Kafka message
        String topic = kafkaProperties.getProperty("topic"); //Topic of the message. Enter the topic you created in the console
        String value = "this is ckafka msg value"; //Message content

        try {
            //Obtaining future objects in batches can speed up sending, but the batch should not be too large
            List<Future<RecordMetadata>> futureList = new ArrayList<>(128);
            for (int i = 0; i < 10; i++) {
                //Send the message and obtain a future object
                ProducerRecord<String, String> kafkaMsg = new ProducerRecord<>(topic, value + ": " + i);
                Future<RecordMetadata> metadataFuture = producer.send(kafkaMsg);
                futureList.add(metadataFuture);
            }
            producer.flush();
            //Resolve the future objects synchronously (an asynchronous callback variant is sketched after this step)
            for (Future<RecordMetadata> future : futureList) {
                RecordMetadata recordMetadata = future.get();
                System.out.println("produce send ok: " + recordMetadata.toString());
            }
        } catch (Exception e) {
            //If sending still fails after the client's internal retries, report and handle the error
            System.out.println("error occurred");
        }
    }
}
2. View the execution result.
Produce ok:ckafka-topic-demo-0@198
Produce ok:ckafka-topic-demo-0@199
3. Go to the [CKafka console](https://console.tencentcloud.com/ckafka!85c1cf838df0405887dc01b41e7972fc), select the **Topic Management** tab on the instance details page, select the target topic, and click **More** > **Message Query** to view the message just sent.
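The demo above collects the Future objects returned by send() and resolves them synchronously after flush(). The standard kafka-clients API also accepts a callback on send(), which avoids blocking the sending thread. The following sketch reuses the producer, topic, and value variables from the demo and is only an illustrative alternative, not part of the demo itself:

//Asynchronous send: the callback fires once the broker acknowledges the record or the send fails
ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        //Delivery failed even after the client's internal retries; report and handle the error
        System.out.println("produce failed: " + exception.getMessage());
    } else {
        System.out.println("produce send ok: " + metadata.toString());
    }
});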


Step 3. Consume messages

1. Compile and run the message subscription program CKafkaConsumerDemo.java.
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CKafkaConsumerDemo {

    public static void main(String[] args) {
        //Load `kafka.properties`
        Properties kafkaProperties = CKafkaConfigurer.getCKafkaProperties();

        Properties props = new Properties();
        //Set the access point. Obtain the access point of the topic in the console
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        //Set the session timeout
        //If the broker does not receive a heartbeat from the consumer within this interval, it considers the consumer dead, removes it from the consumer group, and triggers a rebalance. It is set to 30s here
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        //Set the maximum number of messages returned by a single poll
        //Do not set this parameter to an excessively large value. If the polled messages are not all consumed before the next poll, a rebalance is triggered and consumption lags
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        //Set the deserializers for message keys and values
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        //The instances in the same consumer group consume messages in load balancing mode
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
        //Construct a consumer object. This generates a consumer instance
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //Set one or more topics to which the consumer group subscribes
        //You are advised to configure consumer instances with the same `GROUP_ID_CONFIG` value to subscribe to the same topics
        List<String> subscribedTopics = new ArrayList<>();
        //To subscribe to multiple topics, add them here
        //You must create the topics in the console in advance
        String topicStr = kafkaProperties.getProperty("topic");
        String[] topics = topicStr.split(",");
        for (String topic : topics) {
            subscribedTopics.add(topic.trim());
        }
        consumer.subscribe(subscribedTopics);

        //Consume messages in a loop
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                //All polled messages must be consumed before the next poll, and the total duration must not exceed the interval specified by `SESSION_TIMEOUT_MS_CONFIG`
                //You are advised to hand records to a separate thread and consume them asynchronously (see the sketch after this step)
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(
                            String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                }
            } catch (Exception e) {
                System.out.println("consumer error!");
            }
        }
    }
}
2. View the execution result.
Consume partition:0 offset:298
Consume partition:0 offset:299
3. On the **Consumer Group** tab in the CKafka console, select the corresponding consumer group name, enter the topic name, and click **View Details** to view the consumption details.
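The consumer demo processes records on the polling thread. As noted in the code comments, if processing is slow you can hand records to worker threads so that poll() keeps being called within the session timeout. The following is a minimal sketch of that pattern with a fixed thread pool; the class name, pool size, and shutdown handling are illustrative assumptions rather than part of the demo. Note that with automatic offset commits, offsets may be committed before the workers finish, so a failed worker task can mean a lost message.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class RecordDispatcher {

    //Worker pool that processes records off the polling thread
    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    //Called from the poll loop; returns quickly so the next poll() is not delayed
    public void dispatch(ConsumerRecords<String, String> records) {
        for (ConsumerRecord<String, String> record : records) {
            workers.submit(() -> System.out.println(
                    String.format("Consume partition:%d offset:%d", record.partition(), record.offset())));
        }
    }

    //Call when the consumer shuts down to let in-flight tasks finish
    public void shutdown() throws InterruptedException {
        workers.shutdown();
        workers.awaitTermination(30, TimeUnit.SECONDS);
    }
}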
