FlinkSQL源码解析(一)转换流程
1、前言
1.1、JavaCC
JavaCC(Java Compiler Compiler)是一个开源的语法分析器生成器和词法分析器生成器。JavaCC通过词法和语法描述文件来生成分析器。
flink通过java CC生成分析器用于sql解析和校验。
如下图:在flink-table下的flink-sql-parser项目中,org.apache.flink.sql.parser.impl下的类,就是使用javacc生成的。
1.2、Calcite
Apache Calcite是一个动态数据管理框架 ,它具备很多典型数据库管理系统的功能,如SQL解析、SQL校验、SQL查询优化等,又省略了一些功能,如不存储相关数据,也不完全包含相关处理数据等。
flink中的sql解析、sql校验和sql优化便是依赖calcite来完成的。
梳理一下Calcite SQL执行的几个阶段:
- 通过Parser解析器将传入的sql解析成一颗词法树,SqlNode作为树的节点
- 做词法的校验Validate,类型校验,元数据校验等等
- 将校验好的SqlNode树转换成对应的关系代数表达式,也是一颗树,RelNode作为节点
- 将RelNode关系代数表达式树,通过内置的两种优化器Volcano , Hep 优化关系代数表达式得到最优逻辑代数的一颗树,也是RelNode
- 最优的逻辑代数表达式(RelNode),会被转换成对应的可执行的物理执行计划(转换逻辑根据框架有所不同),像Flink就转成他的Operator去运行
2、Flink SQL转换流程
SQL语句经过Calcite解析生成抽象语法树SQLNode,基于生成的SQLNode并结合flink Catalog完成校验生成一颗Operation树,接下来blink planner将Operation树,接下来blink planner将Opearation树转为RelNode然后进行优化,最后生成Transformation变成流计算任务。
2.1、Sql语句解析成语法树阶段(SQL - > SqlNode)
TableEnvironmentImpl是sql执行的入口类,TableEnvironmentImpl中提供了executeSql,sqlQuery等方法用来执行DDL和DML等sql,sql执行时会对sql进行解析,ParserImpl是flink调用sql解析的实现类,ParserImpl#parse()方法中通过调用包装器对象CalciteParser#parse()方法并创建并调用使用javacc生成的sql解析器(FlinkSqlParserImpl)中的parseSqlStmtEof方法完成sql解析,并返回SqlNode对象
核心代码如下:
public List<Operation> parse(String statement) {
CalciteParser parser = calciteParserSupplier.get();
FlinkPlannerImpl planner = validatorSupplier.get();
//TODO 在这里调用使用javacc生成的分析器,将sql语句解析成sqlNode
SqlNode parsed = parser.parse(statement);
//TODO 将sqlNode转换为Operation
Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
.orElseThrow(() -> new TableException("Unsupported query: " + statement));
return Collections.singletonList(operation);
}
其中parser.parse(...)方法,将sql语句解析成sqlNode。对应的表名、列名、with属性参数、主键、唯一键、分区键、水印、表注释、表操作(create table、alter table、drop table。。。)都放到SqlNode对象的对应属性中,SqlNode是一个树形结构也就是AST。如下:
2.2、Sql校验(SqlNode - > Operation)
sql解析完成后执行sql校验,flink sql中增加了SqlNode转换为Operation的过程,sql校验是在这个过程中完成。在SqlToOperationConverter#convert()方法中完成这个过程的转换,之间会通过FlinkPlannerInpm#validate()方法对表、函数、字段等完成校验并基于生成的validated SqlNode生成对应的Opeation。
不同的sql经过convert处理后返回不同的Operation,最后会根据不同的Operation有不同的处理行为。
其中
2.3、Flink SQL优化(Operation - > RelNode->Transformation )
Blink中并没有直接使用Calcite的优化器,而是通过规则组合和Calcite优化组合分别为batch和stream实现了自定义的优化器。
优化执行前会先将SqlNode转为RelNode,基于RelNode调用PlannerBase#optimize()并执行StreamCommonSubGraphBasedOptimizer#doOptimize()方法完成优化
在完成Sql到RelNode的转换后,会执行executeOperation(...)操作,在这里先将sqlNode转换成RelNode。然后进行优化操作。传入的参数为:
然后根据传入的sql语句类型,选择不同的操作。包含有Modify、CreateTable、DropTable等。如下:
其实都是调用的TableEnvironmentImpl.executeInternal(...)。
在这里,有进行转换和优化操作,重点是在translate方法中,最终调用的是PlannerBase里的translate(...)方法
override def translate(
modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
if (modifyOperations.isEmpty) {
return List.empty[Transformation[_]]
}
// prepare the execEnv before translating
getExecEnv.configure(
getTableConfig.getConfiguration,
Thread.currentThread().getContextClassLoader)
overrideEnvParallelism()
// TODO 在这里完成转换 SqlNode转换为RelNode
val relNodes = modifyOperations.map(translateToRel)
// TODO 在这里完成优化
val optimizedRelNodes = optimize(relNodes)
val execNodes = translateToExecNodePlan(optimizedRelNodes)
translateToPlan(execNodes)
}
在上述的优化代码行,根据是流处理或者批处理老选择不同的类中的方法进行优化。
最终由translateToPlan方法将ExecNode转换成Transfomation列表
整体流程大致为:sqlNode --> Operation --> RelNode --> 优化 --> execNode --> Transformation
基于生成的Transformation对象调用StreamExecutor#createPipeline()方法生成StreamGraph便可以执行任务了。
至此flink sql转换流程便结束了。
更多推荐
所有评论(0)