Administrator
Published on 2023-10-09 / 6 Visits
0

Router-based Federation

一、背景

Hadoop 社区为了解决 HDFS 横向扩展的问题,早前的版本中实现了基于 ViewFs 的 Federation 架构,而在最新的 Hadoop 版本中,社区又实现了基于 Router 的 Federation架构,并且在这个架构之上还实现了许多增强集群管理能力的特性。Router 将挂载表从 Client 中抽离了出来,解决了挂载表不一致的问题。

Federation 架构是指由多个子集群联合构成一个 Federation 集群,通常的做法是这些子集群会共享 Datanode。然后由挂载表来维护Federation Namespace 到子集群 Namespace 间的映射关系,这个挂载表存在客户端本地的配置文件或Zookeeper里,由客户端来解析从而访问到正确的子集群。

二、DFSRouter启动流程

DFSRouter的入口方法如下所示,可以看到新建了一个Router对象,并且依次调用了init和start方法,所以我们看下这两个方法都干了什么。

  public static void main(String[] argv) {
    if (DFSUtil.parseHelpArgument(argv, USAGE, System.out, true)) {
      System.exit(0);
    }

    try {
      StringUtils.startupShutdownMessage(Router.class, argv, LOG);

      Router router = new Router();

      ShutdownHookManager.get().addShutdownHook(
          new CompositeServiceShutdownHook(router), SHUTDOWN_HOOK_PRIORITY);

      Configuration conf = new HdfsConfiguration();
      router.init(conf);
      router.start();
    } catch (Throwable e) {
      LOG.error("Failed to start router", e);
      terminate(1, e);
    }
  }

在init流程中又嵌套初始化了很多service,这里我们着重关注如下几个:

  • StateStoreService
    这个类主要功能是维护Router状态,这里放到第六节来讲。
  • ActiveNameNodeResolver
    用来获取NS的active namenode,其实现类MembershipNamenodeResolver通过与StateStoreService通信获取Active NN并维护一个本地缓存以减少对statestore的访问。
  • FileSubClusterResolver
    把一个全局名字空间映射到HDFS子集群的名字空间,这里放在第五节讲。
  • RouterRpcServer
    一个用来代理客户端请求到Active NameNode的RPC服务,这里放在第三节讲。

start方法中的逻辑也很简单,就是先更新一下Router的状态,然后加个JVM指标监控

三、DFSRouter中的RPC

在上一节Router初始化中创建了RouterRpcServer,我们来看下内部重要的部分:

    ClientNamenodeProtocolServerSideTranslatorPB
        clientProtocolServerTranslator =
            new ClientNamenodeProtocolServerSideTranslatorPB(this);
    ...        
    this.rpcServer = new RPC.Builder(this.conf)
        .setProtocol(ClientNamenodeProtocolPB.class)
        .setInstance(clientNNPbService)
        .setBindAddress(confRpcAddress.getHostName())
        .setPort(confRpcAddress.getPort())
        .setNumHandlers(handlerCount)
        .setnumReaders(readerCount)
        .setQueueSizePerHandler(handlerQueueSize)
        .setVerbose(false)
        .setAlignmentContext(new RouterStateIdContext())
        .setSecretManager(this.securityManager.getSecretManager())
        .build();
    ...
    // Create the client
    this.rpcClient = new RouterRpcClient(this.conf, this.router,
        this.namenodeResolver, this.rpcMonitor);

可以看到开始初始化了一个ClientNamenodeProtocolServerSideTranslatorPB对象,这里的作用就是接收从DFSClient发过来的RPC请求,其中的server被赋值为RouterRpcServer,关于HDFS中的RPC可以看下这篇文章彻底熟悉Hadoop RPC框架。build方法会将RouterRpcServer注册进来(registerProtocolAndImpl方法),然后再初始化一个RouterRpcClient对象,此对象用来作为访问NameNode的客户端角色。

