kafka最新版本Flink1.11新版本解读:更易⽤的流批⼀体SQL引擎
许多的数据科学家、分析师和 BI ⽤户依赖交互式 SQL 查询分析数据。Flink SQL 是 Flink 的核⼼
模块之⼀。作为⼀个分布式的 SQL 查询引擎。Flink SQL 提供了各种异构数据源的联合查询。开
发者可以很⽅便地在⼀个程序中通过 SQL 编写复杂的分析查询。通过 CBO 优化器、列式存储、
和代码⽣成技术,Flink SQL 拥有⾮常⾼的查询效率。同时借助于 Flink runtime 良好的容错和扩
展性,Flink SQL 可以轻松处理海量数据。
在保证优秀性能的同时,易⽤性是 1.11 版本 Flink SQL 的重头戏。易⽤性的提升主要体现在以下
⼏个⽅⾯:
更⽅便的追加或修改表定义;
灵活的声明动态的查询参数;
加强和统⼀了原有 TableEnv 上的 SQL 接⼝;
简化了 connector 的属性定义;
对 Hive 的 DDL 做了原⽣⽀持;
加强了对 python UDF 的⽀持。
下⾯逐⼀为⼤家介绍 ~
Create Table Like
在⽣产中,⽤户常常有调整现有表定义的需求。例如⽤户想在⼀些外部的表定义(例如 Hive metastore)基础上追加 Flink 特有的⼀些定义⽐如 watermark。在 ETL 场景中,将多张表的数据
合并到⼀张表,⽬标表的 schema 定义其实是上游表的合集,需要⼀种⽅便合并表定义的⽅式。
从 1.11 版本开始,Flink 提供了 LIKE 语法,⽤户可以很⽅便的在已有的表定义上追加新的定义。
例如我们可以使⽤下⾯的语法给已有表 base_table 追加 watermark 定义:
CREATE [TEMPORARY] TABLE base_table (
id BIGINT,
name STRING,
tstmp TIMESTAMP,
PRIMARY KEY(id)
) WITH (
'connector': 'kafka'
)
CREATE [TEMPORARY] TABLE derived_table (
WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND
)
LIKE base_table;
这⾥ derived_table 表定义等价于如下定义:
CREATE [TEMPORARY] TABLE derived_table (
id BIGINT,
name STRING,
tstmp TIMESTAMP,
PRIMARY KEY(id),
WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND
) WITH (
‘connector’: ‘kafka’
)
对⽐之下,新的语法省去了重复的 schema 定义,⽤户只需要定义追加属性,⾮常⽅便简洁。
多属性策略
有的⼩伙伴会问,原表和新表的属性只是新增或追加吗?如果我想覆盖或者排除某些属性该如何操作?这是⼀个好问题,Flink LIKE 语法提供了⾮常灵活的表属性操作策略。
LIKE 语法⽀持使⽤不同的 keyword 对表属性分类:
ALL:完整的表定义;
CONSTRAINTS: primary keys, unique key 等约束;
GENERATED: 主要指计算列和 watermark;
OPTIONS: WITH (...) 语句内定义的 table options;
PARTITIONS: 表分区信息。
在不同的属性分类上可以追加不同的属性⾏为:
INCLUDING:包含(默认⾏为);
EXCLUDING:排除;
OVERWRITING:覆盖。
下⾯这张表格说明了不同的分类属性允许的⾏为:
例如下⾯的语句:
CREATE [TEMPORARY] TABLE base_table (
id BIGINT,
name STRING,
tstmp TIMESTAMP,
PRIMARY KEY(id)
) WITH (
'connector': 'kafka',
'scan.startup.specific-offsets': 'partition:0,offset:42;partition:1,offset:300',
'format': 'json'
)
CREATE [TEMPORARY] TABLE derived_table (
WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND
)
WITH (
'connector.starting-offset': '0'
)
LIKE base_table (OVERWRITING OPTIONS, EXCLUDING CONSTRAINTS);
等价的表属性定义为:
CREATE [TEMPORARY] TABLE derived_table (
id BIGINT,
name STRING,
tstmp TIMESTAMP,
WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND
) WITH (
'connector': 'kafka',
'scan.startup.specific-offsets': 'partition:0,offset:42;partition:1,offset:300',
'format': 'json'
)
细节参见:/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
Dynamic Table Options
在⽣产中,调整参数是⼀个常见需求,很多的时候是临时修改(⽐如通过终端查询和展⽰),⽐如下⾯这张 Kafka 表:
create table kafka_table (
id bigint,
age int,
name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'employees',
'de' = 'timestamp',
'scan.startup.timestamp-millis' = '123456',
'format' = 'csv',
'csv.ignore-parse-errors' = 'false'
)
在之前的版本,如果⽤户有如下需求:
⽤户需要指定特性的消费时间戳,即修改 scan.startup.timestamp-millis 属性;
⽤户想忽略掉解析错误,需要将 format.ignore-parse-errors 改为 true。
只能使⽤ ALTER TABLE 这样的语句修改表的定义,从 1.11 开始,⽤户可以通过动态参数的形式灵活地设置表的属性参数,覆盖或者追加原表的 WITH (...) 语句内定义的 table options。
基本语法为:
table_name /*+ OPTIONS('k1'='v1', ''='v2') */
OPTIONS 内的键值对会覆盖原表的 table options,⽤户可以在各种 SQL 语境中使⽤这样的语法,例如:
CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);
CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);
-- override table options in query source
select id, name from kafka_table1 /*+ OPTIONS('de'='earliest-offset') */;
-- override table options in join
select * from
kafka_table1 /*+ OPTIONS('de'='earliest-offset') */ t1
join
kafka_table2 /*+ OPTIONS('de'='earliest-offset') */ t2
on t1.id = t2.id;
-- override table options for INSERT target table
insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from
kafka_table2;
动态参数的使⽤没有语境限制,只要是引⽤表的地⽅都可以追加定义。在指定的表后⾯追加的动态参数会⾃动追加到原表定义中,是不是很⽅便呢:)
由于可能对查询结果有影响,动态参数功能默认是关闭的,使⽤下⾯的⽅式开启该功能:
// instantiate table environment
TableEnvironment tEnv = ...
// access flink configuration
Configuration configuration = Config().getConfiguration();
// set low-level key-value options
configuration.setString('abled', 'true');
细节参见:/projects/flink/flink-docs-master/dev/table/sql/hints.html
SQL API 改进
随着 Flink SQL ⽀持的语句越来越丰富,⽼的 API 容易引起⼀些困惑:
原先的 sqlUpdate() ⽅法传递 DDL 语句会⽴即执⾏,⽽ INSERT INTO 语句在调⽤ execute ⽅法时才会执⾏;
Table 程序的执⾏⼊⼝不够清晰,像 ute() 和ute() 都可以触发 table 程序执⾏;
execute ⽅法没有返回值。像 SHOW TABLES 这样的语句没有很好地⽅式返回结果。另
外,sqlUpdate ⽅法加⼊了越来越多的语句导致接⼝定义不清晰,sqlUpdate 可以执⾏ SHOW TABLES 就是⼀个反例;
在 Blink planner ⼀直提供多 sink 优化执⾏的能⼒,但是在 API 层没有体现出来。
1.11 重新梳理了 TableEnv 上的 sql 相关接⼝,提供了更清晰的执⾏语义,同时执⾏任意 sql 语句现在都有返回值,⽤户可以通过新的 API 灵活的组织多⾏ sql 语句⼀起执⾏。
更清晰的执⾏语义
新的接⼝ TableEnvironment#executeSql 统⼀返回抽象 TableResult,⽤户可以迭代 TableResult 拿到执⾏结果。根据执⾏语句的不同,返回结果的数据结构也有变化,⽐如 SELECT 语句会返回查询结果,⽽ INSERT 语句会异步提交作业到集。
组织多条语句⼀起执⾏
新的接⼝ TableEnvironment#createStatementSet 允许⽤户添加多条 INSERT 语句并⼀起执⾏,在多 sink 场景,Blink planner 会针对性地对执⾏计划做优化。
新旧 API 对⽐
⼀张表格感受新⽼ API 的变化:
sqlUpdate vs executeSql
execute vs createStatementSet
详情参见:/confluence/pages/viewpage.action?pageId=134745878
Hive 语法兼容加强
从 1.11 开始,Flink SQL 将 Hive parser 模块独⽴出来,⽤以兼容 Hive 的语法,⽬前 DDL 层⾯,DB、Table、View、Function 相关的语法均已⽀持。搭配 HiveCatalog,Hive 的同学可以直接使⽤ Hive 的语法来进⾏相关的操作。
在使⽤ hive 语句之前需要设置正确的 Dialect:
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论