⼤数据分析引擎-Doris简要介绍
⽬录:
1、背景
2、Doris的架构
3、Doris的核⼼特性
4、数据的导⼊和输出
1、背景
⼤数据的时代,数据的处理能⼒⼤⼤增强,但在最后⼀个环节,即数据应⽤服务环节依然存在较⼤的瓶颈。
原来业务数据库时代,⾼并发、⾼灵活性是⼀个⽭盾体,如何让⼀线在灵活定制分析SQL时候,虽然拖拉拽⽣成了不忍直视的SQL,但希望查询引擎依旧可以保持强劲的性能指标,不管是并发度还是查询时间都能让客户满意,是蛮有挑战的⼀件事情。
⽬前的查询分析⾮常多,并且还在不断的涌现出来,引擎层出不穷,各有优势也有其缺点,⽐如ADB、Hologres、Presto、Kylin、Hbase、Doris,这些产品本质上都是⽤资源换时间,或者空间换时间,本质上就是计算机制重构(⽐如MPP)、硬件提速(⽐如SSD磁盘)、索引提速(⽐如位图)、空间转换(⽐如预计算)等维度是提升性能。不断涌现且更新的技术产品也从侧⾯印证了查询引擎的问题依然很多,痛点依旧没有解决,同时也⼤有可为。
Doris最近也是⽐较⽕的⼀款产品,脱胎于百度的⼴告业务,适应于实时分析场景,确实解决了查询引擎的⼀些场景下的痛点问题,接下来就将个⼈对Doris的⼀些理解简单介绍⼀下。
2、Doris架构
在介绍Doris的特性之前,先让我们了解⼀下Doris的整体架构。具体包括⼏个核⼼维度:
MPP的运⾏框架,充分挖掘多核CPU的并⾏计算能⼒;
分布式架构⽀持多副本⽀撑⾼可⽤;
接⼊了多个⼤数据的⽣态,⽐如Spark, Flink, Hive, ElasticSearch,提供了丰富的数据接⼊和输出的服务;
采取分区分桶的机制,⽀持多种索引技术,满⾜PB级的存储和分析能⼒;
⽀持Mysql协作,简单、易⽤;
列式存储和压缩技术,提升查询性能;
Doris由FrontEnd DorisDB前端节点和BackEnd DorisDB后端节点核⼼组件组成;前端节点负责管理元数据、管理客户端的连接、进⾏查询规划和调度等⼯作;后端节点负责数据存储、计算执⾏、副本管理等;另外还包括DorisManager和Broker,DorisManager管理⼯具,负责提供集管理、在线查询、故障查询、监控报警的可视化⼯具;Broker负责和外部存储(HDFS或对象存储)进⾏数据的导出导⼊等辅助功能;Doris 可以通过MySQL客户端直接访问。
Doris核⼼组件及运⾏模式如下(以下FrontEnd DorisDB简称FE,BackEnd DorisDB简称BE):
⾸先是FE:
管理元数据, 执⾏SQL DDL命令, ⽤Catalog记录库, 表, 分区, tablet副本等信息。
FE⾼可⽤部署, 使⽤复制协议选主和主从同步元数据, 所有的元数据修改操作, 由FE leader节点完成, FE follower节点可执⾏读操作。 元数据的读写满⾜顺序⼀致性。  FE的节点数⽬采⽤2n+1, 可容忍n个节点故障。  当FE leader故障时, 从现有的follower节点重新选主, 完成故障切换。
FE的SQL layer对⽤户提交的SQL进⾏解析, 分析, 改写, 语义分析和关系代数优化, ⽣产逻辑执⾏计划。
FE的Planner负责把逻辑计划转化为可分布式执⾏的物理计划, 分发给⼀组BE。
FE监督BE, 管理BE的上下线, 根据BE的存活和健康状态, 维持tablet副本的数量。
FE协调数据导⼊, 保证数据导⼊的⼀致性。
其次是BE:
BE管理tablet副本, tablet是table经过分区分桶形成的⼦表, 采⽤列式存储。
BE受FE指导, 创建或删除⼦表。
BE接收FE分发的物理执⾏计划并指定BE coordinator节点, 在BE coordinator的调度下, 与其他BE worker共同协作完成执⾏。
BE读本地的列存储引擎获取数据,并通过索引和谓词下沉快速过滤数据。
BE后台执⾏compact任务, 减少查询时的读放⼤。
数据导⼊时, 由FE指定BE coordinator, 将数据以fanout的形式写⼊到tablet多副本所在的BE上。
以查询为例,通过FE的组织、协调、控制,对提交的SQL进⾏解析, 分析, 改写, 优化和规划, ⽣成分布式执⾏计划,然后由若⼲BE执⾏,并在若⼲BE中选定⼀个协作者coordinator,由协作者协调n个BE进⾏本地计算,然后返回给协作者,协作者汇总后返回给FE最终结果,最后由FE将最终结果提供给最终⽤户。
3、Doris的核⼼特性
Doris具体具备什么样的核⼼特性,让我们看看他能做什么。
1)表是如何设计?列式存储;稀疏索引Shortkey Index;加速数据处理(预先聚合、分区分桶、RollUp表物化索引、列级别的索引技术布隆过滤器和Bitmap索引);
2)数据模型有哪些?明细模型DUPLICATE、聚合模型AGGREGATE、更新模型UNIQUE;每个模型后都需要指定排序键;对于不同的应⽤场景,采取不同的数据模型,满⾜⾼性能的要求;
3)数据分布有⼏种?Round-Robin轮转范围、Round指定区分范围、离散List、哈希Hash;同时还⽀持动态分区分布;分区分桶是为了MPP最⼤化利⽤资源,为防⽌数据倾斜,需要选择合理的分布策略;
4)物化视图:相较于聚合模型的汇总分析的数据不⼀致性,物化视图天然的数据⼀致性是它最⼤的优点。
物化视图的组织形成与基表、RollUp表相同,创建后,基本的数据会⾃动以异步的⽅式填充分到物化视图中;数据导⼊时,基表和物化视图保持原⼦型保证数据的⼀致性。物化视图创建后,不能通过命令直接查询,还是查基表,是否⽤物化视图,需要由执⾏计划⾃动选择,可以查询是否使⽤了物化视图。todo基表、Rollup物化索引、物化视图都是使⽤前缀索引。
可以使⽤bitmap_union创建物化视图来处理精确去重;也可以使⽤hll_union创建物化视图来处理近亿去重;也可以匹配更丰富的前缀索引,⽐如⽤户的基表tableA有(k1, k2, k3) 三列。其中 k1, k2 为排序键。这时候如果⽤户查询条件中包含 where k1=1 and k2=2 就能通过shortkey索引加速查询。但是⽤户查询语句中使⽤条件k3=3, 则⽆法通过shortkey索引加速. 此时, 可创建以k3作为第⼀列的物化视图。
当然也有⼀些限制,分区列必须在Groupby中;不⽀持Key列聚合只⽀持Value列聚合;不⽀持指定物化视图查询;只⽀持单例聚合,不⽀持表达式(⽐如sum(a+b)) ;过多的物化视图,会影响导⼊数据的效率,⽐如有20张物化视图,则相当于导⼊20张表,但不影响查询性能,在有物化索引或物化视图的情况下,性能会更好;相同列,不同聚合函数,不能同时出现在⼀张物化视图中;物化视图不⽀持Join和where,不⽀持Groupby的Having⼦句;不能并⾏只能串⾏创建物化视图;
5)Bitmap索引:Bitmap索引的原理就是将RawData进⾏Dictionary的转提炼,然后基于Dictionary(Value、ID)的ID进⾏
BitmapIndex(ID、Bitmap)的存储和查询。⼀般为0和1,如果是多值列,则需要转化为某值为0,其他值为1;
当然也有⼀些限制,对于聚合模型,只⽀持Key列Bitmap索引;适⽤于⼤量重复、较低基数的场景;不⽀持Float、Double、Decimal类型列建Bitmap索引;通过查询的Profile信息查看是否使⽤了索引;
6)Bitmap精确去重:重有2种,⼀个传统的是count distinct,优点是保留明细数据,缺点是消耗极⼤的计算和存储资源;⼀个是基于预计算,在⽤户不关⼼明细数据的情况下,采取预计算的⽅式去重,⽤空间换时间,效果不错,MOLAP的核⼼思路也是如此。
Doris使⽤Bitmap去重,原理即给定1个数组,其取值范围为[0,n](不包括n),对该数组去重,可采⽤(n+7)/8的字节长度的Bitmap,初始化为0;逐个处理数组元素,以数组中元素取值作为Bitmap的下标,将该下标的bit置为1;最后统计Bitmap中1的个数即为数组的count distinct 的结果。
使⽤Bitmap去重的优势:空间优势,对于Int32的去重,只需要1/32空间,在Doris中,使⽤Roaring Bitmap存储空间会进⼀步降低。时间优势:基于Bitmap的操作⽐基于Sort和基于Hash的去重效率都要⾼,复杂度只有O(1)~O(n),并且⽆条件依赖和数据依赖,可向量化执⾏。
当然以有些限制,包括只能⽤于聚合表,明细表和更新表不⽀持Bitmap列;数据类型只能是BITMAP,聚合函数为BITMAP_UNION;在Bitmap 列上使⽤count distinct,⾃动转换为BITMAP_UNION_COUNT计算。
7)Bloomfilter索引:⽤于判断某个元素是否在⼀个集合中的数据结构,优点是空间效率和时间效率都⽐较⾼,缺点是有⼀定的误判率,但不存集合中时,⼀定会报不存在。布隆过滤器由Bit数组和N个哈希函数构成,Bit数据组实始全为0,当⼀个元素插⼊时,则通过N个哈希函数进⾏计算n个Slot,然后将Bit数组中的n个Slot的Bit置1。当判断某⼀个值是否存在时,就通过N个哈希函数计算n个Slot,如果n个Slot对应的Bit位都为1,则集合存在,只要有⼀个Bit为0,则不存在。由于Bit数组位数有限,所以Bit位通过不同的哈希函数计算完成后,Bit是相同或冲突的,所以全1的情形也不⼀定真存在;与Bitmap的适⽤场景相反,适⽤⽤于⾼基数的场景,⼀般⽤于in条件的⽐较多,=条件也适⽤;
当然也有⼀些限制:不⽀持Tinyint、Float、Double 类型的列建Bloom Filter索引;只⽀持in和=过滤查询;通过查询的Profile信息查看是否使⽤了索引;
8)外部表:DorisDB⽀持以外部表的形式,接⼊其他数据源。外部表指的是保存在其他数据源中的数据表。⽬前DorisDB已⽀持的第三⽅数据源包括MySQL、HDFS、ElasticSearch,Hive。对这⼏种种数据源,现阶段只⽀持读取,还不⽀持写⼊。
9)Doris还⽀持数组、窗⼝函数、HyperLogLog去重、broadcast join、 Lateral Join等⼀些应⽤,在这⾥就不⼀⼀介绍了,可以看看官⽅资料。
4、数据的导⼊和输出
导⼊:
根据不同的数据来源可以选择不同的导⼊⽅式:
1)离线数据导⼊,如果数据源是Hive/HDFS,推荐采⽤Broker Load导⼊, 如果数据表很多导⼊⽐较⿇烦可以考虑使⽤Hive外表直连查询,性能会⽐Broker load导⼊效果差,但是可以避免数据搬迁,如果单表的数据量特别⼤,或者需要做全局数据字典来精确去重可以考虑Spark Load导⼊。
2)实时数据导⼊,⽇志数据和业务数据库的binlog同步到Kafka以后,优先推荐通过Routine load导⼊DorisDB,如果导⼊过程中有复杂的多表关联和ETL预处理可以使⽤Flink处理以后⽤stream load写⼊DorisDB,我们有标准的Flink-connector 可以⽅便Flink任务使⽤
3)程序写⼊DorisDB,推荐使⽤Stream Load,可以参考例⼦中有java python shell的demo,
4)⽂本⽂件导⼊推荐使⽤ Stream load
5)Mysql数据导⼊,推荐使⽤Mysql外表,insert into new_table select * from external_table 的⽅式导⼊
6)其他数据源导⼊,推荐使⽤DataX导⼊,我们提供了DataX-dorisdb-writer
7)DorisDB内部导⼊,可以在DorisDB内部使⽤insert into tablename select 的⽅式导⼊,可以跟外部调度器配合实现简单的ETL处理。
⼀个导⼊作业主要分为5个阶段:
1)PENDING
⾮必须。该阶段是指⽤户提交导⼊作业后,等待FE调度执⾏。
Broker Load和将来的Spark Load包括该步骤。
2)ETL
⾮必须。该阶段执⾏数据的预处理,包括清洗、分区、排序、聚合等。
Spark Load包括该步骤,它使⽤外部计算资源Spark完成ETL。
3)LOADING
该阶段先对数据进⾏清洗和转换,然后将数据发送给BE处理。当数据全部导⼊后,进⼊等待⽣效过程,此时导⼊作业状态依旧是LOADING。
4)FINISHED
在导⼊作业涉及的所有数据均⽣效后,作业的状态变成 FINISHED,FINISHED后导⼊的数据均可查询。FINISHED是导⼊作业的最终状态。
5)CANCELLED
在导⼊作业状态变为FINISHED之前,作业随时可能被取消并进⼊CANCELLED状态,如⽤户⼿动取消或导⼊出现错误等。CANCELLED也是导⼊作业的⼀种最终状态。
适⽤场景:
1)HDFS导⼊
源数据存储在HDFS中,数据量为⼏⼗GB到上百GB时,可采⽤Broker Load⽅法向DorisDB导⼊数据。
此时要求部署的Broker进程可以访问HDFS数据源。导⼊数据的作业异步执⾏,⽤户可通过SHOW LOAD命令查看导⼊结果。
源数据存储在HDSF中,数据量达到TB级别时,可采⽤Spark Load⽅法向DorisDB导⼊数据。此时要求部署的Spark进程可以访问HDFS数据源。导⼊数据的作业异步执⾏,⽤户可通过SHOW LOAD命令查看导⼊结果。
对于其它外部数据源,只要Broker或Spark进程能读取对应数据源,也可采⽤Broker Load或Spark Load⽅法导⼊数据。
2)本地⽂件导⼊
数据存储在本地⽂件中,数据量⼩于10GB,可采⽤Stream Load⽅法将数据快速导⼊DorisDB系统。采⽤HTTP协议创建导⼊作业,作业同步执⾏,⽤户可通过HTTP请求的返回值判断导⼊是否成功。
3)Kafka导⼊
数据来⾃于Kafka等流式数据源,需要向DorisDB系统导⼊实时数据时,可采⽤Routine Load⽅法。⽤户通过MySQL协议创建例⾏导⼊作
安卓在线解析json业,DorisDB持续不断地从Kafka中读取并导⼊数据。
4)Insert Into导⼊
⼿⼯测试及临时数据处理时可以使⽤Insert Into⽅法向DorisDB表中写⼊数据。其中,INSERT INTO tbl SELECT ...;语句是从 DorisDB 的表中读取数据并导⼊到另⼀张表;INSERT INTO tbl VALUES(...);语句向指定表⾥插⼊单条数据。
5)同时,还有其他⼀些⽅式Json数据导⼊(对于⼀些半结构化的⽐如Json类型的数据,我们可以⽤stream load 或者 routine load的⽅式进⾏导⼊。Stream Load: 对于⽂本⽂件存储的Json数据,我们可以使⽤ stream load进⾏导⼊。Routine Load:对于Kafka中的json格式数据,可以使⽤Routine load的⽅式导⼊)、flink-connector-dorisdb(内部实现是通过缓存并批量由stream load导⼊)、DataX-dorisdb-
writer(DorisWriter 插件实现了写⼊数据到 DorisDB 的⽬的表的功能。在底层实现上, DorisWriter 通过Stream load以csv或 json 格式导⼊数据⾄DorisDB。内部将reader读取的数据进⾏缓存后批量导⼊⾄DorisDB,以提⾼写⼊性能。总体数据流是 source -> Reader -> DataX
channel -> Writer -> DorisDB。)
输出:
数据导出(Export)是 DorisDB 提供的⼀种将数据导出并存储到其他介质上的功能。该功能可以将⽤户指定的表或分区的数据,以⽂本的格式,通过 Broker 进程导出到远端存储上,如 HDFS/阿⾥云OSS/AWS S3(或者兼容S3协议的对象存储) 等。
⽤户提交⼀个导出作业后,DorisDB 会统计这个作业涉及的所有 Tablet,然后对这些 Tablet 进⾏分组,每组⽣成⼀个特殊的查询计划。该查询计划会读取所包含的 Tablet 上的数据,然后通过 Broker 将数据写到远端存储指定的路径中。
处理流程主要包括:
1)⽤户提交⼀个 Export 作业到 FE。
2)FE 的导出调度器会通过两阶段来执⾏⼀个导出作业:
3)PENDING:FE ⽣成⼀个 ExportPendingTask,向 BE 发送 snapshot 命令,对所有涉及到的 Tablet 做⼀个快照,并⽣成多个查询计划。
4) EXPORTING:FE ⽣成⼀个 ExportExportingTask,开始执⾏⼀个个的查询计划。
Spark DorisDB Connector 可以⽀持通过 Spark 读取 DorisDB 中存储的数据。
1)当前版本只⽀持从DorisDB中读取数据。
2)可以将DorisDB表映射为DataFrame或者RDD,推荐使⽤DataFrame。
3)⽀持在DorisDB端完成数据过滤,减少数据传输量。
以上内容,⼤部分是官⽅资料,中间增加了我的个⼈理解。总的来说,从存储机制、索引机制、运⾏机制、⽀撑场景等维度进⾏了简要说明,确实也能解决⼀部分的应⽤问题。
后续应⽤深⼊后,再分享实践相关的内容。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。