SQLServer CDC
Last updated: 2023-11-08 14:23:39

Overview

The SQLServer CDC source connector reads both snapshot data and incremental data from a SQLServer database. This document describes how to set up the SQLServer CDC connector.

Versions

| Flink Version | Description |
| --- | --- |
| 1.11 | Unsupported |
| 1.13 | Supported |
| 1.14 | Unsupported |
| 1.16 | Unsupported |

Limits

The SQLServer CDC connector can be used only as a source.

Setting up a SQLServer database

Change data capture (CDC) must be enabled on the SQLServer source tables to capture data changes.
1. Enable CDC on a database as instructed in Setting Change Data Capture (CDC).
2. Enable CDC on a SQLServer source table as instructed in sys.sp_cdc_enable_table.
USE MyDB
GO

EXEC sys.sp_cdc_enable_table
    -- Schema of the source table.
    @source_schema = N'dbo',
    -- Name of the source table that you want to capture.
    @source_name = N'MyTable',
    -- Role (for example, N'MyRole') to which you add users who need SELECT permission on the captured columns of the source table. Members of the `sysadmin` or `db_owner` role always have access to the change tables. NULL restricts full access to captured information to members of those two roles.
    @role_name = NULL,
    -- Filegroup where SQL Server places the change table for the captured table. A named filegroup must already be defined for the current database. NULL uses the default filegroup.
    @filegroup_name = NULL,
    -- Whether to support querying net changes for the captured table. 0: only the function for querying all changes is generated. 1: the function for querying net changes is also generated, which requires a primary key on the source table or a unique index specified with `@index_name`. If the table has such a key or index, the default is 1. Otherwise, the default is 0.
    @supports_net_changes = 0
GO
3. Check whether CDC is enabled on the source table as instructed in sys.sp_cdc_help_change_data_capture.
USE MyDB
GO

EXEC sys.sp_cdc_help_change_data_capture
GO
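
Step 1 above links to the instructions for enabling CDC at the database level but does not show the statement itself. As a minimal sketch, assuming the same database name `MyDB` used in the other examples and a login that is a member of `sysadmin`, the database-level switch and two quick catalog checks might look like this:

```sql
USE MyDB
GO

-- Enable CDC for the current database (requires sysadmin membership).
EXEC sys.sp_cdc_enable_db
GO

-- Confirm the database-level flag: is_cdc_enabled = 1 means CDC is on.
SELECT name, is_cdc_enabled
FROM sys.databases
WHERE name = N'MyDB'
GO

-- List the tables in the current database that are tracked by CDC.
SELECT s.name AS schema_name, t.name AS table_name
FROM sys.tables AS t
JOIN sys.schemas AS s ON s.schema_id = t.schema_id
WHERE t.is_tracked_by_cdc = 1
GO
```

CDC capture jobs are run by SQL Server Agent, so if the change tables stay empty after the steps above, it is worth checking that the Agent service is running.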

Defining a table in DDL

-- register a SQLServer table 'orders' in Flink SQL
CREATE TABLE orders (
    id INT,
    order_date DATE,
    purchaser INT,
    quantity INT,
    product_id INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'localhost',
    'port' = '1433',
    'username' = 'sa',
    'password' = 'Password!',
    'database-name' = 'inventory',
    'schema-name' = 'dbo',
    'table-name' = 'orders'
);

-- read snapshot and change data from the orders table
SELECT * FROM orders;

WITH parameters

| Option | Required | Default Value | Type | Description |
| --- | --- | --- | --- | --- |
| connector | Yes | None | String | The value must be sqlserver-cdc. |
| hostname | Yes | None | String | IP address or hostname of the SQLServer database. |
| username | Yes | None | String | Username to use when connecting to the SQLServer database. |
| password | Yes | None | String | Password to use when connecting to the SQLServer database. |
| database-name | Yes | None | String | Name of the SQLServer database to monitor. |
| schema-name | Yes | None | String | Schema name of the source table. Java regular expressions are supported. For example, dbo.* matches dbo, dbo1, and dbo_test. We recommend that you enclose a regex in parentheses to prevent errors when it is combined with table-name. |
| table-name | Yes | None | String | Name of the SQLServer table to monitor. Java regular expressions are supported. We recommend that you enclose a regex in parentheses to prevent errors when it is combined with schema-name. |
| port | No | 1433 | Integer | Port number of the SQLServer database. |
| server-time-zone | No | UTC | String | Session time zone of the database server, such as "Asia/Shanghai". |
| debezium.* | No | None | String | Debezium properties for fine-grained control of client behavior, such as 'debezium.snapshot.mode' = 'initial_only'. For details, see Debezium's SQLServer Connector properties. |
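
To illustrate how the optional and regex-capable options fit together, here is a hedged sketch that reuses the connection values from the `orders` example. The table name `orders_all`, the `orders_.*` pattern, and the column list are hypothetical, chosen only for illustration. The sketch assumes every matched source table shares the declared columns.

```sql
-- Hypothetical: capture every table in schema dbo whose name starts with "orders_".
CREATE TABLE orders_all (
    id INT,
    order_date DATE,
    quantity INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'localhost',
    'port' = '1433',
    'username' = 'sa',
    'password' = 'Password!',
    'database-name' = 'inventory',
    -- Regexes are wrapped in parentheses so they stay intact
    -- when schema-name and table-name are combined.
    'schema-name' = '(dbo)',
    'table-name' = '(orders_.*)',
    -- Session time zone of the database server (the default is UTC).
    'server-time-zone' = 'Asia/Shanghai',
    -- Passed through to Debezium: take the initial snapshot only,
    -- without switching to streaming changes afterwards.
    'debezium.snapshot.mode' = 'initial_only'
);
```

Omit the `debezium.snapshot.mode` line to keep the connector's default behavior of reading the snapshot first and then the incremental changes.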

Available metadata

| Key | Data Type | Description |
| --- | --- | --- |
| table_name | STRING NOT NULL | Name of the table that contains the row. |
| schema_name | STRING NOT NULL | Name of the schema that contains the row. |
| database_name | STRING NOT NULL | Name of the database that contains the row. |
| op_ts | TIMESTAMP_LTZ(3) NOT NULL | Time at which the change was made in the database. For rows read during the snapshot phase, the value is 0. |
Example:
CREATE TABLE products (
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    id INT NOT NULL,
    name STRING,
    description STRING,
    weight DECIMAL(10,3)
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'localhost',
    'port' = '1433',
    'username' = 'sa',
    'password' = 'Password!',
    'database-name' = 'inventory',
    'schema-name' = 'dbo',
    'table-name' = 'products'
);
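
Once declared, the metadata columns can be queried like ordinary columns. The following sketch assumes the `products` table defined above, and assumes that the snapshot-phase value 0 of `op_ts` corresponds to the epoch instant. Under those assumptions, it keeps only the rows produced by the incremental phase:

```sql
-- Snapshot rows carry op_ts = 0 (assumed here to be the epoch, 1970-01-01 00:00:00 UTC),
-- so any later timestamp indicates a change captured after the snapshot.
SELECT db_name, schema_name, table_name, operation_ts, id, name
FROM products
WHERE operation_ts > TO_TIMESTAMP_LTZ(0, 3);
```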

Data type mapping

| SQLServer Type | Flink SQL Type |
| --- | --- |
| char(n) | CHAR(n) |
| varchar(n), nvarchar(n), nchar(n) | VARCHAR(n) |
| text, ntext, xml | STRING |
| decimal(p, s), money, smallmoney | DECIMAL(p, s) |
| numeric | NUMERIC |
| float, real | DOUBLE |
| bit | BOOLEAN |
| int | INT |
| tinyint | SMALLINT |
| smallint | SMALLINT |
| bigint | BIGINT |
| date | DATE |
| time(n) | TIME(n) |
| datetime2, datetime, smalldatetime | TIMESTAMP(n) |
| datetimeoffset | TIMESTAMP_LTZ(3) |
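
As a worked illustration of the mapping, the sketch below declares a Flink table for a hypothetical SQLServer table `dbo.shipments`. The table, its columns, and the chosen lengths and precisions are assumptions made for this example. `DECIMAL(19, 4)` follows the precision and scale of the SQLServer `money` type, and `TIMESTAMP(3)` is one reasonable choice for `datetime`.

```sql
-- Each trailing comment names the assumed SQLServer column type.
CREATE TABLE shipments (
    id INT,                        -- int
    carrier VARCHAR(64),           -- nvarchar(64)
    note STRING,                   -- ntext
    fee DECIMAL(19, 4),            -- money
    weight DOUBLE,                 -- real
    fragile BOOLEAN,               -- bit
    priority SMALLINT,             -- tinyint
    shipped_on DATE,               -- date
    shipped_at TIMESTAMP(3),       -- datetime
    received_at TIMESTAMP_LTZ(3),  -- datetimeoffset
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'localhost',
    'port' = '1433',
    'username' = 'sa',
    'password' = 'Password!',
    'database-name' = 'inventory',
    'schema-name' = 'dbo',
    'table-name' = 'shipments'
);
```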

Notes

Checkpoints cannot be performed while table snapshots are being scanned

While the snapshot of the database tables is being scanned, there is no recoverable position, so checkpoints cannot be performed. To hold checkpoints off, the SQLServer CDC source keeps each checkpoint waiting until it times out. A timed-out checkpoint is treated as a failed checkpoint, and by default a failed checkpoint triggers a failover of the Flink job. If the database table is large, we recommend adding the following Flink configurations to avoid failovers caused by timed-out checkpoints:
execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647

Single-threaded reading

The SQLServer CDC source cannot read in parallel, because only one task can receive change events.