Administrator
Published on 2023-11-02 / 2 Visits
0

Yarn Federation任务提交流程(二)

一、Home SubCluster的选择

什么是Home SubCluster

在Yarn Fedration模式下,一个大规模集群会被分成很多子集群,那么在提交一个作业任务时需要选择其中一个子集群作为这个任务的Home,官方文档释义如下:

The sub-cluster on which the Application Master (AM) runs is called the Application’s “home sub-cluster”. The AM is not limited to resources from the home sub-cluster but can also request resources from other sub-clusters, referred to as secondary sub-clusters. The federated environment will be configured and tuned periodically such that when an AM is placed on a sub-cluster, it should be able to find most of the resources on the home sub-cluster. Only in certain cases it should need to ask for resources from other sub-clusters.

翻译:在子集群运行的Application Master称为这个应用程序的“主子集群”。AM不仅限于来自主子集群的资源,还可以从其他子集群(称为辅助子集群)请求资源。此联邦环境会被定期配置和调整以便于当一个AM被置于一个子集群,它可以在主子集群获取更多的资源。只有在某些情况下,它才需要从其他子集群请求资源。

说人话就是,Home SubCluster是选出来作为一个AM运行的子集群,这个选择的结果主要由Yarn Router控制;除此之外,在一个子集群运行的AM可以借用其他子集群的资源来支持任务的顺利运行。

Home SubCluster是如何被选出来的

主要由Yarn Router来做决定,根据前文所说,Yarn Router作为一层路由,接收客户端请求并通过FederationClientInterceptor来对请求做一些处理,最后将请求再转发到选出的Home SubCluster。那么分析一下选出Home SubCluster的流程。

在讲之前先复习一下,在前文提到的ZookeeperFederationStateStore在初始化是会创建三个目录,这里选择Home SubCluster会用到其中的/federationstore/policy目录下的数据,此目录下以queue来作为区分。也就是说选择Home SubCluster可以根据queue纬度来配置。
下面为选择Home SubCluster的代码:

  public SubClusterId getHomeSubcluster(
      ApplicationSubmissionContext appSubmissionContext,
      List<SubClusterId> blackListSubClusters) throws YarnException {

    // the maps are concurrent, but we need to protect from reset()
    // reinitialization mid-execution by creating a new reference local to this
    // method.
    Map<String, SubClusterPolicyConfiguration> cachedConfs = globalConfMap;
    Map<String, FederationRouterPolicy> policyMap = globalPolicyMap;

    if (appSubmissionContext == null) {
      throw new FederationPolicyException(
          "The ApplicationSubmissionContext " + "cannot be null.");
    }

    String queue = appSubmissionContext.getQueue();

    // respecting YARN behavior we assume default queue if the queue is not
    // specified. This also ensures that "null" can be used as a key to get the
    // default behavior.
    if (queue == null) {
      queue = YarnConfiguration.DEFAULT_QUEUE_NAME;
    }

    // the facade might cache this request, based on its parameterization
    SubClusterPolicyConfiguration configuration = null;

    try {
      configuration = federationFacade.getPolicyConfiguration(queue);
    } catch (YarnException e) {
      String errMsg = "There is no policy configured for the queue: " + queue
          + ", falling back to defaults.";
      LOG.warn(errMsg, e);
    }

    // If there is no policy configured for this queue, fallback to the baseline
    // policy that is configured either in the store or via XML config (and
    // cached)
    if (configuration == null) {
      LOG.warn("There is no policies configured for queue: " + queue + " we"
          + " fallback to default policy for: "
          + YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);

      queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
      try {
        configuration = federationFacade.getPolicyConfiguration(queue);
      } catch (YarnException e) {
        String errMsg = "Cannot retrieve policy configured for the queue: "
            + queue + ", falling back to defaults.";
        LOG.warn(errMsg, e);

      }
    }

    // the fallback is not configure via store, but via XML, using
    // previously loaded configuration.
    if (configuration == null) {
      configuration =
          cachedConfs.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
      if (LOG.isDebugEnabled()) {
        LOG.debug("No policy configured for queue: " + queue + " we"
            + " fallback to default policy for: "
            + YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
      }
    }

    // if the configuration has changed since last loaded, reinit the policy
    // based on current configuration
    if (!cachedConfs.containsKey(queue)
        || !cachedConfs.get(queue).equals(configuration)) {
      singlePolicyReinit(policyMap, cachedConfs, queue, configuration);
    }

    FederationRouterPolicy policy = policyMap.get(queue);
    if (policy == null) {
      // this should never happen, as the to maps are updated together
      throw new FederationPolicyException("No FederationRouterPolicy found "
          + "for queue: " + appSubmissionContext.getQueue() + " (for "
          + "application: " + appSubmissionContext.getApplicationId() + ") "
          + "and no default specified.");
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Getting home subcluster for application: "
          + appSubmissionContext.getApplicationId() + " with queue: " + queue
          + " and configuration: " + configuration + " and policy: "
          + policy.getClass().getName());
    }
    return policy.getHomeSubcluster(appSubmissionContext, blackListSubClusters);
  }

