本文整理汇总了Java中org.apache.beam.sdk.options.PipelineOptionsFactory类的典型用法代码示例。如果您正苦于以下问题:Java PipelineOptionsFactory类的具体用法?Java PipelineOptionsFactory怎么用?Java PipelineOptionsFactory使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
PipelineOptionsFactory类属于org.apache.beam.sdk.options包,在下文中一共展示了PipelineOptionsFactory类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: main
import org.apache.beam.sdk.options.PipelineOptionsFactory; //导入依赖的package包/类
/**
 * Builds and runs a batch pipeline that calculates hourly team scores.
 *
 * @param args command-line arguments, parsed and validated into {@code Options}
 * @throws Exception as declared by the signature
 */
public static void main(String[] args) throws Exception {
  Options opts = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline p = Pipeline.create(opts);

  // Read the input, assign timestamps, window by ONE_HOUR, then compute team scores.
  p.apply("ReadLogs", TextIO.read().from(opts.getInput()))
      .apply("SetTimestamps", WithTimestamps.of(new SetTimestampFn()))
      .apply("FixedWindows", Window.<String>into(FixedWindows.of(ONE_HOUR)))
      .apply("TeamScores", new CalculateTeamScores(opts.getOutputPrefix()));

  p.run();
}
开发者ID:davorbonaci,项目名称:beam-portability-demo,代码行数:18,代码来源:HourlyTeamScore.java
示例2: main
import org.apache.beam.sdk.options.PipelineOptionsFactory; //导入依赖的package包/类
/**
 * Builds and runs a small pipeline that upper-cases two literal strings and logs each result.
 *
 * @param args command-line arguments used to create the validated pipeline options
 */
public static void main(String[] args) {
  Pipeline pipeline =
      Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

  // Maps every input string to its upper-case form.
  SimpleFunction<String, String> toUpperCase =
      new SimpleFunction<String, String>() {
        @Override
        public String apply(String input) {
          return input.toUpperCase();
        }
      };

  // Logs each element it receives and outputs nothing.
  DoFn<String, Void> logElement =
      new DoFn<String, Void>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          LOG.info(c.element());
        }
      };

  pipeline
      .apply(Create.of("Hello", "World"))
      .apply(MapElements.via(toUpperCase))
      .apply(ParDo.of(logElement));

  pipeline.run();
}
开发者ID:apache,项目名称:beam,代码行数:21,代码来源:StarterPipeline.java
示例3: main
import org.apache.beam.sdk.options.PipelineOptionsFactory; //导入依赖的package包/类
/**
 * Runs the DatastoreToGcs Dataflow pipeline.
 *
 * <p>Entities are read from Datastore using a literal GQL query, passed through the
 * {@code EntityToJson} transform, and written as text with a {@code .json} suffix.
 *
 * @param args command-line arguments, parsed and validated into {@code Options}
 * @throws IOException as declared by the signature
 * @throws ScriptException as declared by the signature
 */
public static void main(String[] args) throws IOException, ScriptException {
  Options opts = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  // Set after argument parsing, so the runner is always DataflowRunner.
  opts.setRunner(DataflowRunner.class);

  Pipeline p = Pipeline.create(opts);
  p.apply(
          "IngestEntities",
          DatastoreIO.v1()
              .read()
              .withProjectId(opts.getDatastoreProjectId())
              .withLiteralGqlQuery(opts.getGqlQuery())
              .withNamespace(opts.getNamespace()))
      .apply(
          "EntityToJson",
          ParDo.of(
              EntityToJson.newBuilder()
                  .setJsTransformPath(opts.getJsTransformPath())
                  .setJsTransformFunctionName(opts.getJsTransformFunctionName())
                  .build()))
      .apply("JsonToGcs", TextIO.write().to(opts.getSavePath()).withSuffix(".json"));

  p.run();
}
开发者ID:cobookman,项目名称:teleport,代码行数:28,代码来源:DatastoreToGcs.java
示例4: main
import org.apache.beam.sdk.options.PipelineOptionsFactory; //导入依赖的package包/类
/**
 * Runs the GcsToDatastore Dataflow pipeline.
 *
 * <p>Text is read from the configured JSON path prefix, passed through the
 * {@code JsonToEntity} transform, and written to Datastore.
 *
 * @param args command-line arguments, parsed and validated into {@code Options}
 * @throws IOException as declared by the signature
 * @throws ScriptException as declared by the signature
 */
