1. Selecting suitable block locations and sorting them
A block location essentially points to a DataNode, so this step can also be understood as finding a list of suitable DataNodes.
// Typical client-side code for reading data from HDFS; the analysis below starts from this snippet
FSDataInputStream is = dfs.open(new Path("/a/b/c.txt"));
byte[] readBuf = new byte[1024];
is.read(readBuf);
There are two steps here, open and read. This first section analyzes open.
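For reference, a slightly fuller, self-contained version of the same client usage might look like the sketch below; the NameNode URI and file path are placeholder values:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(URI.create("hdfs://nn:8020"), conf);
         FSDataInputStream in = fs.open(new Path("/a/b/c.txt"))) {   // step 1: open
      byte[] buf = new byte[4096];
      int n;
      while ((n = in.read(buf)) > 0) {                               // step 2: read
        System.out.write(buf, 0, n);
      }
    }
    System.out.flush();
  }
}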
org.apache.hadoop.fs.FileSystem#open
org.apache.hadoop.fs.DistributedFileSystem#open
As in the write path, the real work happens in the doCall method; let's see what it does.
public FSDataInputStream open(Path f, final int bufferSize)
throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.OPEN);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FSDataInputStream>() {
@Override
public FSDataInputStream doCall(final Path p) throws IOException {
final DFSInputStream dfsis =
dfs.open(getPathName(p), bufferSize, verifyChecksum);
return dfs.createWrappedInputStream(dfsis);
}
@Override
public FSDataInputStream next(final FileSystem fs, final Path p)
throws IOException {
return fs.open(p, bufferSize);
}
}.resolve(this, absF);
}
Two things happen here: first the block locations are obtained, then the result is wrapped into a DFSInputStream. Let's look at the first step.
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
throws IOException {
checkOpen();
// Get block info from namenode
try (TraceScope ignored = newPathTraceScope("newDFSInputStream", src)) {
LocatedBlocks locatedBlocks = getLocatedBlocks(src, 0);
return openInternal(locatedBlocks, src, verifyChecksum);
}
}
Below is the call stack for obtaining block locations; let's go straight to the key method invoked inside the do-while loop. Its logic: handle blocks that have not yet been committed because a client is still writing to them; check for corrupt or unreadable replicas; fetch DatanodeStorageInfo entries from blocksMap; and finally pass the selected machines array into the LocatedBlock constructor.
org.apache.hadoop.hdfs.DFSClient#getLocatedBlocks
org.apache.hadoop.hdfs.DFSClient#callGetBlockLocations
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer#getBlockLocations
org.apache.hadoop.hdfs.server.namenode.FSNamesystem#getBlockLocations
org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp#getBlockLocations
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager#createLocatedBlocks
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager#createLocatedBlockList
do {
locatedBlocks.addBlock(
createLocatedBlock(locatedBlocks, blocks[curBlk], curPos, mode));
curPos += blocks[curBlk].getNumBytes();
curBlk++;
} while (curPos < endOff
&& curBlk < blocks.length
&& !locatedBlocks.isBlockMax());
private LocatedBlock createLocatedBlock(LocatedBlockBuilder locatedBlocks,
final BlockInfo blk, final long pos) throws IOException {
//the block is not complete: a write to it is still in progress
if (!blk.isComplete()) {
final BlockUnderConstructionFeature uc = blk.getUnderConstructionFeature();
if (blk.isStriped()) {
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(),
blk);
return newLocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos,
false);
} else {
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(),
blk);
return null == locatedBlocks
? newLocatedBlock(eb, storages, pos, false)
: locatedBlocks.newLocatedBlock(eb, storages, pos, false);
}
}
// get block locations
NumberReplicas numReplicas = countNodes(blk);
final int numCorruptNodes = numReplicas.corruptReplicas();
final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
if (numCorruptNodes != numCorruptReplicas) {
LOG.warn("Inconsistent number of corrupt replicas for {}"
+ " blockMap has {} but corrupt replicas map has {}",
blk, numCorruptNodes, numCorruptReplicas);
}
final int numNodes = blocksMap.numNodes(blk);
final boolean isCorrupt;
if (blk.isStriped()) {
BlockInfoStriped sblk = (BlockInfoStriped) blk;
isCorrupt = numCorruptReplicas != 0 &&
numReplicas.liveReplicas() < sblk.getRealDataBlockNum();
} else {
isCorrupt = numCorruptReplicas != 0 && numCorruptReplicas == numNodes;
}
int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas;
numMachines -= numReplicas.maintenanceNotForReadReplicas();
DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
final byte[] blockIndices = blk.isStriped() ? new byte[numMachines] : null;
int j = 0, i = 0;
if (numMachines > 0) {
final boolean noCorrupt = (numCorruptReplicas == 0);
for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
if (storage.getState() != State.FAILED) {
final DatanodeDescriptor d = storage.getDatanodeDescriptor();
// Don't pick IN_MAINTENANCE or dead ENTERING_MAINTENANCE states.
if (d.isInMaintenance()
|| (d.isEnteringMaintenance() && !d.isAlive())) {
continue;
}
if (noCorrupt) {
machines[j++] = storage;
i = setBlockIndices(blk, blockIndices, i, storage);
} else {
final boolean replicaCorrupt = isReplicaCorrupt(blk, d);
if (isCorrupt || !replicaCorrupt) {
machines[j++] = storage;
i = setBlockIndices(blk, blockIndices, i, storage);
}
}
}
}
}
if(j < machines.length) {
machines = Arrays.copyOf(machines, j);
}
assert j == machines.length :
"isCorrupt: " + isCorrupt +
" numMachines: " + numMachines +
" numNodes: " + numNodes +
" numCorrupt: " + numCorruptNodes +
" numCorruptRepls: " + numCorruptReplicas;
final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(), blk);
return blockIndices == null
? null == locatedBlocks ? newLocatedBlock(eb, machines, pos, isCorrupt)
: locatedBlocks.newLocatedBlock(eb, machines, pos, isCorrupt)
: newLocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt);
}
Next, a word about the openInternal method: it constructs a DFSInputStream object. Inside its constructor, openInfo is worth a look, and in particular the fetchLocatedBlocksAndGetLastBlockLength method it calls, which fetches the located blocks and, if the last block is still under construction, determines how much of it is currently readable.
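Conceptually, that last step looks roughly like the sketch below. This is not the verbatim Hadoop source: getLocatedBlocks, getFileLength, isLastBlockComplete and getLastLocatedBlock are real APIs, while readVisibleLengthFromDatanode is a hypothetical stand-in for the DataNode round trip the real code performs (the real method also retries and updates the stream's internal state).

// Simplified sketch only, not the actual DFSInputStream code.
LocatedBlocks blocks = dfsClient.getLocatedBlocks(src, 0);
long fileLength = blocks.getFileLength();
if (!blocks.isLastBlockComplete() && blocks.getLastLocatedBlock() != null) {
  // The file is still being written: the NameNode's length does not yet cover the
  // under-construction last block, so ask a DataNode how much of it is visible.
  fileLength += readVisibleLengthFromDatanode(blocks.getLastLocatedBlock());
}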
Once the LocatedBlocks have been obtained, they are sorted. The goal is to push bad nodes to the end of each block's location list and to order the remaining nodes by transfer distance, which shortens the time it takes to read a block.
org.apache.hadoop.hdfs.server.namenode.FSNamesystem#sortLocatedBlocks
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager#sortLocatedBlocks
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager#sortLocatedBlock
org.apache.hadoop.net.NetworkTopology#sortByDistance
Essentially, a Comparator implementation is built to apply a multi-criteria ordering to the location list: bad nodes are moved to the back, and the remaining nodes are then sorted by network-topology distance.
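As a concrete toy illustration of that two-level ordering (Node, bad and distance below are made-up stand-ins for DatanodeInfo, the decommissioned/stale checks and NetworkTopology.getDistance(); the real sortByDistance additionally randomizes among nodes at the same distance):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortReplicasExample {
  static final class Node {
    final String name;
    final boolean bad;      // e.g. decommissioned, stale or known-dead
    final int distance;     // network-topology distance from the reader
    Node(String name, boolean bad, int distance) {
      this.name = name; this.bad = bad; this.distance = distance;
    }
    @Override public String toString() { return name; }
  }

  public static void main(String[] args) {
    List<Node> replicas = new ArrayList<>();
    replicas.add(new Node("dn1", false, 4));   // off-rack replica
    replicas.add(new Node("dn2", true,  0));   // local but bad
    replicas.add(new Node("dn3", false, 2));   // same-rack replica

    // Bad nodes sort last; among good nodes, the smaller distance wins.
    replicas.sort(Comparator.comparing((Node n) -> n.bad)
                            .thenComparingInt(n -> n.distance));

    System.out.println(replicas);              // [dn3, dn1, dn2]
  }
}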
Finally, using the value returned by getBlockLocations, the client can contact the DataNodes and read the file contents through their streaming interface.
2. Reading the data
Now for the read method. HdfsDataInputStream extends FSDataInputStream, which extends DataInputStream, which extends FilterInputStream, which extends InputStream, so all of the parent classes' read methods are available. Recall that open wrapped the returned DFSInputStream; that wrapper is an FSDataInputStream (concretely an HdfsDataInputStream). What we actually operate on is still the underlying DFSInputStream, so whichever read method we call, its own or an inherited one, execution ultimately ends up back in DFSInputStream itself.
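You can see the wrapping directly from client code; a minimal sketch, assuming fs is an HDFS FileSystem and no encryption zone is involved (getWrappedStream is a standard FSDataInputStream method):

FSDataInputStream in = fs.open(new Path("/a/b/c.txt"));
// On HDFS the returned object is an HdfsDataInputStream...
System.out.println(in.getClass().getName());
// ...and the stream doing the real work underneath is the DFSInputStream.
System.out.println(in.getWrappedStream().getClass().getName());

The read implementations that actually run are therefore DFSInputStream's own, shown below.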
@Override
public synchronized int read() throws IOException {
if (oneByteBuf == null) {
oneByteBuf = new byte[1];
}
int ret = read(oneByteBuf, 0, 1);
return (ret <= 0) ? -1 : (oneByteBuf[0] & 0xff);
}
@Override
public synchronized int read(@Nonnull final byte buf[], int off, int len)
throws IOException {
validatePositionedReadArgs(pos, buf, off, len);
if (len == 0) {
return 0;
}
ReaderStrategy byteArrayReader =
new ByteArrayStrategy(buf, off, len, readStatistics, dfsClient);
return readWithStrategy(byteArrayReader);
}
Let's look at the readWithStrategy method:
protected synchronized int readWithStrategy(ReaderStrategy strategy)
throws IOException {
dfsClient.checkOpen();
if (closed.get()) {
throw new IOException("Stream closed");
}
int len = strategy.getTargetLength();
CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
failures = 0;
if (pos < getFileLength()) {
int retries = 2;
while (retries > 0) {
try {
// currentNode can be left as null if previous read had a checksum
// error on the same block. See HDFS-3067
if (pos > blockEnd || currentNode == null) {
currentNode = blockSeekTo(pos);
}
int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
synchronized(infoLock) {
if (locatedBlocks.isLastBlockComplete()) {
realLen = (int) Math.min(realLen,
locatedBlocks.getFileLength() - pos);
}
}
int result = readBuffer(strategy, realLen, corruptedBlocks);
if (result >= 0) {
pos += result;
} else {
// got a EOS from reader though we expect more data on it.
throw new IOException("Unexpected EOS from the reader");
}
updateReadStatistics(readStatistics, result, blockReader);
dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
result);
if (readStatistics.getBlockType() == BlockType.STRIPED) {
dfsClient.updateFileSystemECReadStats(result);
}
return result;
} catch (ChecksumException ce) {
throw ce;
} catch (IOException e) {
checkInterrupted(e);
if (retries == 1) {
DFSClient.LOG.warn("DFS Read", e);
}
blockEnd = -1;
if (currentNode != null) {
addToDeadNodes(currentNode);
}
if (--retries == 0) {
throw e;
}
} finally {
// Check if need to report block replicas corruption either read
// was successful or ChecksumException occurred.
reportCheckSumFailure(corruptedBlocks,
getCurrentBlockLocationsLength(), false);
}
}
}
return -1;
}
First, blockSeekTo. As its name suggests, it opens a DataInputStream to a DataNode so the block's data can be read. The key logic is in the getBlockAt call inside it, which finds the block (a LocatedBlock) containing the given position: the cached block list is consulted first, and only on a cache miss does the client go back to the NameNode.
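A simplified sketch of that cache-or-refetch idea (not the verbatim Hadoop source; findBlock and get are real LocatedBlocks methods and getLocatedBlocks is a real DFSClient method, while getBlockAtSketch is a made-up name, and the real fetchBlockAt also merges the fresh result back into the cached list):

// Find the LocatedBlock covering `offset`, preferring the cached block list.
private LocatedBlock getBlockAtSketch(long offset) throws IOException {
  int idx = locatedBlocks.findBlock(offset);   // binary search over the cached blocks
  if (idx >= 0) {
    return locatedBlocks.get(idx);             // cache hit
  }
  // Cache miss: ask the NameNode again starting at this offset; the first
  // block of the fresh result covers the requested position.
  LocatedBlocks fresh = dfsClient.getLocatedBlocks(src, offset);
  return fresh.get(0);
}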
Next, the chooseDataNode call inside blockSeekTo. It picks a suitable DataNode with very simple logic: iterate over the replica location array and take the first DataNode that is in neither deadNodes nor ignoredNodes. If the loop ends without finding one, it reports that no live DataNode holds this block. The chosen node's transfer address is then resolved and returned as a DNAddrPair; blockSeekTo subsequently connects to that DataNode.
protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) {
DatanodeInfo[] nodes = block.getLocations();
StorageType[] storageTypes = block.getStorageTypes();
DatanodeInfo chosenNode = null;
StorageType storageType = null;
if (nodes != null) {
for (int i = 0; i < nodes.length; i++) {
if (!deadNodes.containsKey(nodes[i])
&& (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
chosenNode = nodes[i];
// Storage types are ordered to correspond with nodes, so use the same
// index to get storage type.
if (storageTypes != null && i < storageTypes.length) {
storageType = storageTypes[i];
}
break;
}
}
}
if (chosenNode == null) {
reportLostBlock(block, ignoredNodes);
return null;
}
final String dnAddr =
chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
DFSClient.LOG.debug("Connecting to datanode {}", dnAddr);
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
return new DNAddrPair(chosenNode, targetAddr, storageType, block);
}
Next, blockReader is assigned. The getBlockReader method called here shows that the BlockReader object is constructed via BlockReaderFactory's build method; the Javadoc on build is worth reading carefully.
blockReader = getBlockReader(targetBlock, offsetIntoBlock,
targetBlock.getBlockSize() - offsetIntoBlock, targetAddr,
storageType, chosenNode);
protected BlockReader getBlockReader(LocatedBlock targetBlock,
long offsetInBlock, long length, InetSocketAddress targetAddr,
StorageType storageType, DatanodeInfo datanode) throws IOException {
ExtendedBlock blk = targetBlock.getBlock();
Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
CachingStrategy curCachingStrategy;
boolean shortCircuitForbidden;
synchronized (infoLock) {
curCachingStrategy = cachingStrategy;
shortCircuitForbidden = shortCircuitForbidden();
}
return new BlockReaderFactory(dfsClient.getConf()).
setInetSocketAddress(targetAddr).
setRemotePeerFactory(dfsClient).
setDatanodeInfo(datanode).
setStorageType(storageType).
setFileName(src).
setBlock(blk).
setBlockToken(accessToken).
setStartOffset(offsetInBlock).
setVerifyChecksum(verifyChecksum).
setClientName(dfsClient.clientName).
setLength(length).
setCachingStrategy(curCachingStrategy).
setAllowShortCircuitLocalReads(!shortCircuitForbidden).
setClientCacheContext(dfsClient.getClientContext()).
setUserGroupInformation(dfsClient.ugi).
setConfiguration(dfsClient.getConfiguration()).
build();
}
/**
* Build a BlockReader with the given options.
*
* This function will do the best it can to create a block reader that meets
* all of our requirements. We prefer short-circuit block readers
* (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the
* former avoid the overhead of socket communication. If short-circuit is
* unavailable, our next fallback is data transfer over UNIX domain sockets,
* if dfs.client.domain.socket.data.traffic has been enabled. If that doesn't
* work, we will try to create a remote block reader that operates over TCP
* sockets.
*
* There are a few caches that are important here.
*
* The ShortCircuitCache stores file descriptor objects which have been passed
* from the DataNode.
*
* The DomainSocketFactory stores information about UNIX domain socket paths
* that we have not been able to use in the past, so that we don't waste time
* retrying them over and over. (Like all the caches, it does have a timeout,
* though.)
*
* The PeerCache stores peers that we have used in the past. If we can reuse
* one of these peers, we avoid the overhead of re-opening a socket. However,
* if the socket has been timed out on the remote end, our attempt to reuse
* the socket may end with an IOException. For that reason, we limit our
* attempts at socket reuse to dfs.client.cached.conn.retry times. After
* that, we create new sockets. This avoids the problem where a thread tries
* to talk to a peer that it hasn't talked to in a while, and has to clean out
* every entry in a socket cache full of stale entries.
*/
To summarize the comment: short-circuit local block readers (BlockReaderLocal and BlockReaderLocalLegacy) are preferred because they avoid socket I/O; the next fallback is passing data over a UNIX domain socket when dfs.client.domain.socket.data.traffic is enabled; and the last resort is a remote block reader over TCP. Three caches matter along the way: the ShortCircuitCache holds file descriptors handed over by the DataNode, the DomainSocketFactory remembers domain-socket paths that have failed before (with a timeout), and the PeerCache reuses previously opened connections, with up to dfs.client.cached.conn.retry reuse attempts before a new socket is created. With that in mind, let's look at the build method:
public BlockReader build() throws IOException {
Preconditions.checkNotNull(configuration);
Preconditions
.checkState(length >= 0, "Length must be set to a non-negative value");
BlockReader reader = tryToCreateExternalBlockReader();
if (reader != null) {
return reader;
}
//short-circuit local reads: only attempted if enabled and allowed for this read
final ShortCircuitConf scConf = conf.getShortCircuitConf();
try {
if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
if (clientContext.getUseLegacyBlockReaderLocal()) {
reader = getLegacyBlockReaderLocal();
if (reader != null) {
LOG.trace("{}: returning new legacy block reader local.", this);
return reader;
}
} else {
reader = getBlockReaderLocal();
if (reader != null) {
LOG.trace("{}: returning new block reader local.", this);
return reader;
}
}
}
if (scConf.isDomainSocketDataTraffic()) {
reader = getRemoteBlockReaderFromDomain();
if (reader != null) {
LOG.trace("{}: returning new remote block reader using UNIX domain "
+ "socket on {}", this, pathInfo.getPath());
return reader;
}
}
} catch (IOException e) {
LOG.debug("Block read failed. Getting remote block reader using TCP", e);
}
Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting,
"TCP reads were disabled for testing, but we failed to " +
"do a non-TCP read.");
return getRemoteBlockReaderFromTcp();
}
As you can see, configuration (plus what actually succeeds at runtime) determines which kind of BlockReader gets used.
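For reference, these choices are driven by client configuration keys like the following; a hedged illustration, where the socket path is only an example value and must match the DataNode's own setting:

Configuration conf = new Configuration();
// Enable short-circuit local reads (BlockReaderLocal) when the client and the
// DataNode share a host; both sides must agree on the domain socket path.
conf.setBoolean("dfs.client.read.shortcircuit", true);
conf.set("dfs.domain.socket.path", "/var/lib/hadoop-hdfs/dn_socket"); // example path
// Optionally let block data itself flow over the UNIX domain socket.
conf.setBoolean("dfs.client.domain.socket.data.traffic", true);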
Next comes the actual reading of data with the BlockReader, mainly in the readBuffer method called from readWithStrategy:
org.apache.hadoop.hdfs.DFSInputStream#readBuffer
org.apache.hadoop.hdfs.ByteArrayStrategy#readFromBlock
org.apache.hadoop.hdfs.client.impl.BlockReaderLocal#read(byte[], int, int)
P.S.: If the chosen BlockReader is not a short-circuit one (i.e. the domain-socket or plain-TCP path), a BlockReaderRemote is built. It constructs a Sender and calls its readBlock method to issue the read request, the same pattern we saw in the write path. On the DataNode side, that request is ultimately handled by DataXceiver#readBlock, which constructs a BlockSender to stream the packets back.