FlinkSQL⾃定义UDF解析复杂JSON
2021-06-07 修改
⽩⼲了,flink 1.13 json format 可以直接解析复杂的sql,以如下格式
CREATE TABLE user_log (
user_id STRING
,item_id STRING
,category_id STRING
,sub_json ROW(sub_name STRING, password STRING, sub_json ROW(sub_name STRING, sub_pass STRING))
) WITH (
'connector' = 'kafka'
,'topic' = 'user_b'
,'properties.bootstrap.servers' = '10.201.1.132:9092'
,'up.id' = 'user_log_1'
,'de' = 'latest-offset'
,'format' = 'json'
,'json.ignore-parse-errors' = 'false'
);
insert into mysql_table_venn_user_log_sink
SELECT user_id, item_id, category_id, sub_json.sub_name, sub_json.password, sub_json.sub_json.sub_name, sub_json.sub_json.sub_pass
FROM user_log;
-------------分割线-------------------
最近⽤ Flink 处理⼀些 json 格式数据的时候,突然发现 1.13 的 json format 没有解析复杂 SQL 的属性了
在 Flink 1.10 的时候,还写了⼀篇博客来介绍⾃定义 json 格式的写法:
这就尴尬了
没发,只能写个⾃定义的 udf 先凑合着⽤了,这两天突然有点想法,写个通⽤的 udf,来解析复杂 json
处理json 的udf 的需求是输⼊多个字段,返回多个字段,但是只有⼀⾏,只能使⽤ UDTF(flink 也就是 table functions)
类型推导
Table(类似于 SQL 标准)是⼀种强类型的 API。因此,函数的参数和返回类型都必须映射到[数据类型]({%link dev/table/types.zh.md %})。
从逻辑⾓度看,Planner 需要知道数据类型、精度和⼩数位数;从 JVM ⾓度来看,Planner 在调⽤⾃定义函数时需要知道如何将内部数据结构表⽰为 JVM 对象。
术语类型推导概括了意在验证输⼊值、派⽣出参数/返回值数据类型的逻辑。
Flink ⾃定义函数实现了⾃动的类型推导提取,通过反射从函数的类及其求值⽅法中派⽣数据类型。如果这种隐式的反射提取⽅法不成功,则可以通过使⽤ @DataTypeHint 和 @FunctionHint 注解相关参数、类或⽅法来⽀持提取过程,下⾯展⽰了有关如果需要更⾼级的类型推导逻辑,实现者可以在每个⾃定义函数中显式重写 getTypeInference() ⽅法。但是,建议使⽤注解⽅式,因为它可使⾃定义类型推导逻辑保持在受影响位置附近,⽽在其他位置则保持默认状态。
⾃动类型推导
⾃动类型推导会检查函数的类和求值⽅法,派⽣出函数参数和结果的数据类型, @DataTypeHint 和 @FunctionHint 注解⽀持⾃动类型推导。
有关可以隐式映射到数据类型的类的完整列表,请参阅[数据类型]({%link dev/table/types.zh.md %}#数据类型注解)。
官⽹⽰例:
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public static class SplitFunction extends TableFunction<Row> {
public void eval(String str) {
for (String s : str.split(" ")) {
// use collect(...) to emit a row
collect(Row.of(s, s.length()));
}
}
}
发现scala 的⽀持有点尴尬,因为想做⼀个解析复杂 json 的通⽤UDF,json ⾥⾯的字段是不固定的,如果将返回值放到数组中,如: collect(Row.of(arr(0), arr(1))) 这样返回会报
错: Scala collections are not supported. See the documentation for supported classes or treat them as RAW types.
改成 java 略好⼀点
改完的代码,写成这样:
public class ParseJson extends TableFunction<Row> {
public void json) {
if (json == null || json.length == 0) {
return;
}
// get column from json
String[] arr = getStrings(json);
RowKind rowKind = RowKind.fromByteValue((byte) 0);
Row row = new Row(rowKind, json.length - 1);
for (int i = 0; i < arr.length; ++i) {
row.setField(json[i + 1], arr[i]);
}
collect(row);
}
/**
* parse user columns from json and provider column name
*/
private String[] getStrings(String[] json) {
JsonObject jsonObject = new JsonParser().parse(json[0]).getAsJsonObject();
int len = json.length - 1;
String[] arr = new String[len];
for (int i = 0; i < len - 1; ++i) {
JsonElement tm = (json[i + 1]);
if (tm != null) {
arr[i] = tm.getAsString();
} else {
arr[i] = null;
}
}
return arr;
}
}
代码⽐较简单,输⼊参数是数组的,第⼀个字段是需要处理的 json 字符串,后⾯的字段是需要解析的字段
json检查解析出来的结果放在 String 字符数组中,再转为 Row
可是不能推导出 Row 的类型和名称: Cannot extract a data type from a pure 'org.pes.Row' class. Please use annotations to define field names and field types.
吐⾎啊,如果这样不能识别,需要在 annotations 上写配置对应的列名和列类型,就不通⽤了
退⽽求其次,先配置个 annotations 看下情况再说
@FunctionHint(output = @DataTypeHint("ROW<col1 STRING, col2 STRING, col3 STRING>"))
public void eval(String json, String col1, String col2, String col3) {
if (json == null || im().length() == 0) {
return;
}
String[] inputArr = new String[3];
inputArr[0] = col1;
inputArr[1] = col2;
inputArr[2] = col3;
String[] arr = getStrings(json, inputArr);
collect(Row.of(arr[0], arr[1], arr[2]));
}
@FunctionHint(output = @DataTypeHint("ROW<col1 STRING, col2 STRING, col3 STRING, col4 STRING>"))
public void eval(String json, String col1, String col2, String col3, String col4) {
if (json == null || im().length() == 0) {
return;
}
String[] inputArr = new String[4];
inputArr[0] = col1;
inputArr[1] = col2;
inputArr[2] = col3;
inputArr[3] = col4;
String[] arr = getStrings(json, inputArr);
collect(Row.of(arr[0], arr[1], arr[2], arr[3]));
}
测试 json 如下:
{"subJson":"{\"password\":\"pass_4\",\"doub\":\"3.1250\",\"username\":\"venn_4\"}","category_id":"category_id_4","user_id":"user_id_702","item_id":"item_id_4","behavior":"4","sort_col":"25","sales":"35","ts":"2021-05-31 14:29:53.680"}测试 SQL 如下:
insert into t_json_sink
select col1, col2, item_id, username, password, cast(doub as double) doub
from t_json a
LEFT JOIN LATERAL TABLE(udf_parse_json(json, 'category_id', 'user_id', 'item_id', 'subJson')) AS T(col1, col2, item_id,subJson) ON TRUE
LEFT JOIN LATERAL TABLE(udf_parse_json(subJson, 'username', 'password', 'doub')) AS T1(username, password, doub) ON TRUE
;
这样是可以解析复杂的嵌套 json 的,但是需要穷举字段数量,因为是通⽤的 UDF,需要⽀持所有的字段
测试了⼀下三层的 json,是可⽀持的
测试 SQL 如下:
insert into t_json_sink
select category_id, user_id, item_id, cast(sort_col as int) sort_col, username, password, cast(doub as double) doub,sub_name,sub_pass
from t_json a
LEFT JOIN LATERAL TABLE(udf_parse_json(json, 'category_id', 'user_id', 'item_id', 'sort_col', 'sub_json')) AS T(category_id, user_id, item_id, sort_col, sub_json) ON TRUE
LEFT JOIN LATERAL TABLE(udf_parse_json(sub_json, 'username', 'password', 'doub', 'sub_json_1')) AS T1(username, password, doub, sub_json_1) ON TRUE
LEFT JOIN LATERAL TABLE(udf_parse_json(sub_json_1, 'sub_name', 'sub_pass')) AS T2(sub_name, sub_pass) ON TRUE
;
看起来,多嵌套⼏层也是可以执⾏的,唯⼀的问题就是需要穷举字段数量
直接代码⽣成100个以内的所有字段的 eval 重载⽅法,发现程序不能启动了,⼀直卡在 sql 检查上
减少 eval 的重载⽅法数量,发现随着重载⽅法增多,启动越慢,10 个以上重载⽅法的启动时间(15秒)就不能接受了
挑战失败,继续翻官⽹,在数据类型章节最前⾯看到:
ROW<myField ARRAY<BOOLEAN>, myOtherField TIMESTAMP(3)>
ROW 类型,⾥⾯的字段有数组,如果把 FunctionHint 写成这样:
@FunctionHint(output = @DataTypeHint("ROW<arr ARRAY<STRING>>"))
直接返回⼀个数组,刚好处理过程都是⽤的数组作为流转的类型,不需要构造结果的格式了
最后的代码写成这样
@FunctionHint(output = @DataTypeHint("ROW<arr ARRAY<STRING>>"))
public void json) {
if (json == null || json.length == 0) {
return;
}
String[] arr = getStrings(json);
RowKind rowKind = RowKind.fromByteValue((byte) 0);
Row row = new Row(rowKind, json.length - 1);
row.setField(0, arr);
collect(row);
}
对应的SQL 如下:
insert into t_json_sink
select arr, arr[1],arr[2]
from t_json a
LEFT JOIN LATERAL TABLE(udf_parse_json(json, 'category_id', 'user_id')) AS T(arr) ON TRUE
这样也能很好的实现通⽤的复杂 json 解析
测试了下嵌套6层的 json:
{
"sub_json": "{
"sub_json":" {
"sub_json":"{
"sub_json":"{
"sub_json":"{"sub_name":"sub_5_venn_3","sub_pass":"sub_5_pass_3"}",
"sub_name":"sub_4_venn_3","sub_pass":"sub_4_pass_3"}",
"sub_name":"sub_3_venn_3","sub_pass":"sub_3_pass_3"}",
"sub_name":"sub_2_sub_venn_3","sub_pass":"sub_2_sub_pass_3"}",
"password":"sub_1_pass_3","sub_name":"sub_1_venn_3","doub":"3.1250"}",
"category_id": "category_id_3",
"user_id": "user_id_73",
"item_id": "item_id_3",
"behavior": "3",
"sort_col": "45",
"sales": "45",
"ts": "2021-06-01 13:52:44.708"
}
对应 SQL 如下:
insert into t_json_sink
select T.arr[1], T1.arr[1], T2.arr[1], T3.arr[1], T4.arr[1], T5.arr[1]
from t_json a
LEFT JOIN LATERAL TABLE(udf_parse_json(json, 'user_id', 'sub_json')) AS T(arr) ON TRUE
LEFT JOIN LATERAL TABLE(udf_parse_json(T.arr[2], 'sub_name', 'sub_json')) AS T1(arr) ON TRUE LEFT JOIN LATERAL TABLE(udf_parse_json(T1.arr[2], 'sub_name', 'sub_json')) AS T2(arr) ON TRUE LEFT JOIN LATERAL TABLE(udf_parse_json(T2.arr[2], 'sub_name', 'sub_json')) AS T3(arr) ON TRUE LEFT JOIN LATERAL TABLE(udf_parse_json(T3.arr[2], 'sub_name', 'sub_json')) AS T4(arr) ON TRUE LEFT JOIN LATERAL TABLE(udf_parse_json(T4.arr[2], 'sub_name', 'sub_json')) AS T5(arr) ON TRUE ;
解析的结果如下:
+I[user_id_261, sub_1_venn_6, sub_2_sub_venn_6, sub_3_venn_6, sub_4_venn_6, sub_5_venn_6]
+I[user_id_262, sub_1_venn_3, sub_2_sub_venn_3, sub_3_venn_3, sub_4_venn_3, sub_5_venn_3]
+I[user_id_263, sub_1_venn_7, sub_2_sub_venn_7, sub_3_venn_7, sub_4_venn_7, sub_5_venn_7]
+I[user_id_264, sub_1_venn_5, sub_2_sub_venn_5, sub_3_venn_5, sub_4_venn_5, sub_5_venn_5]
+I[user_id_265, sub_1_venn_8, sub_2_sub_venn_8, sub_3_venn_8, sub_4_venn_8, sub_5_venn_8]
+I[user_id_266, sub_1_venn_0, sub_2_sub_venn_0, sub_3_venn_0, sub_4_venn_0, sub_5_venn_0]
执⾏没有任何问题
注:突然好久没写博客,⽔⼀篇凑个数
欢迎关注Flink菜鸟,会不定期更新Flink(开发技术)相关的推⽂
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论