1. Background
To address the horizontal-scaling limits of HDFS, the Hadoop community first implemented a ViewFs-based Federation architecture in earlier releases. In more recent Hadoop versions the community added a Router-based Federation architecture, and on top of it a number of features that strengthen cluster management. The Router moves the mount table out of the client, which removes the problem of clients holding inconsistent mount tables.
A Federation cluster is a union of multiple sub-clusters, which usually share DataNodes. A mount table maintains the mapping from the Federation namespace to the sub-cluster namespaces; it is stored in a client-side configuration file or in ZooKeeper, and the client resolves it to reach the correct sub-cluster.
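To make the mapping concrete, here is a minimal standalone sketch of the longest-prefix lookup idea behind a mount table (plain Java for illustration, not the actual ViewFs/Router classes; the paths and nameservice names are made up):
import java.util.HashMap;
import java.util.Map;

// Standalone sketch (not Hadoop code): a mount table is, at its core, a
// longest-prefix lookup from the federation namespace to a sub-cluster path.
public class MountTableSketch {
  // Global mount point -> "nameservice:path inside that sub-cluster".
  private final Map<String, String> mounts = new HashMap<>();

  public void addMount(String globalPath, String target) {
    mounts.put(globalPath, target);
  }

  /** Resolve a federation path to a sub-cluster destination, or null. */
  public String resolve(String path) {
    String p = path;
    while (true) {
      String target = mounts.get(p);
      if (target != null) {
        // Deepest matching mount entry wins; append the remainder of the path.
        return target + path.substring(p.length());
      }
      if (p.equals("/")) {
        return null; // no mount entry covers this path
      }
      int slash = p.lastIndexOf('/');
      p = (slash <= 0) ? "/" : p.substring(0, slash);
    }
  }

  public static void main(String[] args) {
    MountTableSketch table = new MountTableSketch();
    table.addMount("/user", "ns0:/user");
    table.addMount("/data/warehouse", "ns1:/warehouse");
    // Prints "ns1:/warehouse/db1/t1"
    System.out.println(table.resolve("/data/warehouse/db1/t1"));
  }
}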
2. DFSRouter Startup Flow
The entry point of DFSRouter is shown below. It creates a Router object and then calls init and start in turn, so let's see what those two methods do.
public static void main(String[] argv) {
if (DFSUtil.parseHelpArgument(argv, USAGE, System.out, true)) {
System.exit(0);
}
try {
StringUtils.startupShutdownMessage(Router.class, argv, LOG);
Router router = new Router();
ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(router), SHUTDOWN_HOOK_PRIORITY);
Configuration conf = new HdfsConfiguration();
router.init(conf);
router.start();
} catch (Throwable e) {
LOG.error("Failed to start router", e);
terminate(1, e);
}
}
The init flow initializes a number of nested services; we focus on the following ones:
- StateStoreService: its main job is maintaining the Router's state; covered in Section 6.
- ActiveNamenodeResolver: used to find the active NameNode of a nameservice. Its implementation, MembershipNamenodeResolver, obtains the Active NN by talking to the StateStoreService and keeps a local cache to cut down on State Store accesses.
- FileSubclusterResolver: maps the global namespace onto the namespaces of the HDFS sub-clusters; covered in Section 5.
- RouterRpcServer: an RPC service that proxies client requests to the Active NameNode; covered in Section 3.
The logic in start is also simple: it first updates the Router's state and then adds JVM metrics monitoring.
3. RPC in DFSRouter
The Router initialization in the previous section created a RouterRpcServer; let's look at the important parts inside it:
ClientNamenodeProtocolServerSideTranslatorPB
clientProtocolServerTranslator =
new ClientNamenodeProtocolServerSideTranslatorPB(this);
...
this.rpcServer = new RPC.Builder(this.conf)
.setProtocol(ClientNamenodeProtocolPB.class)
.setInstance(clientNNPbService)
.setBindAddress(confRpcAddress.getHostName())
.setPort(confRpcAddress.getPort())
.setNumHandlers(handlerCount)
.setNumReaders(readerCount)
.setQueueSizePerHandler(handlerQueueSize)
.setVerbose(false)
.setAlignmentContext(new RouterStateIdContext())
.setSecretManager(this.securityManager.getSecretManager())
.build();
...
// Create the client
this.rpcClient = new RouterRpcClient(this.conf, this.router,
this.namenodeResolver, this.rpcMonitor);
As shown above, a ClientNamenodeProtocolServerSideTranslatorPB is created first; its job is to receive the RPC requests sent by DFSClient, and its server field is set to the RouterRpcServer (for background on RPC in HDFS, see the article 彻底熟悉Hadoop RPC框架). The build method registers the RouterRpcServer implementation (via registerProtocolAndImpl). A RouterRpcClient is then created; this object acts as the client when the Router calls the NameNodes.
When you operate on a DistributedFileSystem, the RPC actually arrives at the RouterRpcServer, which delegates to RouterClientProtocol. RouterClientProtocol resolves the global path into paths under concrete nameservices according to the mount table (the rpcServer.getLocationsForPath method; the resolution itself is covered later). With the locations in hand, RouterRpcClient decides, based on the mount entry's policy and the specific RPC, whether to use invokeSingle, invokeAll, invokeSequential, or invokeConcurrent to send the request to the NameNode; all of these eventually call invokeMethod.
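From the client's point of view the Router looks like an ordinary NameNode, so no special client code is required; a minimal usage sketch (the router address below is only a placeholder for your dfs.federation.router.rpc-address):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RouterClientExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Point the client at the Router's RPC address instead of a NameNode's.
    // "router-host:8888" is a placeholder; substitute your own router address.
    conf.set("fs.defaultFS", "hdfs://router-host:8888");
    try (FileSystem fs = FileSystem.get(conf)) {
      // This becomes a ClientProtocol#mkdirs RPC that lands on RouterRpcServer
      // and is forwarded to the proper sub-cluster NameNode.
      fs.mkdirs(new Path("/data/warehouse/db1"));
    }
  }
}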
Let's take a mkdirs request as an example:
public boolean mkdirs(String src, FsPermission masked, boolean createParent)
throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(src, true);
RemoteMethod method = new RemoteMethod("mkdirs",
new Class<?>[] {String.class, FsPermission.class, boolean.class},
new RemoteParam(), masked, createParent);
// Create in all locations
if (isPathAll(src)) {
boolean success = false;
try {
success = rpcClient.invokeAll(locations, method);
} catch (AccessControlException e){
logAuditEvent(success, "mkdirs", src);
throw e;
}
logAuditEvent(success, "mkdirs", src);
return success;
}
if (locations.size() > 1) {
// Check if this directory already exists
try {
HdfsFileStatus fileStatus = getFileInfo(src);
if (fileStatus != null) {
// When existing, the NN doesn't return an exception; return true
return true;
}
} catch (IOException ioe) {
// Can't query if this file exists or not.
LOG.error("Error requesting file info for path {} while proxing mkdirs",
src, ioe);
}
}
RemoteLocation firstLocation = locations.get(0);
boolean success = false;
try {
success = (boolean) rpcClient.invokeSingle(firstLocation, method);
} catch (IOException e){
logAuditEvent(success, "mkdirs", src);
throw e;
}
logAuditEvent(success, "mkdirs", src);
return success;
}
The final invokeMethod then uses the ConnectionManager from Section 4 to obtain a connection.
4. Connection Management Between RouterRpcClient and the NameNodes
When the RouterRpcClient was created in the previous section, it internally created a ConnectionManager. This is a connection-pool manager that maintains the pools of user-to-NameNode connections in a Map<ConnectionPoolId, ConnectionPool>.
ConnectionManager contains two thread classes, ConnectionCreator and CleanupTask. First, the run method of ConnectionCreator:
public void run() {
while (this.running) {
try {
ConnectionPool pool = this.queue.take();
try {
int total = pool.getNumConnections();
int active = pool.getNumActiveConnections();
if (pool.getNumConnections() < pool.getMaxSize() &&
active >= MIN_ACTIVE_RATIO * total) {
ConnectionContext conn = pool.newConnection();
pool.addConnection(conn);
} else {
LOG.debug("Cannot add more than {} connections to {}",
pool.getMaxSize(), pool);
}
} catch (IOException e) {
LOG.error("Cannot create a new connection", e);
}
} catch (InterruptedException e) {
LOG.error("The connection creator was interrupted");
this.running = false;
}
}
}
It takes a ConnectionPool from the queue and checks whether another connection may still be added to it (the pool is below its maximum size and enough of its existing connections are active). Next, the run method of CleanupTask:
public void run() {
long currentTime = Time.now();
List<ConnectionPoolId> toRemove = new LinkedList<>();
// Look for stale pools
readLock.lock();
try {
for (Entry<ConnectionPoolId, ConnectionPool> entry : pools.entrySet()) {
ConnectionPool pool = entry.getValue();
long lastTimeActive = pool.getLastActiveTime();
boolean isStale =
currentTime > (lastTimeActive + poolCleanupPeriodMs);
if (lastTimeActive > 0 && isStale) {
// Remove this pool
LOG.debug("Closing and removing stale pool {}", pool);
pool.close();
ConnectionPoolId poolId = entry.getKey();
toRemove.add(poolId);
} else {
// Keep this pool but clean connections inside
LOG.debug("Cleaning up {}", pool);
cleanup(pool);
}
}
} finally {
readLock.unlock();
}
// Remove stale pools
if (!toRemove.isEmpty()) {
writeLock.lock();
try {
for (ConnectionPoolId poolId : toRemove) {
pools.remove(poolId);
}
} finally {
writeLock.unlock();
}
}
}
CleanupTask's run method removes stale pools and cleans up expired connections inside the remaining ones.
A ConnectionPool represents the pool of connections from one user to one NameNode, and holds multiple ConnectionContext objects. Note that both pools and connections are created lazily: a pool is only created when a user RPC calls getConnection, and it is destroyed automatically after its idle timeout. Pooling also lowers the overhead of the Router as an RPC forwarder, since an RPC connection does not have to be created for every request. In a steadily running production environment, the Router's impact on NameNode RPC throughput can therefore be expected to be very small.
A ConnectionContext represents an actual RPC connection. While a client is using the connection, numThreads is incremented, marking it as active and not reusable; when the client is done, numThreads is decremented.
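A minimal standalone sketch of that usage-counting idea (not the real ConnectionContext class):
import java.util.concurrent.atomic.AtomicInteger;

// Standalone sketch of the usage counting described above: a connection with
// numThreads > 0 is considered active and is not handed out to other callers.
public class ConnectionContextSketch {
  private final AtomicInteger numThreads = new AtomicInteger(0);

  public boolean isActive() {
    return numThreads.get() > 0;
  }

  /** Called before issuing an RPC over this connection. */
  public void acquire() {
    numThreads.incrementAndGet();
  }

  /** Called once the RPC has completed. */
  public void release() {
    numThreads.decrementAndGet();
  }
}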
Back to the connection-acquisition flow inside invokeMethod: it looks up the ConnectionPool in pools using a key composed of the ugi, nnAddress, and protocol; if none is found, a new ConnectionPool is created and put into pools, and a new ConnectionContext is created in it for use.
connection = this.getConnection(ugi, nsId, rpcAddress, protocol);
ProxyAndInfo<?> client = connection.getClient();
final Object proxy = client.getProxy();
ret = invoke(nsId, 0, method, proxy, params);
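To make the lazy, keyed pooling concrete, here is a minimal standalone sketch (not the actual ConnectionManager code) built around the same (ugi, nnAddress, protocol) key described above:
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Standalone sketch of the ConnectionManager idea: one pool per
// (user, NameNode address, protocol), created lazily on first use.
public class ConnectionManagerSketch {

  static final class PoolKey {
    final String user;
    final String nnAddress;
    final String protocol;

    PoolKey(String user, String nnAddress, String protocol) {
      this.user = user;
      this.nnAddress = nnAddress;
      this.protocol = protocol;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof PoolKey)) {
        return false;
      }
      PoolKey k = (PoolKey) o;
      return user.equals(k.user) && nnAddress.equals(k.nnAddress)
          && protocol.equals(k.protocol);
    }

    @Override
    public int hashCode() {
      return Objects.hash(user, nnAddress, protocol);
    }
  }

  /** Placeholder for a pool of ConnectionContext objects. */
  static final class Pool {
    final PoolKey key;

    Pool(PoolKey key) {
      this.key = key;
    }
  }

  private final Map<PoolKey, Pool> pools = new ConcurrentHashMap<>();

  /** Return the pool for this (user, NN, protocol), creating it on first use. */
  public Pool getPool(String user, String nnAddress, String protocol) {
    return pools.computeIfAbsent(new PoolKey(user, nnAddress, protocol), Pool::new);
  }
}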
5. Path Resolution
As mentioned earlier, every RPC request goes through a similar flow to obtain the mapped paths. Internally this is handled by the subclusterResolver, whose implementation class here is MultipleDestinationMountTableResolver.
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(src, true);
The getDestinationForPath method of MultipleDestinationMountTableResolver:
public PathLocation getDestinationForPath(String path) throws IOException {
PathLocation mountTableResult = super.getDestinationForPath(path);
if (mountTableResult == null) {
LOG.error("The {} cannot find a location for {}",
super.getClass().getSimpleName(), path);
} else if (mountTableResult.hasMultipleDestinations()) {
DestinationOrder order = mountTableResult.getDestinationOrder();
OrderedResolver orderedResolver = orderedResolvers.get(order);
if (DestinationOrder.FIXED == order) {
String firstNamespace = mountTableResult.getFixedFirstNamespace();
mountTableResult = new PathLocation(mountTableResult, firstNamespace);
} else if (orderedResolver == null) {
LOG.error("Cannot find resolver for order {}", order);
} else {
String firstNamespace =
orderedResolver.getFirstNamespace(path, mountTableResult);
// Change the order of the name spaces according to the policy
if (firstNamespace != null) {
// This is the entity in the tree, we need to create our own copy
mountTableResult = new PathLocation(mountTableResult, firstNamespace);
LOG.debug("Ordered locations following {} are {}",
order, mountTableResult);
} else {
LOG.error("Cannot get main namespace for path {} with order {}",
path, order);
}
}
}
return mountTableResult;
}
① First call the parent class MountTableResolver's method of the same name, which yields a PathLocation.
② If the resulting PathLocation is null, log an error. If it has multiple destinations, enter the else if (mountTableResult.hasMultipleDestinations()) branch.
③ Look up the resolver that corresponds to the DestinationOrder.
④ Call the resolver's getFirstNamespace method to get the name of the first namespace (a String). Its logic is supplied by whichever concrete resolver implementation is in use; we will look at one later, so for now assume we have obtained firstNamespace.
⑤ Pass firstNamespace to the PathLocation constructor, which reorders the destinations of the given PathLocation (mountTableResult) according to firstNamespace.
⑥ Return mountTableResult (whether reordered or not).
For the parent-class method in ①: it first looks in locationCache; on a miss it uses lookupLocation to find the deepest matching mount point (findDeepest).
For getFirstNamespace in ④: given a path and its PathLocation, the method picks, according to the rules of the concrete implementation, the first namespace that should handle the request.
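For intuition, here is a standalone sketch of one possible ordering rule: a deterministic, hash-based pick, similar in spirit to the HASH order (this is not the real HashFirstResolver, which hashes on the path component below the mount point using a consistent-hash ring):
import java.util.Arrays;
import java.util.List;

// Standalone sketch of a getFirstNamespace-style rule: deterministically pick
// one of the candidate sub-clusters for a given path (same path -> same pick).
public class HashFirstSketch {

  public static String getFirstNamespace(String path, List<String> namespaces) {
    if (namespaces.isEmpty()) {
      return null;
    }
    int index = Math.floorMod(path.hashCode(), namespaces.size());
    return namespaces.get(index);
  }

  public static void main(String[] args) {
    List<String> subclusters = Arrays.asList("ns0", "ns1", "ns2");
    System.out.println(getFirstNamespace("/data/warehouse/db1/t1", subclusters));
  }
}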
6. DFSRouter State Management
As seen in Section 2, DFSRouter starts a StateStoreService, which in turn starts the following services:
- StateStoreDriver: used to read and write the State Store records.
- StateStoreConnectionMonitorService: periodically checks whether the driver is still connected.
- StateStoreCacheUpdateService: periodically refreshes the caches.
The latter two extend PeriodicService, which means they run periodically; their periodInvoke methods show what each one does.
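A minimal standalone sketch of that periodic pattern (not the real PeriodicService): subclasses only provide periodInvoke(), which is run at a fixed interval.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Standalone sketch of the periodic-service pattern: periodInvoke() is
// executed at a fixed interval until stop() is called.
public abstract class PeriodicSketch {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  /** The work done every period, e.g. refreshing caches or heartbeating. */
  protected abstract void periodInvoke();

  public void start(long intervalMs) {
    scheduler.scheduleWithFixedDelay(this::periodInvoke,
        intervalMs, intervalMs, TimeUnit.MILLISECONDS);
  }

  public void stop() {
    scheduler.shutdownNow();
  }
}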
The StateStoreDriver implementation is StateStoreZooKeeperImpl, which keeps the State Store records in ZooKeeper under the following layout (the directories are initialized in the driver's init method):
* PARENT_PATH
* |--- MOUNT
* |--- MEMBERSHIP
* |--- REBALANCER
* |--- ROUTERS
When the StateStoreService is initialized, it also registers the supported record stores into the State Store:
// Add supported record stores
addRecordStore(MembershipStoreImpl.class);
addRecordStore(MountTableStoreImpl.class);
addRecordStore(RouterStoreImpl.class);
addRecordStore(DisabledNameserviceStoreImpl.class);
Three fields are also worth noting:
/** Supported record stores. */
private final Map<
Class<? extends BaseRecord>, RecordStore<? extends BaseRecord>>
recordStores;
/** List of internal caches to update. */
private final List<StateStoreCache> cachesToUpdateInternal;
/** List of external caches to update. */
private final List<StateStoreCache> cachesToUpdateExternal;
In recordStores, the keys are the record classes: MembershipState -> MembershipStoreImpl, MountTable -> MountTableStoreImpl, RouterState -> RouterStoreImpl, DisabledNameservice -> DisabledNameserviceStoreImpl.
From addRecordStore we can see that record stores implementing StateStoreCache are added to cachesToUpdateInternal; of the stores above, that is MembershipStoreImpl.
During DFSRouter initialization we mentioned ActiveNamenodeResolver and FileSubclusterResolver, implemented by MembershipNamenodeResolver and MountTableResolver respectively; when they are constructed via reflection, both add themselves to cachesToUpdateExternal.
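A minimal standalone sketch of that self-registration pattern (the class names below are placeholders, not the real Hadoop classes):
import java.util.ArrayList;
import java.util.List;

// Standalone sketch: a resolver registers itself as an "external cache" when it
// is constructed, so the periodic refresh loop can later call its loadCache().
public class CacheRegistrationSketch {

  interface RefreshableCache {
    boolean loadCache(boolean force);
  }

  static class StateStore {
    private final List<RefreshableCache> externalCaches = new ArrayList<>();

    void registerCacheExternal(RefreshableCache cache) {
      externalCaches.add(cache);
    }

    void refreshAll() {
      for (RefreshableCache cache : externalCaches) {
        cache.loadCache(false);
      }
    }
  }

  static class MountTableResolverLike implements RefreshableCache {
    MountTableResolverLike(StateStore stateStore) {
      stateStore.registerCacheExternal(this); // self-registration on construction
    }

    @Override
    public boolean loadCache(boolean force) {
      // Reload mount entries from the State Store here.
      return true;
    }
  }

  public static void main(String[] args) {
    StateStore store = new StateStore();
    new MountTableResolverLike(store);
    store.refreshAll();
  }
}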
With that groundwork laid, let's look at the refreshCaches method, which StateStoreCacheUpdateService calls periodically:
/**
* Refresh the cache with information from the State Store. Called
* periodically by the CacheUpdateService to maintain data caches and
* versions.
* @param force If we force the refresh.
*/
public void refreshCaches(boolean force) {
boolean success = true;
if (isDriverReady()) {
List<StateStoreCache> cachesToUpdate = new LinkedList<>();
cachesToUpdate.addAll(cachesToUpdateInternal);
cachesToUpdate.addAll(cachesToUpdateExternal);
for (StateStoreCache cachedStore : cachesToUpdate) {
String cacheName = cachedStore.getClass().getSimpleName();
boolean result = false;
try {
result = cachedStore.loadCache(force);
} catch (IOException e) {
LOG.error("Error updating cache for {}", cacheName, e);
result = false;
}
if (!result) {
success = false;
LOG.error("Cache update failed for cache {}", cacheName);
}
}
} else {
success = false;
LOG.info("Skipping State Store cache update, driver is not ready.");
}
if (success) {
// Uses local time, not driver time.
this.cacheLastUpdateTime = Time.now();
}
}
It merges the internal and external caches into a single list and refreshes them together. From the flow above we know this list has three members: MembershipNamenodeResolver, MountTableResolver, and MembershipStoreImpl. So let's look at their loadCache methods.
In MembershipNamenodeResolver:
public boolean loadCache(boolean force) {
// Our cache depends on the store, update it first
try {
MembershipStore membership = getMembershipStore();
membership.loadCache(force);
DisabledNameserviceStore disabled = getDisabledNameserviceStore();
disabled.loadCache(force);
} catch (IOException e) {
LOG.error("Cannot update membership from the State Store", e);
}
// Force refresh of active NN cache
cacheBP.clear();
cacheNS.clear();
observerFirstCacheNS.clear();
return true;
}
In MountTableResolver:
public boolean loadCache(boolean force) {
try {
// Our cache depends on the store, update it first
MountTableStore mountTable = this.getMountTableStore();
mountTable.loadCache(force);
GetMountTableEntriesRequest request =
GetMountTableEntriesRequest.newInstance("/");
GetMountTableEntriesResponse response =
mountTable.getMountTableEntries(request);
List<MountTable> records = response.getEntries();
refreshEntries(records);
} catch (IOException e) {
LOG.error("Cannot fetch mount table entries from State Store", e);
return false;
}
return true;
}
Router startup also includes a RouterHeartbeatService, which periodically updates the Router's state; its periodInvoke consists of a single step, updateStateStore(). Let's look at that method:
synchronized void updateStateStore() {
String routerId = router.getRouterId();
if (routerId == null) {
LOG.error("Cannot heartbeat for router: unknown router id");
return;
}
if (isStoreAvailable()) {
// The RouterStore obtained here is a RouterStoreImpl
RouterStore routerStore = router.getRouterStateManager();
try {
RouterState record = RouterState.newInstance(
routerId, router.getStartTime(), router.getRouterState());
StateStoreVersion stateStoreVersion = StateStoreVersion.newInstance(
getStateStoreVersion(MembershipStore.class),
getStateStoreVersion(MountTableStore.class));
record.setStateStoreVersion(stateStoreVersion);
// if admin server not started then hostPort will be empty
String hostPort =
StateStoreUtils.getHostPortString(router.getAdminServerAddress());
record.setAdminAddress(hostPort);
RouterHeartbeatRequest request =
RouterHeartbeatRequest.newInstance(record);
// Update the Router state through the StateStoreDriver; this actually puts the record into ZK
RouterHeartbeatResponse response = routerStore.routerHeartbeat(request);
if (!response.getStatus()) {
LOG.warn("Cannot heartbeat router {}", routerId);
} else {
LOG.debug("Router heartbeat for router {}", routerId);
}
} catch (IOException e) {
LOG.error("Cannot heartbeat router {}", routerId, e);
}
} else {
LOG.warn("Cannot heartbeat router {}: State Store unavailable", routerId);
}
}
OK, at this point we know how the four directories under ZK are written, and by which components.
P.S. Router initialization also starts a NamenodeHeartbeatService (optional), whose purpose is to track the NameNodes' state so that the Router keeps talking to the Active NN.