public static void main(String[] args) throws IOException, ScriptException {
  Options opts = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  // Set after argument parsing, so the runner is always DataflowRunner.
  opts.setRunner(DataflowRunner.class);

  Pipeline p = Pipeline.create(opts);
  p.apply("IngestJson", TextIO.read().from(opts.getJsonPathPrefix()))
      .apply(
          "GcsToEntity",
          ParDo.of(
              JsonToEntity.newBuilder()
                  .setJsTransformPath(opts.getJsTransformPath())
                  .setJsTransformFunctionName(opts.getJsTransformFunctionName())
                  .build()))
      .apply(DatastoreIO.v1().write().withProjectId(opts.getDatastoreProjectId()));

  p.run();
}
开发者ID:cobookman,项目名称:teleport,代码行数:25,代码来源:GcsToDatastore.java
示例5: main
import org.apache.beam.sdk.options.PipelineOptionsFactory; //导入依赖的package包/类
/**
 * Builds and runs a pipeline that reads string records from Kafka, windows them into
 * FIVE_MINUTES fixed windows with early and late firings, and computes team scores.
 *
 * @param args command-line arguments, parsed and validated into {@code Options}
 * @throws Exception as declared by the signature
 */
public static void main(String[] args) throws Exception {
  Options opts = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline p = Pipeline.create(opts);

  p.apply(
          KafkaIO.<String, String>read()
              .withBootstrapServers(opts.getKafkaBootstrapServer())
              .withTopic(opts.getTopic())
              .withKeyDeserializer(StringDeserializer.class)
              .withValueDeserializer(StringDeserializer.class)
              .withTimestampFn(new SetTimestampFn()))
      .apply("Values", ParDo.of(new ValuesFn()))
      .apply(
          "FixedWindows",
          Window.<String>into(FixedWindows.of(FIVE_MINUTES))
              // Fire at the watermark, early TWO_MINUTES after the first element in a pane,
              // and again for every late element.
              .triggering(
                  AfterWatermark.pastEndOfWindow()
                      .withEarlyFirings(
                          AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TWO_MINUTES))
                      .withLateFirings(AfterPane.elementCountAtLeast(1)))
              .withAllowedLateness(TEN_MINUTES)
              .accumulatingFiredPanes())
      .apply("TeamScore", new CalculateTeamScores(opts.getOutputPrefix()));

  p.run();
}
开发者ID:davorbonaci,项目名称:beam-portability-demo,代码行数:28,代码来源:LeaderBoard.java
示例6: main
import org.apache.beam.sdk.options.PipelineOptionsFactory; //导入依赖的package包/类
/**
 * Builds and runs a pipeline that reads a BigQuery table, turns each row into one delimited
 * line, and writes all lines to a single unsharded, GZIP-compressed text output.
 *
 * @param args command-line arguments, parsed and validated into {@code TemplateOptions}
 * @throws Exception as declared by the signature
 */
