首頁>技術>

本文基於flink-1.8(1.9中已經包含了blink的程式碼)

flink SQL 示例

    public static void main(String[] args) throws Exception {        // check parameter        if (args.length != 1) {            System.err.println("Please provide the path to the taxi rides file as a parameter");        }        String inputPath = args[0];        // create execution environment        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        // configure event-time and watermarks        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        env.getConfig().setAutoWatermarkInterval(1000L);        // create table environment        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);        // register user-defined function        tEnv.registerFunction("toCellId", new GeoUtils.ToCellId());        // get taxi ride event stream        DataStream<TaxiRide> rides = TaxiRides.getRides(env, inputPath);        // register taxi ride event stream as table "Rides"        tEnv.registerDataStream(            "Rides",            rides,            "medallion, licenseId, pickUpTime, dropOffTime.rowtime, " +                "pickUpLon, pickUpLat, dropOffLon, dropOffLat, total");        // define SQL query to compute average total per area and hour of day.        Table result = tEnv.sqlQuery(            "SELECT " +            "  toCellId(dropOffLon, dropOffLat) AS area, " +            "  EXTRACT(HOUR FROM dropOffTime) AS hourOfDay, " +            "  AVG(total) AS avgTotal " +            "FROM Rides " +            "GROUP BY " +            "  toCellId(dropOffLon, dropOffLat), " +            "  EXTRACT(HOUR FROM dropOffTime)"        );        // convert result table into a retraction stream and print it        tEnv.toRetractStream(result, Row.class)                .print();        // execute the query        env.execute();    }

1. 解析sql語句,轉換成AST語法樹,這裡用SqlNode表示
2. 驗證SQL,結合Catalog,驗證/檢查sql語法等
3. 生成邏輯執行計劃(Logical Plan),將SqlNode表示的AST轉換成LogicalPlan,flink裡邊用RelNode表示,但是這裡返回的是一個RelRoot,裡邊封裝了RelNode

  /**
    * Parses, validates and converts a SQL query into a [[Table]].
    *
    * The query text is parsed, the parsed node is validated, and the validated node is
    * converted into a relational tree whose root node is wrapped in a [[LogicalRelNode]].
    *
    * @param query the SQL query text
    * @return a [[Table]] backed by the relational tree of the query
    * @throws TableException if the parsed statement is null or is not of kind
    *                        `SqlKind.QUERY`
    */
  def sqlQuery(query: String): Table = {
    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
    // parse the sql query
    val parsed = planner.parse(query)
    if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
      // validate the sql query
      val validated = planner.validate(parsed)
      // transform to a relational tree
      val relational = planner.rel(validated)
      // `relational` wraps the tree; only its root node (`rel`) goes into the Table
      new Table(this, LogicalRelNode(relational.rel))
    } else {
      throw new TableException(
        "Unsupported SQL query! sqlQuery() only accepts SQL queries of type " +
          "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.")
    }
  }

所以sqlQuery之後生成的是最初的Logical Plan,按照正常的sql處理流程,後邊應該還有optimize,以及生成物理執行計劃(Physical Plan)。示例中呼叫的是tEnv.toRetractStream(result, Row.class),它和toAppendStream一樣最終都會走到translate,這裡以tEnv.toAppendStream(result, Row.class)為例繼續看

/**
  * Converts the given [[Table]] into a [[DataStream]] of the given class, using this
  * environment's `queryConfig`.
  *
  * @param table the table to convert
  * @param clazz the class of the resulting stream's elements
  * @return the converted [[DataStream]]
  */
def toAppendStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
  toAppendStream(table, clazz, queryConfig)
}

/**
  * Converts the given [[Table]] into a [[DataStream]] of the given class.
  *
  * Type information is extracted from `clazz` and validated before the table is
  * translated with `updatesAsRetraction = false` and `withChangeFlag = false`.
  *
  * @param table       the table to convert
  * @param clazz       the class of the resulting stream's elements
  * @param queryConfig the configuration passed on to the translation
  * @return the converted [[DataStream]]
  */
