一、背景
在Yarn中为了高效的处理一个任务的生命周期,其采用了基于事件驱动的并发模型,该模型能够大大增加并发性,从而提高系统整体性能。该模型将处理逻辑抽象成事件和对应的事件带调度器,并将每类事件的处理过程分割成多个步骤,用有限状态机表示。该模型图如下:
这张图反映了AsyncDispatcher的工作原理,我们来看下代码:
private final BlockingQueue<Event> eventQueue;
protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
protected void serviceStart() throws Exception {
//start all the components
super.serviceStart();
eventHandlingThread = new Thread(createThread());
eventHandlingThread.setName(dispatcherThreadName);
eventHandlingThread.start();
}
protected void dispatch(Event event) {
//all events go thru this loop
if (LOG.isDebugEnabled()) {
LOG.debug("Dispatching the event " + event.getClass().getName() + "."
+ event.toString());
}
Class<? extends Enum> type = event.getType().getDeclaringClass();
try{
EventHandler handler = eventDispatchers.get(type);
if(handler != null) {
handler.handle(event);
} else {
throw new Exception("No handler for registered for " + type);
}
} catch (Throwable t) {
//TODO Maybe log the state of the queue
LOG.fatal("Error in dispatcher thread", t);
// If serviceStop is called, we should exit this thread gracefully.
if (exitOnDispatchException
&& (ShutdownHookManager.get().isShutdownInProgress()) == false
&& stopped == false) {
stopped = true;
Thread shutDownThread = new Thread(createShutDownThread());
shutDownThread.setName("AsyncDispatcher ShutDown handler");
shutDownThread.start();
}
}
}
public void register(Class<? extends Enum> eventType,
EventHandler handler) {
/* check to see if we have a listener registered */
EventHandler<Event> registeredHandler = (EventHandler<Event>)
eventDispatchers.get(eventType);
LOG.info("Registering " + eventType + " for " + handler.getClass());
if (registeredHandler == null) {
eventDispatchers.put(eventType, handler);
} else if (!(registeredHandler instanceof MultiListenerHandler)){
/* for multiple listeners of an event add the multiple listener handler */
MultiListenerHandler multiHandler = new MultiListenerHandler();
multiHandler.addHandler(registeredHandler);
multiHandler.addHandler(handler);
eventDispatchers.put(eventType, multiHandler);
} else {
/* already a multilistener, just add to it */
MultiListenerHandler multiHandler
= (MultiListenerHandler) registeredHandler;
multiHandler.addHandler(handler);
}
}
首先需要定一个AsyncDispatcher,如果想用其来调度事件,需要使用其register方法将事件注册到eventDispatcherHandler中,具体方法在ResourceManager,NodeManager和MRAppMaster中都有其身影,以MRAppMaster为例,它内部包含一个中央异步调度器,并注册了TaskAttemptEvent/TaskAttemptImpl,TaskEvent/TaskImpl,JobEvent/JobImpl等一系列事件/事件处理器,并由中央异步调度器统一管理和调度。在代码中,createThread方法创建了一个线程用于消费阻塞队列,然后将队列中的事件一一交给dispatch方法找到对应的处理器处理。
这种基于事件驱动的模型使得Yarn具有低耦合,高内聚的特点,各个模块或者组件只需要定义自己的处理器处理自己的事件,而模块之间用事件的流转联系起来,维护起来也相对比较简便。
二、事件驱动带来的变化
我们知道在Yarn出现之前,任务的调度是通过早期的JobTracker和TaskTracker来实现的,对象之间的作用关系是通过函数调用实现的,整个过程是串行的,这样大大加大了阻塞的程度。所以在后来引入了新的编程模型。
基于函数调用的编程模型是低效的,相比之下,Yarn的出现引入了一种高效的事件驱动编程模型。在基于事件驱动的编程模型中,所有对象被抽象成了事件处理器,而事件处理器之间通过事件相互关联。每种事件处理器处理一种类型的事件,同时根据需要触发另外一种事件。比如一个任务需要下载文件,它不需要一直等待文件下载完成,只需要向中央异步处理器发送一个事件即可(之后可以继续完成后面的工作),该事件会被传递给对应的事件处理器,由其中的逻辑完成下载任务。
相比基于函数调用的编程模型,这种编程方式具有异步、并发等特点,更加高效,因此更适合大型分布式系统。
三、状态机
状态机在Yarn中的作用是在接受到某一个事件时,把一个状态通过一个状态转移函数转化成另一种状态,所以可以说一个任务的生命周期是由很多的状态机来维持流转的,在Yarn中定义了三种状态转换方式:
- 一个初始状态、一个最终状态、一种事件。该方式表示状态机在初始状态下,接收到Event事件后,执行状态转移函数Hook,并在执行完成后将当前状态转换为最终状态。
- 一个初始状态、多个最终状态、一个事件。该方式表示状态机在初始状态下,接收到Event事件后,执行状态转移函数Hook,并在执行完成后将当前状态转换为函数返回值的状态。
- 一个初始状态、一个最终状态、多个事件。该方式表示状态机在初始状态下,接收到多个Event事件中的任何一个后,执行状态转移函数Hook,并在执行完成后将当前状态转换为最终 状态。