public static void main(String[] args) throws Exception {
  PipelineOptionsFactory.register(TemplateOptions.class);
  TemplateOptions templateOptions =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(TemplateOptions.class);
  templateOptions.setAutoscalingAlgorithm(THROUGHPUT_BASED);

  Pipeline p = Pipeline.create(templateOptions);
  p.apply(BigQueryIO.read().from(templateOptions.getBigQueryTableName()))
      .apply(
          ParDo.of(
              new DoFn<TableRow, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                  // Join the trimmed string form of every cell with the separator `","`.
                  // No leading or trailing quote is added around the row.
                  c.output(
                      c.element().values().stream()
                          .map(Object::toString)
                          .map(String::trim)
                          .collect(Collectors.joining("\",\"")));
                }
              }))
      .apply(
          TextIO.write()
              .to(templateOptions.getOutputFile())
              .withoutSharding()
              .withWritableByteChannelFactory(GZIP));

  p.run();
}
开发者ID:shinesolutions,项目名称:bigquery-table-to-one-file,代码行数:26,代码来源:BigQueryTableToOneFile.java
示例7: main
import org.apache.beam.sdk.options.PipelineOptionsFactory; //导入依赖的package包/类
/**
 * Reads Avro {@code UntypedOccurrence} records and writes them back out through
 * {@code GenusDynamicAvroDestinations}, then blocks until the pipeline finishes.
 *
 * @param args command-line arguments (not used by this method)
 */
public static void main(String[] args) {
  PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
  pipelineOptions.setRunner(DirectRunner.class); // forced for this demo
  Pipeline pipeline = Pipeline.create(pipelineOptions);

  // register Avro coders for serializing our messages
  Coders.registerAvroCoders(pipeline, ExtendedRecord.class, UntypedOccurrence.class);

  PCollection<UntypedOccurrence> occurrences =
      pipeline.apply(
          "Read Avro", AvroIO.read(UntypedOccurrence.class).from("demo/output/data*"));

  occurrences.apply(
      "Write file per Genus",
      AvroIO.write(UntypedOccurrence.class)
          .to("demo/output-split/data*") // prefix, is required but overwritten
          .to(
              new GenusDynamicAvroDestinations(
                  FileSystems.matchNewResource("demo/output-split/data*", true))));

  LOG.info("Starting the pipeline");
  PipelineResult pipelineResult = pipeline.run();
  pipelineResult.waitUntilFinish();
  LOG.info("Pipeline finished with state: {} ", pipelineResult.getState());
}
开发者ID:gbif,项目名称:pipelines,代码行数:24,代码来源:MultiAvroOutDemo.java
示例8: testSizes
import org.apache.beam.sdk.options.PipelineOptionsFactory; //导入依赖的package包/类
/**
 * Inserts NUM_DOCS test documents and checks that the source's estimated size lies strictly
 * between the minimum and maximum per-document size multiplied by NUM_DOCS.
 */
@Test
public void testSizes() throws Exception {
  SolrIOTestUtils.insertTestDocuments(SOLR_COLLECTION, NUM_DOCS, solrClient);

  PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
  SolrIO.Read readSpec =
      SolrIO.read().withConnectionConfiguration(connectionConfiguration).from(SOLR_COLLECTION);
  SolrIO.BoundedSolrSource source = new SolrIO.BoundedSolrSource(readSpec, null);

  // can't use equal assert as Solr collections never have same size
  // (due to internal Lucene implementation)
  long sizeEstimate = source.getEstimatedSizeBytes(pipelineOptions);
  LOG.info("Estimated size: {}", sizeEstimate);

  assertThat(
      "Wrong estimated size bellow minimum",
      sizeEstimate,
      greaterThan(SolrIOTestUtils.MIN_DOC_SIZE * NUM_DOCS));
  assertThat(
      "Wrong estimated size beyond maximum",
      sizeEstimate,
      lessThan(SolrIOTestUtils.MAX_DOC_SIZE * NUM_DOCS));
}
开发者ID:apache,项目名称:beam,代码行数:22,代码来源:SolrIOTest.java
示例9: testProgressEmptySource
import org.apache.beam.sdk.options.PipelineOptionsFactory; //导入依赖的package包/类
/**
 * Checks the progress values a reader reports before and after {@code start()} on a source
 * for which {@code start()} returns false.
 */
