Parameter | Description |
User authentication mode | Currently, only SASL_PLAINTEXT is supported. |
hosts | Consumption over the private network: `kafkaconsumer-${region}.cls.tencentyun.com:9095`.Consumption over the public network: `kafkaconsumer-${region}.cls.tencentcs.com:9096`. For more information, see Available Regions. |
topic | Topic name provided in the CLS console for consumption over Kafka, which can be copied by clicking the button next to it, such as `XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX`. |
username | Configure it as `${logsetID}`, i.e., logset ID, such as `0f8e4b82-8adb-47b1-XXXX-XXXXXXXXXXXX`. You can copy the logset ID in the log topic list. |
password | Configure it as ${SecretId}#${SecretKey}, such as `XXXXXXXXXXXXXX#YYYYYYYY`. Log in to the CAM console and click Access Key on the left sidebar to get the key information. You can use either the API key or project key. We recommend that you use a sub-account key and follow the principle of least privilege when authorizing a sub-account, that is, configure the minimum permission for `action` and `resource` in the access policy of the sub-account. |
;
after the ${SecretId}#${SecretKey}
of the jaas.config
in the following sample code; otherwise, an error will be reported.import uuidfrom kafka import KafkaConsumer,TopicPartition,OffsetAndMetadataconsumer = KafkaConsumer(#Topic name provided in the CLS console for consumption over Kafka, which can be copied in the console, such as `XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX`.'Your consumption topic',group_id = uuid.uuid4().hex,auto_offset_reset='earliest',# Service address + port (9096 for the public network and 9095 for the private network). In this example, consumption is performed over the private network. Enter this field accordingly.bootstrap_servers = ['kafkaconsumer-${region}.cls.tencentyun.com:9095'],security_protocol = "SASL_PLAINTEXT",sasl_mechanism = 'PLAIN',# The username is the logset ID, such as `ca5cXXXXdd2e-4ac0af12-92d4b677d2c6`.sasl_plain_username = "${logsetID}",#The password is a string of your `SecretId#SecretKey`, such as `AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac`. Note that `#` is required. We recommend that you use a sub-account key and follow the principle of least privilege when authorizing a sub-account, that is, configure the minimum permission for `action` and `resource` in the access policy of the sub-account.sasl_plain_password = "${SecretId}#${SecretKey}",api_version = (1,1,1))print('begin')for message in consumer:print('begins')print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" % (message.topic, message.partition, message.offset, message.value))print('end')
CREATE TABLE `nginx_source`( # Fields in the log`@metadata` STRING,`@timestamp` TIMESTAMP,`agent` STRING,`ecs` STRING,`host` STRING,`input` STRING,`log` STRING,`message` STRING,`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- Kafka partition`ts` TIMESTAMP(3) METADATA FROM 'timestamp') WITH ('connector' = 'kafka',#Topic name provided in the CLS console for consumption over Kafka, which can be copied in the console, such as `XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX`.'topic' = 'Your consumption topic',# Service address + port (9096 for the public network and 9095 for the private network). In this example, consumption is performed over the private network. Enter this field accordingly.'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',# Replace it with the name of your consumer group'properties.group.id' = 'The name of your consumer group','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true' ,# The username is the logset ID, such as `ca5cXXXXdd2e-4ac0af12-92d4b677d2c6`.#The password is a string of your `SecretId#SecretKey`, such as `AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac`. Note that `#` is required. We recommend that you use a sub-account key and follow the principle of least privilege when authorizing a sub-account, that is, configure the minimum permission for `action` and `resource` in the access policy of the sub-account. Be sure not to omit the `;` at the end of the `jaas.config`; otherwise, an error will be reported.'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.mechanism' = 'PLAIN');
flink-connector-kafka
exists in flink lib
, directly register a Kafka table in sql
. The dependency is as follows:<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.14.4</version></dependency>
CREATE TABLE `nginx_source`(# Fields in the log`@metadata` STRING,`@timestamp` TIMESTAMP,`agent` STRING,`ecs` STRING,`host` STRING,`input` STRING,`log` STRING,`message` STRING,# Kafka partition`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL,`ts` TIMESTAMP(3) METADATA FROM 'timestamp') WITH ('connector' = 'kafka',#Topic name provided in the CLS console for consumption over Kafka, which can be copied in the console, such as `XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX`.'topic' = 'Your consumption topic',# Service address + port (9096 for the public network and 9095 for the private network). In this example, consumption is performed over the private network. Enter this field accordingly.'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',# Replace it with the name of your consumer group'properties.group.id' = 'The name of your consumer group','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true' ,# The username is the logset ID, such as `ca5cXXXXdd2e-4ac0af12-92d4b677d2c6`.#The password is a string of your `SecretId#SecretKey`, such as `AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac`. Note that `#` is required. We recommend that you use a sub-account key and follow the principle of least privilege when authorizing a sub-account, that is, configure the minimum permission for `action` and `resource` in the access policy of the sub-account. Be sure not to omit the `;` at the end of the `jaas.config`; otherwise, an error will be reported.'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.mechanism' = 'PLAIN');
select count(*) , host from nginx_source group by host;
a1.sources = source_kafkaa1.sinks = sink_locala1.channels = channel1#Configure the sourcea1.sources.source_kafka.type = org.apache.flume.source.kafka.KafkaSourcea1.sources.source_kafka.batchSize = 10a1.sources.source_kafka.batchDurationMillis = 200000# Service address + port (9096 for the public network and 9095 for the private network). In this example, consumption is performed over the private network. Enter this field accordingly.a1.sources.source_kafka.kafka.bootstrap.servers = $kafkaconsumer-${region}.cls.tencentyun.com:9095#Topic name provided in the CLS console for consumption over Kafka, which can be copied in the console, such as `XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX`.a1.sources.source_kafka.kafka.topics = Your consumption topic#Replace it with the name of your consumer groupa1.sources.source_kafka.kafka.consumer.group.id = The name of your consumer groupa1.sources.source_kafka.kafka.consumer.auto.offset.reset = earliesta1.sources.source_kafka.kafka.consumer.security.protocol = SASL_PLAINTEXTa1.sources.source_kafka.kafka.consumer.sasl.mechanism = PLAIN# The username is the logset ID, such as `ca5cXXXXdd2e-4ac0af12-92d4b677d2c6`.#The password is a string of your `SecretId#SecretKey`, such as `AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac`. Note that `#` is required. We recommend that you use a sub-account key and follow the principle of least privilege when authorizing a sub-account, that is, configure the minimum permission for `action` and `resource` in the access policy of the sub-account. Be sure not to omit the `;` at the end of the `jaas.config`; otherwise, an error will be reported.a1.sources.source_kafka.kafka.consumer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}"password="${SecretId}#${SecretKey}";// Configure the sinka1.sinks.sink_local.type = loggera1.channels.channel1.type = memorya1.channels.channel1.capacity = 1000a1.channels.channel1.transactionCapacity = 100// Bind the source and sink to the channela1.sources.source_kafka.channels = channel1a1.sinks.sink_local.channel = channel1
문제 해결에 도움이 되었나요?