Storing Kafka Data in HDFS or COS Through Flume
Last updated: 2025-02-12 16:42:49

Scenario Description

This document describes how to use Flume to collect data from Kafka and store it in HDFS or COS.

Development Preparations

This task requires access to CKafka, so you need to create a CKafka instance first. For more information, see Message Queue CKafka.
Create an EMR cluster. When creating the EMR cluster, you need to select the Flume component on the software configuration page and enable access to COS on the basic configuration page.

Using the Kafka Toolkit in the EMR Cluster

First, you need to check the private IP and port number of CKafka. Log in to the CKafka console, select the CKafka instance you want to use, and find the private IP ($kafkaIP) and port number (generally 9092) in the basic information section. Create a topic named kafka_test on the topic management page.
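
Before configuring Flume, it can help to confirm that the EMR node can actually open a connection to the CKafka address found above. The following is a minimal sketch, not part of the official procedure: can_reach is a hypothetical helper name, the 127.0.0.1 default is only a placeholder for $kafkaIP, and the script assumes bash and the coreutils timeout command are available on the node.

```shell
#!/usr/bin/env bash
# can_reach is a hypothetical helper: returns 0 if a TCP connection
# to host:port can be opened within 3 seconds, non-zero otherwise.
can_reach() {
  local host="$1" port="$2"
  # /dev/tcp/<host>/<port> is a bash feature; timeout bounds the attempt.
  timeout 3 bash -c "exec 3<>/dev/tcp/${host}/${port}" 2>/dev/null
}

# Placeholder default; replace with the private IP shown in the CKafka console.
kafkaIP="${kafkaIP:-127.0.0.1}"

if can_reach "$kafkaIP" 9092; then
  echo "CKafka reachable at $kafkaIP:9092"
else
  echo "cannot reach $kafkaIP:9092"
fi
```

A failure here usually points to a network or security group issue rather than a Flume problem, so it is worth resolving before continuing.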

Configuring Flume

1. Create a Flume configuration file kafka.properties.
vim kafka.properties
agent.sources = kafka_source
agent.channels = mem_channel
agent.sinks = hdfs_sink
# The following is for source configuration.
agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka_source.channels = mem_channel
agent.sources.kafka_source.batchSize = 5000
agent.sources.kafka_source.kafka.bootstrap.servers = $kafkaIP:9092
agent.sources.kafka_source.kafka.topics = kafka_test
# The following is for sink configuration.
agent.sinks.hdfs_sink.type = hdfs
agent.sinks.hdfs_sink.channel = mem_channel
# To write to COS instead, set hdfs.path to a COS path such as cosn://bucket/xxx.
agent.sinks.hdfs_sink.hdfs.path = /data/flume/kafka/%Y%m%d
agent.sinks.hdfs_sink.hdfs.rollSize = 0
agent.sinks.hdfs_sink.hdfs.rollCount = 0
agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
agent.sinks.hdfs_sink.hdfs.fileType = DataStream
agent.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true
agent.sinks.hdfs_sink.hdfs.writeFormat = Text
# The following is for channel configuration.
agent.channels.mem_channel.type = memory
agent.channels.mem_channel.capacity = 100000
agent.channels.mem_channel.transactionCapacity = 10000
2. Run Flume.
./bin/flume-ng agent --conf ./conf/ -f kafka.properties -n agent -Dflume.root.logger=INFO,console
3. Run the Kafka producer.
[hadoop@172 kafka]$ ./bin/kafka-console-producer.sh --broker-list $kafkaIP:9092 --topic kafka_test
test
hello
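
Typing messages by hand works for a quick check; for a slightly larger test, the console producer can also read messages from standard input, one per line. The sketch below assumes the same producer path and flags as step 3. gen_messages is a hypothetical helper introduced only for illustration, and the message count of 100 is arbitrary.

```shell
#!/usr/bin/env bash
# gen_messages is a hypothetical helper: prints N numbered test lines,
# one line per Kafka message.
gen_messages() {
  local n="$1" i=1
  while [ "$i" -le "$n" ]; do
    echo "test-message-$i"
    i=$((i + 1))
  done
}

# Assumed location of the console producer, as used in step 3.
producer=./bin/kafka-console-producer.sh

if [ -x "$producer" ]; then
  # The console producer sends each line of standard input as one message.
  gen_messages 100 | "$producer" --broker-list "$kafkaIP:9092" --topic kafka_test
else
  echo "producer script not found at $producer; printing a sample instead"
  gen_messages 3
fi
```

Run it from the Kafka installation directory with $kafkaIP set, so that the relative path to the producer script resolves.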

Testing

Enter a message in the Kafka producer client and press Enter.
Run hadoop fs -ls /data/flume/kafka/ to check whether the corresponding directory and file have been generated in HDFS.
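
Because hdfs.path ends in %Y%m%d and useLocalTimeStamp is true, the output lands in a subdirectory named after the agent's local date. The following sketch builds that path and lists it. today_dir is a hypothetical helper name, and the FlumeData file prefix is the HDFS sink default, assumed here because the configuration does not set hdfs.filePrefix.

```shell
#!/usr/bin/env bash
# today_dir is a hypothetical helper: prints the directory Flume writes to
# today, mirroring hdfs.path = /data/flume/kafka/%Y%m%d.
today_dir() {
  local base="${1:-/data/flume/kafka}"
  echo "${base}/$(date +%Y%m%d)"
}

dir="$(today_dir)"
echo "expected directory: $dir"

# Only query HDFS when the hadoop client is available on this node.
if command -v hadoop >/dev/null 2>&1; then
  hadoop fs -ls "$dir"
  # The glob is quoted so that hadoop, not the local shell, expands it.
  hadoop fs -cat "$dir/FlumeData.*" | tail -n 5
fi
```

With rollInterval = 3600, a file can stay open for up to an hour, and the HDFS sink marks in-use files with a .tmp suffix by default, so a freshly written file may appear as FlumeData.<timestamp>.tmp until it is rolled.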
