| Configuration Item | Description |
|---|---|
| channels | The channel configured for the source |
| type | Must be org.apache.flume.source.kafka.KafkaSource |
| kafka.bootstrap.servers | Kafka broker server address |
| kafka.consumer.group.id | ID of the Kafka consumer group |
| kafka.topics | Source topics in Kafka |
| batchSize | Maximum number of messages written into the channel per batch |
| batchDurationMillis | Maximum interval (in milliseconds) between batch writes |
```
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
```
| Configuration Item | Description |
|---|---|
| channel | The channel configured for the sink |
| type | Must be org.apache.flume.sink.kafka.KafkaSink |
| kafka.bootstrap.servers | Kafka broker server address |
| kafka.topic | Target topic in Kafka |
| kafka.flumeBatchSize | Number of messages processed per batch |
| kafka.producer.acks | Acknowledgment policy of the Kafka producer |
```
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
```
Create a topic named `flume_test` in the CKafka console, then write the configuration file `flume-kafka-sink.properties` (placed in the `conf` folder in the extracted directory). Below is a simple demo. If there is no special requirement, simply replace the instance IP address and topic with your own in the configuration file. In this demo, the source is `tail -F flume-test`, that is, the content newly appended to that file.

```
# Demo for using Kafka as the sink
agentckafka.sources = exectail
agentckafka.channels = memoryChannel
agentckafka.sinks = kafkaSink

# Set the source type based on your requirements. If you have a special source,
# you can configure it yourself. The simplest example (exec) is used here.
agentckafka.sources.exectail.type = exec
agentckafka.sources.exectail.command = tail -F ./flume-test
agentckafka.sources.exectail.batchSize = 20
# Set the source channel
agentckafka.sources.exectail.channels = memoryChannel

# Set the sink type. It is set to Kafka here
agentckafka.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
# Set the ip:port provided by CKafka (replace with your instance IP address)
agentckafka.sinks.kafkaSink.brokerList = 172.16.16.12:9092
# Set the topic to which data is to be imported.
# Create the topic in the CKafka console in advance.
agentckafka.sinks.kafkaSink.topic = flume_test
# Set the sink channel
agentckafka.sinks.kafkaSink.channel = memoryChannel

# Use the default configuration for the channel.
# Each channel's type is defined
agentckafka.channels.memoryChannel.type = memory
agentckafka.channels.memoryChannel.keep-alive = 10
# Other config values specific to each type of channel (sink or source)
# can be defined as well. In this case, they specify the capacity of the memory channel
agentckafka.channels.memoryChannel.capacity = 1000
agentckafka.channels.memoryChannel.transactionCapacity = 1000
```
```
# Run the Flume agent with the sink configuration
./bin/flume-ng agent -n agentckafka -c conf -f conf/flume-kafka-sink.properties
```
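With the agent running, test lines can be appended to the file that the exec source tails; a minimal sketch, assuming the `./flume-test` path is relative to the directory the agent was started from:

```shell
# Create the tailed file if it does not exist yet, then append a test message
touch ./flume-test
echo "hello ckafka" >> ./flume-test
```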
Write messages to the `flume-test` file. At this point, the messages will be forwarded by Flume to CKafka. Consume the topic to verify:

```
./kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:xxxx --topic flume_test --from-beginning --new-consumer
```
Enter the instance IP address and port in the `bootstrap-server` field and the name of the topic just created in the `topic` field.
Use the `flume_test` topic created earlier as the source and write the configuration file `flume-kafka-source.properties` (placed in the `conf` folder in the extracted directory). If there is no special requirement, simply replace the instance IP address and topic with your own in the configuration file. The sink is `logger` in this example. Then run Flume:

```
# Run the Flume agent with the source configuration
./bin/flume-ng agent -n agentckafka -c conf -f conf/flume-kafka-source.properties
```
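A minimal sketch of what `flume-kafka-source.properties` could contain, assuming the property names from the Kafka source table above (the instance IP address and the consumer group ID `flume-consumer` are placeholder assumptions, not values from this page):

```
# Demo sketch for using Kafka as the source
agentckafka.sources = kafkaSource
agentckafka.channels = memoryChannel
agentckafka.sinks = loggerSink

# Kafka source: consume the flume_test topic from CKafka
agentckafka.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
# Replace with your instance IP address and port
agentckafka.sources.kafkaSource.kafka.bootstrap.servers = 172.16.16.12:9092
agentckafka.sources.kafkaSource.kafka.topics = flume_test
agentckafka.sources.kafkaSource.kafka.consumer.group.id = flume-consumer
agentckafka.sources.kafkaSource.batchSize = 1000
agentckafka.sources.kafkaSource.batchDurationMillis = 2000
agentckafka.sources.kafkaSource.channels = memoryChannel

# Logger sink: print the consumed events to the Flume log
agentckafka.sinks.loggerSink.type = logger
agentckafka.sinks.loggerSink.channel = memoryChannel

# Memory channel
agentckafka.channels.memoryChannel.type = memory
agentckafka.channels.memoryChannel.capacity = 1000
agentckafka.channels.memoryChannel.transactionCapacity = 1000
```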
The consumed messages can be viewed in the `logs/flume.log` file.