tencent cloud

文档反馈

数据简单清洗

最后更新时间:2024-01-09 14:56:36

    操作场景

    在使用 DataHub 在进行数据流入流出服务时,可能会遇到如下情况:
    需要对消息中的特定字段进行解析,得到相关信息。
    需要多次迭代处理消息中某一字段。
    需要处理为未经过结构化的原始消息后,才能继续使用。
    需要处理多层嵌套格式的消息。
    目前 CKafka 团队的技术专家经过长期的经验总结,全新推出了数据处理 V2.0 版本。 该版本在完美兼容原有数据处理 V1.0 的基础上,还增加了以下特性:
    插件丰富:数据处理支持丰富的处理预制插件,利于快速生成所需的消费格式。
    链式处理:数据处理支持消息链式处理,即上一次的处理结果能够作为本次处理的输入参数,利于处理复杂结构。
    可视预览:数据处理支持实时 JSON 结构化预览,利于迅速定位排查问题。
    类型转换:数据处理支持不同数据类型间的转换,利于校验数据格式。

    运行原理

    整个数据处理过程如下图所示,各个组件结构说明如下:
    
    
    
    数据处理组件集群中,多个 worker 共同组成一个消费者组,批量读取源 topic 中的消息,并且对于每条消息顺序执行处理操作。
    对于每条消息,根据控制台设置的处理链顺序,依次展开消息字段的嵌套结构,并进行替换/截取/数据转换/时间格式化等操作。
    下一条处理链读取上一条处理链结果,并且进行本轮的链式处理,并将处理结果投递到下一轮。
    执行完最后一轮处理链后的结果,投递到设置的目标 topic 中,至此完成该条消息的数据处理操作。

    前提条件

    需开通 CKafka 服务。
    需消息格式为 JSON 格式或字符串格式,目前暂不支持其他编码协议。

    实际应用

    处理字符串类型格式日志

    Nginx 格式日志通常如下所示,这种 Nginx 默认格式也被称作 combined,可以从中解析出请求者信息,报文信息,请求状态等信息,以便于对数据进行进一步分析。
    66.249.65.159 - - [06/Nov/2014:19:10:38 +0600] "GET /news/53f8d72920ba2744fe873ebc.html HTTP/1.1" 404 177 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
    
    由于 Nginx 的 combined 格式采用空格和 - 分隔符来分隔数据,因此按照下文顺序设计解析流程:
    1. 首先使用 "分隔符 进行初步数据解析,此时数据处理将会自动将日志转换成 JSON 结构
    {
    "0": "66.249.65.159 - - [06/Nov/2014:19:10:38 +0600] ",
    "1": "GET /news/53f8d72920ba2744fe873ebc.html HTTP/1.1",
    "2": " 404 177 ",
    "5": "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
    }
    
    2. 从上文 JSON 结构中可知,键名为 02 的字段由于 - 和空格的影响,还存在连接的耦合数据。因此再分别对这两个键进行 - 和空格进行分隔符拆分。 拆分后的 JSON 结果如下所示:
    {
    "1": "GET /news/53f8d72920ba2744fe873ebc.html HTTP/1.1",
    "5": "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)",
    "0.0": "66.249.65.159 ",
    "0.2": " [06/Nov/2014:19:10:38 +0600] ",
    "2.1": "404",
    "2.2": "177"
    }
    
    3. 由于时间格式前后还带有 [] 方括号,再使用一次分隔符进行截取,截取后的 JSON 如下所示:
    {
    "1": "GET /news/53f8d72920ba2744fe873ebc.html HTTP/1.1",
    "5": "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)",
    "0.0": "66.249.65.159 ",
    "0.2": "06/Nov/2014:19:10:38 +0600",
    "2.1": "404",
    "2.2": "177"
    }
    
    4. 由于所有字段都已经被合适拆分,最后根据对应字段属性,给相应映射字段的 key 设置名称即可。修改后最终结果如下所示:
    {
    "request": "GET /news/53f8d72920ba2744fe873ebc.html HTTP/1.1",
    "http_user_agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)",
    "remote_addr": "66.249.65.159 ",
    "dateTime": "06/Nov/2014:19:10:38 +0600",
    "status": "404",
    "body_bytes_sent ": "177"
    }
    

    处理嵌套类型格式日志

    TKE 采集格式通常如下所示,TKE 采集器会将 metadata 数据放入 JSON 结构中的 kubernetes 字段中,采集到的日志放入 log 字段中。大致结构如下所示:
    {
    "@timestamp": 1648803500.63659,
    "@filepath": "/var/log/tke-log-agent/test7/c816991f-adfe-4617-8cf3-9997aea90ded/c_tke-es-687995d557-n29jr_default_nginx-add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba/jgw_INFO_2022-02-10_15_4.log",
    "log": "15:00:00.000[4349811564226374227] [http-nio-8081-exec-64] INFO com.qcloud.jgw.gateway.server.topic.TopicService",
    "kubernetes": {
    "pod_name": "tke-es-687995d557-n29jr",
    "namespace_name": "default",
    "pod_id": "c816991f-adfe-4617-8cf3-9997aea90ded",
    "labels": {
    "k8s-app": "tke-es",
    "pod-template-hash": "687995d557",
    "qcloud-app": "tke-es"
    },
    "annotations": {
    "qcloud-redeploy-timestamp": "1648016531476",
    "tke.cloud.tencent.com/networks-status": "[{\\n \\"name\\": \\"tke-bridge\\",\\n \\"interface\\": \\"eth0\\",\\n \\"ips\\": [\\n \\"172.16.0.31\\"\\n ],\\n \\"mac\\": \\"ae:61:12:4a:c2:ba\\",\\n \\"default\\": true,\\n \\"dns\\": {}\\n}]"
    },
    "host": "10.0.96.47",
    "container_name": "nginx",
    "docker_id": "add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba",
    "container_hash": "nginx@sha256:e1211ac17b29b585ed1aee166a17fad63d344bc973bc63849d74c6452d549b3e",
    "container_image": "nginx"
    }
    }
    
    当日志采集后投递进 ElasticSearch 时,由于支持嵌套 JSON 结构,因此不需要对数据做出太大改动。但当需要投递到数据库时,此时就需要将数据转换成能够识别的单层 JSON 格式,因此按照下文顺序设计解析流程:
    1. 使用 JSONPATH 语句处理第一层嵌套的 JSON 结构 $.kubernetes,解析模式选择 JSON。首先将嵌套 JSON 结构转换为单层 JSON 结构。测试后结果如下所示:
    {
    "@timestamp": 1.64880350063659E9,
    "@filepath": "/var/log/tke-log-agent/test7/c816991f-adfe-4617-8cf3-9997aea90ded/c_tke-es-687995d557-n29jr_default_nginx-add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba/jgw_INFO_2022-02-10_15_4.log",
    "log": "15:00:00.000[4349811564226374227] [http-nio-8081-exec-64] INFO com.qcloud.jgw.gateway.server.topic.TopicService",
    "$.kubernetes.pod_name": "tke-es-687995d557-n29jr",
    "$.kubernetes.namespace_name": "default",
    "$.kubernetes.pod_id": "c816991f-adfe-4617-8cf3-9997aea90ded",
    "$.kubernetes.labels": {
    "k8s-app": "tke-es",
    "pod-template-hash": "687995d557",
    "qcloud-app": "tke-es"
    },
    "$.kubernetes.annotations": {
    "qcloud-redeploy-timestamp": "1648016531476",
    "tke.cloud.tencent.com/networks-status": "[{\\n \\"name\\": \\"tke-bridge\\",\\n \\"interface\\": \\"eth0\\",\\n \\"ips\\": [\\n \\"172.16.0.31\\"\\n ],\\n \\"mac\\": \\"ae:61:12:4a:c2:ba\\",\\n \\"default\\": true,\\n \\"dns\\": {}\\n}]"
    },
    "$.kubernetes.host": "10.0.96.47",
    "$.kubernetes.container_name": "nginx",
    "$.kubernetes.docker_id": "add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba",
    "$.kubernetes.container_hash": "nginx@sha256:e1211ac17b29b585ed1aee166a17fad63d344bc973bc63849d74c6452d549b3e",
    "$.kubernetes.container_image": "nginx"
    }
    
    2. 依次处理第二层嵌套结构 $.kubernetes.annotations$.kubernetes.labels。在处理链中使用 Map 方式选中这两个名称,即可将嵌套格式转换成单层 JSON 格式。处理后如下所示:
    {
    "@timestamp": 1648803500.63659,
    "@filepath": "/var/log/tke-log-agent/test7/c816991f-adfe-4617-8cf3-9997aea90ded/c_tke-es-687995d557-n29jr_default_nginx-add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba/jgw_INFO_2022-02-10_15_4.log",
    "log": "15:00:00.000[4349811564226374227] [http-nio-8081-exec-64] INFO com.qcloud.jgw.gateway.server.topic.TopicService",
    "$.kubernetes.pod_name": "tke-es-687995d557-n29jr",
    "$.kubernetes.namespace_name": "default",
    "$.kubernetes.pod_id": "c816991f-adfe-4617-8cf3-9997aea90ded",
    "$.kubernetes.host": "10.0.96.47",
    "$.kubernetes.container_name": "nginx",
    "$.kubernetes.docker_id": "add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba",
    "$.kubernetes.container_hash": "nginx@sha256:e1211ac17b29b585ed1aee166a17fad63d344bc973bc63849d74c6452d549b3e",
    "$.kubernetes.container_image": "nginx",
    "$.kubernetes.labels.k8s-app": "tke-es",
    "$.kubernetes.labels.pod-template-hash": "687995d557",
    "$.kubernetes.labels.qcloud-app": "tke-es",
    "$.kubernetes.annotations.qcloud-redeploy-timestamp": "1648016531476",
    "$.kubernetes.annotations.tke.cloud.tencent.com/networks-status": "[{\\n \\"name\\": \\"tke-bridge\\",\\n \\"interface\\": \\"eth0\\",\\n \\"ips\\": [\\n \\"172.16.0.31\\"\\n ],\\n \\"mac\\": \\"ae:61:12:4a:c2:ba\\",\\n \\"default\\": true,\\n \\"dns\\": {}\\n}]"
    }
    
    3. 最后修改相应映射字段的 key 为所需名称,并且删去不需要的字段。此时单击添加处理链后,打开处理上层所有结果的按钮,整理优化后可以参考如下所示:
    {
    "@timestamp": 1.64880350063659E9,
    "@filepath": "/var/log/tke-log-agent/test7/c816991f-adfe-4617-8cf3-9997aea90ded/c_tke-es-687995d557-n29jr_default_nginx-add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba/jgw_INFO_2022-02-10_15_4.log",
    "log": "15:00:00.000[4349811564226374227] [http-nio-8081-exec-64] INFO com.qcloud.jgw.gateway.server.topic.TopicService",
    "pod_name": "tke-es-687995d557-n29jr",
    "namespace_name": "default",
    "pod_id": "c816991f-adfe-4617-8cf3-9997aea90ded",
    "host": "10.0.96.47",
    "container_name": "nginx",
    "docker_id": "add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba"
    }
    
    说明:
    在使用 JSONPath 处理参数时,当 key 中本身带有英文句号 . 时,需要在路径中方括号和单引号进行隔离。
    例如 {"key1.key2":"value1"} 中,要想取得对应字段,则需要使用 $.['key1.key2'] 进行获取相应键值。

    处理字符串序列化 JSON 格式日志

    有时 JSON 格式在传输过程中,由于格式或性能等需要,需要转义成字符串格式的形式,此处把这种格式叫做 Raw JSON。需要在数据处理中重新将字符串反序列化成 JSON 格式。 此处以 MongoDB 中的 Raw JSON 格式为例,大致结构如下所示:
    {
    "key": " {\\n \\"categories\\": [\\"dev\\"],\\n \\"created_at\\": \\"2020-01-05 13:42:19.324003\\",\\n \\"icon_url\\": \\"https://assets.chucknorris.host/img/avatar/chuck-norris.png\\",\\n \\"id\\": \\"elgv2wkvt8ioag6xywykbq\\",\\n \\"updated_at\\": \\"2020-01-05 13:42:19.324003\\",\\n \\"url\\": \\"https://api.chucknorris.io/jokes/elgv2wkvt8ioag6xywykbq\\",\\n \\"value\\": \\"Chuck Norris's keyboard doesn't have a Ctrl key because nothing controls Chuck Norris.\\"\\n }\\n"
    }
    
    如果在数据处理中就将 Raw JSON 转换成普通的 JSON 格式,就能按照字段格式直接投递到目标下游中。大致处理思路如下所示:
    1. 首先设置 解析模式 为 JSON。这样能够将消息先预读取成内部的 MAP 格式。解析后如下所示:
    {
    "key": " {\\n \\"categories\\": [\\"dev\\"],\\n \\"created_at\\": \\"2020-01-05 13:42:19.324003\\",\\n \\"icon_url\\": \\"https://assets.chucknorris.host/img/avatar/chuck-norris.png\\",\\n \\"id\\": \\"elgv2wkvt8ioag6xywykbq\\",\\n \\"updated_at\\": \\"2020-01-05 13:42:19.324003\\",\\n \\"url\\": \\"https://api.chucknorris.io/jokes/elgv2wkvt8ioag6xywykbq\\",\\n \\"value\\": \\"Chuck Norris's keyboard doesn't have a Ctrl key because nothing controls Chuck Norris.\\"\\n }\\n"
    }
    
    2. 单击 添加处理链,使用 MAP 方式选中 key,解析方式选择 JSON。这样数据处理就能将 RAW JSON 自动转换成 JSON 形式。解析后如下所示:
    {
    "key.categories": [
    "dev"
    ],
    "key.created_at": "2020-01-05 13:42:19.324003",
    "key.icon_url": "https://assets.chucknorris.host/img/avatar/chuck-norris.png",
    "key.id": "elgv2wkvt8ioag6xywykbq",
    "key.updated_at": "2020-01-05 13:42:19.324003",
    "key.url": "https://api.chucknorris.io/jokes/elgv2wkvt8ioag6xywykbq",
    "key.value": "Chuck Norris's keyboard doesn't have a Ctrl key because nothing controls Chuck Norris."
    }
    
    3. 最后单击 添加处理链 后,打开处理上层所有结果的按钮,修改相应映射字段的 key 为所需名称,并且删去不需要的字段。整理优化后可以参考如下所示:
    {
    "categories": [
    "dev"
    ],
    "created_at": "2020-01-05 13:42:19.324003",
    "icon_url": "https://assets.chucknorris.host/img/avatar/chuck-norris.png",
    "id": "elgv2wkvt8ioag6xywykbq",
    "updated_at": "2020-01-05 13:42:19.324003",
    "url": "https://api.chucknorris.io/jokes/elgv2wkvt8ioag6xywykbq",
    "value": "Chuck Norris's keyboard doesn't have a Ctrl key because nothing controls Chuck Norris."
    }
    
    说明:
    目前 RAW JSON 只支持解析 MAP 类型的数据。当最外层为 List 类型时,例如 "[\\"test1\\",\\"test2\\"]",或者 "[{\\"key\\":\\"value\\"}]",由于无法解析合适的键值,因此将会提示解析失败。
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持