Administrator
Published on 2023-10-05 / 1 Visits
0

对于慢节点(盘)的处理

一、背景

什么是慢节点?
随着HDFS集群规模的不断增长、服务器使用寿命的缩减,在大规模集群中性能退化节点的出现是必然的,我们将这样的节点称为慢节点。慢节点问题是大规模集群中的常见问题,其影响范围可以波及整个集群,会对集群的吞吐能力造成严重的影响。
HDFS作为大数据平台的存储底座,慢节点问题将对上层各个应用产生频繁的影响,可能造成数据无法按时产出,上游任务的延迟将波及下游任务,发生链式反应。Hadoop社区原生版本中对慢节点的防治有初步的实践,本文将对社区的一些PR进行集中的讲解,这些PR一般已经合入了高版本的Hadoop分支中。

二、发现慢节点(盘)

① DataNode收集监控数据
在前文的HDFS写文件流程中提到的receivePacket方法,其内部调用了

trackSendPacketToLastNodeInPipeline(duration);

DN选用Pipline的倒数第二个节点作为基准节点,统计其发送packet到最后一个节点的耗时,以此作为判断慢节点的依据。只采用倒数第二个节点进行统计,可以降低统计的复杂度。具体可以看下这个方法的注释。

这里稍微讲一下来龙去脉,首先在DataNode启动时,会初始化DataNodePeerMetrics对象,构造方法内设置了一些属性,并初始化MutableRollingAverages对象,其内部启动了一个RatesRoller线程,这个线程会把快照下来的指标进行封装进一个Map结构中,key是下游DN的ip,value是以SumAndCount为元素的队列。DN每次向下游DN发送packet时,会将发送的packet数量和耗时记录下来,用SampleStat保存。然后每5min对采集的数据做聚合,保存到SumAndCount,该对象记录了最近5min内发送的packet个数和总耗时。最后,将SumAndCount存放到LinkedBlockingDeque。LinkedBlockingDeque默认长度是36,如果队列满了,踢掉队首元素,在队尾追加。因此,该模型统计的时间窗口为最近3小时,这一方面可以保证数据的时效性,又可以保证数据的准确性。

peerMetrics = dnConf.peerStatsEnabled ?
        DataNodePeerMetrics.create(getDisplayName(), getConf()) : null;
/******************************************************************************/       
public DataNodePeerMetrics(final String name, Configuration conf) {
  this.name = name;
  minOutlierDetectionSamples = conf.getLong(
      DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY,
      DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT);
  lowThresholdMs =
      conf.getLong(DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY,
          DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT);
  minOutlierDetectionNodes =
      conf.getLong(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY,
          DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT);
  this.slowNodeDetector =
      new OutlierDetector(minOutlierDetectionNodes, lowThresholdMs);
  sendPacketDownstreamRollingAverages = new MutableRollingAverages("Time");
}

② DataNode汇报慢节点
在前文DataNode启动流程中提到了DataNode与NameNode维持心跳的过程,DataNode在心跳汇报时,会将根据最近3小时数据,计算与每个下游节点传输packet的平均耗时,并计算对应的中位数和平均绝对偏差(MAD)。采用median + 3*MAD作为阈值,如果和某个下游DataNode传输packet的平均耗时超过了该阈值,则认为是慢节点。那么,就把改下游DataNode汇报给NameNode。

在BPServiceActor.sendHeartBeat中,有如下流程,具体计算流程在getOutliers中。

    final boolean outliersReportDue = scheduler.isOutliersReportDue(now);
    final SlowPeerReports slowPeers =
        outliersReportDue && dnConf.peerStatsEnabled && dn.getPeerMetrics() != null ?
            SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
            SlowPeerReports.EMPTY_REPORT;

③ NameNode处理SlowPeerReport
前文提到心跳是通过RPC协议来通信的,那么在通信时NameNode需要处理DataNode上报的这些慢节点报告,具体在下面的流程

