• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Java Deadline类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中scala.concurrent.duration.Deadline的典型用法代码示例。如果您正苦于以下问题:Java Deadline类的具体用法?Java Deadline怎么用?Java Deadline使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



Deadline类属于scala.concurrent.duration包,在下文中一共展示了Deadline类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: awaitJobManagerGatewayAndWebPort

import scala.concurrent.duration.Deadline; //导入依赖的package包/类
/**
 * Awaits the leading job manager gateway and its web monitor port.
 */
public Tuple2<ActorGateway, Integer> awaitJobManagerGatewayAndWebPort() throws Exception {
	Future<Tuple2<ActorGateway, Integer>> gatewayPortFuture = null;
	Deadline deadline = timeout.fromNow();

	while (!deadline.isOverdue()) {
		synchronized (waitLock) {
			gatewayPortFuture = leaderGatewayPortFuture;

			if (gatewayPortFuture != null) {
				break;
			}

			waitLock.wait(deadline.timeLeft().toMillis());
		}
	}

	if (gatewayPortFuture == null) {
		throw new TimeoutException("There is no JobManager available.");
	} else {
		return Await.result(gatewayPortFuture, deadline.timeLeft());
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:26,代码来源:JobManagerRetriever.java


示例2: waitForLeaderNotification

import scala.concurrent.duration.Deadline; //导入依赖的package包/类
private void waitForLeaderNotification(
		String expectedJobManagerURL,
		GatewayRetriever<JobManagerGateway> retriever,
		Deadline deadline) throws Exception {

	while (deadline.hasTimeLeft()) {
		Optional<JobManagerGateway> optJobManagerGateway = retriever.getNow();

		if (optJobManagerGateway.isPresent() && Objects.equals(expectedJobManagerURL, optJobManagerGateway.get().getAddress())) {
			return;
		}
		else {
			Thread.sleep(100);
		}
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:17,代码来源:WebRuntimeMonitorITCase.java


示例3: restartAfterFailure

import scala.concurrent.duration.Deadline; //导入依赖的package包/类
private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {
	makeAFailureAndWait(eg, timeout);

	assertEquals(JobStatus.RUNNING, eg.getState());

	// Wait for deploying after async restart
	Deadline deadline = timeout.fromNow();
	waitForAllResourcesToBeAssignedAfterAsyncRestart(eg, deadline);

	if (haltAfterRestart) {
		if (deadline.hasTimeLeft()) {
			haltExecution(eg);
		} else {
			fail("Failed to wait until all execution attempts left the state DEPLOYING.");
		}
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:18,代码来源:ExecutionGraphRestartTest.java


示例4: getJobManagerPort

import scala.concurrent.duration.Deadline; //导入依赖的package包/类
/**
 * Parses the port from the job manager logs and returns it.
 *
 * <p>If a call to this method succeeds, successive calls will directly
 * return the port and re-parse the logs.
 *
 * @param timeout Timeout for log parsing.
 * @return The port of the job manager
 * @throws InterruptedException  If interrupted while waiting before
 *                               retrying to parse the logs
 * @throws NumberFormatException If the parsed port is not a number
 */
public int getJobManagerPort(FiniteDuration timeout) throws InterruptedException, NumberFormatException {
	if (jobManagerPort > 0) {
		return jobManagerPort;
	} else {
		Deadline deadline = timeout.fromNow();
		while (deadline.hasTimeLeft()) {
			Matcher matcher = PORT_PATTERN.matcher(getProcessOutput());
			if (matcher.find()) {
				String port = matcher.group(1);
				jobManagerPort = Integer.parseInt(port);
				return jobManagerPort;
			} else {
				Thread.sleep(100);
			}
		}

		throw new RuntimeException("Could not parse port from logs");
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:32,代码来源:JobManagerProcess.java


示例5: getKvState

import scala.concurrent.duration.Deadline; //导入依赖的package包/类
private static <K, S extends State, V> CompletableFuture<S> getKvState(
		final Deadline deadline,
		final QueryableStateClient client,
		final JobID jobId,
		final String queryName,
		final K key,
		final TypeInformation<K> keyTypeInfo,
		final StateDescriptor<S, V> stateDescriptor,
		final boolean failForUnknownKeyOrNamespace,
		final ScheduledExecutor executor) {

	final CompletableFuture<S> resultFuture = new CompletableFuture<>();
	getKvStateIgnoringCertainExceptions(
			deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo,
			stateDescriptor, failForUnknownKeyOrNamespace, executor);
	return resultFuture;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:18,代码来源:AbstractQueryableStateTestBase.java


示例6: max

import scala.concurrent.duration.Deadline; //导入依赖的package包/类
/**
 * Returns the deadline to expire last.
 *
 * @param d1 first deadline
 * @param d2 second deadline
 * @return d1 if d1.timeLeft() &gt;= d2.timeLeft() else d2.  If either is null, the other is returned.
 */
@Nullable
public static Deadline max(@Nullable final Deadline d1, @Nullable final Deadline d2) {
    if (d1 == null) {
        return d2;
    }
    if (d2 == null) {
        return d1;
    }

    if (d1.timeLeft().gteq(d2.timeLeft())) {
        return d1;
    } else {
        return d2;
    }
}
 
开发者ID:tomtom-international,项目名称:speedtools,代码行数:23,代码来源:DurationUtils.java


示例7: testStopYarn

import scala.concurrent.duration.Deadline; //导入依赖的package包/类
@Test
public void testStopYarn() throws Exception {
	// this only works if there is no active job at this point
	assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());
	
	// Create a task
	final JobVertex sender = new JobVertex("Sender");
	sender.setParallelism(2);
	sender.setInvokableClass(StoppableInvokable.class);

	final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
	final JobID jid = jobGraph.getJobID();

	cluster.submitJobDetached(jobGraph);

	// wait for job to show up
	while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
		Thread.sleep(10);
	}

	final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
	final Deadline deadline = testTimeout.fromNow();

	while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
		try (HttpTestClient client = new HttpTestClient("localhost", port)) {
			// Request the file from the web server
			client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft());

			HttpTestClient.SimpleHttpResponse response = client
					.getNextResponse(deadline.timeLeft());

			assertEquals(HttpResponseStatus.OK, response.getStatus());
			assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json"));
			assertEquals("{}", response.getContent());
		}
		
		Thread.sleep(20);
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:40,代码来源:WebFrontendITCase.java


示例8: waitForJobRemoved

import scala.concurrent.duration.Deadline; //导入依赖的package包/类
private void waitForJobRemoved(
		JobID jobId, JobManagerProcess jobManager, ActorSystem actorSystem, FiniteDuration timeout)
		throws Exception {

	ActorRef jobManagerRef = jobManager.getActorRef(actorSystem, timeout);
	AkkaActorGateway jobManagerGateway = new AkkaActorGateway(jobManagerRef, null);

	Future<Object> archiveFuture = jobManagerGateway.ask(JobManagerMessages.getRequestArchive(), timeout);

	ActorRef archive = ((JobManagerMessages.ResponseArchive) Await.result(archiveFuture, timeout)).actor();

	AkkaActorGateway archiveGateway = new AkkaActorGateway(archive, null);

	Deadline deadline = timeout.fromNow();

	while (deadline.hasTimeLeft()) {
		JobManagerMessages.JobStatusResponse resp = JobManagerActorTestUtils
				.requestJobStatus(jobId, archiveGateway, deadline.timeLeft());

		if (resp instanceof JobManagerMessages.JobNotFound) {
			Thread.sleep(100);
		}
		else {
			return;
		}
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:28,代码来源:ChaosMonkeyITCase.java


示例9: testStopYarn

import scala.concurrent.duration.Deadline; //导入依赖的package包/类
@Test
public void testStopYarn() throws Exception {
	// this only works if there is no active job at this point
	assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());

	// Create a task
	final JobVertex sender = new JobVertex("Sender");
	sender.setParallelism(2);
	sender.setInvokableClass(StoppableInvokable.class);

	final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
	final JobID jid = jobGraph.getJobID();

	cluster.submitJobDetached(jobGraph);

	// wait for job to show up
	while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
		Thread.sleep(10);
	}

	final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
	final Deadline deadline = testTimeout.fromNow();

	while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
		try (HttpTestClient client = new HttpTestClient("localhost", port)) {
			// Request the file from the web server
			client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft());

			HttpTestClient.SimpleHttpResponse response = client
				.getNextResponse(deadline.timeLeft());

			assertEquals(HttpResponseStatus.OK, response.getStatus());
			assertEquals("application/json; charset=UTF-8", response.getType());
			assertEquals("{}", response.getContent());
		}

		Thread.sleep(20);
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:40,代码来源:WebFrontendITCase.java


示例10: testRequestUnavailableHost

import scala.concurrent.duration.Deadline; //导入依赖的package包/类
/**
 * Tests that a request to an unavailable host is failed with ConnectException.
 */
@Test
public void testRequestUnavailableHost() throws Exception {
	Deadline deadline = TEST_TIMEOUT.fromNow();
	AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
	KvStateClient client = null;

	try {
		client = new KvStateClient(1, stats);

		int availablePort = NetUtils.getAvailablePort();

		KvStateServerAddress serverAddress = new KvStateServerAddress(
				InetAddress.getLocalHost(),
				availablePort);

		Future<byte[]> future = client.getKvState(serverAddress, new KvStateID(), new byte[0]);

		try {
			Await.result(future, deadline.timeLeft());
			fail("Did not throw expected ConnectException");
		} catch (ConnectException ignored) {
			// Expected
		}
	} finally {
		if (client != null) {
			client.shutDown();
		}

		assertEquals("Channel leak", 0, stats.getNumConnections());
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:35,代码来源:KvStateClientTest.java


示例11: testCancelWhileRestarting

import scala.concurrent.duration.Deadline; //导入依赖的package包/类
@Test
public void testCancelWhileRestarting() throws Exception {
	// We want to manually control the restart and delay
	RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
	Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
	ExecutionGraph executionGraph = executionGraphInstanceTuple.f0;
	Instance instance = executionGraphInstanceTuple.f1;

	// Kill the instance and wait for the job to restart
	instance.markDead();

	Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();

	while (deadline.hasTimeLeft() &&
			executionGraph.getState() != JobStatus.RESTARTING) {

		Thread.sleep(100);
	}

	assertEquals(JobStatus.RESTARTING, executionGraph.getState());

	// Canceling needs to abort the restart
	executionGraph.cancel();

	assertEquals(JobStatus.CANCELED, executionGraph.getState());

	// The restart has been aborted
	executionGraph.restart(executionGraph.getGlobalModVersion());

	assertEquals(JobStatus.CANCELED, executionGraph.getState());
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:32,代码来源:ExecutionGraphRestartTest.java


示例12: waitForAllResourcesToBeAssignedAfterAsyncRestart

import scala.concurrent.duration.Deadline; //导入依赖的package包/类
private static void waitForAllResourcesToBeAssignedAfterAsyncRestart(ExecutionGraph eg, Deadline deadline) throws InterruptedException {
	boolean success = false;

	while (deadline.hasTimeLeft() && !success) {
		success = true;

		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
			if (vertex.getCurrentExecutionAttempt().getAssignedResource() == null) {
				success = false;
				Thread.sleep(100);
				break;
			}
		}
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:16,代码来源:ExecutionGraphRestartTest.java


示例13: getActorRef

import scala.concurrent.duration.Deadline; //导入依赖的package包/类
/**
 * Waits for the job manager to be reachable.
 *
 * <p><strong>Important:</strong> Make sure to set the timeout larger than Akka's gating
 * time. Otherwise, this will effectively not wait for the JobManager to startup, because the
 * time out will fire immediately.
 *
 * @param actorSystem Actor system to be used to resolve JobManager address.
 * @param timeout     Timeout (make sure to set larger than Akka's gating time).
 */
public ActorRef getActorRef(ActorSystem actorSystem, FiniteDuration timeout)
		throws Exception {

	if (jobManagerRef != null) {
		return jobManagerRef;
	}

	checkNotNull(actorSystem, "Actor system");

	// Deadline passes timeout ms
	Deadline deadline = timeout.fromNow();

	while (deadline.hasTimeLeft()) {
		try {
			// If the Actor is not reachable yet, this throws an Exception. Retry until the
			// deadline passes.
			this.jobManagerRef = AkkaUtils.getActorRef(
					getJobManagerAkkaURL(deadline.timeLeft()),
					actorSystem,
					deadline.timeLeft());

			return jobManagerRef;
		}
		catch (Throwable ignored) {
			// Retry
			Thread.sleep(Math.min(100, deadline.timeLeft().toMillis()));
		}
	}

	throw new IllegalStateException("JobManager did not start up within " + timeout + ".");
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:42,代码来源:JobManagerProcess.java


示例14: waitForTaskManagers

import scala.concurrent.duration.Deadline; //导入依赖的package包/类
/**
 * Waits for a minimum number of task managers to connect to the job manager.
 *
 * @param minimumNumberOfTaskManagers Minimum number of task managers to wait for
 * @param jobManager                  Job manager actor to ask
 * @param timeout                     Timeout after which the operation fails
 * @throws Exception If the task managers don't connection with the timeout.
 */
public static void waitForTaskManagers(
		int minimumNumberOfTaskManagers,
		ActorGateway jobManager,
		FiniteDuration timeout) throws Exception {

	checkArgument(minimumNumberOfTaskManagers >= 1);
	checkNotNull(jobManager, "Job manager");
	checkNotNull(timeout, "Timeout");

	final Deadline deadline = timeout.fromNow();

	while (deadline.hasTimeLeft()) {
		Future<Object> ask = jobManager.ask(getRequestNumberRegisteredTaskManager(),
				deadline.timeLeft());

		Integer response = (Integer) Await.result(ask, deadline.timeLeft());

		// All are connected. We are done.
		if (response >= minimumNumberOfTaskManagers) {
			return;
		}
		// Waiting for more... retry
		else {
			Thread.sleep(Math.min(100, deadline.timeLeft().toMillis()));
		}
	}

	throw new IllegalStateException("Task managers not connected within deadline.");
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:38,代码来源:JobManagerActorTestUtils.java


示例15: testValueState

import scala.concurrent.duration.Deadline; //导入依赖的package包/类
/**
 * Tests simple value state queryable state instance. Each source emits
 * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
 * queried. The tests succeeds after each subtask index is queried with
 * value numElements (the latest element updated the state).
 */
@Test
public void testValueState() throws Exception {

	final Deadline deadline = TEST_TIMEOUT.fromNow();
	final long numElements = 1024L;

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStateBackend(stateBackend);
	env.setParallelism(maxParallelism);
	// Very important, because cluster is shared between tests and we
	// don't explicitly check that all slots are available before
	// submitting.
	env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));

	DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));

	// Value state
	ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>("any", source.getType());

	source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
		private static final long serialVersionUID = 7662520075515707428L;

		@Override
		public Integer getKey(Tuple2<Integer, Long> value) {
			return value.f0;
		}
	}).asQueryableState("hakuna", valueState);

	try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {

		final JobID jobId = autoCancellableJob.getJobId();
		final JobGraph jobGraph = autoCancellableJob.getJobGraph();

		cluster.submitJobDetached(jobGraph);

		executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements);
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:45,代码来源:AbstractQueryableStateTestBase.java


示例16: testValueStateShortcut

import scala.concurrent.duration.Deadline; //导入依赖的package包/类
/**
 * Tests simple value state queryable state instance. Each source emits
 * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
 * queried. The tests succeeds after each subtask index is queried with
 * value numElements (the latest element updated the state).
 *
 * <p>This is the same as the simple value state test, but uses the API shortcut.
 */
@Test
public void testValueStateShortcut() throws Exception {

	final Deadline deadline = TEST_TIMEOUT.fromNow();
	final long numElements = 1024L;

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStateBackend(stateBackend);
	env.setParallelism(maxParallelism);
	// Very important, because cluster is shared between tests and we
	// don't explicitly check that all slots are available before
	// submitting.
	env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));

	DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestAscendingValueSource(numElements));

	// Value state shortcut
	final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
				private static final long serialVersionUID = 9168901838808830068L;

				@Override
				public Integer getKey(Tuple2<Integer, Long> value) {
					return value.f0;
				}
			}).asQueryableState("matata");

	final ValueStateDescriptor<Tuple2<Integer, Long>> stateDesc =
			(ValueStateDescriptor<Tuple2<Integer, Long>>) queryableState.getStateDescriptor();

	try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) {

		final JobID jobId = autoCancellableJob.getJobId();
		final JobGraph jobGraph = autoCancellableJob.getJobGraph();

		cluster.submitJobDetached(jobGraph);
		executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements);
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:48,代码来源:AbstractQueryableStateTestBase.java


示例17: AutoCancellableJob

import scala.concurrent.duration.Deadline; //导入依赖的package包/类
AutoCancellableJob(final FlinkMiniCluster cluster, final StreamExecutionEnvironment env, final Deadline deadline) {
	Preconditions.checkNotNull(env);

	this.cluster = Preconditions.checkNotNull(cluster);
	this.jobGraph = env.getStreamGraph().getJobGraph();
	this.deadline = Preconditions.checkNotNull(deadline);

	this.jobId = jobGraph.getJobID();
	this.cancellationFuture = notifyWhenJobStatusIs(jobId, JobStatus.CANCELED, deadline);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:11,代码来源:AbstractQueryableStateTestBase.java


示例18: notifyWhenJobStatusIs

import scala.concurrent.duration.Deadline; //导入依赖的package包/类
private static CompletableFuture<TestingJobManagerMessages.JobStatusIs> notifyWhenJobStatusIs(
		final JobID jobId, final JobStatus status, final Deadline deadline) {

	return FutureUtils.toJava(
			cluster.getLeaderGateway(deadline.timeLeft())
					.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, status), deadline.timeLeft())
					.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:9,代码来源:AbstractQueryableStateTestBase.java


示例19: getKvStateIgnoringCertainExceptions

import scala.concurrent.duration.Deadline; //导入依赖的package包/类
private static <K, S extends State, V> void getKvStateIgnoringCertainExceptions(
		final Deadline deadline,
		final CompletableFuture<S> resultFuture,
		final QueryableStateClient client,
		final JobID jobId,
		final String queryName,
		final K key,
		final TypeInformation<K> keyTypeInfo,
		final StateDescriptor<S, V> stateDescriptor,
		final boolean failForUnknownKeyOrNamespace,
		final ScheduledExecutor executor) {

	if (!resultFuture.isDone()) {
		CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, keyTypeInfo, stateDescriptor);
		expected.whenCompleteAsync((result, throwable) -> {
			if (throwable != null) {
				if (
						throwable.getCause() instanceof CancellationException ||
						throwable.getCause() instanceof AssertionError ||
						(failForUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException)
				) {
					resultFuture.completeExceptionally(throwable.getCause());
				} else if (deadline.hasTimeLeft()) {
					getKvStateIgnoringCertainExceptions(
							deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo,
							stateDescriptor, failForUnknownKeyOrNamespace, executor);
				}
			} else {
				resultFuture.complete(result);
			}
		}, executor);

		resultFuture.whenComplete((result, throwable) -> expected.cancel(false));
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:36,代码来源:AbstractQueryableStateTestBase.java


示例20: executeValueQuery

import scala.concurrent.duration.Deadline; //导入依赖的package包/类
/**
 * Retry a query for state for keys between 0 and {@link #maxParallelism} until
 * <tt>expected</tt> equals the value of the result tuple's second field.
 */
private void executeValueQuery(
		final Deadline deadline,
		final QueryableStateClient client,
		final JobID jobId,
		final String queryableStateName,
		final ValueStateDescriptor<Tuple2<Integer, Long>> stateDescriptor,
		final long expected) throws Exception {

	for (int key = 0; key < maxParallelism; key++) {
		boolean success = false;
		while (deadline.hasTimeLeft() && !success) {
			CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvState(
					deadline,
					client,
					jobId,
					queryableStateName,
					key,
					BasicTypeInfo.INT_TYPE_INFO,
					stateDescriptor,
					false,
					executor);

			Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).value();

			assertEquals("Key mismatch", key, value.f0.intValue());
			if (expected == value.f1) {
				success = true;
			} else {
				// Retry
				Thread.sleep(RETRY_TIMEOUT);
			}
		}

		assertTrue("Did not succeed query", success);
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:41,代码来源:AbstractQueryableStateTestBase.java



注:本文中的scala.concurrent.duration.Deadline类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java NormalizationH1类代码示例发布时间:2022-05-22
下一篇:
Java RestartRsHoldingMeta类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap