• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Java UpdatedContainerInfo类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo的典型用法代码示例。如果您正苦于以下问题:Java UpdatedContainerInfo类的具体用法?Java UpdatedContainerInfo怎么用?Java UpdatedContainerInfo使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



UpdatedContainerInfo类属于org.apache.hadoop.yarn.server.resourcemanager.rmnode包,在下文中一共展示了UpdatedContainerInfo类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: nodeUpdate

import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
/**
 * Process a heartbeat update from a node.
 */
private synchronized void nodeUpdate(RMNode nm) {
  long start = getClock().getTime();
  if (LOG.isDebugEnabled()) {
    LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
  }
  eventLog.log("HEARTBEAT", nm.getHostName());
  FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
  
  List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
  List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
  List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
  for(UpdatedContainerInfo containerInfo : containerInfoList) {
    newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
    completedContainers.addAll(containerInfo.getCompletedContainers());
  } 
  // Processing the newly launched containers
  for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
    containerLaunchedOnNode(launchedContainer.getContainerId(), node);
  }

  // Process completed containers
  for (ContainerStatus completedContainer : completedContainers) {
    ContainerId containerId = completedContainer.getContainerId();
    LOG.debug("Container FINISHED: " + containerId);
    completedContainer(getRMContainer(containerId),
        completedContainer, RMContainerEventType.FINISHED);
  }

  if (continuousSchedulingEnabled) {
    if (!completedContainers.isEmpty()) {
      attemptScheduling(node);
    }
  } else {
    attemptScheduling(node);
  }

  long duration = getClock().getTime() - start;
  fsOpDurations.addNodeUpdateDuration(duration);
}
 
开发者ID:naver,项目名称:hadoop,代码行数:43,代码来源:FairScheduler.java


示例2: nodeUpdate

import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private synchronized void nodeUpdate(RMNode nm) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
  }

  FiCaSchedulerNode node = getNode(nm.getNodeID());
  
  List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
  List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
  List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
  for(UpdatedContainerInfo containerInfo : containerInfoList) {
    newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
    completedContainers.addAll(containerInfo.getCompletedContainers());
  }
  
  // Processing the newly launched containers
  for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
    containerLaunchedOnNode(launchedContainer.getContainerId(), node);
  }

  // Process completed containers
  for (ContainerStatus completedContainer : completedContainers) {
    ContainerId containerId = completedContainer.getContainerId();
    LOG.debug("Container FINISHED: " + containerId);
    completedContainer(getRMContainer(containerId), 
        completedContainer, RMContainerEventType.FINISHED);
  }

  // Now node data structures are upto date and ready for scheduling.
  if(LOG.isDebugEnabled()) {
    LOG.debug("Node being looked for scheduling " + nm
      + " availableResource: " + node.getAvailableResource());
  }
}
 
开发者ID:naver,项目名称:hadoop,代码行数:35,代码来源:CapacityScheduler.java


示例3: nodeUpdate

import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
/**
 * Process a heartbeat update from a node.
 */
private synchronized void nodeUpdate(RMNode nm) {
  long start = getClock().getTime();
  if (LOG.isDebugEnabled()) {
    LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
  }
  eventLog.log("HEARTBEAT", nm.getHostName());
  FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
  
  List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
  List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
  List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
  for(UpdatedContainerInfo containerInfo : containerInfoList) {
    newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
    completedContainers.addAll(containerInfo.getCompletedContainers());
  } 
  // Processing the newly launched containers
  for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
    containerLaunchedOnNode(launchedContainer, node);
  }

  // Process completed containers
  for (ContainerStatus completedContainer : completedContainers) {
    ContainerId containerId = completedContainer.getContainerId();
    LOG.debug("Container FINISHED: " + containerId);
    completedContainer(getRMContainer(containerId),
        completedContainer, RMContainerEventType.FINISHED);
  }

  if (continuousSchedulingEnabled) {
    if (!completedContainers.isEmpty()) {
      attemptScheduling(node);
    }
  } else {
    attemptScheduling(node);
  }

  long duration = getClock().getTime() - start;
  fsOpDurations.addNodeUpdateDuration(duration);
}
 
开发者ID:yncxcw,项目名称:big-c,代码行数:43,代码来源:FairScheduler.java


示例4: nodeUpdate

import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private synchronized void nodeUpdate(RMNode nm) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
  }

  FiCaSchedulerNode node = getNode(nm.getNodeID());
  
  List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
  List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
  List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
  for(UpdatedContainerInfo containerInfo : containerInfoList) {
    newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
    completedContainers.addAll(containerInfo.getCompletedContainers());
  }
  
  // Processing the newly launched containers
  for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
    containerLaunchedOnNode(launchedContainer, node);
    LOG.info("Container LAUNCHED:"+launchedContainer.getContainerId());
  }

  // Process completed containers
  for (ContainerStatus completedContainer : completedContainers) {
    ContainerId containerId = completedContainer.getContainerId();
    LOG.info("Container FINISHED: " + containerId);
    completedContainer(getRMContainer(containerId), 
        completedContainer, RMContainerEventType.FINISHED);
  }
  
  // Now node data structures are up to date and ready for scheduling.
  if(LOG.isDebugEnabled()) 
  {
    LOG.info("Node being looked for scheduling " + nm
      + " availableResource: " + node.getAvailableResource());
  }
}
 
开发者ID:yncxcw,项目名称:big-c,代码行数:37,代码来源:CapacityScheduler.java


示例5: nodeUpdate

import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private synchronized void nodeUpdate(RMNode rmNode) {
  FiCaSchedulerNode node = getNode(rmNode.getNodeID());
  
  List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
  List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
  List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
  for(UpdatedContainerInfo containerInfo : containerInfoList) {
    newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
    completedContainers.addAll(containerInfo.getCompletedContainers());
  }
  // Processing the newly launched containers
  for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
    containerLaunchedOnNode(launchedContainer.getContainerId(), node);
  }

  // Process completed containers
  for (ContainerStatus completedContainer : completedContainers) {
    ContainerId containerId = completedContainer.getContainerId();
    LOG.debug("Container FINISHED: " + containerId);
    containerCompleted(getRMContainer(containerId), 
        completedContainer, RMContainerEventType.FINISHED);
  }

  if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
          node.getAvailableResource(),minimumAllocation)) {
    LOG.debug("Node heartbeat " + rmNode.getNodeID() + 
        " available resource = " + node.getAvailableResource());

    assignContainers(node);

    LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
        + node.getAvailableResource());
  }
  
  metrics.setAvailableResourcesToQueue(
      Resources.subtract(clusterResource, usedResource));
}
 
开发者ID:ict-carch,项目名称:hadoop-plus,代码行数:38,代码来源:FifoScheduler.java


示例6: setUp

import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
@Before
public void setUp() throws Exception {
  InlineDispatcher rmDispatcher = new InlineDispatcher();
  
  rmContext =
      new RMContextImpl(rmDispatcher, null, null, null,
          mock(DelegationTokenRenewer.class), null, null, null, null);
  scheduler = mock(YarnScheduler.class);
  doAnswer(
      new Answer<Void>() {

        @Override
        public Void answer(InvocationOnMock invocation) throws Throwable {
          final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]);
          eventType = event.getType();
          if (eventType == SchedulerEventType.NODE_UPDATE) {
            List<UpdatedContainerInfo> lastestContainersInfoList = 
                ((NodeUpdateSchedulerEvent)event).getRMNode().pullContainerUpdates();
            for(UpdatedContainerInfo lastestContainersInfo : lastestContainersInfoList) {
          	  completedContainers.addAll(lastestContainersInfo.getCompletedContainers()); 
            }
          }
          return null;
        }
      }
      ).when(scheduler).handle(any(SchedulerEvent.class));
  
  rmDispatcher.register(SchedulerEventType.class, 
      new TestSchedulerEventDispatcher());
  
  NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
  node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null);

}
 
开发者ID:ict-carch,项目名称:hadoop-plus,代码行数:35,代码来源:TestRMNodeTransitions.java


示例7: nodeUpdate