@Test
public void testProgressEmptySource() throws IOException {
  PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
  CoarseRangeSource emptySource = new CoarseRangeSource(13, 17, 1, 100);

  try (OffsetBasedReader<Integer> rdr = emptySource.createReader(pipelineOptions)) {
    // before starting
    assertEquals(0.0, rdr.getFractionConsumed(), 1e-6);
    assertEquals(0, rdr.getSplitPointsConsumed());
    assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, rdr.getSplitPointsRemaining());

    // confirm empty
    assertFalse(rdr.start());

    // after reading empty source
    assertEquals(1.0, rdr.getFractionConsumed(), 1e-6);
    assertEquals(0, rdr.getSplitPointsConsumed());
    assertEquals(0, rdr.getSplitPointsRemaining());
  }
}
开发者ID:apache,项目名称:beam,代码行数:20,代码来源:OffsetBasedSourceTest.java
示例10: main
import org.apache.beam.sdk.options.PipelineOptionsFactory; //导入依赖的package包/类
/**
 * Reads table rows from BigQuery, applies {@code PlaysForWord}, and writes the result to a
 * BigQuery table with columns {@code word} and {@code all_plays}, then waits for completion.
 *
 * @param args command-line arguments, parsed and validated into {@code Options}
 * @throws Exception as declared by the signature
 */
public static void main(String[] args) throws Exception {
  Options opts = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(opts);

  // Build the table schema for the output table.
  List<TableFieldSchema> columns = new ArrayList<>();
  columns.add(new TableFieldSchema().setName("word").setType("STRING"));
  columns.add(new TableFieldSchema().setName("all_plays").setType("STRING"));
  TableSchema outputSchema = new TableSchema().setFields(columns);

  pipeline
      .apply(BigQueryIO.readTableRows().from(opts.getInput()))
      .apply(new PlaysForWord())
      .apply(
          BigQueryIO.writeTableRows()
              .to(opts.getOutput())
              .withSchema(outputSchema)
              // Create the table if needed and replace any existing contents.
              .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
              .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

  pipeline.run().waitUntilFinish();
}
开发者ID:apache,项目名称:beam,代码行数:23,代码来源:CombinePerKeyExamples.java
示例11: testBoundedToUnboundedSourceAdapterCheckpoint
import org.apache.beam.sdk.options.PipelineOptionsFactory; //导入依赖的package包/类
/**
 * Reads every element of {@code boundedSource} through a
 * {@code BoundedToUnboundedSourceAdapter}, finalizing a checkpoint after every 9th element,
 * and verifies that nothing is left over and the elements read match the expected ones.
 *
 * @param boundedSource the bounded source to wrap
 * @param expectedElements the elements expected to be read (compared by size and as a set)
 * @throws Exception as declared by the signature
 */
private <T> void testBoundedToUnboundedSourceAdapterCheckpoint(
    BoundedSource<T> boundedSource, List<T> expectedElements) throws Exception {
  BoundedToUnboundedSourceAdapter<T> adapter =
      new BoundedToUnboundedSourceAdapter<>(boundedSource);
  PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
  BoundedToUnboundedSourceAdapter<T>.Reader reader =
      adapter.createReader(pipelineOptions, null);

  List<T> readElements = Lists.newArrayList();
  boolean more = reader.start();
  while (more) {
    readElements.add(reader.getCurrent());
    // checkpoint every 9 elements
    if (readElements.size() % 9 == 0) {
      Checkpoint<T> mark = reader.getCheckpointMark();
      mark.finalizeCheckpoint();
    }
    more = reader.advance();
  }

  // After the reader is exhausted the final checkpoint must carry no residual elements.
  Checkpoint<T> finalMark = reader.getCheckpointMark();
  assertTrue(
      finalMark.getResidualElements() == null || finalMark.getResidualElements().isEmpty());

  assertEquals(expectedElements.size(), readElements.size());
  assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(readElements));
}
开发者ID:apache,项目名称:beam,代码行数:27,代码来源:UnboundedReadFromBoundedSourceTest.java
示例12: testTranslate
import org.apache.beam.sdk.options.PipelineOptionsFactory; //导入依赖的package包/类
/**
 * Translates a mocked {@code Read.Bounded} transform and verifies that the translator asks
 * the context for a source stream and sets that stream as the output stream.
 */
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testTranslate() {
  ReadBoundedTranslator translator = new ReadBoundedTranslator();
  GearpumpPipelineOptions gearpumpOptions =
      PipelineOptionsFactory.create().as(GearpumpPipelineOptions.class);

  // The transform under translation returns a mocked bounded source.
  Read.Bounded readTransform = mock(Read.Bounded.class);
  BoundedSource boundedSource = mock(BoundedSource.class);
  when(readTransform.getSource()).thenReturn(boundedSource);

  // The context supplies the options, the output value and the source stream.
  TranslationContext context = mock(TranslationContext.class);
  when(context.getPipelineOptions()).thenReturn(gearpumpOptions);
  JavaStream sourceStream = mock(JavaStream.class);
  PValue output = mock(PValue.class);
  when(context.getOutput()).thenReturn(output);
  when(context.getSourceStream(any(DataSource.class))).thenReturn(sourceStream);

  translator.translate(readTransform, context);

  verify(context).getSourceStream(argThat(new BoundedSourceWrapperMatcher()));
  verify(context).setOutputStream(output, sourceStream);
}
开发者ID:apache,项目名称:beam,代码行数:23,代码来源:ReadBoundedTranslatorTest.java
示例13: testProgress
import org.apache.beam.sdk.options.PipelineOptionsFactory; //导入依赖的package包/类
/**
 * Checks the fraction consumed and split-point counters of a {@code CountingSource} reader
 * before reading, after each record, and once the reader is exhausted.
 */
