⼀种HudionFlink动态同步元数据变化的⽅法
⽂章⽬录
⼀、背景
⼀个需求,需要同步MySQL数据到Hive,包括DDL与DML,所以需要动态同步元数据变化。
⼆、官⽅Schema Evolution例⼦
三、Flink + Hudi实现Schema Evolution
由于多种原因(省略⼀万字)选择了Flink+Hudi,且为了实现⼀些逻辑更⾃由,选择了DataStream API⽽不是Flink SQL,在⾥从⼤佬处了解到Hudi中使⽤DataStream API操作的类org.apache.hudi.streamer.HoodieFlinkStreamer。通过两次启动任务传⼊的修改后的Avro Schema,也能实现官⽅⽂档Schema Evolution中例⼦类似的功能。
但是按这种⽅式,只能通过重新定义schema并重启Flink任务,才能将源表新增的列同步到⽬标Hive表中,⽆法在启动任务后⾃动同步schema中未定义的源表中新增的列。所以需要对HoodieFlinkStreamer的功能进⾏改进。先说⽅案,经过对HoodieFlinkStreamer分析,其中⽤到的⼀些主要Function(Source
、Map、Sink等)都是在最初定义时传⼊参数配置类,要么在构造⽅法中、要么在open⽅法中,根据配置(包含schema)⽣成deserialer、converter、writeClient等对数据进⾏反序列化、转换、保存的实例,由于是根据最初schema⽣成的实例,即使数据中有新增的字段,转换后新增的字段也没有保留。所以采⽤的⽅式是,在各个Function处理数据的⽅法中,判断如果数据中的schema与当前Function中的schema不⼀致(⼀种简单的优化),就使⽤数据中的schema重新⽣成这些deserialer、converter、writeClient,这样数据经过处理后,就有新增的字段。⽅法很简单,主要是需要了解⼀下HoodieFlinkStreamer的处理流程。
四、HoodieFlinkStreamer流程浅析及扩展⽅法
调试环境:可先在Idea中建个Flink Demo/QuickStart项⽬,导⼊Hudi的hudi-flink-bundle模块、及对应的Hadoop、Hive相关依赖。先在Idea中操作⽅便调试、以及分析依赖冲突等问题。当前使⽤分⽀release-0.10.0,Hive版本2.1.1-cdh6.3.0,hadoop版本3.0.0-
cdh6.3.0。
扩展可将流程中关键的类(及必须依赖的类)从Hudi源码拷贝出来(或集成,视各类的依赖关系⽽定),修改相应逻辑,再使⽤修改后的类作为处理函数
经过亿些分析及调试后,梳理出HoodieFlinkStreamer的⼤致流程如下图。
4.1 FlinkKafkaConsumer
Function功能说明
数据来源,从Kafka中读取数据,HoodieFlinkStreamer类中,指定的反序列化类
为org.apache.flink.formats.json.JsonRowDataDeserializationSchema,将Json数据转换为org.apache.flink.table.data.RowData。
处理逻辑扩展
对source函数扩展,主要就是修改反序列化类,数据可选择Debezium发送到Kafka中的带Schema格式的Json数据,反序列化类使
⽤org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema并修改部分逻辑,
在DebeziumJsonDeserializationSchema的deserialize(byte[] message, Collector<RowData> out)⽅法中,先通过message获取到数据中的schema,再参考构造⽅法重新⽣成成员变量this.jsonDeserializer、adataConverters。
输出扩展
DebeziumJsonDeserializationSchema本⾝输出的是实现了RowData接⼝的org.apache.flink.table.data.GenericRowData(逻辑在emitRow⽅法中),改为通过复制或继承GenericRowData定义的SchemaWithRowData,添加⼀个字符串成员变量保存当前数据的schema,以便下游的函数能根据schema重新⽣成数据处理实例。
4.2 RowDataToHoodieFunction
Function功能说明
将DataStream转换为后⾯HudiAPI操作需要的DataStream。
处理逻辑扩展
在O map(I i)⽅法中,⾸先根据上游发来的SchemaWithRowData中的schema,参考open⽅法重新⽣成verter等成员变量。
RowDataToHoodieFunction类中有⼀个org.figuration.Configuration类型的成员变量config,保存了任务配置的参数,后⾯流程中函数基本都有这个成员变量,且很多函数也从该配置中读取schema信息,所以在更新schema时,可以⾸先设
置fig.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, schema);(任务启动通过--source-avro-schema传⼊参数,所以schema存在config的FlinkOptions.SOURCE_AVRO_SCHEMA这个key中)。后续不再赘述。
输出扩展
与前⾯类似,为了让下游函数获取到schema,在toHoodieRecord⽅法中,修改返回值为继承了HoodieRecord类的带有schema信息的⾃定义类SchemaWithHoodieRecord。
4.3 StreamWriteFunction
(其实前⾯还有个BucketAssignerFunction,看起来没有直接修改或转换当前从流中接收到的数据的各字段值,只是设置了location。也添加了更新schema逻辑,重新⽣成了bucketAssigner成员变量。)
Function功能说明
将流中的数据写⼊HDFS。数据缓存在this.buckets中,由bufferRecord⽅法的注释可知,缓存的记录数⼤
于FlinkOptions.WRITE_BATCH_SIZE配置的值、或缓冲区⼤⼩⼤于FlinkOptions.WRITE_TASK_MAX_SIZE时,调⽤flushBucket将缓存的数据写⼊⽂件。
在每次checkpoint时,snapshotState⽅法也会调⽤flushRemaining⽅法将缓存的记录写⼊⽂件。
处理逻辑扩展
仍然在processElement⽅法中,⾸先通过接收到的SchemaWithHoodieRecord中的schema信息,更新this.writeClient,先关闭再重新⽣成。
输出扩展
Hudi官⽅代码中,只在processElement中调⽤了bufferRecord(所以图中画的虚线)。为了让下游的compact和clean函数接收到新的schema,可直接转发:llect(value);(可优化,⽐如只传schema)
改到当前Function为⽌,数据已经能写⼊⽂件(MOR表的log⽂件、COW表的parquet⽂件),但是在Hive中查询不出来
结合StreamWriteFunction类的注释,及⼀些⽇志,及⼀些调试分析。了解到数据写⼊⽂件后,会通知StreamWriteOperatorCoordinator保存hudi表相关参数,如提交instant更新Timeline相关记录,相关元数据等,应该就是操作hudi表⽬录下的.hoodie⽬
录。StreamWriteFunction在flushBucket和flushRemaining⽅法最后调
⽤this.eventGateway.sendEventToCoordinator(event);将org.apache.hudi.sink.event.WriteMetadataEvent发
到StreamWriteOperatorCoordinator。StreamWriteOperatorCoordinator在org.apache.hudi.sinkmon.WriteOperatorFactory的getCoordinatorProvi der⽅法中实例化,也是传⼊的初始配置,为了能保存最新的元数据,所以也要将schema发过去,在StreamWriteOperatorCoordinator中主要使⽤WriteMetadataEvent的writeStatuses成员变量,所以将schema存在writeStatuses中。
4.4 StreamWriteOperatorCoordinator
Function功能说明
主要逻辑在notifyCheckpointComplete⽅法中,即每次checkpoint完成后执⾏,总体分为2部分,commitInstant和hive同步。
commitInstant⽅法中,从eventBuffer中读取WriteMetadataEvent的writeStatus,若前⾯的步骤中真的有数据处理,这⾥获取到
的writeResults不为空,则调⽤doCommit⽅法提交相关信息。
如果commitInstant真的提交了数据,返回true,则会调⽤syncHiveIfEnabled⽅法执⾏hive同步操作。最终其实调⽤
到HiveSyncTool的syncHoodieTable⽅法,从这个⽅法可以看到Hive同步⽀持的⼀些功能,⾃动建数据库、⾃动建数据表、⾃动建分区;元
从Hive metastore查出的表的元数据对⽐,如果不同则将元数据变化同步到hudi表最新的提交中的元数据与从Hive metastore查出的表的元数据对⽐
数据同步功能通过将hudi表最新的提交中的元数据
Hive metastore中。
处理逻辑扩展
上⾯提到,commitInstant⽅法中,如果writeResults不为空,则会调⽤doCommit⽅法,所以在调⽤doCommit之前添加更新schema的逻辑,从⾃定义的SchemaWithWriteStatus中读取schema,参考start⽅法的逻辑重新⽣成writeClient。StreamWriteOperatorCoordinator修改后,在之前数据已经写⼊⽂件的基础上,新增的字段、修改的类型已经能同步到hudi表(指.hoodie⽬录)及hive metastore中,在Hive中COW表也能正常查询。但是对于MOR表,新增的字段只是写到了增量⽇志⽂件中,读
优化表(_ro)查不到新增字段的数据,所以还要修改Compaction处理类。
4.5 Compaction及Clean类
如上⾯流程图,compaction分三步:⽣成压缩计划、执⾏压缩、提交压缩执⾏结果。都是类似的操作,先是CompactionPlanOperator接收到DataStream<SchemaWithHoodieRecord>,更新元数据,后续的CompactionPlanEvent和CompactionCommiEvent也可带上schema,更新各⾃的table、writeClient等成员。CleanFunction也类似。
到⽬前为⽌,MOR表的读优化表在Hive也能查询到新增列的数据,历史parquet⽂件中没有新增字段,查询结果中新增字段为null。但是实时表(_rt)查询还有点问题。如果查询rt表涉及历史的parquet⽂件(没有新增字段,⾄于为什么肯定是Parquet⽂件,后⾯会说到通过调试发现,如果以后发现有其他情况再补充),则会报类似这样的错误:
"Field new_col2 not found in log schema. Query cannot proceed! Derived Schema Fields: ..."
五、MOR rt表查询bug解决
5.1 分析
在Hudi源码中搜索该报错信息,到两个位置,实时表对应的位置
是org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils#generateProjectionSchema:
public static Schema generateProjectionSchema(Schema writeSchema, Map<String, Schema.Field> schemaFieldsMap,
List<String> fieldNames) {
/**
* ......
*/
List<Schema.Field> projectedFields = new ArrayList<>();
for (String fn : fieldNames) {
Schema.Field field = (fn.toLowerCase());
if (field == null) {
throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! "
+ "Derived Schema Fields: " + new ArrayList<>(schemaFieldsMap.keySet()));
} else {
projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
}
}
Schema projectedSchema = Name(), Doc(),
projectedSchema.setFields(projectedFields);
return projectedSchema;
}
遍历了fieldNames,如果schemaFieldsMap中不到这个字段则报错,所以fieldNames包含新增的字段,schemaFieldsMap为历史parquet⽂件中读取出来的字段信息,
该⽅法在org.apache.altime.AbstractRealtimeRecordReader#init 中被调⽤。源码中也写了⼀个TODO还没有DO:
// TODO(vc): In the future, the reader schema should be updated based on log files & be able
// to null out fields not present before
即未来基于⽇志⽂件更新reader schema,并且会⽀持将新增的字段置为空值。
从AbstractRealtimeRecordReader#init⽅法中看到,HoodieRealtimeRecordReaderUtils#generateProjectionSchema的fieldNames参数从jobConf 中读取,包含新增的字段。通过,发现jobConf的properties中以下⼏个key的值包含字段信息(new_col2为新增字段):
hive.adcolumn.names -> _hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,id,first_n ame,last_name,alias,new_col,new_col2
lumns -> _hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,id,first_name ,
last_name,alias,new_col,new_col2
serialization.ddl -> struct customers1_rt { string _hoodie_commit_time, string _hoodie_commit_seqno, string _hoodie_record_key, string _hoodie_partition _path, string _hoodie_file_name, i32 id, string first_name, string last_name, string alias, double new_col, double new_col2}
pes -> string,string,string,string,string,int,string,string,string,double,double
fieldNames参数就使⽤到了hive.adcolumn.names的值。lumns中包含了新增字段,且与之对应
的pes中包含了字段的类型(看起来是Hive中的类型)。所以尝试
将HoodieRealtimeRecordReaderUtils#generateProjectionSchema中抛异常的位置改为在projectedFields中仍然添加⼀个字段,默认值为null,字段schema通过pes中的类型转换⽽来。sql优化的几种方式
(经过测试,即使不解决这个bug,更新⼀下历史的数据也可以,但是实际情况中肯定不会⽤这种⽅法)
5.2 修改
AbstractRealtimeRecordReader#init⽅法中调⽤HoodieRealtimeRecordReaderUtils#generateProjectionSchema的位置改成:
readerSchema = ateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields,
<("lumns"), ("pes"));
HoodieRealtimeRecordReaderUtils#generateProjectionSchema改为:
public static Schema generateProjectionSchema(Schema writeSchema, Map<String, Schema.Field> schemaFieldsMap,
List<String> fieldNames, String csColumns, String csColumnTypes){
/**
* ...
*/
List<Schema.Field> projectedFields =new ArrayList<>();
Map<String, Schema.Field> fieldMap =getFieldMap(csColumns, csColumnTypes);
for(String fn : fieldNames){
Schema.Field field = (fn.toLowerCase());
if(field ==null){
// throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! "
// + "Derived Schema Fields: " + new ArrayList<>(schemaFieldsMap.keySet()));
projectedFields.(fn));
}else{
projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
}
}
Schema projectedSchema = Name(), Doc(),
projectedSchema.setFields(projectedFields);
return projectedSchema;
}
其中getFieldMap⽅法为:
private static Map<String, Schema.Field> getFieldMap(String csColumns, String csColumnTypes) {
LOG.info(String.format("columns:%s\ntypes:%s", csColumns, csColumnTypes));
Map<String, Schema.Field> result = new HashMap<>();
String[] columns = csColumns.split(",");
String[] types = csColumnTypes.split(",");
for (int i = 0; i < columns.length; i++) {
String columnName = columns[i];
result.put(columnName, new Schema.Field(columnName,toSchema(types[i]), null, null));
}
return result;
}
private static Schema toSchema(String hiveSqlType) {
switch (LowerCase()) {
case "boolean":
ate(Schema.Type.BOOLEAN);
case "byte":
case "short":
case "integer":
ate(Schema.Type.INT);
case "long":
ate(Schema.Type.LONG);
case "float":
ate(Schema.Type.FLOAT);
case "double":
case "decimal":
ate(Schema.Type.DOUBLE);
case "binary":
ate(Schema.Type.BYTES);
case "string":
case "char":
case "varchar":
case "date":
case "timestamp":
default:
ate(Schema.Type.STRING);
}
}
修改后,重新将依赖包部署到Hive中,rt表也能正常查询。从parquet⽂件中读取的数据,新增的字段则显⽰的空值六、总结
通过这种⽅法,实现了元数据动态同步到Hive。
对有些地⽅的源码细节了解得还不透彻,以后可以再多看看,以便能发现和解决更多的问题。
期待Hudi官⽅以后新增加这种功能,看是否有更好的⽅案。
参考资料
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论