tencent cloud

All product documents
Stream Compute Service
TDSQL for MySQL
Last updated: 2023-11-08 14:53:32
TDSQL for MySQL
Last updated: 2023-11-08 14:53:32

Overview

The tdsql-subscribe connector is dedicated to the subscription of TDSQL for MySQL data. It allows integrating the incremental binlog data from TDSQL for MySQL as instructed in Creating TDSQL for MySQL Data Subscription. Before using this connector, make sure a data subscription task has been successfully configured.
Note
The tdsql-subscribe connector is under beta testing. If you need to try it, submit a ticket.

Versions

Flink Version
Description
1.11
Unsupported
1.13
Supported
1.14
Unsupported
1.16
Unsupported

‌## Limits

The tdsql-subscribe connector can be used as a source but not a sink of a stream.

Defining a table in DDL

When the tdsql-subscribe connector is used as a source, most of its WITH parameters are similar to those of the Kafka connector, and all connection parameters can be found in the subscription task. Please note that when using the tdsql-subscribe connector, you must set format to protobuf, because the messages sent to Kafka via the subscription task is in protobuf format. Compared with the Kafka connector, the tdsql-subscribe connector contains more authentication information from the subscription task.

As a source

Input in protobuf

CREATE TABLE `DataInput` (
`id` INT,
`name` VARCHAR,
`age` INT,
) WITH (
'connector' = 'tdsql-subscribe', -- Make sure you specify the corresponding connector.
'tdsql.database.name' = 'test_case_2022_06_0*', -- Filter subscription messages to consume subscription data of databases whose name matches the regex `test_case_2022_06_0*`.
'tdsql.table.name' = 'test_0*', -- Filter subscription messages to consume subscription data of tables whose name matches the regex `test_0*`.
'topic' = 'topic-subs-5xop97nffk-tdsqlshard-xxx', -- Replace it with the topic consumed by the subscription task.
'scan.startup.mode' = 'earliest-offset', -- Valid values: `latest-offset`, `earliest-offset`, `specific-offsets, and `group-offsets`.
'properties.bootstrap.servers' = 'guangzhou-kafka-2.cdb-dts.tencentcs.com.cn:3212', -- Replace it with the Kafka connection address of your subscription task.
'properties.group.id' = 'consumer-grp-subs-xxx-kk',
'format' = 'protobuf', -- Only protobuf is allowed.
'properties.security.protocol'='SASL_PLAINTEXT', -- Authentication protocol.
'properties.sasl.mechanism'='SCRAM-SHA-512', -- Authentication method.
'properties.sasl.jaas.config'='org.apache.kafka.common.security.scram.ScramLoginModule required username="account-subs-xxx-username" password="psw";' -- Username and password.
);
CREATE TABLE `jdbc_upsert_sink_table` (
id INT PRIMARY KEY NOT ENFORCED,
name STRING,
age INT
) WITH (
-- Specify the parameters for database connection.
'connector' = 'jdbc',
'url' = 'jdbc:mysql://172.28.28.138:3306/testdb', -- Replace it with your MySQL database connection URL.
'table-name' = 'sink', --The table into which the data will be written.
'username' = 'user', -- The username (with the INSERT permission required) for database access.
'password' = 'psw' -- The password for database access.
);
INSERT INTO jdbc_upsert_sink_table SELECT * FROM DataInput;

WITH parameters

Option
Required
Default Value
Description
connector
Yes
None
Here, it should be 'tdsql-subscribe'.
topic
Yes
None
The name of the Kafka topic to be read.
properties.bootstrap.servers
Yes
None
The Kafka bootstrap addresses, separated by comma.
properties.group.id
Yes
None
The ID of the Kafka consumer group.
format
Yes
None
The input format of a Kafka message. Only protobuf is supported.
scan.startup.mode
No
group-offsets
The Kafka consumer start mode.
Valid values: latest-offset, earliest-offset, specific-offsets, group-offsets, and timestamp. If 'specific-offsets' is used, specify the offset of each partition, such as 'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'.
If 'timestamp' is used, specify the startup timestamp (in ms), such as 'scan.startup.timestamp-miles' = '1631588815000'.
scan.startup.specific-offsets
No
None
If scan.startup.mode is set to 'specific-offsets', this option must be used to specify the specific offset of the startup, such as 'partition:0,offset:42;partition:1,offset:300'.
scan.startup.timestamp-millis
No
None
If scan.startup.mode is set to 'timestamp', this option must be used to specify the time point (Unix timestamp in ms) of the startup.
tdsql.database.name
No
None
The name of the TDSQL database. If this option is set, this connector can consume the binlog data of the database specified here, provided that the subscription task contains the binlog data of this database. This option supports a regex, such as test_case_2022_06_0*.
tdsql.table.name
No
None
The name of the TDSQL table. If this option is set, this connector can consume the binlog data of the table specified here, provided that the subscription task contains the binlog data of this table. ‌This option supports a regex, such as test_0* or test_1,test_2.
Note
To use tdsql.database.name or tdsql.table.name, we recommend you subscribe to all instances in the subscription task. If multiple Stream Compute Service tasks consume different TDSQL tables, each task must use a unique consumer group of the subscription task. You can create consumer groups in the subscription task.

Notes

1. If a subscription task is configured with multiple databases and tables or one database and multiple tables, the tables must be in the same schema to correctly integrate the data of the subscription task.
2. Chinese characters in a source table can be encoded only using utf8 or gbk.
Was this page helpful?
You can also Contact Sales or Submit a Ticket for help.
Yes
No

Feedback

Contact Us

Contact our sales team or business advisors to help your business.

Technical Support

Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

7x24 Phone Support