if (slowPeerTracker.isSlowPeerTrackerEnabled()) {
  final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers();
  if (!slowPeersMap.isEmpty()) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("DataNode " + nodeReg + " reported slow peers: " + slowPeersMap);
    }
    for (String slowNodeId : slowPeersMap.keySet()) {
      slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
    }
  }
}

这里出现了一个陌生的变量slowPeerTracker,让我们看下它是如何构造的,首先看到是在initSlowPeerTracker来赋值的,所以查看下调用栈发现是在NameNode启动时一路调用过来的,initSlowPeerTracker里初始化了allReport对象,所以addReport就是把节点信息加入到allReport中。

同时,在NameNode中启动一个SlowPeerCollector线程,定期将allReports里面的慢节点根据reportNodes的数量进行排序,根据配置取出TopN的节点,组成集群中最慢的节点集合slowNodesUuidSet。在读写流程中,将可以使用slowNodesUuidSet对慢节做相应处理。Namenode根据reportNodes数量排序,认为被汇报最多的节点是最慢的节点,这保证了慢节点识别的准确性。代码如下:

  private void startSlowPeerCollector() {
    if (slowPeerCollectorDaemon != null) {
      return;
    }
    slowPeerCollectorDaemon = new Daemon(new Runnable() {
      @Override
      public void run() {
        while (true) {
          try {
            slowNodesUuidSet = getSlowPeersUuidSet();
          } catch (Exception e) {
            LOG.error("Failed to collect slow peers", e);
          }

          try {
            Thread.sleep(slowPeerCollectionInterval);
          } catch (InterruptedException e) {
            LOG.error("Slow peers collection thread interrupted", e);
            return;
          }
        }
      }
    });
    slowPeerCollectorDaemon.start();
  }

三、对于慢节点写处理

HDFS在写数据时,会调用BlockPlacementPolicy的实现类的chooseTarget方法,为block的副本选择合适的DN节点。在chooseTarget方法中,根据SlowPeerCollector收集的slowNodesUuidSet,将慢节点剔除掉。这样,chooseTarget将返回不包含慢节点的DN集合,给客户端写数据,以此优化写数据性能。

boolean isGoodDatanode(DatanodeDescriptor node,
                         int maxTargetPerRack, boolean considerLoad,
                         List<DatanodeStorageInfo> results,
                         boolean avoidStaleNodes) {
  ...
  // check if the target is a slow node
  if (dataNodePeerStatsEnabled && excludeSlowNodesEnabled) {
    Set<String> slowNodesUuidSet = DatanodeManager.getSlowNodesUuidSet();
    if (slowNodesUuidSet.contains(node.getDatanodeUuid())) {
      logNodeIsNotChosen(node, NodeNotChosenReason.NODE_SLOW);
      return false;
    }
  }

  return true;
}

四、对于慢节点读处理

在前文HDFS读文件流程提到,HDFS在读数据时,会调用FSNamesystem的getBlockLocations方法,选择读数据的DN列表。然后默认选择列表中的第一个DN读数据,如果改DN读数据失败则选取下一个DN,以此类推。在getBlockLocations方法中,有对DN列表进行排序的功能,会将非正常节点放在正常节点之后。我们修改了Comparator的对比规则,如果要读取的block所在的DN在slowNodesUuidSet中,则将该节点放在正常节点之后。这样,可以帮助客户端尽可能避开从慢节点读数据,以此提升读数据的性能。

具体的方法如下:

  public void sortLocatedBlocks(final String targetHost,
      final List<LocatedBlock> locatedBlocks) {
    Comparator<DatanodeInfo> comparator =
        avoidStaleDataNodesForRead || avoidSlowDataNodesForRead ?
        new DFSUtil.StaleAndSlowComparator(
            avoidStaleDataNodesForRead, staleInterval,
            avoidSlowDataNodesForRead, slowNodesUuidSet) :
        new DFSUtil.ServiceComparator();
    // sort located block
    for (LocatedBlock lb : locatedBlocks) {
      if (lb.isStriped()) {
        sortLocatedStripedBlock(lb, comparator);
      } else {
        sortLocatedBlock(lb, targetHost, comparator);
      }
    }
  }

