The `notify` function is called for the pulled messages in sequence, and the SDK ensures that each message is pushed once and only once. Because message pull and acknowledgment are async, `notify` is still called for subsequent messages even if the `m.ackAsConsumed()` function has not been called.
All messages (including `BEGIN` and `COMMIT` messages) must be acknowledged, and a message can be acknowledged repeatedly.
For example, if the client receives messages `1`, `2`, `3`, `4`, and `5` but calls the `m.ackAsConsumed()` function only for `1`, `2`, and `5`, the SDK acknowledges the consumption of only `1` and `2` to the server. If the client fails at this point, the SDK will obtain messages starting from message `3`.
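The acknowledgment rule above can be modeled with a few lines of plain Java. This is a minimal sketch, not SDK code: the class `AckPrefixSketch` and the method `lastContiguousAck` are hypothetical names, and message IDs are assumed to be simple numbers as in the example.

```java
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the DTS SDK: models which position
// would be reported to the server under the rule described in the text.
class AckPrefixSketch {
    /**
     * Returns the last message ID of the unbroken acknowledged prefix of
     * `received`, or -1 if the first message has not been acknowledged.
     */
    static long lastContiguousAck(List<Long> received, Set<Long> acked) {
        long last = -1;
        for (long id : received) {
            if (!acked.contains(id)) {
                break; // first gap: nothing after it can be reported
            }
            last = id;
        }
        return last;
    }

    public static void main(String[] args) {
        List<Long> received = List.of(1L, 2L, 3L, 4L, 5L);
        Set<Long> acked = Set.of(1L, 2L, 5L);
        // Message 5 is acknowledged locally but sits behind the gap at 3.
        System.out.println("acknowledged up to: " + lastContiguousAck(received, acked));
    }
}
```

With the inputs from the example, the sketch reports message 2, so a restarted client would receive messages again from message 3, including message 5.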
As message pull and acknowledgment are async, the SDK keeps pulling new messages and notifying the client even if some messages are not acknowledged. However, if the number of unacknowledged messages exceeds a threshold (currently 8,000), the SDK stops pulling new messages.
Each message carries a `record_id` and a `checkpoint`, and the SDK actually acknowledges a message by acknowledging its `checkpoint`.

    package com.qcloud.biz;

    import com.qcloud.dts.context.NetworkEnv;
    import com.qcloud.dts.context.SubscribeContext;
    import com.qcloud.dts.message.ClusterMessage;
    import com.qcloud.dts.message.DataMessage;
    import com.qcloud.dts.subscribe.ClusterListener;
    import com.qcloud.dts.subscribe.DefaultSubscribeClient;
    import com.qcloud.dts.subscribe.SubscribeClient;

    import java.util.List;

    public class Main {
        public static void main(String[] args) throws Exception {
            // Create a context
            SubscribeContext context = new SubscribeContext();
            // User `secretId` and `secretKey`. We recommend that you use a sub-account key and follow the principle of minimum permission to reduce risks. For information on how to obtain a sub-account key, visit https://www.tencentcloud.com/document/product/598/32675.
            context.setSecretId("AKID-522dabxxxxxxxxxxxxxxxxxx");
            context.setSecretKey("AKEY-0ff4cxxxxxxxxxxxxxxxxxxxx");
            // Specify the channel region, which is required for SDKs on version 2.8.0 or later.
            // For more information on region values, visit https://www.tencentcloud.com/document/product/236/15833?from_cn_redirect=1#.E5.9C.B0.E5.9F.9F.E5.88.97.E8.A1.A8.
            context.setRegion("ap-chongqing");
            // Subscribed `serviceIp` and `servicePort`
            // Note that the `IP` and `Port` parameters are required for SDKs on versions earlier than 2.8.0 and can be ignored for those on later versions if `region` is set.
            // context.setServiceIp("10.xx.xx.24");
            // context.setServicePort(50120);
            // Set the network environment to private network if the CVM instance where the SDK is running cannot be accessed over the public network.
            // Public network access is used by default.
            context.setNetworkEnv(NetworkEnv.LAN);
            // Create a client
            SubscribeClient client = new DefaultSubscribeClient(context);
            // Create a subscription listener
            ClusterListener listener = new ClusterListener() {
                @Override
                public void notify(List<ClusterMessage> messages) throws Exception {
                    // Consume the subscribed data
                    for (ClusterMessage m : messages) {
                        for (DataMessage.Record.Field f : m.getRecord().getFieldList()) {
                            if (f.getType().equals(DataMessage.Record.Field.Type.BLOB)) {
                                System.out.println("[" + f.getType() + "][" + f.getFieldname() + "] the original value:");
                                byte[] theRawBytesValue = f.getValueAsBytes();
                            }
                            if (f.getType().equals(DataMessage.Record.Field.Type.INT8)) {
                                // If this value is null, `f.getValueAsInteger()` will return null.
                                System.out.println(f.getValueAsInteger());
                            }
                            if (f.getType().equals(DataMessage.Record.Field.Type.JSON)) {
                                // JSON data can be returned only if the source instance is MySQL 5.7.
                                System.out.println(f.getValueAsString());
                            }
                            if (f.getType().equals(DataMessage.Record.Field.Type.STRING)) {
                                // If this value is null, `f.getValueAsString()` will return null.
                                System.out.println(f.getValueAsString());
                                // The original encoding of the field
                                System.out.println(f.getFieldEnc());
                            } else {
                                // The `f.getValue()` method will be deprecated soon.
                                String value = f.getValue() == null ? "Null" : f.getValue();
                                String msg = "[" + f.getType() + "]" + f.getFieldname() + "[encoding:" + f.getFieldEnc() + "]" + "[value:" + value + "]";
                                System.out.println(msg);
                            }
                        }
                        // Acknowledge the consumption
                        m.ackAsConsumed();
                    }
                }

                @Override
                public void onException(Exception e) {
                    System.out.println("listen exception" + e);
                }
            };
            // Add a listener
            client.addClusterListener(listener);
            // Configure the requested subscription channel
            client.askForGUID("dts-channel-r0M8kKsSyRZmSxQt");
            // Start the client
            client.start();
        }
    }
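The flow-control rule stated before the sample (pulling stops once the number of unacknowledged messages exceeds the threshold) can be pictured with the following counter. This is a hypothetical model for illustration only: `PullThrottleSketch` and its methods are not SDK classes, and how the SDK tracks unacknowledged messages internally is not described in this document.

```java
// Hypothetical model of the flow-control rule; this is not SDK code.
class PullThrottleSketch {
    // Threshold quoted in the text ("currently 8,000").
    static final int MAX_UNACKED = 8000;

    private int unacked = 0;

    /** Pulling continues only while the unacknowledged count has not exceeded the threshold. */
    boolean canPull() {
        return unacked <= MAX_UNACKED;
    }

    /** Called when `count` messages have been handed to the listener. */
    void onDelivered(int count) {
        unacked += count;
    }

    /** Called when `count` delivered messages have been acknowledged. */
    void onAcknowledged(int count) {
        unacked = Math.max(0, unacked - count);
    }

    public static void main(String[] args) {
        PullThrottleSketch throttle = new PullThrottleSketch();
        throttle.onDelivered(8000);
        System.out.println(throttle.canPull()); // threshold reached but not exceeded
        throttle.onDelivered(1);
        System.out.println(throttle.canPull()); // 8,001 unacknowledged: pulling pauses
        throttle.onAcknowledged(1);
        System.out.println(throttle.canPull()); // back at the threshold: pulling resumes
    }
}
```

The practical consequence is that a listener which skips `ackAsConsumed()` for messages it does not care about will eventually stall the subscription.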
Create a `SubscribeClient`.
Create a `ClusterListener`, consume the received binlog data, and return an acknowledgment after consumption.
The `ClusterListener` listener allows you to process the received binlog data as needed and filter it by type; for example, you can filter out all `drop` statements.
`secretId` and `secretKey` are the key values associated with your Tencent Cloud account, which can be viewed in Access Key > API Key Management in the CAM console. The SDK uses these two parameters to authenticate your operations.
A sub-account must be authorized by the root account for the `name/dts:AuthenticateSubscribeSDK` operation, or for all DTS operations through the `QcloudDTSFullAccess` policy.
The `serviceIp`, `servicePort`, and `channelId` parameters are related to binlog subscription and can be viewed on the Data Subscription page in the DTS console after the subscription is configured in the TencentDB for MySQL console. `serviceIp` is the IP in the service address, `servicePort` is the port number in the service address, and `channelId` is the channel ID in the DTS console.
SubscribeContext class
This class holds your SDK configuration information, including security credentials (`secretId` and `secretKey`) and the IP address and port of the subscription service.
secretId
Parameter | Type | Description |
secretId | String | Security credential `secretId`, which can be viewed in Access Key > API Key Management in the CAM console. |
secretKey
Parameter | Type | Description |
secretKey | String | Security credential `secretKey`, which can be viewed in Access Key > API Key Management in the CAM console. |
Parameter | Type | Description |
serviceIp | String | Subscription service IP address, which can be viewed on the subscription channel configuration page in the console. |
Parameter | Type | Description |
servicePort | String | The port number of the subscription service, which can be viewed on the subscription channel configuration page in the console. |
SubscribeClient and DefaultSubscribeClient APIs
The `DefaultSubscribeClient` class implements the `SubscribeClient` API.
This class is used to construct the subscription SDK client, that is, the consumer of binlog messages.
`DefaultSubscribeClient` provides sync and async acknowledgment implementations for different needs. In sync mode, an acknowledgment is returned synchronously each time the client consumes a binlog message, so the server receives consumption acknowledgments as soon as possible, but the overall SDK performance is lower than in async mode. In async mode, messages are pulled and acknowledged asynchronously. You can select either mode as needed.
DefaultSubscribeClient
Parameter | Type | Description |
context | SubscribeContext | Your SDK configuration information |
isSynce | boolean | Whether the sync consumption mode is used for the SDK |
Returns a `DefaultSubscribeClient` instance.
`IllegalArgumentException`: This exception is reported if any parameter in the submitted `context` is invalid, such as missing or incorrectly formatted security credentials, service IP, or port.
`Exception`: This exception is reported in case of an internal error during SDK initialization.
DefaultSubscribeClient
Parameter | Type | Description |
context | SubscribeContext | Your SDK configuration information |
Returns a `DefaultSubscribeClient` instance, which uses the async message acknowledgment mode by default.
`IllegalArgumentException`: This exception is reported if any parameter in the submitted `context` is invalid, such as missing or incorrectly formatted security credentials, service IP, or port.
`Exception`: This exception is reported in case of an internal error during SDK initialization.
Adds a `ClusterListener` listener to `SubscribeClient` to subscribe to the incremental data in the channel.
Parameter | Type | Description |
listener | ClusterListener | The listener used for the consumer client. The main process for binlog data consumption should be implemented in `ClusterListener`. |
`IllegalArgumentException`: This exception is reported if the submitted `listener` parameter is empty.
`Exception`: This exception is reported if more than one listener is added to the SDK.
Parameter | Type | Description |
channelId | String | Subscription channel ID, which can be viewed on the subscription channel configuration page in the console. |
`Exception`: This exception is reported in case of an internal error during SDK startup.
Parameter | Type | Description |
waitSeconds | int | Wait time in seconds before the SDK is forced to stop. |
The `stop` function without the timeout parameter waits until the thread stops, which may take a long time depending on system scheduling. Therefore, we recommend that you use the `stop` function with the timeout parameter in scenarios where a bounded restart time is required.
`Exception`: This exception is reported in case of an internal error during SDK stop.
ClusterListener API
Implement the `notify` function of this API to consume the subscribed data, and implement the `onException` function to handle any exceptions during consumption.
The SDK notifies `ClusterListener` of data consumption via the `notify` function when the data is received.
Function prototype
public abstract void notify(List<ClusterMessage> messages) throws Exception
Parameter | Type | Description |
messages | List<ClusterMessage> | Subscribed data array. For more information on ClusterMessage implementation, see its definition. |
Implement the `onException` function for custom exception handling as needed.
onException
Parameter | Type | Description |
exception | Exception | Exception class in the Java standard library |
ClusterMessage class
The `ClusterMessage` class delivers the consumed data through the `notify` function. Each `ClusterMessage` saves the data records of a transaction in TencentDB for MySQL, and each record in the transaction is saved via `Record`.
ClusterMessage
Type | Description |
Record | Change record, which corresponds to a specific record in a transaction, such as `begin`, `commit`, `update`, and `insert`. |
Acknowledges consumption to `SubscribeClient`. This function must be called after consumption; otherwise, the SDK logic will be incorrect and duplicate data will be received.
Call `ackAsConsumed` for all received messages, including those the business logic does not care about; otherwise, the SDK stops pulling new data once a certain number of messages remain unacknowledged.
`Exception`: This exception is reported in case of an internal error during acknowledgment.
Record class
Represents a record in a `ClusterMessage`. A record can be, for example, a `begin`, `commit`, or `update`.
Parameter | Type | Description |
key | String | Attribute value name |
Attribute Key Value | Description |
record_id | Record ID, which is an auto-increment string in a channel but is not necessarily continuous. |
source_type | The engine type of the database instance of the record, which is `mysql` currently. |
source_category | Record type, which is `full_recorded` currently. |
timestamp | The time when the record is stored into binlog, which is also the time when the SQL statement is executed in TencentDB. |
sdkInfo | The offset of the binlog file of the record in the format of `file_offset@file_name`, where `file_name` is the numeric suffix of the binlog file. |
record_type | The operation type of the record, mainly including `insert`, `update`, `delete`, `replace`, `ddl`, `begin`, `commit`, and `heartbeat`. |
db | The database name of the record update table, which is empty for a DDL record. |
table_name | The name of the record update table, which is empty for a DDL record. |
record_encoding | Record encoding |
primary | The name of the primary key column of the record update table |
fields_enc | The encoding of each field value of the record, separated by commas. The entry is empty for a field whose type is not a character type. |
gtid | The GTID of the transaction of the record |
Type | Description |
String | Attribute value |
Type | Description |
DataMessage.Record.Type | Record type, which can be `insert`, `delete`, `update`, `replace`, `ddl`, `begin`, `commit`, or `heartbeat`. `heartbeat` is the heartbeat table internally defined by DTS to check whether the subscription channel is healthy. In theory, a heartbeat is generated per second. |
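Because every message must be acknowledged regardless of its record type, a listener that only cares about row changes still has to acknowledge `begin`, `commit`, `ddl`, and `heartbeat` records. The following is a minimal sketch of that pattern using stand-in types: `RecordType`, `Message`, and `consume` are hypothetical and only mirror the record types listed above; they are not the SDK's `ClusterMessage` or `DataMessage.Record` classes.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in types for illustration only; not the SDK's own classes.
class AckEverythingSketch {
    enum RecordType { INSERT, UPDATE, DELETE, REPLACE, DDL, BEGIN, COMMIT, HEARTBEAT }

    static final class Message {
        final RecordType type;
        boolean acked;

        Message(RecordType type) {
            this.type = type;
        }

        void ackAsConsumed() {
            acked = true;
        }
    }

    /** True for the record types that carry row data the business logic wants. */
    static boolean isRowChange(RecordType type) {
        switch (type) {
            case INSERT:
            case UPDATE:
            case DELETE:
            case REPLACE:
                return true;
            default:
                return false;
        }
    }

    /** Collects row changes for processing and acknowledges every message in the batch. */
    static List<Message> consume(List<Message> batch) {
        List<Message> rowChanges = new ArrayList<>();
        for (Message m : batch) {
            if (isRowChange(m.type)) {
                rowChanges.add(m);
            }
            m.ackAsConsumed(); // acknowledged even when the record is skipped
        }
        return rowChanges;
    }

    public static void main(String[] args) {
        List<Message> batch = List.of(
                new Message(RecordType.BEGIN),
                new Message(RecordType.INSERT),
                new Message(RecordType.HEARTBEAT),
                new Message(RecordType.COMMIT));
        System.out.println("row changes: " + consume(batch).size());
    }
}
```

Filtering happens before processing, while acknowledgment happens unconditionally, so skipped records never accumulate as unacknowledged messages.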
Type | Description |
String | The checkpoint of a record in the binlog in the format of `binlog_offset@binlog_fid`. Here, `binlog_offset` is the change record offset in the binlog file, and `binlog_fid` is the binlog filename. |
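If the two parts of a checkpoint are needed separately (for logging or monitoring, for example), the string can be split at the `@` sign. The sketch below assumes that `binlog_offset` is a decimal integer, which this document does not state explicitly; `CheckpointSketch` is a hypothetical helper, and the sample value is made up.

```java
// Hypothetical helper for splitting a checkpoint string; not part of the SDK.
class CheckpointSketch {
    final long binlogOffset;
    final String binlogFid;

    private CheckpointSketch(long binlogOffset, String binlogFid) {
        this.binlogOffset = binlogOffset;
        this.binlogFid = binlogFid;
    }

    /** Parses "binlog_offset@binlog_fid"; assumes the offset is a decimal integer. */
    static CheckpointSketch parse(String checkpoint) {
        int at = checkpoint.indexOf('@');
        if (at <= 0 || at == checkpoint.length() - 1) {
            throw new IllegalArgumentException(
                    "Expected binlog_offset@binlog_fid, got: " + checkpoint);
        }
        long offset = Long.parseLong(checkpoint.substring(0, at));
        return new CheckpointSketch(offset, checkpoint.substring(at + 1));
    }

    public static void main(String[] args) {
        CheckpointSketch cp = parse("1024@35"); // made-up sample value
        System.out.println("offset=" + cp.binlogOffset + ", file=" + cp.binlogFid);
    }
}
```

The file part is kept as a string so that no assumption is made about its exact form.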
Type | Description |
String | Timestamp string |
Type | Description |
String | Database name string |
Type | Description |
String | Data table name string |
Type | Description |
String | Primary key column name. For composite primary keys, multiple names are separated by semicolons. |
Type | Description |
DBType | Currently, DTS only supports `DBType.MYSQL`, that is, TencentDB for MySQL. |
Type | Description |
int | The number of fields in the record, which equals the number of columns in the corresponding table, or twice that number for an update record. |
Type | Description |
Boolean | If it is the first record in the transaction, True is returned; otherwise, False is returned. |
Type | Description |
List<Field> | Field array. For more information, see the definition of the Field class. |
For `INSERT` records, the `Field` values in the `List` follow the column order defined by the subscribed table. The values in `Field` are the inserted values, that is, post-images.
For `DELETE` records, the `Field` values in the `List` follow the column order defined by the subscribed table. The values in `Field` are the values before deletion, that is, pre-images.
For `UPDATE` records, the `List` contains the values before and after modification, that is, pre-images and post-images. Pre-images (values before modification) are in even positions in the `List`, while post-images are in odd positions. Both pre-images and post-images follow the column order defined by the subscribed table. Therefore, the number of `Field` values in the `List` is twice the number of columns in the subscribed table.
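The interleaved layout of `UPDATE` records can be separated into two per-column lists. The sketch below is generic plain Java rather than SDK code, and it assumes that positions are counted from 0, so that the first element is a pre-image; `UpdateImagesSketch` and its methods are hypothetical names, and the sample strings stand in for `Field` objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; works on any list that interleaves pre- and post-images.
class UpdateImagesSketch {
    /** Elements at positions 0, 2, 4, ...: values before the update. */
    static <T> List<T> preImages(List<T> fields) {
        return everySecond(fields, 0);
    }

    /** Elements at positions 1, 3, 5, ...: values after the update. */
    static <T> List<T> postImages(List<T> fields) {
        return everySecond(fields, 1);
    }

    private static <T> List<T> everySecond(List<T> fields, int start) {
        List<T> out = new ArrayList<>();
        for (int i = start; i < fields.size(); i += 2) {
            out.add(fields.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        // Two columns (id, name), so the update record carries four values.
        List<String> fields = List.of("id:1", "id:1", "name:old", "name:new");
        System.out.println(preImages(fields));  // [id:1, name:old]
        System.out.println(postImages(fields)); // [id:1, name:new]
    }
}
```

After the split, element `i` of each list corresponds to column `i` of the subscribed table, which makes it straightforward to compare old and new values column by column.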
Field class
The `Field` class defines the attributes of a field such as encoding, type, name, value, and primary key status.
Type | Description |
String | Field encoding, which is a string. |
Type | Description |
String | Field name, which is a string. |
Type | Description |
Field.Type | `Field.Type` is an enumeration type corresponding to the data types supported by MySQL, including `INT8`, `INT16`, `INT24`, `INT32`, `INT64`, `DECIMAL`, `FLOAT`, `DOUBLE`, `NULL`, `TIMESTAMP`, `DATE`, `TIME`, `DATETIME`, `YEAR`, `BIT`, `ENUM`, `SET`, `BLOB`, `GEOMETRY`, `STRING`, and `UNKNOWN`. |
Type | Description |
ByteString | Field value, which is NULL if empty. |
Type | Description |
Boolean | If the field is a primary key, True is returned; otherwise, False is returned. |