当操作DistributeFileSystem时,实际RPC会到达RouterRpcServer,RouterRpcServer会委托RouterClientProtocol执行,RouterClientProtocol会将global路径根据MountTable解析成具体NS下的路径(rpcServer.getLocationsForPath)方法,解析部分会放在后面讲。获取到locations后,RouterRpcClient再根据挂载的策略和具体RPC类型,决定采用invokeSingle还是invokeAll,invokeSequential或者invokeConcurrent方式向NameNode发起请求,这几种invoke最终都会调用invokeMethod。

随便拿一个mkdir请求举例:

  public boolean mkdirs(String src, FsPermission masked, boolean createParent)
      throws IOException {
    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);

    final List<RemoteLocation> locations =
        rpcServer.getLocationsForPath(src, true);
    RemoteMethod method = new RemoteMethod("mkdirs",
        new Class<?>[] {String.class, FsPermission.class, boolean.class},
        new RemoteParam(), masked, createParent);

    // Create in all locations
    if (isPathAll(src)) {
      boolean success = false;
      try {
        success = rpcClient.invokeAll(locations, method);
      } catch (AccessControlException e){
        logAuditEvent(success, "mkdirs", src);
        throw e;
      }
      logAuditEvent(success, "mkdirs", src);
      return success;
    }

    if (locations.size() > 1) {
      // Check if this directory already exists
      try {
        HdfsFileStatus fileStatus = getFileInfo(src);
        if (fileStatus != null) {
          // When existing, the NN doesn't return an exception; return true
          return true;
        }
      } catch (IOException ioe) {
        // Can't query if this file exists or not.
        LOG.error("Error requesting file info for path {} while proxing mkdirs",
            src, ioe);
      }
    }

    RemoteLocation firstLocation = locations.get(0);
    boolean success = false;
    try {
      success = (boolean) rpcClient.invokeSingle(firstLocation, method);
    } catch (IOException e){
      logAuditEvent(success, "mkdirs", src);
      throw e;
    }
    logAuditEvent(success, "mkdirs", src);
    return success;
  }

那么在最终的invokeMethod方法中会使用第四节的ConnectionManager来获取连接。

四、RouterRpcClient与NameNode的连接管理

在上一节初始化RouterRpcClient时,其内部会初始一个ConnectionManager对象,其作用是连接池管理器,维护user到NN的连接池:Map pools,每个user到一个NN的连接都在一个pool内。每个连接池的大小(单个pool内最大连接数)可配置:dfs.federation.router.connection.pool-size,默认值64。

ConnectionManager其中有两个线程类,ConnectionCreator与CleanupTask,那我们首先看下ConnectionCreator中的run方法:

  public void run() {
    while (this.running) {
      try {
        ConnectionPool pool = this.queue.take();
        try {
          int total = pool.getNumConnections();
          int active = pool.getNumActiveConnections();
          if (pool.getNumConnections() < pool.getMaxSize() &&
              active >= MIN_ACTIVE_RATIO * total) {
            ConnectionContext conn = pool.newConnection();
            pool.addConnection(conn);
          } else {
            LOG.debug("Cannot add more than {} connections to {}",
                pool.getMaxSize(), pool);
          }
        } catch (IOException e) {
          LOG.error("Cannot create a new connection", e);
        }
      } catch (InterruptedException e) {
        LOG.error("The connection creator was interrupted");
        this.running = false;
      }
    }
  }

