https://github.com/apache/doris-flink-connector
It essentially maps the Doris table to DataStream or Table.
Connector Version | Flink Version | Doris Version | Java Version | Scala Version |
1.0.3 | 1.11+ | 0.15+ | 8 | 2.11,2.12 |
1.1.0 | 1.14+ | 1.0+ | 8 | 2.11,2.12 |
1.2.0 | 1.15+ | 1.0+ | 8 | - |
The sink writes data to Doris through the Stream Load service. It also supports the configuration of Stream Load request parameters. For specific parameters, see the Stream Load manual. The configuration method is as follows:
- In SQL, use the WITH parameter for the `sink.properties.` configuration.
- In DataStream, use `DorisExecutionOptions.builder().setStreamLoadProp(Properties)`
for configuration

-- Source table backed by the Doris connector.
CREATE TABLE flink_doris_source (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
)
WITH (
    'connector' = 'doris',
    'fenodes' = 'FE_IP:8030',
    'table.identifier' = 'database.table',
    'username' = 'root',
    'password' = 'password'
);

-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';

-- Sink table backed by the Doris connector.
-- 'sink.label-prefix' is a required option for the sink.
CREATE TABLE flink_doris_sink (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
)
WITH (
    'connector' = 'doris',
    'fenodes' = 'FE_IP:8030',
    'table.identifier' = 'db.table',
    'username' = 'root',
    'password' = 'password',
    'sink.label-prefix' = 'doris_label'
);

-- Copy every row from the source table into the sink table.
INSERT INTO flink_doris_sink
SELECT
    name,
    age,
    price,
    sale
FROM flink_doris_source;
// Connection settings for the Doris table to read from.
DorisOptions.Builder builder = DorisOptions.builder()
        .setFenodes("FE_IP:8030")
        .setTableIdentifier("db.table")
        .setUsername("root")
        .setPassword("password");

// Build the source with default read options; each record is deserialized
// with SimpleListDeserializationSchema into a List<?>.
DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
        .setDorisOptions(builder.build())
        .setDorisReadOptions(DorisReadOptions.builder().build())
        .setDeserializer(new SimpleListDeserializationSchema())
        .build();

// Attach the source without watermarks and print every record.
env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source")
        .print();
