
Java DrainDispatcher Class Code Examples


This article collects typical usage examples of the Java class org.apache.hadoop.yarn.event.DrainDispatcher. If you are wondering how the DrainDispatcher class is used in practice, the selected examples below may help.



The DrainDispatcher class belongs to the org.apache.hadoop.yarn.event package. The 20 code examples below are sorted by popularity by default.
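Most of the examples below call `dispatcher.await()` right before an assertion, which suggests that the call blocks until the events queued so far have been handled. The following is a minimal, stdlib-only sketch of that idea. `MiniDrainDispatcher` and all of its members are hypothetical names invented for illustration; this is not Hadoop's implementation and it does not use any Hadoop API.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical illustration only; NOT Hadoop's DrainDispatcher. */
class MiniDrainDispatcher<E> {
  private final BlockingQueue<E> queue = new LinkedBlockingQueue<>();
  // Number of events that were dispatched but whose handler has not finished yet.
  private final AtomicInteger pending = new AtomicInteger();
  private final Consumer<E> handler;
  private final Thread worker;

  MiniDrainDispatcher(Consumer<E> handler) {
    this.handler = handler;
    this.worker = new Thread(this::runLoop, "mini-dispatcher");
    this.worker.setDaemon(true);
  }

  void start() {
    worker.start();
  }

  /** Enqueues an event and returns immediately. */
  void dispatch(E event) {
    pending.incrementAndGet(); // count first, so await() cannot miss the event
    queue.add(event);
  }

  /** Blocks the caller until every dispatched event has been handled. */
  void await() {
    while (pending.get() > 0) {
      Thread.yield();
    }
  }

  void stop() {
    worker.interrupt();
  }

  private void runLoop() {
    try {
      while (true) {
        E event = queue.take();
        try {
          handler.accept(event);
        } finally {
          pending.decrementAndGet(); // only after the handler has finished
        }
      }
    } catch (InterruptedException e) {
      // stop() was called; leave the loop
    }
  }

  public static void main(String[] args) {
    List<String> seen = new CopyOnWriteArrayList<>();
    MiniDrainDispatcher<String> d = new MiniDrainDispatcher<>(seen::add);
    d.start();
    d.dispatch("a");
    d.dispatch("b");
    d.dispatch("c");
    d.await(); // without this, the worker thread may not have run yet
    System.out.println(seen);
    d.stop();
  }
}
```

In a test, the value of such a dispatcher is that assertions placed after `await()` see the effects of all queued events, so no arbitrary `Thread.sleep` is needed to wait for the asynchronous handler.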

Example 1: createSpyService

import org.apache.hadoop.yarn.event.DrainDispatcher; // import the required package/class
private ResourceLocalizationService createSpyService(
    DrainDispatcher dispatcher, LocalDirsHandlerService dirsHandler,
    NMStateStoreService stateStore) {
  ContainerExecutor exec = mock(ContainerExecutor.class);
  LocalizerTracker mockLocalizerTracker = mock(LocalizerTracker.class);
  DeletionService delService = mock(DeletionService.class);
  NMContext nmContext =
      new NMContext(new NMContainerTokenSecretManager(conf),
        new NMTokenSecretManagerInNM(), null,
        new ApplicationACLsManager(conf), stateStore);
  ResourceLocalizationService rawService =
    new ResourceLocalizationService(dispatcher, exec, delService,
                                    dirsHandler, nmContext);
  ResourceLocalizationService spyService = spy(rawService);
  doReturn(mockServer).when(spyService).createServer();
  doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker(
      isA(Configuration.class));
  doReturn(lfs).when(spyService)
      .getLocalFileContext(isA(Configuration.class));
  return spyService;
}
 
Developer ID: naver, Project: hadoop, Lines of code: 22, Source: TestResourceLocalizationService.java


Example 2: waitForContainerCleanup

import org.apache.hadoop.yarn.event.DrainDispatcher; // import the required package/class
protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm,
    NodeHeartbeatResponse resp) throws Exception {
  int waitCount = 0, cleanedConts = 0;
  List<ContainerId> contsToClean;
  do {
    dispatcher.await();
    contsToClean = resp.getContainersToCleanup();
    cleanedConts += contsToClean.size();
    if (cleanedConts >= 1) {
      break;
    }
    Thread.sleep(100);
    resp = nm.nodeHeartbeat(true);
  } while(waitCount++ < 200);

  if (contsToClean.isEmpty()) {
    LOG.error("Failed to get any containers to cleanup");
  } else {
    LOG.info("Got cleanup for " + contsToClean.get(0));
  }
  Assert.assertEquals(1, cleanedConts);
}
 
Developer ID: naver, Project: hadoop, Lines of code: 23, Source: TestApplicationCleanup.java


Example 3: startRMs

import org.apache.hadoop.yarn.event.DrainDispatcher; // import the required package/class
protected void startRMs() throws IOException {
  rm1 = new MockRM(confForRM1, null, false){
    @Override
    protected Dispatcher createDispatcher() {
      return new DrainDispatcher();
    }
  };
  rm2 = new MockRM(confForRM2, null, false){
    @Override
    protected Dispatcher createDispatcher() {
      return new DrainDispatcher();
    }
  };
  startRMs(rm1, confForRM1, rm2, confForRM2);

}
 
Developer ID: aliyun-beta, Project: aliyun-oss-hadoop-fs, Lines of code: 17, Source: RMHATestBase.java


Example 4: createSpyService

import org.apache.hadoop.yarn.event.DrainDispatcher; // import the required package/class
private ResourceLocalizationService createSpyService(
    DrainDispatcher dispatcher, LocalDirsHandlerService dirsHandler,
    NMStateStoreService stateStore) {
  ContainerExecutor exec = mock(ContainerExecutor.class);
  LocalizerTracker mockLocalizerTracker = mock(LocalizerTracker.class);
  DeletionService delService = mock(DeletionService.class);
  NMContext nmContext =
      new NMContext(new NMContainerTokenSecretManager(conf),
        new NMTokenSecretManagerInNM(), null,
        new ApplicationACLsManager(conf), stateStore,null);
  ResourceLocalizationService rawService =
    new ResourceLocalizationService(dispatcher, exec, delService,
                                    dirsHandler, nmContext);
  ResourceLocalizationService spyService = spy(rawService);
  doReturn(mockServer).when(spyService).createServer();
  doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker(
      isA(Configuration.class));
  doReturn(lfs).when(spyService)
      .getLocalFileContext(isA(Configuration.class));
  return spyService;
}
 
Developer ID: yncxcw, Project: big-c, Lines of code: 22, Source: TestResourceLocalizationService.java


Example 5: testVerifyAndCreateRemoteDirNonExistence

import org.apache.hadoop.yarn.event.DrainDispatcher; // import the required package/class
@Test
public void testVerifyAndCreateRemoteDirNonExistence()
    throws Exception {
  this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
  File aNewFile = new File(String.valueOf("tmp"+System.currentTimeMillis()));
  this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, 
      aNewFile.getAbsolutePath());
  
  DrainDispatcher dispatcher = createDispatcher();
  LogAggregationService logAggregationService = spy(
      new LogAggregationService(dispatcher, this.context, this.delSrvc,
                                super.dirsHandler));
  logAggregationService.init(this.conf);
  boolean existsBefore = aNewFile.exists();
  assertTrue("The new file already exists!", !existsBefore);

  logAggregationService.verifyAndCreateRemoteLogDir(this.conf);
  
  boolean existsAfter = aNewFile.exists();
  assertTrue("The new aggregate file is not successfully created", existsAfter);
  aNewFile.delete(); //housekeeping
}
 
Developer ID: Nextzero, Project: hadoop-2.6.0-cdh5.4.3, Lines of code: 23, Source: TestLogAggregationService.java


Example 6: startRMs

import org.apache.hadoop.yarn.event.DrainDispatcher; // import the required package/class
protected void startRMs() throws IOException {
  rm1 = new MockRM(confForRM1, null, false, false){
    @Override
    protected Dispatcher createDispatcher() {
      return new DrainDispatcher();
    }
  };
  rm2 = new MockRM(confForRM2, null, false, false){
    @Override
    protected Dispatcher createDispatcher() {
      return new DrainDispatcher();
    }
  };
  startRMs(rm1, confForRM1, rm2, confForRM2);

}
 
Developer ID: hopshadoop, Project: hops, Lines of code: 17, Source: RMHATestBase.java


Example 7: setUp

import org.apache.hadoop.yarn.event.DrainDispatcher; // import the required package/class
@Before
public void setUp() {
  dispatcher = new DrainDispatcher();
  this.rm = new MockRM() {
    @Override
    protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
      return new SchedulerEventDispatcher(this.scheduler) {
        @Override
        public void handle(SchedulerEvent event) {
          scheduler.handle(event);
        }
      };
    }

    @Override
    protected Dispatcher createDispatcher() {
      return dispatcher;
    }
  };
  rm.start();
  amService = rm.getApplicationMasterService();
}
 
Developer ID: ict-carch, Project: hadoop-plus, Lines of code: 23, Source: TestAMRMRPCNodeUpdates.java


Example 8: getContainerOnHost

import org.apache.hadoop.yarn.event.DrainDispatcher; // import the required package/class
private
    List<TaskAttemptContainerAssignedEvent> getContainerOnHost(JobId jobId,
        int taskAttemptId, int memory, String[] hosts, MockNM mockNM,
        DrainDispatcher dispatcher, MyContainerAllocator allocator)
        throws Exception {
  ContainerRequestEvent reqEvent =
      createReq(jobId, taskAttemptId, memory, hosts);
  allocator.sendRequest(reqEvent);

  // Send the request to the RM
  List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
  dispatcher.await();
  Assert.assertEquals("No of assignments must be 0", 0, assigned.size());

  // Heartbeat from the required nodeManager
  mockNM.nodeHeartbeat(true);
  dispatcher.await();

  assigned = allocator.schedule();
  dispatcher.await();
  return assigned;
}
 
Developer ID: ict-carch, Project: hadoop-plus, Lines of code: 23, Source: TestRMContainerAllocator.java


Example 9: setUp

import org.apache.hadoop.yarn.event.DrainDispatcher; // import the required package/class
@Before
public void setUp() {
  dispatcher = new DrainDispatcher();
  this.rm = new MockRM() {
    @Override
    public void init(Configuration conf) {
      conf.set(
        CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
        "1.0");
      super.init(conf);
    }
    @Override
    protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
      return new SchedulerEventDispatcher(this.scheduler) {
        @Override
        public void handle(SchedulerEvent event) {
          scheduler.handle(event);
        }
      };
    }

    @Override
    protected Dispatcher createDispatcher() {
      return dispatcher;
    }
  };
  rm.start();
  amService = rm.getApplicationMasterService();
}
 
Developer ID: naver, Project: hadoop, Lines of code: 30, Source: TestAMRMRPCNodeUpdates.java


Example 10: testFailAbortDoesntHang

import org.apache.hadoop.yarn.event.DrainDispatcher; // import the required package/class
@Test (timeout=10000)
public void testFailAbortDoesntHang() throws IOException {
  Configuration conf = new Configuration();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000");
  
  DrainDispatcher dispatcher = new DrainDispatcher();
  dispatcher.init(conf);
  dispatcher.start();
  OutputCommitter committer = Mockito.mock(OutputCommitter.class);
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();
  //Job has only 1 mapper task. No reducers
  conf.setInt(MRJobConfig.NUM_REDUCES, 0);
  conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
  JobImpl job = createRunningStubbedJob(conf, dispatcher, 1, null);

  //Fail / finish all the tasks. This should land the JobImpl directly in the
  //FAIL_ABORT state
  for(Task t: job.tasks.values()) {
    TaskImpl task = (TaskImpl) t;
    task.handle(new TaskEvent(task.getID(), TaskEventType.T_SCHEDULE));
    for(TaskAttempt ta: task.getAttempts().values()) {
      task.handle(new TaskTAttemptEvent(ta.getID(),
        TaskEventType.T_ATTEMPT_FAILED));
    }
  }

  dispatcher.await();
  //Verify abortJob is called once and the job failed
  Mockito.verify(committer, Mockito.timeout(2000).times(1))
    .abortJob((JobContext) Mockito.any(), (State) Mockito.any());
  assertJobState(job, JobStateInternal.FAILED);

  dispatcher.stop();
}
 
Developer ID: naver, Project: hadoop, Lines of code: 39, Source: TestJobImpl.java


Example 11: finishNextNTasks

import org.apache.hadoop.yarn.event.DrainDispatcher; // import the required package/class
private void finishNextNTasks(DrainDispatcher rmDispatcher, MockNM node,
    MRApp mrApp, Iterator<Task> it, int nextN) throws Exception {
  Task task;
  for (int i=0; i<nextN; i++) {
    task = it.next();
    finishTask(rmDispatcher, node, mrApp, task);
  }
}
 
Developer ID: naver, Project: hadoop, Lines of code: 9, Source: TestRMContainerAllocator.java


Example 12: finishTask

import org.apache.hadoop.yarn.event.DrainDispatcher; // import the required package/class
private void finishTask(DrainDispatcher rmDispatcher, MockNM node,
    MRApp mrApp, Task task) throws Exception {
  TaskAttempt attempt = task.getAttempts().values().iterator().next();
  List<ContainerStatus> contStatus = new ArrayList<ContainerStatus>(1);
  contStatus.add(ContainerStatus.newInstance(attempt.getAssignedContainerID(),
      ContainerState.COMPLETE, "", 0));
  Map<ApplicationId,List<ContainerStatus>> statusUpdate =
      new HashMap<ApplicationId,List<ContainerStatus>>(1);
  statusUpdate.put(mrApp.getAppID(), contStatus);
  node.nodeHeartbeat(statusUpdate, true);
  rmDispatcher.await();
  mrApp.getContext().getEventHandler().handle(
        new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE));
  mrApp.waitForState(task, TaskState.SUCCEEDED);
}
 
Developer ID: naver, Project: hadoop, Lines of code: 16, Source: TestRMContainerAllocator.java


Example 13: getContainerOnHost

import org.apache.hadoop.yarn.event.DrainDispatcher; // import the required package/class
private
    List<TaskAttemptContainerAssignedEvent> getContainerOnHost(JobId jobId,
        int taskAttemptId, int memory, String[] hosts, MockNM mockNM,
        DrainDispatcher dispatcher, MyContainerAllocator allocator,
        int expectedAdditions1, int expectedRemovals1,
        int expectedAdditions2, int expectedRemovals2, MyResourceManager rm)
        throws Exception {
  ContainerRequestEvent reqEvent =
      createReq(jobId, taskAttemptId, memory, hosts);
  allocator.sendRequest(reqEvent);

  // Send the request to the RM
  List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
  dispatcher.await();
  assertBlacklistAdditionsAndRemovals(
      expectedAdditions1, expectedRemovals1, rm);
  Assert.assertEquals("No of assignments must be 0", 0, assigned.size());

  // Heartbeat from the required nodeManager
  mockNM.nodeHeartbeat(true);
  dispatcher.await();

  assigned = allocator.schedule();
  dispatcher.await();
  assertBlacklistAdditionsAndRemovals(
      expectedAdditions2, expectedRemovals2, rm);
  return assigned;
}
 
Developer ID: naver, Project: hadoop, Lines of code: 29, Source: TestRMContainerAllocator.java


Example 14: addNodeCapacityToPlan

import org.apache.hadoop.yarn.event.DrainDispatcher; // import the required package/class
private void addNodeCapacityToPlan(MockRM rm, int memory, int vCores) {
  try {
    rm.registerNode("127.0.0.1:1", memory, vCores);
    int attempts = 10;
    do {
      DrainDispatcher dispatcher =
          (DrainDispatcher) rm.getRMContext().getDispatcher();
      dispatcher.await();
      rm.getRMContext().getReservationSystem()
          .synchronizePlan(ReservationSystemTestUtil.reservationQ, false);
      if (rm.getRMContext().getReservationSystem()
          .getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity()
          .getMemory() > 0) {
        break;
      }
      LOG.info("Waiting for node capacity to be added to plan");
      Thread.sleep(100);
    } while (attempts-- > 0);
    if (attempts < 0) {
      Assert.fail("Exhausted attempts in checking if node capacity was "
          + "added to the plan");
    }

  } catch (Exception e) {
    Assert.fail(e.getMessage());
  }
}
 
Developer ID: aliyun-beta, Project: aliyun-oss-hadoop-fs, Lines of code: 28, Source: TestReservationSystemWithRMHA.java


Example 15: testAllocateAfterUnregister

import org.apache.hadoop.yarn.event.DrainDispatcher; // import the required package/class
@Test(timeout=1200000)
public void  testAllocateAfterUnregister() throws Exception {
  MyResourceManager rm = new MyResourceManager(conf);
  rm.start();
  DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext()
          .getDispatcher();
  // Register node1
  MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);

  // Submit an application
  RMApp app1 = rm.submitApp(2048);

  nm1.nodeHeartbeat(true);
  RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
  MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
  am1.registerAppAttempt();
  // unregister app attempt
  FinishApplicationMasterRequest req =
      FinishApplicationMasterRequest.newInstance(
         FinalApplicationStatus.KILLED, "", "");
  am1.unregisterAppAttempt(req, false);
  // request container after unregister
  am1.addRequests(new String[] { "127.0.0.1" }, GB, 1, 1);
  AllocateResponse alloc1Response = am1.schedule();

  nm1.nodeHeartbeat(true);
  rmDispatcher.await();
  alloc1Response = am1.schedule();
  Assert.assertEquals(0, alloc1Response.getAllocatedContainers().size());
}
 
Developer ID: aliyun-beta, Project: aliyun-oss-hadoop-fs, Lines of code: 31, Source: TestApplicationMasterService.java


Example 16: testRMNodeStatusAfterReconnect

import org.apache.hadoop.yarn.event.DrainDispatcher; // import the required package/class
@Test(timeout = 10000)
public void testRMNodeStatusAfterReconnect() throws Exception {
  // The node(127.0.0.1:1234) reconnected with RM. When it registered with
  // RM, RM set its lastNodeHeartbeatResponse's id to 0 asynchronously. But
  // the node's heartbeat come before RM succeeded setting the id to 0.
  final DrainDispatcher dispatcher = new DrainDispatcher();
  MockRM rm = new MockRM(){
    @Override
    protected Dispatcher createDispatcher() {
      return dispatcher;
    }
  };
  rm.start();
  MockNM nm1 =
      new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
  nm1.registerNode();
  int i = 0;
  while(i < 3) {
    nm1.nodeHeartbeat(true);
    dispatcher.await();
    i++;
  }

  MockNM nm2 =
      new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
  nm2.registerNode();
  RMNode rmNode = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
  nm2.nodeHeartbeat(true);
  dispatcher.await();
  Assert.assertEquals("Node is Not in Running state.", NodeState.RUNNING,
      rmNode.getState());
  rm.stop();
}
 
Developer ID: aliyun-beta, Project: aliyun-oss-hadoop-fs, Lines of code: 34, Source: TestNMReconnect.java


Example 17: testAttemptNotFoundCausesRMCommunicatorException

import org.apache.hadoop.yarn.event.DrainDispatcher; // import the required package/class
@Test(expected = RMContainerAllocationException.class)
public void testAttemptNotFoundCausesRMCommunicatorException()
    throws Exception {

  Configuration conf = new Configuration();
  MyResourceManager rm = new MyResourceManager(conf);
  rm.start();
  DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
      .getDispatcher();

  // Submit the application
  RMApp app = rm.submitApp(1024);
  dispatcher.await();

  MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
  amNodeManager.nodeHeartbeat(true);
  dispatcher.await();

  ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
      .getAppAttemptId();
  rm.sendAMLaunched(appAttemptId);
  dispatcher.await();

  JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
  Job mockJob = mock(Job.class);
  when(mockJob.getReport()).thenReturn(
      MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
          0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
  MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
      appAttemptId, mockJob);

  // Now kill the application
  rm.killApp(app.getApplicationId());
  rm.waitForState(app.getApplicationId(), RMAppState.KILLED);
  allocator.schedule();
}
 
Developer ID: aliyun-beta, Project: aliyun-oss-hadoop-fs, Lines of code: 37, Source: TestRMContainerAllocator.java


Example 18: testRMNodeStatusAfterReconnect

import org.apache.hadoop.yarn.event.DrainDispatcher; // import the required package/class
@Test(timeout = 10000)
public void testRMNodeStatusAfterReconnect() throws Exception {
  // The node(127.0.0.1:1234) reconnected with RM. When it registered with
  // RM, RM set its lastNodeHeartbeatResponse's id to 0 asynchronously. But
  // the node's heartbeat come before RM succeeded setting the id to 0.
  final DrainDispatcher dispatcher = new DrainDispatcher();
  MockRM rm = new MockRM(){
    @Override
    protected Dispatcher createDispatcher() {
      return dispatcher;
    }
  };
  rm.start();
  MockNM nm1 =
      new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
  nm1.registerNode();
  int i = 0;
  while(i < 3) {
    nm1.nodeHeartbeat(true);
    dispatcher.await();
    i++;
  }

  MockNM nm2 =
      new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
  nm2.registerNode();
  RMNode rmNode = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
  nm2.nodeHeartbeat(true);
  dispatcher.await();
  Assert.assertEquals("Node is Not in Running state.", NodeState.RUNNING,
      rmNode.getState());
  rm.stop();
}
 
Developer ID: hopshadoop, Project: hops, Lines of code: 34, Source: TestNMReconnect.java


Example 19: testStopAfterError

import org.apache.hadoop.yarn.event.DrainDispatcher; // import the required package/class
@Test(timeout=20000)
@SuppressWarnings("unchecked")
public void testStopAfterError() throws Exception {
  DeletionService delSrvc = mock(DeletionService.class);

  // get the AppLogAggregationImpl thread to crash
  LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);
  when(mockedDirSvc.getLogDirs()).thenThrow(new RuntimeException());

  DrainDispatcher dispatcher = createDispatcher();
  EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
  dispatcher.register(ApplicationEventType.class, appEventHandler);

  LogAggregationService logAggregationService =
      new LogAggregationService(dispatcher, this.context, delSrvc,
                                mockedDirSvc);
  logAggregationService.init(this.conf);
  logAggregationService.start();

  ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
  logAggregationService.handle(new LogHandlerAppStartedEvent(
          application1, this.user, null,
          ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));

  logAggregationService.stop();
  assertEquals(0, logAggregationService.getNumAggregators());
}
 
Developer ID: ict-carch, Project: hadoop-plus, Lines of code: 28, Source: TestLogAggregationService.java


Example 20: testLogAggregatorCleanup

import org.apache.hadoop.yarn.event.DrainDispatcher; // import the required package/class
@Test
@SuppressWarnings("unchecked")
public void testLogAggregatorCleanup() throws Exception {
  DeletionService delSrvc = mock(DeletionService.class);

  // mock the local dirs handler (no failure is injected in this test)
  LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);

  DrainDispatcher dispatcher = createDispatcher();
  EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
  dispatcher.register(ApplicationEventType.class, appEventHandler);

  LogAggregationService logAggregationService =
      new LogAggregationService(dispatcher, this.context, delSrvc,
                                mockedDirSvc);
  logAggregationService.init(this.conf);
  logAggregationService.start();

  ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
  logAggregationService.handle(new LogHandlerAppStartedEvent(
          application1, this.user, null,
          ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));

  logAggregationService.handle(new LogHandlerAppFinishedEvent(application1));
  dispatcher.await();
  int timeToWait = 20 * 1000;
  while (timeToWait > 0 && logAggregationService.getNumAggregators() > 0) {
    Thread.sleep(100);
    timeToWait -= 100;
  }
  Assert.assertEquals("Log aggregator failed to cleanup!", 0,
      logAggregationService.getNumAggregators());
}
 
Developer ID: ict-carch, Project: hadoop-plus, Lines of code: 34, Source: TestLogAggregationService.java
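Examples 2, 14 and 20 each hand-roll the same shape of loop: check a condition, sleep 100 ms, and give up after a bound. A minimal sketch of how that loop could be factored into one helper is shown here. `PollUtil` and `waitFor` are hypothetical names introduced for illustration and are not taken from any of the projects above.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper, for illustration only. */
class PollUtil {
  /**
   * Polls the condition until it holds or the timeout elapses.
   * Returns true if the condition became true, false on timeout or interrupt.
   */
  static boolean waitFor(BooleanSupplier condition, long intervalMs, long timeoutMs) {
    long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) {
        return false; // timed out; the caller decides how to fail
      }
      try {
        Thread.sleep(intervalMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
  }

  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    // Condition that becomes true roughly 50 ms after start.
    boolean ok = waitFor(() -> System.currentTimeMillis() - start >= 50, 10, 1000);
    System.out.println("condition met: " + ok);
  }
}
```

With a helper of this kind, the tail of Example 20 could read as a single call such as `waitFor(() -> logAggregationService.getNumAggregators() == 0, 100, 20_000)` followed by the assertion, which keeps the timeout and the condition in one place.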



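Note: The org.apache.hadoop.yarn.event.DrainDispatcher examples in this article were collected from GitHub, MSDocs, and other source-code and documentation hosting platforms. The snippets were selected from open-source projects, and copyright in the source code belongs to the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.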

