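Parameter | Description |
channels | The Channel you have configured |
type | Must be: org.apache.flume.source.kafka.KafkaSource |
kafka.bootstrap.servers | Address(es) of the Kafka broker servers |
kafka.consumer.group.id | Group ID used when consuming from Kafka |
kafka.topics | Kafka topic(s) to read data from |
batchSize | Maximum number of messages written to the Channel in one batch |
batchDurationMillis | Maximum time (in milliseconds) before a batch is written to the Channel |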
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
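Parameter | Description |
channel | The Channel you have configured |
type | Must be: org.apache.flume.sink.kafka.KafkaSink |
kafka.bootstrap.servers | Address(es) of the Kafka broker servers |
kafka.topic | Kafka topic that data is written to |
kafka.flumeBatchSize | Number of messages written in one batch |
kafka.producer.acks | Acknowledgment policy of the Kafka producer |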
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
# Demo: using Kafka as the sink
agentckafka.sources = exectail
agentckafka.channels = memoryChannel
agentckafka.sinks = kafkaSink

# Set the source type according to your needs. A custom source can be configured instead; the simplest example is used here
agentckafka.sources.exectail.type = exec
agentckafka.sources.exectail.command = tail -F ./flume.test
agentckafka.sources.exectail.batchSize = 20
# Set the source channel
agentckafka.sources.exectail.channels = memoryChannel

# Set the sink type; here it is Kafka
agentckafka.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
# Set the ip:port provided by CKafka (the instance IP)
agentckafka.sinks.kafkaSink.brokerList = 172.16.16.12:9092
# Set the topic to import data into; create the topic in the console beforehand
agentckafka.sinks.kafkaSink.topic = flume_test
# Set the sink channel
agentckafka.sinks.kafkaSink.channel = memoryChannel

# The Channel uses the default configuration
# Each channel's type is defined.
agentckafka.channels.memoryChannel.type = memory
agentckafka.channels.memoryChannel.keep-alive = 10

# Other config values specific to each type of channel (sink or source) can be defined as well
# In this case, it specifies the capacity of the memory channel
agentckafka.channels.memoryChannel.capacity = 1000
agentckafka.channels.memoryChannel.transactionCapacity = 1000
./bin/flume-ng agent -n agentckafka -c conf -f conf/flume-kafka-sink.properties
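Once the agent is running, the exec source only emits events when new lines reach the tailed file. A minimal sketch for generating test data, assuming the agent was started from the directory that contains ./flume.test (the message text is an arbitrary placeholder):

```shell
# Append three test lines to the file that the exec source tails.
# The message text is a placeholder; any line content works.
for i in 1 2 3; do
  echo "flume test message $i" >> ./flume.test
done
```

Each appended line should then show up as one message in the target topic.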
./kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:xxxx --topic flume_test --from-beginning --new-consumer
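For the opposite direction (Kafka as the source), the agent needs a configuration file such as conf/flume-kafka-source.properties. The following is a minimal sketch that writes one, not a definitive configuration: the component names kafkaSource and loggerSink, the group ID flume_test_group, and the batch values are assumptions chosen for illustration, while the broker address and topic are reused from the sink demo. The logger sink is used only so that consumed events appear in the Flume log.

```shell
# Create the conf directory if needed and write a sample Kafka-source config.
mkdir -p conf
cat > conf/flume-kafka-source.properties <<'EOF'
# Demo: using Kafka as the source (names below are illustrative assumptions)
agentckafka.sources = kafkaSource
agentckafka.channels = memoryChannel
agentckafka.sinks = loggerSink

# Kafka source: reads from the topic written by the sink demo
agentckafka.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agentckafka.sources.kafkaSource.kafka.bootstrap.servers = 172.16.16.12:9092
agentckafka.sources.kafkaSource.kafka.topics = flume_test
agentckafka.sources.kafkaSource.kafka.consumer.group.id = flume_test_group
agentckafka.sources.kafkaSource.batchSize = 100
agentckafka.sources.kafkaSource.batchDurationMillis = 1000
agentckafka.sources.kafkaSource.channels = memoryChannel

# Logger sink: writes each event to the Flume log
agentckafka.sinks.loggerSink.type = logger
agentckafka.sinks.loggerSink.channel = memoryChannel

# Memory channel; transactionCapacity must be at least the source batchSize
agentckafka.channels.memoryChannel.type = memory
agentckafka.channels.memoryChannel.capacity = 1000
agentckafka.channels.memoryChannel.transactionCapacity = 1000
EOF
```

Replace the broker address, topic, and group ID with the values of your own instance before starting the agent.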
./bin/flume-ng agent -n agentckafka -c conf -f conf/flume-kafka-source.properties
View the log output (logs/flume.log).