1、前言

1.1、JavaCC

        JavaCC(Java Compiler Compiler)是一个开源的语法分析器生成器和词法分析器生成器。JavaCC通过词法和语法描述文件来生成分析器。

        flink通过java CC生成分析器用于sql解析和校验。

如下图:在flink-table下的flink-sql-parser项目中,org.apache.flink.sql.parser.impl下的类,就是使用javacc生成的。

1.2、Calcite

        Apache Calcite是一个动态数据管理框架 ,它具备很多典型数据库管理系统的功能,如SQL解析、SQL校验、SQL查询优化等,又省略了一些功能,如不存储相关数据,也不完全包含相关处理数据等。

        flink中的sql解析、sql校验和sql优化便是依赖calcite来完成的。

        梳理一下Calcite SQL执行的几个阶段:

  1. 通过Parser解析器将传入的sql解析成一颗词法树,SqlNode作为树的节点
  2. 做词法的校验Validate,类型校验,元数据校验等等
  3. 将校验好的SqlNode树转换成对应的关系代数表达式,也是一颗树,RelNode作为节点
  4. 将RelNode关系代数表达式树,通过内置的两种优化器Volcano , Hep 优化关系代数表达式得到最优逻辑代数的一颗树,也是RelNode
  5. 最优的逻辑代数表达式(RelNode),会被转换成对应的可执行的物理执行计划(转换逻辑根据框架有所不同),像Flink就转成他的Operator去运行

2、Flink SQL转换流程

        SQL语句经过Calcite解析生成抽象语法树SQLNode,基于生成的SQLNode并结合flink Catalog完成校验生成一颗Operation树,接下来blink planner将Operation树,接下来blink planner将Opearation树转为RelNode然后进行优化,最后生成Transformation变成流计算任务。

2.1、Sql语句解析成语法树阶段(SQL - > SqlNode)

        TableEnvironmentImpl是sql执行的入口类,TableEnvironmentImpl中提供了executeSql,sqlQuery等方法用来执行DDL和DML等sql,sql执行时会对sql进行解析,ParserImpl是flink调用sql解析的实现类,ParserImpl#parse()方法中通过调用包装器对象CalciteParser#parse()方法并创建并调用使用javacc生成的sql解析器(FlinkSqlParserImpl)中的parseSqlStmtEof方法完成sql解析,并返回SqlNode对象

        核心代码如下:

public List<Operation> parse(String statement) {
	CalciteParser parser = calciteParserSupplier.get();
	FlinkPlannerImpl planner = validatorSupplier.get();
	//TODO 在这里调用使用javacc生成的分析器,将sql语句解析成sqlNode
	SqlNode parsed = parser.parse(statement);

	//TODO 将sqlNode转换为Operation
	Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
		.orElseThrow(() -> new TableException("Unsupported query: " + statement));
	return Collections.singletonList(operation);
}

        其中parser.parse(...)方法,将sql语句解析成sqlNode。对应的表名、列名、with属性参数、主键、唯一键、分区键、水印、表注释、表操作(create table、alter table、drop table。。。)都放到SqlNode对象的对应属性中,SqlNode是一个树形结构也就是AST。如下:

2.2、Sql校验(SqlNode - > Operation)

        sql解析完成后执行sql校验,flink sql中增加了SqlNode转换为Operation的过程,sql校验是在这个过程中完成。在SqlToOperationConverter#convert()方法中完成这个过程的转换,之间会通过FlinkPlannerInpm#validate()方法对表、函数、字段等完成校验并基于生成的validated SqlNode生成对应的Opeation。

         不同的sql经过convert处理后返回不同的Operation,最后会根据不同的Operation有不同的处理行为。

         其中

2.3、Flink SQL优化(Operation - > RelNode->Transformation )

        Blink中并没有直接使用Calcite的优化器,而是通过规则组合和Calcite优化组合分别为batch和stream实现了自定义的优化器。
        优化执行前会先将SqlNode转为RelNode,基于RelNode调用PlannerBase#optimize()并执行StreamCommonSubGraphBasedOptimizer#doOptimize()方法完成优化

        在完成Sql到RelNode的转换后,会执行executeOperation(...)操作,在这里先将sqlNode转换成RelNode。然后进行优化操作。传入的参数为:

        然后根据传入的sql语句类型,选择不同的操作。包含有Modify、CreateTable、DropTable等。如下:

        其实都是调用的TableEnvironmentImpl.executeInternal(...)。

         在这里,有进行转换和优化操作,重点是在translate方法中,最终调用的是PlannerBase里的translate(...)方法

override def translate(
    modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
  if (modifyOperations.isEmpty) {
    return List.empty[Transformation[_]]
  }
  // prepare the execEnv before translating
  getExecEnv.configure(
    getTableConfig.getConfiguration,
    Thread.currentThread().getContextClassLoader)
  overrideEnvParallelism()

  // TODO 在这里完成转换 SqlNode转换为RelNode
  val relNodes = modifyOperations.map(translateToRel)

  // TODO 在这里完成优化
  val optimizedRelNodes = optimize(relNodes)
  val execNodes = translateToExecNodePlan(optimizedRelNodes)
  translateToPlan(execNodes)
}

        在上述的优化代码行,根据是流处理或者批处理老选择不同的类中的方法进行优化。

        最终由translateToPlan方法将ExecNode转换成Transfomation列表

        整体流程大致为:sqlNode --> Operation --> RelNode --> 优化 --> execNode --> Transformation

        基于生成的Transformation对象调用StreamExecutor#createPipeline()方法生成StreamGraph便可以执行任务了。

        至此flink sql转换流程便结束了。

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