Administrator
Published on 2023-08-02

Yarn Federation Job Submission Flow (Part 1)

1. What happens when the Router starts

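The official Javadoc describes the Router as follows. In short, the component exposes ApplicationClientProtocol to the outside world so that clients are unaware of the ResourceManager(s) behind it. Every request sent through a Yarn Client passes through the Router, which decouples clients from the RMs.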

/**
 * The router is a stateless YARN component which is the entry point to the
 * cluster. It can be deployed on multiple nodes behind a Virtual IP (VIP) with
 * a LoadBalancer.
 *
 * The Router exposes the ApplicationClientProtocol (RPC and REST) to the
 * outside world, transparently hiding the presence of ResourceManager(s), which
 * allows users to request and update reservations, submit and kill
 * applications, and request status on running applications.
 *
 * In addition, it exposes the ResourceManager Admin API.
 *
 * This provides a placeholder for throttling mis-behaving clients (YARN-1546)
 * and masks the access to multiple RMs (YARN-3659).
 */

Let us look at the main method of this class.

public static void main(String[] argv) {
    Configuration conf = new YarnConfiguration();
    Thread
        .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
    StringUtils.startupShutdownMessage(Router.class, argv, LOG);
    Router router = new Router();
    try {

      // Remove the old hook if we are rebooting.
      if (null != routerShutdownHook) {
        ShutdownHookManager.get().removeShutdownHook(routerShutdownHook);
      }

      routerShutdownHook = new CompositeServiceShutdownHook(router);
      ShutdownHookManager.get().addShutdownHook(routerShutdownHook,
          SHUTDOWN_HOOK_PRIORITY);

      router.init(conf);
      router.start();
    } catch (Throwable t) {
      LOG.error("Error starting Router", t);
      System.exit(-1);
    }
  }

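The calls to focus on are router.init and router.start. Both are inherited from the parent class; they mainly change the service state and then call back into methods that the subclass overrides.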


We start with init. It is defined in the parent class AbstractService: it moves the Router's state from NOTINITED to INITED and calls back into the serviceInit method that Router overrides, shown below.

@Override
  protected void serviceInit(Configuration config) throws Exception {
    this.conf = config;
    // ClientRM Proxy
    clientRMProxyService = createClientRMProxyService();
    addService(clientRMProxyService);
    // RMAdmin Proxy
    rmAdminProxyService = createRMAdminProxyService();
    addService(rmAdminProxyService);
    // WebService
    webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
        YarnConfiguration.ROUTER_BIND_HOST,
        WebAppUtils.getRouterWebAppURLWithoutScheme(this.conf));
    // Metrics
    DefaultMetricsSystem.initialize(METRICS_NAME);
    super.serviceInit(conf);
  }

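This method creates the two proxy services, RouterClientRMService and RouterRMAdminService, and registers them with addService. The closing super.serviceInit(conf) call then initializes each registered service through the same AbstractService mechanism, moving each one from NOTINITED to INITED.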
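
The init/start pattern described above can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions, not Hadoop's implementation: SimpleService, SimpleCompositeService and RouterLike are hypothetical stand-ins for AbstractService, CompositeService and Router, and the configuration argument is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the service lifecycle; not Hadoop code. */
final class LifecycleSketch {

  enum State { NOTINITED, INITED, STARTED }

  /** The parent owns the state machine; subclasses only override the hooks. */
  static class SimpleService {
    private final String name;
    private State state = State.NOTINITED;

    SimpleService(String name) { this.name = name; }

    final void init() {
      if (state != State.NOTINITED) {
        throw new IllegalStateException(name + " cannot init from " + state);
      }
      state = State.INITED;   // state change handled by the parent
      serviceInit();          // callback into the subclass
    }

    final void start() {
      if (state != State.INITED) {
        throw new IllegalStateException(name + " cannot start from " + state);
      }
      state = State.STARTED;
      serviceStart();
    }

    protected void serviceInit() { }
    protected void serviceStart() { }
    State getState() { return state; }
  }

  /** A service that drives the lifecycle of the children registered with it. */
  static class SimpleCompositeService extends SimpleService {
    private final List<SimpleService> children = new ArrayList<>();

    SimpleCompositeService(String name) { super(name); }

    protected void addService(SimpleService child) { children.add(child); }
    List<SimpleService> getServices() { return children; }

    @Override protected void serviceInit() {
      for (SimpleService child : children) { child.init(); }
    }

    @Override protected void serviceStart() {
      for (SimpleService child : children) { child.start(); }
    }
  }

  /** Registers two children, then lets the parent initialize them. */
  static class RouterLike extends SimpleCompositeService {
    RouterLike() { super("router"); }

    @Override protected void serviceInit() {
      addService(new SimpleService("clientRMProxyService"));
      addService(new SimpleService("rmAdminProxyService"));
      super.serviceInit();
    }
  }

  public static void main(String[] args) {
    RouterLike router = new RouterLike();
    router.init();
    router.start();
    for (SimpleService child : router.getServices()) {
      System.out.println(child.getState());
    }
  }
}
```

The point of the design is that subclasses never touch the state field directly: they add behaviour in the hooks, and the parent decides when a transition is legal.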


Next comes start. Again the parent class AbstractService handles the state change, moving the Router from INITED to STARTED, and then calls back into Router's serviceStart method, shown below.

@Override
  protected void serviceStart() throws Exception {
    try {
      doSecureLogin();
    } catch (IOException e) {
      throw new YarnRuntimeException("Failed Router login", e);
    }
    startWepApp();
    super.serviceStart();
  }

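doSecureLogin checks whether Kerberos is in use and can be skipped for now. Next comes startWepApp, which builds and starts an HTTP server hosting the Router's web application. Depending on configuration, it also enables the WebApp proxy so that requests can be routed and proxied. Through the WebApp, administrators and users can view the state of the Router and the federated clusters, monitor resource usage, and inspect and manage running applications.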

public void startWepApp() {

    RMWebAppUtil.setupSecurityAndFilters(conf, null);

    Builder<Object> builder =
        WebApps.$for("cluster", null, null, "ws").with(conf).at(webAppAddress);
    if(conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PROXY_ENABLE,
        YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PROXY_ENABLE)) {
      fetcher = new FedAppReportFetcher(conf);
      builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME,
          ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class);
      builder.withAttribute(WebAppProxy.FETCHER_ATTRIBUTE, fetcher);
      String proxyHostAndPort = getProxyHostAndPort(conf);
      String[] proxyParts = proxyHostAndPort.split(":");
      builder.withAttribute(WebAppProxy.PROXY_HOST_ATTRIBUTE, proxyParts[0]);
    }
    webApp = builder.start(new RouterWebApp(this));
  }

The line fetcher = new FedAppReportFetcher(conf); initializes the concrete FederationStateStore backend selected by YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS. Deployments typically configure ZookeeperFederationStateStore, which creates three directories (znodes) in ZK when it is initialized, laid out as follows:
FEDERATIONSTORE (configurable)
|--- MEMBERSHIP
|    |----- SC1
|    |----- SC2
|--- APPLICATION
|    |----- APP1
|    |----- APP2
|--- POLICY
     |----- QUEUE1
     |----- QUEUE2
Later, a suitable sub-cluster is looked up from ZK to receive each application.


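Next is super.serviceStart, which starts the two services registered earlier via addService, RouterClientRMService and RouterRMAdminService. The pattern is the same: the parent class first moves the state from INITED to STARTED and then calls back into each service's own serviceStart() method.
Let us start with RouterClientRMService: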

@Override
  protected void serviceStart() throws Exception {
    LOG.info("Starting Router ClientRMService");
    Configuration conf = getConfig();
    YarnRPC rpc = YarnRPC.create(conf);
    UserGroupInformation.setConfiguration(conf);

    this.listenerEndpoint =
        conf.getSocketAddr(YarnConfiguration.ROUTER_BIND_HOST,
            YarnConfiguration.ROUTER_CLIENTRM_ADDRESS,
            YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_ADDRESS,
            YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PORT);

    if (getConfig().getBoolean(YarnConfiguration.ROUTER_WEBAPP_PROXY_ENABLE,
        YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PROXY_ENABLE)) {
      redirectURL = getRedirectURL();
    }

    int maxCacheSize =
        conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
            YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
    this.userPipelineMap = Collections.synchronizedMap(
        new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
            maxCacheSize, true));

    Configuration serverConf = new Configuration(conf);

    int numWorkerThreads =
        serverConf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT,
            YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT);

    this.server = rpc.getServer(ApplicationClientProtocol.class, this,
        listenerEndpoint, serverConf, null, numWorkerThreads);

    // Set service-level authorization security policy
    boolean serviceAuthEnabled = conf.getBoolean(
            CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
    if (serviceAuthEnabled) {
      server.refreshServiceAclWithLoadedConfiguration(conf, RMPolicyProvider.getInstance());
    }

    this.server.start();
    LOG.info("Router ClientRMService listening on address: "
        + this.server.getListenerAddress());
    super.serviceStart();
  }

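Most of this method reads or checks configuration values. The important step is rpc.getServer, which builds the RPC server used for communication. Following the call chain, we eventually arrive here: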

public Server getServer(Class<?> protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
      String portRangeConfig) {
    
    Constructor<?> constructor = serviceCache.get(protocol);
    if (constructor == null) {
      Class<?> pbServiceImplClazz = null;
      try {
        pbServiceImplClazz = conf
            .getClassByName(getPbServiceImplClassName(protocol)); // resolves to ApplicationClientProtocolPBServiceImpl
      } catch (ClassNotFoundException e) {
        throw new YarnRuntimeException("Failed to load class: ["
            + getPbServiceImplClassName(protocol) + "]", e);
      }
      try {
        constructor = pbServiceImplClazz.getConstructor(protocol);
        constructor.setAccessible(true);
        serviceCache.putIfAbsent(protocol, constructor);
      } catch (NoSuchMethodException e) {
        throw new YarnRuntimeException("Could not find constructor with params: "
            + Long.TYPE + ", " + InetSocketAddress.class + ", "
            + Configuration.class, e);
      }
    }
    
    Object service = null;
    try {
      service = constructor.newInstance(instance); // the ApplicationClientProtocolPBServiceImpl constructor stores the instance: this.real = impl
    } catch (InvocationTargetException e) {
      throw new YarnRuntimeException(e);
    } catch (IllegalAccessException e) {
      throw new YarnRuntimeException(e);
    } catch (InstantiationException e) {
      throw new YarnRuntimeException(e);
    }

    Class<?> pbProtocol = service.getClass().getInterfaces()[0]; // the first interface implemented by ApplicationClientProtocolPBServiceImpl, i.e. ApplicationClientProtocolPB
    Method method = protoCache.get(protocol);
    if (method == null) {
      Class<?> protoClazz = null;
      try {
        protoClazz = conf.getClassByName(getProtoClassName(protocol)); // resolves to ApplicationClientProtocol$ApplicationClientProtocolService
      } catch (ClassNotFoundException e) {
        throw new YarnRuntimeException("Failed to load class: ["
            + getProtoClassName(protocol) + "]", e);
      }
      try {
        method = protoClazz.getMethod("newReflectiveBlockingService",
            pbProtocol.getInterfaces()[0]); // looks up newReflectiveBlockingService on the abstract class ApplicationClientProtocolService nested in ApplicationClientProtocol
        method.setAccessible(true);
        protoCache.putIfAbsent(protocol, method);
      } catch (NoSuchMethodException e) {
        throw new YarnRuntimeException(e);
      }
    }
    
    try {
      return createServer(pbProtocol, addr, conf, secretManager, numHandlers,
          (BlockingService)method.invoke(null, service), portRangeConfig);
    } catch (InvocationTargetException e) {
      throw new YarnRuntimeException(e);
    } catch (IllegalAccessException e) {
      throw new YarnRuntimeException(e);
    } catch (IOException e) {
      throw new YarnRuntimeException(e);
    }
  }

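The whole method gathers the protocol classes and other settings needed to build the RPC server. Note that the ApplicationClientProtocol referred to here is the generated class in the org.apache.hadoop.yarn.proto package, not the interface in org.apache.hadoop.yarn.api.
The other service, RouterRMAdminService, starts in the same way. That completes the Router startup.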
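
The reflective wiring in getServer can be reduced to three steps: derive an implementation class name from the protocol, look up a constructor that takes the protocol interface, and instantiate it around the real service. A minimal sketch of that idea follows. GreeterProtocol, GreeterProtocolPBServiceImpl and the naming rule in implClassName are hypothetical stand-ins invented for illustration; they are not Hadoop classes, and Hadoop's own naming rule is more involved.

```java
import java.lang.reflect.Constructor;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch of convention-based reflective wiring; not Hadoop code. */
final class ReflectiveWiringSketch {

  /** Stand-in for the user-facing protocol interface. */
  interface GreeterProtocol {
    String greet(String name);
  }

  /** Stand-in for the wire-facing wrapper that is located by naming convention. */
  static class GreeterProtocolPBServiceImpl {
    private final GreeterProtocol real;

    public GreeterProtocolPBServiceImpl(GreeterProtocol impl) {
      this.real = impl;   // same shape as "this.real = impl" in the excerpt
    }

    String handle(String wireRequest) {
      return real.greet(wireRequest);
    }
  }

  /** Constructor lookups are cached per protocol, as in the excerpt. */
  private static final ConcurrentMap<Class<?>, Constructor<?>> CACHE =
      new ConcurrentHashMap<>();

  /** Assumed naming rule for this sketch: protocol name plus a fixed suffix. */
  static String implClassName(Class<?> protocol) {
    return protocol.getName() + "PBServiceImpl";
  }

  static Object wrap(Class<?> protocol, Object instance)
      throws ReflectiveOperationException {
    Constructor<?> constructor = CACHE.get(protocol);
    if (constructor == null) {
      Class<?> implClass = Class.forName(implClassName(protocol));
      constructor = implClass.getConstructor(protocol);
      constructor.setAccessible(true);
      CACHE.putIfAbsent(protocol, constructor);
    }
    return constructor.newInstance(instance);
  }

  public static void main(String[] args) throws Exception {
    GreeterProtocol real = name -> "hello " + name;
    GreeterProtocolPBServiceImpl service =
        (GreeterProtocolPBServiceImpl) wrap(GreeterProtocol.class, real);
    System.out.println(service.handle("router"));   // prints "hello router"
  }
}
```

The benefit of the convention is that the RPC factory needs no compile-time knowledge of any particular protocol: adding a protocol means adding classes that follow the naming rule.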


2. How an Application submitted through YarnClient reaches RouterClientRMService

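The YarnClient submission process as a whole is not repeated here, since many articles already cover it; we start from rmClient.submitApplication.
Before that, note one line in the serviceStart method of YarnClientImpl:
rmClient = ClientRMProxy.createRMProxy(getConfig(), ApplicationClientProtocol.class);
This is where the RPC client proxy is set up. Because Hadoop generally uses Protocol Buffers as its wire format, the implementation class is ApplicationClientProtocolPBClientImpl, which is initialized through its constructor.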

private ApplicationClientProtocolPB proxy;

  public ApplicationClientProtocolPBClientImpl(long clientVersion,
      InetSocketAddress addr, Configuration conf) throws IOException {
    RPC.setProtocolEngine(conf, ApplicationClientProtocolPB.class,
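      ProtobufRpcEngine.class); // sets the config key "rpc.engine." + the protocol's class name to ProtobufRpcEngine
    // Create the proxy.
    // The proxy lives in the JVM that the user launched to submit the application.
    // That JVM belongs to neither the ResourceManager nor a NodeManager; it is an
    // independent JVM that can run on any machine in the cluster.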
    proxy = RPC.getProxy(ApplicationClientProtocolPB.class, clientVersion, addr, conf);
  }

  @Override
  public SubmitApplicationResponse submitApplication(
      SubmitApplicationRequest request) throws YarnException,
      IOException {
    // Extract the protobuf message from the request
    SubmitApplicationRequestProto requestProto =
        ((SubmitApplicationRequestPBImpl) request).getProto();
    try {
      // Hand the message to the proxy, wait for the server's reply,
      // and wrap the reply in a SubmitApplicationResponsePBImpl
      return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null,
        requestProto));
    } catch (ServiceException e) {
      RPCUtil.unwrapAndThrowException(e);
      return null;
    }
  }

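The submitApplication method of ApplicationClientProtocolPBClientImpl simply calls proxy.submitApplication, where proxy is the object created in the constructor.
The SubmitApplicationRequest sent through proxy targets the RM address that the client is configured with (in a federated deployment, that address is the Router's). It travels through the operating system's transport layer as TCP segments to the peer layer on the server node. On top of that sits the ProtoBuf layer, which reconstructs the object the peer sent from the TCP payload. One level up is ApplicationClientProtocolPBServiceImpl, which also implements the ApplicationClientProtocolPB interface; the ProtoBuf layer invokes its submitApplication() directly according to the request. In this way, a call on the client side to a method of ApplicationClientProtocolPBClientImpl becomes a call on the server side to the corresponding method of ApplicationClientProtocolPBServiceImpl. The server-side return value is likewise turned into the client-side return value, which is what makes this a remote procedure call (RPC). It follows that the two objects on the client and server sides must implement the same interface, here ApplicationClientProtocolPB.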


The RPC request above delivers the Application to the server side, where it is handled by the submitApplication method of ApplicationClientProtocolPBServiceImpl.

@Override
  public SubmitApplicationResponseProto submitApplication(RpcController arg0,
      SubmitApplicationRequestProto proto) throws ServiceException {
    SubmitApplicationRequestPBImpl request = new SubmitApplicationRequestPBImpl(proto);
    try {
      SubmitApplicationResponse response = real.submitApplication(request);
      return ((SubmitApplicationResponsePBImpl)response).getProto();
    } catch (YarnException e) {
      throw new ServiceException(e);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

Recall that we came across this class while starting the Router: real here is the RouterClientRMService instance, so the call lands neatly in RouterClientRMService.submitApplication.
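
The round trip described in this section can be imitated in a single process. This is a rough sketch, not how Hadoop's ProtobufRpcEngine is implemented: SubmitProtocol and ServerSideStub are hypothetical names, the "wire" is a plain UTF-8 byte array instead of a protobuf message over TCP, and java.lang.reflect.Proxy plays the role of the client-side proxy.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;

/** Hypothetical in-process imitation of an RPC round trip; not Hadoop code. */
final class RpcRoundTripSketch {

  /** The interface both sides agree on. */
  interface SubmitProtocol {
    String submitApplication(String appName);
  }

  /** Server side: decodes the wire bytes and delegates to the real service. */
  static final class ServerSideStub {
    private final SubmitProtocol real;

    ServerSideStub(SubmitProtocol real) {
      this.real = real;
    }

    byte[] dispatch(String method, byte[] payload) {
      if (!"submitApplication".equals(method)) {
        throw new IllegalArgumentException("unknown method " + method);
      }
      String request = new String(payload, StandardCharsets.UTF_8);
      return real.submitApplication(request).getBytes(StandardCharsets.UTF_8);
    }
  }

  /** Client side: a dynamic proxy that encodes the call and decodes the reply. */
  static SubmitProtocol clientProxy(ServerSideStub server) {
    InvocationHandler handler = (proxy, method, args) -> {
      byte[] wire = ((String) args[0]).getBytes(StandardCharsets.UTF_8);
      byte[] reply = server.dispatch(method.getName(), wire);   // stands in for the network hop
      return new String(reply, StandardCharsets.UTF_8);
    };
    return (SubmitProtocol) Proxy.newProxyInstance(
        SubmitProtocol.class.getClassLoader(),
        new Class<?>[] {SubmitProtocol.class},
        handler);
  }

  public static void main(String[] args) {
    // Plays the role of "real" on the server side.
    SubmitProtocol routerService = app -> "accepted:" + app;
    SubmitProtocol client = clientProxy(new ServerSideStub(routerService));
    System.out.println(client.submitApplication("app_1"));   // prints "accepted:app_1"
  }
}
```

The caller only ever sees SubmitProtocol, which is why swapping the object behind real (a ResourceManager service or a Router service) is invisible to the client.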

3. The interceptor chain

@Override
  public SubmitApplicationResponse submitApplication(
      SubmitApplicationRequest request) throws YarnException, IOException {
    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
    return pipeline.getRootInterceptor().submitApplication(request);
  }

  protected RequestInterceptorChainWrapper getInterceptorChain()
      throws IOException {
    String user = UserGroupInformation.getCurrentUser().getUserName();
    RequestInterceptorChainWrapper chain = userPipelineMap.get(user);
    if (chain != null && chain.getRootInterceptor() != null) {
      return chain;
    }
    return initializePipeline(user);
  }

  private RequestInterceptorChainWrapper initializePipeline(String user) {
    synchronized (this.userPipelineMap) {
      if (this.userPipelineMap.containsKey(user)) {
        LOG.info("Request to start an already existing user: {}"
            + " was received, so ignoring.", user);
        return userPipelineMap.get(user);
      }

      RequestInterceptorChainWrapper chainWrapper =
          new RequestInterceptorChainWrapper();
      try {
        // We should init the pipeline instance after it is created and then
        // add to the map, to ensure thread safe.
        LOG.info("Initializing request processing pipeline for application "
            + "for the user: {}", user);

        ClientRequestInterceptor interceptorChain =
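            this.createRequestInterceptorChain(); // Builds the implementations named in the configuration; FederationClientInterceptor is used here.
        // Note: several interceptors can be configured to form a chain. A custom interceptor must
        // extend AbstractClientRequestInterceptor, and FederationClientInterceptor must be the last
        // one in the chain, because calling its setNextInterceptor throws an exception.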
        interceptorChain.init(user);
        chainWrapper.init(interceptorChain);
      } catch (Exception e) {
        LOG.error("Init ClientRequestInterceptor error for user: " + user, e);
        throw e;
      }

      this.userPipelineMap.put(user, chainWrapper);
      return chainWrapper;
    }
  }
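
The two ideas in the code above, a bounded per-user cache of pipelines and a chain whose last element refuses a successor, can be sketched in isolation. This is a simplified illustration with assumed names: Interceptor, LoggingInterceptor, TerminalInterceptor and lruMap are hypothetical, and a LinkedHashMap in access order stands in for LRUCacheHashMap.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a per-user interceptor pipeline cache; not Hadoop code. */
final class PipelineSketch {

  interface Interceptor {
    void setNext(Interceptor next);
    String submit(String request);
  }

  /** An ordinary link in the chain: does its work, then delegates. */
  static class LoggingInterceptor implements Interceptor {
    private Interceptor next;

    @Override public void setNext(Interceptor next) { this.next = next; }

    @Override public String submit(String request) {
      return next.submit("[logged]" + request);
    }
  }

  /** The terminal link: it forwards the request and rejects any successor. */
  static class TerminalInterceptor implements Interceptor {
    @Override public void setNext(Interceptor next) {
      throw new IllegalStateException("terminal interceptor must be last");
    }

    @Override public String submit(String request) {
      return "forwarded " + request;
    }
  }

  /** Links each interceptor to the one after it and returns the root. */
  static Interceptor buildChain(List<Interceptor> interceptors) {
    for (int i = 0; i + 1 < interceptors.size(); i++) {
      interceptors.get(i).setNext(interceptors.get(i + 1));
    }
    return interceptors.get(0);
  }

  /** Access-ordered map that evicts the least recently used entry when full. */
  static <K, V> Map<K, V> lruMap(final int maxSize) {
    return Collections.synchronizedMap(new LinkedHashMap<K, V>(16, 0.75f, true) {
      @Override protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxSize;
      }
    });
  }

  private final Map<String, Interceptor> userPipelines = lruMap(2);

  Interceptor pipelineFor(String user) {
    synchronized (userPipelines) {   // check-then-create must be atomic
      Interceptor root = userPipelines.get(user);
      if (root == null) {
        root = buildChain(Arrays.<Interceptor>asList(
            new LoggingInterceptor(), new TerminalInterceptor()));
        userPipelines.put(user, root);
      }
      return root;
    }
  }

  public static void main(String[] args) {
    PipelineSketch router = new PipelineSketch();
    Interceptor alice = router.pipelineFor("alice");
    System.out.println(alice.submit("app_1"));                 // forwarded [logged]app_1
    System.out.println(alice == router.pipelineFor("alice"));  // true: served from the cache
  }
}
```

Because the cache is bounded, a user whose pipeline has been evicted simply gets a freshly built one on the next request, which is why the lookup path falls through to initialization.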

The interceptorChain.init(user) step sets up the Router policy. A few parts deserve a closer look:

public RouterPolicyFacade(Configuration conf,
      FederationStateStoreFacade facade, SubClusterResolver resolver,
      SubClusterId homeSubcluster)
      throws FederationPolicyInitializationException {
    // Fetch the Router policy from the ZooKeeper directory. This goes through JCache with a
    // custom CacheLoader, so that get() triggers the invoke method.
    SubClusterPolicyConfiguration configuration = null;
    try {
      configuration = federationFacade.getPolicyConfiguration(defaultKey);
    } catch (YarnException e) {
      LOG.warn("No fallback behavior defined in store, defaulting to XML "
          + "configuration fallback behavior.");
    }
    
    // The type comes from a configuration value; HomePolicyManager is used here
    FederationPolicyManager fallbackPolicyManager =
      FederationPolicyUtils.instantiatePolicyManager(configuration.getType());
  }
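
The excerpt above is abridged: as written, configuration would still be null after the catch block. The warning message suggests that a default taken from the XML configuration is used in that case. A minimal sketch of that fallback, under that assumption, is shown here; PolicyFallbackSketch, DEFAULT_KEY and the policy type strings are hypothetical and only illustrate the lookup order.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of "state store first, XML default second"; not Hadoop code. */
final class PolicyFallbackSketch {

  /** Assumed key under which a store-wide default policy would be kept. */
  static final String DEFAULT_KEY = "*";

  private final Map<String, String> stateStore = new HashMap<>();
  private final String xmlDefaultType;

  PolicyFallbackSketch(String xmlDefaultType) {
    this.xmlDefaultType = xmlDefaultType;
  }

  void storePolicy(String key, String policyType) {
    stateStore.put(key, policyType);
  }

  /** Prefers the default stored in the state store, else the XML-configured one. */
  String fallbackPolicyType() {
    String fromStore = stateStore.get(DEFAULT_KEY);
    if (fromStore == null) {
      return xmlDefaultType;
    }
    return fromStore;
  }

  public static void main(String[] args) {
    PolicyFallbackSketch facade = new PolicyFallbackSketch("XmlDefaultPolicyManager");
    System.out.println(facade.fallbackPolicyType());   // XmlDefaultPolicyManager
    facade.storePolicy(DEFAULT_KEY, "HomePolicyManager");
    System.out.println(facade.fallbackPolicyType());   // HomePolicyManager
  }
}
```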

Once the pipeline is set up, rootInterceptor.submitApplication is called.

public SubmitApplicationResponse submitApplication(
      SubmitApplicationRequest request) throws YarnException, IOException {
    // Pick an active sub-cluster at random
    SubClusterId subClusterId = policyFacade.getHomeSubcluster(
    request.getApplicationSubmissionContext(), blacklist);
    
    // Record this application under the ZK directory
    subClusterId =
        federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
    
    // Submit the application to the RM over RPC
    ApplicationClientProtocol clientRMProxy =
        getClientRMProxyForSubCluster(subClusterId);
       
    SubmitApplicationResponse response = null;
    try {
      response = clientRMProxy.submitApplication(request);
    } catch (Exception e) {
      LOG.warn("Unable to submit the application " + applicationId
          + "to SubCluster " + subClusterId.getId(), e);
    }
  }
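
The three steps in the excerpt (choose a home sub-cluster, record the mapping, forward the request) can be put together in a small self-contained model. This is a sketch under assumptions rather than the Router's code: SubClusterClient, the in-memory appHomeStore map and the retry loop that blacklists a failed sub-cluster are all hypothetical. The excerpt only shows that a blacklist is passed to the policy, not how retries are driven.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

/** Hypothetical sketch of the federated submit flow; not Hadoop code. */
final class SubmitFlowSketch {

  /** Stand-in for the RPC proxy to one sub-cluster's RM. */
  interface SubClusterClient {
    String submit(String appId) throws Exception;
  }

  private final Map<String, SubClusterClient> activeSubClusters;
  private final Map<String, String> appHomeStore = new HashMap<>();   // stands in for ZK
  private final Random random;

  SubmitFlowSketch(Map<String, SubClusterClient> activeSubClusters, Random random) {
    this.activeSubClusters = activeSubClusters;
    this.random = random;
  }

  /** Picks one active sub-cluster at random, skipping blacklisted ones. */
  String pickHomeSubCluster(Set<String> blacklist) {
    List<String> candidates = new ArrayList<>();
    for (String id : activeSubClusters.keySet()) {
      if (!blacklist.contains(id)) {
        candidates.add(id);
      }
    }
    if (candidates.isEmpty()) {
      throw new IllegalStateException("no active sub-cluster available");
    }
    return candidates.get(random.nextInt(candidates.size()));
  }

  String submit(String appId, int maxAttempts) {
    Set<String> blacklist = new HashSet<>();
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      String subCluster = pickHomeSubCluster(blacklist);
      appHomeStore.put(appId, subCluster);          // record the home sub-cluster
      try {
        return activeSubClusters.get(subCluster).submit(appId);
      } catch (Exception e) {
        blacklist.add(subCluster);                  // assumed: avoid it on the next attempt
      }
    }
    throw new IllegalStateException("failed to submit " + appId);
  }

  String homeOf(String appId) {
    return appHomeStore.get(appId);
  }

  public static void main(String[] args) {
    Map<String, SubClusterClient> clusters = new LinkedHashMap<>();
    clusters.put("SC1", appId -> { throw new Exception("SC1 unreachable"); });
    clusters.put("SC2", appId -> "SC2 accepted " + appId);
    SubmitFlowSketch router = new SubmitFlowSketch(clusters, new Random());
    System.out.println(router.submit("app_1", 2));   // SC2 accepted app_1
    System.out.println(router.homeOf("app_1"));      // SC2
  }
}
```

Recording the home sub-cluster before forwarding means later requests for the same application (status, kill) can be routed without consulting the policy again.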

That completes the submission flow.