This page collects typical usage examples of the Java class org.apache.beam.runners.dataflow.DataflowRunner, for readers who want to see how DataflowRunner is used in practice.
The DataflowRunner class belongs to the org.apache.beam.runners.dataflow package. The 8 code examples below are ordered by popularity by default.
Example 1: main
import org.apache.beam.runners.dataflow.DataflowRunner; // import the required package/class
/**
 * Runs the DatastoreToGcs dataflow pipeline
 */
public static void main(String[] args) throws IOException, ScriptException {
  Options options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(Options.class);
  options.setRunner(DataflowRunner.class);
  Pipeline pipeline = Pipeline.create(options);
  pipeline
      .apply("IngestEntities",
          DatastoreIO.v1().read()
              .withProjectId(options.getDatastoreProjectId())
              .withLiteralGqlQuery(options.getGqlQuery())
              .withNamespace(options.getNamespace()))
      .apply("EntityToJson", ParDo.of(EntityToJson.newBuilder()
          .setJsTransformPath(options.getJsTransformPath())
          .setJsTransformFunctionName(options.getJsTransformFunctionName())
          .build()))
      .apply("JsonToGcs", TextIO.write().to(options.getSavePath())
          .withSuffix(".json"));
  pipeline.run();
}
Author: cobookman, project: teleport, lines of code: 28, source file: DatastoreToGcs.java
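Examples that call `PipelineOptionsFactory.fromArgs(args)`, like the one above, take their settings as `--name=value` command-line flags. The following is a minimal sketch of assembling such a flag array in plain Java. The `DataflowArgs` class and every value in it are hypothetical and not part of teleport or Beam. The flag names assume Beam's usual convention that a getter such as `getGqlQuery()` corresponds to the flag `--gqlQuery`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of teleport or Beam): builds --name=value flags. */
final class DataflowArgs {

    private DataflowArgs() {}

    /** Turns each map entry into a "--name=value" flag, preserving insertion order. */
    static String[] toArgs(Map<String, String> settings) {
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, String> e : settings.entrySet()) {
            if (e.getKey() == null || e.getKey().isEmpty() || e.getValue() == null) {
                throw new IllegalArgumentException("Bad option: " + e.getKey());
            }
            args.add("--" + e.getKey() + "=" + e.getValue());
        }
        return args.toArray(new String[0]);
    }

    public static void main(String[] unused) {
        // All values below are placeholders, not taken from the teleport project.
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("project", "my-gcp-project");
        settings.put("datastoreProjectId", "my-gcp-project");
        settings.put("gqlQuery", "SELECT * FROM Task");
        settings.put("savePath", "gs://my-bucket/export/");
        for (String arg : toArgs(settings)) {
            System.out.println(arg);
        }
    }
}
```

The resulting array could then be passed to `PipelineOptionsFactory.fromArgs(...)` in place of the raw `args`, for instance from a test or a launcher class.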
Example 2: main
import org.apache.beam.runners.dataflow.DataflowRunner; // import the required package/class
/**
 * Runs the GcsToDatastore dataflow pipeline
 */
public static void main(String[] args) throws IOException, ScriptException {
  Options options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(Options.class);
  options.setRunner(DataflowRunner.class);
  Pipeline pipeline = Pipeline.create(options);
  pipeline
      .apply("IngestJson", TextIO.read()
          .from(options.getJsonPathPrefix()))
      .apply("GcsToEntity", ParDo.of(JsonToEntity.newBuilder()
          .setJsTransformPath(options.getJsTransformPath())
          .setJsTransformFunctionName(options.getJsTransformFunctionName())
          .build()))
      .apply(DatastoreIO.v1().write()
          .withProjectId(options.getDatastoreProjectId()));
  pipeline.run();
}
Author: cobookman, project: teleport, lines of code: 25, source file: GcsToDatastore.java
Example 3: deploy
import org.apache.beam.runners.dataflow.DataflowRunner; // import the required package/class
/** Deploys the invoicing pipeline as a template on GCS, for a given projectID and GCS bucket. */
public void deploy() {
  // We can't store options as a member variable due to serialization concerns.
  InvoicingPipelineOptions options = PipelineOptionsFactory.as(InvoicingPipelineOptions.class);
  options.setProject(projectId);
  options.setRunner(DataflowRunner.class);
  options.setStagingLocation(beamBucket + "/staging");
  options.setTemplateLocation(beamBucket + "/templates/invoicing");
  Pipeline p = Pipeline.create(options);
  PCollection<BillingEvent> billingEvents =
      p.apply(
          "Read BillingEvents from Bigquery",
          BigQueryIO.read(BillingEvent::parseFromRecord)
              .fromQuery(InvoicingUtils.makeQueryProvider(options.getYearMonth(), projectId))
              .withCoder(SerializableCoder.of(BillingEvent.class))
              .usingStandardSql()
              .withoutValidation()
              .withTemplateCompatibility());
  applyTerminalTransforms(billingEvents, options.getYearMonth());
  p.run();
}
Author: google, project: nomulus, lines of code: 23, source file: InvoicingPipeline.java
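Example 3 builds its staging and template locations by appending a suffix to `beamBucket`. If the bucket string may or may not end in a slash, a small normalizing helper avoids a doubled `/` in the resulting path. This is only a sketch: `GcsPaths` and the bucket name are hypothetical and do not appear in nomulus.

```java
/** Hypothetical helper (not from nomulus): joins a gs:// bucket and a sub-path. */
final class GcsPaths {

    private GcsPaths() {}

    /** Joins bucket and suffix with exactly one '/' between them. */
    static String join(String bucket, String suffix) {
        if (bucket == null || !bucket.startsWith("gs://") || bucket.length() <= "gs://".length()) {
            throw new IllegalArgumentException("Expected a gs://bucket path, got: " + bucket);
        }
        // Strip trailing slashes from the bucket and leading slashes from the suffix.
        String base = bucket.replaceAll("/+$", "");
        String rest = suffix.replaceAll("^/+", "");
        return base + "/" + rest;
    }

    public static void main(String[] args) {
        String beamBucket = "gs://my-beam-bucket/"; // placeholder bucket name
        System.out.println(join(beamBucket, "/staging"));
        System.out.println(join(beamBucket, "templates/invoicing"));
    }
}
```

With such a helper, the two setter calls could receive `GcsPaths.join(beamBucket, "staging")` and `GcsPaths.join(beamBucket, "templates/invoicing")` regardless of how the bucket string was written.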
Example 4: main
import org.apache.beam.runners.dataflow.DataflowRunner; // import the required package/class
/**
 * Runs the DatastoreToBigQuery dataflow pipeline
 */
public static void main(String[] args) throws IOException, ScriptException {
  Options options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(Options.class);
  NestedValueProvider<String, String> bqJsonSchema = NestedValueProvider
      .of(options.getBqJsonSchema(), new ValueProviderHelpers.GcsLoad());
  options.setRunner(DataflowRunner.class);
  Pipeline pipeline = Pipeline.create(options);
  pipeline
      .apply("IngestEntities",
          DatastoreIO.v1().read()
              .withProjectId(options.getDatastoreProjectId())
              .withLiteralGqlQuery(options.getGqlQuery())
              .withNamespace(options.getNamespace()))
      .apply("EntityToTableRow", ParDo.of(EntityToTableRow.newBuilder()
          .setJsTransformPath(options.getJsTransformPath())
          .setJsTransformFunctionName(options.getJsTransformFunctionName())
          .setStrictCast(options.getStrictCast())
          .setTableSchemaJson(bqJsonSchema)
          .build()))
      .apply("TableRowToBigQuery", BigQueryIO.writeTableRows()
          .to(options.getBqTableSpec())
          .withJsonSchema(bqJsonSchema)
          .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
          .withWriteDisposition(WriteDisposition.WRITE_APPEND));
  pipeline.run();
}
Author: cobookman, project: teleport, lines of code: 33, source file: DatastoreToBq.java
Example 5: createTestServiceRunner
import org.apache.beam.runners.dataflow.DataflowRunner; // import the required package/class
/**
 * Create a test pipeline that uses the {@link DataflowRunner} so that {@link GroupByKey}
 * is not expanded. This is used for verifying that even without expansion the proper errors show
 * up.
 */
private Pipeline createTestServiceRunner() {
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  options.setRunner(DataflowRunner.class);
  options.setProject("someproject");
  options.setGcpTempLocation("gs://staging");
  options.setPathValidatorClass(NoopPathValidator.class);
  options.setDataflowClient(dataflow);
  return Pipeline.create(options);
}
Author: apache, project: beam, lines of code: 15, source file: DataflowGroupByKeyTest.java
Example 6: createTestBatchRunner
import org.apache.beam.runners.dataflow.DataflowRunner; // import the required package/class
private Pipeline createTestBatchRunner() {
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  options.setRunner(DataflowRunner.class);
  options.setProject("someproject");
  options.setGcpTempLocation("gs://staging");
  options.setPathValidatorClass(NoopPathValidator.class);
  options.setDataflowClient(dataflow);
  return Pipeline.create(options);
}
Author: apache, project: beam, lines of code: 10, source file: DataflowViewTest.java
Example 7: createTestStreamingRunner
import org.apache.beam.runners.dataflow.DataflowRunner; // import the required package/class
private Pipeline createTestStreamingRunner() {
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  options.setRunner(DataflowRunner.class);
  options.setStreaming(true);
  options.setProject("someproject");
  options.setGcpTempLocation("gs://staging");
  options.setPathValidatorClass(NoopPathValidator.class);
  options.setDataflowClient(dataflow);
  return Pipeline.create(options);
}
Author: apache, project: beam, lines of code: 11, source file: DataflowViewTest.java
Example 8: main
import org.apache.beam.runners.dataflow.DataflowRunner; // import the required package/class
/**
 * <p>Creates a dataflow pipeline that creates the following chain:</p>
 * <ol>
 *   <li> Reads from a Cloud Pubsub topic
 *   <li> Window into fixed windows of 1 minute
 *   <li> Applies word count transform
 *   <li> Creates Puts from each of the word counts in the array
 *   <li> Performs a Bigtable Put on the items
 * </ol>
 *
 * @param args Arguments to use to configure the Dataflow Pipeline. The first three are required
 *   when running via managed resource in Google Cloud Platform. Those options should be omitted
 *   for LOCAL runs. The next four arguments are to configure the Bigtable connection. The last
 *   two items are for Cloud Pubsub.
 *     --runner=DataflowRunner \\
 *     --project=[dataflow project] \\
 *     --stagingLocation=gs://[your google storage bucket] \\
 *     --bigtableProjectId=[bigtable project] \\
 *     --bigtableInstanceId=[bigtable instance id] \\
 *     --bigtableTableId=[bigtable tableName] \\
 *     --inputFile=[file path on GCS] \\
 *     --pubsubTopic=projects/[project name]/topics/[topic name]
 */
public static void main(String[] args) throws Exception {
  // CloudBigtableOptions is one way to retrieve the options. It's not required.
  BigtablePubsubOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtablePubsubOptions.class);
  // CloudBigtableTableConfiguration contains the project, instance and table to connect to.
  CloudBigtableTableConfiguration config =
      new CloudBigtableTableConfiguration.Builder()
          .withProjectId(options.getBigtableProjectId())
          .withInstanceId(options.getBigtableInstanceId())
          .withTableId(options.getBigtableTableId())
          .build();
  // In order to cancel the pipelines automatically,
  // DataflowRunner is forced to be used.
  // Also enables the 2 jobs to run at the same time.
  options.setRunner(DataflowRunner.class);
  options.as(DataflowPipelineOptions.class).setStreaming(true);
  Pipeline p = Pipeline.create(options);
  FixedWindows window = FixedWindows.of(Duration.standardMinutes(options.getWindowSize()));
  p
      .apply(PubsubIO.readStrings().fromTopic(options.getPubsubTopic()))
      .apply(Window.<String>into(window))
      .apply(ParDo.of(new ExtractWordsFn()))
      .apply(Count.<String>perElement())
      .apply(ParDo.of(MUTATION_TRANSFORM))
      .apply(CloudBigtableIO.writeToTable(config));
  // waitUntilFinish() blocks until the job reaches a terminal state; a streaming job
  // normally only does so when it is cancelled, drained, or fails.
  p.run().waitUntilFinish();
  // Start a second job to inject messages into a Cloud Pubsub topic
  injectMessages(options);
}
Author: GoogleCloudPlatform, project: cloud-bigtable-examples, lines of code: 60, source file: PubsubWordCount.java
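The Javadoc of Example 8 describes windowing into fixed one-minute windows followed by a word count. The plain-Java sketch below imitates that logic on an in-memory list so the grouping is easy to see; it does not use Beam at all. `FixedWindowWordCount`, `Message`, the sample timestamps, and the word-splitting regex are assumptions for illustration, and windows are assumed to be aligned to multiples of the window size counted from epoch 0.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, Beam-free imitation of "fixed windows + count per word". */
final class FixedWindowWordCount {

    /** A timestamped message, standing in for an element read from Pub/Sub. */
    static final class Message {
        final long timestampMillis;
        final String text;

        Message(long timestampMillis, String text) {
            this.timestampMillis = timestampMillis;
            this.text = text;
        }
    }

    private FixedWindowWordCount() {}

    /** Returns window start (ms) -> word -> count, with both levels sorted by key. */
    static Map<Long, Map<String, Long>> count(List<Message> messages, long windowMillis) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (Message m : messages) {
            // Align the timestamp down to the start of its fixed window.
            long windowStart = Math.floorDiv(m.timestampMillis, windowMillis) * windowMillis;
            Map<String, Long> counts = result.computeIfAbsent(windowStart, k -> new TreeMap<>());
            // Split on runs of non-letter characters (an assumed tokenization).
            for (String word : m.text.split("[^\\p{L}]+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Message> messages = Arrays.asList(
            new Message(5_000L, "hello world"),
            new Message(30_000L, "hello"),
            new Message(65_000L, "world world"));
        // One-minute windows, as in the Javadoc of Example 8.
        System.out.println(count(messages, 60_000L));
        // prints {0={hello=2, world=1}, 60000={world=2}}
    }
}
```

In the real pipeline this grouping is performed by `Window.into(...)` and `Count.perElement()` on an unbounded stream; the sketch only mirrors the result for a small bounded input.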
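Note: The org.apache.beam.runners.dataflow.DataflowRunner examples in this article were collected from source-code and documentation hosting platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright remains with the original authors. For distribution and use, refer to each project's license. Do not reproduce without permission.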