@Test
public void testProgress() throws IOException {
  final int numRecords = 5;
  @SuppressWarnings("deprecation") // testing CountingSource
  BoundedSource<Long> countingSource = CountingSource.upTo(numRecords);

  try (BoundedReader<Long> rdr = countingSource.createReader(PipelineOptionsFactory.create())) {
    // Check preconditions before starting. Note that CountingReader can always give an accurate
    // remaining parallelism.
    assertEquals(0.0, rdr.getFractionConsumed(), 1e-6);
    assertEquals(0, rdr.getSplitPointsConsumed());
    assertEquals(numRecords, rdr.getSplitPointsRemaining());

    assertTrue(rdr.start());

    int seen = 0;
    do {
      assertEquals(seen, rdr.getSplitPointsConsumed());
      assertEquals(numRecords - seen, rdr.getSplitPointsRemaining());
      ++seen;
    } while (rdr.advance());

    assertEquals(numRecords, seen); // exactly numRecords calls to advance()
    assertEquals(1.0, rdr.getFractionConsumed(), 1e-6);
    assertEquals(numRecords, rdr.getSplitPointsConsumed());
    assertEquals(0, rdr.getSplitPointsRemaining());
  }
}
开发者ID:apache,项目名称:beam,代码行数:27,代码来源:CountingSourceTest.java
示例14: testGetTableSucceeds
import org.apache.beam.sdk.options.PipelineOptionsFactory; //导入依赖的package包/类
/**
 * Stubs the response to return a 403 {@code rateLimitExceeded} error followed by a 200 with
 * the table, and verifies that {@code getTable} returns the table after two attempts.
 */
@Test
public void testGetTableSucceeds() throws Exception {
  TableReference ref =
      new TableReference()
          .setProjectId("projectId")
          .setDatasetId("datasetId")
          .setTableId("tableId");
  Table expectedTable = new Table();
  expectedTable.setTableReference(ref);

  // First call fails with 403, second call succeeds with 200.
  when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
  when(response.getStatusCode()).thenReturn(403).thenReturn(200);
  when(response.getContent())
      .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
      .thenReturn(toStream(expectedTable));

  BigQueryServicesImpl.DatasetServiceImpl service =
      new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
  Table actualTable = service.getTable(ref, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);

  assertEquals(expectedTable, actualTable);
  verify(response, times(2)).getStatusCode();
  verify(response, times(2)).getContent();
  verify(response, times(2)).getContentType();
}
开发者ID:apache,项目名称:beam,代码行数:27,代码来源:BigQueryServicesImplTest.java
示例15: testMatchingAllCalls
import org.apache.beam.sdk.options.PipelineOptionsFactory; //导入依赖的package包/类
/**
 * Verifies that with the STORAGE_TRACE argument both a storage {@code get} request and a
 * storage {@code list} request carry the {@code $trace} value "TraceDestination".
 */
