Ckafka 连接器的数据处理功能提供了根据正则表达式提取消息内容的能力,正则提取采用的是开源的正则提取包 re2 。 Java 的标准正则表达式包 java.util.regex
以及其他被广泛使用的正则表达式包如 PCRE、Perlre和 Python(re),都使用回溯实现策略,即当一个 pattern 出现两个替代方案a|b
的时候,引擎将首先尝试匹配子模式a
,如果匹配失败,它将重置输入流并尝试匹配子模式 b
。
如果这种匹配模式是深度嵌套的,则此策略需要对输入数据进行指数级的嵌套解析。如果输入的字符串很长,则匹配时间可以趋向无穷大。
相比之下,RE2J 算法通过使用非确定有限自动机在输入数据的单次解析中同时检查所有匹配项,从而实现在线性时间完成正则匹配。
数据处理中的正则提取适用于对长数组类型的消息进行特定字段的提取,下面展示几种常见的提取模式。
案例1:对手机号字段进行提取
输入 message:
{"message":
[
{"email":123456@qq.com,"phoneNumber":"13890000000","IDNumber":"130423199301067425"},
{"email":123456789@163.com,"phoneNumber":"15920000000","IDNumber":"610630199109235723"},
{"email":usr333@gmail.com,"phoneNumber":"18830000000","IDNumber":"42060219880213301X"}
]
}
目标 message:
{
"0": "\\"phoneNumber\\":\\"13890000000\\"",
"1": "\\"phoneNumber\\":\\"15920000000\\"",
"2": "\\"phoneNumber\\":\\"18830000000\\""
}
使用的正则表达式为:
"phoneNumber":"(13[0-9]|14[5|7]|15[0|1|2|3|5|6|7|8|9]|18[0|1|2|3|5|6|7|8|9])\\d{8}"
案例2:对 Email 字段进行提取
输入 message:
{"message":
[
{"email":123456@qq.com,"phoneNumber":"13890000000","IDNumber":"130423199301067425"},
{"email":123456789@163.com,"phoneNumber":"15920000000","IDNumber":"610630199109235723"},
{"email":usr333@gmail.com,"phoneNumber":"18830000000","IDNumber":"42060219880213301X"}
]
}
目标 message:
{
"0": "\\"email\\":\\"123456@qq.com\\"",
"1": "\\"email\\":\\"123456789@163.com\\"",
"2": "\\"email\\":\\"usr333@gmail.com\\""
}
使用的正则表达式为:
"email":"\\w+([-+.]\\w+)*@\\w+([-.]\\w+)*\\.\\w+([-.]\\w+)*"
案例3:对身份证字段进行提取
输入 message:
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"input_type": "log",
"operation": "INSERT",
"operator": "admin",
"message": "{\\"email\\":\\"123456@qq.com\\",\\"phoneNumber\\":\\"13890000000\\",\\"IDNumber\\":\\"130423199301067425\\"},{\\"email\\":\\"123456789@163.com\\",\\"phoneNumber\\":\\"15920000000\\",\\"IDNumber\\":\\"610630199109235723\\"},{\\"email\\":\\"usr333@gmail.com\\",\\"phoneNumber\\":\\"18830000000\\",\\"IDNumber\\":\\"42060219880213301X\\"}"
}
目标 message,这里希望保留外部字段,并将message字段中的N个 IDNumber字段单独提取出来:
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"input_type": "log",
"operation": "INSERT",
"operator": "admin",
"message.0": "130423199301067425",
"message.1": "610630199109235723",
"message.2": "42060219880213301X"
}
使用的正则表达式为:
[1-9]\\d{5}(18|19|20)\\d{2}((0[1-9])|(1[0-2]))(([0-2][1-9])|10|20|30|31)\\d{3}[0-9Xx]
这里通过多个处理链进行处理,处理链1的处理结果为:
此时需要对message字段进行二次处理,处理链2的处理结果如下:
处理结果:
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"input_type": "log",
"operation": "INSERT",
"operator": "admin",
"message.0": "130423199301067425",
"message.1": "610630199109235723",
"message.2": "42060219880213301X"
}
这里将需要的 IDNumber 字段提取出来后,删除了原来的 message 字段,保留了外部需要的 operation 等字段,以及 message 中的 N 个需要的数据信息。
本页内容是否解决了您的问题?