本文整理汇总了Java中org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo类的典型用法代码示例。如果您正苦于以下问题:Java UpdatedContainerInfo类的具体用法?Java UpdatedContainerInfo怎么用?Java UpdatedContainerInfo使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
UpdatedContainerInfo类属于org.apache.hadoop.yarn.server.resourcemanager.rmnode包,在下文中一共展示了UpdatedContainerInfo类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: nodeUpdate
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
/**
* Process a heartbeat update from a node.
*/
private synchronized void nodeUpdate(RMNode nm) {
long start = getClock().getTime();
if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
}
eventLog.log("HEARTBEAT", nm.getHostName());
FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
if (continuousSchedulingEnabled) {
if (!completedContainers.isEmpty()) {
attemptScheduling(node);
}
} else {
attemptScheduling(node);
}
long duration = getClock().getTime() - start;
fsOpDurations.addNodeUpdateDuration(duration);
}
开发者ID:naver,项目名称:hadoop,代码行数:43,代码来源:FairScheduler.java
示例2: nodeUpdate
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private synchronized void nodeUpdate(RMNode nm) {
if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
}
FiCaSchedulerNode node = getNode(nm.getNodeID());
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
// Now node data structures are upto date and ready for scheduling.
if(LOG.isDebugEnabled()) {
LOG.debug("Node being looked for scheduling " + nm
+ " availableResource: " + node.getAvailableResource());
}
}
开发者ID:naver,项目名称:hadoop,代码行数:35,代码来源:CapacityScheduler.java
示例3: nodeUpdate
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
/**
* Process a heartbeat update from a node.
*/
private synchronized void nodeUpdate(RMNode nm) {
long start = getClock().getTime();
if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
}
eventLog.log("HEARTBEAT", nm.getHostName());
FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer, node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
if (continuousSchedulingEnabled) {
if (!completedContainers.isEmpty()) {
attemptScheduling(node);
}
} else {
attemptScheduling(node);
}
long duration = getClock().getTime() - start;
fsOpDurations.addNodeUpdateDuration(duration);
}
开发者ID:yncxcw,项目名称:big-c,代码行数:43,代码来源:FairScheduler.java
示例4: nodeUpdate
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private synchronized void nodeUpdate(RMNode nm) {
if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
}
FiCaSchedulerNode node = getNode(nm.getNodeID());
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer, node);
LOG.info("Container LAUNCHED:"+launchedContainer.getContainerId());
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.info("Container FINISHED: " + containerId);
completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
// Now node data structures are up to date and ready for scheduling.
if(LOG.isDebugEnabled())
{
LOG.info("Node being looked for scheduling " + nm
+ " availableResource: " + node.getAvailableResource());
}
}
开发者ID:yncxcw,项目名称:big-c,代码行数:37,代码来源:CapacityScheduler.java
示例5: nodeUpdate
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private synchronized void nodeUpdate(RMNode rmNode) {
FiCaSchedulerNode node = getNode(rmNode.getNodeID());
List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
containerCompleted(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
node.getAvailableResource(),minimumAllocation)) {
LOG.debug("Node heartbeat " + rmNode.getNodeID() +
" available resource = " + node.getAvailableResource());
assignContainers(node);
LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
+ node.getAvailableResource());
}
metrics.setAvailableResourcesToQueue(
Resources.subtract(clusterResource, usedResource));
}
开发者ID:ict-carch,项目名称:hadoop-plus,代码行数:38,代码来源:FifoScheduler.java
示例6: setUp
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
@Before
public void setUp() throws Exception {
InlineDispatcher rmDispatcher = new InlineDispatcher();
rmContext =
new RMContextImpl(rmDispatcher, null, null, null,
mock(DelegationTokenRenewer.class), null, null, null, null);
scheduler = mock(YarnScheduler.class);
doAnswer(
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]);
eventType = event.getType();
if (eventType == SchedulerEventType.NODE_UPDATE) {
List<UpdatedContainerInfo> lastestContainersInfoList =
((NodeUpdateSchedulerEvent)event).getRMNode().pullContainerUpdates();
for(UpdatedContainerInfo lastestContainersInfo : lastestContainersInfoList) {
completedContainers.addAll(lastestContainersInfo.getCompletedContainers());
}
}
return null;
}
}
).when(scheduler).handle(any(SchedulerEvent.class));
rmDispatcher.register(SchedulerEventType.class,
new TestSchedulerEventDispatcher());
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null);
}
开发者ID:ict-carch,项目名称:hadoop-plus,代码行数:35,代码来源:TestRMNodeTransitions.java
示例7: nodeUpdate
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private synchronized void nodeUpdate(RMNode rmNode) {
FiCaSchedulerNode node = getNode(rmNode.getNodeID());
// Update resource if any change
SchedulerUtils.updateResourceIfChanged(node, rmNode, clusterResource, LOG);
List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
containerCompleted(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
node.getAvailableResource(),minimumAllocation)) {
LOG.debug("Node heartbeat " + rmNode.getNodeID() +
" available resource = " + node.getAvailableResource());
assignContainers(node);
LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
+ node.getAvailableResource());
}
metrics.setAvailableResourcesToQueue(
Resources.subtract(clusterResource, usedResource));
}
开发者ID:Seagate,项目名称:hadoop-on-lustre2,代码行数:41,代码来源:FifoScheduler.java
示例8: nodeUpdate
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
/**
* Process a heartbeat update from a node.
*/
private synchronized void nodeUpdate(RMNode nm) {
if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
}
eventLog.log("HEARTBEAT", nm.getHostName());
FSSchedulerNode node = nodes.get(nm.getNodeID());
// Update resource if any change
SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG);
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
if (continuousSchedulingEnabled) {
if (!completedContainers.isEmpty()) {
attemptScheduling(node);
}
} else {
attemptScheduling(node);
}
}
开发者ID:Seagate,项目名称:hadoop-on-lustre2,代码行数:42,代码来源:FairScheduler.java
示例9: nodeUpdate
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private synchronized void nodeUpdate(RMNode nm) {
if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
}
FiCaSchedulerNode node = getNode(nm.getNodeID());
// Update resource if any change
SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG);
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
// Now node data structures are upto date and ready for scheduling.
if(LOG.isDebugEnabled()) {
LOG.debug("Node being looked for scheduling " + nm
+ " availableResource: " + node.getAvailableResource());
}
}
开发者ID:Seagate,项目名称:hadoop-on-lustre2,代码行数:38,代码来源:CapacityScheduler.java
示例10: nodeUpdate
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private synchronized void nodeUpdate(RMNode rmNode) {
FiCaSchedulerNode node = getNode(rmNode.getNodeID());
List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
if (rmContext.isWorkPreservingRecoveryEnabled()
&& !rmContext.isSchedulerReadyForAllocatingContainers()) {
return;
}
if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
node.getAvailableResource(),minimumAllocation)) {
LOG.debug("Node heartbeat " + rmNode.getNodeID() +
" available resource = " + node.getAvailableResource());
assignContainers(node);
LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
+ node.getAvailableResource());
}
updateAvailableResourcesMetrics();
}
开发者ID:naver,项目名称:hadoop,代码行数:43,代码来源:FifoScheduler.java
示例11: pullContainerUpdates
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
@Override
public List<UpdatedContainerInfo> pullContainerUpdates() {
return new ArrayList<UpdatedContainerInfo>();
}
开发者ID:naver,项目名称:hadoop,代码行数:5,代码来源:MockNodes.java
示例12: setUp
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
@Before
public void setUp() throws Exception {
InlineDispatcher rmDispatcher = new InlineDispatcher();
rmContext =
new RMContextImpl(rmDispatcher, null, null, null,
mock(DelegationTokenRenewer.class), null, null, null, null, null);
NodesListManager nodesListManager = mock(NodesListManager.class);
HostsFileReader reader = mock(HostsFileReader.class);
when(nodesListManager.getHostsReader()).thenReturn(reader);
((RMContextImpl) rmContext).setNodesListManager(nodesListManager);
scheduler = mock(YarnScheduler.class);
doAnswer(
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]);
eventType = event.getType();
if (eventType == SchedulerEventType.NODE_UPDATE) {
List<UpdatedContainerInfo> lastestContainersInfoList =
((NodeUpdateSchedulerEvent)event).getRMNode().pullContainerUpdates();
for(UpdatedContainerInfo lastestContainersInfo : lastestContainersInfoList) {
completedContainers.addAll(lastestContainersInfo.getCompletedContainers());
}
}
return null;
}
}
).when(scheduler).handle(any(SchedulerEvent.class));
rmDispatcher.register(SchedulerEventType.class,
new TestSchedulerEventDispatcher());
rmDispatcher.register(NodesListManagerEventType.class,
new TestNodeListManagerEventDispatcher());
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
nodesListManagerEvent = null;
}
开发者ID:naver,项目名称:hadoop,代码行数:43,代码来源:TestRMNodeTransitions.java
示例13: updateQueueWithNodeUpdate
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private void updateQueueWithNodeUpdate(
NodeUpdateSchedulerEventWrapper eventWrapper) {
RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode();
List<UpdatedContainerInfo> containerList = node.getContainerUpdates();
for (UpdatedContainerInfo info : containerList) {
for (ContainerStatus status : info.getCompletedContainers()) {
ContainerId containerId = status.getContainerId();
SchedulerAppReport app = scheduler.getSchedulerAppInfo(
containerId.getApplicationAttemptId());
if (app == null) {
// this happens for the AM container
// The app have already removed when the NM sends the release
// information.
continue;
}
String queue =
appQueueMap.get(containerId.getApplicationAttemptId()
.getApplicationId());
int releasedMemory = 0, releasedVCores = 0;
if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
for (RMContainer rmc : app.getLiveContainers()) {
if (rmc.getContainerId() == containerId) {
releasedMemory += rmc.getContainer().getResource().getMemory();
releasedVCores += rmc.getContainer()
.getResource().getVirtualCores();
break;
}
}
} else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
if (preemptionContainerMap.containsKey(containerId)) {
Resource preResource = preemptionContainerMap.get(containerId);
releasedMemory += preResource.getMemory();
releasedVCores += preResource.getVirtualCores();
preemptionContainerMap.remove(containerId);
}
}
// update queue counters
updateQueueMetrics(queue, releasedMemory, releasedVCores);
}
}
}
开发者ID:naver,项目名称:hadoop,代码行数:44,代码来源:ResourceSchedulerWrapper.java
示例14: nodeUpdate
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private synchronized void nodeUpdate(RMNode rmNode) {
FiCaSchedulerNode node = getNode(rmNode.getNodeID());
List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
// Updating node resource utilization
node.setAggregatedContainersUtilization(
rmNode.getAggregatedContainersUtilization());
node.setNodeUtilization(rmNode.getNodeUtilization());
if (rmContext.isWorkPreservingRecoveryEnabled()
&& !rmContext.isSchedulerReadyForAllocatingContainers()) {
return;
}
if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
node.getAvailableResource(),minimumAllocation)) {
LOG.debug("Node heartbeat " + rmNode.getNodeID() +
" available resource = " + node.getAvailableResource());
assignContainers(node);
LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
+ node.getAvailableResource());
}
updateAvailableResourcesMetrics();
}
开发者ID:aliyun-beta,项目名称:aliyun-oss-hadoop-fs,代码行数:47,代码来源:FifoScheduler.java
示例15: nodeUpdate
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
/**
* Process a heartbeat update from a node.
*/
private synchronized void nodeUpdate(RMNode nm) {
long start = getClock().getTime();
if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
}
eventLog.log("HEARTBEAT", nm.getHostName());
FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
if (continuousSchedulingEnabled) {
if (!completedContainers.isEmpty()) {
attemptScheduling(node);
}
} else {
attemptScheduling(node);
}
// Updating node resource utilization
node.setAggregatedContainersUtilization(
nm.getAggregatedContainersUtilization());
node.setNodeUtilization(nm.getNodeUtilization());
long duration = getClock().getTime() - start;
fsOpDurations.addNodeUpdateDuration(duration);
}
开发者ID:aliyun-beta,项目名称:aliyun-oss-hadoop-fs,代码行数:48,代码来源:FairScheduler.java
示例16: setUp
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
@Before
public void setUp() throws Exception {
InlineDispatcher rmDispatcher = new InlineDispatcher();
rmContext =
new RMContextImpl(rmDispatcher, mock(ContainerAllocationExpirer.class),
null, null, mock(DelegationTokenRenewer.class), null, null, null,
null, null);
NodesListManager nodesListManager = mock(NodesListManager.class);
HostsFileReader reader = mock(HostsFileReader.class);
when(nodesListManager.getHostsReader()).thenReturn(reader);
((RMContextImpl) rmContext).setNodesListManager(nodesListManager);
scheduler = mock(YarnScheduler.class);
doAnswer(
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]);
eventType = event.getType();
if (eventType == SchedulerEventType.NODE_UPDATE) {
List<UpdatedContainerInfo> lastestContainersInfoList =
((NodeUpdateSchedulerEvent)event).getRMNode().pullContainerUpdates();
for(UpdatedContainerInfo lastestContainersInfo : lastestContainersInfoList) {
completedContainers.addAll(lastestContainersInfo.getCompletedContainers());
}
}
return null;
}
}
).when(scheduler).handle(any(SchedulerEvent.class));
rmDispatcher.register(SchedulerEventType.class,
new TestSchedulerEventDispatcher());
rmDispatcher.register(NodesListManagerEventType.class,
new TestNodeListManagerEventDispatcher());
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
nodesListManagerEvent = null;
}
开发者ID:aliyun-beta,项目名称:aliyun-oss-hadoop-fs,代码行数:44,代码来源:TestRMNodeTransitions.java
示例17: updateQueueWithNodeUpdate
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private void updateQueueWithNodeUpdate(
NodeUpdateSchedulerEventWrapper eventWrapper) {
RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode();
List<UpdatedContainerInfo> containerList = node.getContainerUpdates();
for (UpdatedContainerInfo info : containerList) {
for (ContainerStatus status : info.getCompletedContainers()) {
ContainerId containerId = status.getContainerId();
SchedulerAppReport app = super.getSchedulerAppInfo(
containerId.getApplicationAttemptId());
if (app == null) {
// this happens for the AM container
// The app have already removed when the NM sends the release
// information.
continue;
}
String queue = appQueueMap.get(containerId.getApplicationAttemptId());
int releasedMemory = 0, releasedVCores = 0;
if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
for (RMContainer rmc : app.getLiveContainers()) {
if (rmc.getContainerId() == containerId) {
releasedMemory += rmc.getContainer().getResource().getMemory();
releasedVCores += rmc.getContainer()
.getResource().getVirtualCores();
break;
}
}
} else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
if (preemptionContainerMap.containsKey(containerId)) {
Resource preResource = preemptionContainerMap.get(containerId);
releasedMemory += preResource.getMemory();
releasedVCores += preResource.getVirtualCores();
preemptionContainerMap.remove(containerId);
}
}
// update queue counters
updateQueueMetrics(queue, releasedMemory, releasedVCores);
}
}
}
开发者ID:aliyun-beta,项目名称:aliyun-oss-hadoop-fs,代码行数:42,代码来源:SLSCapacityScheduler.java
示例18: nodeUpdate
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private synchronized void nodeUpdate(RMNode rmNode) {
FiCaSchedulerNode node = getNode(rmNode.getNodeID());
List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer, node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.info("Container FINISHED by FifoScheduler by nodeUpdate: " + containerId);
completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
if (rmContext.isWorkPreservingRecoveryEnabled()
&& !rmContext.isSchedulerReadyForAllocatingContainers()) {
return;
}
if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
node.getAvailableResource(),minimumAllocation)) {
LOG.debug("Node heartbeat " + rmNode.getNodeID() +
" available resource = " + node.getAvailableResource());
assignContainers(node);
LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
+ node.getAvailableResource());
}
updateAvailableResourcesMetrics();
}
开发者ID:yncxcw,项目名称:big-c,代码行数:43,代码来源:FifoScheduler.java
示例19: nodeUpdate
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
/**
* Process a heartbeat update from a node.
*/
private synchronized void nodeUpdate(RMNode nm) {
if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
}
eventLog.log("HEARTBEAT", nm.getHostName());
FSSchedulerNode node = nodes.get(nm.getNodeID());
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
// Assign new containers...
// 1. Check for reserved applications
// 2. Schedule if there are no reservations
AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable();
if (reservedAppSchedulable != null) {
Priority reservedPriority = node.getReservedContainer().getReservedPriority();
if (reservedAppSchedulable != null &&
!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
// Don't hold the reservation if app can no longer use it
LOG.info("Releasing reservation that cannot be satisfied for application "
+ reservedAppSchedulable.getApp().getApplicationAttemptId()
+ " on node " + nm);
reservedAppSchedulable.unreserve(reservedPriority, node);
reservedAppSchedulable = null;
} else {
// Reservation exists; try to fulfill the reservation
LOG.info("Trying to fulfill reservation for application "
+ reservedAppSchedulable.getApp().getApplicationAttemptId()
+ " on node: " + nm);
node.getReservedAppSchedulable().assignReservedContainer(node);
}
}
if (reservedAppSchedulable == null) {
// No reservation, schedule at queue which is farthest below fair share
int assignedContainers = 0;
while (node.getReservedContainer() == null) {
boolean assignedContainer = false;
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
queueMgr.getRootQueue().assignContainer(node),
Resources.none())) {
assignedContainers++;
assignedContainer = true;
}
if (!assignedContainer) { break; }
if (!assignMultiple) { break; }
if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; }
}
}
updateRootQueueMetrics();
}
开发者ID:ict-carch,项目名称:hadoop-plus,代码行数:73,代码来源:FairScheduler.java
示例20: nodeUpdate
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; //导入依赖的package包/类
private synchronized void nodeUpdate(RMNode nm) {
if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
}
FiCaSchedulerNode node = getNode(nm.getNodeID());
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
// Now node data structures are upto date and ready for scheduling.
if(LOG.isDebugEnabled()) {
LOG.debug("Node being looked for scheduling " + nm
+ " availableResource: " + node.getAvailableResource());
}
// Assign new containers...
// 1. Check for reserved applications
// 2. Schedule if there are no reservations
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
FiCaSchedulerApp reservedApplication =
getApplication(reservedContainer.getApplicationAttemptId());
// Try to fulfill the reservation
LOG.info("Trying to fulfill reservation for application " +
|
请发表评论