tencent cloud

피드백

Data Processing Rule Description

마지막 업데이트 시간:2024-11-07 11:40:36

    Overview

    It is usually necessary to perform some cleaning operations on data, such as formatting raw data, parsing specific fields, and converting data format, when CKafka Connector is used to handle data transfer tasks. Developers often need to build their own data cleaning services (ETL).
    Logstash is a free and open server-side data processing pipeline that can collect data from multiple sources, transform the data, and send the data to the respective "storage". Logstash has many filter plugins, making it a widely used and powerful data transformation tool.
    However, building, configuring, and maintaining Logstash services increases the difficulty of development and Ops. Therefore, CKafka provides data processing services comparable to Logstash. Developers only need to create data processing tasks in the console. The data processing services allow users to edit the corresponding data processing rules, create chained processing tasks, and preview the data processing result.
    

    Feature List

    Logstash
    Data Processing Services of Connector
    Feature
    Codec.json
    Filter.grok
    Filter.mutate.split
    Filter.date
    Filter.json
    Filter.mutate.convert
    Filter.mutate.gsub
    Filter.mutate.strip
    Filter.mutate.join
    Filter.mutate.rename
    Filter.mutate.update
    Filter.mutate.replace
    Filter.mutate.add_field
    Filter.mutate.remove_field
    Filter.mutate.copy
    Filter.mutate.merge
    
    TODO
    Filter.mutate.uppercase
    
    TODO
    Filter.mutate.lowercase
    
    TODO

    Introduction to Operation Methods

    Data Parsing

    Logstash processing method:
    // Codec.json
    input {
    file {
    path => "/var/log/nginx/access.log_json""
    codec => "json"
    }
    }
    // Filter.grok
    filter {
    grok {
    match => {
    "message" => "\\s+(?<request_time>\\d+(?:\\.\\d+)?)\\s+"
    }
    }
    }
    // Filter.mutate.split
    filter {
    split {
    field => "message"
    terminator => "#"
    }
    }
    Connector processing method: Select the corresponding data parsing mode, and click the button to preview the result:

    Date Format Processing

    Logstash processing method:
    // Filter.date
    filter {
    date {
    match => ["client_time", "yyyy-MM-dd HH:mm:ss"]
    }
    }
    Connector processing method:
    1.1 Assign a value to a field by presetting the current system time:
    1.2 Process the data by using the Process value feature:
    

    Internal JSON Struct Parsing

    Logstash processing method:
    // Filter.json
    filter {
    json {
    source => "message"
    target => "jsoncontent"
    }
    }
    Connector processing method: Select the Map operation for a specific field to parse the field into a JSON object:

    Data Modification

    Logstash processing method:
    // Filter.mutate.convert
    filter {
    mutate {
    convert => ["request_time", "float"]
    }
    }
    // Filter.mutate.gsub
    filter {
    mutate {
    gsub => ["urlparams", ",", "_"]
    }
    }
    
    // Filter.mutate.strip
    filter {
    mutate {
    strip => ["field1", "field2"]
    }
    }
    
    // Filter.mutate.join
    filter {
    mutate {
    join => { "fieldname" => "," }
    }
    }
    
    Connector processing method: Select the corresponding value processing mode to set the rule:
    
    1.1 Select the data format to change the data format of the corresponding field with one click:
    1.2 Use JSONPath syntax to concatenate elements. For example, use the $.concat($.data.Response.SubnetSet[0].VpcId,&quot;#&quot;,$.data.Response.SubnetSet[0].SubnetId,&quot;#&quot;,$.data.Response.SubnetSet[0].CidrBlock)) syntax to concatenate VPC and subnet attributes, which can be separated with the # character.
    1.3 The result is as follows:

    Field Modification

    Logstash processing method:
    // Filter.mutate.rename
    filter {
    mutate {
    rename => ["syslog_host", "host"]
    }
    }
    // Filter.mutate.update
    filter {
    mutate {
    update => { "sample" => "My new message" }
    }
    }
    // Filter.mutate.replace
    filter {
    mutate {
    replace => { "message" => "%{source_host}: My new message" }
    }
    }
    // Filter.mutate.add_field
    filter {
    mutate {
    split => { "hostname" => "." }
    add_field => { "shortHostname" => "%{[hostname][0]}" }
    }
    }
    // Filter.mutate.remove_field
    filter {
    mutate {
    remove_field => ["field_name"]
    }
    }
    // Filter.mutate.copy
    filter {
    mutate {
    copy => { "source_field" => "dest_field" }
    }
    }
    Connector processing method:

    Examples

    Example 1: Multi-Level Field Parsing

    Input message:
    {
    "@timestamp": "2022-02-26T22:25:33.210Z",
    "beat": {
    "hostname": "test-server",
    "ip": "6.6.6.6",
    "version": "5.6.9"
    },
    "input_type": "log",
    "message": "{\\"userId\\":888,\\"userName\\":\\"testUser\\"}",
    "offset": 3030131,
    }
    Output message :
    {
    "@timestamp": "2022-02-26T22:25:33.210Z",
    "input_type": "log",
    "hostname": "test-server",
    "ip": "6.6.6.6",
    "userId": 888,
    "userName": "testUser"
    }
    Connector configuration:
    1.1 The configuration of processing chain 1 is as follows:
    1.2 The result of processing chain 1 is as follows:
    {
    "@timestamp": "2022-02-26T22:25:33.210Z",
    "input_type": "log",
    "message": "{\\"userId\\":888,\\"userName\\":\\"testUser\\"}",
    "hostname": "test-server",
    "ip": "6.6.6.6"
    }
    1.3 The configuration of processing chain 2 is as follows:
    1.4 The result of processing chain 2 is as follows:
    {
    "@timestamp": "2022-02-26T22:25:33.210Z",
    "input_type": "log",
    "hostname": "test-server",
    "ip": "6.6.6.6",
    "userId": 888,
    "userName": "testUser"
    }

    Example 2: Non-JSON Data Parsing

    Input message:
    region=Shanghai$area=a1$server=6.6.6.6$user=testUser$timeStamp=2022-02-26T22:25:33.210Z
    Output message:
    {
    "region": "Shanghai",
    "area": "a1",
    "server": "6.6.6.6",
    "user": "testUser",
    "timeStamp": "2022-02-27 06:25:33",
    "processTimeStamp": "2022-06-27 11:14:49"
    }
    Connector configuration:
    1.1 Use delimiter $ to parse the original message:
    1.2 The preliminary parsing result is as follows:
    {
    "0": "region=Shanghai",
    "1": "area=a1",
    "2": "server=6.6.6.6",
    "3": "user=testUser",
    "4": "timeStamp=2022-02-26T22:25:33.210Z"
    }
    1.3 Use delimiter = for further parsing:
    1.4 The parsing result is as follows:
    {
    "0": "region=Shanghai",
    "1": "area=a1",
    "2": "server=6.6.6.6",
    "3": "user=testUser",
    "4": "timeStamp=2022-02-26T22:25:33.210Z",
    "0.region": "Shanghai",
    "1.area": "a1",
    "2.server": "6.6.6.6",
    "3.user": "testUser",
    "4.timeStamp": "2022-02-26T22:25:33.210Z"
    }
    1.5 Edit and delete fields, adjust the timestamp format, and add the field of the current system time:
    The final result is as follows:
    {
    "region": "Shanghai",
    "area": "a1",
    "server": "6.6.6.6",
    "user": "testUser",
    "timeStamp": "2022-02-27 06:25:33",
    "processTimeStamp": "2022-06-27 11:14:49"
    }
    
    
    문의하기

    고객의 업무에 전용 서비스를 제공해드립니다.

    기술 지원

    더 많은 도움이 필요하시면, 티켓을 통해 연락 바랍니다. 티켓 서비스는 연중무휴 24시간 제공됩니다.

    연중무휴 24시간 전화 지원