import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private synchronized void nodeUpdate(RMNode rmNode) {
  FiCaSchedulerNode node = getNode(rmNode.getNodeID());
  
  // Update resource if any change
  SchedulerUtils.updateResourceIfChanged(node, rmNode, clusterResource, LOG);
  
  List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
  List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
  List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
  for(UpdatedContainerInfo containerInfo : containerInfoList) {
    newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
    completedContainers.addAll(containerInfo.getCompletedContainers());
  }
  // Processing the newly launched containers
  for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
    containerLaunchedOnNode(launchedContainer.getContainerId(), node);
  }

  // Process completed containers
  for (ContainerStatus completedContainer : completedContainers) {
    ContainerId containerId = completedContainer.getContainerId();
    LOG.debug("Container FINISHED: " + containerId);
    containerCompleted(getRMContainer(containerId), 
        completedContainer, RMContainerEventType.FINISHED);
  }

  if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
          node.getAvailableResource(),minimumAllocation)) {
    LOG.debug("Node heartbeat " + rmNode.getNodeID() + 
        " available resource = " + node.getAvailableResource());

    assignContainers(node);

    LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
        + node.getAvailableResource());
  }
  
  metrics.setAvailableResourcesToQueue(
      Resources.subtract(clusterResource, usedResource));
}
 
开发者ID:Seagate,项目名称:hadoop-on-lustre2,代码行数:41,代码来源:FifoScheduler.java


示例8: nodeUpdate

import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
/**
 * Process a heartbeat update from a node.
 */
private synchronized void nodeUpdate(RMNode nm) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
  }
  eventLog.log("HEARTBEAT", nm.getHostName());
  FSSchedulerNode node = nodes.get(nm.getNodeID());

  // Update resource if any change
  SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG);
  
  List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
  List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
  List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
  for(UpdatedContainerInfo containerInfo : containerInfoList) {
    newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
    completedContainers.addAll(containerInfo.getCompletedContainers());
  } 
  // Processing the newly launched containers
  for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
    containerLaunchedOnNode(launchedContainer.getContainerId(), node);
  }

  // Process completed containers
  for (ContainerStatus completedContainer : completedContainers) {
    ContainerId containerId = completedContainer.getContainerId();
    LOG.debug("Container FINISHED: " + containerId);
    completedContainer(getRMContainer(containerId),
        completedContainer, RMContainerEventType.FINISHED);
  }

  if (continuousSchedulingEnabled) {
    if (!completedContainers.isEmpty()) {
      attemptScheduling(node);
    }
  } else {
    attemptScheduling(node);
  }
}
 
开发者ID:Seagate,项目名称:hadoop-on-lustre2,代码行数:42,代码来源:FairScheduler.java


示例9: nodeUpdate

import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private synchronized void nodeUpdate(RMNode nm) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
  }

  FiCaSchedulerNode node = getNode(nm.getNodeID());
  
  // Update resource if any change
  SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG);
  
  List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
  List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
  List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
  for(UpdatedContainerInfo containerInfo : containerInfoList) {
    newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
    completedContainers.addAll(containerInfo.getCompletedContainers());
  }
  
  // Processing the newly launched containers
  for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
    containerLaunchedOnNode(launchedContainer.getContainerId(), node);
  }

  // Process completed containers
  for (ContainerStatus completedContainer : completedContainers) {
    ContainerId containerId = completedContainer.getContainerId();
    LOG.debug("Container FINISHED: " + containerId);
    completedContainer(getRMContainer(containerId), 
        completedContainer, RMContainerEventType.FINISHED);
  }

  // Now node data structures are upto date and ready for scheduling.
  if(LOG.isDebugEnabled()) {
    LOG.debug("Node being looked for scheduling " + nm
      + " availableResource: " + node.getAvailableResource());
  }
}
 
开发者ID:Seagate,项目名称:hadoop-on-lustre2,代码行数:38,代码来源:CapacityScheduler.java


示例10: nodeUpdate

