Storing Kafka Data in HDFS or COS Through Flume

Last updated: 2025-02-12 16:42:49

    Scenario Description

    This document describes how to collect data from Kafka and save it to HDFS or COS via Flume.

    Development Preparations

    This task requires access to CKafka, so you need to create a CKafka instance first. For more information, see Message Queue CKafka.
    Create an EMR cluster. When doing so, select the Flume component on the software configuration page and enable access to COS on the basic configuration page.

    Using the Kafka Toolkit in the EMR Cluster

    First, you need to check the private IP and port number of CKafka. Log in to the CKafka console, select the CKafka instance you want to use, and find the private IP ($kafkaIP) and port number (generally 9092) in the basic information section. Create a topic named kafka_test on the topic management page.
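    If you prefer the command line, the topic can also be created with the Kafka toolkit shipped in the EMR cluster. The following is a minimal sketch, assuming the commands are run from the Kafka installation directory; depending on the Kafka version, --zookeeper may be required instead of --bootstrap-server.
    # Create the test topic (sketch; adjust partitions and replication factor to your needs).
    ./bin/kafka-topics.sh --create --bootstrap-server $kafkaIP:9092 --topic kafka_test --partitions 1 --replication-factor 1
    # Verify that the topic exists.
    ./bin/kafka-topics.sh --list --bootstrap-server $kafkaIP:9092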

    Configuring Flume

    1. Create a Flume configuration file kafka.properties.
    vim kafka.properties
    agent.sources = kafka_source
    agent.channels = mem_channel
    agent.sinks = hdfs_sink
    # The following is for source configuration.
    agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
    agent.sources.kafka_source.channels = mem_channel
    agent.sources.kafka_source.batchSize = 5000
    agent.sources.kafka_source.kafka.bootstrap.servers = $kafkaIP:9092
    agent.sources.kafka_source.kafka.topics = kafka_test
    # The following is for sink configuration.
    agent.sinks.hdfs_sink.type = hdfs
    agent.sinks.hdfs_sink.channel = mem_channel
    # To write to COS instead of HDFS, set the path to cosn://bucket/xxx (see the sketch after this configuration).
    agent.sinks.hdfs_sink.hdfs.path = /data/flume/kafka/%Y%m%d
    agent.sinks.hdfs_sink.hdfs.rollSize = 0
    agent.sinks.hdfs_sink.hdfs.rollCount = 0
    agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
    agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
    agent.sinks.hdfs_sink.hdfs.fileType = DataStream
    agent.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true
    agent.sinks.hdfs_sink.hdfs.writeFormat = Text
    # The following is for channel configuration.
    agent.channels.mem_channel.type = memory
    agent.channels.mem_channel.capacity = 100000
    agent.channels.mem_channel.transactionCapacity = 10000
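    To write the collected data to COS instead of HDFS, only the sink path needs to change. A minimal sketch, assuming a placeholder bucket name $bucketName (hypothetical; replace it with your own bucket) and that access to COS was enabled when the cluster was created:
    # Hypothetical COS sink path; $bucketName is a placeholder.
    agent.sinks.hdfs_sink.hdfs.path = cosn://$bucketName/data/flume/kafka/%Y%m%d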
    2. Run Flume.
    ./bin/flume-ng agent --conf ./conf/ -f kafka.properties -n agent -Dflume.root.logger=INFO,console
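    The command above runs the agent in the foreground and prints logs to the console. As a sketch, you can also run it in the background and keep the logs in a file:
    # Run the agent in the background; flume.log is an assumed file name.
    nohup ./bin/flume-ng agent --conf ./conf/ -f kafka.properties -n agent > flume.log 2>&1 &
    tail -f flume.log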
    3. Run the Kafka producer.
    [hadoop@172 kafka]$ ./bin/kafka-console-producer.sh --broker-list $kafkaIP:9092 --topic kafka_test
    test
    hello
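    Before checking HDFS, you can optionally confirm that the messages reached the topic with the console consumer from the same toolkit. A minimal sketch:
    # Consume from the beginning of the topic to confirm the test messages arrived.
    ./bin/kafka-console-consumer.sh --bootstrap-server $kafkaIP:9092 --topic kafka_test --from-beginning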

    Testing

    Enter information on the Kafka producer client and press Enter.
    Check whether the corresponding directory and files have been generated in HDFS:
    hadoop fs -ls /data/flume/kafka/
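    To inspect the collected data itself, you can also print the generated files. A sketch, assuming the date-partitioned directory produced by the %Y%m%d pattern configured above; note that the file currently being written has a .tmp suffix by default:
    # Replace the date with the directory actually created by the sink.
    hadoop fs -cat /data/flume/kafka/$(date +%Y%m%d)/*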
