| Flink Version | Description |
| :------------ | :---------- |
| 1.11 | Unsupported |
| 1.13 | Supported |
| 1.14 | Unsupported |
| 1.16 | Unsupported |
Set `format` to `protobuf`, because the messages sent to Kafka by the subscription task are in protobuf format. Compared with the Kafka connector, the tdsql-subscribe connector carries additional authentication information from the subscription task.

```sql
CREATE TABLE `DataInput` (
  `id` INT,
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'tdsql-subscribe',  -- Make sure you specify the corresponding connector.
  'tdsql.database.name' = 'test_case_2022_06_0*',  -- Filter subscription messages to consume subscription data of databases whose name matches the regex `test_case_2022_06_0*`.
  'tdsql.table.name' = 'test_0*',  -- Filter subscription messages to consume subscription data of tables whose name matches the regex `test_0*`.
  'topic' = 'topic-subs-5xop97nffk-tdsqlshard-xxx',  -- Replace it with the topic consumed by the subscription task.
  'scan.startup.mode' = 'earliest-offset',  -- Valid values: `latest-offset`, `earliest-offset`, `specific-offsets`, and `group-offsets`.
  'properties.bootstrap.servers' = 'guangzhou-kafka-2.cdb-dts.tencentcs.com.cn:3212',  -- Replace it with the Kafka connection address of your subscription task.
  'properties.group.id' = 'consumer-grp-subs-xxx-kk',
  'format' = 'protobuf',  -- Only protobuf is allowed.
  'properties.security.protocol' = 'SASL_PLAINTEXT',  -- Authentication protocol.
  'properties.sasl.mechanism' = 'SCRAM-SHA-512',  -- Authentication method.
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="account-subs-xxx-username" password="psw";'  -- Username and password.
);

CREATE TABLE `jdbc_upsert_sink_table` (
  id INT PRIMARY KEY NOT ENFORCED,
  name STRING,
  age INT
) WITH (
  -- Specify the parameters for database connection.
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://172.28.28.138:3306/testdb',  -- Replace it with your MySQL database connection URL.
  'table-name' = 'sink',  -- The table into which the data will be written.
  'username' = 'user',  -- The username (with the INSERT permission required) for database access.
  'password' = 'psw'  -- The password for database access.
);

INSERT INTO jdbc_upsert_sink_table SELECT * FROM DataInput;
```
| Option | Required | Default Value | Description |
| :----- | :------- | :------------ | :---------- |
| connector | Yes | None | Here, it should be `'tdsql-subscribe'`. |
| topic | Yes | None | The name of the Kafka topic to be read. |
| properties.bootstrap.servers | Yes | None | The Kafka bootstrap addresses, separated by commas. |
| properties.group.id | Yes | None | The ID of the Kafka consumer group. |
| format | Yes | None | The input format of a Kafka message. Only protobuf is supported. |
| scan.startup.mode | No | group-offsets | The Kafka consumer start mode. Valid values: `latest-offset`, `earliest-offset`, `specific-offsets`, `group-offsets`, and `timestamp`. If `specific-offsets` is used, specify the offset of each partition, such as `'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'`. If `timestamp` is used, specify the startup timestamp (in ms), such as `'scan.startup.timestamp-millis' = '1631588815000'`. |
| scan.startup.specific-offsets | No | None | If `scan.startup.mode` is set to `'specific-offsets'`, this option must be used to specify the specific offsets of the startup, such as `'partition:0,offset:42;partition:1,offset:300'`. |
| scan.startup.timestamp-millis | No | None | If `scan.startup.mode` is set to `'timestamp'`, this option must be used to specify the time point (Unix timestamp in ms) of the startup. |
| tdsql.database.name | No | None | The name of the TDSQL database. If this option is set, the connector consumes only the binlog data of the database specified here, provided that the subscription task contains the binlog data of this database. This option supports a regex, such as `test_case_2022_06_0*`. |
| tdsql.table.name | No | None | The name of the TDSQL table. If this option is set, the connector consumes only the binlog data of the table specified here, provided that the subscription task contains the binlog data of this table. This option supports a regex, such as `test_0*`, or a list, such as `test_1,test_2`. |
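The two offset-related start modes in the table above can be sketched as follows. This is a minimal illustration only: the table name, topic, broker address, and consumer group are placeholder values, not real endpoints.

```sql
-- Hypothetical source table starting from specific per-partition offsets.
CREATE TABLE `OffsetDemo` (
  `id` INT
) WITH (
  'connector' = 'tdsql-subscribe',
  'topic' = 'topic-subs-xxx',                              -- Placeholder topic.
  'properties.bootstrap.servers' = 'your-kafka-address:9092',  -- Placeholder address.
  'properties.group.id' = 'consumer-grp-subs-xxx',
  'format' = 'protobuf',
  'scan.startup.mode' = 'specific-offsets',
  -- Start from offset 42 in partition 0 and offset 300 in partition 1.
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);

-- For `timestamp` mode, replace the last two options with:
--   'scan.startup.mode' = 'timestamp',
--   'scan.startup.timestamp-millis' = '1631588815000'  -- Unix timestamp in ms.
```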
When `tdsql.database.name` or `tdsql.table.name` is used to filter data, we recommend you subscribe to all instances in the subscription task. If multiple Stream Compute Service tasks consume different TDSQL tables, each task must use a unique consumer group of the subscription task. You can create consumer groups in the subscription task. Only the `utf8` and `gbk` charsets are supported.
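As a sketch of the consumer-group requirement above, two Stream Compute Service tasks that consume different TDSQL tables from the same subscription task should each declare their own consumer group. All names, topics, and addresses below are placeholders; authentication options are omitted for brevity.

```sql
-- Task A: consumes tables matching the regex `test_0*` with its own consumer group.
CREATE TABLE `SourceA` (`id` INT) WITH (
  'connector' = 'tdsql-subscribe',
  'topic' = 'topic-subs-xxx',                              -- Placeholder topic.
  'properties.bootstrap.servers' = 'your-kafka-address:9092',
  'format' = 'protobuf',
  'tdsql.table.name' = 'test_0*',
  'properties.group.id' = 'consumer-grp-subs-xxx-a'        -- Unique to task A.
);

-- Task B: consumes tables matching `test_1*` and must use a different consumer group.
CREATE TABLE `SourceB` (`id` INT) WITH (
  'connector' = 'tdsql-subscribe',
  'topic' = 'topic-subs-xxx',
  'properties.bootstrap.servers' = 'your-kafka-address:9092',
  'format' = 'protobuf',
  'tdsql.table.name' = 'test_1*',
  'properties.group.id' = 'consumer-grp-subs-xxx-b'        -- Unique to task B.
);
```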