TDMQ for CKafka
Connecting Flume to CKafka
Last updated: 2024-01-09 14:56:36
Apache Flume is a distributed, reliable, and highly available log collection system that supports a wide variety of data sources such as HTTP, log files, JMS, and listening ports. It can efficiently collect, aggregate, move, and store massive amounts of log data to a specified storage system like Kafka, HDFS, and Solr search server.
Flume is structured as follows:

Agents are the smallest units that run independently in Flume. A Flume agent is a JVM process composed of three main components: source, channel, and sink.
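In configuration terms, an agent is declared by naming its components and wiring them together through a channel. A minimal skeleton (the agent and component names here are illustrative) looks like this:

```properties
# Name the agent's three components
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# Wire them together: a source writes to one or more channels,
# while a sink reads from exactly one channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
```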



Flume and Kafka
When writing data to a downstream storage module or compute module such as HDFS or HBase, you need to account for many complex factors, such as the number of concurrent writes, system load, and network latency. As a flexible distributed system, Flume provides various APIs and customizable pipelines.
In the production process, Kafka can act as a buffer when production and consumption happen at different paces. Its partitioned structure and append-only data model give it high throughput, and its replication mechanism makes it highly fault-tolerant.
Therefore, Flume and Kafka can work together to meet most requirements in production environments.

Connecting Flume to Open-Source Kafka

Preparations

Download Apache Flume (v1.6.0 or later is compatible with Kafka).
Download Kafka (v0.9.x or later is required as v0.8 is no longer supported).
Confirm that the Kafka source and sink components are included in your Flume installation.
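As a quick sanity check (a sketch assuming the Flume binary distribution is extracted to apache-flume-1.9.0-bin, an illustrative directory name), the Kafka source and sink classes ship in the flume-ng-kafka-* jars under Flume's lib directory:

```shell
# The KafkaSource and KafkaSink classes are packaged in the flume-ng-kafka-* jars.
# FLUME_HOME is an assumption; point it at your extracted Flume directory.
FLUME_HOME=./apache-flume-1.9.0-bin
ls "$FLUME_HOME/lib" 2>/dev/null | grep -i kafka || echo "Kafka jars not found; check your Flume distribution"
```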

Connection method

Kafka can be used as a source or sink to import or export messages.
Using Kafka as a Source

Configure Kafka as the message source; that is, Flume pulls data from Kafka as a consumer and writes it into the specified channel. The main configuration items are as follows:

Configuration Item         Description
channels                   The configured channel
type                       Must be org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers    Kafka broker server address
kafka.consumer.group.id    ID of the Kafka consumer group
kafka.topics               Comma-separated list of source topics in Kafka
batchSize                  Maximum number of messages written to the channel in one batch
batchDurationMillis        Maximum time (in ms) before a batch is written to the channel
Sample:
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
For more information, visit Apache Flume's official website.
Using Kafka as a Sink

Configure Kafka as the message receiver; that is, Flume pushes data to the Kafka server as a producer for subsequent processing. The main configuration items are as follows:

Configuration Item         Description
channel                    The configured channel
type                       Must be org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers    Kafka broker server address
kafka.topic                Target topic in Kafka
kafka.flumeBatchSize       Number of messages processed in one batch
kafka.producer.acks        Acknowledgment policy of the Kafka producer
Sample:
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
For more information, visit Apache Flume's official website.

Connecting Flume to CKafka

Using CKafka as a Sink
Step 1. Obtain the CKafka instance access address

1. Log in to the CKafka console.
2. Select Instance List on the left sidebar and click the ID of the target instance to enter the instance details page.
3. You can obtain the instance access address in the Access Mode module on the Basic Info tab page.




Step 2. Create a topic

1. On the instance details page, select the Topic Management tab at the top.
2. On the topic management page, click Create to create a topic named flume_test.




Step 3. Configure Flume

1. Download the Apache Flume toolkit and decompress it.
2. Write the configuration file flume-kafka-sink.properties. Below is a simple demo (placed in the conf folder of the extracted directory). If there is no special requirement, simply replace the instance IP address and topic in the configuration file with your own. In this demo, the source is tail -F ./flume-test, that is, the information newly appended to that file.



The sample code is as shown below:
# Demo for using Kafka as the sink
agentckafka.sources = exectail
agentckafka.channels = memoryChannel
agentckafka.sinks = kafkaSink

# Set the source type based on different requirements. If you have a special source, you can configure it by yourself. The simplest example is used here.
agentckafka.sources.exectail.type = exec
agentckafka.sources.exectail.command = tail -F ./flume-test
agentckafka.sources.exectail.batchSize = 20
# Set the source channel
agentckafka.sources.exectail.channels = memoryChannel

# Set the sink type. It is set to Kafka here
agentckafka.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
# Set the ip:port provided by CKafka. Replace it with your instance access address
agentckafka.sinks.kafkaSink.kafka.bootstrap.servers = 172.16.16.12:9092
# Set the topic to which data is to be imported. Create the topic in the CKafka console in advance
agentckafka.sinks.kafkaSink.kafka.topic = flume_test
# Set the sink channel
agentckafka.sinks.kafkaSink.channel = memoryChannel

# Use the default configuration for the channel
# Each channel's type is defined
agentckafka.channels.memoryChannel.type = memory
agentckafka.channels.memoryChannel.keep-alive = 10

# Other config values specific to each type of channel (sink or source) can be defined as well
# In this case, it specifies the capacity of the memory channel
agentckafka.channels.memoryChannel.capacity = 1000
agentckafka.channels.memoryChannel.transactionCapacity = 1000
3. Run the following command to start Flume:
./bin/flume-ng agent -n agentckafka -c conf -f conf/flume-kafka-sink.properties
4. Write messages to the ./flume-test file. Flume will then write these messages to CKafka.
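For example (any text appended to the tailed file works; the file name matches the exec source's tail -F target):

```shell
# Append a test line; the exec source (tail -F) picks it up and Flume
# delivers it to the CKafka topic.
echo "hello ckafka" >> ./flume-test
tail -n 1 ./flume-test   # prints: hello ckafka
```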



5. Start the CKafka client for consumption.
./kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:xxxx --topic flume_test --from-beginning
Note:
Enter the access address of the CKafka instance just created for the bootstrap-server field and the name of the topic just created for topic.
You can see that the messages have been consumed.




Using CKafka as a Source

Step 1. Obtain the CKafka instance access address

1. Log in to the CKafka console.
2. Select Instance List on the left sidebar and click the ID of the target instance to enter the instance details page.
3. You can obtain the instance access address in the Access Mode module on the Basic Info tab page.




Step 2. Create a topic

1. On the instance details page, select the Topic Management tab at the top.
2. On the topic management page, click Create to create a topic named flume_test.



Step 3. Configure Flume

1. Download the Apache Flume toolkit and decompress it.
2. Write the configuration file flume-kafka-source.properties. Below is a simple demo (placed in the conf folder of the extracted directory). If there is no special requirement, simply replace the instance IP address and topic in the configuration file with your own. The sink is logger in this example.
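The original screenshot of the demo is not reproduced here. A minimal sketch of flume-kafka-source.properties, assuming the flume_test topic created above and an illustrative consumer group ID, could look like this:

```properties
# Demo for using Kafka as the source
agentckafka.sources = kafkaSource
agentckafka.channels = memoryChannel
agentckafka.sinks = loggerSink

# Set the source type to Kafka and point it at the CKafka instance
agentckafka.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
# Replace with your CKafka instance access address
agentckafka.sources.kafkaSource.kafka.bootstrap.servers = 172.16.16.12:9092
agentckafka.sources.kafkaSource.kafka.topics = flume_test
agentckafka.sources.kafkaSource.kafka.consumer.group.id = flume-group
agentckafka.sources.kafkaSource.batchSize = 1000
agentckafka.sources.kafkaSource.channels = memoryChannel

# The sink is logger in this example
agentckafka.sinks.loggerSink.type = logger
agentckafka.sinks.loggerSink.channel = memoryChannel

# Use the default memory channel configuration
agentckafka.channels.memoryChannel.type = memory
agentckafka.channels.memoryChannel.capacity = 1000
agentckafka.channels.memoryChannel.transactionCapacity = 1000
```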



3. Run the following command to start Flume:
./bin/flume-ng agent -n agentckafka -c conf -f conf/flume-kafka-source.properties
4. View the logger output information. The default path is logs/flume.log.
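To inspect what the logger sink received (assuming Flume was started from the extracted directory, so logs/ is relative to it), a quick check is:

```shell
# Print the most recent lines written by the logger sink;
# use tail -f to follow the output live.
tail -n 20 logs/flume.log 2>/dev/null || echo "logs/flume.log not found; is the agent running?"
```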