大致流程就是先根据队列在ZK中获取SubClusterPolicyConfiguration,比较缓存中是否有这个配置,如果没有的话就把这个配置加入到缓存,这里看一下singlePolicyReinit方法:

  private void singlePolicyReinit(Map<String, FederationRouterPolicy> policyMap,
      Map<String, SubClusterPolicyConfiguration> cachedConfs, String queue,
      SubClusterPolicyConfiguration conf)
      throws FederationPolicyInitializationException {

    FederationPolicyInitializationContext context =
        new FederationPolicyInitializationContext(conf, subClusterResolver,
            federationFacade, null);
    String newType = context.getSubClusterPolicyConfiguration().getType();
    FederationRouterPolicy routerPolicy = policyMap.get(queue);

    FederationPolicyManager federationPolicyManager =
        FederationPolicyUtils.instantiatePolicyManager(newType);
    // set queue, reinit policy if required (implementation lazily check
    // content of conf), and cache it
    federationPolicyManager.setQueue(queue);
    routerPolicy =
        federationPolicyManager.getRouterPolicy(context, routerPolicy);

    // we need the two put to be atomic (across multiple threads invoking
    // this and reset operations)
    synchronized (this) {
      policyMap.put(queue, routerPolicy);
      cachedConfs.put(queue, conf);
    }
  }

这里呢有个叫FederationPolicyManager的类,这是一个可配置的类,我们可以看到其有很多实现类,这些实现类中的构造方法对父类的AbstractPolicyManager的routerFederationPolicyamrmProxyFedrationPolicy进行了赋值,这里的routerFederationPolicy就是主要决定该以何种方式来选择Home SubCluster的,而amrmProxyFedrationPolicy是决定该以何种方式来借用其他子集群的资源的。

我们可以看到FederationRouterPolicy的诸多实现类,其中的getHomeCluster方法有具体的选择策略,也就是第89行的走向。

RouterPolicy.png
这里介绍一下我们常用的WeightedRandomRouterPolicy,其routerPolicyWeight可以配置:

{
    "routerPolicyWeights":{
        "entry":[
            {
                "key":{
                    "id":"yarnclustera"
                },
                "value":"1.0"
            },
            {
                "key":{
                    "id":"yarnclusterb"
                },
                "value":"0.0"
            }
        ]
    },
    "amrmPolicyWeights":{
        "entry":[
            {
                "key":{
                    "id":"yarnclustera"
                },
                "value":"0.5"
            },
            {
                "key":{
                    "id":"yarnclusterb"
                },
                "value":"0.5"
            }
        ]
    },
    "headroomAlpha":"0.0"
}

可以比较清晰的看到id代表子集群的id,value代表这个子集群被选为Home SubCluster的概率。

二、子集群的资源借用

上一节说到在Home SubCluster运行的AM可以借用其他子集群的资源,这个资源就是container。来看下Hadoop官方文档此的描述:

The AMRMProxy is a key component to allow the application to scale and run across sub-clusters. The AMRMProxy runs on all the NM machines and acts as a proxy to the YARN RM for the AMs by implementing the ApplicationMasterProtocol. Applications will not be allowed to communicate with the sub-cluster RMs directly. They are forced by the system to connect only to the AMRMProxy endpoint, which would provide transparent access to multiple YARN RMs (by dynamically routing/splitting/merging the communications). At any one time, a job can span across one home sub-cluster and multiple secondary sub-clusters, but the policies operating in the AMRMProxy try to limit the footprint of each job to minimize overhead on the scheduling infrastructure (more in section on scalability/load).

翻译:AMRMProxy是允许应用程序扩展和跨子集群运行的关键组件。AMRMProxy运行在所有的NM机器上,并通过实现ApplicationMasterProtocol作为AMs的YARN RM的代理。应用不允许直接与子集群RM通信。它们被系统强制只连接到AMRMProxy,这将提供对多个YARN RMs的透明访问(通过动态路由/拆分/合并通信)。在任何时候,作业都可以跨一个主子集群和多个辅助子集群,但是在AMRMProxy中操作的策略试图限制每个作业的占用,以最小化调度基础设施的开销(更多信息请参见可伸缩性/负载部分)。

