Flink Version | Description |
1.11 | Unsupported |
1.13 | Supported |
1.14 | Unsupported |
1.16 | Unsupported |
USE MyDBGOEXEC sys.sp_cdc_enable_table@source_schema = N'dbo', -- Specifies the schema of the source table.@source_name = N'MyTable', -- Specifies the name of the table that you want to capture.@role_name = NULL, -- Specifies a role MyRole to which you can add users to whom you want to grant SELECT permission on the captured columns of the source table. Users in the `sysadmin` or `db_owner` role also have access to the specified change tables. Set the value of `@role_name` to `NULL` to allow only members in the `sysadmin` or `db_owner` role to have full access to captured information.@filegroup_name = NULL, -- Specifies the filegroup where SQL Server places the change table for the captured table. It can be set to `NULL`. If it is set to a specific name, `filegroup_name` must be defined for the current database. If it is set to `NULL`, the default filegroup is used.@supports_net_changes = 0 -- Whether to support querying net changes for the captured table. If the table has a primary key or a unique index using the parameter ID `@index_name`, `supports_net_changes` defaults to `1`. Otherwise, it defaults to `0`. If it is `0`, only the function for querying all changes is generated. If it is `1`, the function for querying net changes is also generated. Setting `supports_net_changes` to `1` requires specifying `index_name` or a defined primary key of the source table.GO
USE MyDBGOEXEC sys.sp_cdc_help_change_data_captureGO
-- register a SqlServer table 'orders' in Flink SQLCREATE TABLE orders (id INT,order_date DATE,purchaser INT,quantity INT,product_id INT,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'sqlserver-cdc','hostname' = 'localhost','port' = '1433','username' = 'sa','password' = 'Password!','database-name' = 'inventory','schema-name' = 'dbo','table-name' = 'orders');-- read snapshot and binlogs from orders tableSELECT * FROM orders;
Option | Required | Default Value | Type | Description |
connector | Yes | None | String | The value must be sqlserver-cd . |
hostname | Yes | None | String | IP address or hostname of the SQLServer database. |
username | Yes | None | String | Username to use when connecting to the SQLServer database. |
password | Yes | None | String | Password to use when connecting to the SQLServer database. |
database-name | Yes | None | String | Database name of the SQLServer database to monitor. |
schema-name | Yes | None | String | Schema name of the source table. Regexes in Java are supported. For example, dbo.* can match dbo , dbo1 , and dbo_test . We recommend you place a regex in brackets to prevent errors when it is combined with table-name . |
table-name | Yes | None | String | Table name of the SQLServer database to monitor. Regexes in Java are supported. We recommend you place a regex in brackets to prevent errors when it is combined with schema-name . |
port | No | 1433 | Integer | Integer port number of the SQLServer database. |
server-time-zone | No | UTC | String | The session time zone in database server, such as "Asia/Shanghai". |
debezium.* | No | None | String | Specifies Debezium properties for fine-grained control of the behaviors on the client, such as 'debezium.snapshot.mode' = 'initial_only' . For details, see Debezium's SQLServer Connector properties. |
Key | Data Type | Description |
table_name | STRING NOT NULL | Name of the database that contains the row. |
schema_name | STRING NOT NULL | Name of the schema that contains the row. |
database_name | STRING NOT NULL | Name of the table that contains the row. |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | It indicates the time that the change was made in the database. For the data during the snapshot phase, the value of this option is 0 . |
CREATE TABLE products (table_name STRING METADATA FROM 'table_name' VIRTUAL,schema_name STRING METADATA FROM 'schema_name' VIRTUAL,db_name STRING METADATA FROM 'database_name' VIRTUAL,operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,id INT NOT NULL,name STRING,description STRING,weight DECIMAL(10,3)) WITH ('connector' = 'sqlserver-cdc','hostname' = 'localhost','port' = '1433','username' = 'sa','password' = 'Password!','database-name' = 'inventory','schema-name' = 'dbo','table-name' = 'products');
SQLServer Type | Flink SQL Type |
char(n) | CHAR(n) |
varchar(n) nvarchar(n) nchar(n) | VARCHAR(n) |
text ntext xml | STRING |
decimal(p, s) money smallmoney | DECIMAL(p, s) |
numeric | NUMERIC |
float real | DOUBLE |
bit | BOOLEAN |
int | INT |
tinyint | SMALLINT |
smallint | SMALLINT |
bigint | BIGINT |
date | DATE |
time(n) | TIME(n) |
datetime2 datetime smalldatetime | TIMESTAMP(n) |
datetimeoffset | TIMESTAMP_LTZ(3) |
execution.checkpointing.interval: 10minexecution.checkpointing.tolerable-failed-checkpoints: 100restart-strategy: fixed-delayrestart-strategy.fixed-delay.attempts: 2147483647
Was this page helpful?