def toAppendStream[T](
    table: Table,
    clazz: Class[T],
    queryConfig: StreamQueryConfig): DataStream[T] = {
  val typeInfo = TypeExtractor.createTypeInfo(clazz)
  TableEnvironment.validateType(typeInfo)
  translate[T](table, queryConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
}

/**
  * Optimizes the relational tree of the given [[Table]] and translates the optimized
  * plan into a [[DataStream]].
  *
  * @param table               the table to translate
  * @param queryConfig         the configuration passed on to the translation
  * @param updatesAsRetraction forwarded to `optimize`
  * @param withChangeFlag      forwarded to the plan translation
  * @param tpe                 type information of the resulting stream
  * @return the translated [[DataStream]]
  */
protected def translate[A](
    table: Table,
    queryConfig: StreamQueryConfig,
    updatesAsRetraction: Boolean,
    withChangeFlag: Boolean)(implicit tpe: TypeInformation[A]): DataStream[A] = {
  val relNode = table.getRelNode
  val dataStreamPlan = optimize(relNode, updatesAsRetraction)
  // the result type is derived from both the original and the optimized plan
  val rowType = getResultType(relNode, dataStreamPlan)
  translate(dataStreamPlan, rowType, queryConfig, withChangeFlag)
}

看到optimize就知道了,接下來看下裡邊的最佳化規則,規則都在FlinkRuleSets裡邊定義

 /**
   * Runs the optimization pipeline on a relational tree and returns the resulting plan.
   *
   * The steps, in order: sub-query conversion, table reference conversion,
   * decorrelation, time indicator conversion, normalization, logical optimization,
   * physical optimization, and decoration. The normalization, logical, physical and
   * decoration steps are skipped when their rule set is empty.
   *
   * @param relNode             the root of the relational tree to optimize
   * @param updatesAsRetraction if true, an `UpdateAsRetractionTrait(true)` is added to the
   *                            root of the physical plan before the decoration rules run
   * @return the optimized (and, if decoration rules exist, decorated) plan
   */
 private[flink] def optimize(relNode: RelNode, updatesAsRetraction: Boolean): RelNode = {

   // 0. convert sub-queries before query decorrelation
   val convSubQueryPlan = runHepPlanner(
     HepMatchOrder.BOTTOM_UP, FlinkRuleSets.TABLE_SUBQUERY_RULES, relNode, relNode.getTraitSet)

   // 0. convert table references
   val fullRelNode = runHepPlanner(
     HepMatchOrder.BOTTOM_UP,
     FlinkRuleSets.TABLE_REF_RULES,
     convSubQueryPlan,
     relNode.getTraitSet)

   // 1. decorrelate
   val decorPlan = RelDecorrelator.decorrelateQuery(fullRelNode)

   // 2. convert time indicators
   val convPlan = RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder)

   // 3. normalize the logical plan
   val normRuleSet = getNormRuleSet
   val normalizedPlan = if (normRuleSet.iterator().hasNext) {
     runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, convPlan, convPlan.getTraitSet)
   } else {
     convPlan
   }

   // 4. optimize the logical Flink plan
   val logicalOptRuleSet = getLogicalOptRuleSet
   val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
   val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
     runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
   } else {
     normalizedPlan
   }

   // 5. optimize the physical Flink plan
   val physicalOptRuleSet = getPhysicalOptRuleSet
   val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify()
   val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
     runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps)
   } else {
     logicalPlan
   }

   // 6. decorate the optimized plan
   val decoRuleSet = getDecoRuleSet
   val decoratedPlan = if (decoRuleSet.iterator().hasNext) {
     // only the root node is copied with the extra trait; its inputs are reused as-is
     val planToDecorate = if (updatesAsRetraction) {
       physicalPlan.copy(
         physicalPlan.getTraitSet.plus(new UpdateAsRetractionTrait(true)),
         physicalPlan.getInputs)
     } else {
       physicalPlan
     }
     runHepPlanner(
       HepMatchOrder.BOTTOM_UP,
       decoRuleSet,
       planToDecorate,
       planToDecorate.getTraitSet)
   } else {
     physicalPlan
   }

   decoratedPlan
 }

先看下FlinkRuleSets定義的規則

FlinkRuleSets.TABLE_SUBQUERY_RULES
FlinkRuleSets.TABLE_REF_RULES
FlinkRuleSets.LOGICAL_OPT_RULES
FlinkRuleSets.DATASET_NORM_RULES
FlinkRuleSets.DATASET_OPT_RULES
FlinkRuleSets.DATASTREAM_NORM_RULES
FlinkRuleSets.DATASTREAM_OPT_RULES
FlinkRuleSets.DATASTREAM_DECO_RULES

optimize這塊在1.9有重新調整了下程式碼結構,流批的Optimizer的optimize方法如下

// StreamOptimizer.scala

/**
  * Optimizes a relational tree for streaming: sub-query conversion, plan expansion,
  * decorrelation, time indicator conversion, normalization, logical optimization,
  * physical optimization with the DATASTREAM convention, and finally decoration.
  *
  * @param relNode             the root of the relational tree to optimize
  * @param updatesAsRetraction forwarded to the decoration step
  * @param relBuilder          used for decorrelation; also supplies the RexBuilder for the
  *                            time indicator conversion
  * @return the result of the decoration step
  */
def optimize(
    relNode: RelNode,
    updatesAsRetraction: Boolean,
    relBuilder: RelBuilder): RelNode = {
  // apply the sub-query rules
  val convSubQueryPlan = optimizeConvertSubQueries(relNode)
  val expandedPlan = optimizeExpandPlan(convSubQueryPlan)
  val decorPlan = RelDecorrelator.decorrelateQuery(expandedPlan, relBuilder)
  val planWithMaterializedTimeAttributes =
    RelTimeIndicatorConverter.convert(decorPlan, relBuilder.getRexBuilder)
  val normalizedPlan = optimizeNormalizeLogicalPlan(planWithMaterializedTimeAttributes)
  val logicalPlan = optimizeLogicalPlan(normalizedPlan)
  val physicalPlan = optimizePhysicalPlan(logicalPlan, FlinkConventions.DATASTREAM)
  optimizeDecoratePlan(physicalPlan, updatesAsRetraction)
}