import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private synchronized void nodeUpdate(RMNode rmNode) {
  FiCaSchedulerNode node = getNode(rmNode.getNodeID());
  
  List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
  List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
  List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
  for(UpdatedContainerInfo containerInfo : containerInfoList) {
    newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
    completedContainers.addAll(containerInfo.getCompletedContainers());
  }
  // Processing the newly launched containers
  for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
    containerLaunchedOnNode(launchedContainer.getContainerId(), node);
  }

  // Process completed containers
  for (ContainerStatus completedContainer : completedContainers) {
    ContainerId containerId = completedContainer.getContainerId();
    LOG.debug("Container FINISHED: " + containerId);
    completedContainer(getRMContainer(containerId), 
        completedContainer, RMContainerEventType.FINISHED);
  }


  if (rmContext.isWorkPreservingRecoveryEnabled()
      && !rmContext.isSchedulerReadyForAllocatingContainers()) {
    return;
  }

  if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
          node.getAvailableResource(),minimumAllocation)) {
    LOG.debug("Node heartbeat " + rmNode.getNodeID() + 
        " available resource = " + node.getAvailableResource());

    assignContainers(node);

    LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
        + node.getAvailableResource());
  }

  updateAvailableResourcesMetrics();
}
 
开发者ID:naver,项目名称:hadoop,代码行数:43,代码来源:FifoScheduler.java


示例11: pullContainerUpdates

import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
@Override
public List<UpdatedContainerInfo> pullContainerUpdates() {
  return new ArrayList<UpdatedContainerInfo>();
}
 
开发者ID:naver,项目名称:hadoop,代码行数:5,代码来源:MockNodes.java


示例12: setUp

import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
@Before
public void setUp() throws Exception {
  InlineDispatcher rmDispatcher = new InlineDispatcher();
  
  rmContext =
      new RMContextImpl(rmDispatcher, null, null, null,
          mock(DelegationTokenRenewer.class), null, null, null, null, null);
  NodesListManager nodesListManager = mock(NodesListManager.class);
  HostsFileReader reader = mock(HostsFileReader.class);
  when(nodesListManager.getHostsReader()).thenReturn(reader);
  ((RMContextImpl) rmContext).setNodesListManager(nodesListManager);
  scheduler = mock(YarnScheduler.class);
  doAnswer(
      new Answer<Void>() {

        @Override
        public Void answer(InvocationOnMock invocation) throws Throwable {
          final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]);
          eventType = event.getType();
          if (eventType == SchedulerEventType.NODE_UPDATE) {
            List<UpdatedContainerInfo> lastestContainersInfoList = 
                ((NodeUpdateSchedulerEvent)event).getRMNode().pullContainerUpdates();
            for(UpdatedContainerInfo lastestContainersInfo : lastestContainersInfoList) {
          	  completedContainers.addAll(lastestContainersInfo.getCompletedContainers()); 
            }
          }
          return null;
        }
      }
      ).when(scheduler).handle(any(SchedulerEvent.class));
  
  rmDispatcher.register(SchedulerEventType.class, 
      new TestSchedulerEventDispatcher());
  
  rmDispatcher.register(NodesListManagerEventType.class,
      new TestNodeListManagerEventDispatcher());

  NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
  node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
  nodesListManagerEvent =  null;

}
 
开发者ID:naver,项目名称:hadoop,代码行数:43,代码来源:TestRMNodeTransitions.java


示例13: updateQueueWithNodeUpdate

import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private void updateQueueWithNodeUpdate(
        NodeUpdateSchedulerEventWrapper eventWrapper) {
  RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode();
  List<UpdatedContainerInfo> containerList = node.getContainerUpdates();
  for (UpdatedContainerInfo info : containerList) {
    for (ContainerStatus status : info.getCompletedContainers()) {
      ContainerId containerId = status.getContainerId();
      SchedulerAppReport app = scheduler.getSchedulerAppInfo(
              containerId.getApplicationAttemptId());

      if (app == null) {
        // this happens for the AM container
        // The app have already removed when the NM sends the release
        // information.
        continue;
      }

      String queue =
          appQueueMap.get(containerId.getApplicationAttemptId()
            .getApplicationId());
      int releasedMemory = 0, releasedVCores = 0;
      if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
        for (RMContainer rmc : app.getLiveContainers()) {
          if (rmc.getContainerId() == containerId) {
            releasedMemory += rmc.getContainer().getResource().getMemory();
            releasedVCores += rmc.getContainer()
                    .getResource().getVirtualCores();
            break;
          }
        }
      } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
        if (preemptionContainerMap.containsKey(containerId)) {
          Resource preResource = preemptionContainerMap.get(containerId);
          releasedMemory += preResource.getMemory();
          releasedVCores += preResource.getVirtualCores();
          preemptionContainerMap.remove(containerId);
        }
      }
      // update queue counters
      updateQueueMetrics(queue, releasedMemory, releasedVCores);
    }
  }
}
 
