Flink Connector (Real-time or Batch Data with Flink)

Last updated: 2024-09-29 09:38:16
    Note
    This document applies to flink-doris-connector 1.1.0 and later versions.

    Basic Introduction

    Flink Doris Connector allows you to read, insert, modify, and delete data stored in Doris through Flink; it is not limited to importing data. Because Flink is a unified stream-batch computing engine, both real-time incremental data and batch data can be loaded into Doris through the Flink Doris Connector. Internally, the connector maps a Doris table to a DataStream or a Table. Repo address: https://github.com/apache/doris-flink-connector
    Note
    Modification and deletion are only supported on the Unique Key model. Currently, automatic deletion is supported only for data imported through Flink CDC; if other import methods are used, you need to delete data yourself. For details on how Flink CDC handles data deletion, see the Flink CDC example later in this document.

    Version Compatibility

    Connector Version | Flink Version | Doris Version | Java Version | Scala Version
    1.0.3             | 1.11+         | 0.15+         | 8            | 2.11, 2.12
    1.1.0             | 1.14+         | 1.0+          | 8            | 2.11, 2.12
    1.2.0             | 1.15+         | 1.0+          | 8            | -

    Usage

    There are two main ways to read and write Doris data with Flink:
    SQL
    DataStream

    Parameter Configuration

    The Flink Doris Connector sink internally writes data to Doris through the Stream Load service and also supports configuring Stream Load request parameters. For the specific parameters, see the Stream Load manual. The configuration methods are as follows:
    SQL: set them in the WITH clause of the sink table as sink.properties.* options.
    DataStream: configure them via DorisExecutionOptions.builder().setStreamLoadProp(Properties), as in the sketch below.
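    A minimal sketch of the DataStream configuration (format and read_json_by_line are standard Stream Load parameters; DorisExecutionOptions is assumed to come from org.apache.doris.flink.cfg, as in the DataStream examples below):
    import java.util.Properties;
    import org.apache.doris.flink.cfg.DorisExecutionOptions;
    
    // Stream Load request parameters are passed through to Doris as-is
    Properties props = new Properties();
    props.setProperty("format", "json");
    props.setProperty("read_json_by_line", "true");
    
    DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
            .setLabelPrefix("doris_label")  // Stream Load label prefix
            .setStreamLoadProp(props)       // attach the Stream Load parameters
            .build();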

    SQL

    Source (Doris table as data source)
    CREATE TABLE flink_doris_source (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
    )
    WITH (
    'connector' = 'doris',
    'fenodes' = 'FE_IP:8030',
    'table.identifier' = 'database.table',
    'username' = 'root',
    'password' = 'password'
    );
    Sink (Doris table as import target table)
    -- enable checkpoint
    SET 'execution.checkpointing.interval' = '10s';
    CREATE TABLE flink_doris_sink (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
    )
    WITH (
    'connector' = 'doris',
    'fenodes' = 'FE_IP:8030',
    'table.identifier' = 'db.table',
    'username' = 'root',
    'password' = 'password',
    'sink.label-prefix' = 'doris_label'
    );
    Insert
    INSERT INTO flink_doris_sink SELECT name, age, price, sale FROM flink_doris_source;
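    These statements can be run in the Flink SQL client; alternatively, a minimal sketch of submitting the same DDL and DML programmatically with the Table API (connection options copied from the examples above) might look like this:
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    
    TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());
    // enable checkpointing so the Doris sink can commit data
    tEnv.getConfig().getConfiguration()
            .setString("execution.checkpointing.interval", "10s");
    
    tEnv.executeSql(
            "CREATE TABLE flink_doris_source (name STRING, age INT, price DECIMAL(5,2), sale DOUBLE) "
            + "WITH ('connector' = 'doris', 'fenodes' = 'FE_IP:8030', "
            + "'table.identifier' = 'database.table', 'username' = 'root', 'password' = 'password')");
    tEnv.executeSql(
            "CREATE TABLE flink_doris_sink (name STRING, age INT, price DECIMAL(5,2), sale DOUBLE) "
            + "WITH ('connector' = 'doris', 'fenodes' = 'FE_IP:8030', 'table.identifier' = 'db.table', "
            + "'username' = 'root', 'password' = 'password', 'sink.label-prefix' = 'doris_label')");
    tEnv.executeSql("INSERT INTO flink_doris_sink SELECT name, age, price, sale FROM flink_doris_source");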

    DataStream

    Source
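    // The snippets below assume an execution environment has already been created:
    // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();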
    DorisOptions.Builder builder = DorisOptions.builder()
    .setFenodes("FE_IP:8030")
    .setTableIdentifier("db.table")
    .setUsername("root")
    .setPassword("password");
    
    DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
    .setDorisOptions(builder.build())
    .setDorisReadOptions(DorisReadOptions.builder().build())
    .setDeserializer(new SimpleListDeserializationSchema())
    .build();
    
    env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
    Sink String Data Stream
    // enable checkpoint
    env.enableCheckpointing(10000);
    
    DorisSink.Builder<String> builder = DorisSink.builder();
    DorisOptions.Builder dorisBuilder = DorisOptions.builder();
    dorisBuilder.setFenodes("FE_IP:8030")
    .setTableIdentifier("db.table")
    .setUsername("root")
    .setPassword("password");
    
    
    DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
    executionBuilder.setLabelPrefix("label-doris"); //streamload label prefix
    
    builder.setDorisReadOptions(DorisReadOptions.builder().build())
    .setDorisExecutionOptions(executionBuilder.build())
    .setSerializer(new SimpleStringSerializer()) //serialize according to string
    .setDorisOptions(dorisBuilder.build());
    
    
    //mock string source
    List<Tuple2<String, Integer>> data = new ArrayList<>();
    data.add(new Tuple2<>("doris",1));
    DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);
    
    source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + t.f1)
    .sinkTo(builder.build());
    RowData Data Stream
    // enable checkpoint
    env.enableCheckpointing(10000);
    
    //doris sink option
    DorisSink.Builder<RowData> builder = DorisSink.builder();
    DorisOptions.Builder dorisBuilder = DorisOptions.builder();
    dorisBuilder.setFenodes("FE_IP:8030")
    .setTableIdentifier("db.table")
    .setUsername("root")
    .setPassword("password");
    
    // json format to streamload
    Properties properties = new Properties();
    properties.setProperty("format", "json");
    properties.setProperty("read_json_by_line", "true");
    DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
    executionBuilder.setLabelPrefix("label-doris") //streamload label prefix
    .setStreamLoadProp(properties); //streamload params
    
    // Flink RowData's schema
    String[] fields = {"city", "longitude", "latitude"};
    DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE()};
    
    builder.setDorisReadOptions(DorisReadOptions.builder().build())
    .setDorisExecutionOptions(executionBuilder.build())
    .setSerializer(RowDataSerializer.builder() //serialize according to rowdata
    .setFieldNames(fields)
    .setType("json") //json format
    .setFieldType(types).build())
    .setDorisOptions(dorisBuilder.build());
    
    //mock rowdata source
    DataStream<RowData> source = env.fromElements("")
    .map(new MapFunction<String, RowData>() {
    @Override
    public RowData map(String value) throws Exception {
    GenericRowData genericRowData = new GenericRowData(3);
    genericRowData.setField(0, StringData.fromString("beijing"));
    genericRowData.setField(1, 116.405419);
    genericRowData.setField(2, 39.916927);
    return genericRowData;
    }
    });
    
    source.sinkTo(builder.build());

    Configuration

    General Configuration Items

    fenodes (required): Doris FE HTTP address.
    table.identifier (required): Doris table name, e.g., db.tbl.
    username (required): Username for accessing Doris.
    password (required): Password for accessing Doris.
    doris.request.retries (optional, default 3): Number of retries for requests sent to Doris.
    doris.request.connect.timeout.ms (optional, default 30000): Connection timeout for requests sent to Doris.
    doris.request.read.timeout.ms (optional, default 30000): Read timeout for requests sent to Doris.
    doris.request.query.timeout.s (optional, default 3600): Timeout for querying Doris, 1 hour by default; -1 means no time limit.
    doris.request.tablet.size (optional, default Integer.MAX_VALUE): Number of Doris tablets corresponding to one partition. The smaller this value, the more partitions are generated, which increases parallelism on the Flink side but also puts more pressure on Doris.
    doris.batch.size (optional, default 1024): Maximum number of rows read from BE at a time. Increasing this value reduces the number of connections established between Flink and Doris, thereby reducing the extra overhead caused by network latency.
    doris.exec.mem.limit (optional, default 2147483648): Memory limit for a single query, in bytes; 2 GB by default.
    doris.deserialize.arrow.async (optional, default FALSE): Whether to support asynchronous conversion of the Arrow format to the RowBatch required for flink-doris-connector iteration.
    doris.deserialize.queue.size (optional, default 64): Size of the internal processing queue for asynchronous Arrow conversion; takes effect when doris.deserialize.arrow.async is true.
    doris.read.field (optional): Comma-separated list of column names to read from the Doris table.
    doris.filter.query (optional): Expression for filtering the data to be read. This expression is passed through to Doris, which uses it to filter data at the source end.
    sink.label-prefix (required): Prefix of the labels used for Stream Load. It must be globally unique in 2PC scenarios to guarantee Flink's exactly-once semantics.
    sink.properties.* (optional): Stream Load parameters. For example, 'sink.properties.column_separator' = ',' defines the column separator, and 'sink.properties.escape_delimiters' = 'true' treats special characters as separators, so '\x01' is converted to binary 0x01. To import in JSON format, set 'sink.properties.format' = 'json' and 'sink.properties.read_json_by_line' = 'true'.
    sink.enable-delete (optional, default TRUE): Whether to enable deletion. This requires the batch delete feature to be enabled on the Doris table (enabled by default in Doris 0.15 and later); only the Unique model is supported.
    sink.enable-2pc (optional, default TRUE): Whether to enable two-phase commit (2PC); true by default, ensuring exactly-once semantics.
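    
    For example, a sketch of applying doris.read.field and doris.filter.query in the DataStream API (setReadFields and setFilterQuery are assumed to be the corresponding DorisReadOptions builder methods):
    import org.apache.doris.flink.cfg.DorisReadOptions;
    
    // read only two columns and let Doris filter rows at the source end
    DorisReadOptions readOptions = DorisReadOptions.builder()
            .setReadFields("name,age")     // doris.read.field
            .setFilterQuery("age > 18")    // doris.filter.query, pushed down to Doris
            .build();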
    
    Doris & Flink Column Type Mapping
    
    Doris Type    Flink Type
    NULL_TYPE     NULL
    BOOLEAN       BOOLEAN
    TINYINT       TINYINT
    SMALLINT      SMALLINT
    INT           INT
    BIGINT        BIGINT
    FLOAT         FLOAT
    DOUBLE        DOUBLE
    DATE          DATE
    DATETIME      TIMESTAMP
    DECIMAL       DECIMAL
    CHAR          STRING
    LARGEINT      STRING
    VARCHAR       STRING
    DECIMALV2     DECIMAL
    TIME          DOUBLE
    HLL           Unsupported datatype
    
    Using Flink CDC to Access Doris
    
    CREATE TABLE cdc_mysql_source (
    id int
    ,name VARCHAR
    ,PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = 'password',
    'database-name' = 'database',
    'table-name' = 'table'
    );
    
    -- To synchronize delete events (sink.enable-delete='true'), the batch delete feature must be enabled on the Doris table
    CREATE TABLE doris_sink (
    id INT,
    name STRING
    )
    WITH (
    'connector' = 'doris',
    'fenodes' = '127.0.0.1:8030',
    'table.identifier' = 'database.table',
    'username' = 'root',
    'password' = '',
    'sink.properties.format' = 'json',
    'sink.properties.read_json_by_line' = 'true',
    'sink.enable-delete' = 'true',
    'sink.label-prefix' = 'doris_label'
    );
    
    INSERT INTO doris_sink SELECT id, name FROM cdc_mysql_source;

    Java Example

    samples/doris-demo/flink-demo/ in the connector repository provides Java examples for reference.

    Best Practice

    Application Scenario

    The most suitable scenario for the Flink Doris Connector is synchronizing data (from MySQL, Oracle, PostgreSQL, etc.) to Doris in real time or in batches. The connector can also be used when Flink needs to combine data in Doris with other data sources for analysis.

    Other

    Flink Doris Connector mainly relies on checkpointing for streaming writes, so the checkpoint interval is the visible latency of the data.
    To guarantee Flink's exactly-once semantics, two-phase commit is enabled by default in Flink Doris Connector versions after 1.1. For 1.0, it can be enabled by modifying BE parameters; see Stream Load (local file) for more information.

    FAQs

    1. Writing the Bitmap type
    To write to a Bitmap column, use the columns property of Stream Load to convert the source column with to_bitmap:
    CREATE TABLE bitmap_sink (
    dt int,
    page string,
    user_id int
    )
    WITH (
    'connector' = 'doris',
    'fenodes' = '127.0.0.1:8030',
    'table.identifier' = 'test.bitmap_test',
    'username' = 'root',
    'password' = '',
    'sink.label-prefix' = 'doris_label',
    'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
    )
    2. errCode = 2, detailMessage = Label [label_0_1] has already been used, relate to txn [19650]
    In exactly-once scenarios, a Flink job must be restarted from the latest checkpoint/savepoint; otherwise, the above error is reported. When exactly-once is not required, this problem can be solved by disabling 2PC (sink.enable-2pc=false, as in the sketch below) or by changing sink.label-prefix to a different value.
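    
    A sketch of disabling 2PC in the DataStream API, assuming the connector version in use exposes DorisExecutionOptions.Builder#disable2PC (the SQL equivalent is 'sink.enable-2pc' = 'false'):
    import org.apache.doris.flink.cfg.DorisExecutionOptions;
    
    // When exactly-once is not required, disable two-phase commit so that a restart
    // without the latest checkpoint does not trigger label conflicts.
    DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
            .setLabelPrefix("doris_label")
            .disable2PC()  // equivalent to 'sink.enable-2pc' = 'false'
            .build();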