This article collects typical usage examples of the Java class org.apache.flink.api.common.JobExecutionResult. If you are wondering what JobExecutionResult does and how to use it, the curated class examples below should help.
The JobExecutionResult class lives in the org.apache.flink.api.common package. 20 code examples of the class are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Java examples.
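Before the collected examples, here is a minimal, self-contained sketch of the typical workflow (not taken from any of the projects below): run a batch job with ExecutionEnvironment.execute(), which blocks until completion and returns a JobExecutionResult carrying the net runtime and all accumulator results. The job name and the use of a discarding sink are purely illustrative.

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class JobExecutionResultExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.generateSequence(1, 100).output(new DiscardingOutputFormat<Long>());

        // execute() blocks until the job finishes and hands back the result
        JobExecutionResult result = env.execute("example-job");

        System.out.println("Net runtime (ms): " + result.getNetRuntime());
        System.out.println("Accumulators: " + result.getAllAccumulatorResults());
    }
}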
Example 1: execute

import org.apache.flink.api.common.JobExecutionResult; // import the required package/class

@Override
public JobExecutionResult execute(String jobName) throws Exception {
    OptimizedPlan op = compileProgram(jobName);

    JobGraphGenerator jgg = new JobGraphGenerator();
    JobGraph jobGraph = jgg.compileJobGraph(op);

    for (Path jarFile : jarFiles) {
        jobGraph.addJar(jarFile);
    }

    jobGraph.setClasspaths(new ArrayList<>(classPaths));

    this.lastJobExecutionResult = jobExecutor.executeJobBlocking(jobGraph);
    return this.lastJobExecutionResult;
}

Developer: axbaretto, Project: flink, Lines: 17, Source: TestEnvironment.java
Example 2: postSubmit

import org.apache.flink.api.common.JobExecutionResult; // import the required package/class

@Override
protected void postSubmit() throws Exception {
    compareResultsByLinesInMemory(EXPECTED, resultPath);

    // Test accumulator results
    System.out.println("Accumulator results:");
    JobExecutionResult res = this.result;
    System.out.println(AccumulatorHelper.getResultsFormatted(res.getAllAccumulatorResults()));

    Assert.assertEquals(Integer.valueOf(3), res.getAccumulatorResult("num-lines"));
    Assert.assertEquals(Double.valueOf(getParallelism()), res.getAccumulatorResult("open-close-counter"));

    // Test histogram (words per line distribution)
    Map<Integer, Integer> dist = new HashMap<>();
    dist.put(1, 1);
    dist.put(2, 1);
    dist.put(3, 1);
    Assert.assertEquals(dist, res.getAccumulatorResult("words-per-line"));

    // Test distinct words (custom accumulator)
    Set<StringValue> distinctWords = new HashSet<>();
    distinctWords.add(new StringValue("one"));
    distinctWords.add(new StringValue("two"));
    distinctWords.add(new StringValue("three"));
    Assert.assertEquals(distinctWords, res.getAccumulatorResult("distinct-words"));
}

Developer: axbaretto, Project: flink, Lines: 26, Source: AccumulatorITCase.java
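The test above only consumes the accumulator values; the producing side is a user function that registers accumulators in open(). As a hedged sketch (the real UDF in AccumulatorITCase is not shown here, so the class name, types, and the pre-1.19 open(Configuration) signature are assumptions), the "num-lines" and "words-per-line" results could come from a flat map like this:

import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class LineCounter extends RichFlatMapFunction<String, String> {
    private final IntCounter numLines = new IntCounter();
    private final Histogram wordsPerLine = new Histogram();

    @Override
    public void open(Configuration parameters) {
        // register under the names the test queries after execution
        getRuntimeContext().addAccumulator("num-lines", numLines);
        getRuntimeContext().addAccumulator("words-per-line", wordsPerLine);
    }

    @Override
    public void flatMap(String line, Collector<String> out) {
        numLines.add(1);

        String[] words = line.split(" ");
        wordsPerLine.add(words.length); // histogram key = word count of this line

        for (String word : words) {
            out.collect(word);
        }
    }
}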
Example 3: run

import org.apache.flink.api.common.JobExecutionResult; // import the required package/class

@Override
public void run() {
    try {
        JobExecutionResult result = JobClient.submitJobAndWait(
            clientActorSystem,
            cluster.configuration(),
            cluster.highAvailabilityServices(),
            graph,
            timeout,
            false,
            getClass().getClassLoader());

        resultPromise.success(result);
    } catch (Exception e) {
        // This was not expected... fail the test case
        resultPromise.failure(e);
    }
}

Developer: axbaretto, Project: flink, Lines: 19, Source: ZooKeeperLeaderElectionITCase.java
Example 4: testProgram

import org.apache.flink.api.common.JobExecutionResult; // import the required package/class

private void testProgram(
        LocalFlinkMiniCluster localFlinkMiniCluster,
        final int dataVolumeGb,
        final boolean useForwarder,
        final boolean isSlowSender,
        final boolean isSlowReceiver,
        final int parallelism) throws Exception {
    JobExecutionResult jer = localFlinkMiniCluster.submitJobAndWait(
        createJobGraph(
            dataVolumeGb,
            useForwarder,
            isSlowSender,
            isSlowReceiver,
            parallelism),
        false);

    long dataVolumeMbit = dataVolumeGb * 8192;
    long runtimeSecs = jer.getNetRuntime(TimeUnit.SECONDS);

    int mbitPerSecond = (int) (((double) dataVolumeMbit) / runtimeSecs);

    LOG.info(String.format("Test finished with throughput of %d MBit/s (runtime [secs]: %d, " +
        "data volume [gb/mbits]: %d/%d)", mbitPerSecond, runtimeSecs, dataVolumeGb, dataVolumeMbit));
}

Developer: axbaretto, Project: flink, Lines: 25, Source: NetworkStackThroughputITCase.java
Example 5: tryExecute

import org.apache.flink.api.common.JobExecutionResult; // import the required package/class

public static JobExecutionResult tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
    try {
        return see.execute(name);
    }
    catch (ProgramInvocationException | JobExecutionException root) {
        Throwable cause = root.getCause();

        // search for nested SuccessExceptions
        int depth = 0;
        while (!(cause instanceof SuccessException)) {
            if (cause == null || depth++ == 20) {
                root.printStackTrace();
                fail("Test failed: " + root.getMessage());
            }
            else {
                cause = cause.getCause();
            }
        }
    }
    return null;
}

Developer: axbaretto, Project: flink, Lines: 23, Source: TestUtils.java
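This helper is the consuming half of a common Flink testing idiom: an otherwise unbounded streaming job ends itself by throwing a SuccessException from a sink once the expected data has been observed, and tryExecute treats exactly that exception as a pass. A hedged usage sketch follows; the class and constant names are illustrative, SuccessException is assumed to be org.apache.flink.test.util.SuccessException from flink-test-utils, and tryExecute is assumed statically imported from the TestUtils class above.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.SuccessException;

public class SuccessExceptionUsage {
    private static final int EXPECTED_COUNT = 100;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // keep the per-subtask counter global

        env.generateSequence(1, EXPECTED_COUNT)
            .addSink(new SinkFunction<Long>() {
                private int seen = 0;

                @Override
                public void invoke(Long value) throws Exception {
                    if (++seen >= EXPECTED_COUNT) {
                        // unwound through the cause chain and detected by tryExecute
                        throw new SuccessException();
                    }
                }
            });

        tryExecute(env, "success-exception-test"); // the helper from Example 5 (static import assumed)
    }
}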
Example 6: execute

import org.apache.flink.api.common.JobExecutionResult; // import the required package/class

@Override
public JobExecutionResult execute(String jobName) throws Exception {
    Plan plan = createProgramPlan(jobName);

    Optimizer pc = new Optimizer(new Configuration());
    OptimizedPlan op = pc.compile(plan);

    JobGraphGenerator jgg = new JobGraphGenerator();
    JobGraph jobGraph = jgg.compileJobGraph(op);

    String jsonPlan = JsonPlanGenerator.generatePlan(jobGraph);

    // first check that the JSON is valid
    JsonParser parser = new JsonFactory().createJsonParser(jsonPlan);
    while (parser.nextToken() != null) {}

    validator.validateJson(jsonPlan);

    throw new AbortError();
}

Developer: axbaretto, Project: flink, Lines: 21, Source: JsonJobGraphGenerationTest.java
Example 7: testProgram

import org.apache.flink.api.common.JobExecutionResult; // import the required package/class

@Override
protected void testProgram() throws Exception {
    // test verifying the number of records read and written vs the accumulator counts
    readCalls = new ConcurrentLinkedQueue<Integer>();
    writeCalls = new ConcurrentLinkedQueue<Integer>();

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.createInput(new TestInputFormat(new Path(inputPath))).output(new TestOutputFormat());

    JobExecutionResult result = env.execute();

    Object a = result.getAllAccumulatorResults().get("DATA_SOURCE_ACCUMULATOR");
    Object b = result.getAllAccumulatorResults().get("DATA_SINK_ACCUMULATOR");

    long recordsRead = (Long) a;
    long recordsWritten = (Long) b;

    assertEquals(recordsRead, readCalls.size());
    assertEquals(recordsWritten, writeCalls.size());
}

Developer: axbaretto, Project: flink, Lines: 18, Source: RichInputOutputITCase.java
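The TestInputFormat/TestOutputFormat used above are not shown; the pattern they rely on is a rich format that registers a counter whose final value the test compares against its own call log. A speculative sketch of the sink side (the accumulator name "DATA_SINK_ACCUMULATOR" comes from the test; the class name, record type, and counter type are assumptions):

import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;

public class CountingOutputFormat extends RichOutputFormat<String> {
    private final LongCounter writeCounter = new LongCounter();

    @Override
    public void configure(Configuration parameters) {
        // no configuration needed for this sketch
    }

    @Override
    public void open(int taskNumber, int numTasks) {
        // register the accumulator under the name the test queries
        getRuntimeContext().addAccumulator("DATA_SINK_ACCUMULATOR", writeCounter);
    }

    @Override
    public void writeRecord(String record) {
        writeCounter.add(1L); // one increment per record written
    }

    @Override
    public void close() {
        // nothing to release
    }
}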
Example 8: execute

import org.apache.flink.api.common.JobExecutionResult; // import the required package/class

@Override
public JobExecutionResult execute(String jobName) throws Exception {
    if (executor == null) {
        startNewSession();
    }

    Plan p = createProgramPlan(jobName);

    // Session management is disabled, revert this commit to enable
    //p.setJobId(jobID);
    //p.setSessionTimeout(sessionTimeout);

    JobExecutionResult result = executor.executePlan(p);

    this.lastJobExecutionResult = result;
    return result;
}

Developer: axbaretto, Project: flink, Lines: 18, Source: LocalEnvironment.java
Example 9: testAccumulator

import org.apache.flink.api.common.JobExecutionResult; // import the required package/class

@Test
public void testAccumulator() {
    try {
        final int numElements = 100;

        ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

        env.generateSequence(1, numElements)
            .map(new CountingMapper())
            .output(new DiscardingOutputFormat<Long>());

        JobExecutionResult result = env.execute();

        assertTrue(result.getNetRuntime() >= 0);

        assertEquals(numElements, (int) result.getAccumulatorResult(ACCUMULATOR_NAME));
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}

Developer: axbaretto, Project: flink, Lines: 23, Source: CollectionExecutionAccumulatorsTest.java
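CountingMapper is defined elsewhere in the test; a plausible reconstruction is shown below. The concrete value of ACCUMULATOR_NAME and the mapper's exact shape are assumptions, and the pre-1.19 open(Configuration) signature is used.

import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class CountingMapper extends RichMapFunction<Long, Long> {
    // the test refers to this constant; the concrete name is assumed here
    public static final String ACCUMULATOR_NAME = "count";

    private final IntCounter counter = new IntCounter();

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext().addAccumulator(ACCUMULATOR_NAME, counter);
    }

    @Override
    public Long map(Long value) {
        counter.add(1); // once per element, so the final value equals numElements
        return value;
    }
}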
Example 10: executeRemotely

import org.apache.flink.api.common.JobExecutionResult; // import the required package/class

/**
 * Executes the remote job.
 *
 * @param streamGraph
 *            Stream Graph to execute
 * @param jarFiles
 *            List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and accumulators.
 */
@Override
protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
    URL jarUrl;
    try {
        jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
    } catch (MalformedURLException e) {
        throw new ProgramInvocationException("Could not write the user code classes to disk.", e);
    }

    List<URL> allJarFiles = new ArrayList<>(jarFiles.size() + 1);
    allJarFiles.addAll(jarFiles);
    allJarFiles.add(jarUrl);

    return super.executeRemotely(streamGraph, allJarFiles);
}

Developer: axbaretto, Project: flink, Lines: 25, Source: ScalaShellRemoteStreamEnvironment.java
Example 11: executeJobBlocking

import org.apache.flink.api.common.JobExecutionResult; // import the required package/class

/**
 * This method runs a job in blocking mode. The method returns only after the job
 * completed successfully, or after it failed terminally.
 *
 * @param job The Flink job to execute
 * @return The result of the job execution
 *
 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
 *         or if the job terminally failed.
 */
@Override
public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
    checkNotNull(job, "job is null");

    MiniClusterJobDispatcher dispatcher;
    synchronized (lock) {
        checkState(running, "mini cluster is not running");
        dispatcher = this.jobDispatcher;
    }

    // we have to allow queued scheduling in Flip-6 mode because we need to request slots
    // from the ResourceManager
    job.setAllowQueuedScheduling(true);

    return dispatcher.runJobBlocking(job);
}

Developer: axbaretto, Project: flink, Lines: 27, Source: MiniCluster.java
Example 12: runJobBlocking

import org.apache.flink.api.common.JobExecutionResult; // import the required package/class

/**
 * This method runs a job in blocking mode. The method returns only after the job
 * completed successfully, or after it failed terminally.
 *
 * @param job The Flink job to execute
 * @return The result of the job execution
 *
 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
 *         or if the job terminally failed.
 */
public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
    checkNotNull(job);

    LOG.info("Received job for blocking execution: {} ({})", job.getName(), job.getJobID());
    final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers);

    synchronized (lock) {
        checkState(!shutdown, "mini cluster is shut down");
        checkState(runners == null, "mini cluster can only execute one job at a time");

        this.runners = startJobRunners(job, sync, sync);
    }

    try {
        return sync.getResult();
    }
    finally {
        // always clear the status for the next job
        runners = null;
        clearJobRunningState(job.getJobID());
    }
}

Developer: axbaretto, Project: flink, Lines: 33, Source: MiniClusterJobDispatcher.java
Example 13: submitJobAndWait

import org.apache.flink.api.common.JobExecutionResult; // import the required package/class

/**
 * Sends a [[JobGraph]] to the JobClient actor specified by jobClient, which then submits
 * it to the JobManager. The method blocks until the job has finished or the JobManager is
 * no longer alive. In the former case, the [[SerializedJobExecutionResult]] is returned
 * and in the latter case a [[JobExecutionException]] is thrown.
 *
 * @param actorSystem The actor system that performs the communication.
 * @param config The cluster wide configuration.
 * @param highAvailabilityServices Service factory for high availability services
 * @param jobGraph JobGraph describing the Flink job
 * @param timeout Timeout for futures
 * @param sysoutLogUpdates prints log updates to system out if true
 * @param classLoader The class loader for deserializing the results
 * @return The job execution result
 * @throws JobExecutionException Thrown if the job execution fails.
 */
public static JobExecutionResult submitJobAndWait(
        ActorSystem actorSystem,
        Configuration config,
        HighAvailabilityServices highAvailabilityServices,
        JobGraph jobGraph,
        FiniteDuration timeout,
        boolean sysoutLogUpdates,
        ClassLoader classLoader) throws JobExecutionException {

    JobListeningContext jobListeningContext = submitJob(
        actorSystem,
        config,
        highAvailabilityServices,
        jobGraph,
        timeout,
        sysoutLogUpdates,
        classLoader);

    return awaitJobResult(jobListeningContext);
}

Developer: axbaretto, Project: flink, Lines: 38, Source: JobClient.java
Example 14: testSerializationWithNullValues

import org.apache.flink.api.common.JobExecutionResult; // import the required package/class

@Test
public void testSerializationWithNullValues() {
    try {
        SerializedJobExecutionResult result = new SerializedJobExecutionResult(null, 0L, null);
        SerializedJobExecutionResult cloned = CommonTestUtils.createCopySerializable(result);

        assertNull(cloned.getJobId());
        assertEquals(0L, cloned.getNetRuntime());
        assertNull(cloned.getSerializedAccumulatorResults());

        JobExecutionResult jResult = result.toJobExecutionResult(getClass().getClassLoader());
        assertNull(jResult.getJobID());
        assertTrue(jResult.getAllAccumulatorResults().isEmpty());
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}

Developer: axbaretto, Project: flink, Lines: 20, Source: SerializedJobExecutionResultTest.java
Example 15: retrieveJob

import org.apache.flink.api.common.JobExecutionResult; // import the required package/class

/**
 * Reattaches to a running job from the supplied job id.
 *
 * @param jobID The job id of the job to attach to
 * @return The JobExecutionResult for the jobID
 * @throws JobExecutionException if an error occurs while monitoring the job execution
 */
public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException {
    final ActorSystem actorSystem;

    try {
        actorSystem = actorSystemLoader.get();
    } catch (FlinkException fe) {
        throw new JobExecutionException(
            jobID,
            "Could not start the ActorSystem needed to talk to the JobManager.",
            fe);
    }

    final JobListeningContext listeningContext = JobClient.attachToRunningJob(
        jobID,
        flinkConfig,
        actorSystem,
        highAvailabilityServices,
        timeout,
        printStatusDuringExecution);

    return JobClient.awaitJobResult(listeningContext);
}

Developer: axbaretto, Project: flink, Lines: 29, Source: ClusterClient.java
Example 16: executeProgram

import org.apache.flink.api.common.JobExecutionResult; // import the required package/class

protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException {
    logAndSysout("Starting execution of program");

    final JobSubmissionResult result = client.run(program, parallelism);

    if (null == result) {
        throw new ProgramMissingJobException("No JobSubmissionResult returned, please make sure you called " +
            "ExecutionEnvironment.execute()");
    }

    if (result.isJobExecutionResult()) {
        logAndSysout("Program execution finished");
        JobExecutionResult execResult = result.getJobExecutionResult();
        System.out.println("Job with JobID " + execResult.getJobID() + " has finished.");
        System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms");
        Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
        if (accumulatorsResult.size() > 0) {
            System.out.println("Accumulator Results: ");
            System.out.println(AccumulatorHelper.getResultsFormatted(accumulatorsResult));
        }
    } else {
        logAndSysout("Job has been submitted with JobID " + result.getJobID());
    }
}

Developer: axbaretto, Project: flink, Lines: 25, Source: CliFrontend.java
Example 17: execute

import org.apache.flink.api.common.JobExecutionResult; // import the required package/class

@Override
public JobExecutionResult execute(String jobName) throws Exception {
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);

    transformations.clear();

    if (env instanceof OptimizerPlanEnvironment) {
        ((OptimizerPlanEnvironment) env).setPlan(streamGraph);
    } else if (env instanceof PreviewPlanEnvironment) {
        ((PreviewPlanEnvironment) env).setPreview(streamGraph.getStreamingPlanAsJSON());
    }

    throw new OptimizerPlanEnvironment.ProgramAbortException();
}

Developer: axbaretto, Project: flink, Lines: 17, Source: StreamPlanEnvironment.java
Example 18: execute

import org.apache.flink.api.common.JobExecutionResult; // import the required package/class

@Override
public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

    StreamGraph streamGraph = this.getStreamGraph();
    streamGraph.setJobName(jobName);

    transformations.clear();

    // execute the program
    if (ctx instanceof DetachedEnvironment) {
        LOG.warn("Job was executed in detached mode, the results will be available on completion.");
        ((DetachedEnvironment) ctx).setDetachedPlan(streamGraph);
        return DetachedEnvironment.DetachedJobExecutionResult.INSTANCE;
    } else {
        return ctx
            .getClient()
            .run(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointRestoreSettings())
            .getJobExecutionResult();
    }
}

Developer: axbaretto, Project: flink, Lines: 22, Source: StreamContextEnvironment.java
Example 19: postSubmit

import org.apache.flink.api.common.JobExecutionResult; // import the required package/class

@Override
protected void postSubmit() throws Exception {
    compareResultsByLinesInMemory(EXPECTED, resultPath);

    // Test accumulator results
    System.out.println("Accumulator results:");
    JobExecutionResult res = this.result;
    System.out.println(AccumulatorHelper.getResultsFormated(res.getAllAccumulatorResults()));

    Assert.assertEquals(new Integer(3), (Integer) res.getAccumulatorResult("num-lines"));
    Assert.assertEquals(new Double(getDegreeOfParallelism()), (Double) res.getAccumulatorResult("open-close-counter"));

    // Test histogram (words per line distribution)
    Map<Integer, Integer> dist = Maps.newHashMap();
    dist.put(1, 1);
    dist.put(2, 1);
    dist.put(3, 1);
    Assert.assertEquals(dist, res.getAccumulatorResult("words-per-line"));

    // Test distinct words (custom accumulator)
    Set<StringValue> distinctWords = Sets.newHashSet();
    distinctWords.add(new StringValue("one"));
    distinctWords.add(new StringValue("two"));
    distinctWords.add(new StringValue("three"));
    Assert.assertEquals(distinctWords, res.getAccumulatorResult("distinct-words"));
}

Developer: citlab, Project: vs.msc.ws14, Lines: 26, Source: AccumulatorITCase.java
Example 20: main

import org.apache.flink.api.common.JobExecutionResult; // import the required package/class

public static void main(String[] args) throws Exception {
    WordCountAccumulators wc = new WordCountAccumulators();

    if (args.length < 3) {
        System.err.println(wc.getDescription());
        System.exit(1);
    }

    Plan plan = wc.getPlan(args);

    JobExecutionResult result = LocalExecutor.execute(plan);

    // Accumulators can be accessed by their name.
    System.out.println("Number of lines counter: " + result.getAccumulatorResult(TokenizeLine.ACCUM_NUM_LINES));
    System.out.println("Words per line histogram: " + result.getAccumulatorResult(TokenizeLine.ACCUM_WORDS_PER_LINE));
    System.out.println("Distinct words: " + result.getAccumulatorResult(TokenizeLine.ACCUM_DISTINCT_WORDS));
}

Developer: citlab, Project: vs.msc.ws14, Lines: 18, Source: WordCountAccumulators.java
Note: The org.apache.flink.api.common.JobExecutionResult examples in this article were collected from open-source projects hosted on GitHub and similar code/documentation platforms. The snippets are drawn from community-contributed open-source projects, and copyright of the source code remains with the original authors. Consult the corresponding project's license before using or redistributing the code, and do not republish without permission.