将 Kafka 中的数据通过 Flume 收集并存储到 Hbase。
首先需要查看 CKafka 的内网 IP 和端口号。登录消息队列 CKafka 的控制台,选择您要使用的 CKafka 实例,在基本消息中查看其内网 IP 为 $kafkaIP,而端口号一般默认为9092。在 topic 管理界面新建一个 topic 为 kafka_test。
创建 flume 的配置文件 hbase_kafka.properties
vim hbase_kafka.properties
agent.sources = kafka_source
agent.channels = mem_channel
agent.sinks = hbase_sink
# 以下配置 source
agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka_source.channels = mem_channel
agent.sources.kafka_source.batchSize = 5000
agent.sources.kafka_source.kafka.bootstrap.servers = $kafkaIP:9092
agent.sources.kafka_source.kafka.topics = kafka_test
# 以下配置 sink
agent.sinks.hbase_sink.channel = mem_channel
agent.sinks.hbase_sink.table = foo_table
agent.sinks.hbase_sink.columnFamily = cf
agent.sinks.hbase_sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# 以下配置 channel
agent.channels.mem_channel.type = memory
agent.channels.mem_channel.capacity = 100000
agent.channels.mem_channel.transactionCapacity = 10000
创建 hbase 表
hbase shell
create 'foo_table','cf'
运行 flume
./bin/flume-ng agent --conf ./conf/ -f hbase_kafka.properties -n agent -Dflume.root.logger=INFO,console
运行 kafka producer
[hadoop@172 kafka]$ ./bin/kafka-console-producer.sh --broker-list $kafkaIP:9092 --topic kafka_test
hello
hbase_test
本页内容是否解决了您的问题?