package com.qcloud.biz;import com.qcloud.dts.context.NetworkEnv;import com.qcloud.dts.context.SubscribeContext;import com.qcloud.dts.message.ClusterMessage;import com.qcloud.dts.message.DataMessage;import com.qcloud.dts.subscribe.ClusterListener;import com.qcloud.dts.subscribe.DefaultSubscribeClient;import com.qcloud.dts.subscribe.SubscribeClient;import java.util.List;public class Main {public static void main(String[] args) throws Exception {//创建一个 contextSubscribeContext context=new SubscribeContext();//用户 secretId、secretKey,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参考https://www.tencentcloud.com/document/product/598/32675context.setSecretId("AKID-522dabxxxxxxxxxxxxxxxxxx");context.setSecretKey("AKEY-0ff4cxxxxxxxxxxxxxxxxxxxx");// 设置 channel 所在的 region,2.8.0 以后的 SDK 必须设置 region 参数// region 值参照:https://www.tencentcloud.com/document/product/236/15833?from_cn_redirect=1#.E5.9C.B0.E5.9F.9F.E5.88.97.E8.A1.A8context.setRegion("ap-chongqing");// 订阅的 serviceIp 和 servicePort// 注意:2.8.0以前的 SDK 需要设置 IP 和 Port 两个参数,2.8.0以后的版本如果设置了 region 参数则可以省略// context.setServiceIp("10.xx.xx.24");// context.setServicePort(50120);// 如果运行 SDK 的 CVM 不能访问外网,设置网络环境为内网;默认为外网。context.setNetworkEnv(NetworkEnv.LAN);//创建客户端SubscribeClient client=new DefaultSubscribeClient(context);//创建订阅 listenerClusterListener listener= new ClusterListener() {@Overridepublic void notify(List<ClusterMessage> messages) throws Exception {//消费订阅到的数据for(ClusterMessage m:messages){for(DataMessage.Record.Field f:m.getRecord().getFieldList()){if (f.getType().equals(DataMessage.Record.Field.Type.BLOB)){System.out.println("["+f.getType()+"]["+f.getFieldname()+"] the original value:");byte[] theRawBytesValue = f.getValueAsBytes();}if(f.getType().equals(DataMessage.Record.Field.Type.INT8)){// 如果该值为 null,f.getValueAsInteger() 返回 nullSystem.out.println(f.getValueAsInteger());}if(f.getType().equals(DataMessage.Record.Field.Type.JSON)){// 源实例为 mysql5.7 才可能返回 JSON 类型的数据System.out.println(f.getValueAsString());}if(f.getType().equals(DataMessage.Record.Field.Type.STRING)){// 如果该值为 null,f.getValueAsString() 返回 nullSystem.out.println(f.getValueAsString());// 字段原始的编码System.out.println(f.getFieldEnc());}else{// f.getValue() 方法即将废弃String value = f.getValue() == null ? "Null": f.getValue();String msg = "["+f.getType()+"]"+f.getFieldname()+"[encoding:"+f.getFieldEnc()+"]"+"[value:"+value+"]";System.out.println(msg);}}//消费完之后,确认消费m.ackAsConsumed();}}@Overridepublic void onException(Exception e){System.out.println("listen exception"+e);}};//添加监听者client.addClusterListener(listener);//设置请求的订阅通道client.askForGUID("dts-channel-r0M8kKsSyRZmSxQt");//启动客户端client.start();}}
SubscribeClient
。ClusterListener
,消费收到的 Binlog 订阅数据,消费完之后返回确认消息。ClusterListener
中,可以根据用户自身的需求,对收到的数据进行操作,还可以对收到 Binlog 数据根据类型进行过滤,例如过滤掉所有drop
语句等。secretId
和secretKey
是跟用户腾讯云账号关联的密钥值,可以在控制台的访问管理 > 访问密钥 > API 密钥管理查看,SDK 用这两个参数来对用户操作进行鉴权。name/dts:AuthenticateSubscribeSDK
操作的权限,或者赋予 DTS 所有操作的权限QcloudDTSFullAccess
。serviceIp
、servicePort
、channelId
都是与用户 Binlog 订阅相关的,在云数据库 MySQL 相应页面配置好订阅内容后,可在 DTS 控制台 的数据订阅页查看。serviceIp
即数据订阅控制台服务地址里的 IP、servicePort
即服务地址里的端口号、channelId
即通道 ID。参数名 | 类型 | 参数含义 |
secretId | String | 安全凭证 secretId,可以在控制台的访问管理 > 访问密钥 > API 密钥管理查看 |
参数名 | 类型 | 参数含义 |
secretKey | String | 安全凭证 secretKey,可以在控制台的访问管理 > 访问密钥 > API 密钥管理查看 |
参数名 | 类型 | 参数含义 |
serviceIp | String | 订阅服务的 IP 地址,可以在控制台的订阅通道配置页面查看 |
参数名 | 类型 | 参数含义 |
servicePort | String | 订阅服务的端口号,可以在控制台的订阅通道配置页面查看 |
参数名 | 类型 | 参数含义 |
context | SubscribeContext | 用户 SDK 的配置信息 |
isSynce | boolean | 表示 SDK 是否使用同步消费模式 |
参数名 | 类型 | 参数含义 |
context | SubscribeContext | 用户 SDK 的配置信息 |
参数名 | 类型 | 参数含义 |
listener | ClusterListener | 消费客户端需要使用的监听者,消费 Binlog 消息的主流程应该实现在 ClusterListener 这里面 |
参数名 | 类型 | 参数含义 |
channelId | String | 订阅通道的 ID,可以在控制台的订阅通道配置页面查看 |
参数名 | 类型 | 参数含义 |
waitSeconds | int | 等待时间,单位为秒,表示等待多久开始强制停止 SDK 的运行 |
参数名 | 类型 | 参数含义 |
messages | List<ClusterMessage> | 订阅数据数组,ClusterMessage 具体实现详见其定义 |
参数名 | 类型 | 参数含义 |
exception | Exception | Java 标准库中的 Exception 类 |
类型 | 参数含义 |
Record | 变更记录,对应某个事务中某条具体的记录,例如 begin,commit,update,insert 等 |
参数名 | 类型 | 参数含义 |
key | String | 属性值的名称 |
属性键值 Key | 说明 |
record_id | Record 的 ID,这个 ID 在通道内按字符串比较自增有序,不保证连续 |
source_type | Record 对应数据库实例的引擎类型,目前取值为:mysql |
source_category | Record 的类型,目前取值为:full_recorded |
timestamp | Record 落 binlog 的时间,这个时间同时也是这条 SQL 在 TencentDB 中执行的时间 |
sdkInfo | Record 对应的 binlog 文件的位点,格式为:file_offset@file_name,filen_name为binlog文件的数字后缀 |
record_type | Record 对应的操作类型,主要取值包括:insert/update/delete/replace/ddl/begin/commit/heartbeat |
db | Record 更新表,对应的数据库名;DDL 类型的记录,此字段为空 |
table_name | Record 更新表的表名;DDL 类型的记录,此字段为空 |
record_encoding | Record 对应的编码 |
primary | Record 更新表的主键列名 |
fields_enc | Record 每个字段值的编码,各个字段之间用逗号隔开,如果非字符类型那么取值为空 |
gtid | Record 所在事务的 GTID 值 |
类型 | 参数含义 |
String | 属性值 |
类型 | 参数含义 |
DataMessage.Record.Type | 记录类型 DataMessage.Record.Type 可能的取值包括:insert、delete、update、replace、ddl、begin、commit、heartbeat。其中 heartbeat 为数据传输内部定义的心跳表,主要用于检查订阅通道是否健康,理论上每秒都会产生一条 heartbeat。 |
类型 | 参数含义 |
String | 记录在 Binlog 中的 Checkpoint,格式为:binlog_offset@binlog_fid。其中 binlog_offset 为变更记录在 binlog 文件中的偏移量,binlog_fid 为 binlog 文件名。 |
类型 | 参数含义 |
String | 时间戳字符串 |
类型 | 参数含义 |
String | 数据库名称字符串 |
类型 | 参数含义 |
String | 数据表名称字符串 |
类型 | 参数含义 |
String | 主键列名,如果是联合主键,这些列名之间用分号分隔 |
类型 | 参数含义 |
DBType | 目前数据传输仅支持 TencentDB for MySQL,为 DBType.MYSQL |
类型 | 参数含义 |
int | Record 字段的个数,与该字段对应的表的列数相等,或者是列数的两倍(对于更新操作的 Record) |
类型 | 参数含义 |
Boolean | 如果是事务中的第一条日志,返回 True,否则返回 False |
类型 | 参数含义 |
List<Field> | Field 数组,具体见 Field 类定义 |
类型 | 参数含义 |
String | String 类型的字段编码 |
类型 | 参数含义 |
String | String 类型的字段名称 |
类型 | 参数含义 |
Field.Type | Field.Type 是一个枚举类型,对应 MySQL 支持的数据类型,包括: INT8, INT16, INT24, INT32, INT64, DECIMAL, FLOAT, DOUBLE, NULL, TIMESTAMP, DATE, TIME, DATETIME, YEAR, BIT, ENUM, SET, BLOB, GEOMETRY, STRING, UNKOWN |
类型 | 参数含义 |
ByteString | 字段的值,当值为空时,值为NULL |
类型 | 参数含义 |
Boolean | 如果字段为主键,返回 True,否则返回 False |
本页内容是否解决了您的问题?