开发者ID:naver,项目名称:hadoop,代码行数:44,代码来源:ResourceSchedulerWrapper.java


示例14: nodeUpdate

import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private synchronized void nodeUpdate(RMNode rmNode) {
  FiCaSchedulerNode node = getNode(rmNode.getNodeID());
  
  List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
  List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
  List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
  for(UpdatedContainerInfo containerInfo : containerInfoList) {
    newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
    completedContainers.addAll(containerInfo.getCompletedContainers());
  }
  // Processing the newly launched containers
  for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
    containerLaunchedOnNode(launchedContainer.getContainerId(), node);
  }

  // Process completed containers
  for (ContainerStatus completedContainer : completedContainers) {
    ContainerId containerId = completedContainer.getContainerId();
    LOG.debug("Container FINISHED: " + containerId);
    completedContainer(getRMContainer(containerId), 
        completedContainer, RMContainerEventType.FINISHED);
  }

  // Updating node resource utilization
  node.setAggregatedContainersUtilization(
      rmNode.getAggregatedContainersUtilization());
  node.setNodeUtilization(rmNode.getNodeUtilization());

  if (rmContext.isWorkPreservingRecoveryEnabled()
      && !rmContext.isSchedulerReadyForAllocatingContainers()) {
    return;
  }

  if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
          node.getAvailableResource(),minimumAllocation)) {
    LOG.debug("Node heartbeat " + rmNode.getNodeID() + 
        " available resource = " + node.getAvailableResource());

    assignContainers(node);

    LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
        + node.getAvailableResource());
  }

  updateAvailableResourcesMetrics();
}
 
开发者ID:aliyun-beta,项目名称:aliyun-oss-hadoop-fs,代码行数:47,代码来源:FifoScheduler.java


示例15: nodeUpdate

import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
/**
 * Process a heartbeat update from a node.
 */
private synchronized void nodeUpdate(RMNode nm) {
  long start = getClock().getTime();
  if (LOG.isDebugEnabled()) {
    LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
  }
  eventLog.log("HEARTBEAT", nm.getHostName());
  FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
  
  List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
  List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
  List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
  for(UpdatedContainerInfo containerInfo : containerInfoList) {
    newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
    completedContainers.addAll(containerInfo.getCompletedContainers());
  } 
  // Processing the newly launched containers
  for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
    containerLaunchedOnNode(launchedContainer.getContainerId(), node);
  }

  // Process completed containers
  for (ContainerStatus completedContainer : completedContainers) {
    ContainerId containerId = completedContainer.getContainerId();
    LOG.debug("Container FINISHED: " + containerId);
    completedContainer(getRMContainer(containerId),
        completedContainer, RMContainerEventType.FINISHED);
  }

  if (continuousSchedulingEnabled) {
    if (!completedContainers.isEmpty()) {
      attemptScheduling(node);
    }
  } else {
    attemptScheduling(node);
  }

  // Updating node resource utilization
  node.setAggregatedContainersUtilization(
      nm.getAggregatedContainersUtilization());
  node.setNodeUtilization(nm.getNodeUtilization());

  long duration = getClock().getTime() - start;
  fsOpDurations.addNodeUpdateDuration(duration);
}
 
开发者ID:aliyun-beta,项目名称:aliyun-oss-hadoop-fs,代码行数:48,代码来源:FairScheduler.java


示例16: setUp

