hivespark的RoaringBitmap写⼊Clickhouse的bitmap
先说结论:要把hive上的bitmap数据同步到clickhouse的bitmap⾥⾯
参考连接:
1、Clickhouse的RoaringBitmap结构
⽬标是将Hive的Binary类型能顺利转成Clickhouse的Bitmap类型
Hive的Binary类型是⼆进制数组byte[]
Clickhouse的Bitmap类型是,⼀般是通过groupBitmap⽅式构建出来的,⽐如:
select
series_id,
groupBitmapState(toUInt32(dvid))  bitmap列
FROM
dms_pds_flow_interest_dvid_city_day_all
group by
series_id
其中关键sql是:groupBitmapState,源码对应位置是:AggregateFunctionGroupBitmap.cpp注册的;
这个C++代码的关键点是:
createAggregateFunctionBitmap<AggregateFunctionGroupBitmapData>
代表通过函数:createAggregateFunctionBitmap来创建bitmap类型:AggregateFunctionGroupBitmapData 然后跟进这个AggregateFunctionGroupBitmapData类,⽂件(AggregateFunctionGroupBitmapData.h)
结构:
内部:
其中bitmap的⼀些计算函数逻辑,就是这个AggregateFunctionGroupBitmapData.h⽂件实现的;⽐如:select bitmapOrCardinality(bitmap_a , bitmap_b) 是取两个bitmap的并集;
那么实现就是:
⾔归正传,根据Rbitmap的数据结构:
1、⾸先,将 32bit int(⽆符号的)类型数据划分为 2^16 个桶(即使⽤数据的前16位⼆进制作为桶的编号),
每个桶有⼀个Container(可以理解为容器也可以理解为这个桶,容器和桶在这⾥可以理解为⼀个东西,只是说法不⼀样⽽已)来存放⼀个数值的低16位。
2、在存储和查询数值时,将数值 k 划分为⾼ 16 位和低 16 位,取⾼ 16 位值到对应的桶,
然后在将低 16 位值存放在相应的 Container 中。这样说可能⽐较抽象不易理解,下⾯以⼀个例⼦来帮助⼤家理解。
⼤概意思是,在clickhouse的Rbitmap⾥⾯,为了优化存储空间,会将⼀个32位的数据,分成⾼16位和
低16位;
⾼16位会被作为key存储到short[] keys中,低16位则被看做value
⽐如我要存储666这个数字,需要将666划分成⾼16位和低16位,通过⾼16位定位到当前桶是5,定位到竖着排列的桶未知后,在将低16位的值存储到横着排列的数组中;
之前看clickhouse源码中C++⾥⾯返回的roaring和roaring64map到底是啥,在看CRoaring源码,创建Rbitmap的地⽅:
其中的关键点是:
上⾯意思是定义⼀个结构体,类型是roaring_array_t , 变量名是:high_low_container
这个就是图⽚⾥⾯说的⾼16位和低16位的存储模型,然后查看roaring_array_t的结构:
然后查看ROARING_CONTAINER_T,也就是低16位类型是,因为clickhouse是C++编写的,因此构建的数组其实是:struct container_s {}指向的各个⼦类
返回Clickhouse的源码,要开辟的⼦类就是:
这样就⼜回到了Clickhouse的Rbitmap。虽然转了⼀圈,但是已经知道这个Rbitmap底层存储的其实是数组
2、hive或者sparksql⾥⾯的RoaringBitmapmutable是什么意思
关键就是了解UDAF⾥⾯的函数:
// 输⼊输出都是Object inspectors
public  ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;
// AggregationBuffer保存数据处理的临时结果
abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;
// 重新设置AggregationBuffer
public void reset(AggregationBuffer agg) throws HiveException;
// 处理输⼊记录
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;
// 处理全部输出数据中的部分数据
public Object terminatePartial(AggregationBuffer agg) throws HiveException;
// 把两个部分数据聚合起来
public void merge(AggregationBuffer agg, Object partial) throws HiveException;
// 输出最终结果
public Object terminate(AggregationBuffer agg) throws HiveException;
我们要兼容hive的Rbitmap和Clickhouse的Rbitmap,只需要关键⽅法:terminate到底返回了什么
查看代码:
所以关键代码就是:Partial()
hive⾥⾯返回的Rbitmap其实最终是java的⼆进制数组;
所以要想Hive的Rbitmap和Clickhouse的Rbitmap能够兼容,就是演变成:Hive的⼆进制数组如何有效的存储到Clickhouse⾥⾯3、Clickhouse的Roaringbitmap是如何存储的
在回看Clickhouse的Rbitmap,⽐如看添加像Rbitmap⾥⾯添加内容。它的api是:
RoaringBitmap.add(1);
RoaringBitmap.add(2);
其源码是:
//如果基数超过32个,则会将数据存储到Rbitmap
void toLarge()
{
//通过智能指针建⽴对象
rb = std::make_shared<RoaringBitmap>();
//C++ ⾥⾯的for循环,翻译成java就是:for (A x:small)
for (const auto & x : small)
//将smallSet的数据存储到Rbitmap⾥⾯
rb->add(static_cast<Value>(x.getValue()));
//清空smallSet
small.clear();
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。