// enable checkpointenv.enableCheckpointing(10000);DorisSink.Builder<String> builder = DorisSink.builder();DorisOptions.Builder dorisBuilder = DorisOptions.builder();dorisBuilder.setFenodes("FE_IP:8030").setTableIdentifier("db.table").setUsername("root").setPassword("password");DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();executionBuilder.setLabelPrefix("label-doris"); //streamload label prefixbuilder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setSerializer(new SimpleStringSerializer()) //serialize according to string.setDorisOptions(dorisBuilder.build());//mock string sourceList<Tuple2<String, Integer>> data = new ArrayList<>();data.add(new Tuple2<>("doris",1));DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\\t" + t.f1).sinkTo(builder.build());
// enable checkpointenv.enableCheckpointing(10000);//doris sink optionDorisSink.Builder<RowData> builder = DorisSink.builder();DorisOptions.Builder dorisBuilder = DorisOptions.builder();dorisBuilder.setFenodes("FE_IP:8030").setTableIdentifier("db.table").setUsername("root").setPassword("password");// json format to streamloadProperties properties = new Properties();properties.setProperty("format", "json");properties.setProperty("read_json_by_line", "true");DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();executionBuilder.setLabelPrefix("label-doris") //streamload label prefix.setStreamLoadProp(properties); //streamload params//flink rowdata‘s schemaString[] fields = {"city", "longitude", "latitude"};DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE()};builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setSerializer(RowDataSerializer.builder() //serialize according to rowdata.setFieldNames(fields).setType("json") //json format.setFieldType(types).build()).setDorisOptions(dorisBuilder.build());//mock rowdata sourceDataStream<RowData> source = env.fromElements("").map(new MapFunction<String, RowData>() {@Overridepublic RowData map(String value) throws Exception {GenericRowData genericRowData = new GenericRowData(3);genericRowData.setField(0, StringData.fromString("beijing"));genericRowData.setField(1, 116.405419);genericRowData.setField(2, 39.916927);return genericRowData;}});source.sinkTo(builder.build());
Key | Default Value | Required | Comment |
fenodes | -- | Y | Doris FE HTTP Address |
table.identifier | -- | Y | Doris table name, e.g., db.tbl |
username | -- | Y | Username to access Doris |
password | -- | Y | Password to access Doris |
doris.request.retries | 3 | N | Number of retries when sending requests to Doris |
doris.request.connect.timeout.ms | 30000 | N | Connection timeout period when sending requests to Doris |
doris.request.read.timeout.ms | 30000 | N | Reading timeout period when sending requests to Doris |
doris.request.query.timeout.s | 3600 | N | Timeout period for querying Doris. The default value is 1 hour. -1 means there is no time limit. |
doris.request.tablet.size | Integer.MAX_VALUE | N | Number of Doris Tablets corresponding to a Partition. The smaller this value is set, the more Partitions will be generated to enhance the parallelism on the Flink side. However, it will also put more pressure on Doris. |
doris.batch.size | 1024 | N | Maximum rows of data that can be read from BE at one time. Increasing this value can reduce the number of times of establishing connections between Flink and Doris, thereby reducing the extra time cost caused by network latency. |
doris.exec.mem.limit | 2147483648 | N | Memory limit for a single query. The default value is 2GB. The unit is bytes. |
doris.deserialize.arrow.async | FALSE | N | Whether to support asynchronous conversion of Arrow format to the RowBatch required for flink-doris-connector iteration |
doris.deserialize.queue.size | 64 | N | Internal processing queue for asynchronously converting Arrow format. It takes effect when doris.deserialize.arrow.async is true. |
doris.read.field | -- | N | Column name list for reading Doris table, separated by commas |
doris.filter.query | -- | N | Expression for filtering the data to be read. This expression is passed through to Doris. Doris uses this expression to filter the data at the source end. |
sink.label-prefix | -- | Y | Prefix for labels used in stream load, which is required to be globally unique in 2pc scenarios to ensure Flink's EOS semantics. |
sink.properties.* | -- | N | Stream Load parameters. For example, 'sink.properties.column_separator' = ', ' defines the column separator; 'sink.properties.escape_delimiters' = 'true' allows special characters as separators, e.g., '\x01' will be converted into binary 0x01. To import in JSON format, set 'sink.properties.format' = 'json' and 'sink.properties.read_json_by_line' = 'true'. |
sink.enable-delete | TRUE | N | Whether to enable deletion. This option requires the batch deletion feature to be enabled for the Doris table (it is enabled by default in Doris 0.15+); only the Unique model is supported. |
sink.enable-2pc | TRUE | N | Whether to enable two-phase commit (2pc). The default is true, which ensures Exactly-Once semantics. |
Doris Type | Flink Type |
NULL_TYPE | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
CHAR | STRING |
LARGEINT | STRING |
VARCHAR | STRING |
DECIMALV2 | DECIMAL |
TIME | DOUBLE |
HLL | Unsupported datatype |
-- Change-data-capture source backed by the mysql-cdc connector.
CREATE TABLE cdc_mysql_source (
    id INT,
    name VARCHAR,
    PRIMARY KEY (id) NOT ENFORCED
)
WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = 'password',
    'database-name' = 'database',
    'table-name' = 'table'
);

-- To synchronize delete events (sink.enable-delete = 'true'), the batch
-- deletion feature must be enabled for the Doris table.
CREATE TABLE doris_sink (
    id INT,
    name STRING
)
WITH (
    'connector' = 'doris',
    'fenodes' = '127.0.0.1:8030',
    'table.identifier' = 'database.table',
    'username' = 'root',
    'password' = '',
    -- Stream Load parameters: send the rows as line-delimited JSON.
    'sink.properties.format' = 'json',
    'sink.properties.read_json_by_line' = 'true',
    'sink.enable-delete' = 'true',
    'sink.label-prefix' = 'doris_label'
);

-- Continuously copy the captured MySQL rows into Doris.
INSERT INTO doris_sink
SELECT
    id,
    name
FROM cdc_mysql_source;
-- Doris sink table for test.bitmap_test.
CREATE TABLE bitmap_sink (
    dt INT,
    page STRING,
    user_id INT
)
WITH (
    'connector' = 'doris',
    'fenodes' = '127.0.0.1:8030',
    'table.identifier' = 'test.bitmap_test',
    'username' = 'root',
    'password' = '',
    'sink.label-prefix' = 'doris_label',
    -- Passed through as the Stream Load `columns` parameter; user_id is
    -- rewritten with to_bitmap(user_id) on load.
    'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
)
Was this page helpful?