
How Flink's StreamGraph, JobGraph, and ExecutionGraph Evolve Step by Step

1. StreamGraph

In a user-defined Flink Java application, a series of operators typically processes a source in a streaming fashion and finally sinks the result somewhere else. From the program's point of view, Flink first assembles the program's source, transform, and sink operations into a StreamGraph, which represents the topology of the user program. Let's look at how the StreamGraph is generated, starting from the code.
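For reference, a minimal program of this source -> transform -> sink shape might look like the following sketch (class, job, and element names are illustrative, not from the original post):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamGraphDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("to be", "or not to be")            // source
           .flatMap((String line, Collector<String> out) -> {
               for (String word : line.split(" ")) {
                   out.collect(word);                        // transform
               }
           })
           .returns(String.class)   // type hint needed for the lambda
           .print();                                         // sink

        env.execute("stream-graph-demo"); // triggers the StreamGraph generation traced below
    }
}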

env.execute() // entry point of the user-submitted Flink program; start reading from here

public JobExecutionResult execute(String jobName) throws Exception {
    final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);
    StreamGraph streamGraph = getStreamGraph();
    if (jobName != null) {
        streamGraph.setJobName(jobName);
    }

    try {
        return execute(streamGraph);
    } catch (Throwable t) {
        Optional<ClusterDatasetCorruptedException> clusterDatasetCorruptedException =
                ExceptionUtils.findThrowable(t, ClusterDatasetCorruptedException.class);
        if (!clusterDatasetCorruptedException.isPresent()) {
            throw t;
        }

        // Retry without cache if it is caused by corrupted cluster dataset.
        invalidateCacheTransformations(originalTransformations);
        streamGraph = getStreamGraph(originalTransformations);
        return execute(streamGraph);
    }
}

//Follow getStreamGraph() down into StreamGraphGenerator#generate and focus on its transform step; the transformations field there is the collection of the program's Transformations (one per operator)
return shouldExecuteInBatchMode
        ? translator.translateForBatch(transform, context)
        : translator.translateForStreaming(transform, context); // stepping in here shows how the translation splits between batch and streaming
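For context, the translator itself is looked up from a map keyed by the concrete Transformation class; roughly (a simplified sketch abbreviated from StreamGraphGenerator's transform logic, not a verbatim copy):

final TransformationTranslator<?, Transformation<?>> translator =
        (TransformationTranslator<?, Transformation<?>>)
                translatorMap.get(transform.getClass());

if (translator != null) {
    // delegates to translateForBatch/translateForStreaming as shown above
    return translate(translator, transform);
} else {
    // transformations without a registered translator take a legacy path
    return legacyTransform(transform);
}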

Following the call chain eventually leads to the abstract method translateForStreamingInternal, which has one implementation per transformation type (OneInputTransformationTranslator, TwoInputTransformationTranslator, SourceTransformationTranslator, SinkTransformationTranslator, and so on).

Each implementing class corresponds to the type of operator used in the user program. Take the flatMap operator as an example:

public <R> SingleOutputStreamOperator<R> flatMap(
        FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
    return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
}
//The StreamFlatMap passed in implements OneInputStreamOperator, so the matching translateForStreamingInternal implementation is OneInputTransformationTranslator

//Keep following the code until you reach the following:

//···
streamGraph.addOperator(
        transformationId,
        slotSharingGroup,
        transformation.getCoLocationGroupKey(),
        operatorFactory,
        inputType,
        transformation.getOutputType(),
        transformation.getName()); // creates a StreamNode and puts it into a Map, keyed by vertex ID with the StreamNode as value
//···
for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
    streamGraph.addEdge(inputId, transformationId, 0);
}

getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
//At this point the relationships between StreamNodes have been built;
//they are stored in the StreamGraph field:
//private transient Map<Integer, StreamNode> streamNodes;

//Each StreamNode records its edges in:
//private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
//private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
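To see the resulting topology without digging through the internals, you can print the StreamGraph as JSON; a quick check (the pipeline here is just a placeholder):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PlanDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)
           .map(i -> i * 2).returns(Integer.class) // type hint for the lambda
           .print();

        // Prints one JSON node per operator, with "predecessors" entries
        // that correspond to the StreamEdges described above.
        System.out.println(env.getExecutionPlan());
    }
}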

2. JobGraph

The StreamGraph is then optimized into a JobGraph, the data structure that is submitted to the JobManager. The main optimization is chaining eligible adjacent nodes together into a single node, which removes the serialization/deserialization and transport overhead of passing records between them. In this conversion, StreamNodes become JobVertices and StreamEdges become JobEdges, with IntermediateDataSets connecting the two.
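Chaining can also be influenced from the user API, which is handy for experimenting with the conditions shown below; a small illustration (the pipeline itself is a placeholder):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.disableOperatorChaining();       // switch chaining off for the whole job

        env.fromElements("a", "b")
           .map(String::toUpperCase).returns(String.class)
           .map(s -> s + "!").returns(String.class)
               .startNewChain()                 // force a chain boundary before this map
           .filter(s -> !s.isEmpty())
               .disableChaining()               // never chain this operator
           .print();

        env.execute("chaining-demo");
    }
}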

Let's continue from the execute method. The previous section showed how getStreamGraph assembles source, transform, and sink into a StreamGraph; the subsequent execute(streamGraph) call is where the conversion to a JobGraph happens. Note that in Local/Embedded mode this conversion is not performed.

//Follow the chain to this method on StreamGraph
public JobGraph getJobGraph(ClassLoader userClassLoader, @Nullable JobID jobID) {
    return StreamingJobGraphGenerator.createJobGraph(userClassLoader, this, jobID);
}

//Then drill down to the core method:
//StreamingJobGraphGenerator.createJobGraph

//判断上下游节点是否可以chain的条件
public static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    if (!(streamGraph.isChainingEnabled() // chaining is not disabled
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) // upstream and downstream share a SlotSharingGroup and have the same parallelism
            && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph) // the operators' chaining strategies allow it
            && arePartitionerAndExchangeModeChainable( // the physical partitioner and the exchange (shuffle) mode between the two operators allow it
                    edge.getPartitioner(), edge.getExchangeMode(), streamGraph.isDynamic()))) {

        return false;
    }

    // check that we do not have a union operation, because unions currently only work
    // through the network/byte-channel stack.
    // we check that by testing that each "type" (which means input position) is used only once
    for (StreamEdge inEdge : downStreamVertex.getInEdges()) {
        if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {
            return false;
        }
    }
    return true;
} 
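Among those checks, areOperatorsChainable mainly inspects each operator's ChainingStrategy. A simplified sketch of the rule, reusing the upStreamVertex/downStreamVertex variables from the method above (the real method also special-cases sources and missing operator factories):

ChainingStrategy up = upStreamVertex.getOperatorFactory().getChainingStrategy();
ChainingStrategy down = downStreamVertex.getOperatorFactory().getChainingStrategy();

// Simplified: upstream may chain unless its strategy is NEVER;
// downstream must be ALWAYS (HEAD/HEAD_WITH_SOURCES can only start a chain).
boolean upstreamOk = up != ChainingStrategy.NEVER;
boolean downstreamOk = down == ChainingStrategy.ALWAYS;
boolean chainable = upstreamOk && downstreamOk;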

//The createChain method walks the graph recursively, splitting each node's out-edges into chainable and non-chainable ones, and records the resulting chain info into the operators' StreamConfig
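A heavily simplified model of that recursion (a self-contained toy, not Flink's actual signatures: node ids, edges, and a chainability flag stand in for the StreamGraph internals):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ChainSketch {
    record Edge(int source, int target, boolean chainable) {}

    // Returns the out-edges that cross chain boundaries: chainable edges
    // are followed within the current chain, while non-chainable edges
    // start a new chain at their target (and later become JobEdges).
    static List<Edge> createChain(int node, Map<Integer, List<Edge>> outEdges) {
        List<Edge> transitiveOutEdges = new ArrayList<>();
        for (Edge e : outEdges.getOrDefault(node, List.of())) {
            if (e.chainable()) {
                transitiveOutEdges.addAll(createChain(e.target(), outEdges));
            } else {
                transitiveOutEdges.add(e);
                createChain(e.target(), outEdges); // target starts a new chain
            }
        }
        return transitiveOutEdges;
    }
}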

//Next comes the wiring: a JobVertex connects to an IntermediateDataSet,
//the IntermediateDataSet connects to a JobEdge, and the JobEdge connects to the downstream JobVertex
//setAllVertexNonChainedOutputsConfigs -> setVertexNonChainedOutputsConfig -> connect -> connectNewDataSetAsInput
//...down to this method; an IntermediateDataSetID is essentially a random ID made of two long parts
public JobEdge connectNewDataSetAsInput(
        JobVertex input,
        DistributionPattern distPattern,
        ResultPartitionType partitionType,
        IntermediateDataSetID intermediateDataSetId,
        boolean isBroadcast) {

    IntermediateDataSet dataSet =
            input.getOrCreateResultDataSet(intermediateDataSetId, partitionType);

    JobEdge edge = new JobEdge(dataSet, this, distPattern, isBroadcast);
    this.inputs.add(edge);
    dataSet.addConsumer(edge);
    return edge;
}  
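To observe the chaining result, you can build the JobGraph from a StreamGraph and list its vertices; a quick sketch (note that getJobGraph's signature varies across Flink versions, and getStreamGraph() may clear the environment's transformations):

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class JobGraphDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b")
           .map(String::toUpperCase).returns(String.class)
           .print();

        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = streamGraph.getJobGraph(); // no-arg variant; newer versions take a ClassLoader/JobID

        // Chained operators share one vertex, so names look like
        // "Source: ... -> Map -> Sink: ..."
        for (JobVertex vertex : jobGraph.getVertices()) {
            System.out.println(vertex.getName() + " (parallelism " + vertex.getParallelism() + ")");
        }
    }
}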

3. ExecutionGraph

After the client generates the JobGraph, it submits it to the JobManager via submitJob, and the JobManager builds the corresponding ExecutionGraph from it. The ExecutionGraph is the core data structure used for scheduling a Flink job: it contains every parallel task, the intermediate results, and the relationships between them.
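Conceptually, each JobVertex with parallelism N expands into one ExecutionJobVertex holding N ExecutionVertex instances, one per subtask; a toy illustration of this fan-out (names mirror, but do not reuse, Flink's classes):

import java.util.ArrayList;
import java.util.List;

public class FanOutSketch {
    record Subtask(String taskName, int subtaskIndex, int parallelism) {}

    // One "ExecutionVertex" per subtask of the "JobVertex".
    static List<Subtask> expand(String jobVertexName, int parallelism) {
        List<Subtask> subtasks = new ArrayList<>(parallelism);
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new Subtask(jobVertexName, i, parallelism));
        }
        return subtasks;
    }

    public static void main(String[] args) {
        // e.g. a "Map -> Sink" vertex with parallelism 3 -> subtasks 0/3, 1/3, 2/3
        expand("Map -> Sink", 3).forEach(System.out::println);
    }
}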

The entry point for building the ExecutionGraph is more involved: the flow is embedded in the creation of the Dispatcher, whose two main jobs are accepting user jobs and spawning a JobMaster. Rather than walking through the whole flow here, below is just the call chain, using Per-Job mode as an example:

StreamExecutionEnvironment.execute() -> 
  AbstractJobClusterExecutor.execute() -> 
    YarnClusterDescriptor.deployCluster() -> startAppMaster() -> setupApplicationMasterContainer()

YarnJobClusterEntrypoint.main() -> 
  ClusterEntrypoint.runClusterEntrypoint() -> startCluster() -> runCluster() -> 
    DefaultDispatcherResourceManagerComponentFactory.create() -> 
        DefaultDispatcherRunnerFactory.createDispatcherRunner() -> 
            DefaultDispatcherRunner.create() -> start() -> 
                StandaloneLeaderElection.startLeaderElection() -> 
                    JobMasterServiceLeadershipRunner.grantLeadership() -> createNewJobMasterServiceProcessIfValidLeader() -> createNewJobMasterServiceProcess() -> 
                        DefaultJobMasterServiceProcessFactory.create() ->
                            DefaultJobMasterServiceFactory.createJobMasterService() -> internalCreateJobMasterService() -> new JobMaster() -> createScheduler() ->
                                DefaultSlotPoolServiceSchedulerFactory.createScheduler() -> 
                                    DefaultSchedulerFactory.createScheduler() -> new DefaultScheduler() -> 
                                        SchedulerBase.createAndRestoreExecutionGraph() -> 
                                            DefaultExecutionGraphFactory.createAndRestoreExecutionGraph() -> 
                                                DefaultExecutionGraphBuilder.buildGraph()

The core of the ExecutionGraph construction is inside the buildGraph method:

  public void attachJobGraph(
          List<JobVertex> verticesToAttach, JobManagerJobMetricGroup jobManagerJobMetricGroup)
          throws JobException {

      assertRunningInJobMasterMainThread();

      LOG.debug(
              "Attaching {} topologically sorted vertices to existing job graph with {} "
                      + "vertices and {} intermediate results.",
              verticesToAttach.size(),
              tasks.size(),
              intermediateResults.size());

      attachJobVertices(verticesToAttach, jobManagerJobMetricGroup); // converts each JobVertex into an ExecutionJobVertex
      if (!isDynamic) {
          initializeJobVertices(verticesToAttach); // initializes the ExecutionJobVertices and wires them together
      }

      // the topology assigning should happen before notifying new vertices to failoverStrategy
      executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);

      partitionGroupReleaseStrategy =
              partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology());
  }

⚠️ Note: in later versions Flink replaced ExecutionEdge with the EdgeManager.
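The motivation: with an all-to-all pattern, materializing one ExecutionEdge per producer/consumer pair costs O(N*M) objects, whereas the EdgeManager keeps one shared ConsumedPartitionGroup and one shared ConsumerVertexGroup. A toy model of the grouping idea (names are illustrative, not Flink's API):

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class EdgeGroupSketch {
    public static void main(String[] args) {
        int producers = 4, consumers = 4;

        // One shared list of all produced partitions, referenced by every
        // consumer subtask instead of `producers` individual edges...
        List<String> consumedPartitionGroup = IntStream.range(0, producers)
                .mapToObj(i -> "partition-" + i)
                .collect(Collectors.toList());

        // ...and one shared list of consumer subtasks, referenced by every
        // produced partition.
        List<String> consumerVertexGroup = IntStream.range(0, consumers)
                .mapToObj(i -> "subtask-" + i)
                .collect(Collectors.toList());

        // Two groups of size N and M replace N*M ExecutionEdge objects.
        System.out.println(consumedPartitionGroup + " <-> " + consumerVertexGroup);
    }
}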