Flink Version | Description |
1.11 | Supported |
1.13 | Supported |
1.14 | Supported |
1.16 | Supported |
CREATE TABLE `kafka_json_source_table` (`id` INT,`name` STRING) WITH (-- Define Kafka parameters.'connector' = 'kafka','topic' = 'Data-Input', -- Replace this with the topic you want to consume data from.'scan.startup.mode' = 'latest-offset', -- Valid values include latest-offset, earliest-offset, specific-offsets, group-offsets, and timestamp.'properties.bootstrap.servers' = '172.28.28.13:9092', -- Replace this with your Kafka connection address.'properties.group.id' = 'testGroup', -- (Required) The group ID.-- Define the data format (JSON).'format' = 'json','json.fail-on-missing-field' = 'false' -- If this is 'false', no errors will occur even when parameters are missing.'json.ignore-parse-errors' = 'true' -- If this is 'true', all parse errors will be ignored.);
CREATE TABLE `kafka_csv_source_table` (`id` INT,`name` STRING) WITH (-- Define Kafka parameters.'connector' = 'kafka','topic' = 'Data-Input', -- Replace this with the topic you want to consume data from.'scan.startup.mode' = 'latest-offset', -- Valid values include latest-offset, earliest-offset, specific-offsets, group-offsets, and timestamp.'properties.bootstrap.servers' = '172.28.28.13:9092', -- Replace this with your Kafka connection address.'properties.group.id' = 'testGroup', -- (Required) The group ID.-- Define the data format (CSV).'format' = 'csv');
CREATE TABLE `kafka_debezium_source_table` (`id` INT,`name` STRING) WITH (-- Define Kafka parameters.'connector' = 'kafka','topic' = 'Data-Input', -- Replace this with the topic you want to consume data from.'scan.startup.mode' = 'latest-offset', -- Valid values include latest-offset, earliest-offset, specific-offsets, group-offsets, and timestamp.'properties.bootstrap.servers' = '172.28.28.13:9092', -- Replace this with your Kafka connection address.'properties.group.id' = 'testGroup', -- (Required) The group ID.-- Define the data format (JSON data output by Debezium).'format' = 'debezium-json');
CREATE TABLE `kafka_source`(aid BIGINT COMMENT 'unique id',charname string,`ts` timestamp(6),origin_database STRING METADATA FROM 'value.database' VIRTUAL,origin_table STRING METADATA FROM 'value.table' VIRTUAL,origin_es TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,origin_type STRING METADATA FROM 'value.operation-type' VIRTUAL,`batch_id` bigint METADATA FROM 'value.batch-id' VIRTUAL,`is_ddl` boolean METADATA FROM 'value.is-ddl' VIRTUAL,origin_old ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before' VIRTUAL,`mysql_type` MAP<STRING, STRING> METADATA FROM 'value.mysql-type' VIRTUAL,origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,`sql` STRING METADATA FROM 'value.sql' VIRTUAL,origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,`ingestion_ts` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL) WITH ('connector' = 'kafka', -- Make sure you specify the corresponding connector.'topic' = '$TOPIC', -- Replace this with the topic you want to consume data from.'properties.bootstrap.servers' = '$IP:$PORT', -- Replace this with your Kafka connection address.'properties.group.id' = 'testGroup', -- (Required) The group ID.'scan.startup.mode' = 'latest-offset','scan.topic-partition-discovery.interval' = '5s','format' = 'canal-json','canal-json.ignore-parse-errors' = 'false', -- Ignore JSON parse errors.'canal-json.source.append-mode' = 'true' -- Only Flink 1.13 or later is supported.);
CREATE TABLE `kafka_json_sink_table` (`id` INT,`name` STRING) WITH (-- Define Kafka parameters.'connector' = 'kafka','topic' = 'Data-Output', -- Replace this with the topic you want to write to.'properties.bootstrap.servers' = '172.28.28.13:9092', -- Replace this with your Kafka connection address.-- Define the data format (JSON).'format' = 'json','json.fail-on-missing-field' = 'false' -- If this is 'false', no errors will occur even when parameters are missing.'json.ignore-parse-errors' = 'true' -- If this is 'true', all parse errors will be ignored.);
CREATE TABLE `kafka_csv_sink_table` (`id` INT,`name` STRING) WITH (-- Define Kafka parameters.'connector' = 'kafka','topic' = 'Data-Output', -- Replace this with the topic you want to write to.'properties.bootstrap.servers' = '172.28.28.13:9092', -- Replace this with your Kafka connection address.-- Define the data format (CSV).'format' = 'csv');
CREATE TABLE `kafka_canal_json_sink_table`(aid BIGINT COMMENT 'unique id',charname string,`ts` timestamp(6),origin_database STRING METADATA FROM 'value.database',origin_table STRING METADATA FROM 'value.table',origin_ts TIMESTAMP(3) METADATA FROM 'value.event-timestamp',`type` STRING METADATA FROM 'value.operation-type',`batch_id` bigint METADATA FROM 'value.batch-id',`isDdl` BOOLEAN METADATA FROM 'value.is-ddl',`old` ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before',`mysql_type` MAP<STRING, STRING> METADATA FROM 'value.mysql-type',`pk_names` ARRAY<STRING> METADATA FROM 'value.pk-names',`sql` STRING METADATA FROM 'value.sql',`sql_type` MAP<STRING, INT> METADATA FROM 'value.sql-type',`ingestion_ts` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp') WITH ('connector' = 'kafka', -- Make sure you specify the corresponding connector.'topic' = '$TOPIC', -- Replace this with the topic you want to consume data from.'properties.bootstrap.servers' = '$IP:$PORT', -- Replace this with your Kafka connection address.'properties.group.id' = 'testGroup', -- (Required) The group ID.'format' = 'canal-json');
Option | Required | Default Value | Description |
connector | Yes | - | Here, it should be 'kafka' . |
topic | Yes | - | The name of the Kafka topic to read from or write to. |
properties.bootstrap.servers | Yes | - | The Kafka bootstrap server addresses, separated with commas. |
properties.group.id | Yes for source connectors | - | The Kafka consumer group ID. |
format | Yes | - | The input and output format of Kafka messages. Currently, valid values include csv , json , avro , debezium-json , and canal-json . For Flink 1.13, maxwell-json is supported. |
scan.startup.mode | No | group-offsets | The Kafka consumer startup mode. Valid values include latest-offset , earliest-offset , specific-offsets , group-offsets , and timestamp .'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300' . If 'specific-offsets' s used, you need to specify the offsets of each partition.'scan.startup.timestamp-millis' = '1631588815000' . If 'timestamp' is used, you need to specify the start timestamp (milliseconds). |
scan.startup.specific-offsets | No | - | If scan.startup.mode is 'specific-offsets' , you need to use this option to specify the start offset, for example, 'partition:0,offset:42;partition:1,offset:300' . |
scan.startup.timestamp-millis | No | - | If scan.startup.mode is 'timestamp' , you need to use this option to specify the start timestamp (Unix format in milliseconds). |
sink.partitioner | No | - | The Kafka output partitioning. Currently, the following types of partitioning are supported: fixed : One Flink partition corresponds to not more than one Kafka partition.round-robin : One Flink partition is distributed in turn to different Kafka partitions.Custom partitioning: You can also implement partitioning logic by inheriting the FlinkKafkaPartitioner class. |
Option | Required | Default Value | Description |
json.fail-on-missing-field | No | false | If this is true , the job will fail in case of missing parameters. If this is false (default), the missing parameters will be set to null and the job will continue to be executed. |
json.ignore-parse-errors | No | false | If this is true , when there is a parse error, the field will be set to null and the job will continue to be executed. If this is false , the job will fail in case of a parse error. |
json.timestamp-format.standard | No | SQL | The JSON timestamp format. The default value is SQL , in which case the format will be yyyy-MM-dd HH:mm:ss.s{precision} . You can also set it to ISO-8601 , and the format will be yyyy-MM-ddTHH:mm:ss.s{precision} . |
Option | Required | Default Value | Description |
csv.field-delimiter | No | , | The field delimiter, which is comma by default. |
csv.line-delimiter | No | U&'\\000A' | The line delimiter, which is \\n by default (in SQL, you must use U&'\\000A' ). You can also set it to \\r (in SQL, you need to use U&'\\000D' ). |
csv.disable-quote-character | No | false | Whether to disable quote characters. If this is true , 'csv.quote-character' cannot be used. |
csv.quote-character | No | " | The quote characters. Text inside quotes will be viewed as a whole. The default value is " . |
csv.ignore-parse-errors | No | false | Whether to ignore parse errors. If this is true , fields will be set to null in case of parse failure. |
csv.allow-comments | No | false | Whether to ignore comment lines that start with # and output them as empty lines (if this is true , make sure you set csv.ignore-parse-errors to true as well). |
csv.array-element-delimiter | No | ; | The array element delimiter, which is ; by default. |
csv.escape-character | No | - | The escape character. By default, escape characters are disabled. |
csv.null-literal | No | - | The string that will be seen as null. |
Option | Required | Default Value | Description |
debezium-json.schema-include | No | false | Whether to include schemas. If you specified 'value.converter.schemas.enable' when configuring Kafka Connect with Debezium, the JSON data sent by Debezium will include schema information, and you need to set this option to true . |
debezium-json.ignore-parse-errors | No | false | Whether to ignore parse errors. If this is true , fields will be set to null in case of parse failure. |
debezium-json.timestamp-format.standard | No | SQL | The JSON timestamp format. The default value is SQL , in which case the format will be yyyy-MM-dd HH:mm:ss.s{precision} . You can also set it to ISO-8601 , and the format will be yyyy-MM-ddTHH:mm:ss.s{precision} . |
Option | Required | Default Value | Description |
canal-json.source.append-mode | No | false | Whether to support append streams. You can set this to true when, for example, writing Canal JSON data from Kafka to Hive. This option is only supported for Flink 1.13 clusters. |
debezium-json.ignore-parse-errors | No | false | Whether to ignore parse errors. If this is true , fields will be set to null in case of parse failure. |
canal-json.* | No | - |
meta.
for the metadata column.Column | Data Type | Description |
Database | STRING NOT NULL | The name of the database to which the row belongs. |
table | STRING NOT NULL | The name of the table to which the row belongs. |
event-timestamp | TIMESTAMP_LTZ(3) NOT NULL | The time when the row was changed in the database. |
batch-id | BIGINT | The batch ID of the binlog. |
is-ddl | BOOLEAN | Whether it is a DDL statement. |
mysql-type | MAP | The database structure. |
update-before | ARRAY | The field value before it was modified. |
pk-names | ARRAY | The primary key field. |
sql | STRING | Null. |
sql-type | MAP | The mappings between the sql_type fields and Java data types. |
ingestion-timestamp | TIMESTAMP_LTZ(3) NOT NULL | The time when the row was received and processed. |
operation-type | STRING | The operation type, such as INSERT or DELETE . |
CREATE TABLE `kafka_json_source_table` (`id` INT,`name` STRING) WITH (-- Define Kafka parameters.'connector' = 'kafka','topic' = 'Data-Input', -- Replace this with the topic you want to consume data from.'scan.startup.mode' = 'latest-offset', -- Valid values include latest-offset, earliest-offset, specific-offsets, group-offsets, and timestamp.'properties.bootstrap.servers' = '172.28.28.13:9092', -- Replace this with your Kafka connection address.'properties.group.id' = 'testGroup', -- (Required) The group ID.-- Define the data format (JSON).'format' = 'json','json.fail-on-missing-field' = 'false' -- If this is 'false', no errors will occur even when parameters are missing.'json.ignore-parse-errors' = 'true' -- If this is 'true', all parse errors will be ignored.);CREATE TABLE `kafka_json_sink_table` (`id` INT,`name` STRING) WITH (-- Define Kafka parameters.'connector' = 'kafka','topic' = 'Data-Output', -- Replace this with the topic you want to write to.'properties.bootstrap.servers' = '172.28.28.13:9092', -- Replace this with your Kafka connection address.-- Define the data format (JSON).'format' = 'json','json.fail-on-missing-field' = 'false' -- If this is 'false', no errors will occur even when parameters are missing.'json.ignore-parse-errors' = 'true' -- If this is 'true', all parse errors will be ignored.);insert into kafka_json_sink_table select * from kafka_json_source_table;
CREATE TABLE `source`(`aid` bigint,`charname` string,`ts` timestamp(6),`database_name` string METADATA FROM 'value.database_name',`table_name` string METADATA FROM 'value.table_name',`op_ts` timestamp(3) METADATA FROM 'value.op_ts',`op_type` string METADATA FROM 'value.op_type',`batch_id` bigint METADATA FROM 'value.batch_id',`is_ddl` boolean METADATA FROM 'value.is_ddl',`update_before` ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update_before',`mysql_type` MAP<STRING, STRING> METADATA FROM 'value.mysql_type',`pk_names` ARRAY<STRING> METADATA FROM 'value.pk_names',`sql` STRING METADATA FROM 'value.sql',`sql_type` MAP<STRING, INT> METADATA FROM 'value.sql_type',`ingestion_ts` TIMESTAMP(3) METADATA FROM 'value.ts',primary key (`aid`) not enforced) WITH ('connector' = 'mysql-cdc' ,'append-mode' = 'true','hostname' = '$IP','port' = '$PORT','username' = '$USERNAME','password' = '$PASSWORD','database-name' = 't_wr','table-name' = 't1','server-time-zone' = 'Asia/Shanghai','server-id' = '5500-5510');CREATE TABLE `kafka_canal_json_sink`(aid BIGINT COMMENT 'unique id',charname string,`ts` timestamp(6),origin_database STRING METADATA FROM 'value.database',origin_table STRING METADATA FROM 'value.table',origin_ts TIMESTAMP(3) METADATA FROM 'value.event-timestamp',`type` STRING METADATA FROM 'value.operation-type',`batch_id` bigint METADATA FROM 'value.batch-id',`isDdl` BOOLEAN METADATA FROM 'value.is-ddl',`old` ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before',`mysql_type` MAP<STRING, STRING> METADATA FROM 'value.mysql-type',`pk_names` ARRAY<STRING> METADATA FROM 'value.pk-names',`sql` STRING METADATA FROM 'value.sql',`sql_type` MAP<STRING, INT> METADATA FROM 'value.sql-type',`ingestion_ts` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp')WITH ('connector' = 'kafka', -- Valid values include 'kafka' and 'kafka-0.11'. Make sure you specify the corresponding built-in connector.'topic' = 'TOPIC', -- Replace this with the topic you want to consume data from.'properties.bootstrap.servers' = '$IP:$PORT', -- Replace this with your Kafka connection address.'properties.group.id' = 'testGroup', -- (Required) The group ID.'format' = 'canal-json');insert into kafka_canal_json_sink select * from source;
CREATE TABLE `source`(`aid` bigint,`charname` string,`ts` timestamp(3),origin_database STRING METADATA FROM 'value.database' VIRTUAL,origin_table STRING METADATA FROM 'value.table' VIRTUAL,origin_es TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,origin_type STRING METADATA FROM 'value.operation-type' VIRTUAL,`batch_id` bigint METADATA FROM 'value.batch-id' VIRTUAL,`is_ddl` boolean METADATA FROM 'value.is-ddl' VIRTUAL,origin_old ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before' VIRTUAL,`mysql_type` MAP<STRING, STRING> METADATA FROM 'value.mysql-type' VIRTUAL,origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,`sql` STRING METADATA FROM 'value.sql' VIRTUAL,origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,`ingestion_ts` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,WATERMARK FOR `origin_es` AS `origin_es` - INTERVAL '5' SECOND) WITH ('connector' = 'kafka', -- Make sure you specify the corresponding connector.'topic' = '$TOPIC', -- Replace this with the topic you want to consume data from.'properties.bootstrap.servers' = '$IP:PORT', -- Replace this with your Kafka connection address.'properties.group.id' = 'testGroup', -- (Required) The group ID.'scan.startup.mode' = 'latest-offset','scan.topic-partition-discovery.interval' = '10s','format' = 'canal-json','canal-json.source.append-mode' = 'true', -- This is only supported for Flink 1.13.'canal-json.ignore-parse-errors' = 'false');CREATE TABLE `kafka_canal_json` (`aid` bigint,`charname` string,`ts` timestamp(9),origin_database STRING,origin_table STRING,origin_es TIMESTAMP(9),origin_type STRING,`batch_id` bigint,`is_ddl` boolean,origin_old ARRAY<MAP<STRING, STRING>>,`mysql_type` MAP<STRING, STRING>,origin_pk_names ARRAY<STRING>,`sql` STRING,origin_sql_type MAP<STRING, INT>,`ingestion_ts` TIMESTAMP(9),dt STRING,hr STRING) PARTITIONED BY (dt, hr)with ('connector' = 'hive','hive-version' = '3.1.1','hive-database' = 'testdb','partition.time-extractor.timestamp-pattern'='$dt $hr:00:00','sink.partition-commit.trigger'='partition-time','sink.partition-commit.delay'='30 min','sink.partition-commit.policy.kind'='metastore,success-file');insert into kafka_canal_json select *,DATE_FORMAT(`origin_es`,'yyyy-MM-dd'),DATE_FORMAT(`origin_es`,'HH')from `source`;
CREATE TABLE `YourTable` (...) WITH (...'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="ckafka-xxxxxxxx#YourUserName" password="YourPassword";','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.mechanism' = 'PLAIN',...);
username
should be instance ID
+ #
+ the username you set, and password
is the password you configured.krb5.conf
and emr.keytab
in the following paths./etc/krb5.conf/var/krb5kdc/emr.keytab
jar cvf kafka-xxx.jar krb5.conf emr.keytab
vim kafka-xxx.jar
). Make sure the JAR file includes the following information and has the correct structure.META-INF/META-INF/MANIFEST.MFemr.keytabkrb5.conf
klist -kt /var/krb5kdc/emr.keytab# The output is as follows (use the first): hadoop/172.28.28.51@EMR-OQPO48B9KVNO Timestamp Principal---- ------------------- ------------------------------------------------------2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B92 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B92 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B92 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
CREATE TABLE `YourTable` (...) WITH (...'properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.mechanism' = 'GSSAPI','properties.sasl.kerberos.service.name' = 'hadoop',...);
security.kerberos.login.principal: hadoop/172.28.2.13@EMR-4K3VR5FDsecurity.kerberos.login.keytab: emr.keytabsecurity.kerberos.login.conf: krb5.confsecurity.kerberos.login.contexts: KafkaClientfs.hdfs.hadoop.security.authentication: kerberos
Was this page helpful?