具体的Comaprator规则如下:

public int compare(DatanodeInfo a, DatanodeInfo b) {
  int ret = super.compare(a, b);
  if (ret != 0) {
    return ret;
  }

  // Stale nodes will be moved behind the normal nodes
  if (avoidStaleDataNodesForRead) {
    boolean aStale = a.isStale(staleInterval);
    boolean bStale = b.isStale(staleInterval);
    ret = aStale == bStale ? 0 : (aStale ? 1 : -1);
    if (ret != 0) {
      return ret;
    }
  }

  // Slow nodes will be moved behind the normal nodes
  if (avoidSlowDataNodesForRead) {
    boolean aSlow = slowNodesUuidSet.contains(a.getDatanodeUuid());
    boolean bSlow = slowNodesUuidSet.contains(b.getDatanodeUuid());
    ret = aSlow == bSlow ? 0 : (aSlow ? 1 : -1);
  }
  return ret;
}

五、客户端检测慢节点与剔除

铺垫:
① 在前文DataNode启动流程中,提到了DataNode的注册与连接到NameNode的过程。其中connectToNNAndHandshake()方法中会初始化block pool,会维护一个Map结构,其中会记录isSlowNode,因此DataNode可以根据心跳响应来得知自己是否是慢节点。
② 在前文HDFS写流程中,提到了PacketResponder,其中的run方法中的sendAckUpStream方法会向上游返回Ack信息,此DataNode是否是slow在Ack的Header中反应。

sendAckUpstream(ack, expected, totalAckTimeNanos,
  (pkt != null ? pkt.offsetInBlock : 0),
    PipelineAck.combineHeader(datanode.getECN(), myStatus,
        datanode.getSLOWByBlockPoolId(block.getBlockPoolId())));

OK,在ResponseProcessor线程类的run方法中会从socket流中读取DataNode发送过来的ack数据存放到PipelineAck对象中。然后从PipelineAck对象中拿到pipeline中每一个DataNode是不是SLOW的信息,如果某个DataNode返回的ack中包含SLOW状态超过某个阈值(默认值是10次),那就会被标记为Bad DataNode,设置一下errorState并且抛出一个异常,让这个ResponseProcessor线程结束掉。

    PipelineAck ack = new PipelineAck();
    ...
    ack.readFields(blockReplyStream);
    ...
    if (PipelineAck.getSLOWFromHeader(ack.getHeaderFlag(i)) ==
        PipelineAck.SLOW.SLOW) {
      slownodesFromAck.add(targets[i]);
    }
    ...
    if (slownodesFromAck.isEmpty()) {
      if (!slowNodeMap.isEmpty()) {
        slowNodeMap.clear();
      }
    } else {
      markSlowNode(slownodesFromAck);
      LOG.debug("SlowNodeMap content: {}.", slowNodeMap);
    }
  void markSlowNode(List<DatanodeInfo> slownodesFromAck) throws IOException {
    Set<DatanodeInfo> discontinuousNodes = new HashSet<>(slowNodeMap.keySet());
    for (DatanodeInfo slowNode : slownodesFromAck) {
      if (!slowNodeMap.containsKey(slowNode)) {
        slowNodeMap.put(slowNode, 1);
      } else {
        int oldCount = slowNodeMap.get(slowNode);
        slowNodeMap.put(slowNode, ++oldCount);
      }
      discontinuousNodes.remove(slowNode);
    }
    for (DatanodeInfo discontinuousNode : discontinuousNodes) {
      slowNodeMap.remove(discontinuousNode);
    }

    if (!slowNodeMap.isEmpty()) {
      for (Map.Entry<DatanodeInfo, Integer> entry : slowNodeMap.entrySet()) {
        if (entry.getValue() >= markSlowNodeAsBadNodeThreshold) {
          DatanodeInfo slowNode = entry.getKey();
          int index = getDatanodeIndex(slowNode);
          if (index >= 0) {
            errorState.setBadNodeIndex(index);
            throw new IOException("Receive reply from slowNode " + slowNode +
                " for continuous " + markSlowNodeAsBadNodeThreshold +
                " times, treating it as badNode");
          }
          slowNodeMap.remove(entry.getKey());
        }
      }
    }
  }