说人话就是AMRMProxyService这个服务作为一个AM和RM的代理服务,其原理就和上节提到的Router一样,拦截AM到RM的请求做一些加工处理(其实就是向其他子集群借资源)。要想开启这个功能需要在nodemanager的yarn-site.xml文件内增加如下配置:

  <property>
    <name>yarn.nodemanager.amrmproxy.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>yarn.nodemanager.amrmproxy.interceptor-class.pipeline</name>
    <value>org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor</value>
  </property>

AMRMProxyService的初始化

第一个配置主要是用于判断需要初始化AMRMProxyService,代码片段如下:

  protected void createAMRMProxyService(Configuration conf) {
    this.amrmProxyEnabled =
        conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
            YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED) ||
            conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
                YarnConfiguration.DEFAULT_DIST_SCHEDULING_ENABLED);

    if (amrmProxyEnabled) {
      LOG.info("AMRMProxyService is enabled. "
          + "All the AM->RM requests will be intercepted by the proxy");
      this.setAMRMProxyService(
          new AMRMProxyService(this.context, this.dispatcher));
      addService(this.getAMRMProxyService());
    } else {
      LOG.info("AMRMProxyService is disabled");
    }
  }

那么这个AMRMProxyService在哪用到的呢?观察调用栈可以发现是在ContainerManangerImpl.startContainer方法中,代码如下:

  if (amrmProxyEnabled && containerTokenIdentifier.getContainerType()
      .equals(ContainerType.APPLICATION_MASTER)) {
    this.getAMRMProxyService().processApplicationStartRequest(request);
  }

可以看到这里的request(StartContainerRequest)进行了加工,那看一下processApplicationStartRequest方法,这个方法主要做了:

  1. 检查applPipelineMap中是否存在此Application,如果有则说明是在这个Application失败了,这里会把这个Application的上下文清除;如果不存在的话会初始化一个新的拦截链。
  2. 初始化拦截链(包括AM到RM的心跳数据的处理加工类,UnmanagedAMPoolManager,AMRM的策略类(不过是null)等)


ApplicationMaster在启动时会向RM注册自己,方法为registerApplicationMaster,那么这个RPC请求会先被FederationInterceptor拦截,在拦截时才会真正初始化策略类FederationAMRMProxyPolicy,代码如下:

    // Initialize the AMRMProxyPolicy
    try {
      this.policyInterpreter =
          FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
              getConf(), this.federationFacade, this.homeSubClusterId);
    } catch (FederationPolicyInitializationException e) {
      throw new YarnRuntimeException(e);
    }

loadAMRMPolicy其实就是获取上节提到的FederationPolicyManager中配置的amrmProxyFedrationPolicy的值。

资源借用策略

那么在下一步ApplicationMaster向RM请求资源的时候(通过allocate方法)时,也会被拦截,那么看一下FederationInterceptor中的allocate方法,有这样一段逻辑,可以看到里面必定是对申请资源的请求进行了一些处理,那么就看下里面做了什么吧。

  // Split the heart beat request into multiple requests, one for each
  // sub-cluster RM that is used by this application.
  Map<SubClusterId, AllocateRequest> requests =
      splitAllocateRequest(request);

可以看到最后是调用了在注册时初始化的policyInterpreter的splitResourceRequests方法。

  protected Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
      List<ResourceRequest> askList) throws YarnException {
    return policyInterpreter.splitResourceRequests(askList,
        getTimedOutSCs(true));
  }

FederationAMRMProxyPolicy有诸多实现类,可以实现不同的资源借用策略,读者可以自行研究。
ProxyPolicy.png
ps:被借用资源的子集群会创建一个Unmanaged AM来管理被借用的资源。

引申

工作中遇到一个场景,公司有两个子集群,一个是物理集群,一个是弹性集群。由于弹性集群规模比较小,经常会出现一个计算任务就把整个集群占满的情况,所以这里想设计一个策略,对于Router的选择策略可做到根据两个子集群的负载指标动态调整概率的效果;对于AMRMProxy的分配策略可以做到如果AM在物理集群运行则不允许借用弹性集群的资源,反之如果AM在弹性集群运行则可以借用物理集群的资源。

这里的方案是在两个子集群的NM所在节点增加一个配置,并新加一个AMRMProxyPolicy配置类,根据这个配置台决定是选择HomeAMRMProxy还是其他的策略。