import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
@Before
public void setUp() throws Exception {
  InlineDispatcher rmDispatcher = new InlineDispatcher();
  
  rmContext =
      new RMContextImpl(rmDispatcher, mock(ContainerAllocationExpirer.class),
        null, null, mock(DelegationTokenRenewer.class), null, null, null,
        null, null);
  NodesListManager nodesListManager = mock(NodesListManager.class);
  HostsFileReader reader = mock(HostsFileReader.class);
  when(nodesListManager.getHostsReader()).thenReturn(reader);
  ((RMContextImpl) rmContext).setNodesListManager(nodesListManager);
  scheduler = mock(YarnScheduler.class);
  doAnswer(
      new Answer<Void>() {

        @Override
        public Void answer(InvocationOnMock invocation) throws Throwable {
          final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]);
          eventType = event.getType();
          if (eventType == SchedulerEventType.NODE_UPDATE) {
            List<UpdatedContainerInfo> lastestContainersInfoList = 
                ((NodeUpdateSchedulerEvent)event).getRMNode().pullContainerUpdates();
            for(UpdatedContainerInfo lastestContainersInfo : lastestContainersInfoList) {
          	  completedContainers.addAll(lastestContainersInfo.getCompletedContainers()); 
            }
          }
          return null;
        }
      }
      ).when(scheduler).handle(any(SchedulerEvent.class));
  
  rmDispatcher.register(SchedulerEventType.class, 
      new TestSchedulerEventDispatcher());
  
  rmDispatcher.register(NodesListManagerEventType.class,
      new TestNodeListManagerEventDispatcher());

  NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
  node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
  nodesListManagerEvent =  null;

}
 
开发者ID:aliyun-beta,项目名称:aliyun-oss-hadoop-fs,代码行数:44,代码来源:TestRMNodeTransitions.java


示例17: updateQueueWithNodeUpdate

import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private void updateQueueWithNodeUpdate(
        NodeUpdateSchedulerEventWrapper eventWrapper) {
  RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode();
  List<UpdatedContainerInfo> containerList = node.getContainerUpdates();
  for (UpdatedContainerInfo info : containerList) {
    for (ContainerStatus status : info.getCompletedContainers()) {
      ContainerId containerId = status.getContainerId();
      SchedulerAppReport app = super.getSchedulerAppInfo(
              containerId.getApplicationAttemptId());

      if (app == null) {
        // this happens for the AM container
        // The app have already removed when the NM sends the release
        // information.
        continue;
      }

      String queue = appQueueMap.get(containerId.getApplicationAttemptId());
      int releasedMemory = 0, releasedVCores = 0;
      if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
        for (RMContainer rmc : app.getLiveContainers()) {
          if (rmc.getContainerId() == containerId) {
            releasedMemory += rmc.getContainer().getResource().getMemory();
            releasedVCores += rmc.getContainer()
                    .getResource().getVirtualCores();
            break;
          }
        }
      } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
        if (preemptionContainerMap.containsKey(containerId)) {
          Resource preResource = preemptionContainerMap.get(containerId);
          releasedMemory += preResource.getMemory();
          releasedVCores += preResource.getVirtualCores();
          preemptionContainerMap.remove(containerId);
        }
      }
      // update queue counters
      updateQueueMetrics(queue, releasedMemory, releasedVCores);
    }
  }
}
 
开发者ID:aliyun-beta,项目名称:aliyun-oss-hadoop-fs,代码行数:42,代码来源:SLSCapacityScheduler.java


示例18: nodeUpdate

import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private synchronized void nodeUpdate(RMNode rmNode) {
  FiCaSchedulerNode node = getNode(rmNode.getNodeID());
  
  List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
  List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
  List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
  for(UpdatedContainerInfo containerInfo : containerInfoList) {
    newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
    completedContainers.addAll(containerInfo.getCompletedContainers());
  }
  // Processing the newly launched containers
  for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
    containerLaunchedOnNode(launchedContainer, node);
  }

  // Process completed containers
  for (ContainerStatus completedContainer : completedContainers) {
    ContainerId containerId = completedContainer.getContainerId();
    LOG.info("Container FINISHED by FifoScheduler by nodeUpdate: " + containerId);
    completedContainer(getRMContainer(containerId), 
        completedContainer, RMContainerEventType.FINISHED);
  }


  if (rmContext.isWorkPreservingRecoveryEnabled()
      && !rmContext.isSchedulerReadyForAllocatingContainers()) {
    return;
  }

  if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
          node.getAvailableResource(),minimumAllocation)) {
    LOG.debug("Node heartbeat " + rmNode.getNodeID() + 
        " available resource = " + node.getAvailableResource());

    assignContainers(node);

    LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
        + node.getAvailableResource());
  }

  updateAvailableResourcesMetrics();
}
 