这里会从队列中拿到一个ConnectionPool对象,判断在其之上还能否建立新的连接。

 public void run() {
      long currentTime = Time.now();
      List<ConnectionPoolId> toRemove = new LinkedList<>();

      // Look for stale pools
      readLock.lock();
      try {
        for (Entry<ConnectionPoolId, ConnectionPool> entry : pools.entrySet()) {
          ConnectionPool pool = entry.getValue();
          long lastTimeActive = pool.getLastActiveTime();
          boolean isStale =
              currentTime > (lastTimeActive + poolCleanupPeriodMs);
          if (lastTimeActive > 0 && isStale) {
            // Remove this pool
            LOG.debug("Closing and removing stale pool {}", pool);
            pool.close();
            ConnectionPoolId poolId = entry.getKey();
            toRemove.add(poolId);
          } else {
            // Keep this pool but clean connections inside
            LOG.debug("Cleaning up {}", pool);
            cleanup(pool);
          }
        }
      } finally {
        readLock.unlock();
      }

      // Remove stale pools
      if (!toRemove.isEmpty()) {
        writeLock.lock();
        try {
          for (ConnectionPoolId poolId : toRemove) {
            pools.remove(poolId);
          }
        } finally {
          writeLock.unlock();
        }
      }
    }
  }

CleanupTask中的run方法会对pools过期的连接进行清理。

ConnectionPool代表一个用户到NN连接的连接池实例,一个ConnectionPool内有多个ConnectionContext。值得注意的是,连接池和连接都是“懒加载”的,只有当有用户RPC请求getConnection时,才会创建一个pool,并且超时后会自动销毁。连接池的作用也降低了Router作为RPC转发的性能损耗,不需要每次RPC请求时都实时创建rpc连接。可以预见,在持续稳定的运行生产环境中,Router对于NameNode RPC吞吐的性能损耗很微小。

ConnectionContext代表一个实际的RPC连接,当一个client使用一个连接,numThreads会++,表示该连接是active的,不能被复用;当连接用完后,numThreads会–。

接第四节的获取连接流程,这里会从pools中查找键为由ugi,nnAddress,protocol组成的对应的ConnectionPool,如果找不到则会创建一个ConnectionPool对象放入pools中,此过程会新建一个ConnectionContext以供使用。

    connection = this.getConnection(ugi, nsId, rpcAddress, protocol);
    ProxyAndInfo<?> client = connection.getClient();
    final Object proxy = client.getProxy();

    ret = invoke(nsId, 0, method, proxy, params);

五、路径解析

前文提到过,在每一个RPC请求都会有类似的流程,来去获取映射的路径,那么其实内部是用subclusterResolver来去处理的,前文提到过其实现类为MultipleDestinationMountTableResolver。

final List<RemoteLocation> locations =
        rpcServer.getLocationsForPath(src, true);

MultipleDestinationMountTableResolver中的getDestinationForPath方法

  public PathLocation getDestinationForPath(String path) throws IOException {
    PathLocation mountTableResult = super.getDestinationForPath(path);
    if (mountTableResult == null) {
      LOG.error("The {} cannot find a location for {}",
          super.getClass().getSimpleName(), path);
    } else if (mountTableResult.hasMultipleDestinations()) {
      DestinationOrder order = mountTableResult.getDestinationOrder();
      OrderedResolver orderedResolver = orderedResolvers.get(order);
      if (DestinationOrder.FIXED == order) {
        String firstNamespace = mountTableResult.getFixedFirstNamespace();
        mountTableResult = new PathLocation(mountTableResult, firstNamespace);
      } else if (orderedResolver == null) {
        LOG.error("Cannot find resolver for order {}", order);
      } else {
        String firstNamespace =
            orderedResolver.getFirstNamespace(path, mountTableResult);

        // Change the order of the name spaces according to the policy
        if (firstNamespace != null) {
          // This is the entity in the tree, we need to create our own copy
          mountTableResult = new PathLocation(mountTableResult, firstNamespace);
          LOG.debug("Ordered locations following {} are {}",
              order, mountTableResult);
        } else {
          LOG.error("Cannot get main namespace for path {} with order {}",
              path, order);
        }
      }
    }
    return mountTableResult;
  }

