参数 | 说明 |
用户认证方式 | 目前仅支持 SASL_PLAINTEXT。 |
hosts | 内网消费:kafkaconsumer-${region}.cls.tencentyun.com:9095。外网消费:kafkaconsumer-${region}.cls.tencentcs.com:9096,详细参见 可用域名- Kafka 消费日志。 |
topic | CLS kafka 协议消费控制台给出的主题名称,点击旁边按钮可以复制。例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX。 |
username | 配置为${logsetID},即日志集 ID。 例如:0f8e4b82-8adb-47b1-XXXX-XXXXXXXXXXXX ,在日志主题列表中可以复制日志集 ID。 |
password | 配置为${SecretId}#${SecretKey}。例如:XXXXXXXXXXXXXX#YYYYYYYY,请登录 腾讯云访问管理 ,在左侧导航栏中单击访问密钥,API 密钥或者项目密钥均可使用,建议使用子账号密钥,为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可。 |
${SecretId}#${SecretKey}
后有(;分号),不要漏填,否则会报错。import uuidfrom kafka import KafkaConsumer,TopicPartition,OffsetAndMetadataconsumer = KafkaConsumer(#cls kafka协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制'您的消费主题',group_id = uuid.uuid4().hex,auto_offset_reset='earliest',#服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写bootstrap_servers = ['kafkaconsumer-${region}.cls.tencentyun.com:9095'],security_protocol = "SASL_PLAINTEXT",sasl_mechanism = 'PLAIN',#用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6sasl_plain_username = "${logsetID}",#密码是用户的SecretId#SecretKey组合的字符串,比AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可.sasl_plain_password = "${SecretId}#${SecretKey}",api_version = (1,1,1))print('begin')for message in consumer:print('begins')print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" % (message.topic, message.partition, message.offset, message.value))print('end')
CREATE TABLE `nginx_source`( # 日志中字段`@metadata` STRING,`@timestamp` TIMESTAMP,`agent` STRING,`ecs` STRING,`host` STRING,`input` STRING,`log` STRING,`message` STRING,`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- kafka分区`ts` TIMESTAMP(3) METADATA FROM 'timestamp') WITH ('connector' = 'kafka',#cls kafka协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制'topic' = '您的消费主题',# 服务地址+端口,外网端口9096,内网端口9095,列子是内网消费,请根据您的实际情况填写'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',# 请替换为您的消费组名称'properties.group.id' = '您的消费组名称','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true' ,#用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6#密码是用户的SecretId#SecretKey组合的字符串,比AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可,注意jaas.config最后有;分号,不填写会报错.'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.mechanism' = 'PLAIN');
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.14.4</version></dependency>
CREATE TABLE `nginx_source`(#日志中的字段`@metadata` STRING,`@timestamp` TIMESTAMP,`agent` STRING,`ecs` STRING,`host` STRING,`input` STRING,`log` STRING,`message` STRING,#kafka分区`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL,`ts` TIMESTAMP(3) METADATA FROM 'timestamp') WITH ('connector' = 'kafka',#cls kafka协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制'topic' = '您的消费主题',# 服务地址+端口,外网端口9096,内网端口9095,列子是内网消费,请根据您的实际情况填写'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',# 请替换为您的消费组名称'properties.group.id' = '您的消费组名称','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true' ,#用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6#密码是用户的SecretId#SecretKey组合的字符串,比AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可,注意jaas.config最后有;分号,不填写会报错.'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.mechanism' = 'PLAIN');
select count(*) , host from nginx_source group by host;
a1.sources = source_kafkaa1.sinks = sink_locala1.channels = channel1#配置Sourcea1.sources.source_kafka.type = org.apache.flume.source.kafka.KafkaSourcea1.sources.source_kafka.batchSize = 10a1.sources.source_kafka.batchDurationMillis = 200000#服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写a1.sources.source_kafka.kafka.bootstrap.servers = $kafkaconsumer-${region}.cls.tencentyun.com:9095#cls kafka协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制a1.sources.source_kafka.kafka.topics = 您的消费主题#请替换为您的消费组名称a1.sources.source_kafka.kafka.consumer.group.id = 您的消费组名称a1.sources.source_kafka.kafka.consumer.auto.offset.reset = earliesta1.sources.source_kafka.kafka.consumer.security.protocol = SASL_PLAINTEXTa1.sources.source_kafka.kafka.consumer.sasl.mechanism = PLAIN#用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6#密码是用户的SecretId#SecretKey组合的字符串,比AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可,注意jaas.config最后有;分号,不填写会报错.a1.sources.source_kafka.kafka.consumer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}"password="${SecretId}#${SecretKey}";//配置sinka1.sinks.sink_local.type = loggera1.channels.channel1.type = memorya1.channels.channel1.capacity = 1000a1.channels.channel1.transactionCapacity = 100//将source和sink绑定到channela1.sources.source_kafka.channels = channel1a1.sinks.sink_local.channel = channel1
本页内容是否解决了您的问题?