本文整理汇总了Java中org.apache.beam.sdk.Pipeline类的典型用法代码示例。如果您正苦于以下问题:Java Pipeline类的具体用法?Java Pipeline怎么用?Java Pipeline使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
Pipeline类属于org.apache.beam.sdk包,在下文中一共展示了Pipeline类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: filterAlreadyProcessedUrls
import org.apache.beam.sdk.Pipeline; //导入依赖的package包/类
/**
* @param options
* @param pipeline
* @param readContent
* @return
*/
private static PCollection<InputContent> filterAlreadyProcessedUrls(
PCollection<InputContent> readContent, Pipeline pipeline,
IndexerPipelineOptions options) {
PCollection<InputContent> contentToProcess;
String query = IndexerPipelineUtils.buildBigQueryProcessedUrlsQuery(options);
PCollection<KV<String,Long>> alreadyProcessedUrls = pipeline
.apply("Get processed URLs",BigQueryIO.read().fromQuery(query))
.apply(ParDo.of(new GetUrlFn()));
final PCollectionView<Map<String,Long>> alreadyProcessedUrlsSideInput =
alreadyProcessedUrls.apply(View.<String,Long>asMap());
contentToProcess = readContent
.apply(ParDo.of(new FilterProcessedUrls(alreadyProcessedUrlsSideInput))
.withSideInputs(alreadyProcessedUrlsSideInput));
return contentToProcess;
}
开发者ID:GoogleCloudPlatform,项目名称:dataflow-opinion-analysis,代码行数:24,代码来源:IndexerPipeline.java
示例2: main
import org.apache.beam.sdk.Pipeline; //导入依赖的package包/类
/** Run a batch pipeline to calculate hourly team scores. */
public static void main(String[] args) throws Exception {
Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("ReadLogs", TextIO.read().from(options.getInput()))
.apply("SetTimestamps", WithTimestamps.of(new SetTimestampFn()))
.apply("FixedWindows", Window.<String>into(FixedWindows.of(ONE_HOUR)))
.apply("TeamScores", new CalculateTeamScores(options.getOutputPrefix()));
pipeline.run();
}
开发者ID:davorbonaci,项目名称:beam-portability-demo,代码行数:18,代码来源:HourlyTeamScore.java
示例3: buildIOReader
import org.apache.beam.sdk.Pipeline; //导入依赖的package包/类
@Override
public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
KafkaIO.Read<byte[], byte[]> kafkaRead = null;
if (topics != null) {
kafkaRead = KafkaIO.<byte[], byte[]>read()
.withBootstrapServers(bootstrapServers)
.withTopics(topics)
.updateConsumerProperties(configUpdates)
.withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
.withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of());
} else if (topicPartitions != null) {
kafkaRead = KafkaIO.<byte[], byte[]>read()
.withBootstrapServers(bootstrapServers)
.withTopicPartitions(topicPartitions)
.updateConsumerProperties(configUpdates)
.withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
.withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of());
} else {
throw new IllegalArgumentException("One of topics and topicPartitions must be configurated.");
}
return PBegin.in(pipeline).apply("read", kafkaRead.withoutMetadata())
.apply("in_format", getPTransformForInput());
}
开发者ID:apache,项目名称:beam,代码行数:25,代码来源:BeamKafkaTable.java
示例4: main
import org.apache.beam.sdk.Pipeline; //导入依赖的package包/类
/**
* Runs the DatastoreToGcs dataflow pipeline
*/
public static void main(String[] args) throws IOException, ScriptException {
Options options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(Options.class);
options.setRunner(DataflowRunner.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("IngestEntities",
DatastoreIO.v1().read()
.withProjectId(options.getDatastoreProjectId())
.withLiteralGqlQuery(options.getGqlQuery())
.withNamespace(options.getNamespace()))
.apply("EntityToJson", ParDo.of(EntityToJson.newBuilder()
.setJsTransformPath(options.getJsTransformPath())
.setJsTransformFunctionName(options.getJsTransformFunctionName())
.build()))
.apply("JsonToGcs", TextIO.write().to(options.getSavePath())
.withSuffix(".json"));
pipeline.run();
}
开发者ID:cobookman,项目名称:teleport,代码行数:28,代码来源:DatastoreToGcs.java
示例5: main
import org.apache.beam.sdk.Pipeline; //导入依赖的package包/类
/**
* Runs the GcsToDatastore dataflow pipeline
*/
public static void main(String[] args) throws IOException, ScriptException {
Options options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(Options.class);
options.setRunner(DataflowRunner.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("IngestJson", TextIO.read()
.from(options.getJsonPathPrefix()))
.apply("GcsToEntity", ParDo.of(JsonToEntity.newBuilder()
.setJsTransformPath(options.getJsTransformPath())
.setJsTransformFunctionName(options.getJsTransformFunctionName())
.build()))
.apply(DatastoreIO.v1().write()
.withProjectId(options.getDatastoreProjectId()));
pipeline.run();
}
开发者ID:cobookman,项目名称:teleport,代码行数:25,代码来源:GcsToDatastore.java
示例6: main
import org.apache.beam.sdk.Pipeline; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(KafkaIO.<String, String>read()
.withBootstrapServers(options.getKafkaBootstrapServer())
.withTopic(options.getTopic())
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withTimestampFn(new SetTimestampFn()))
.apply("Values", ParDo.of(new ValuesFn()))
.apply("FixedWindows", Window.<String>into(FixedWindows.of(FIVE_MINUTES))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(TWO_MINUTES))
.withLateFirings(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(TEN_MINUTES)
.accumulatingFiredPanes())
.apply("TeamScore", new CalculateTeamScores(options.getOutputPrefix()));
pipeline.run();
}
开发者ID:davorbonaci,项目名称:beam-portability-demo,代码行数:28,代码来源:LeaderBoard.java
示例7: main
import org.apache.beam.sdk.Pipeline; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
PipelineOptionsFactory.register(TemplateOptions.class);
TemplateOptions options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(TemplateOptions.class);
options.setAutoscalingAlgorithm(THROUGHPUT_BASED);
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(BigQueryIO.read().from(options.getBigQueryTableName()))
.apply(ParDo.of(new DoFn<TableRow, String>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String commaSep = c.element().values()
.stream()
.map(cell -> cell.toString().trim())
.collect(Collectors.joining("\",\""));
c.output(commaSep);
}
}))
.apply(TextIO.write().to(options.getOutputFile())
.withoutSharding()
.withWritableByteChannelFactory(GZIP)
);
pipeline.run();
}
开发者ID:shinesolutions,项目名称:bigquery-table-to-one-file,代码行数:26,代码来源:BigQueryTableToOneFile.java
示例8: main
import org.apache.beam.sdk.Pipeline; //导入依赖的package包/类
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class); // forced for this demo
Pipeline p = Pipeline.create(options);
// register Avro coders for serializing our messages
Coders.registerAvroCoders(p, ExtendedRecord.class, UntypedOccurrence.class);
PCollection<UntypedOccurrence> verbatimRecords = p.apply(
"Read Avro", AvroIO.read(UntypedOccurrence.class).from("demo/output/data*"));
verbatimRecords.apply("Write file per Genus",
AvroIO.write(UntypedOccurrence.class)
.to("demo/output-split/data*") // prefix, is required but overwritten
.to(new GenusDynamicAvroDestinations(
FileSystems.matchNewResource("demo/output-split/data*", true))));
LOG.info("Starting the pipeline");
PipelineResult result = p.run();
result.waitUntilFinish();
LOG.info("Pipeline finished with state: {} ", result.getState());
}
开发者ID:gbif,项目名称:pipelines,代码行数:24,代码来源:MultiAvroOutDemo.java
示例9: main
import org.apache.beam.sdk.Pipeline; //导入依赖的package包/类
public static void main(String[] args) {
Configuration conf = new Configuration(); // assume defaults on CP
conf.setClass("mapreduce.job.inputformat.class", DwCAInputFormat.class, InputFormat.class);
conf.setStrings("mapreduce.input.fileinputformat.inputdir", "hdfs://ha-nn/tmp/dwca-lep5.zip");
conf.setClass("key.class", Text.class, Object.class);
conf.setClass("value.class", ExtendedRecord.class, Object.class);
Pipeline p = newPipeline(args, conf);
Coders.registerAvroCoders(p, UntypedOccurrence.class, TypedOccurrence.class, ExtendedRecord.class);
PCollection<KV<Text, ExtendedRecord>> rawRecords =
p.apply("Read DwC-A", HadoopInputFormatIO.<Text, ExtendedRecord>read().withConfiguration(conf));
PCollection<UntypedOccurrence> verbatimRecords = rawRecords.apply(
"Convert to Avro", ParDo.of(fromExtendedRecordKVP()));
verbatimRecords.apply(
"Write Avro files", AvroIO.write(UntypedOccurrence.class).to("hdfs://ha-nn/tmp/dwca-lep5.avro"));
LOG.info("Starting the pipeline");
PipelineResult result = p.run();
result.waitUntilFinish();
LOG.info("Pipeline finished with state: {} ", result.getState());
}
开发者ID:gbif,项目名称:pipelines,代码行数:25,代码来源:DwCA2AvroPipeline.java
示例10: test
import org.apache.beam.sdk.Pipeline; //导入依赖的package包/类
@Test
public void test() throws IOException {
TestSparkPipelineOptions options =
PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
options.setRunner(TestSparkRunner.class);
options.setForceStreaming(true);
// pipeline with a bounded read.
Pipeline pipeline = Pipeline.create(options);
// apply the BoundedReadFromUnboundedSource.
BoundedReadFromUnboundedSource<?> boundedRead =
Read.from(CountingSource.unbounded()).withMaxNumRecords(-1);
pipeline.apply(boundedRead);
// adapt reads
TestSparkRunner runner = TestSparkRunner.fromOptions(options);
runner.adaptBoundedReads(pipeline);
UnboundedReadDetector unboundedReadDetector = new UnboundedReadDetector();
pipeline.traverseTopologically(unboundedReadDetector);
// assert that the applied BoundedReadFromUnboundedSource
// is being treated as an unbounded read.
assertThat("Expected to have an unbounded read.", unboundedReadDetector.isUnbounded);
}
开发者ID:apache,项目名称:beam,代码行数:27,代码来源:ForceStreamingTest.java
示例11: fromProto
import org.apache.beam.sdk.Pipeline; //导入依赖的package包/类
public static Pipeline fromProto(final RunnerApi.Pipeline pipelineProto) throws IOException {
TransformHierarchy transforms = new TransformHierarchy();
Pipeline pipeline = Pipeline.forTransformHierarchy(transforms, PipelineOptionsFactory.create());
// Keeping the PCollections straight is a semantic necessity, but being careful not to explode
// the number of coders and windowing strategies is also nice, and helps testing.
RehydratedComponents rehydratedComponents =
RehydratedComponents.forComponents(pipelineProto.getComponents()).withPipeline(pipeline);
for (String rootId : pipelineProto.getRootTransformIdsList()) {
addRehydratedTransform(
transforms,
pipelineProto.getComponents().getTransformsOrThrow(rootId),
pipeline,
pipelineProto.getComponents().getTransformsMap(),
rehydratedComponents);
}
return pipeline;
}
开发者ID:apache,项目名称:beam,代码行数:21,代码来源:PipelineTranslation.java
示例12: verifySetStateUnsupported
import org.apache.beam.sdk.Pipeline; //导入依赖的package包/类
private void verifySetStateUnsupported(PipelineOptions options) throws Exception {
Pipeline p = Pipeline.create(options);
p.apply(Create.of(KV.of(13, 42)))
.apply(
ParDo.of(
new DoFn<KV<Integer, Integer>, Void>() {
@StateId("fizzle")
private final StateSpec<SetState<Void>> voidState = StateSpecs.set();
@ProcessElement
public void process() {}
}));
thrown.expectMessage("SetState");
thrown.expect(UnsupportedOperationException.class);
p.run();
}
开发者ID:apache,项目名称:beam,代码行数:18,代码来源:DataflowRunnerTest.java
示例13: main
import org.apache.beam.sdk.Pipeline; //导入依赖的package包/类
public static void main(String[] args)
throws Exception {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline p = Pipeline.create(options);
// Build the table schema for the output table.
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("word").setType("STRING"));
fields.add(new TableFieldSchema().setName("all_plays").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);
p.apply(BigQueryIO.readTableRows().from(options.getInput()))
.apply(new PlaysForWord())
.apply(BigQueryIO.writeTableRows()
.to(options.getOutput())
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
p.run().waitUntilFinish();
}
开发者ID:apache,项目名称:beam,代码行数:23,代码来源:CombinePerKeyExamples.java
示例14: testStreamingOnCreateMatcher
import org.apache.beam.sdk.Pipeline; //导入依赖的package包/类
@Test
public void testStreamingOnCreateMatcher() throws Exception {
options.setStreaming(true);
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
when(mockJob.getState()).thenReturn(State.DONE);
when(mockJob.getProjectId()).thenReturn("test-project");
when(mockJob.getJobId()).thenReturn("test-job");
DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
.thenReturn(State.DONE);
when(mockClient.getJobMetrics(anyString()))
.thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */
));
runner.run(p, mockRunner);
}
开发者ID:apache,项目名称:beam,代码行数:27,代码来源:TestDataflowRunnerTest.java
示例15: testWithInvalidContext
import org.apache.beam.sdk.Pipeline; //导入依赖的package包/类
private void testWithInvalidContext(JavaSparkContext jsc) {
SparkContextOptions options = getSparkContextOptions(jsc);
Pipeline p = Pipeline.create(options);
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
.of()));
inputWords.apply(new WordCount.CountWords())
.apply(MapElements.via(new WordCount.FormatAsTextFn()));
try {
p.run().waitUntilFinish();
fail("Should throw an exception when The provided Spark context is null or stopped");
} catch (RuntimeException e){
assert(e.getMessage().contains(PROVIDED_CONTEXT_EXCEPTION));
}
}
开发者ID:apache,项目名称:beam,代码行数:17,代码来源:ProvidedSparkContextTest.java
示例16: testRunStreamingJobNotUsingPAssertThatSucceeds
import org.apache.beam.sdk.Pipeline; //导入依赖的package包/类
@Test
public void testRunStreamingJobNotUsingPAssertThatSucceeds() throws Exception {
options.setStreaming(true);
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
when(mockJob.getState()).thenReturn(State.DONE);
when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
.thenReturn(State.DONE);
when(mockJob.getProjectId()).thenReturn("test-project");
when(mockJob.getJobId()).thenReturn("test-job");
DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
when(mockClient.getJobMetrics(anyString()))
.thenReturn(generateMockStreamingMetricResponse(ImmutableMap.<String, BigDecimal>of()));
TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
runner.run(p, mockRunner);
}
开发者ID:apache,项目名称:beam,代码行数:22,代码来源:TestDataflowRunnerTest.java
示例17: execute
import org.apache.beam.sdk.Pipeline; //导入依赖的package包/类
/**
* Executes the given sql.
*/
public void execute(String sqlString) throws Exception {
BeamSqlParser parser = new BeamSqlParser(sqlString);
SqlNode sqlNode = parser.impl().parseSqlStmtEof();
if (sqlNode instanceof SqlCreateTable) {
handleCreateTable((SqlCreateTable) sqlNode, metaStore);
} else {
PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
.as(PipelineOptions.class);
options.setJobName("BeamPlanCreator");
Pipeline pipeline = Pipeline.create(options);
compilePipeline(sqlString, pipeline, env);
pipeline.run();
}
}
开发者ID:apache,项目名称:beam,代码行数:19,代码来源:BeamSqlCli.java
示例18: testSettingOfPipelineOptionsWithCustomUserType
import org.apache.beam.sdk.Pipeline; //导入依赖的package包/类
@Test
public void testSettingOfPipelineOptionsWithCustomUserType() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
options.as(JacksonIncompatibleOptions.class).setJacksonIncompatible(
new JacksonIncompatible("userCustomTypeTest"));
Pipeline p = Pipeline.create(options);
p.run();
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
Map<String, Object> sdkPipelineOptions =
jobCaptor.getValue().getEnvironment().getSdkPipelineOptions();
assertThat(sdkPipelineOptions, hasKey("options"));
Map<String, Object> optionsMap = (Map<String, Object>) sdkPipelineOptions.get("options");
assertThat(optionsMap, hasEntry("jacksonIncompatible", (Object) "userCustomTypeTest"));
}
开发者ID:apache,项目名称:beam,代码行数:19,代码来源:DataflowRunnerTest.java
示例19: testDifferentWindows
import org.apache.beam.sdk.Pipeline; //导入依赖的package包/类
@Test(expected = IllegalArgumentException.class)
public void testDifferentWindows() throws Exception {
String sql = "SELECT "
+ " order_id, site_id, count(*) as cnt "
+ "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+ ", TUMBLE(order_time, INTERVAL '1' HOUR) "
+ " UNION SELECT "
+ " order_id, site_id, count(*) as cnt "
+ "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+ ", TUMBLE(order_time, INTERVAL '2' HOUR) ";
// use a real pipeline rather than the TestPipeline because we are
// testing exceptions, the pipeline will not actually run.
Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create());
compilePipeline(sql, pipeline1, sqlEnv);
pipeline.run();
}
开发者ID:apache,项目名称:beam,代码行数:18,代码来源:BeamSetOperatorRelBaseTest.java
示例20: testFnApiMultiOutputOverrideNonCrashing
import org.apache.beam.sdk.Pipeline; //导入依赖的package包/类
@Test
@Ignore("TODO: BEAM-2902 Add support for user state in a ParDo.Multi once PTransformMatcher "
+ "exposes a way to know when the replacement is not required by checking that the "
+ "preceding ParDos to a GBK are key preserving.")
public void testFnApiMultiOutputOverrideNonCrashing() throws Exception {
DataflowPipelineOptions options = buildPipelineOptions("--experiments=beam_fn_api");
options.setRunner(DataflowRunner.class);
Pipeline pipeline = Pipeline.create(options);
TupleTag<Integer> mainOutputTag = new TupleTag<Integer>() {};
TupleTag<Integer> sideOutputTag = new TupleTag<Integer>() {};
DummyStatefulDoFn fn = new DummyStatefulDoFn();
pipeline
.apply(Create.of(KV.of(1, 2)))
.apply(ParDo.of(fn).withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
DataflowRunner runner = DataflowRunner.fromOptions(options);
runner.replaceTransforms(pipeline);
assertThat(findBatchStatefulDoFn(pipeline), equalTo((DoFn) fn));
}
开发者ID:apache,项目名称:beam,代码行数:22,代码来源:BatchStatefulParDoOverridesTest.java
注:本文中的org.apache.beam.sdk.Pipeline类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论