Get the IP address of the Kafka instance ($kafkaIP) from its basic information section. The port number is 9092 by default. Then create a topic named kafka_test on the topic management page.

Create the Flume configuration file hive_kafka.properties:

vim hive_kafka.properties

agent.sources = kafka_source
agent.channels = mem_channel
agent.sinks = hive_sink

# Configure the source
agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka_source.channels = mem_channel
agent.sources.kafka_source.batchSize = 5000
agent.sources.kafka_source.kafka.bootstrap.servers = $kafkaIP:9092
agent.sources.kafka_source.kafka.topics = kafka_test

# Configure the sink
agent.sinks.hive_sink.channel = mem_channel
agent.sinks.hive_sink.type = hive
agent.sinks.hive_sink.hive.metastore = thrift://172.16.32.51:7004
agent.sinks.hive_sink.hive.database = default
agent.sinks.hive_sink.hive.table = weblogs
agent.sinks.hive_sink.hive.partition = asia,india,%y-%m-%d-%H-%M
agent.sinks.hive_sink.useLocalTimeStamp = true
agent.sinks.hive_sink.round = true
agent.sinks.hive_sink.roundValue = 10
agent.sinks.hive_sink.roundUnit = minute
agent.sinks.hive_sink.serializer = DELIMITED
agent.sinks.hive_sink.serializer.delimiter = ","
agent.sinks.hive_sink.serializer.serdeSeparator = ','
agent.sinks.hive_sink.serializer.fieldnames = id,msg

# Configure the channel
agent.channels.mem_channel.type = memory
agent.channels.mem_channel.capacity = 100000
agent.channels.mem_channel.transactionCapacity = 100000
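The three comma-separated values of hive.partition fill the partition columns continent, country and time of the weblogs table in order. Because useLocalTimeStamp = true, the sink uses the agent's local time instead of a timestamp header on the event, and round, roundValue and roundUnit round that time down to a multiple of 10 minutes. The following Python sketch reproduces that resolution for a given time. It assumes Flume's %y, %m, %d, %H and %M escapes behave like the strftime codes of the same name; the helper names are hypothetical.

```python
from datetime import datetime

ROUND_MINUTES = 10  # roundValue = 10, roundUnit = minute


def round_down(ts, minutes=ROUND_MINUTES):
    """Round ts down to a multiple of `minutes` and drop seconds (round = true)."""
    return ts.replace(minute=ts.minute - ts.minute % minutes, second=0, microsecond=0)


def resolve_partition(ts, spec="asia,india,%y-%m-%d-%H-%M"):
    """Hypothetical helper: expand the hive.partition spec for local time ts."""
    rounded = round_down(ts)
    # One entry per partition column (continent, country, time).
    # strftime leaves literal text such as "asia" unchanged.
    return [rounded.strftime(part) for part in spec.split(",")]


print(resolve_partition(datetime(2024, 5, 17, 13, 47, 29)))
# ['asia', 'india', '24-05-17-13-40']
```

With these settings, all events processed between 13:40:00 and 13:49:59 land in the same time partition.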
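The hive.metastore value in the sink configuration is the Hive Metastore address, that is, the value of hive.metastore.uris in hive-site.xml. You can look it up with:

grep "hive.metastore.uris" -C 2 /usr/local/service/hive/conf/hive-site.xml

Sample output:

<property>
  <name>hive.metastore.uris</name>
  <value>thrift://172.16.32.51:7004</value>
</property>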
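Since the two addresses must agree, a quick consistency check can help. This is a minimal Python sketch that assumes the Flume file uses simple key = value lines (full Java properties syntax, such as ":" separators and line continuations, is not handled). The helper names and the inline sample text are hypothetical stand-ins for the real files.

```python
import xml.etree.ElementTree as ET

# Hypothetical stand-ins for hive-site.xml and hive_kafka.properties.
HIVE_SITE = """<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://172.16.32.51:7004</value>
  </property>
</configuration>"""

FLUME_CONF = """# Configure the sink
agent.sinks.hive_sink.hive.metastore = thrift://172.16.32.51:7004
"""


def hive_property(xml_text, name):
    """Return the <value> of the named <property> in hive-site.xml text, or None."""
    for prop in ET.fromstring(xml_text).iter("property"):
        if (prop.findtext("name") or "").strip() == name:
            return (prop.findtext("value") or "").strip()
    return None


def flume_property(text, key):
    """Return the value of key from simple 'key = value' lines, or None."""
    for line in text.splitlines():
        line = line.strip()
        if line.startswith("#") or "=" not in line:
            continue  # skip comments, blank lines and anything that is not key = value
        k, v = line.split("=", 1)
        if k.strip() == key:
            return v.strip()
    return None


uris = hive_property(HIVE_SITE, "hive.metastore.uris") or ""
sink_uri = flume_property(FLUME_CONF, "agent.sinks.hive_sink.hive.metastore")
# hive.metastore.uris may list several comma-separated URIs; the sink's URI should be one of them.
print(sink_uri in [u.strip() for u in uris.split(",")])  # True
```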
Create the destination table weblogs in Hive:

create table weblogs (id int, msg string)
partitioned by (continent string, country string, time string)
clustered by (id) into 5 buckets
stored as orc TBLPROPERTIES ('transactional'='true');
The Flume Hive sink writes through Hive Streaming, which only accepts tables that are bucketed, stored as ORC, and have TBLPROPERTIES ('transactional'='true') set. Transactions must also be enabled in Hive, so add the following properties to hive-site.xml:

<property><name>hive.support.concurrency</name><value>true</value></property>
<property><name>hive.exec.dynamic.partition.mode</name><value>nonstrict</value></property>
<property><name>hive.txn.manager</name><value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value></property>
<property><name>hive.compactor.initiator.on</name><value>true</value></property>
<property><name>hive.compactor.worker.threads</name><value>1</value></property>
<property><name>hive.enforce.bucketing</name><value>true</value></property>
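To confirm the edit, a minimal Python sketch can report which of the transaction settings above are missing from, or different in, a hive-site.xml document. It compares values as exact strings, which is stricter than Hive needs (for example, any positive hive.compactor.worker.threads works). The helper name and sample text are hypothetical.

```python
import xml.etree.ElementTree as ET

# Transaction-related settings from the hive-site.xml snippet above (name -> value).
REQUIRED = {
    "hive.support.concurrency": "true",
    "hive.exec.dynamic.partition.mode": "nonstrict",
    "hive.txn.manager": "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager",
    "hive.compactor.initiator.on": "true",
    "hive.compactor.worker.threads": "1",
    "hive.enforce.bucketing": "true",
}


def acid_gaps(xml_text):
    """Return {name: (expected, actual)} for each required property that is missing or different."""
    actual = {
        (p.findtext("name") or "").strip(): (p.findtext("value") or "").strip()
        for p in ET.fromstring(xml_text).iter("property")
    }
    return {name: (want, actual.get(name))
            for name, want in REQUIRED.items()
            if actual.get(name) != want}


# Hypothetical sample that sets only one of the six properties.
sample = ("<configuration><property><name>hive.support.concurrency</name>"
          "<value>true</value></property></configuration>")
for name, (want, got) in sorted(acid_gaps(sample).items()):
    print(f"{name}: expected {want!r}, found {got!r}")
```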
After modifying hive-site.xml, restart Hive so that the settings take effect. During the restart, the hadoop-hive log may report that the Metastore cannot be connected to. This error can be ignored: it results from the startup order of the processes, because the Metastore is started before HiveServer2.

Copy hive-hcatalog-streaming-xxx.jar from Hive to the lib directory of Flume:

cp -ra /usr/local/service/hive/hcatalog/share/hcatalog/hive-hcatalog-streaming-2.3.3.jar /usr/local/service/flume/lib/
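The jar name carries the Hive version (2.3.3 in the command above), so it differs between clusters. As a hedged alternative to hard-coding the version, this Python sketch finds the single matching jar with a wildcard and copies it. The function name is hypothetical; the default paths are the ones used in this document.

```python
import glob
import os
import shutil


def copy_streaming_jar(hive_home="/usr/local/service/hive",
                       flume_lib="/usr/local/service/flume/lib"):
    """Copy hive-hcatalog-streaming-<version>.jar into Flume's lib directory; return the new path."""
    pattern = os.path.join(hive_home, "hcatalog", "share", "hcatalog",
                           "hive-hcatalog-streaming-*.jar")
    matches = glob.glob(pattern)
    if not matches:
        raise FileNotFoundError(f"no jar matches {pattern}")
    if len(matches) > 1:
        # Refuse to guess when several versions are present.
        raise RuntimeError(f"several jars match {pattern}: {matches}")
    # copy2 keeps timestamps and permission bits (but, unlike cp -a, not ownership).
    return shutil.copy2(matches[0], flume_lib)

# Usage on the cluster (paths as in the cp command):
# copy_streaming_jar("/usr/local/service/hive", "/usr/local/service/flume/lib")
```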
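Start the Flume agent from the Flume directory. The -n option must match the agent name used as the property prefix (agent) in hive_kafka.properties:

./bin/flume-ng agent --conf ./conf/ -f hive_kafka.properties -n agent -Dflume.root.logger=INFO,console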
Start a Kafka console producer in the Kafka directory and send some test messages:

[hadoop@172 kafka]$ ./bin/kafka-console-producer.sh --broker-list $kafkaIP:9092 --topic kafka_test
1,hello
2,hi
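Each message is split on the serializer delimiter and mapped onto serializer.fieldnames, so 1,hello becomes the row id = 1, msg = 'hello'. The Python sketch below illustrates that mapping and which of the 5 buckets (clustered by (id)) a row lands in. It is a simplification: it assumes Hive 2.x's original bucketing scheme, where an int's hash is the value itself and the bucket is (hash & Integer.MAX_VALUE) % numBuckets (Hive 3 tables may use a different hash), and it does not reproduce every edge case of the Hive streaming writer. The function names are hypothetical.

```python
FIELDNAMES = ("id", "msg")  # serializer.fieldnames = id,msg
DELIMITER = ","             # serializer.delimiter = ","
NUM_BUCKETS = 5             # clustered by (id) into 5 buckets


def parse_message(line, fieldnames=FIELDNAMES, delimiter=DELIMITER):
    """Map one delimited Kafka message onto the configured field names."""
    return dict(zip(fieldnames, line.split(delimiter)))


def bucket_for_int(value, num_buckets=NUM_BUCKETS):
    """Bucket index for a single int column under Hive 2.x's legacy scheme (assumption)."""
    # A Java int's hashCode is the value; masking with Integer.MAX_VALUE clears the sign bit.
    return (value & 0x7FFFFFFF) % num_buckets


for message in ["1,hello", "2,hi"]:
    row = parse_message(message)
    print(row, "-> bucket", bucket_for_int(int(row["id"])))
# {'id': '1', 'msg': 'hello'} -> bucket 1
# {'id': '2', 'msg': 'hi'} -> bucket 2
```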