① 首先调用父类MountTableResolver的同名方法的实现,此时得到了一个PathLocation。
② 上一步得到的PathLocation如果是null,打印些log error信息。如果PathLocation是多目的地的,就进入else if (mountTableResult.hasMultipleDestinations())`分支
③ 根据DestinationOrder对象得到对应的resolver对象。
④ 根据resolver的getFirstNamespace方法得到第一个命名空间的名字(String类型),这个方法的逻辑需要你使用的具体resolver的类去实现。一会儿我们再找一个Resolver的具体实现类分析,现在我们假设,emmm,没错我们假设得到了firstNamespace。
⑤就把firstNamespace当作参数传给PathLocation的构造函数,构造函数会根据firstNamespace对传入的PathLocation对象mountTableResult排序。
⑥返回mountTableResult对象(无论是排序过的或者是没排序过的)
对于①中的父类方法,其内部逻辑是先从locationCache中查找,如果查不到就使用lookupLocation来找到最深的路径(findDeepest)。
对于④中getFirstNamespace方法,这个方法的目标是对于给定的path,以及这个path对应的PathLocation。按照具体的实现规则从PathLocation中找到第一个要处理namespace名称。

六、DFSRouter的状态管理

第一节中DFSRouter嵌套启动了StateStoreService,在StateStoreService又嵌套启动了如下服务:

  • StateStoreDriver
    用来操作StateStore实体
  • StateStoreConnectionMonitorService
    用来周期性的检查Driver是否处于连接状态
  • StateStoreCacheUpdateService
    用来周期性的更新缓存

后两个服务继承于PeriodService,说明是需要周期性运行,观察其中的periodInvoke方法可以得知其作用。
StateStoreDriver实现类为StateStoreZooKeeperImpl,用zookeeper来保存相应的StateStore,具体目录为(目录初始化过程在StateStoreDirver中的init方法):

 * PARENT_PATH
 * |--- MOUNT
 * |--- MEMBERSHIP
 * |--- REBALANCER
 * |--- ROUTERS

同时在StateStoreService初始化时,会将RecordStore加入到StateStore

  // Add supported record stores
  addRecordStore(MembershipStoreImpl.class);
  addRecordStore(MountTableStoreImpl.class);
  addRecordStore(RouterStoreImpl.class);
  addRecordStore(DisabledNameserviceStoreImpl.class);

还需要关注的是这三个变量:

  /** Supported record stores. */
  private final Map<
      Class<? extends BaseRecord>, RecordStore<? extends BaseRecord>>
          recordStores;

  /** List of internal caches to update. */
  private final List<StateStoreCache> cachesToUpdateInternal;

  /** List of external caches to update. */
  private final List<StateStoreCache> cachesToUpdateExternal;

recordStores中,MembershipStateStore -> MembershipStoreImpl,MountTable -> MountTableStoreImpl,RouterStore -> RouterStoreImpl,DisableNameserviceStore -> DisableNameserviceStoreImpl。

由addRecordStore方法可知,其会把StateStoreCache的实现类加入到cachesToUpdateInternal,其中只有MembershipStateStoreImpl。

在DFSRouter初始化过程中,我们提到过ActiveNamenodeResolver和FileSubclusterResolver,其实现类分别为MembershipNamenodeResolver和MountTableResolver,两者在通过反射构造时会把自身加入到cachesToUpdateExternal。

那么我们铺垫完了,来看下StateStoreCacheUpdateService中的refreshCaches方法

  /**
   * Refresh the cache with information from the State Store. Called
   * periodically by the CacheUpdateService to maintain data caches and
   * versions.
   * @param force If we force the refresh.
   */
  public void refreshCaches(boolean force) {
    boolean success = true;
    if (isDriverReady()) {
      List<StateStoreCache> cachesToUpdate = new LinkedList<>();
      cachesToUpdate.addAll(cachesToUpdateInternal);
      cachesToUpdate.addAll(cachesToUpdateExternal);
      for (StateStoreCache cachedStore : cachesToUpdate) {
        String cacheName = cachedStore.getClass().getSimpleName();
        boolean result = false;
        try {
          result = cachedStore.loadCache(force);
        } catch (IOException e) {
          LOG.error("Error updating cache for {}", cacheName, e);
          result = false;
        }
        if (!result) {
          success = false;
          LOG.error("Cache update failed for cache {}", cacheName);
        }
      }
    } else {
      success = false;
      LOG.info("Skipping State Store cache update, driver is not ready.");
    }
    if (success) {
      // Uses local time, not driver time.
      this.cacheLastUpdateTime = Time.now();
    }
  }

可以看到其把内部缓存和外部缓存都加入到一个List中统一做更新,那我们通过先前的流程知道这个List有三个成员,分别是MembershipNamenodeResolver,MountTableResolver和MembershipStateStoreImpl,所以分别看下其loadCache方法:
MembershipNamenodeResolver中的

  public boolean loadCache(boolean force) {
    // Our cache depends on the store, update it first
    try {
      MembershipStore membership = getMembershipStore();
      membership.loadCache(force);
      DisabledNameserviceStore disabled = getDisabledNameserviceStore();
      disabled.loadCache(force);
    } catch (IOException e) {
      LOG.error("Cannot update membership from the State Store", e);
    }

    // Force refresh of active NN cache
    cacheBP.clear();
    cacheNS.clear();
    observerFirstCacheNS.clear();
    return true;
  }

MountTableResolver中的

  public boolean loadCache(boolean force) {
    try {
      // Our cache depends on the store, update it first
      MountTableStore mountTable = this.getMountTableStore();
      mountTable.loadCache(force);

      GetMountTableEntriesRequest request =
          GetMountTableEntriesRequest.newInstance("/");
      GetMountTableEntriesResponse response =
          mountTable.getMountTableEntries(request);
      List<MountTable> records = response.getEntries();
      refreshEntries(records);
    } catch (IOException e) {
      LOG.error("Cannot fetch mount table entries from State Store", e);
      return false;
    }
    return true;
  }

那么在Router启动中还有一个叫做RouterHeartbeatService,它主要用来周期性的更新Router的状态,它的periodInvoke方法只有一个步骤updateStateStore()。那么来看下这个方法:

  synchronized void updateStateStore() {
    String routerId = router.getRouterId();
    if (routerId == null) {
      LOG.error("Cannot heartbeat for router: unknown router id");
      return;
    }
    if (isStoreAvailable()) {
      //这里获取到的RouterStore是RouterStoreImpl
      RouterStore routerStore = router.getRouterStateManager();
      try {
        RouterState record = RouterState.newInstance(
            routerId, router.getStartTime(), router.getRouterState());
        StateStoreVersion stateStoreVersion = StateStoreVersion.newInstance(
            getStateStoreVersion(MembershipStore.class),
            getStateStoreVersion(MountTableStore.class));
        record.setStateStoreVersion(stateStoreVersion);
        // if admin server not started then hostPort will be empty
        String hostPort =
                StateStoreUtils.getHostPortString(router.getAdminServerAddress());
        record.setAdminAddress(hostPort);
        RouterHeartbeatRequest request =
            RouterHeartbeatRequest.newInstance(record);
        //通过StateStoreDriver来更新Router State,实际是将Record put进了ZK中
        RouterHeartbeatResponse response = routerStore.routerHeartbeat(request);
        if (!response.getStatus()) {
          LOG.warn("Cannot heartbeat router {}", routerId);
        } else {
          LOG.debug("Router heartbeat for router {}", routerId);
        }
      } catch (IOException e) {
        LOG.error("Cannot heartbeat router {}", routerId, e);
      }
    } else {
      LOG.warn("Cannot heartbeat router {}: State Store unavailable", routerId);
    }
  }

OK,到这里我们就知道ZK下的4个目录到底是如何是被谁操作的了。
ps:在Router初始化时启动了NamenodeHeartbeatService(可选),目的是是为了保证Router与Active NN通信