tencent cloud

文档反馈

模拟上下游 Datagen Logger Blackhole

最后更新时间:2024-04-19 12:23:42

    调试 Source 和 Sink 介绍

    当需要检验作业是否可以正常运行、逻辑是否正确时,为了减少外部系统的部署开销,以及避免干扰因素,我们可以使用一些调试专用的 Connector。

    版本说明

    Flink 版本
    说明
    1.11
    支持
    1.13
    支持
    1.14
    支持
    1.16
    支持

    Datagen Source

    Datagen 是 Flink 自带的随机数据生成器,它可以作为数据源直接引用。详细的使用方式可参考 Flink 官方文档
    下面是 Datagen 数据源的一个示例,它生成的数据含有两个字段:第一个字段 id 是一个随机数,第二个字段 name 是一个随机字符串。

    DDL 定义

    CREATE TABLE datagen_source_table (
    id INT,
    name STRING
    ) WITH (
    'connector' = 'datagen',
    'rows-per-second'='1' -- 每秒产生的数据条数
    );

    WITH 参数

    参数
    是否必选
    默认参数
    数据类型
    描述
    connector
    必须
    (none)
    String
    指定要使用的连接器,这里是 'datagen'。
    rows-per-second
    可选
    10000
    Long
    每秒生成的行数,用以控制数据发出速率。
    fields.#.kind
    可选
    random
    String
    指定 '#' 字段的生成器。可以是 'sequence' 或 'random'。
    fields.#.min
    可选
    (Minimum value of type)
    (Type of field)
    随机生成器的最小值,适用于数字类型。
    fields.#.max
    可选
    (Maximum value of type)
    (Type of field)
    随机生成器的最大值,适用于数字类型。
    fields.#.length
    可选
    100
    Integer
    随机生成器生成字符的长度,适用于 char、varchar、string。
    fields.#.start
    可选
    (none)
    (Type of field)
    序列生成器的起始值。
    fields.#.end
    可选
    (none)
    (Type of field)
    序列生成器的结束值。

    Logger Sink

    Logger Sink 是腾讯云 Oceanus 提供的一个自定义 Logger 示例,它可以将最终的结果数据写入 TaskManager 的日志文件中,后续可以通过 Flink UI 或者控制台的日志面板查看这些日志的输出。
    1. 使用 Logger Sink 前,需要先 下载 JAR 包如果您希望自定义输出逻辑,也可以自行修改并编译构建程序包
    2. 将下载的 JAR 包上传到程序包,具体可参考 依赖管理
    3. 在 SQL 作业中引用该程序包。

    DDL 定义

    CREATE TABLE logger_sink_table (
    id INT,
    name STRING
    ) WITH (
    'connector' = 'logger',
    'print-identifier' = 'DebugData'
    );

    WITH 参数

    参数
    是否必选
    数据类型
    描述
    connector
    必须
    String
    指定要使用的连接器,这里是 'logger'。
    print-identifier
    可选
    String
    日志打印的前缀信息。
    all-changelog-mode
    可选
    Boolean
    启用后,不会过滤 -U 数据,可用来模拟 ClickHouse Collapsing 模式的数据流。
    records-per-second
    可选
    Integer
    可指定每秒输出多少条数据,起到限流的作用。
    mute-output
    可选
    Boolean
    丢弃所有输出,只做条数统计(类似增强版的 Blackhole Sink)。

    监控指标说明

    Oceanus 为 Logger Sink 增加了很多实用的统计指标。单击 Flink UI 的运行图中的 Logger Sink 算子,即可搜索并查看指标:
    numberOfInsertRecords:获取输出的 +I 消息数。
    numberOfDeleteRecords:获取输出的 -D 消息数。
    numberOfUpdateBeforeRecords:获取输出的 -U 消息数。
    numberOfUpdateAfterRecords:获取输出的 +U 消息数。
    Flink 内置了输出到 STDOUT(标准输出)的 Print Sink,但是由于打印的格式不符合 Oceanus 日志采集器的规则,目前不能很好地展示在界面上。我们建议使用上述 Logger Sink 来代替 Print Sink。

    DDL 定义

    CREATE TABLE `print_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    'connector' = 'print'
    );

    WITH 参数

    参数
    是否必选
    默认值
    数据类型
    描述
    connector
    必选
    (none)
    String
    指定要使用的连接器,此处应为 'print'。
    print-identifier
    可选
    (none)
    String
    配置一个标识符作为输出数据的前缀。
    standard-error
    可选
    false
    Boolean
    如果 format 需要打印为标准错误而不是标准输出,则为 True。
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持