How StreamGraph, JobGraph, and ExecutionGraph Are Transformed Step by Step in Flink
1. StreamGraph
In a user-written Flink Java application, several operators are typically applied to a source in a streaming fashion, with the results finally written to a sink somewhere else. From the program's point of view, Flink first assembles the program's source, transform, and sink operations into a StreamGraph, which represents the topology of the user program. Let's look at the code to see how the StreamGraph is generated.
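For a concrete reference point, here is a minimal sketch of such a program (class name and data are illustrative, not taken from the original article); it contains exactly the source, transform, and sink pieces that end up in the StreamGraph:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class MinimalJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("to be", "or not to be")              // source
                .flatMap((String line, Collector<String> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(word);                     // transform
                    }
                })
                .returns(Types.STRING) // lambdas lose generic types to erasure
                .print();                                      // sink
        env.execute("minimal-job");
    }
}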
env.execute() // the entry point of a user-submitted Flink program; the walkthrough starts here
public JobExecutionResult execute(String jobName) throws Exception {
    final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);
    StreamGraph streamGraph = getStreamGraph();
    if (jobName != null) {
        streamGraph.setJobName(jobName);
    }

    try {
        return execute(streamGraph);
    } catch (Throwable t) {
        Optional<ClusterDatasetCorruptedException> clusterDatasetCorruptedException =
                ExceptionUtils.findThrowable(t, ClusterDatasetCorruptedException.class);
        if (!clusterDatasetCorruptedException.isPresent()) {
            throw t;
        }

        // Retry without cache if it is caused by corrupted cluster dataset.
        invalidateCacheTransformations(originalTransformations);
        streamGraph = getStreamGraph(originalTransformations);
        return execute(streamGraph);
    }
}
// From getStreamGraph(), drill down into StreamGraphGenerator's generate method and focus on its transform method; the transformations field there is in fact the collection of operators (Transformations) recorded by the environment
return shouldExecuteInBatchMode
        ? translator.translateForBatch(transform, context)
        : translator.translateForStreaming(transform, context); // here the translation branches on batch vs. streaming execution
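Which branch runs is determined by the runtime execution mode configured on the environment (STREAMING is the default). A quick illustration, reusing the env from the sketch above:
import org.apache.flink.api.common.RuntimeExecutionMode;

// Force the batch translators; AUTOMATIC lets Flink pick based on whether all sources are bounded.
env.setRuntimeMode(RuntimeExecutionMode.BATCH);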
Following the calls leads to the abstract method translateForStreamingInternal, which has many implementing translator classes, one per kind of transformation in the user program. Take the flatMap operator as an example:
public <R> SingleOutputStreamOperator<R> flatMap(
        FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
    return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
}
// The StreamFlatMap passed in implements OneInputStreamOperator, so the matching translateForStreamingInternal implementation is OneInputTransformationTranslator
// Keep following the calls until you reach the following code
// ···
streamGraph.addOperator(
        transformationId,
        slotSharingGroup,
        transformation.getCoLocationGroupKey(),
        operatorFactory,
        inputType,
        transformation.getOutputType(),
        transformation.getName()); // creates a StreamNode and puts it into a Map, keyed by vertex ID with the StreamNode as value
// ···
for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
    streamGraph.addEdge(inputId, transformationId, 0);
}
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
// At this point the relationships between StreamNodes have been built. They live in the StreamGraph field:
//     private transient Map<Integer, StreamNode> streamNodes;
// and each StreamNode records its edges in:
//     private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
//     private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
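You can inspect the finished StreamGraph without executing the job: StreamExecutionEnvironment can render the plan as JSON (node IDs, operator names, parallelism, edges). A small illustrative check:
env.fromElements(1, 2, 3)
        .map(x -> x * 2)
        .print();
// Prints the StreamGraph as JSON instead of running the job.
System.out.println(env.getExecutionPlan());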
2. JobGraph
The StreamGraph is then optimized into a JobGraph, the data structure that is actually submitted to the JobManager. The main optimization is operator chaining: multiple eligible nodes are chained together into a single node, which removes the serialization/deserialization and transport overhead of passing records between them. During this conversion StreamNodes become JobVertices, StreamEdges become JobEdges, and the vertices are connected to each other through IntermediateDataSets.
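Chaining can also be observed and steered from the user API. An illustrative example (the operator names are made up): m1 and m2 below are chained into one JobVertex by default, while startNewChain makes m3 begin a fresh chain:
env.fromElements("a", "b")
        .map(s -> s.toUpperCase()).name("m1")                  // m1 and m2 chain together by default
        .map(s -> s.trim()).name("m2")
        .map(s -> s.toLowerCase()).name("m3").startNewChain()  // m3 starts a new chain, hence a new JobVertex
        .print();
// env.disableOperatorChaining(); // would turn chaining off for the whole job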
Let's continue from the execute method. The previous section showed how getStreamGraph abstracts source, transform, and sink into a StreamGraph; the subsequent execute(streamGraph) call is where the conversion takes place. Note that in Local/Embedded mode this conversion is not performed.
// Follow the calls until you reach this method in StreamGraph
public JobGraph getJobGraph(ClassLoader userClassLoader, @Nullable JobID jobID) {
    return StreamingJobGraphGenerator.createJobGraph(userClassLoader, this, jobID);
}
// Keep going until you reach the core method, StreamingJobGraphGenerator.createJobGraph
// The conditions that decide whether an upstream and a downstream node can be chained:
public static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    if (!(streamGraph.isChainingEnabled() // chaining has not been disabled
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) // both operators are in the same SlotSharingGroup (and have the same parallelism)
            && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph) // the operators' chaining strategies allow it
            && arePartitionerAndExchangeModeChainable( // the physical partitioner and the exchange (shuffle) mode allow it
                    edge.getPartitioner(), edge.getExchangeMode(), streamGraph.isDynamic()))) {
        return false;
    }

    // check that we do not have a union operation, because unions currently only work
    // through the network/byte-channel stack.
    // we check that by testing that each "type" (which means input position) is used only once
    for (StreamEdge inEdge : downStreamVertex.getInEdges()) {
        if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {
            return false;
        }
    }
    return true;
}
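The areOperatorsChainable part hinges on each operator's ChainingStrategy. Below is a simplified, standalone restatement of that rule, not the exact Flink source (the real method also handles source chaining and operators without a factory):
import org.apache.flink.streaming.api.operators.ChainingStrategy;

static boolean strategiesAllowChaining(ChainingStrategy upstream, ChainingStrategy downstream) {
    // the downstream operator must accept being appended to an existing chain
    if (downstream != ChainingStrategy.ALWAYS) {
        return false;
    }
    // the upstream operator must not forbid successors: NEVER breaks the chain,
    // while ALWAYS/HEAD/HEAD_WITH_SOURCES can all have operators chained after them
    return upstream != ChainingStrategy.NEVER;
}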
// createChain recursively walks the graph, classifying each outgoing edge as chainable or non-chainable, and records the chain info in each operator's StreamConfig
// Next comes the wiring: a JobVertex connects to an IntermediateDataSet, the IntermediateDataSet connects to a JobEdge, and the JobEdge connects to the downstream JobVertex
// setAllVertexNonChainedOutputsConfigs -> setVertexNonChainedOutputsConfig -> connect -> connectNewDataSetAsInput
// ...down to this method; the intermediateDataSetId is a random identifier made of two long parts
public JobEdge connectNewDataSetAsInput(
        JobVertex input,
        DistributionPattern distPattern,
        ResultPartitionType partitionType,
        IntermediateDataSetID intermediateDataSetId,
        boolean isBroadcast) {
    IntermediateDataSet dataSet =
            input.getOrCreateResultDataSet(intermediateDataSetId, partitionType);
    JobEdge edge = new JobEdge(dataSet, this, distPattern, isBroadcast);
    this.inputs.add(edge);
    dataSet.addConsumer(edge);
    return edge;
}
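The IntermediateDataSetID above extends Flink's AbstractID, which is essentially a random 128-bit identifier stored as two longs. A minimal sketch of that idea (illustrative class, not the real one):
import java.util.Random;

final class TwoPartId {
    private final long upperPart; // upper 64 bits
    private final long lowerPart; // lower 64 bits

    TwoPartId() {
        Random rnd = new Random();
        this.upperPart = rnd.nextLong();
        this.lowerPart = rnd.nextLong();
    }

    @Override
    public String toString() {
        // hex form, similar to how Flink renders its IDs
        return String.format("%016x%016x", upperPart, lowerPart);
    }
}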
3. ExecutionGraph
After the client generates the JobGraph, it submits it to the JobManager via submitJob, and the JobManager builds the corresponding ExecutionGraph from it. The ExecutionGraph is the core data structure used for job scheduling in Flink; it contains every parallel task, the IntermediateDataSets, and the relationships between them.
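Concretely, each JobVertex becomes an ExecutionJobVertex, which fans out into one ExecutionVertex per parallel subtask. A conceptual sketch of that expansion (illustrative types, not Flink source):
final class ExecutionJobVertexSketch {
    final String jobVertexName;
    final ExecutionVertexSketch[] subtasks; // one entry per parallel subtask

    ExecutionJobVertexSketch(String name, int parallelism) {
        this.jobVertexName = name;
        this.subtasks = new ExecutionVertexSketch[parallelism];
        for (int i = 0; i < parallelism; i++) {
            subtasks[i] = new ExecutionVertexSketch(i);
        }
    }
}

final class ExecutionVertexSketch {
    final int subtaskIndex;   // 0..parallelism-1
    int attemptNumber;        // failover replaces the current execution attempt

    ExecutionVertexSketch(int subtaskIndex) {
        this.subtaskIndex = subtaskIndex;
    }
}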
The entry point for building the ExecutionGraph is fairly convoluted, since the flow is embedded in the creation of the Dispatcher. The Dispatcher mainly does two things: accept user jobs and spawn JobMasters. Rather than walking through every step, here is the call chain for Per-Job mode:
StreamExecutionEnvironment.execute() ->
AbstractJobClusterExecutor.execute() ->
YarnClusterDescriptor.deployJobCluster() -> startAppMaster() -> setupApplicationMasterContainer()
YarnJobClusterEntrypoint.main() ->
ClusterEntrypoint.runClusterEntrypoint() -> startCluster() -> runCluster() ->
DefaultDispatcherResourceManagerComponentFactory.create() ->
DefaultDispatcherRunnerFactory.createDispatcherRunner() ->
DefaultDispatcherRunner.create() -> start() ->
StandaloneLeaderElection.startLeaderElection() ->
JobMasterServiceLeadershipRunner.grantLeadership() -> createNewJobMasterServiceProcessIfValidLeader() -> createNewJobMasterServiceProcess() ->
DefaultJobMasterServiceProcessFactory.create() ->
DefaultJobMasterServiceFactory.createJobMasterService() -> internalCreateJobMasterService() -> new JobMaster() -> createScheduler() ->
DefaultSlotPoolServiceSchedulerFactory.createScheduler() ->
DefaultSchedulerFactory.createScheduler() -> new DefaultScheduler() ->
SchedulerBase.createAndRestoreExecutionGraph() ->
DefaultExecutionGraphFactory.createAndRestoreExecutionGraph() ->
DefaultExecutionGraphBuilder.buildGraph()
The core of the ExecutionGraph construction is inside the buildGraph method, which ends up calling attachJobGraph:
public void attachJobGraph(
        List<JobVertex> verticesToAttach, JobManagerJobMetricGroup jobManagerJobMetricGroup)
        throws JobException {

    assertRunningInJobMasterMainThread();

    LOG.debug(
            "Attaching {} topologically sorted vertices to existing job graph with {} "
                    + "vertices and {} intermediate results.",
            verticesToAttach.size(),
            tasks.size(),
            intermediateResults.size());

    attachJobVertices(verticesToAttach, jobManagerJobMetricGroup); // converts each JobVertex into an ExecutionJobVertex
    if (!isDynamic) {
        initializeJobVertices(verticesToAttach); // initializes the ExecutionJobVertices and wires them together
    }

    // the topology assigning should happen before notifying new vertices to failoverStrategy
    executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);

    partitionGroupReleaseStrategy =
            partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology());
}
⚠️ In later versions, Flink replaced ExecutionEdge with the EdgeManager structure.