@Test
public void testMatchingAllCalls() throws Exception {
  String[] args = {STORAGE_TRACE};
  GcsOptions gcsOptions = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
  gcsOptions.setGcpCredential(new TestCredential());

  assertNotNull(gcsOptions.getGoogleApiTrace());

  Storage.Objects.Get get =
      Transport.newStorageClient(gcsOptions)
          .build()
          .objects()
          .get("testBucketId", "testObjectId");
  assertEquals("TraceDestination", get.get("$trace"));

  Storage.Objects.List list =
      Transport.newStorageClient(gcsOptions).build().objects().list("testProjectId");
  assertEquals("TraceDestination", list.get("$trace"));
}
开发者ID:apache,项目名称:beam,代码行数:18,代码来源:GoogleApiDebugOptionsTest.java
示例16: testPathExistsValidation
import org.apache.beam.sdk.options.PipelineOptionsFactory; //导入依赖的package包/类
/**
 * Verifies that running a pipeline with a temp location of {@code gs://does/not/exist} fails
 * with a {@code RuntimeException} whose stack trace names the path and the reason.
 */
@Test
public void testPathExistsValidation() {
  String[] args = {
    "--runner=DataflowRunner",
    "--tempLocation=gs://does/not/exist",
    "--project=test-project",
    "--credentialFactoryClass=" + NoopCredentialFactory.class.getName(),
  };

  try {
    Pipeline.create(PipelineOptionsFactory.fromArgs(args).create()).run();
    // Reaching this line means no exception was thrown.
    fail();
  } catch (RuntimeException expected) {
    assertThat(
        Throwables.getStackTraceAsString(expected),
        both(containsString("gs://does/not/exist"))
            .and(containsString("does not exist or is not writeable")));
  }
}
开发者ID:apache,项目名称:beam,代码行数:20,代码来源:DataflowRunnerTest.java
示例17: testIsTableEmptyNoRetryForNotFound
import org.apache.beam.sdk.options.PipelineOptionsFactory; //导入依赖的package包/类
/**
 * Stubs a 404 response and verifies that {@code isTableEmpty} fails with an
 * {@code IOException} after touching the response exactly once.
 */
@Test
public void testIsTableEmptyNoRetryForNotFound() throws IOException, InterruptedException {
  when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
  when(response.getStatusCode()).thenReturn(404);

  BigQueryServicesImpl.DatasetServiceImpl service =
      new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());

  TableReference ref =
      new TableReference()
          .setProjectId("projectId")
          .setDatasetId("datasetId")
          .setTableId("tableId");

  thrown.expect(IOException.class);
  thrown.expectMessage(String.format("Unable to list table data: %s", ref.getTableId()));

  try {
    service.isTableEmpty(ref, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
  } finally {
    // Verified in finally because the call above is expected to throw.
    verify(response, times(1)).getStatusCode();
    verify(response, times(1)).getContent();
    verify(response, times(1)).getContentType();
  }
}
开发者ID:apache,项目名称:beam,代码行数:25,代码来源:BigQueryServicesImplTest.java
示例18: testInvalidNumberOfWorkerHarnessThreads
import org.apache.beam.sdk.options.PipelineOptionsFactory; //导入依赖的package包/类
/**
 * Verifies that {@code DataflowRunner.fromOptions} rejects a worker harness thread count of
 * -1 with an {@code IllegalArgumentException} carrying the expected messages.
 */
