一、背景
什么是慢节点?
随着HDFS集群规模的不断增长、服务器使用寿命的缩减,在大规模集群中性能退化节点的出现是必然的,我们将这样的节点称为慢节点。慢节点问题是大规模集群中的常见问题,其影响范围可以波及整个集群,会对集群的吞吐能力造成严重的影响。
HDFS作为大数据平台的存储底座,慢节点问题将对上层各个应用产生频繁的影响,可能造成数据无法按时产出,上游任务的延迟将波及下游任务,发生链式反应。Hadoop社区原生版本中对慢节点的防治有初步的实践,本文将对社区的一些PR进行集中的讲解,这些PR一般已经合入了高版本的Hadoop分支中。
二、发现慢节点(盘)
① DataNode收集监控数据
在前文的HDFS写文件流程中提到的receivePacket方法,其内部调用了
trackSendPacketToLastNodeInPipeline(duration);
DN选用Pipline的倒数第二个节点作为基准节点,统计其发送packet到最后一个节点的耗时,以此作为判断慢节点的依据。只采用倒数第二个节点进行统计,可以降低统计的复杂度。具体可以看下这个方法的注释。
这里稍微讲一下来龙去脉,首先在DataNode启动时,会初始化DataNodePeerMetrics对象,构造方法内设置了一些属性,并初始化MutableRollingAverages对象,其内部启动了一个RatesRoller线程,这个线程会把快照下来的指标进行封装进一个Map
peerMetrics = dnConf.peerStatsEnabled ?
DataNodePeerMetrics.create(getDisplayName(), getConf()) : null;
/******************************************************************************/
public DataNodePeerMetrics(final String name, Configuration conf) {
this.name = name;
minOutlierDetectionSamples = conf.getLong(
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY,
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT);
lowThresholdMs =
conf.getLong(DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY,
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT);
minOutlierDetectionNodes =
conf.getLong(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY,
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT);
this.slowNodeDetector =
new OutlierDetector(minOutlierDetectionNodes, lowThresholdMs);
sendPacketDownstreamRollingAverages = new MutableRollingAverages("Time");
}
② DataNode汇报慢节点
在前文DataNode启动流程中提到了DataNode与NameNode维持心跳的过程,DataNode在心跳汇报时,会将根据最近3小时数据,计算与每个下游节点传输packet的平均耗时,并计算对应的中位数和平均绝对偏差(MAD)。采用median + 3*MAD作为阈值,如果和某个下游DataNode传输packet的平均耗时超过了该阈值,则认为是慢节点。那么,就把改下游DataNode汇报给NameNode。
在BPServiceActor.sendHeartBeat中,有如下流程,具体计算流程在getOutliers中。
final boolean outliersReportDue = scheduler.isOutliersReportDue(now);
final SlowPeerReports slowPeers =
outliersReportDue && dnConf.peerStatsEnabled && dn.getPeerMetrics() != null ?
SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
SlowPeerReports.EMPTY_REPORT;
③ NameNode处理SlowPeerReport
前文提到心跳是通过RPC协议来通信的,那么在通信时NameNode需要处理DataNode上报的这些慢节点报告,具体在下面的流程
if (slowPeerTracker.isSlowPeerTrackerEnabled()) {
final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers();
if (!slowPeersMap.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("DataNode " + nodeReg + " reported slow peers: " + slowPeersMap);
}
for (String slowNodeId : slowPeersMap.keySet()) {
slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
}
}
}
这里出现了一个陌生的变量slowPeerTracker,让我们看下它是如何构造的,首先看到是在initSlowPeerTracker来赋值的,所以查看下调用栈发现是在NameNode启动时一路调用过来的,initSlowPeerTracker里初始化了allReport对象,所以addReport就是把节点信息加入到allReport中。
同时,在NameNode中启动一个SlowPeerCollector线程,定期将allReports里面的慢节点根据reportNodes的数量进行排序,根据配置取出TopN的节点,组成集群中最慢的节点集合slowNodesUuidSet。在读写流程中,将可以使用slowNodesUuidSet对慢节做相应处理。Namenode根据reportNodes数量排序,认为被汇报最多的节点是最慢的节点,这保证了慢节点识别的准确性。代码如下:
private void startSlowPeerCollector() {
if (slowPeerCollectorDaemon != null) {
return;
}
slowPeerCollectorDaemon = new Daemon(new Runnable() {
@Override
public void run() {
while (true) {
try {
slowNodesUuidSet = getSlowPeersUuidSet();
} catch (Exception e) {
LOG.error("Failed to collect slow peers", e);
}
try {
Thread.sleep(slowPeerCollectionInterval);
} catch (InterruptedException e) {
LOG.error("Slow peers collection thread interrupted", e);
return;
}
}
}
});
slowPeerCollectorDaemon.start();
}
三、对于慢节点写处理
HDFS在写数据时,会调用BlockPlacementPolicy的实现类的chooseTarget方法,为block的副本选择合适的DN节点。在chooseTarget方法中,根据SlowPeerCollector收集的slowNodesUuidSet,将慢节点剔除掉。这样,chooseTarget将返回不包含慢节点的DN集合,给客户端写数据,以此优化写数据性能。
boolean isGoodDatanode(DatanodeDescriptor node,
int maxTargetPerRack, boolean considerLoad,
List<DatanodeStorageInfo> results,
boolean avoidStaleNodes) {
...
// check if the target is a slow node
if (dataNodePeerStatsEnabled && excludeSlowNodesEnabled) {
Set<String> slowNodesUuidSet = DatanodeManager.getSlowNodesUuidSet();
if (slowNodesUuidSet.contains(node.getDatanodeUuid())) {
logNodeIsNotChosen(node, NodeNotChosenReason.NODE_SLOW);
return false;
}
}
return true;
}
四、对于慢节点读处理
在前文HDFS读文件流程提到,HDFS在读数据时,会调用FSNamesystem的getBlockLocations方法,选择读数据的DN列表。然后默认选择列表中的第一个DN读数据,如果改DN读数据失败则选取下一个DN,以此类推。在getBlockLocations方法中,有对DN列表进行排序的功能,会将非正常节点放在正常节点之后。我们修改了Comparator的对比规则,如果要读取的block所在的DN在slowNodesUuidSet中,则将该节点放在正常节点之后。这样,可以帮助客户端尽可能避开从慢节点读数据,以此提升读数据的性能。
具体的方法如下:
public void sortLocatedBlocks(final String targetHost,
final List<LocatedBlock> locatedBlocks) {
Comparator<DatanodeInfo> comparator =
avoidStaleDataNodesForRead || avoidSlowDataNodesForRead ?
new DFSUtil.StaleAndSlowComparator(
avoidStaleDataNodesForRead, staleInterval,
avoidSlowDataNodesForRead, slowNodesUuidSet) :
new DFSUtil.ServiceComparator();
// sort located block
for (LocatedBlock lb : locatedBlocks) {
if (lb.isStriped()) {
sortLocatedStripedBlock(lb, comparator);
} else {
sortLocatedBlock(lb, targetHost, comparator);
}
}
}
具体的Comaprator规则如下:
public int compare(DatanodeInfo a, DatanodeInfo b) {
int ret = super.compare(a, b);
if (ret != 0) {
return ret;
}
// Stale nodes will be moved behind the normal nodes
if (avoidStaleDataNodesForRead) {
boolean aStale = a.isStale(staleInterval);
boolean bStale = b.isStale(staleInterval);
ret = aStale == bStale ? 0 : (aStale ? 1 : -1);
if (ret != 0) {
return ret;
}
}
// Slow nodes will be moved behind the normal nodes
if (avoidSlowDataNodesForRead) {
boolean aSlow = slowNodesUuidSet.contains(a.getDatanodeUuid());
boolean bSlow = slowNodesUuidSet.contains(b.getDatanodeUuid());
ret = aSlow == bSlow ? 0 : (aSlow ? 1 : -1);
}
return ret;
}
五、客户端检测慢节点与剔除
铺垫:
① 在前文DataNode启动流程中,提到了DataNode的注册与连接到NameNode的过程。其中connectToNNAndHandshake()方法中会初始化block pool,会维护一个Map
② 在前文HDFS写流程中,提到了PacketResponder,其中的run方法中的sendAckUpStream方法会向上游返回Ack信息,此DataNode是否是slow在Ack的Header中反应。
sendAckUpstream(ack, expected, totalAckTimeNanos,
(pkt != null ? pkt.offsetInBlock : 0),
PipelineAck.combineHeader(datanode.getECN(), myStatus,
datanode.getSLOWByBlockPoolId(block.getBlockPoolId())));
OK,在ResponseProcessor线程类的run方法中会从socket流中读取DataNode发送过来的ack数据存放到PipelineAck对象中。然后从PipelineAck对象中拿到pipeline中每一个DataNode是不是SLOW的信息,如果某个DataNode返回的ack中包含SLOW状态超过某个阈值(默认值是10次),那就会被标记为Bad DataNode,设置一下errorState并且抛出一个异常,让这个ResponseProcessor线程结束掉。
PipelineAck ack = new PipelineAck();
...
ack.readFields(blockReplyStream);
...
if (PipelineAck.getSLOWFromHeader(ack.getHeaderFlag(i)) ==
PipelineAck.SLOW.SLOW) {
slownodesFromAck.add(targets[i]);
}
...
if (slownodesFromAck.isEmpty()) {
if (!slowNodeMap.isEmpty()) {
slowNodeMap.clear();
}
} else {
markSlowNode(slownodesFromAck);
LOG.debug("SlowNodeMap content: {}.", slowNodeMap);
}
void markSlowNode(List<DatanodeInfo> slownodesFromAck) throws IOException {
Set<DatanodeInfo> discontinuousNodes = new HashSet<>(slowNodeMap.keySet());
for (DatanodeInfo slowNode : slownodesFromAck) {
if (!slowNodeMap.containsKey(slowNode)) {
slowNodeMap.put(slowNode, 1);
} else {
int oldCount = slowNodeMap.get(slowNode);
slowNodeMap.put(slowNode, ++oldCount);
}
discontinuousNodes.remove(slowNode);
}
for (DatanodeInfo discontinuousNode : discontinuousNodes) {
slowNodeMap.remove(discontinuousNode);
}
if (!slowNodeMap.isEmpty()) {
for (Map.Entry<DatanodeInfo, Integer> entry : slowNodeMap.entrySet()) {
if (entry.getValue() >= markSlowNodeAsBadNodeThreshold) {
DatanodeInfo slowNode = entry.getKey();
int index = getDatanodeIndex(slowNode);
if (index >= 0) {
errorState.setBadNodeIndex(index);
throw new IOException("Receive reply from slowNode " + slowNode +
" for continuous " + markSlowNodeAsBadNodeThreshold +
" times, treating it as badNode");
}
slowNodeMap.remove(entry.getKey());
}
}
}
}
发现了慢节点抛出异常后,就进入了pipeline recovery阶段,这样DataStreamer的run方法在下一次循环时,调用processDatanodeOrExternalError方法就可以对这个Pipeline进行处理了。处理的主要逻辑如下:
- 关闭原来的一些socket流,将ackQueue中packet全部添加到dataQueue中,然后清空ackQueue。这么做的原因是:当前发生了datanode错误或者外部错误,我们要重新发送这些packet数据,因此要重新将packet添加到dataQueue中。
- 调用setupPipelineForAppendOrRecovery()方法进行pipeline的恢复。这个方法是核心方法,此方法中会调用setupPipelineInternal方法。setupPipelineInternal方法中会调用handleBadDatanode方法将bad datanode从datanode列表中移除,然后再调用handleDatanodeReplacement方法进行datanode的替换工作。如果说移除某个datanode后的写datanode的列表节点个数触发了重新构造pipeline的条件的话,就会调用getAdditionalDatanode这个RPC选出一个新的datanode加入到现有的pipeline中,从而达到将SLOW节点从pipeline中踢出的效果。
这里有个小细节。就是判断是否需要调用getAdditionalDatanode RPC来向pipeline中增加一个新节点这里。是有条件的。由handleDatanodeReplacement方法进行处理。
这个dtpReplaceDatanodeOnFailure是客户端配置的参数,有三种策略,我们讲下默认的策略。
默认的策略: 假设 r 为副本数。 n 是现存的DataNode数。 当且仅当 r >= 3 并且 要么 floor ( r/2) >= n 或者 block被flushed/appended时,才会去选一个新的DataNode添加到pipeline中。
最后再通过transfer方法把副本数据在新加入的节点复制一份。
六、慢盘相关
请参考社区HDFS-16371,大致流程与前文所述大致不差。
七、参考
HDFS-10917 & HDFS-11194 & HDFS-11461
核心功能:DN统计Peer指标并推送给NN识别慢节点
核心类:DataNodePeerMetrics , SlowPeerTracker , OutlierDetector, SlowPeerReports
异常值检测逻辑:
收集指标:sendPacketDownstreamRollingAverages
样本量过少,忽略(可配置)
Median absolute deviation(MAD)中位数绝对偏差
大于
设定阈值(可配置)
3 *
对DN 每30min 上报SLowNode
对NM 取被报告数量 top5 节点
HDFS-10959 & HDFS-11545 & HDFS-11551
核心类:DataNodeVolumeMetrics, SlowDiskReports, SlowDiskTracker
异常值检测逻辑相同
读、写、metadataOP 三种异常值
METADATA READ WRITE
异常值出现次数最多的磁盘
随机抽取一定比例的IO操作进行采样
HDFS-15879
核心功能:避免慢节点写操作。
核心逻辑: BlockPlacementPolicyDefault 类 chooseTargetInOrder 方法及isGoodDatanode 方法。若为慢节点,则不选择为 BlockPlacement 操作的目标。
慢节点为 SlowPeerTracker 统计结果。
HDFS-16076
核心功能:避免慢节点读操作。
核心逻辑:DatanodeManager 类sortLocatedBlocks 方法。添加avoidSlowDataNodesForRead 逻辑,将慢节点排序至末尾,即客户端会优先读取非Slow节点。
慢节点为 SlowPeerTracker 统计结果。
HDFS-16320
核心逻辑:HeartbeatResponse 添加isSlownode 字段,让DN知道自己为慢节点。
HDFS-16348
核心逻辑:PipelineAck 添加SLOW_BITS ,让客户端知道自己连接的是慢节点。
HDFS-16371
核心功能:避免慢盘写操作。
核心逻辑:FsVolumeList 类chooseVolume 方法,排除慢盘。