// BatchOptimizer.scala

/**
  * Optimizes a relational tree for batch: sub-query conversion, plan expansion,
  * decorrelation, normalization, logical optimization, and physical optimization with
  * the DATASET convention. Unlike the streaming variant above, there is no time
  * indicator conversion and no decoration step.
  *
  * @param relNode the root of the relational tree to optimize
  * @return the result of the physical optimization step
  */
def optimize(relNode: RelNode): RelNode = {
  val convSubQueryPlan = optimizeConvertSubQueries(relNode)
  val expandedPlan = optimizeExpandPlan(convSubQueryPlan)
  val decorPlan = RelDecorrelator.decorrelateQuery(expandedPlan)
  val normalizedPlan = optimizeNormalizeLogicalPlan(decorPlan)
  val logicalPlan = optimizeLogicalPlan(normalizedPlan)
  optimizePhysicalPlan(logicalPlan, FlinkConventions.DATASET)
}

前兩步最佳化規則都一樣

FlinkRuleSets.TABLE_SUBQUERY_RULES
FlinkRuleSets.EXPAND_PLAN_RULES
FlinkRuleSets.POST_EXPAND_CLEAN_UP_RULES

另外optimizeLogicalPlan也是一樣的規則

  /**
    * Returns the built-in rule set for the logical optimization step.
    *
    * @return `FlinkRuleSets.LOGICAL_OPT_RULES`
    */
  protected def getBuiltInLogicalOptRuleSet: RuleSet =
    FlinkRuleSets.LOGICAL_OPT_RULES

但是optimizeNormalizeLogicalPlan,optimizePhysicalPlan使用的規則是不一樣的, batch使用的規則

FlinkRuleSets.DATASET_NORM_RULES
FlinkRuleSets.DATASET_OPT_RULES

stream使用的規則

FlinkRuleSets.DATASTREAM_NORM_RULES
FlinkRuleSets.DATASTREAM_OPT_RULES

stream在後邊還有一步optimizeDecoratePlan,使用的規則是

FlinkRuleSets.DATASTREAM_DECO_RULES

最後會轉換成dataStream/dataset?

// StreamPlanner.scala

// excerpt of the call site: the optimized plan is first turned into a DataStream[CRow]
val dataStream = translateToCRow(optimizedPlan, queryConfig)

/**
  * Translates an optimized plan into a [[DataStream]] of the requested type.
  *
  * The plan is first translated into a `DataStream[CRow]`, which is then converted by
  * `DataStreamConversions.convert`.
  *
  * @param optimizedPlan  the optimized plan to translate
  * @param logicalSchema  schema passed on to the conversion
  * @param tpe            type information of the resulting stream
  * @param queryConfig    the configuration passed on to the plan translation
  * @param withChangeFlag passed on to the conversion
  * @return the converted [[DataStream]]
  */
private def translateOptimized[A](
    optimizedPlan: RelNode,
    logicalSchema: TableSchema,
    tpe: TypeInformation[A],
    queryConfig: StreamQueryConfig,
    withChangeFlag: Boolean): DataStream[A] = {
  val dataStream = translateToCRow(optimizedPlan, queryConfig)
  DataStreamConversions.convert(dataStream, logicalSchema, withChangeFlag, tpe, config)
}

/**
  * Translates a plan into a `DataStream[CRow]` by delegating to the root node's
  * `translateToPlan`.
  *
  * @param logicalPlan the plan to translate; its root must be a [[DataStreamRel]]
  * @param queryConfig the configuration passed on to `translateToPlan`
  * @return the translated stream
  * @throws TableException if the root node is not a [[DataStreamRel]]
  */
private def translateToCRow(
    logicalPlan: RelNode,
    queryConfig: StreamQueryConfig): DataStream[CRow] = {
  logicalPlan match {
    case node: DataStreamRel =>
      node.translateToPlan(this, queryConfig)
    case _ =>
      throw new TableException("Cannot generate DataStream due to an invalid logical plan. " +
        "This is a bug and should not happen. Please file an issue.")
  }
}

這裡的DataStreamRel是個trait,extends了FlinkRelNode

/**
  * A [[FlinkRelNode]] that can be translated into a `DataStream[CRow]`.
  */
trait DataStreamRel extends FlinkRelNode {

  /**
    * Translates the FlinkRelNode into a Flink operator.
    *
    * @param tableEnv    The [[StreamPlanner]] of the translated Table.
    * @param queryConfig The configuration for the query to generate.
    * @return DataStream of type [[CRow]]
    */
  def translateToPlan(
    tableEnv: StreamPlanner,
    queryConfig: StreamQueryConfig): DataStream[CRow]

  // ... (remaining members omitted in this excerpt)
}

實現translateToPlan的很多,比如DataStreamWindowJoin,DataStreamJoin

18
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 精講分散式系統核心:實戰:基於JavaRMI實現分散式物件通訊