一、背景
当一个应用向YARN集群提交作业后,此作业的多个任务由于负载不均衡、资源分布不均等原因都会导致各个任务运行完成的时间不一致,甚至会出现一个任务明显慢于同一作业的其它任务的情况。如果对这种情况不加优化,最慢的任务最终会拖慢整个作业的整体执行进度。好在mapreduce框架提供了任务推断执行机制,当有必要时就启动一个备份任务。最终会采用备份任务和原任务中率先执行完的结果作为最终结果。如果推测执行被触发,则可以在输出的信息中看到Killed map/reduce tasks = {}。
二、Speculator
在MapReduce中,负责执行推测执行的接口规范为Sepculator,我们来看一下:
public interface Speculator
extends EventHandler<SpeculatorEvent> {
enum EventType {
ATTEMPT_STATUS_UPDATE,
ATTEMPT_START,
TASK_CONTAINER_NEED_UPDATE,
JOB_CREATE
}
// This will be implemented if we go to a model where the events are
// processed within the TaskAttempts' state transitions' code.
public void handleAttempt(TaskAttemptStatus status);
}
DefaultSpeculator是Speculator的默认实现。DefaultSpeculator负责处理SpeculatorEvent事件,目前主要包括四种事件,分别是:
JOB_CREATE:作业刚刚被创建时触发的事件,并处理一些初始化工作。
ATTEMPT_START:一个任务实例TaskAttemptImpl启动时触发的事件,DefaultSpeculator将会使用内部的推断估算器(默认是LegacyTaskRuntimeEstimator)开启对此任务实例的监控。
ATTEMPT_STATUS_UPDATE:当任务实例的状态更新时触发的事件,DefaultSpeculator将会更新推断估算器对任务的监控信息;更新正在运行中的任务(维护在runningTasks中);任务的统计信息(这些统计信息用于跟踪长时间未汇报心跳的任务,并积极主动的进行推断执行,而不是等待任务超时)
TASK_CONTAINER_NEED_UPDATE:任务Container数量发生变化时触发的事件。
Speculator的初始化
OK,接下来我们看下其是如何初始化的。
if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false) || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) { //optional service to speculate on task attempts' progress speculator = createSpeculator(conf, context); addIfService(speculator); } speculatorEventDispatcher = new SpeculatorEventDispatcher(conf); dispatcher.register(Speculator.EventType.class, speculatorEventDispatcher);在MRAppMaster中的
serviceInit方法中,对其进行了初始化,步骤如下:当启用map任务推断(这里的MRJobConfig.MAP_SPECULATIVE实际由参数
mapreduce.map.speculative控制,默认是true)或者启用reduce任务推断(这里的MRJobConfig.REDUCE_SPECULATIVE实际由参数mapreduce.reduce.speculative控制,默认是true)时调用createSpeculator方法创建推断服务。最后将Speculator添加为MRAppMaster的子服务。向调度器dispatcher注册推断事件与推断事件的处理器SpeculatorEventDispatcher,以便触发了推断事件后交由SpeculatorEventDispatcher作进一步处理。
createSpeculator方法创建的推断服务的实现类默认是DefaultSpeculator,用户也可以通过参数yarn.app.mapreduce.am.job.speculator.class(即MRJobConfig.MR_AM_JOB_SPECULATOR)指定推断服务的实现类。其中的构造方法又通过反射创建了一个TaskRuntimeEstimator。
// "yarn.mapreduce.job.task.runtime.estimator.class"
Class<? extends TaskRuntimeEstimator> estimatorClass
= conf.getClass(MRJobConfig.MR_AM_TASK_ESTIMATOR,
LegacyTaskRuntimeEstimator.class,
TaskRuntimeEstimator.class);
Constructor<? extends TaskRuntimeEstimator> estimatorConstructor
= estimatorClass.getConstructor();
estimator = estimatorConstructor.newInstance();
estimator.contextualize(conf, context);
根据代码可以看出推断估算器可以通过参数yarn.app.mapreduce.am.job.task.estimator.class(即MRJobConfig.MR_AM_TASK_ESTIMATOR)进行指定,如果没有指定,则默认使用LegacyTaskRuntimeEstimator。实例化LegacyTaskRuntimeEstimator后,还调用其父类StartEndTimesBase的contextualize方法(进行上下文的初始化,实际就是将当前作业添加到map任务统计列表、reduce任务统计列表,并设置任务与其慢任务阈值mapreduce.job.speculative.slowtaskthreshold之间的映射关系,其代码如下:
@Override
public void contextualize(Configuration conf, AppContext context) {
this.context = context;
Map<JobId, Job> allJobs = context.getAllJobs();
for (Map.Entry<JobId, Job> entry : allJobs.entrySet()) {
final Job job = entry.getValue();
mapperStatistics.put(job, new DataStatistics());
reducerStatistics.put(job, new DataStatistics());
slowTaskRelativeTresholds.put
(job, conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD,1.0f));
}
}
到这里Speculator初始化过程就完成了,接下来看下是如何启动的。
Speculator的启动
在MRAppMaster启动时(serviceStart方法)中有如下流程:
if (job.isUber()) {
speculatorEventDispatcher.disableSpeculation();
LOG.info("MRAppMaster uberizing job " + job.getID()
+ " in local container (\"uber-AM\") on node "
+ nmHost + ":" + nmPort + ".");
} else {
// send init to speculator only for non-uber jobs.
// This won't yet start as dispatcher isn't started yet.
dispatcher.getEventHandler().handle(
new SpeculatorEvent(job.getID(), clock.getTime()));
LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+ "job " + job.getID() + ".");
}
这里对作业类型进行了一个判断,判断是不是Uber模式,但是在大规模集群下一般是不会使用此模式的。所以这里走到else里,向事件处理器发送了一个SpeculatorEvent事件,事件类型为JOB_CREATE。同时在初始化时注册的SpeculatorEventDispatcher会被调度器调用来处理这个事件,SpeculatorEventDispatcher的实现告诉我们当启用map或者reduce任务推断时,将异步调用Speculator的handle方法处理SpeculatorEvent事件。以默认的DefaultSpeculator的handle方法为例,来看看其实现,代码如下:
@Override
public void handle(SpeculatorEvent event) {
processSpeculatorEvent(event);
}
private synchronized void processSpeculatorEvent(SpeculatorEvent event) {
switch (event.getType()) {
case ATTEMPT_STATUS_UPDATE:
statusUpdate(event.getReportedStatus(), event.getTimestamp());
break;
case TASK_CONTAINER_NEED_UPDATE:
{
AtomicInteger need = containerNeed(event.getTaskID());
need.addAndGet(event.containersNeededChange());
break;
}
case ATTEMPT_START:
{
LOG.info("ATTEMPT_START " + event.getTaskID());
estimator.enrollAttempt
(event.getReportedStatus(), event.getTimestamp());
break;
}
case JOB_CREATE:
{
LOG.info("JOB_CREATE " + event.getJobID());
estimator.contextualize(getConfig(), context);
break;
}
}
}
DefaultSpeculator继承于AbstractService,说明其也是一个需要初始化或者启动的服务,可以看到其内部重写了serviceStart方法,其代码如下:
@Override
protected void serviceStart() throws Exception {
Runnable speculationBackgroundCore
= new Runnable() {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
long backgroundRunStartTime = clock.getTime();
try {
int speculations = computeSpeculations();
long mininumRecomp
= speculations > 0 ? soonestRetryAfterSpeculate
: soonestRetryAfterNoSpeculate;
long wait = Math.max(mininumRecomp,
clock.getTime() - backgroundRunStartTime);
if (speculations > 0) {
LOG.info("We launched " + speculations
+ " speculations. Sleeping " + wait + " milliseconds.");
}
Object pollResult
= scanControl.poll(wait, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (!stopped) {
LOG.error("Background thread returning, interrupted", e);
}
return;
}
}
}
};
speculationBackgroundThread = new Thread
(speculationBackgroundCore, "DefaultSpeculator background processing");
speculationBackgroundThread.start();
super.serviceStart();
}
可以看到逻辑是启动了一个用来计算推断调度执行的map和reduce任务数量,在computeSepculations方法中:
private int computeSpeculations() {
// We'll try to issue one map and one reduce speculation per job per run
return maybeScheduleAMapSpeculation() + maybeScheduleAReduceSpeculation();
}
private int maybeScheduleASpeculation(TaskType type) {
int successes = 0;
long now = clock.getTime();
ConcurrentMap<JobId, AtomicInteger> containerNeeds
= type == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;
for (ConcurrentMap.Entry<JobId, AtomicInteger> jobEntry : containerNeeds.entrySet()) {
// This race conditon is okay. If we skip a speculation attempt we
// should have tried because the event that lowers the number of
// containers needed to zero hasn't come through, it will next time.
// Also, if we miss the fact that the number of containers needed was
// zero but increased due to a failure it's not too bad to launch one
// container prematurely.
if (jobEntry.getValue().get() > 0) {
continue;
}
int numberSpeculationsAlready = 0;
int numberRunningTasks = 0;
// loop through the tasks of the kind
Job job = context.getJob(jobEntry.getKey());
Map<TaskId, Task> tasks = job.getTasks(type);
int numberAllowedSpeculativeTasks
= (int) Math.max(minimumAllowedSpeculativeTasks,
proportionTotalTasksSpeculatable * tasks.size());
TaskId bestTaskID = null;
long bestSpeculationValue = -1L;
// this loop is potentially pricey.
// TODO track the tasks that are potentially worth looking at
for (Map.Entry<TaskId, Task> taskEntry : tasks.entrySet()) {
long mySpeculationValue = speculationValue(taskEntry.getKey(), now);
if (mySpeculationValue == ALREADY_SPECULATING) {
++numberSpeculationsAlready;
}
if (mySpeculationValue != NOT_RUNNING) {
++numberRunningTasks;
}
if (mySpeculationValue > bestSpeculationValue) {
bestTaskID = taskEntry.getKey();
bestSpeculationValue = mySpeculationValue;
}
}
numberAllowedSpeculativeTasks
= (int) Math.max(numberAllowedSpeculativeTasks,
proportionRunningTasksSpeculatable * numberRunningTasks);
// If we found a speculation target, fire it off
if (bestTaskID != null
&& numberAllowedSpeculativeTasks > numberSpeculationsAlready) {
addSpeculativeAttempt(bestTaskID);
++successes;
}
}
return successes;
}
maybeScheduleASpeculation方法首先根据当前Task的类型(map或reduce)获取相应类型任务的需要分配Container数量的缓存containerNeeds,然后遍历containerNeeds,遍历containerNeeds的执行步骤如下:
- 如果当前Job依然有未分配Container的Task,那么跳过当前循环,继续下一次循环。这说明如果当前Job的某一类型的Task依然存在未分配Container的,则不会进行任务推断;
- 从当前应用的上下文AppContext中获取Job,并获取此Job的所有的Task(map或者reduce);
- 计算允许执行推断的Task数量
numberAllowedSpeculativeTasks(map或者reduce)。其中MINIMUM_ALLOWED_SPECULATIVE_TASKS的值是10,PROPORTION_TOTAL_TASKS_SPECULATABLE的值是0.01。numberAllowedSpeculativeTasks取MINIMUM_ALLOWED_SPECULATIVE_TASKS与PROPORTION_TOTAL_TASKS_SPECULATABLE*任务数量之积之间的最大值。因此我们知道,当Job的某一类型(map或者reduce)的Task的数量小于1100时,计算得到的numberAllowedSpeculativeTasks等于10,如果Job的某一类型(map或者reduce)的Task的数量大于等于1100时,numberAllowedSpeculativeTasks才会大于10。numberAllowedSpeculativeTasks变量可以有效防止大量任务同时启动备份任务所造成的资源浪费。 - 遍历Job对应的map任务或者reduce任务集合,调用
speculationValue方法获取每一个Task的推断值。并在迭代完所有的map任务或者reduce任务后,获取这一任务集合中的推断值bestSpeculationValue最大的任务ID。 - 再次计算
numberAllowedSpeculativeTasks,其中PROPORTION_RUNNING_TASKS_SPECULATABLE的值等于0.1,numberRunningTasks是处于运行中的Task。numberAllowedSpeculativeTasks取numberAllowedSpeculativeTasks与PROPORTION_RUNNING_TASKS_SPECULATABLE*numberRunningTasks之积之间的最大值。 - 如果
numberAllowedSpeculativeTasks大于numberSpeculationsAlready(已经推断执行过的Task数量),则调用addSpeculativeAttempt方法(见代码清单10)将第4步中选出的任务的任务ID添加到推断尝试中。
addSpeculativeAttempt方法如下:
protected void addSpeculativeAttempt(TaskId taskID) {
LOG.info
("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_ADD_SPEC_ATTEMPT));
mayHaveSpeculated.add(taskID);
}
可以看到是发送了一个T_ADD_SPEC_ATTEMPT的TaskEvent。
估算任务的推断值
这一节主要分析下上一节的speculationValue方法,代码如下:
private long speculationValue(TaskId taskID, long now) {
Job job = context.getJob(taskID.getJobId());
Task task = job.getTask(taskID);
Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
long acceptableRuntime = Long.MIN_VALUE;
long result = Long.MIN_VALUE;
if (!mayHaveSpeculated.contains(taskID)) {
acceptableRuntime = estimator.thresholdRuntime(taskID);
if (acceptableRuntime == Long.MAX_VALUE) {
return ON_SCHEDULE;
}
}
TaskAttemptId runningTaskAttemptID = null;
int numberRunningAttempts = 0;
for (TaskAttempt taskAttempt : attempts.values()) {
if (taskAttempt.getState() == TaskAttemptState.RUNNING
|| taskAttempt.getState() == TaskAttemptState.STARTING) {
if (++numberRunningAttempts > 1) {
return ALREADY_SPECULATING;
}
runningTaskAttemptID = taskAttempt.getID();
long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);
long taskAttemptStartTime
= estimator.attemptEnrolledTime(runningTaskAttemptID);
if (taskAttemptStartTime > now) {
// This background process ran before we could process the task
// attempt status change that chronicles the attempt start
return TOO_NEW;
}
long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;
long estimatedReplacementEndTime
= now + estimator.estimatedNewAttemptRuntime(taskID);
float progress = taskAttempt.getProgress();
TaskAttemptHistoryStatistics data =
runningTaskAttemptStatistics.get(runningTaskAttemptID);
if (data == null) {
runningTaskAttemptStatistics.put(runningTaskAttemptID,
new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now));
} else {
if (estimatedRunTime == data.getEstimatedRunTime()
&& progress == data.getProgress()) {
// Previous stats are same as same stats
if (data.notHeartbeatedInAWhile(now)) {
// Stats have stagnated for a while, simulate heart-beat.
TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
taskAttemptStatus.id = runningTaskAttemptID;
taskAttemptStatus.progress = progress;
taskAttemptStatus.taskState = taskAttempt.getState();
// Now simulate the heart-beat
handleAttempt(taskAttemptStatus);
}
} else {
// Stats have changed - update our data structure
data.setEstimatedRunTime(estimatedRunTime);
data.setProgress(progress);
data.resetHeartBeatTime(now);
}
}
if (estimatedEndTime < now) {
return PROGRESS_IS_GOOD;
}
if (estimatedReplacementEndTime >= estimatedEndTime) {
return TOO_LATE_TO_SPECULATE;
}
result = estimatedEndTime - estimatedReplacementEndTime;
}
}
// If we are here, there's at most one task attempt.
if (numberRunningAttempts == 0) {
return NOT_RUNNING;
}
if (acceptableRuntime == Long.MIN_VALUE) {
acceptableRuntime = estimator.thresholdRuntime(taskID);
if (acceptableRuntime == Long.MAX_VALUE) {
return ON_SCHEDULE;
}
}
return result;
}
speculationValue方法的执行步骤如下:
- 如果任务还没有被推断执行,那么调用estimator的thresholdRuntime方法获取任务可以接受的运行时长acceptableRuntime。如果acceptableRuntime等于Long.MAX_VALUE,则将ON_SCHEDULE作为返回值,ON_SCHEDULE的值是Long.MIN_VALUE,以此表示当前任务的推断值很小,即被推断尝试的可能最小。
- 如果任务的运行实例数大于1,则说明此任务已经发生了推断执行,因此返回ALREADY_SPECULATING。ALREADY_SPECULATING等于Long.MIN_VALUE + 1。
- 调用estimator的estimatedRuntime方法获取任务运行实例的估算运行时长estimatedRunTime。
- 调用estimator的attemptEnrolledTime方法获取任务实例开始运行的时间,此时间即为startTimes中缓存的start。这个值是在任务实例启动时导致DefaultSpeculator的processSpeculatorEvent方法处理Speculator.EventType.ATTEMPT_START类型的SpeculatorEvent事件时保存的。
- estimatedEndTime表示估算任务实例的运行结束时间,estimatedEndTime = estimatedRunTime + taskAttemptStartTime。
- 调用estimator的estimatedNewAttemptRuntime方法估算如果此时重新为任务启动一个实例,此实例运行结束的时间estimatedReplacementEndTime。
- 如果缓存中没有任务实例的历史统计信息,那么将estimatedRunTime、任务实例进度progress,当前时间封装为历史统计信息缓存起来。
- 如果缓存中存在任务实例的历史统计信息,如果缓存的estimatedRunTime和本次估算的estimatedRunTime一样并且缓存的实例进度progress和本次获取的任务实例进度progress一样,当有一段时间没有收到心跳了,则模拟一次心跳。如果缓存的estimatedRunTime和本次估算的estimatedRunTime不一样或者缓存的实例进度progress和本次获取的任务实例进度progress不一样,那么将estimatedRunTime、任务实例进度progress,当前时间更新到任务实例的历史统计信息中。
- 如果estimatedEndTime小于当前时间,则说明任务实例的进度良好,返回PROGRESS_IS_GOOD,PROGRESS_IS_GOOD等于Long.MIN_VALUE + 3。
- 如果estimatedReplacementEndTime大于等于estimatedEndTime,则说明即便启动备份任务实例也无济于事,因为它的结束时间达不到节省作业总运行时长的作用。
- 计算本次估算的结果值result,它等于estimatedEndTime - estimatedReplacementEndTime,当这个差值越大表示备份任务实例运行后比原任务实例的结束时间就越早,因此调度执行的价值越大。
- 如果numberRunningAttempts等于0,则表示当前任务还没有启动任务实例,返回NOT_RUNNING,NOT_RUNNING等于Long.MIN_VALUE + 4。
- 重新计算acceptableRuntime,处理方式与第1步相同。
- 返回result。