@Test
public void testInvalidNumberOfWorkerHarnessThreads() throws IOException {
  DataflowPipelineOptions dataflowOptions =
      PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  FileSystems.setDefaultPipelineOptions(dataflowOptions);
  dataflowOptions.setRunner(DataflowRunner.class);
  dataflowOptions.setProject("foo-12345");
  dataflowOptions.setGcpTempLocation(VALID_TEMP_BUCKET);
  dataflowOptions.setGcsUtil(mockGcsUtil);

  // The invalid value under test.
  DataflowPipelineDebugOptions debugOptions =
      dataflowOptions.as(DataflowPipelineDebugOptions.class);
  debugOptions.setNumberOfWorkerHarnessThreads(-1);

  thrown.expect(IllegalArgumentException.class);
  thrown.expectMessage("Number of worker harness threads");
  thrown.expectMessage("Please make sure the value is non-negative.");

  DataflowRunner.fromOptions(dataflowOptions);
}
开发者ID:apache,项目名称:beam,代码行数:19,代码来源:DataflowRunnerTest.java
示例19: testDifferentWindows
import org.apache.beam.sdk.options.PipelineOptionsFactory; //导入依赖的package包/类
/**
 * Verifies that a UNION whose two branches group by different TUMBLE intervals
 * (1 hour and 2 hours) is rejected with an {@code IllegalArgumentException}.
 *
 * @throws Exception as declared by the signature; an {@code IllegalArgumentException} is
 *     the expected outcome
 */
@Test(expected = IllegalArgumentException.class)
public void testDifferentWindows() throws Exception {
  String sql = "SELECT "
      + " order_id, site_id, count(*) as cnt "
      + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
      + ", TUMBLE(order_time, INTERVAL '1' HOUR) "
      + " UNION SELECT "
      + " order_id, site_id, count(*) as cnt "
      + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
      + ", TUMBLE(order_time, INTERVAL '2' HOUR) ";

  // use a real pipeline rather than the TestPipeline because we are
  // testing exceptions, the pipeline will not actually run.
  Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create());
  compilePipeline(sql, pipeline1, sqlEnv);
  // Run the pipeline that was compiled above. The original called run() on the outer
  // `pipeline` reference, which is not the object this test built.
  pipeline1.run();
}
开发者ID:apache,项目名称:beam,代码行数:18,代码来源:BeamSetOperatorRelBaseTest.java
示例20: buildPipelineOptions
import org.apache.beam.sdk.options.PipelineOptionsFactory; //导入依赖的package包/类
/**
 * Builds {@code DataflowPipelineOptions} from the given arguments, backed by a mocked
 * {@code GcsUtil}, with fixed job name, project, region, temp location and an empty list of
 * files to stage.
 *
 * @param args command-line style arguments to parse
 * @return the configured options
 * @throws IOException as declared by the signature
 */
private static DataflowPipelineOptions buildPipelineOptions(String... args) throws IOException {
  GcsUtil gcsUtil = mock(GcsUtil.class);

  // expand() answers with a single-element list holding the path it was given.
  Answer<List<GcsPath>> echoPath =
      new Answer<List<GcsPath>>() {
        @Override
        public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
          return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
        }
      };
  when(gcsUtil.expand(any(GcsPath.class))).then(echoPath);
  when(gcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);

  DataflowPipelineOptions dataflowOptions =
      PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
  dataflowOptions.setRunner(DataflowRunner.class);
  dataflowOptions.setGcpCredential(new TestCredential());
  dataflowOptions.setJobName("some-job-name");
  dataflowOptions.setProject("some-project");
  dataflowOptions.setRegion("some-region");
  dataflowOptions.setTempLocation(
      GcsPath.fromComponents("somebucket", "some/path").toString());
  dataflowOptions.setFilesToStage(new LinkedList<String>());
  dataflowOptions.setGcsUtil(gcsUtil);
  return dataflowOptions;
}
开发者ID:apache,项目名称:beam,代码行数:23,代码来源:BatchStatefulParDoOverridesTest.java
注:本文中的org.apache.beam.sdk.options.PipelineOptionsFactory类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论