发现了慢节点抛出异常后,就进入了pipeline recovery阶段,这样DataStreamer的run方法在下一次循环时,调用processDatanodeOrExternalError方法就可以对这个Pipeline进行处理了。处理的主要逻辑如下:

  1. 关闭原来的一些socket流,将ackQueue中packet全部添加到dataQueue中,然后清空ackQueue。这么做的原因是:当前发生了datanode错误或者外部错误,我们要重新发送这些packet数据,因此要重新将packet添加到dataQueue中。
  2. 调用setupPipelineForAppendOrRecovery()方法进行pipeline的恢复。这个方法是核心方法,此方法中会调用setupPipelineInternal方法。setupPipelineInternal方法中会调用handleBadDatanode方法将bad datanode从datanode列表中移除,然后再调用handleDatanodeReplacement方法进行datanode的替换工作。如果说移除某个datanode后的写datanode的列表节点个数触发了重新构造pipeline的条件的话,就会调用getAdditionalDatanode这个RPC选出一个新的datanode加入到现有的pipeline中,从而达到将SLOW节点从pipeline中踢出的效果。

这里有个小细节。就是判断是否需要调用getAdditionalDatanode RPC来向pipeline中增加一个新节点这里。是有条件的。由handleDatanodeReplacement方法进行处理。
这个dtpReplaceDatanodeOnFailure是客户端配置的参数,有三种策略,我们讲下默认的策略。
默认的策略: 假设 r 为副本数。 n 是现存的DataNode数。 当且仅当 r >= 3 并且  要么 floor ( r/2) >= n 或者 block被flushed/appended时,才会去选一个新的DataNode添加到pipeline中。
最后再通过transfer方法把副本数据在新加入的节点复制一份。

六、慢盘相关

请参考社区HDFS-16371,大致流程与前文所述大致不差。

七、参考

HDFS-10917 & HDFS-11194 & HDFS-11461
核心功能:DN统计Peer指标并推送给NN识别慢节点
核心类:DataNodePeerMetrics , SlowPeerTracker , OutlierDetector, SlowPeerReports
异常值检测逻辑:
收集指标:sendPacketDownstreamRollingAverages
样本量过少,忽略(可配置)
Median absolute deviation(MAD)中位数绝对偏差

大于
设定阈值(可配置)
3 *

对DN 每30min 上报SLowNode
对NM 取被报告数量 top5 节点

HDFS-10959 & HDFS-11545 & HDFS-11551
核心类:DataNodeVolumeMetrics, SlowDiskReports, SlowDiskTracker
异常值检测逻辑相同
读、写、metadataOP 三种异常值
METADATA READ WRITE
异常值出现次数最多的磁盘
随机抽取一定比例的IO操作进行采样

HDFS-15879
核心功能:避免慢节点写操作。
核心逻辑: BlockPlacementPolicyDefault 类 chooseTargetInOrder 方法及isGoodDatanode 方法。若为慢节点,则不选择为 BlockPlacement 操作的目标。
慢节点为 SlowPeerTracker 统计结果。

HDFS-16076
核心功能:避免慢节点读操作。
核心逻辑:DatanodeManager 类sortLocatedBlocks 方法。添加avoidSlowDataNodesForRead 逻辑,将慢节点排序至末尾,即客户端会优先读取非Slow节点。
慢节点为 SlowPeerTracker 统计结果。

HDFS-16320
核心逻辑:HeartbeatResponse 添加isSlownode 字段,让DN知道自己为慢节点。

HDFS-16348
核心逻辑:PipelineAck 添加SLOW_BITS ,让客户端知道自己连接的是慢节点。

HDFS-16371
核心功能:避免慢盘写操作。
核心逻辑:FsVolumeList 类chooseVolume 方法,排除慢盘。