tencent cloud

文档反馈

数据处理规则说明

最后更新时间:2024-11-07 11:34:03

    概览

    在通过 CKafka 连接器处理数据流入流出任务时,通常需要对数据进行简易的清洗操作,如格式化原始数据、解析特定字段、数据格式转换等等。开发者往往需要自己搭建一套数据清洗的服务(ETL)。
    Logstash 是一款免费且开放的服务器端数据处理管道,能够从多个数据源采集数据,转换数据,然后将数据发送到相应的“存储库”中。 logstash 拥有丰富的过滤器插件,这使得 logstash 成为了被广泛使用的一款功能强大的数据转换工具。
    然而搭建、配置、维护自己的 logstash 服务会增大开发和运维的难度,为此 Ckafka 提供了一套对标 logstash 的数据处理服务,开发者仅需通过控制台交互界面就可以新建自己的数据处理任务。数据处理服务允许用户编辑相应的数据处理规则,支持构建链式处理,同时可以预览数据处理的结果,帮助用户轻松高效的构建一套数据处理服务,满足数据清洗和转换的需求。
    

    功能对标清单

    Logstash
    连接器数据处理服务
    功能
    Codec.json
    Filter.grok
    Filter.mutate.split
    Filter.date
    Filter.json
    Filter.mutate.convert
    Filter.mutate.gsub
    Filter.mutate.strip
    Filter.mutate.join
    Filter.mutate.rename
    Filter.mutate.update
    Filter.mutate.replace
    Filter.mutate.add_field
    Filter.mutate.remove_field
    Filter.mutate.copy
    Filter.mutate.merge
    
    TODO
    Filter.mutate.uppercase
    
    TODO
    Filter.mutate.lowercase
    
    TODO

    操作方法介绍

    数据解析

    logstash 处理方式:
    // Codec.json
    input {
    file {
    path => "/var/log/nginx/access.log_json""
    codec => "json"
    }
    }
    // Filter.grok
    filter {
    grok {
    match => {
    "message" => "\\s+(?<request_time>\\d+(?:\\.\\d+)?)\\s+"
    }
    }
    }
    // Filter.mutate.split
    filter {
    split {
    field => "message"
    terminator => "#"
    }
    }
    连接器处理方式: 通过选择相应的数据解析模式 ,并一键点击即可预览:

    日期格式处理

    logstash 处理方式:
    // Filter.date
    filter {
    date {
    match => ["client_time", "yyyy-MM-dd HH:mm:ss"]
    }
    }
    连接器处理方式:
    1.1 可以通过预设系统当前时间给某字段赋值:
    1.2 通过 处理 value 功能来对数据内容进行处理:
    

    解析内部 JSON 结构

    logstash 处理方式:
    // Filter.json
    filter {
    json {
    source => "message"
    target => "jsoncontent"
    }
    }
    连接器处理方式: 通过对某字段选择 MAP 操作来对其进行解析,从而把特定字段解析为 JSON 格式:

    数据修改

    logstash 处理方式:
    // Filter.mutate.convert
    filter {
    mutate {
    convert => ["request_time", "float"]
    }
    }
    // Filter.mutate.gsub
    filter {
    mutate {
    gsub => ["urlparams", ",", "_"]
    }
    }
    
    // Filter.mutate.strip
    filter {
    mutate {
    strip => ["field1", "field2"]
    }
    }
    
    // Filter.mutate.join
    filter {
    mutate {
    join => { "fieldname" => "," }
    }
    }
    
    连接器处理方式: 通过选择相应的处理 value 功能一键定义规则:
    
    1.1 通过选择数据格式一键更改相应字段的数据格式:
    1.2 通过 JSONPATH 语法实现 join 的拼接功能,如是用 $.concat($.data.Response.SubnetSet[0].VpcId,&quot;#&quot;,$.data.Response.SubnetSet[0].SubnetId,&quot;#&quot;,$.data.Response.SubnetSet[0].CidrBlock)) 语法拼接 Vpc 和子网的属性,并且通过 # 字符加以分割。
    1.3 结果如下:

    字段修改

    logstash 处理方式:
    // Filter.mutate.rename
    filter {
    mutate {
    rename => ["syslog_host", "host"]
    }
    }
    // Filter.mutate.update
    filter {
    mutate {
    update => { "sample" => "My new message" }
    }
    }
    // Filter.mutate.replace
    filter {
    mutate {
    replace => { "message" => "%{source_host}: My new message" }
    }
    }
    // Filter.mutate.add_field
    filter {
    mutate {
    split => { "hostname" => "." }
    add_field => { "shortHostname" => "%{[hostname][0]}" }
    }
    }
    // Filter.mutate.remove_field
    filter {
    mutate {
    remove_field => ["field_name"]
    }
    }
    // Filter.mutate.copy
    filter {
    mutate {
    copy => { "source_field" => "dest_field" }
    }
    }
    连接器处理方式:

    实际案例演示

    案例1:多级字段解析

    输入 message:
    {
    "@timestamp": "2022-02-26T22:25:33.210Z",
    "beat": {
    "hostname": "test-server",
    "ip": "6.6.6.6",
    "version": "5.6.9"
    },
    "input_type": "log",
    "message": "{\\"userId\\":888,\\"userName\\":\\"testUser\\"}",
    "offset": 3030131,
    }
    目标 message :
    {
    "@timestamp": "2022-02-26T22:25:33.210Z",
    "input_type": "log",
    "hostname": "test-server",
    "ip": "6.6.6.6",
    "userId": 888,
    "userName": "testUser"
    }
    连接器配置方法:
    1.1 处理链 1 配置如下:
    1.2 处理链 1 结果如下:
    {
    "@timestamp": "2022-02-26T22:25:33.210Z",
    "input_type": "log",
    "message": "{\\"userId\\":888,\\"userName\\":\\"testUser\\"}",
    "hostname": "test-server",
    "ip": "6.6.6.6"
    }
    1.3 处理链 2 配置如下:
    1.4 处理链 2 结果如下:
    {
    "@timestamp": "2022-02-26T22:25:33.210Z",
    "input_type": "log",
    "hostname": "test-server",
    "ip": "6.6.6.6",
    "userId": 888,
    "userName": "testUser"
    }

    案例2:非 JSON 数据解析

    输入 message :
    region=Shanghai$area=a1$server=6.6.6.6$user=testUser$timeStamp=2022-02-26T22:25:33.210Z
    目标 message:
    {
    "region": "Shanghai",
    "area": "a1",
    "server": "6.6.6.6",
    "user": "testUser",
    "timeStamp": "2022-02-27 06:25:33",
    "processTimeStamp": "2022-06-27 11:14:49"
    }
    连接器配置方法:
    1.1 使用分隔符 $ 对原始 message 进行解析
    1.2 初步解析结果:
    {
    "0": "region=Shanghai",
    "1": "area=a1",
    "2": "server=6.6.6.6",
    "3": "user=testUser",
    "4": "timeStamp=2022-02-26T22:25:33.210Z"
    }
    1.3 使用分隔符 = 对结果二次解析:
    1.4 二次解析结果:
    {
    "0": "region=Shanghai",
    "1": "area=a1",
    "2": "server=6.6.6.6",
    "3": "user=testUser",
    "4": "timeStamp=2022-02-26T22:25:33.210Z",
    "0.region": "Shanghai",
    "1.area": "a1",
    "2.server": "6.6.6.6",
    "3.user": "testUser",
    "4.timeStamp": "2022-02-26T22:25:33.210Z"
    }
    1.5 对字段进行编辑、删减,调整时间戳格式,并新增当前系统时间字段:
    最终结果:
    {
    "region": "Shanghai",
    "area": "a1",
    "server": "6.6.6.6",
    "user": "testUser",
    "timeStamp": "2022-02-27 06:25:33",
    "processTimeStamp": "2022-06-27 11:14:49"
    }
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持