The HDFS File Read Process

1. Selecting Suitable Block Locations and Sorting Them

A block location is really a pointer to some DataNode, so this step can equally be understood as producing a list of suitable DataNodes.

// A typical client snippet for reading data from HDFS; the analysis below follows this path
FSDataInputStream is = dfs.open(new Path("/a/b/c.txt"));
byte[] readBuf = new byte[1024];
is.read(readBuf);

There are two steps here, open and read; this first part analyzes open.

org.apache.hadoop.fs.FileSystem#open
org.apache.hadoop.fs.DistributedFileSystem#open

As in the write flow, the real work happens in the doCall method; let's see what it does.

  public FSDataInputStream open(Path f, final int bufferSize)
      throws IOException {
    statistics.incrementReadOps(1);
    storageStatistics.incrementOpCounter(OpType.OPEN);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FSDataInputStream>() {
      @Override
      public FSDataInputStream doCall(final Path p) throws IOException {
        final DFSInputStream dfsis =
            dfs.open(getPathName(p), bufferSize, verifyChecksum);
        return dfs.createWrappedInputStream(dfsis);
      }
      @Override
      public FSDataInputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.open(p, bufferSize);
      }
    }.resolve(this, absF);
  }

Two things happen here: the block locations are obtained, and the result is wrapped in a DFSInputStream. Let's look at the first step.

  public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
      throws IOException {
    checkOpen();
    //    Get block info from namenode
    try (TraceScope ignored = newPathTraceScope("newDFSInputStream", src)) {
      LocatedBlocks locatedBlocks = getLocatedBlocks(src, 0);
      return openInternal(locatedBlocks, src, verifyChecksum);
    }
  }

Below is the call stack for obtaining the block locations. Let's go straight to the key method invoked from the do loop (the four-argument overload called there attaches a block access token and delegates to the three-argument version shown below). Its logic: handle blocks that are not yet committed because a client is still writing to them; check for nodes with corrupt or otherwise unreadable replicas; fetch the DatanodeStorageInfo entries from blocksMap; and finally pass the array of selected machines into the LocatedBlock constructor.

org.apache.hadoop.hdfs.DFSClient#getLocatedBlocks
org.apache.hadoop.hdfs.DFSClient#callGetBlockLocations
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer#getBlockLocations
org.apache.hadoop.hdfs.server.namenode.FSNamesystem#getBlockLocations
org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp#getBlockLocations
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager#createLocatedBlocks
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager#createLocatedBlockList
   do {
     locatedBlocks.addBlock(
         createLocatedBlock(locatedBlocks, blocks[curBlk], curPos, mode));
     curPos += blocks[curBlk].getNumBytes();
     curBlk++;
   } while (curPos < endOff 
         && curBlk < blocks.length
         && !locatedBlocks.isBlockMax());

private LocatedBlock createLocatedBlock(LocatedBlockBuilder locatedBlocks,
      final BlockInfo blk, final long pos) throws IOException {
    // Check whether a write to this block is still in progress
    if (!blk.isComplete()) {
      final BlockUnderConstructionFeature uc = blk.getUnderConstructionFeature();
      if (blk.isStriped()) {
        final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
        final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(),
            blk);
        return newLocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos,
            false);
      } else {
        final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
        final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(),
            blk);
        return null == locatedBlocks
            ? newLocatedBlock(eb, storages, pos, false)
                : locatedBlocks.newLocatedBlock(eb, storages, pos, false);
      }
    }

    // get block locations
    NumberReplicas numReplicas = countNodes(blk);
    final int numCorruptNodes = numReplicas.corruptReplicas();
    final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
    if (numCorruptNodes != numCorruptReplicas) {
      LOG.warn("Inconsistent number of corrupt replicas for {}"
          + " blockMap has {} but corrupt replicas map has {}",
          blk, numCorruptNodes, numCorruptReplicas);
    }

    final int numNodes = blocksMap.numNodes(blk);
    final boolean isCorrupt;
    if (blk.isStriped()) {
      BlockInfoStriped sblk = (BlockInfoStriped) blk;
      isCorrupt = numCorruptReplicas != 0 &&
          numReplicas.liveReplicas() < sblk.getRealDataBlockNum();
    } else {
      isCorrupt = numCorruptReplicas != 0 && numCorruptReplicas == numNodes;
    }
    int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas;
    numMachines -= numReplicas.maintenanceNotForReadReplicas();
    DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
    final byte[] blockIndices = blk.isStriped() ? new byte[numMachines] : null;
    int j = 0, i = 0;
    if (numMachines > 0) {
      final boolean noCorrupt = (numCorruptReplicas == 0);
      for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
        if (storage.getState() != State.FAILED) {
          final DatanodeDescriptor d = storage.getDatanodeDescriptor();
          // Don't pick IN_MAINTENANCE or dead ENTERING_MAINTENANCE states.
          if (d.isInMaintenance()
              || (d.isEnteringMaintenance() && !d.isAlive())) {
            continue;
          }

          if (noCorrupt) {
            machines[j++] = storage;
            i = setBlockIndices(blk, blockIndices, i, storage);
          } else {
            final boolean replicaCorrupt = isReplicaCorrupt(blk, d);
            if (isCorrupt || !replicaCorrupt) {
              machines[j++] = storage;
              i = setBlockIndices(blk, blockIndices, i, storage);
            }
          }
        }
      }
    }

    if(j < machines.length) {
      machines = Arrays.copyOf(machines, j);
    }

    assert j == machines.length :
      "isCorrupt: " + isCorrupt +
      " numMachines: " + numMachines +
      " numNodes: " + numNodes +
      " numCorrupt: " + numCorruptNodes +
      " numCorruptRepls: " + numCorruptReplicas;
    final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(), blk);
    return blockIndices == null
        ? null == locatedBlocks ? newLocatedBlock(eb, machines, pos, isCorrupt)
            : locatedBlocks.newLocatedBlock(eb, machines, pos, isCorrupt)
        : newLocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt);
  }
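
To see the result of this selection from the client side, here is a small self-contained sketch against the public FileSystem API (reusing the example path from above; the printed fields are just for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ShowBlockLocations {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FileStatus st = fs.getFileStatus(new Path("/a/b/c.txt"));
    // One BlockLocation per block; the hosts have already been filtered and
    // ordered by the NameNode-side logic analyzed above.
    for (BlockLocation loc : fs.getFileBlockLocations(st, 0, st.getLen())) {
      System.out.println("offset=" + loc.getOffset()
          + " length=" + loc.getLength()
          + " hosts=" + String.join(",", loc.getHosts())
          + " corrupt=" + loc.isCorrupt());
    }
  }
}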

Next, a word on the openInternal method: it constructs the DFSInputStream object, and within it the fetchLocatedBlocksAndGetLastBlockLength method called from openInfo deserves attention.
Once the LocatedBlocks are in hand they need to be sorted, the goal being to push bad nodes to the back as far as possible and to rank replicas by transfer distance, so that blocks can be read as quickly as possible.

org.apache.hadoop.hdfs.server.namenode.FSNamesystem#sortLocatedBlocks
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager#sortLocatedBlocks
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager#sortLocatedBlock
org.apache.hadoop.net.NetworkTopology#sortByDistance

Essentially this builds a Comparator implementation that applies a compound ordering to the replica list: bad nodes are moved to the end, and the remaining nodes are then sorted by transfer distance in the network topology, as in the sketch below.
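
A minimal, self-contained sketch of the idea (the Node type and its fields are simplified stand-ins for illustration, not Hadoop's classes):

import java.util.Arrays;
import java.util.Comparator;

class ReplicaOrderSketch {
  static class Node {
    final String name;
    final boolean bad;    // e.g. decommissioned, stale, or otherwise unreadable
    final int distance;   // network-topology distance from the reading client
    Node(String name, boolean bad, int distance) {
      this.name = name; this.bad = bad; this.distance = distance;
    }
  }

  // Good nodes first (false sorts before true), nearest first among them.
  static void sortReplicas(Node[] replicas) {
    Arrays.sort(replicas, Comparator
        .comparing((Node n) -> n.bad)
        .thenComparingInt(n -> n.distance));
  }

  public static void main(String[] args) {
    Node[] replicas = {
        new Node("dn3", true, 0),
        new Node("dn1", false, 4),
        new Node("dn2", false, 2),
    };
    sortReplicas(replicas);
    for (Node n : replicas) {
      System.out.println(n.name); // prints dn2, dn1, dn3
    }
  }
}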

Finally, armed with the return value of getBlockLocations, the client can contact the DataNodes and read the file contents through their streaming interface.

2. Reading the Data

Now for the read method. HdfsDataInputStream extends FSDataInputStream, which extends DataInputStream, which extends FilterInputStream, which extends InputStream, so all of the parent classes' read methods are available. Recall that open wrapped the returned DFSInputStream; that wrapper is an FSDataInputStream (concretely, HdfsDataInputStream), but underneath we are still operating on the DFSInputStream. So whether we call the wrapper's own read methods or those inherited from its parents, every call ultimately lands back in DFSInputStream itself.
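
A quick hedged illustration of that delegation, assuming fs is the DistributedFileSystem handle from Part 1:

FSDataInputStream in = fs.open(new Path("/a/b/c.txt")); // actually an HdfsDataInputStream wrapping a DFSInputStream
byte[] buf = new byte[1024];
int n = in.read(buf);  // InputStream-style read, handed to the wrapped stream
in.seek(0);            // Seekable, implemented by DFSInputStream
in.readFully(0, buf);  // PositionedReadable, also delegated
in.close();

The DFSInputStream read methods that such calls eventually reach look like this: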

  @Override
  public synchronized int read() throws IOException {
    if (oneByteBuf == null) {
      oneByteBuf = new byte[1];
    }
    int ret = read(oneByteBuf, 0, 1);
    return (ret <= 0) ? -1 : (oneByteBuf[0] & 0xff);
  }

  @Override
  public synchronized int read(@Nonnull final byte buf[], int off, int len)
      throws IOException {
    validatePositionedReadArgs(pos, buf, off, len);
    if (len == 0) {
      return 0;
    }
    ReaderStrategy byteArrayReader =
        new ByteArrayStrategy(buf, off, len, readStatistics, dfsClient);
    return readWithStrategy(byteArrayReader);
  }

Let's look at the readWithStrategy method:

  protected synchronized int readWithStrategy(ReaderStrategy strategy)
      throws IOException {
    dfsClient.checkOpen();
    if (closed.get()) {
      throw new IOException("Stream closed");
    }

    int len = strategy.getTargetLength();
    CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
    failures = 0;
    if (pos < getFileLength()) {
      int retries = 2;
      while (retries > 0) {
        try {
          // currentNode can be left as null if previous read had a checksum
          // error on the same block. See HDFS-3067
          if (pos > blockEnd || currentNode == null) {
            currentNode = blockSeekTo(pos);
          }
          int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
          synchronized(infoLock) {
            if (locatedBlocks.isLastBlockComplete()) {
              realLen = (int) Math.min(realLen,
                  locatedBlocks.getFileLength() - pos);
            }
          }
          int result = readBuffer(strategy, realLen, corruptedBlocks);

          if (result >= 0) {
            pos += result;
          } else {
            // got a EOS from reader though we expect more data on it.
            throw new IOException("Unexpected EOS from the reader");
          }
          updateReadStatistics(readStatistics, result, blockReader);
          dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
              result);
          if (readStatistics.getBlockType() == BlockType.STRIPED) {
            dfsClient.updateFileSystemECReadStats(result);
          }
          return result;
        } catch (ChecksumException ce) {
          throw ce;
        } catch (IOException e) {
          checkInterrupted(e);
          if (retries == 1) {
            DFSClient.LOG.warn("DFS Read", e);
          }
          blockEnd = -1;
          if (currentNode != null) {
            addToDeadNodes(currentNode);
          }
          if (--retries == 0) {
            throw e;
          }
        } finally {
          // Check if need to report block replicas corruption either read
          // was successful or ChecksumException occurred.
          reportCheckSumFailure(corruptedBlocks,
              getCurrentBlockLocationsLength(), false);
        }
      }
    }
    return -1;
  }

First, blockSeekTo. This method opens an input stream to a DataNode so the data can be read. The key piece of its implementation is the getBlockAt method, which resolves the block (a LocatedBlock object) covering a given file offset: it consults the cached block-location list first, and only queries the NameNode if the block is not found there.
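
As a hedged sketch of that lookup idea (with simplified stand-in types, not the actual getBlockAt code): the cached blocks are kept sorted by start offset, so the block covering an offset can be found with a binary search, and a miss is the signal to refetch locations from the NameNode.

import java.util.ArrayList;
import java.util.List;

class BlockLookupSketch {
  static class Block {
    final long start, length;
    Block(long start, long length) { this.start = start; this.length = length; }
  }

  // Returns the index of the block covering `offset`, or -1 on a cache miss.
  static int findBlock(List<Block> blocks, long offset) {
    int lo = 0, hi = blocks.size() - 1;
    while (lo <= hi) {
      int mid = (lo + hi) >>> 1;
      Block b = blocks.get(mid);
      if (offset < b.start) {
        hi = mid - 1;
      } else if (offset >= b.start + b.length) {
        lo = mid + 1;
      } else {
        return mid; // offset falls inside this block
      }
    }
    return -1; // the real code would now ask the NameNode and retry
  }

  public static void main(String[] args) {
    List<Block> blocks = new ArrayList<>();
    blocks.add(new Block(0, 128));
    blocks.add(new Block(128, 128));
    System.out.println(findBlock(blocks, 130)); // 1
    System.out.println(findBlock(blocks, 300)); // -1
  }
}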

Next, the chooseDataNode method inside blockSeekTo, which picks a suitable DataNode. The logic is simple: walk the replica array and take the first DataNode that appears in neither deadNodes nor ignoredNodes. If the walk ends without a match, it reports that no live DataNode contains this block. Otherwise the chosen DataNode's transfer address is resolved and returned as a DNAddrPair object, which the caller then connects to.

  protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
      Collection<DatanodeInfo> ignoredNodes) {
    DatanodeInfo[] nodes = block.getLocations();
    StorageType[] storageTypes = block.getStorageTypes();
    DatanodeInfo chosenNode = null;
    StorageType storageType = null;
    if (nodes != null) {
      for (int i = 0; i < nodes.length; i++) {
        if (!deadNodes.containsKey(nodes[i])
            && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
          chosenNode = nodes[i];
          // Storage types are ordered to correspond with nodes, so use the same
          // index to get storage type.
          if (storageTypes != null && i < storageTypes.length) {
            storageType = storageTypes[i];
          }
          break;
        }
      }
    }
    if (chosenNode == null) {
      reportLostBlock(block, ignoredNodes);
      return null;
    }
    final String dnAddr =
        chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
    DFSClient.LOG.debug("Connecting to datanode {}", dnAddr);
    InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
    return new DNAddrPair(chosenNode, targetAddr, storageType, block);
  }

Next, blockReader is assigned. Look at the getBlockReader method used here: it builds a BlockReader via build. The Javadoc on the build method is worth reading closely.

  blockReader = getBlockReader(targetBlock, offsetIntoBlock,
      targetBlock.getBlockSize() - offsetIntoBlock, targetAddr,
      storageType, chosenNode);

  protected BlockReader getBlockReader(LocatedBlock targetBlock,
      long offsetInBlock, long length, InetSocketAddress targetAddr,
      StorageType storageType, DatanodeInfo datanode) throws IOException {
    ExtendedBlock blk = targetBlock.getBlock();
    Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
    CachingStrategy curCachingStrategy;
    boolean shortCircuitForbidden;
    synchronized (infoLock) {
      curCachingStrategy = cachingStrategy;
      shortCircuitForbidden = shortCircuitForbidden();
    }
    return new BlockReaderFactory(dfsClient.getConf()).
        setInetSocketAddress(targetAddr).
        setRemotePeerFactory(dfsClient).
        setDatanodeInfo(datanode).
        setStorageType(storageType).
        setFileName(src).
        setBlock(blk).
        setBlockToken(accessToken).
        setStartOffset(offsetInBlock).
        setVerifyChecksum(verifyChecksum).
        setClientName(dfsClient.clientName).
        setLength(length).
        setCachingStrategy(curCachingStrategy).
        setAllowShortCircuitLocalReads(!shortCircuitForbidden).
        setClientCacheContext(dfsClient.getClientContext()).
        setUserGroupInformation(dfsClient.ugi).
        setConfiguration(dfsClient.getConfiguration()).
        build();
  }

  /**
   * Build a BlockReader with the given options.
   *
   * This function will do the best it can to create a block reader that meets
   * all of our requirements.  We prefer short-circuit block readers
   * (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the
   * former avoid the overhead of socket communication.  If short-circuit is
   * unavailable, our next fallback is data transfer over UNIX domain sockets,
   * if dfs.client.domain.socket.data.traffic has been enabled.  If that doesn't
   * work, we will try to create a remote block reader that operates over TCP
   * sockets.
   *
   * There are a few caches that are important here.
   *
   * The ShortCircuitCache stores file descriptor objects which have been passed
   * from the DataNode.
   *
   * The DomainSocketFactory stores information about UNIX domain socket paths
   * that we not been able to use in the past, so that we don't waste time
   * retrying them over and over.  (Like all the caches, it does have a timeout,
   * though.)
   *
   * The PeerCache stores peers that we have used in the past.  If we can reuse
   * one of these peers, we avoid the overhead of re-opening a socket.  However,
   * if the socket has been timed out on the remote end, our attempt to reuse
   * the socket may end with an IOException.  For that reason, we limit our
   * attempts at socket reuse to dfs.client.cached.conn.retry times.  After
   * that, we create new sockets.  This avoids the problem where a thread tries
   * to talk to a peer that it hasn't talked to in a while, and has to clean out
   * every entry in a socket cache full of stale entries.
   */

With those notes in mind, let's look at the build method:

  public BlockReader build() throws IOException {
    Preconditions.checkNotNull(configuration);
    Preconditions
        .checkState(length >= 0, "Length must be set to a non-negative value");
    BlockReader reader = tryToCreateExternalBlockReader();
    if (reader != null) {
      return reader;
    }
    // Check whether short-circuit local reads are enabled in the configuration
    final ShortCircuitConf scConf = conf.getShortCircuitConf();
    try {
      if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
        if (clientContext.getUseLegacyBlockReaderLocal()) {
          reader = getLegacyBlockReaderLocal();
          if (reader != null) {
            LOG.trace("{}: returning new legacy block reader local.", this);
            return reader;
          }
        } else {
          reader = getBlockReaderLocal();
          if (reader != null) {
            LOG.trace("{}: returning new block reader local.", this);
            return reader;
          }
        }
      }
      if (scConf.isDomainSocketDataTraffic()) {
        reader = getRemoteBlockReaderFromDomain();
        if (reader != null) {
          LOG.trace("{}: returning new remote block reader using UNIX domain "
              + "socket on {}", this, pathInfo.getPath());
          return reader;
        }
      }
    } catch (IOException e) {
      LOG.debug("Block read failed. Getting remote block reader using TCP", e);
    }
    Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting,
        "TCP reads were disabled for testing, but we failed to " +
        "do a non-TCP read.");
    return getRemoteBlockReaderFromTcp();
  }

As you can see, configuration decides which kind of BlockReader gets used; a sketch of the relevant client-side settings follows.
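
The key names below (dfs.client.read.shortcircuit, dfs.domain.socket.path, dfs.client.domain.socket.data.traffic) are the standard HDFS ones, but the socket path is only illustrative and defaults vary across versions:

Configuration conf = new Configuration();
// Prefer short-circuit local reads (BlockReaderLocal) when the client and the block share a host
conf.setBoolean("dfs.client.read.shortcircuit", true);
// UNIX domain socket over which the DataNode passes file descriptors (path is illustrative)
conf.set("dfs.domain.socket.path", "/var/lib/hadoop-hdfs/dn_socket");
// If true, block data itself may flow over the domain socket (the second fallback in build)
conf.setBoolean("dfs.client.domain.socket.data.traffic", true);
FileSystem fs = FileSystem.get(conf);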

Next comes the process of actually reading data with the BlockReader, mainly in the readBuffer method invoked from readWithStrategy:

org.apache.hadoop.hdfs.DFSInputStream#readBuffer
org.apache.hadoop.hdfs.ByteArrayStrategy#readFromBlock
org.apache.hadoop.hdfs.client.impl.BlockReaderLocal#read(byte[], int, int)

PS: If the chosen BlockReader is not a short-circuit one (i.e., the domain-socket or TCP path), a BlockReaderRemote is used to build the BlockReader object. It constructs a Sender object and calls its readBlock method to issue the read; notice that this mirrors the implementation we saw in the write flow. That readBlock request is ultimately handled by the readBlock method in DataXceiver on the DataNode, which constructs a BlockSender object to send the packets back.
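
For intuition, here is a hypothetical, much-simplified sketch of that request/dispatch shape. The opcode value matches DataTransferProtocol's READ_BLOCK, but everything else is a stand-in: the real protocol encodes parameters as protobuf messages and streams checksummed packets, none of which is modeled here.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class OpDispatchSketch {
  static final byte OP_READ_BLOCK = 81; // same value as DataTransferProtocol's READ_BLOCK

  // Client side, playing the Sender#readBlock role: opcode first, then parameters.
  static void sendReadBlock(DataOutputStream out, long blockId, long offset,
      long len) throws IOException {
    out.writeByte(OP_READ_BLOCK);
    out.writeLong(blockId);
    out.writeLong(offset);
    out.writeLong(len);
    out.flush();
  }

  // Server side, playing the DataXceiver role: read the opcode and dispatch.
  static void dispatch(DataInputStream in) throws IOException {
    byte op = in.readByte();
    switch (op) {
    case OP_READ_BLOCK:
      long blockId = in.readLong();
      long offset = in.readLong();
      long len = in.readLong();
      // A real DataXceiver would now construct a BlockSender and stream
      // checksummed packets for [offset, offset + len) back to the client.
      break;
    default:
      throw new IOException("Unknown op " + op);
    }
  }
}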