Yarn Capacity Scheduler Introduction

Yarn Resource Allocation Overview

Yarn’s three classic schedulers, FIFO (which is rarely used), Fair Scheduler, and the topic of today’s discussion, Yarn’s Capacity Scheduler, are familiar to many. The official documentation provides a detailed explanation of the configuration parameters for the Capacity Scheduler. Today, I will use a practical example to help you understand the principles of this allocation mechanism.

Practical Example

First, we need to understand how Yarn obtains information about the available resources in the current cluster, i.e., how much resources are still available. Yarn obtains the available resources in the current cluster by checking the usage of resources in the file system mounted under /proc in Linux. In this example, let’s assume that the Yarn cluster has a total of 11GB of memory resources (for the sake of simplification, we only consider memory resources, assume there is only one node in the entire cluster, one application launches only one container, and we do not consider the issue of virtual memory). User A submits an application A that requires 10GB of memory. After application A starts, the actual resource usage is 1GB. Next, user B submits a similar application B that requests 10GB of memory resources, and the actual usage of the application after it starts is also 1GB.

From the perspective of the Yarn Application Master (AM), the entire process in this example is as follows:

  1. Yarn will go through all the registered ApplicationMaster to allocate requested resource in Capacity Scheduler. Eventually they will call the void allocate(ApplicationAttemptId appAttemptId, AllocateRequest request, AllocateResponse response) throws YarnException` function to try to allocate requested resource.
 @Override
  @Lock(Lock.NoLock.class)
  public Allocation allocate(ApplicationAttemptId applicationAttemptId,
      List<ResourceRequest> ask, List<SchedulingRequest> schedulingRequests,
      List<ContainerId> release, List<String> blacklistAdditions,
      List<String> blacklistRemovals, ContainerUpdates updateRequests) {
    FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
    if (application == null) {
      LOG.error("Calling allocate on removed or non existent application " +
          applicationAttemptId.getApplicationId());
      return EMPTY_ALLOCATION;
    }

    // The allocate may be the leftover from previous attempt, and it will
    // impact current attempt, such as confuse the request and allocation for
    // current attempt's AM container.
    // Note outside precondition check for the attempt id may be
    // outdated here, so double check it here is necessary.
    if (!application.getApplicationAttemptId().equals(applicationAttemptId)) {
      LOG.error("Calling allocate on previous or removed " +
          "or non existent application attempt " + applicationAttemptId);
      return EMPTY_ALLOCATION;
    }

    // Handle all container updates
    handleContainerUpdates(application, updateRequests);

    // Release containers
    releaseContainers(release, application);

    AbstractLeafQueue updateDemandForQueue = null;

    // Sanity check for new allocation requests
    normalizeResourceRequests(ask);

    // Normalize scheduling requests
    normalizeSchedulingRequests(schedulingRequests);

    Allocation allocation;

    // make sure we aren't stopping/removing the application
    // when the allocate comes in
    application.getWriteLock().lock();
    try {
      if (application.isStopped()) {
        return EMPTY_ALLOCATION;
      }

      // Process resource requests
      if (!ask.isEmpty() || (schedulingRequests != null && !schedulingRequests
          .isEmpty())) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
              "allocate: pre-update " + applicationAttemptId + " ask size ="
                  + ask.size());
          application.showRequests();
        }

        // Update application requests
        if (application.updateResourceRequests(ask) || application
            .updateSchedulingRequests(schedulingRequests)) {
          updateDemandForQueue = (AbstractLeafQueue) application.getQueue();
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("allocate: post-update");
          application.showRequests();
        }
      }

      application.updateBlacklist(blacklistAdditions, blacklistRemovals);

      allocation = application.getAllocation(getResourceCalculator(),
          getClusterResource(), getMinimumResourceCapability());
    } finally {
      application.getWriteLock().unlock();
    }

    if (updateDemandForQueue != null && !application
        .isWaitingForAMContainer()) {
      updateDemandForQueue.getOrderingPolicy().demandUpdated(application);
    }

    LOG.debug("Allocation for application {} : {} with cluster resource : {}",
        applicationAttemptId, allocation, getClusterResource());
    return allocation;
  }

2. To obtain the current available resources in the cluster, we first ignore concepts such as reservation and preemption and try to simplify the process as much as possible for easier understanding. Let’s focus on the application.getAllocation() .

allocation = application.getAllocation(getResourceCalculator(),

Actually all the resource will allocated in the Capacity Scheduler. In the run()function, Capacity Scheduler will keep calling a function called schedule(cs). cs here means the Capacity Scheduler.

static void schedule(CapacityScheduler cs) throws InterruptedException{
    // First randomize the start point
    int current = 0;
    Collection<FiCaSchedulerNode> nodes = cs.nodeTracker.getAllNodes();

    // If nodes size is 0 (when there are no node managers registered,
    // we can return from here itself.
    int nodeSize = nodes.size();
    if(nodeSize == 0) {
      return;
    }
      if (!cs.multiNodePlacementEnabled) {
    int start = random.nextInt(nodeSize);

    boolean printSkippedNodeLogging = isPrintSkippedNodeLogging(cs);

    // Allocate containers of node [start, end)
    for (FiCaSchedulerNode node : nodes) {
      if (current++ >= start) {
        if (shouldSkipNodeSchedule(node, cs, printSkippedNodeLogging)) {
          continue;
        }
        cs.allocateContainersToNode(node.getNodeID(), false);
      }
    }
    ......

As it described, it will scan all the nodes registered, and try to allocate container on the nodes. To prevent ourselves from being submerged by the ocean of code, I jumped to the straight point.

private CSAssignment allocateContainerOnSingleNode(
      CandidateNodeSet<FiCaSchedulerNode> candidates, FiCaSchedulerNode node,
      boolean withNodeHeartbeat) {
    LOG.debug("Trying to schedule on node: {}, available: {}",
        node.getNodeName(), node.getUnallocatedResource());
        ......
            // First check if we can schedule
    // When this time look at one node only, try schedule if the node
    // has any available or killable resource
    if (calculator.computeAvailableContainers(Resources
            .add(node.getUnallocatedResource(), node.getTotalKillableResources()),
        minimumAllocation) <= 0) {
      LOG.debug("This node " + node.getNodeID() + " doesn't have sufficient "
          + "available or preemptible resource for minimum allocation");
      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
          "", getRootQueue().getQueuePath(), ActivityState.REJECTED,
          ActivityDiagnosticConstant.
              INIT_CHECK_SINGLE_NODE_RESOURCE_INSUFFICIENT);
      ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
          node);
      return null;
    }

    return allocateOrReserveNewContainers(candidates, withNodeHeartbeat);

Here we need to focus on two key points

  • How the scheduler determine if there is enough resource to be allocated.
  • The details of allocating resource on a nodemanager.

Let’s start from the 1st point. When we look at the lines

if (calculator.computeAvailableContainers(Resources
            .add(node.getUnallocatedResource(), node.getTotalKillableResources()),
        minimumAllocation) <= 0) {

`getUnallocatedResource()`

  public synchronized void updateTotalResource(Resource resource){
    this.totalResource = resource;
    this.unallocatedResource = Resources.subtract(totalResource,
        this.allocatedResource);
  }

Obviously we can see that AvailableResource = TotalResource - AllocatedResource. Just hold this general view point now, and let’s move on to the second point, “The details of allocating resource on a nodemanager.”

public CSAssignment assignContainers(Resource clusterResource,
      CandidateNodeSet<FiCaSchedulerNode> candidates,
      ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
    updateCurrentResourceLimits(currentResourceLimits, clusterResource);
    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);

    if (LOG.isDebugEnabled()) {
      LOG.debug("assignContainers: partition=" + candidates.getPartition()
          + " #applications=" + orderingPolicy.getNumSchedulableEntities());
    }

    setPreemptionAllowed(currentResourceLimits, candidates.getPartition());

    // Check for reserved resources, try to allocate reserved container first.
    CSAssignment assignment = allocateFromReservedContainer(clusterResource,
        candidates, currentResourceLimits, schedulingMode);
    if (null != assignment) {
      return assignment;
    }
    ......
          // Try to schedule
      assignment = application.assignContainers(clusterResource,
          candidates, currentResourceLimits, schedulingMode, null);

We traced it step by step and finally found this function:

  @Override
  public CSAssignment assignContainers(Resource clusterResource,
      CandidateNodeSet<FiCaSchedulerNode> candidates,
      SchedulingMode schedulingMode, ResourceLimits resourceLimits,
      RMContainer reservedContainer) {
    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);

    if (reservedContainer == null) {
      // Check if application needs more resource, skip if it doesn't need more.
      if (!application.hasPendingResourceRequest(candidates.getPartition(),
          schedulingMode)) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
              + ", because it doesn't need more resource, schedulingMode="
              + schedulingMode.name() + " node-label=" + candidates
              .getPartition());
        }
        ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
            activitiesManager, node, application, null,
            ActivityDiagnosticConstant.APPLICATION_DO_NOT_NEED_RESOURCE,
            ActivityLevel.APP);
        return CSAssignment.SKIP_ASSIGNMENT;
      }
      
      // Schedule in priority order
      for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) {
        ContainerAllocation result = allocate(clusterResource, candidates,
            schedulingMode, resourceLimits, schedulerKey, null);
            ......

Then in the RegularAllocater

 ContainerAllocation tryAllocateOnNode(Resource clusterResource,
      FiCaSchedulerNode node, SchedulingMode schedulingMode,
      ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey,
      RMContainer reservedContainer) {
    ContainerAllocation result;

    // Sanity checks before assigning to this node
    result = checkIfNodeBlackListed(node, schedulerKey);
    if (null != result) {
      return result;
    }

    // Inform the application it is about to get a scheduling opportunity
    // TODO, we may need to revisit here to see if we should add scheduling
    // opportunity here
    application.addSchedulingOpportunity(schedulerKey);

    // Try to allocate containers on node
    result =
        assignContainersOnNode(clusterResource, node, schedulerKey,
            reservedContainer, schedulingMode, resourceLimits);
    
    if (null == reservedContainer) {
      if (result.getAllocationState() == AllocationState.PRIORITY_SKIPPED) {
        // Don't count 'skipped nodes' as a scheduling opportunity!
        application.subtractSchedulingOpportunity(schedulerKey);
      }
    }
    
    return result;
  }

let’s continue tracing inside, `assignContainer`. After a series of check of the available resource and containers on the node, it will new a `ContainerApplication` and return.

  private ContainerAllocation assignContainer(Resource clusterResource,
      FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
      PendingAsk pendingAsk, NodeType type, RMContainer rmContainer,
      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
        .......
    if (availableContainers > 0) {
      // Allocate...
      // We will only do continuous reservation when this is not allocated from
      // reserved container
      if (rmContainer == null && reservationsContinueLooking) {
        // when reservationsContinueLooking is set, we may need to unreserve
        // some containers to meet this queue, its parents', or the users'
        // resource limits.
        if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
          if (!needToUnreserve) {
            // If we shouldn't allocate/reserve new container then we should
            // unreserve one the same size we are asking for since the
            // currentResoureLimits.getAmountNeededUnreserve could be zero. If
            // the limit was hit then use the amount we need to unreserve to be
            // under the limit.
            resourceNeedToUnReserve = capability;
          }
          unreservedContainer = application.findNodeToUnreserve(node,
                  schedulerKey, resourceNeedToUnReserve);
          // When (minimum-unreserved-resource > 0 OR we cannot allocate
          // new/reserved
          // container (That means we *have to* unreserve some resource to
          // continue)). If we failed to unreserve some resource, we can't
          // continue.
          if (null == unreservedContainer) {
            // Skip the locality request
            ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
                activitiesManager, node, application, schedulerKey,
                ActivityDiagnosticConstant.
                    NODE_CAN_NOT_FIND_CONTAINER_TO_BE_UNRESERVED_WHEN_NEEDED,
                ActivityLevel.NODE);
            return ContainerAllocation.LOCALITY_SKIPPED;
          }
        }
      }

      ContainerAllocation result = new ContainerAllocation(unreservedContainer,
          pendingAsk.getPerAllocationResource(), AllocationState.ALLOCATED);
      result.containerNodeType = type;
      result.setToKillContainers(toKillContainers);
      return result;

Then it will really start to allocate container on the nodemanager


  ContainerAllocation doAllocation(ContainerAllocation allocationResult,
      FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
      RMContainer reservedContainer) {
    // Create the container if necessary
    Container container =
        getContainer(reservedContainer, node,
            allocationResult.getResourceToBeAllocated(), schedulerKey);

    // something went wrong getting/creating the container
    if (container == null) {
      application
          .updateAppSkipNodeDiagnostics("Scheduling of container failed. ");
      LOG.warn("Couldn't get container for allocation!");
      ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager,
          node, application, schedulerKey,
          ActivityDiagnosticConstant.APPLICATION_COULD_NOT_GET_CONTAINER,
          ActivityState.REJECTED, ActivityLevel.APP);
      return ContainerAllocation.APP_SKIPPED;
    }

    if (allocationResult.getAllocationState() == AllocationState.ALLOCATED) {
      // When allocating container
      allocationResult = handleNewContainerAllocation(allocationResult, node,
          schedulerKey, container);

And Finally we got this point

 public RMContainer allocate(FiCaSchedulerNode node,
      SchedulerRequestKey schedulerKey, Container container) {
    readLock.lock();
    try {

      if (isStopped) {
        return null;
      }

      // Required sanity check - AM can call 'allocate' to update resource
      // request without locking the scheduler, hence we need to check
      if (getOutstandingAsksCount(schedulerKey) <= 0) {
        return null;
      }

      AppPlacementAllocator<FiCaSchedulerNode> ps =
          appSchedulingInfo.getAppPlacementAllocator(schedulerKey);
      if (null == ps) {
        LOG.warn("Failed to get " + AppPlacementAllocator.class.getName()
            + " for application=" + getApplicationId() + " schedulerRequestKey="
            + schedulerKey);
        return null;
      }

      // Create RMContainer
      RMContainer rmContainer = new RMContainerImpl(container, schedulerKey,
          this.getApplicationAttemptId(), node.getNodeID(),
          appSchedulingInfo.getUser(), this.rmContext,
          ps.getPrimaryRequestedNodePartition());

Let’s get back to our experiment, upon receiving the application submission request from User A for 10GB of memory, and upon verifying that the current cluster resources are 11GB, it is determined that there are sufficient resources. As such, resources are allocated and the corresponding containers are launched to run the application.

Through the above three steps, we can see that the resources reported by the NodeManager are `TotalResource – AllocatedResource = AvailableResource`. Therefore, when the second application request is made, assuming that there is only one node in the current cluster, the available resources for the cluster when the second job is submitted would be `11GB (total) – 10GB (actual usage) = 1GB`, which will not satisfie the submission conditions. However, once the spark dynamic allocated executors feature is enabled, it’s very likely that most of the containers will be recycled so that there will be still availableResource which is greater than 1GB left.

Conclusion

In this experiment, we can draw the following two conclusions:

  • If the real usage of the applications is just around 10% of the allocated resource, the resource will be reserved and kept wasted.

This scenario highlights the importance of carefully managing and optimizing resource allocation in a YARN cluster. It is recommended to accurately estimate the actual memory needs of each application and adjust the requested memory accordingly to avoid over-allocating resources. This can help to avoid situations where applications cannot acquire the resources they need due to insufficient available memory in the cluster.

  • Assuming we optimize the parameters so that the user’s request for resources is reduced from `10GB + 10GB + 9GB = 29GB` to `5GB + 5GB + 5GB = 15GB`, we can say that we have saved 15GB of resources for the cluster.

Here, we will not discuss the issue of overallocation. In the following blogs, I will continue to update some of the features of YARN in oversubscription.