https://github.com/apache/doris-flink-connector
In essence, the connector maps a Doris table to a DataStream or a Table.
Connector Version | Flink Version | Doris Version | Java Version | Scala Version |
1.0.3 | 1.11+ | 0.15+ | 8 | 2.11,2.12 |
1.1.0 | 1.14+ | 1.0+ | 8 | 2.11,2.12 |
1.2.0 | 1.15+ | 1.0+ | 8 | - |
Data is written to Doris through the Stream Load service, and the Stream Load request parameters can also be configured. For the specific parameters, see the Stream Load manual. They can be configured in either of these ways:
Through WITH parameters named sink.properties.*
Through DorisExecutionOptions.builder().setStreamLoadProp(Properties)
CREATE TABLE flink_doris_source (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
) WITH (
    'connector' = 'doris',
    'fenodes' = 'FE_IP:8030',
    'table.identifier' = 'database.table',
    'username' = 'root',
    'password' = 'password'
);
-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';
CREATE TABLE flink_doris_sink (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
) WITH (
    'connector' = 'doris',
    'fenodes' = 'FE_IP:8030',
    'table.identifier' = 'db.table',
    'username' = 'root',
    'password' = 'password',
    'sink.label-prefix' = 'doris_label'
);
INSERT INTO flink_doris_sink SELECT name, age, price, sale FROM flink_doris_source;
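The two configuration routes above carry the same information: an option such as 'sink.properties.format' in the WITH clause corresponds to a "format" entry in the Properties object passed to setStreamLoadProp. The following is a minimal sketch of that correspondence using only the Java standard library. The class and method names (StreamLoadProps, fromWithOptions) are hypothetical and are not part of the connector, whose internal handling may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not connector code: illustrates how prefixed WITH
// options line up with the Properties used for Stream Load.
final class StreamLoadProps {
    static final String PREFIX = "sink.properties.";

    // Copies every option whose key starts with PREFIX into a Properties
    // object, with the prefix removed. All other options are ignored.
    static Properties fromWithOptions(Map<String, String> withOptions) {
        Properties props = new Properties();
        for (Map.Entry<String, String> e : withOptions.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                props.setProperty(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> with = new LinkedHashMap<>();
        with.put("connector", "doris");
        with.put("sink.label-prefix", "doris_label");
        with.put("sink.properties.format", "json");
        with.put("sink.properties.read_json_by_line", "true");

        Properties props = fromWithOptions(with);
        System.out.println(props.getProperty("format"));
        System.out.println(props.getProperty("read_json_by_line"));
    }
}
```

In a DataStream job, a Properties object built this way would be the argument to setStreamLoadProp.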
DorisOptions.Builder builder = DorisOptions.builder()
        .setFenodes("FE_IP:8030")
        .setTableIdentifier("db.table")
        .setUsername("root")
        .setPassword("password");

DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
        .setDorisOptions(builder.build())
        .setDorisReadOptions(DorisReadOptions.builder().build())
        .setDeserializer(new SimpleListDeserializationSchema())
        .build();

env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
// enable checkpoint
env.enableCheckpointing(10000);

DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:8030")
        .setTableIdentifier("db.table")
        .setUsername("root")
        .setPassword("password");

DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris"); // streamload label prefix

builder.setDorisReadOptions(DorisReadOptions.builder().build())
        .setDorisExecutionOptions(executionBuilder.build())
        .setSerializer(new SimpleStringSerializer()) // serialize according to string
        .setDorisOptions(dorisBuilder.build());

// mock string source
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("doris", 1));
DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);

// each record becomes one tab-separated text row
source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + t.f1)
        .sinkTo(builder.build());
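The map step in the string example turns each tuple into one text row whose fields are separated by a tab, which is assumed here to match the Stream Load column separator in effect for the job. A minimal standalone sketch of that row format follows. DelimitedRow and join are hypothetical names used only for illustration.

```java
// Hypothetical helper, not connector code: renders field values as one
// delimited text row of the kind the string sink example sends.
final class DelimitedRow {
    // Joins the string form of each field with the given column separator.
    static String join(String separator, Object... fields) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                sb.append(separator);
            }
            sb.append(String.valueOf(fields[i]));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Same record as the mock source: ("doris", 1)
        System.out.println(join("\t", "doris", 1));
    }
}
```

If a different separator is set through sink.properties.column_separator, the same separator would have to be used when building the row.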
// enable checkpoint
env.enableCheckpointing(10000);

// doris sink option
DorisSink.Builder<RowData> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:8030")
        .setTableIdentifier("db.table")
        .setUsername("root")
        .setPassword("password");

// json format to streamload
Properties properties = new Properties();
properties.setProperty("format", "json");
properties.setProperty("read_json_by_line", "true");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris") // streamload label prefix
        .setStreamLoadProp(properties); // streamload params

// flink rowdata's schema
String[] fields = {"city", "longitude", "latitude"};
DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE()};

builder.setDorisReadOptions(DorisReadOptions.builder().build())
        .setDorisExecutionOptions(executionBuilder.build())
        .setSerializer(RowDataSerializer.builder() // serialize according to rowdata
                .setFieldNames(fields)
                .setType("json") // json format
                .setFieldType(types)
                .build())
        .setDorisOptions(dorisBuilder.build());

// mock rowdata source
DataStream<RowData> source = env.fromElements("")
        .map(new MapFunction<String, RowData>() {
            @Override
            public RowData map(String value) throws Exception {
                GenericRowData genericRowData = new GenericRowData(3);
                genericRowData.setField(0, StringData.fromString("beijing"));
                genericRowData.setField(1, 116.405419);
                genericRowData.setField(2, 39.916927);
                return genericRowData;
            }
        });

source.sinkTo(builder.build());
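With format set to json and read_json_by_line set to true, each record is expected to travel as one JSON object per line, keyed by the field names given to the serializer. The sketch below shows roughly what such a line looks like. It is a hand-written illustration using only the standard library, not the RowDataSerializer implementation, and the names JsonLine and toJsonLine are hypothetical.

```java
// Hypothetical helper, not connector code: renders one record as a single
// JSON object on one line.
final class JsonLine {
    // Numbers and booleans are written bare, null as null, and everything
    // else as a quoted string. NaN and infinite doubles are not handled.
    static String toJsonLine(String[] fieldNames, Object[] values) {
        if (fieldNames.length != values.length) {
            throw new IllegalArgumentException("field/value count mismatch");
        }
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < fieldNames.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append('"').append(escape(fieldNames[i])).append("\":");
            Object v = values[i];
            if (v == null) {
                sb.append("null");
            } else if (v instanceof Number || v instanceof Boolean) {
                sb.append(v);
            } else {
                sb.append('"').append(escape(v.toString())).append('"');
            }
        }
        return sb.append('}').toString();
    }

    // Escapes backslash, double quote, and the most common control characters.
    private static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default: sb.append(c);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String[] fields = {"city", "longitude", "latitude"};
        Object[] values = {"beijing", 116.405419, 39.916927};
        System.out.println(toJsonLine(fields, values));
    }
}
```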
Key | Default Value | Required | Comment |
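fenodes | -- | Y | Doris FE HTTP address |
table.identifier | -- | Y | Doris table name, for example db.tbl |
username | -- | Y | Username for accessing Doris |
password | -- | Y | Password for accessing Doris |
doris.request.retries | 3 | N | Number of retries for requests sent to Doris |
doris.request.connect.timeout.ms | 30000 | N | Connection timeout for requests sent to Doris |
doris.request.read.timeout.ms | 30000 | N | Read timeout for requests sent to Doris |
doris.request.query.timeout.s | 3600 | N | Timeout for queries against Doris. The default is 1 hour; -1 means no timeout |
doris.request.tablet.size | Integer.MAX_VALUE | N | Number of Doris tablets per partition. The smaller this value, the more partitions are generated, which raises parallelism on the Flink side but also puts more pressure on Doris |
doris.batch.size | 1024 | N | Maximum number of rows read from a BE at a time. Increasing this value reduces the number of connections established between Flink and Doris, which reduces the extra time cost caused by network latency |
doris.exec.mem.limit | 2147483648 | N | Memory limit for a single query, in bytes. The default is 2 GB |
doris.deserialize.arrow.async | FALSE | N | Whether to support asynchronous conversion of the Arrow format into the RowBatch that flink-doris-connector iterates over |
doris.deserialize.queue.size | 64 | N | Size of the internal processing queue for asynchronous Arrow conversion. Takes effect when doris.deserialize.arrow.async is true |
doris.read.field | -- | N | List of column names to read from the Doris table, separated by commas |
doris.filter.query | -- | N | Expression for filtering the data to read. It is passed through to Doris, which uses it to filter data at the source |
sink.label-prefix | -- | Y | Label prefix used for Stream Load imports. In the 2pc scenario it must be globally unique in order to guarantee Flink's EOS semantics |
sink.properties.* | -- | N | Stream Load import parameters. For example, 'sink.properties.column_separator' = ', ' defines the column separator, and 'sink.properties.escape_delimiters' = 'true' allows special characters as delimiters, so that '\\x01' is converted to binary 0x01. For JSON format import, set 'sink.properties.format' = 'json' and 'sink.properties.read_json_by_line' = 'true' |
sink.enable-delete | TRUE | N | Whether to enable deletion. This option requires batch delete to be enabled on the Doris table (enabled by default in Doris 0.15+) and only supports the Unique model |
sink.enable-2pc | TRUE | N | Whether to enable two-phase commit (2pc). The default is true, which guarantees Exactly-Once semantics |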
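The effect of doris.request.tablet.size on read parallelism can be pictured as chunking the table's tablets into groups of at most that size, one group per partition. The sketch below assumes the simplest possible grouping over a flat list of tablet IDs. The actual connector may group tablets differently (for example per backend), and TabletPartitioner and group are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not connector code: chunks tablet IDs into partitions
// holding at most tabletSize tablets each.
final class TabletPartitioner {
    static List<List<Long>> group(List<Long> tabletIds, int tabletSize) {
        if (tabletSize <= 0) {
            throw new IllegalArgumentException("tabletSize must be positive");
        }
        List<List<Long>> partitions = new ArrayList<>();
        int start = 0;
        while (start < tabletIds.size()) {
            // Use long arithmetic so Integer.MAX_VALUE does not overflow.
            int end = (int) Math.min((long) start + tabletSize, tabletIds.size());
            partitions.add(new ArrayList<>(tabletIds.subList(start, end)));
            start = end;
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Long> tablets = new ArrayList<>();
        for (long id = 0; id < 10; id++) {
            tablets.add(id);
        }
        // Smaller tablet size gives more partitions.
        System.out.println(group(tablets, 3).size());
        // The default keeps every tablet in a single partition.
        System.out.println(group(tablets, Integer.MAX_VALUE).size());
    }
}
```

Under this simplified model, 10 tablets with a tablet size of 3 give 4 partitions, while the default of Integer.MAX_VALUE gives 1.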
Doris Type | Flink Type |
NULL_TYPE | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
CHAR | STRING |
LARGEINT | STRING |
VARCHAR | STRING |
DECIMALV2 | DECIMAL |
TIME | DOUBLE |
HLL | Unsupported datatype |
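The mapping table above can be expressed as a simple lookup, which is sometimes handy when generating Flink DDL from a Doris schema. The sketch below copies the rows of the table into a map and rejects HLL (or any type not listed). DorisTypeMapping and toFlinkType are hypothetical names, not part of the connector.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper, not connector code: the type mapping table as a lookup.
final class DorisTypeMapping {
    private static final Map<String, String> TO_FLINK = new HashMap<>();

    static {
        TO_FLINK.put("NULL_TYPE", "NULL");
        TO_FLINK.put("BOOLEAN", "BOOLEAN");
        TO_FLINK.put("TINYINT", "TINYINT");
        TO_FLINK.put("SMALLINT", "SMALLINT");
        TO_FLINK.put("INT", "INT");
        TO_FLINK.put("BIGINT", "BIGINT");
        TO_FLINK.put("FLOAT", "FLOAT");
        TO_FLINK.put("DOUBLE", "DOUBLE");
        TO_FLINK.put("DATE", "DATE");
        TO_FLINK.put("DATETIME", "TIMESTAMP");
        TO_FLINK.put("DECIMAL", "DECIMAL");
        TO_FLINK.put("CHAR", "STRING");
        TO_FLINK.put("LARGEINT", "STRING");
        TO_FLINK.put("VARCHAR", "STRING");
        TO_FLINK.put("DECIMALV2", "DECIMAL");
        TO_FLINK.put("TIME", "DOUBLE");
    }

    // Returns the Flink type name for a Doris type name (case-insensitive).
    // Throws for HLL and for any type that is not in the table.
    static String toFlinkType(String dorisType) {
        String flinkType = TO_FLINK.get(dorisType.toUpperCase(Locale.ROOT));
        if (flinkType == null) {
            throw new UnsupportedOperationException("Unsupported datatype: " + dorisType);
        }
        return flinkType;
    }

    public static void main(String[] args) {
        System.out.println(toFlinkType("DATETIME"));
        System.out.println(toFlinkType("largeint"));
    }
}
```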
CREATE TABLE cdc_mysql_source (
    id INT,
    name VARCHAR,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = 'password',
    'database-name' = 'database',
    'table-name' = 'table'
);
-- Supports syncing delete events (sink.enable-delete='true'); requires batch delete to be enabled on the Doris table
CREATE TABLE doris_sink (
    id INT,
    name STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = '127.0.0.1:8030',
    'table.identifier' = 'database.table',
    'username' = 'root',
    'password' = '',
    'sink.properties.format' = 'json',
    'sink.properties.read_json_by_line' = 'true',
    'sink.enable-delete' = 'true',
    'sink.label-prefix' = 'doris_label'
);
INSERT INTO doris_sink SELECT id, name FROM cdc_mysql_source;
CREATE TABLE bitmap_sink (
    dt INT,
    page STRING,
    user_id INT
) WITH (
    'connector' = 'doris',
    'fenodes' = '127.0.0.1:8030',
    'table.identifier' = 'test.bitmap_test',
    'username' = 'root',
    'password' = '',
    'sink.label-prefix' = 'doris_label',
    'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
)
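The 'sink.properties.columns' value in the bitmap example lists the incoming columns in order and then appends a mapping that converts user_id with to_bitmap. When many columns are involved, it may be easier to build that string programmatically. The following is a minimal sketch with hypothetical names (BitmapColumns, columns); it only assembles the string and does not talk to Doris.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not connector code: assembles a Stream Load "columns"
// value for tables with bitmap columns.
final class BitmapColumns {
    // Emits the source columns in order, followed by one
    // "col=to_bitmap(col)" mapping for each bitmap column.
    static String columns(List<String> sourceColumns, List<String> bitmapColumns) {
        List<String> parts = new ArrayList<>(sourceColumns);
        for (String column : bitmapColumns) {
            if (!sourceColumns.contains(column)) {
                throw new IllegalArgumentException("unknown column: " + column);
            }
            parts.add(column + "=to_bitmap(" + column + ")");
        }
        return String.join(",", parts);
    }

    public static void main(String[] args) {
        // Reproduces the value used in the bitmap_sink example.
        System.out.println(columns(
                Arrays.asList("dt", "page", "user_id"),
                Arrays.asList("user_id")));
    }
}
```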