This walkthrough is based on flink-1.8; 1.9 additionally contains the Blink code.

A Flink SQL example:
public static void main(String[] args) throws Exception {
    // check parameter
    if (args.length != 1) {
        System.err.println("Please provide the path to the taxi rides file as a parameter");
        return; // exit early; otherwise args[0] below throws on a missing argument
    }
    String inputPath = args[0];

    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // configure event-time and watermarks
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(1000L);

    // create table environment
    StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

    // register user-defined function
    tEnv.registerFunction("toCellId", new GeoUtils.ToCellId());

    // get taxi ride event stream
    DataStream<TaxiRide> rides = TaxiRides.getRides(env, inputPath);

    // register taxi ride event stream as table "Rides"
    tEnv.registerDataStream(
        "Rides",
        rides,
        "medallion, licenseId, pickUpTime, dropOffTime.rowtime, " +
            "pickUpLon, pickUpLat, dropOffLon, dropOffLat, total");

    // define SQL query to compute average total per area and hour of day
    Table result = tEnv.sqlQuery(
        "SELECT " +
        "  toCellId(dropOffLon, dropOffLat) AS area, " +
        "  EXTRACT(HOUR FROM dropOffTime) AS hourOfDay, " +
        "  AVG(total) AS avgTotal " +
        "FROM Rides " +
        "GROUP BY " +
        "  toCellId(dropOffLon, dropOffLat), " +
        "  EXTRACT(HOUR FROM dropOffTime)");

    // convert result table into a retraction stream and print it
    tEnv.toRetractStream(result, Row.class)
        .print();

    // execute the query
    env.execute();
}
sqlQuery does three things:

1. Parse the SQL statement into an AST, represented here by SqlNode.
2. Validate the SQL, checking syntax and semantics against the Catalog.
3. Generate the logical plan (Logical Plan): convert the SqlNode AST into a plan that Flink represents with RelNode; strictly speaking the call returns a RelRoot, which wraps the RelNode.
def sqlQuery(query: String): Table = {
  val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
  // parse the sql query
  val parsed = planner.parse(query)
  if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
    // validate the sql query
    val validated = planner.validate(parsed)
    // transform to a relational tree
    val relational = planner.rel(validated)
    new Table(this, LogicalRelNode(relational.rel))
  } else {
    throw new TableException(
      "Unsupported SQL query! sqlQuery() only accepts SQL queries of type " +
        "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.")
  }
}
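To poke at step 1 in isolation, you can drive Calcite's parser directly and inspect the resulting SqlNode. A minimal sketch outside Flink's planner (the query text and class name are illustrative):

import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;

public class ParseDemo {
    public static void main(String[] args) throws SqlParseException {
        // parse SQL text into the SqlNode AST, the same representation
        // that planner.parse(query) returns inside sqlQuery
        SqlNode ast = SqlParser.create(
            "SELECT area, AVG(total) FROM Rides GROUP BY area").parseQuery();
        System.out.println(ast.getKind()); // SELECT
        System.out.println(ast);           // the AST unparsed back to SQL text
    }
}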
So what sqlQuery produces is only the initial logical plan. Following the normal SQL lifecycle, there should still be an optimize phase plus generation of the physical plan (physicalPlan). Let's continue with tEnv.toAppendStream(result, Row.class) — the example above calls toRetractStream, but both conversions funnel into the same translate method:
def toAppendStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
  toAppendStream(table, clazz, queryConfig)
}

def toAppendStream[T](
    table: Table,
    clazz: Class[T],
    queryConfig: StreamQueryConfig): DataStream[T] = {
  val typeInfo = TypeExtractor.createTypeInfo(clazz)
  TableEnvironment.validateType(typeInfo)
  translate[T](table, queryConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
}

protected def translate[A](
    table: Table,
    queryConfig: StreamQueryConfig,
    updatesAsRetraction: Boolean,
    withChangeFlag: Boolean)(implicit tpe: TypeInformation[A]): DataStream[A] = {
  val relNode = table.getRelNode
  val dataStreamPlan = optimize(relNode, updatesAsRetraction)
  val rowType = getResultType(relNode, dataStreamPlan)
  translate(dataStreamPlan, rowType, queryConfig, withChangeFlag)
}
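A note on the updatesAsRetraction / withChangeFlag flags: an append stream can only carry inserts, so converting the grouped-aggregate result above with toAppendStream fails at translation time, because the aggregate updates rows it has already emitted. That is why the example uses toRetractStream, which encodes each change as a Tuple2<Boolean, Row> (true = insert, false = retract). A small sketch reusing tEnv and result from the example above:

// Tuple2 here is org.apache.flink.api.java.tuple.Tuple2
DataStream<Tuple2<Boolean, Row>> changes = tEnv.toRetractStream(result, Row.class);

changes
    .filter(change -> change.f0) // keep inserts, drop retractions
    .print();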
Seeing optimize here tells us where to look next: the optimization rules it applies, all of which are defined in FlinkRuleSets.
private[flink] def optimize(relNode: RelNode, updatesAsRetraction: Boolean): RelNode = {
  // 0. convert sub-queries before query decorrelation
  val convSubQueryPlan = runHepPlanner(
    HepMatchOrder.BOTTOM_UP, FlinkRuleSets.TABLE_SUBQUERY_RULES, relNode, relNode.getTraitSet)

  // 0. convert table references
  val fullRelNode = runHepPlanner(
    HepMatchOrder.BOTTOM_UP,
    FlinkRuleSets.TABLE_REF_RULES,
    convSubQueryPlan,
    relNode.getTraitSet)

  // 1. decorrelate
  val decorPlan = RelDecorrelator.decorrelateQuery(fullRelNode)

  // 2. convert time indicators
  val convPlan = RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder)

  // 3. normalize the logical plan
  val normRuleSet = getNormRuleSet
  val normalizedPlan = if (normRuleSet.iterator().hasNext) {
    runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, convPlan, convPlan.getTraitSet)
  } else {
    convPlan
  }

  // 4. optimize the logical Flink plan
  val logicalOptRuleSet = getLogicalOptRuleSet
  val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
  val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
    runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
  } else {
    normalizedPlan
  }

  // 5. optimize the physical Flink plan
  val physicalOptRuleSet = getPhysicalOptRuleSet
  val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify()
  val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
    runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps)
  } else {
    logicalPlan
  }

  // 6. decorate the optimized plan
  val decoRuleSet = getDecoRuleSet
  val decoratedPlan = if (decoRuleSet.iterator().hasNext) {
    val planToDecorate = if (updatesAsRetraction) {
      physicalPlan.copy(
        physicalPlan.getTraitSet.plus(new UpdateAsRetractionTrait(true)),
        physicalPlan.getInputs)
    } else {
      physicalPlan
    }
    runHepPlanner(
      HepMatchOrder.BOTTOM_UP, decoRuleSet, planToDecorate, planToDecorate.getTraitSet)
  } else {
    physicalPlan
  }

  decoratedPlan
}
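runHepPlanner and runVolcanoPlanner are thin wrappers around Calcite's two planners: the heuristic HepPlanner applies a fixed rule set bottom-up until a fixpoint, while the cost-based VolcanoPlanner searches for the cheapest plan satisfying the requested traits. Roughly, the Hep side comes down to the following — a simplified sketch against Calcite's API, where FilterMergeRule merely stands in for whatever rules the RuleSet actually holds:

import org.apache.calcite.plan.hep.HepMatchOrder;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.FilterMergeRule;

// simplified stand-in for runHepPlanner: build a HepProgram, seed it with
// the current plan, and apply the rules until nothing more matches
public static RelNode runHepPass(RelNode input) {
    HepProgramBuilder builder = new HepProgramBuilder()
        .addMatchOrder(HepMatchOrder.BOTTOM_UP)      // same order Flink requests
        .addRuleInstance(FilterMergeRule.INSTANCE);  // illustrative rule
    HepPlanner planner = new HepPlanner(builder.build());
    planner.setRoot(input);
    return planner.findBestExp();
}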
First, the rule sets that FlinkRuleSets defines:
FlinkRuleSets.TABLE_SUBQUERY_RULES
FlinkRuleSets.TABLE_REF_RULES
FlinkRuleSets.LOGICAL_OPT_RULES
FlinkRuleSets.DATASET_NORM_RULES
FlinkRuleSets.DATASET_OPT_RULES
FlinkRuleSets.DATASTREAM_NORM_RULES
FlinkRuleSets.DATASTREAM_OPT_RULES
FlinkRuleSets.DATASTREAM_DECO_RULES
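Each member is a plain Calcite RuleSet built with RuleSets.ofList. A sketch of the pattern — the two rules chosen here are illustrative picks, not the actual contents of any Flink rule set:

import org.apache.calcite.rel.rules.FilterJoinRule;
import org.apache.calcite.rel.rules.ProjectMergeRule;
import org.apache.calcite.tools.RuleSet;
import org.apache.calcite.tools.RuleSets;

// the FlinkRuleSets members follow this shape: an immutable list of Calcite rules
static final RuleSet EXAMPLE_RULES = RuleSets.ofList(
    FilterJoinRule.FILTER_ON_JOIN, // push filter predicates into joins
    ProjectMergeRule.INSTANCE      // merge adjacent projections
);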
In 1.9 the optimize code was restructured; the optimize methods of the stream and batch Optimizers now look like this:
// StreamOptimizer.scala
def optimize(
    relNode: RelNode,
    updatesAsRetraction: Boolean,
    relBuilder: RelBuilder): RelNode = {
  val convSubQueryPlan = optimizeConvertSubQueries(relNode) // subquery rule optimization
  val expandedPlan = optimizeExpandPlan(convSubQueryPlan)
  val decorPlan = RelDecorrelator.decorrelateQuery(expandedPlan, relBuilder)
  val planWithMaterializedTimeAttributes =
    RelTimeIndicatorConverter.convert(decorPlan, relBuilder.getRexBuilder)
  val normalizedPlan = optimizeNormalizeLogicalPlan(planWithMaterializedTimeAttributes)
  val logicalPlan = optimizeLogicalPlan(normalizedPlan)
  val physicalPlan = optimizePhysicalPlan(logicalPlan, FlinkConventions.DATASTREAM)
  optimizeDecoratePlan(physicalPlan, updatesAsRetraction)
}

// BatchOptimizer.scala
def optimize(relNode: RelNode): RelNode = {
  val convSubQueryPlan = optimizeConvertSubQueries(relNode)
  val expandedPlan = optimizeExpandPlan(convSubQueryPlan)
  val decorPlan = RelDecorrelator.decorrelateQuery(expandedPlan)
  val normalizedPlan = optimizeNormalizeLogicalPlan(decorPlan)
  val logicalPlan = optimizeLogicalPlan(normalizedPlan)
  optimizePhysicalPlan(logicalPlan, FlinkConventions.DATASET)
}
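Since 1.9 ships both the legacy planner (whose optimizers are shown above) and the Blink planner, which optimizer stack you get is decided when the TableEnvironment is created. A sketch using the 1.9 EnvironmentSettings API:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// legacy planner: goes through the StreamOptimizer/BatchOptimizer shown above
EnvironmentSettings legacySettings = EnvironmentSettings.newInstance()
    .useOldPlanner()
    .inStreamingMode()
    .build();
StreamTableEnvironment legacyTEnv = StreamTableEnvironment.create(env, legacySettings);

// Blink planner: the optimizer stack contributed from Blink
EnvironmentSettings blinkSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
StreamTableEnvironment blinkTEnv = StreamTableEnvironment.create(env, blinkSettings);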
The first two optimization steps use the same rule sets for both:
FlinkRuleSets.TABLE_SUBQUERY_RULES
FlinkRuleSets.EXPAND_PLAN_RULES
FlinkRuleSets.POST_EXPAND_CLEAN_UP_RULES
And optimizeLogicalPlan also uses the same rule set for both:
protected def getBuiltInLogicalOptRuleSet: RuleSet = {
  FlinkRuleSets.LOGICAL_OPT_RULES
}
But optimizeNormalizeLogicalPlan and optimizePhysicalPlan use different rules. Batch uses:
FlinkRuleSets.DATASET_NORM_RULES
FlinkRuleSets.DATASET_OPT_RULES
Stream uses:
FlinkRuleSets.DATASTREAM_NORM_RULES
FlinkRuleSets.DATASTREAM_OPT_RULES
Stream additionally has a final step, optimizeDecoratePlan, which uses:
FlinkRuleSets.DATASTREAM_DECO_RULES
Finally, how does the optimized plan get converted into a DataStream/DataSet? Taking the stream side as the example:
// StreamPlanner.scala
private def translateOptimized[A](
    optimizedPlan: RelNode,
    logicalSchema: TableSchema,
    tpe: TypeInformation[A],
    queryConfig: StreamQueryConfig,
    withChangeFlag: Boolean): DataStream[A] = {
  val dataStream = translateToCRow(optimizedPlan, queryConfig)
  DataStreamConversions.convert(dataStream, logicalSchema, withChangeFlag, tpe, config)
}

private def translateToCRow(
    logicalPlan: RelNode,
    queryConfig: StreamQueryConfig): DataStream[CRow] = {
  logicalPlan match {
    case node: DataStreamRel =>
      node.translateToPlan(this, queryConfig)
    case _ =>
      throw new TableException("Cannot generate DataStream due to an invalid logical plan. " +
        "This is a bug and should not happen. Please file an issue.")
  }
}
DataStreamRel here is a trait that extends FlinkRelNode:
trait DataStreamRel extends FlinkRelNode {

  /**
    * Translates the FlinkRelNode into a Flink operator.
    *
    * @param tableEnv The [[StreamPlanner]] of the translated Table.
    * @param queryConfig The configuration for the query to generate.
    * @return DataStream of type [[CRow]]
    */
  def translateToPlan(
      tableEnv: StreamPlanner,
      queryConfig: StreamQueryConfig): DataStream[CRow]

  ...
}
Many classes implement translateToPlan, for example DataStreamWindowJoin and DataStreamJoin.
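Each of these implementations ultimately emits ordinary DataStream transformations; Flink code-generates the function bodies and runs on CRow internally. As intuition only, a Calc-style node (projection plus filter) boils down to a flatMap. A hypothetical sketch, assuming a DataStream<Row> named rideRows laid out in the "Rides" field order, with made-up field types:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// what a generated Calc function conceptually does:
// WHERE total > 0, SELECT medallion, total
DataStream<Row> filtered = rideRows
    .flatMap((Row row, Collector<Row> out) -> {
        double total = (Double) row.getField(8); // index 8 = total in the schema above
        if (total > 0) {
            out.collect(Row.of(row.getField(0), total)); // index 0 = medallion
        }
    })
    .returns(Types.ROW(Types.STRING, Types.DOUBLE)); // hypothetical field types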