开发者ID:yncxcw,项目名称:big-c,代码行数:43,代码来源:FifoScheduler.java


示例19: nodeUpdate

import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
/**
 * Process a heartbeat update from a node.
 */
private synchronized void nodeUpdate(RMNode nm) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
  }
  eventLog.log("HEARTBEAT", nm.getHostName());
  FSSchedulerNode node = nodes.get(nm.getNodeID());

  List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
  List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
  List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
  for(UpdatedContainerInfo containerInfo : containerInfoList) {
    newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
    completedContainers.addAll(containerInfo.getCompletedContainers());
  } 
  // Processing the newly launched containers
  for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
    containerLaunchedOnNode(launchedContainer.getContainerId(), node);
  }

  // Process completed containers
  for (ContainerStatus completedContainer : completedContainers) {
    ContainerId containerId = completedContainer.getContainerId();
    LOG.debug("Container FINISHED: " + containerId);
    completedContainer(getRMContainer(containerId),
        completedContainer, RMContainerEventType.FINISHED);
  }

  // Assign new containers...
  // 1. Check for reserved applications
  // 2. Schedule if there are no reservations

  AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable();
  if (reservedAppSchedulable != null) {
    Priority reservedPriority = node.getReservedContainer().getReservedPriority();
    if (reservedAppSchedulable != null &&
        !reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
      // Don't hold the reservation if app can no longer use it
      LOG.info("Releasing reservation that cannot be satisfied for application "
          + reservedAppSchedulable.getApp().getApplicationAttemptId()
          + " on node " + nm);
      reservedAppSchedulable.unreserve(reservedPriority, node);
      reservedAppSchedulable = null;
    } else {
      // Reservation exists; try to fulfill the reservation
      LOG.info("Trying to fulfill reservation for application "
          + reservedAppSchedulable.getApp().getApplicationAttemptId()
          + " on node: " + nm);

      node.getReservedAppSchedulable().assignReservedContainer(node);
    }
  }
  if (reservedAppSchedulable == null) {
    // No reservation, schedule at queue which is farthest below fair share
    int assignedContainers = 0;
    while (node.getReservedContainer() == null) {
      boolean assignedContainer = false;
      if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
            queueMgr.getRootQueue().assignContainer(node),
            Resources.none())) {
        assignedContainers++;
        assignedContainer = true;
      }
      if (!assignedContainer) { break; }
      if (!assignMultiple) { break; }
      if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; }
    }
  }
  updateRootQueueMetrics();
}
 
开发者ID:ict-carch,项目名称:hadoop-plus,代码行数:73,代码来源:FairScheduler.java


示例20: nodeUpdate

import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private synchronized void nodeUpdate(RMNode nm) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
  }

  FiCaSchedulerNode node = getNode(nm.getNodeID());
  List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
  List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
  List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
  for(UpdatedContainerInfo containerInfo : containerInfoList) {
    newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
    completedContainers.addAll(containerInfo.getCompletedContainers());
  }
  
  // Processing the newly launched containers
  for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
    containerLaunchedOnNode(launchedContainer.getContainerId(), node);
  }

  // Process completed containers
  for (ContainerStatus completedContainer : completedContainers) {
    ContainerId containerId = completedContainer.getContainerId();
    LOG.debug("Container FINISHED: " + containerId);
    completedContainer(getRMContainer(containerId), 
        completedContainer, RMContainerEventType.FINISHED);
  }

  // Now node data structures are upto date and ready for scheduling.
  if(LOG.isDebugEnabled()) {
    LOG.debug("Node being looked for scheduling " + nm
      + " availableResource: " + node.getAvailableResource());
  }

  // Assign new containers...
  // 1. Check for reserved applications
  // 2. Schedule if there are no reservations

  RMContainer reservedContainer = node.getReservedContainer();
  if (reservedContainer != null) {
    FiCaSchedulerApp reservedApplication = 
        getApplication(reservedContainer.getApplicationAttemptId());
    
    // Try to fulfill the reservation
    LOG.info("Trying to fulfill reservation for application " + 
         

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java FastDtoa类代码示例发布时间:2022-05-22
下一篇:
Java GuiModList类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap