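1. What happens when the Router starts
The official Javadoc describes the Router as shown below. In short, the component exposes ApplicationClientProtocol to the outside world and hides the ResourceManagers from clients. Every request that a YARN client sends to the Router passes through it, which decouples clients from the individual RMs.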
/**
* The router is a stateless YARN component which is the entry point to the
* cluster. It can be deployed on multiple nodes behind a Virtual IP (VIP) with
* a LoadBalancer.
*
* The Router exposes the ApplicationClientProtocol (RPC and REST) to the
* outside world, transparently hiding the presence of ResourceManager(s), which
* allows users to request and update reservations, submit and kill
* applications, and request status on running applications.
*
* In addition, it exposes the ResourceManager Admin API.
*
* This provides a placeholder for throttling mis-behaving clients (YARN-1546)
* and masks the access to multiple RMs (YARN-3659).
*/
Let's look at the main method of this class.
public static void main(String[] argv) {
  Configuration conf = new YarnConfiguration();
  Thread
      .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
  StringUtils.startupShutdownMessage(Router.class, argv, LOG);
  Router router = new Router();
  try {
    // Remove the old hook if we are rebooting.
    if (null != routerShutdownHook) {
      ShutdownHookManager.get().removeShutdownHook(routerShutdownHook);
    }
    routerShutdownHook = new CompositeServiceShutdownHook(router);
    ShutdownHookManager.get().addShutdownHook(routerShutdownHook,
        SHUTDOWN_HOOK_PRIORITY);
    router.init(conf);
    router.start();
  } catch (Throwable t) {
    LOG.error("Error starting Router", t);
    System.exit(-1);
  }
}
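Before init and start, main registers a CompositeServiceShutdownHook with ShutdownHookManager at SHUTDOWN_HOOK_PRIORITY (removing any previous hook first), so the Router's services are stopped when the JVM exits. The sketch below is a hypothetical, simplified priority-ordered hook registry, not Hadoop's ShutdownHookManager; it assumes the same rule that higher-priority hooks run first.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for a priority-ordered shutdown hook registry.
// NOT Hadoop's ShutdownHookManager: it only illustrates registration by priority.
class SimpleShutdownHooks {
  private static final class Entry {
    final Runnable hook;
    final int priority;

    Entry(Runnable hook, int priority) {
      this.hook = hook;
      this.priority = priority;
    }
  }

  private final List<Entry> hooks = new ArrayList<>();

  synchronized void addShutdownHook(Runnable hook, int priority) {
    hooks.add(new Entry(hook, priority));
  }

  // Mirrors the "remove the old hook if we are rebooting" step in Router.main.
  synchronized boolean removeShutdownHook(Runnable hook) {
    return hooks.removeIf(entry -> entry.hook == hook);
  }

  // Runs every registered hook, highest priority first.
  synchronized void runAll() {
    List<Entry> sorted = new ArrayList<>(hooks);
    sorted.sort(Comparator.comparingInt((Entry x) -> x.priority).reversed());
    for (Entry entry : sorted) {
      entry.hook.run();
    }
  }

  // In a real process, a single JVM hook thread would drive runAll().
  void installIntoJvm() {
    Runtime.getRuntime().addShutdownHook(new Thread(this::runAll));
  }
}
```

Removing the stale hook before adding the new one keeps a restarted Router (in the same JVM) from being stopped twice on exit.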
We need to focus on router.init and router.start. Both methods are inherited from the parent class; they mainly perform state transitions and call back the methods that the subclass overrides.
Start with init. It is defined in the parent class AbstractService; it moves the Router from NOTINITED to INITED and calls back Router's serviceInit method:
@Override
protected void serviceInit(Configuration config) throws Exception {
  this.conf = config;
  // ClientRM Proxy
  clientRMProxyService = createClientRMProxyService();
  addService(clientRMProxyService);
  // RMAdmin Proxy
  rmAdminProxyService = createRMAdminProxyService();
  addService(rmAdminProxyService);
  // WebService
  webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
      YarnConfiguration.ROUTER_BIND_HOST,
      WebAppUtils.getRouterWebAppURLWithoutScheme(this.conf));
  // Metrics
  DefaultMetricsSystem.initialize(METRICS_NAME);
  super.serviceInit(conf);
}
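This method creates the two proxy services, RouterClientRMService and RouterRMAdminService, and registers them as child services with addService. It also resolves the web app bind address and initializes the metrics system, and super.serviceInit then initializes each registered child service. The state change itself (NOTINITED to INITED) is again performed by the parent class AbstractService.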
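The init/serviceInit split is a template-method pattern: the public method validates and records the state change, then calls the protected hook that the subclass overrides. Below is a hypothetical, minimal model of that life cycle (MiniService and DemoRouter are illustrative names, and stop handling is omitted); it imitates AbstractService's behaviour but is not Hadoop code.

```java
// Hypothetical, minimal model of the NOTINITED -> INITED -> STARTED life cycle.
abstract class MiniService {
  enum State { NOTINITED, INITED, STARTED }

  private State state = State.NOTINITED;

  State getState() {
    return state;
  }

  // Public entry point: check the transition, record the new state, call back the subclass.
  final synchronized void init() {
    if (state == State.INITED) {
      return; // a repeated init is a no-op in this sketch
    }
    requireState(State.NOTINITED, "init");
    state = State.INITED;
    serviceInit();
  }

  final synchronized void start() {
    if (state == State.STARTED) {
      return;
    }
    requireState(State.INITED, "start"); // starting before init is rejected
    state = State.STARTED;
    serviceStart();
  }

  private void requireState(State expected, String action) {
    if (state != expected) {
      throw new IllegalStateException("cannot " + action + " a service in state " + state);
    }
  }

  // Hooks overridden by concrete services such as a Router.
  protected void serviceInit() {}

  protected void serviceStart() {}
}

class DemoRouter extends MiniService {
  final StringBuilder log = new StringBuilder();

  @Override
  protected void serviceInit() {
    log.append("serviceInit;");
  }

  @Override
  protected void serviceStart() {
    log.append("serviceStart;");
  }
}
```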
Next comes start. Again AbstractService performs the state change, moving the Router from INITED to STARTED, and then calls back Router's serviceStart method:
@Override
protected void serviceStart() throws Exception {
  try {
    doSecureLogin();
  } catch (IOException e) {
    throw new YarnRuntimeException("Failed Router login", e);
  }
  startWepApp();
  super.serviceStart();
}
doSecureLogin performs the Kerberos login when security is enabled, so we can skip it here. Next is startWepApp (spelled this way in the source), which starts an HTTP server hosting the Router's web application. Depending on configuration, it also enables the WebApp proxy so that requests can be routed and proxied. Through this web UI, administrators and users can view the state of the Router and the federated cluster, monitor resource usage, and view and manage running applications.
public void startWepApp() {
  RMWebAppUtil.setupSecurityAndFilters(conf, null);
  Builder<Object> builder =
      WebApps.$for("cluster", null, null, "ws").with(conf).at(webAppAddress);
  if (conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PROXY_ENABLE,
      YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PROXY_ENABLE)) {
    fetcher = new FedAppReportFetcher(conf);
    builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME,
        ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class);
    builder.withAttribute(WebAppProxy.FETCHER_ATTRIBUTE, fetcher);
    String proxyHostAndPort = getProxyHostAndPort(conf);
    String[] proxyParts = proxyHostAndPort.split(":");
    builder.withAttribute(WebAppProxy.PROXY_HOST_ATTRIBUTE, proxyParts[0]);
  }
  webApp = builder.start(new RouterWebApp(this));
}
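The proxy branch above is purely configuration driven: the proxy servlet and the proxy-host attribute are registered only when ROUTER_WEBAPP_PROXY_ENABLE is true, and only the host part of getProxyHostAndPort(conf) is kept. The sketch below models just that decision; the class name, the route paths and the attribute key are hypothetical placeholders, not Hadoop constants.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the conditional registration in startWepApp().
class WebAppSetupSketch {
  final Map<String, String> servlets = new LinkedHashMap<>();
  final Map<String, Object> attributes = new LinkedHashMap<>();

  void configure(boolean proxyEnabled, String proxyHostAndPort) {
    servlets.put("/ws/*", "RouterWebServices"); // always registered (placeholder)
    if (proxyEnabled) {
      servlets.put("/proxy/*", "WebAppProxyServlet"); // placeholder path spec
      attributes.put("proxy.host", hostOf(proxyHostAndPort));
    }
  }

  // Same effect as proxyHostAndPort.split(":")[0]: keep the text before the first colon.
  static String hostOf(String hostAndPort) {
    int colon = hostAndPort.indexOf(':');
    return colon < 0 ? hostAndPort : hostAndPort.substring(0, colon);
  }
}
```

Note that a plain split on ":" assumes a hostname or IPv4 address; a bracketed IPv6 literal would not survive it.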
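The line fetcher = new FedAppReportFetcher(conf); initializes the FederationStateStore implementation named by YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS. In practice this is usually ZookeeperFederationStateStore, which creates three directories in ZooKeeper during initialization, laid out as follows:
|--- FEDERATIONSTORE (configurable)
     |--- MEMBERSHIP
     |     |--- SC1
     |     |--- SC2
     |--- APPLICATION
     |     |--- APP1
     |     |--- APP2
     |--- POLICY
           |--- QUEUE1
           |--- QUEUE2
Later, the Router reads this data from ZooKeeper to choose a suitable sub-cluster for each application.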
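The tree maps naturally onto path composition: sub-cluster IDs live under MEMBERSHIP, the application-to-home-sub-cluster mapping under APPLICATION, and queue policies under POLICY, all below a configurable root. The helper below is hypothetical and uses the directory names as drawn in the tree above; the znode names used by the real store may differ.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper composing znode paths for the layout shown above.
class FederationZkLayout {
  private final String root;

  FederationZkLayout(String root) {
    // Drop a trailing slash so joined paths never contain "//".
    this.root = root.endsWith("/") ? root.substring(0, root.length() - 1) : root;
  }

  String membership(String subClusterId) {
    return join("MEMBERSHIP", subClusterId);
  }

  String application(String appId) {
    return join("APPLICATION", appId);
  }

  String policy(String queue) {
    return join("POLICY", queue);
  }

  // The three directories created when the store is initialized.
  List<String> topLevelDirs() {
    return Arrays.asList(join("MEMBERSHIP", null), join("APPLICATION", null), join("POLICY", null));
  }

  private String join(String dir, String child) {
    String base = root + "/" + dir;
    return child == null ? base : base + "/" + child;
  }
}
```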
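Next comes super.serviceStart, which starts the two services that addService stored in a list earlier: RouterClientRMService and RouterRMAdminService. The pattern is the same: the parent class changes each service's state from INITED to STARTED, then calls back that service's own serviceStart().
First, RouterClientRMService: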
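The parent-class behaviour described here is a composite: children registered with addService are kept in a list and started in registration order. The sketch below is a hypothetical model of that idea (names are illustrative, not Hadoop's CompositeService); it also stops children in reverse order, which is the usual choice so that later services, which may depend on earlier ones, shut down first.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical composite: starts children in order, stops them in reverse order.
class MiniComposite {
  interface Child {
    String name();

    void start();

    void stop();
  }

  private final List<Child> children = new ArrayList<>();

  void addService(Child child) {
    children.add(child);
  }

  void startAll() {
    for (Child child : children) {
      child.start();
    }
  }

  void stopAll() {
    for (int i = children.size() - 1; i >= 0; i--) {
      children.get(i).stop();
    }
  }
}

// A child that records its life-cycle events, for demonstration.
class RecordingChild implements MiniComposite.Child {
  private final String name;
  private final List<String> events;

  RecordingChild(String name, List<String> events) {
    this.name = name;
    this.events = events;
  }

  public String name() {
    return name;
  }

  public void start() {
    events.add("start " + name);
  }

  public void stop() {
    events.add("stop " + name);
  }
}
```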
@Override
protected void serviceStart() throws Exception {
  LOG.info("Starting Router ClientRMService");
  Configuration conf = getConfig();
  YarnRPC rpc = YarnRPC.create(conf);
  UserGroupInformation.setConfiguration(conf);
  this.listenerEndpoint =
      conf.getSocketAddr(YarnConfiguration.ROUTER_BIND_HOST,
          YarnConfiguration.ROUTER_CLIENTRM_ADDRESS,
          YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_ADDRESS,
          YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PORT);
  if (getConfig().getBoolean(YarnConfiguration.ROUTER_WEBAPP_PROXY_ENABLE,
      YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PROXY_ENABLE)) {
    redirectURL = getRedirectURL();
  }
  int maxCacheSize =
      conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
          YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
  this.userPipelineMap = Collections.synchronizedMap(
      new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
          maxCacheSize, true));
  Configuration serverConf = new Configuration(conf);
  int numWorkerThreads =
      serverConf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT,
          YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT);
  this.server = rpc.getServer(ApplicationClientProtocol.class, this,
      listenerEndpoint, serverConf, null, numWorkerThreads);
  // Set service-level authorization security policy
  boolean serviceAuthEnabled = conf.getBoolean(
      CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
  if (serviceAuthEnabled) {
    server.refreshServiceAclWithLoadedConfiguration(conf,
        RMPolicyProvider.getInstance());
  }
  this.server.start();
  LOG.info("Router ClientRMService listening on address: "
      + this.server.getListenerAddress());
  super.serviceStart();
}
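userPipelineMap above is a synchronized map over LRUCacheHashMap(maxCacheSize, true), so each user's interceptor pipeline is cached, and the least recently used one is dropped once ROUTER_PIPELINE_CACHE_MAX_SIZE is exceeded. The sketch below assumes the `true` argument means access ordering and builds an equivalent map on java.util.LinkedHashMap; LRUCacheHashMap's internals are not shown in this article.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of an access-ordered LRU map, assumed to behave like LRUCacheHashMap(max, true).
class LruMap<K, V> extends LinkedHashMap<K, V> {
  private final int maxSize;

  LruMap(int maxSize) {
    super(16, 0.75f, true); // accessOrder = true: get() moves the entry to the tail
    this.maxSize = maxSize;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    // Evict the least recently used entry once the cap is exceeded.
    return size() > maxSize;
  }

  // Thread-safe wrapper, analogous to Collections.synchronizedMap(new LRUCacheHashMap<>(...)).
  static <K, V> Map<K, V> synchronizedLru(int maxSize) {
    return Collections.synchronizedMap(new LruMap<K, V>(maxSize));
  }
}
```

An evicted user simply gets a fresh pipeline built by initializePipeline on their next request.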
Most of this method reads or checks configuration values. The important step is rpc.getServer, which builds the RPC server used for communication. The call eventually arrives at RpcServerFactoryPBImpl.getServer:
public Server getServer(Class<?> protocol, Object instance,
    InetSocketAddress addr, Configuration conf,
    SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
    String portRangeConfig) {
  Constructor<?> constructor = serviceCache.get(protocol);
  if (constructor == null) {
    Class<?> pbServiceImplClazz = null;
    try {
      // Resolves to ApplicationClientProtocolPBServiceImpl
      pbServiceImplClazz = conf
          .getClassByName(getPbServiceImplClassName(protocol));
    } catch (ClassNotFoundException e) {
      throw new YarnRuntimeException("Failed to load class: ["
          + getPbServiceImplClassName(protocol) + "]", e);
    }
    try {
      constructor = pbServiceImplClazz.getConstructor(protocol);
      constructor.setAccessible(true);
      serviceCache.putIfAbsent(protocol, constructor);
    } catch (NoSuchMethodException e) {
      throw new YarnRuntimeException("Could not find constructor with params: "
          + Long.TYPE + ", " + InetSocketAddress.class + ", "
          + Configuration.class, e);
    }
  }
  Object service = null;
  try {
    // Calls the ApplicationClientProtocolPBServiceImpl constructor,
    // which stores the instance: this.real = impl
    service = constructor.newInstance(instance);
  } catch (InvocationTargetException e) {
    throw new YarnRuntimeException(e);
  } catch (IllegalAccessException e) {
    throw new YarnRuntimeException(e);
  } catch (InstantiationException e) {
    throw new YarnRuntimeException(e);
  }
  // The first interface implemented by ApplicationClientProtocolPBServiceImpl:
  // ApplicationClientProtocolPB
  Class<?> pbProtocol = service.getClass().getInterfaces()[0];
  Method method = protoCache.get(protocol);
  if (method == null) {
    Class<?> protoClazz = null;
    try {
      // ApplicationClientProtocol$ApplicationClientProtocolService
      protoClazz = conf.getClassByName(getProtoClassName(protocol));
    } catch (ClassNotFoundException e) {
      throw new YarnRuntimeException("Failed to load class: ["
          + getProtoClassName(protocol) + "]", e);
    }
    try {
      // The static newReflectiveBlockingService method of the generated
      // abstract class ApplicationClientProtocolService
      method = protoClazz.getMethod("newReflectiveBlockingService",
          pbProtocol.getInterfaces()[0]);
      method.setAccessible(true);
      protoCache.putIfAbsent(protocol, method);
    } catch (NoSuchMethodException e) {
      throw new YarnRuntimeException(e);
    }
  }
  try {
    return createServer(pbProtocol, addr, conf, secretManager, numHandlers,
        (BlockingService) method.invoke(null, service), portRangeConfig);
  } catch (InvocationTargetException e) {
    throw new YarnRuntimeException(e);
  } catch (IllegalAccessException e) {
    throw new YarnRuntimeException(e);
  } catch (IOException e) {
    throw new YarnRuntimeException(e);
  }
}
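The whole method gathers the protocol classes and settings needed to build the RPC server. Note that the ApplicationClientProtocol referenced in the comment above is the protobuf-generated class in the org.apache.hadoop.yarn.proto package, not the interface in org.apache.hadoop.yarn.api.
The other service, RouterRMAdminService, starts the same way. At this point the Router has fully started.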
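The core technique in getServer is reflection driven by a naming convention: derive the service-impl class name from the protocol, look up its constructor and a static factory method, cache both in concurrent maps with putIfAbsent, and invoke the factory with a null receiver. The self-contained sketch below reproduces that pattern with hypothetical names (Greeter, GreeterPBServiceImpl, newReflectiveService); Hadoop's real naming scheme places the impl classes in separate packages.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical protocol the server really implements (role of ApplicationClientProtocol).
interface Greeter {
  String greet(String name);
}

// Adapter located by naming convention (role of ApplicationClientProtocolPBServiceImpl).
class GreeterPBServiceImpl {
  private final Greeter real; // the `instance` passed to getServer

  public GreeterPBServiceImpl(Greeter real) {
    this.real = real;
  }

  public String handle(String request) {
    return real.greet(request);
  }

  // Static factory found reflectively (role of newReflectiveBlockingService).
  public static Function<String, String> newReflectiveService(GreeterPBServiceImpl impl) {
    return impl::handle;
  }
}

class ReflectiveServerFactory {
  private static final ConcurrentMap<Class<?>, Constructor<?>> serviceCache =
      new ConcurrentHashMap<>();
  private static final ConcurrentMap<Class<?>, Method> protoCache = new ConcurrentHashMap<>();

  @SuppressWarnings("unchecked")
  static Function<String, String> getServer(Class<?> protocol, Object instance) {
    try {
      Constructor<?> constructor = serviceCache.get(protocol);
      if (constructor == null) {
        // Naming convention (hypothetical): <protocol name> + "PBServiceImpl".
        Class<?> implClass = Class.forName(
            protocol.getName() + "PBServiceImpl", true, protocol.getClassLoader());
        constructor = implClass.getConstructor(protocol);
        serviceCache.putIfAbsent(protocol, constructor);
      }
      Object service = constructor.newInstance(instance);
      Method factory = protoCache.get(protocol);
      if (factory == null) {
        factory = service.getClass().getMethod("newReflectiveService", service.getClass());
        protoCache.putIfAbsent(protocol, factory);
      }
      // Static method: the receiver is null.
      return (Function<String, String>) factory.invoke(null, service);
    } catch (ReflectiveOperationException e) {
      // Same idea as wrapping every failure in YarnRuntimeException.
      throw new IllegalStateException("cannot build server for " + protocol.getName(), e);
    }
  }
}
```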
2. How an application submitted through YarnClient reaches RouterClientRMService
The YarnClient submission process itself is covered in many articles online, so we start from rmClient.submitApplication.
First, note this line in YarnClientImpl's serviceStart method:
rmClient = ClientRMProxy.createRMProxy(getConfig(), ApplicationClientProtocol.class);
This line creates an RPC proxy. Since Hadoop RPC generally uses Protocol Buffers, the implementation class here is ApplicationClientProtocolPBClientImpl, which is initialized through its constructor:
private ApplicationClientProtocolPB proxy;
public ApplicationClientProtocolPBClientImpl(long clientVersion,
    InetSocketAddress addr, Configuration conf) throws IOException {
  // Sets "rpc.engine.ApplicationClientProtocolPB" to ProtobufRpcEngine
  RPC.setProtocolEngine(conf, ApplicationClientProtocolPB.class,
      ProtobufRpcEngine.class);
  // Create the proxy. It lives in the JVM the user starts to submit the
  // application; that JVM belongs to neither the ResourceManager nor a
  // NodeManager and can run on any machine in the cluster.
  proxy = RPC.getProxy(ApplicationClientProtocolPB.class, clientVersion, addr, conf);
}
@Override
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException,
    IOException {
  // Extract the protobuf message from the request
  SubmitApplicationRequestProto requestProto =
      ((SubmitApplicationRequestPBImpl) request).getProto();
  try {
    // Let the proxy send the message and wait for the server's reply,
    // then wrap the reply in a SubmitApplicationResponsePBImpl
    return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null,
        requestProto));
  } catch (ServiceException e) {
    RPCUtil.unwrapAndThrowException(e);
    return null;
  }
}
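ApplicationClientProtocolPBClientImpl.submitApplication simply calls proxy.submitApplication, and proxy is the object created in the constructor.
The SubmitApplicationRequest sent through the proxy targets the server address the client is configured with (normally the RM; in this setup, the Router). It travels as TCP packets through the operating system's network stack to the peer layer on the server host, where the protobuf layer reconstructs the object the client sent. Above that sits ApplicationClientProtocolPBServiceImpl, which also implements the ApplicationClientProtocolPB interface, and the protobuf layer invokes its submitApplication() according to the request. In this way, a client-side call on ApplicationClientProtocolPBClientImpl becomes a server-side call to the corresponding method of ApplicationClientProtocolPBServiceImpl, and the server-side return value is converted back into the client-side return value. This is remote procedure call (RPC). It follows that the objects on both ends, the client-side proxy and the server-side ApplicationClientProtocolPBServiceImpl, must implement the same interface, here ApplicationClientProtocolPB.
Through this RPC request the application reaches the server, where it is handled by ApplicationClientProtocolPBServiceImpl.submitApplication: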
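The round trip can be sketched in plain Java: both ends share one interface, a dynamic proxy on the client turns each method call into bytes, and the server decodes the bytes and delegates to its `real` implementation. In the hypothetical sketch below, SubmitProtocol, ServerSide and ClientSide are illustrative names, a newline-separated string stands in for the protobuf message, and a direct method call stands in for the TCP transport.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;

// Interface shared by both ends (role of ApplicationClientProtocolPB).
interface SubmitProtocol {
  String submitApplication(String appName);
}

// Server side: decode the request and delegate to `real` (role of the PBServiceImpl).
class ServerSide {
  private final SubmitProtocol real;

  ServerSide(SubmitProtocol real) {
    this.real = real;
  }

  byte[] handle(byte[] wire) {
    String[] parts = new String(wire, StandardCharsets.UTF_8).split("\n", 2);
    if (parts.length != 2 || !"submitApplication".equals(parts[0])) {
      throw new UnsupportedOperationException("unknown request: " + parts[0]);
    }
    return real.submitApplication(parts[1]).getBytes(StandardCharsets.UTF_8);
  }
}

// Client side: a dynamic proxy that encodes each call and "sends" it to the server.
final class ClientSide {
  private ClientSide() {}

  static SubmitProtocol connect(ServerSide server) {
    InvocationHandler handler = (proxy, method, args) -> {
      if (method.getDeclaringClass() == Object.class) {
        // toString/hashCode/equals are answered locally, not sent over the "wire".
        return method.invoke(server, args);
      }
      String request = method.getName() + "\n" + args[0];
      byte[] reply = server.handle(request.getBytes(StandardCharsets.UTF_8));
      return new String(reply, StandardCharsets.UTF_8);
    };
    return (SubmitProtocol) Proxy.newProxyInstance(
        SubmitProtocol.class.getClassLoader(), new Class<?>[] {SubmitProtocol.class}, handler);
  }
}
```

Because both ends agree on SubmitProtocol, the caller never sees the encoding step, which is exactly why the client and server objects must implement the same interface.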
@Override
public SubmitApplicationResponseProto submitApplication(RpcController arg0,
    SubmitApplicationRequestProto proto) throws ServiceException {
  SubmitApplicationRequestPBImpl request = new SubmitApplicationRequestPBImpl(proto);
  try {
    SubmitApplicationResponse response = real.submitApplication(request);
    return ((SubmitApplicationResponsePBImpl) response).getProto();
  } catch (YarnException e) {
    throw new ServiceException(e);
  } catch (IOException e) {
    throw new ServiceException(e);
  }
}
Remember that we already passed through this class while starting the Router: real is the RouterClientRMService instance that was handed to rpc.getServer as `this`. That is how the call neatly ends up in RouterClientRMService.submitApplication.
3. The interceptor chain
@Override
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException, IOException {
  RequestInterceptorChainWrapper pipeline = getInterceptorChain();
  return pipeline.getRootInterceptor().submitApplication(request);
}
protected RequestInterceptorChainWrapper getInterceptorChain()
    throws IOException {
  String user = UserGroupInformation.getCurrentUser().getUserName();
  RequestInterceptorChainWrapper chain = userPipelineMap.get(user);
  if (chain != null && chain.getRootInterceptor() != null) {
    return chain;
  }
  return initializePipeline(user);
}
private RequestInterceptorChainWrapper initializePipeline(String user) {
  synchronized (this.userPipelineMap) {
    if (this.userPipelineMap.containsKey(user)) {
      LOG.info("Request to start an already existing user: {}"
          + " was received, so ignoring.", user);
      return userPipelineMap.get(user);
    }
    RequestInterceptorChainWrapper chainWrapper =
        new RequestInterceptorChainWrapper();
    try {
      // We should init the pipeline instance after it is created and then
      // add to the map, to ensure thread safe.
      LOG.info("Initializing request processing pipeline for application "
          + "for the user: {}", user);
      // Builds the chain from yarn.router.clientrm.interceptor-class.pipeline;
      // here it is FederationClientInterceptor. Several interceptors can be
      // configured to form a chain; each must implement ClientRequestInterceptor.
      // FederationClientInterceptor must be the last one in the chain, because
      // calling its setNextInterceptor throws an exception.
      ClientRequestInterceptor interceptorChain =
          this.createRequestInterceptorChain();
      interceptorChain.init(user);
      chainWrapper.init(interceptorChain);
    } catch (Exception e) {
      LOG.error("Init ClientRequestInterceptor error for user: " + user, e);
      throw e;
    }
    this.userPipelineMap.put(user, chainWrapper);
    return chainWrapper;
  }
}
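The pipeline is a chain of responsibility: the configured interceptors are linked in order with setNextInterceptor, the first one becomes the root that receives every call, and the terminal interceptor (the FederationClientInterceptor role) rejects a successor. A hypothetical sketch with illustrative names (Interceptor, LoggingInterceptor, TerminalInterceptor, ChainBuilder):

```java
import java.util.List;

// Hypothetical interceptor contract, reduced to one call plus chaining.
interface Interceptor {
  String submitApplication(String app);

  void setNextInterceptor(Interceptor next);
}

// A pass-through interceptor: does its own work, then delegates.
class LoggingInterceptor implements Interceptor {
  private final List<String> log;
  private Interceptor next;

  LoggingInterceptor(List<String> log) {
    this.log = log;
  }

  public void setNextInterceptor(Interceptor next) {
    this.next = next;
  }

  public String submitApplication(String app) {
    log.add("audit " + app);
    return next.submitApplication(app);
  }
}

// Terminal interceptor: does the real work and refuses a successor.
class TerminalInterceptor implements Interceptor {
  public void setNextInterceptor(Interceptor next) {
    throw new IllegalStateException("terminal interceptor must be the last one in the chain");
  }

  public String submitApplication(String app) {
    return "submitted " + app + " to a sub-cluster";
  }
}

final class ChainBuilder {
  private ChainBuilder() {}

  // Links interceptors in configuration order and returns the root (first) one.
  static Interceptor build(List<Interceptor> pipeline) {
    if (pipeline.isEmpty()) {
      throw new IllegalArgumentException("empty pipeline");
    }
    for (int i = 0; i + 1 < pipeline.size(); i++) {
      pipeline.get(i).setNextInterceptor(pipeline.get(i + 1));
    }
    return pipeline.get(0);
  }
}
```

Placing the terminal interceptor anywhere but last makes building fail immediately, which mirrors why FederationClientInterceptor must end the chain.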
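The interceptorChain.init(user) step sets up the router policy. A few parts of the RouterPolicyFacade constructor deserve a closer look (abridged):
public RouterPolicyFacade(Configuration conf,
    FederationStateStoreFacade facade, SubClusterResolver resolver,
    SubClusterId homeSubcluster)
    throws FederationPolicyInitializationException {
  // ... (abridged: fields such as federationFacade are set from the arguments)
  // Fetch the default router policy from the state store (ZooKeeper here).
  // The facade caches it with JCache, using a custom CacheLoader so that a
  // cache miss on get() triggers invoke() to load the value from the store.
  SubClusterPolicyConfiguration configuration = null;
  try {
    configuration = federationFacade.getPolicyConfiguration(defaultKey);
  } catch (YarnException e) {
    LOG.warn("No fallback behavior defined in store, defaulting to XML "
        + "configuration fallback behavior.");
  }
  // ... (abridged: if nothing was found in the store, configuration is
  // built from the XML configuration instead)
  // getType() is the policy manager class name from the configuration;
  // HomePolicyManager is used here
  FederationPolicyManager fallbackPolicyManager =
      FederationPolicyUtils.instantiatePolicyManager(configuration.getType());
  // ... (abridged)
}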
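The caching described in the comment is read-through: a miss on get() invokes a loader that queries the state store, later hits are served from memory, and when the store has no entry the caller falls back to the XML-configured policy. The sketch below illustrates that behaviour with ConcurrentHashMap.computeIfAbsent instead of the JCache API; ReadThroughCache, PolicyResolver, the queue key and the policy names are hypothetical, and unlike a production cache it never expires entries.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical read-through cache: a miss calls the loader, hits come from memory.
class ReadThroughCache<K, V> {
  private final Map<K, V> cache = new ConcurrentHashMap<>();
  private final Function<K, V> loader;
  final AtomicInteger loads = new AtomicInteger(); // number of loader invocations

  ReadThroughCache(Function<K, V> loader) {
    this.loader = loader;
  }

  V get(K key) {
    // A null result from the loader is not cached, so missing keys are retried.
    return cache.computeIfAbsent(key, k -> {
      loads.incrementAndGet();
      return loader.apply(k);
    });
  }
}

final class PolicyResolver {
  private PolicyResolver() {}

  // Policy from the store if present, otherwise the XML-configured fallback.
  static String resolve(ReadThroughCache<String, String> store, String key, String xmlFallback) {
    return Optional.ofNullable(store.get(key)).orElse(xmlFallback);
  }
}
```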
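Once the pipeline is set up, the root interceptor's submitApplication is called (abridged):
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException, IOException {
  // ... (abridged; parts of the full method, such as argument checks and
  // retries, are omitted)
  ApplicationId applicationId =
      request.getApplicationSubmissionContext().getApplicationId();
  // Choose the home sub-cluster according to the router policy (e.g. a
  // random active sub-cluster under a uniform random policy), skipping
  // sub-clusters in the blacklist
  SubClusterId subClusterId = policyFacade.getHomeSubcluster(
      request.getApplicationSubmissionContext(), blacklist);
  // Record the application -> home sub-cluster mapping in the state store
  // (the APPLICATION directory in ZooKeeper)
  ApplicationHomeSubCluster appHomeSubCluster =
      ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
  subClusterId =
      federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
  // Submit the application to that sub-cluster's RM over RPC
  ApplicationClientProtocol clientRMProxy =
      getClientRMProxyForSubCluster(subClusterId);
  SubmitApplicationResponse response = null;
  try {
    response = clientRMProxy.submitApplication(request);
  } catch (Exception e) {
    LOG.warn("Unable to submit the application " + applicationId
        + " to SubCluster " + subClusterId.getId(), e);
  }
  return response;
}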
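The `blacklist` argument suggests that a failed submission is retried on another sub-cluster. Assuming that behaviour, the idea can be sketched as a loop: pick a random active sub-cluster that is not blacklisted, record app to home, try to submit, and blacklist the sub-cluster on failure. FederatedSubmitter and its in-memory appHome map (standing in for the state store) are hypothetical, and uniform random choice is only one possible router policy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical submit-with-retry loop over federated sub-clusters.
class FederatedSubmitter {
  final Map<String, String> appHome = new HashMap<>(); // app -> home sub-cluster
  private final List<String> activeSubClusters;
  private final Random random;

  FederatedSubmitter(List<String> activeSubClusters, long seed) {
    this.activeSubClusters = activeSubClusters;
    this.random = new Random(seed);
  }

  // `submitTo` returns true if the chosen sub-cluster's RM accepted the application.
  String submit(String appId, Predicate<String> submitTo, int maxRetries) {
    Set<String> blacklist = new HashSet<>();
    for (int attempt = 0; attempt < maxRetries; attempt++) {
      List<String> candidates = new ArrayList<>(activeSubClusters);
      candidates.removeAll(blacklist);
      if (candidates.isEmpty()) {
        break;
      }
      String home = candidates.get(random.nextInt(candidates.size())); // uniform random
      appHome.put(appId, home); // first attempt adds, later attempts overwrite
      if (submitTo.test(home)) {
        return home;
      }
      blacklist.add(home); // do not pick this sub-cluster again
    }
    throw new IllegalStateException("no sub-cluster accepted " + appId);
  }
}
```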
That completes the submission flow.