This article collects typical usage examples of the Java class org.apache.beam.sdk.transforms.ParDo. If you are wondering what the ParDo class is for and how to use it, the curated examples below should help.
The ParDo class belongs to the org.apache.beam.sdk.transforms package. Twenty code examples are shown below, sorted by popularity by default. Upvoting the examples you like or find useful helps the site recommend better Java code samples.
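Before the project-specific examples, here is a minimal, self-contained sketch of the basic ParDo pattern: element-wise processing with a DoFn. The class and transform names are illustrative only and do not come from any of the projects listed below.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class ParDoBasics {
  // A DoFn maps each input element to zero or more output elements.
  static class UpperCaseFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      c.output(c.element().toUpperCase());
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    PCollection<String> upper = p
        .apply(Create.of("hello", "world"))
        .apply("UpperCase", ParDo.of(new UpperCaseFn())); // ParDo.of(...) wraps the DoFn
    p.run().waitUntilFinish();
  }
}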
Example 1: filterAlreadyProcessedUrls
import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
/**
 * Filters out items whose URLs have already been processed, based on a BigQuery lookup
 * that is passed to the filtering DoFn as a side input.
 *
 * @param readContent the content read from the input source
 * @param pipeline the pipeline used to query the processed-URL table
 * @param options pipeline options used to build the BigQuery query
 * @return the subset of {@code readContent} whose URLs have not yet been processed
 */
private static PCollection<InputContent> filterAlreadyProcessedUrls(
PCollection<InputContent> readContent, Pipeline pipeline,
IndexerPipelineOptions options) {
PCollection<InputContent> contentToProcess;
String query = IndexerPipelineUtils.buildBigQueryProcessedUrlsQuery(options);
PCollection<KV<String,Long>> alreadyProcessedUrls = pipeline
.apply("Get processed URLs",BigQueryIO.read().fromQuery(query))
.apply(ParDo.of(new GetUrlFn()));
final PCollectionView<Map<String,Long>> alreadyProcessedUrlsSideInput =
alreadyProcessedUrls.apply(View.<String,Long>asMap());
contentToProcess = readContent
.apply(ParDo.of(new FilterProcessedUrls(alreadyProcessedUrlsSideInput))
.withSideInputs(alreadyProcessedUrlsSideInput));
return contentToProcess;
}
Author: GoogleCloudPlatform | Project: dataflow-opinion-analysis | Lines: 24 | Source: IndexerPipeline.java
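The FilterProcessedUrls DoFn referenced above is not shown on this page. A minimal sketch of what such a side-input filter might look like follows; it assumes InputContent exposes a getUrl() accessor, which is a guess rather than the project's actual API.

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.PCollectionView;

// Hypothetical sketch: drop elements whose URL already appears in the side-input map.
static class FilterProcessedUrls extends DoFn<InputContent, InputContent> {
  private final PCollectionView<Map<String, Long>> processedUrlsView;

  FilterProcessedUrls(PCollectionView<Map<String, Long>> processedUrlsView) {
    this.processedUrlsView = processedUrlsView;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    Map<String, Long> processed = c.sideInput(processedUrlsView);
    // getUrl() is assumed here; the real InputContent accessor may differ.
    if (!processed.containsKey(c.element().getUrl())) {
      c.output(c.element());
    }
  }
}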
Example 2: testUnwritableRemoveContainerPipeline
import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testUnwritableRemoveContainerPipeline() throws Exception {
final Map<String, String> dataConfiguration = singletonMap("repository",
getClass().getResource("/dataDirectory2").toURI().toString());
final File root = new File(getClass().getResource("/dataDirectory2").toURI());
assumeTrue(root.setReadOnly());
final PCollection<KV<String, String>> pCollection = pipeline
.apply("Create", Create.of(CONTAINER_KV))
.apply(ParDo.of(new BeamProcessor(dataConfiguration, LDP.PreferContainment.getIRIString(), false)));
PAssert.that(pCollection).empty();
pipeline.run();
root.setWritable(true);
}
Author: trellis-ldp-archive | Project: trellis-rosid-file-streaming | Lines: 21 | Source: BeamProcessorTest.java
Example 3: testUnwritableAddContainerPipeline
import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testUnwritableAddContainerPipeline() throws Exception {
final Map<String, String> dataConfiguration = singletonMap("repository",
getClass().getResource("/dataDirectory2").toURI().toString());
final File root = new File(getClass().getResource("/dataDirectory2").toURI());
assumeTrue(root.setReadOnly());
final PCollection<KV<String, String>> pCollection = pipeline
.apply("Create", Create.of(CONTAINER_KV))
.apply(ParDo.of(new BeamProcessor(dataConfiguration, LDP.PreferContainment.getIRIString(), true)));
PAssert.that(pCollection).empty();
pipeline.run();
root.setWritable(true);
}
Author: trellis-ldp-archive | Project: trellis-rosid-file-streaming | Lines: 21 | Source: BeamProcessorTest.java
Example 4: testDefaultCoder
import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testDefaultCoder() throws Exception {
p.enableAbandonedNodeEnforcement(true);
// Use MyRecord as input and output types without explicitly specifying
// a coder (this uses the default coders, which may not be
// SerializableCoder).
PCollection<String> output =
p.apply(Create.of("Hello", "World"))
.apply(ParDo.of(new StringToRecord()))
.apply(ParDo.of(new RecordToString()));
PAssert.that(output)
.containsInAnyOrder("Hello", "World");
p.run();
}
Author: apache | Project: beam | Lines: 19 | Source: SerializableCoderTest.java
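The StringToRecord and RecordToString DoFns used in this test are not reproduced on this page. A minimal sketch, assuming MyRecord is a simple Serializable wrapper (the real test classes may differ), could look like this:

import java.io.Serializable;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical Serializable record type; its default coder is inferred by the SDK.
static class MyRecord implements Serializable {
  final String value;
  MyRecord(String value) { this.value = value; }
}

// Wraps each String into a MyRecord.
static class StringToRecord extends DoFn<String, MyRecord> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(new MyRecord(c.element()));
  }
}

// Unwraps a MyRecord back into its String value.
static class RecordToString extends DoFn<MyRecord, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(c.element().value);
  }
}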
Example 5: enrichWithCNLP
import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
/**
 * Splits the index summaries into two branches by ratio, enriches one branch with
 * Cloud NLP entities, and returns the merged result.
 *
 * @param indexes the content index summaries to enrich
 * @param ratio controls how elements are split between the two branches
 * @return the merged collection of enriched and unenriched index summaries
 */
private static PCollection<ContentIndexSummary> enrichWithCNLP(
PCollection<ContentIndexSummary> indexes, Float ratio) {
PCollectionTuple splitAB = indexes
.apply(ParDo.of(new SplitAB(ratio))
.withOutputTags(PipelineTags.BranchA,
TupleTagList.of(PipelineTags.BranchB)));
PCollection<ContentIndexSummary> branchACol = splitAB.get(PipelineTags.BranchA);
PCollection<ContentIndexSummary> branchBCol = splitAB.get(PipelineTags.BranchB);
PCollection<ContentIndexSummary> enrichedBCol = branchBCol.apply(
ParDo.of(new EnrichWithCNLPEntities()));
//Merge all collections with WebResource table records
PCollectionList<ContentIndexSummary> contentIndexSummariesList =
PCollectionList.of(branchACol).and(enrichedBCol);
PCollection<ContentIndexSummary> allIndexSummaries =
contentIndexSummariesList.apply(Flatten.<ContentIndexSummary>pCollections());
indexes = allIndexSummaries;
return indexes;
}
Author: GoogleCloudPlatform | Project: dataflow-opinion-analysis | Lines: 28 | Source: FileIndexerPipeline.java
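The SplitAB DoFn used above writes to two output tags, which is why the pipeline calls withOutputTags. A minimal sketch of such a ratio-based splitter follows; it assumes PipelineTags.BranchA and PipelineTags.BranchB are TupleTag<ContentIndexSummary> constants and that ratio is the fraction routed to branch A, both of which are guesses about the project's actual semantics.

import java.util.Random;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical sketch: route roughly `ratio` of elements to the main output (branch A)
// and the rest to the tagged additional output (branch B).
static class SplitAB extends DoFn<ContentIndexSummary, ContentIndexSummary> {
  private final Float ratio;
  private transient Random random;

  SplitAB(Float ratio) { this.ratio = ratio; }

  @Setup
  public void setup() { random = new Random(); }

  @ProcessElement
  public void processElement(ProcessContext c) {
    if (random.nextFloat() < ratio) {
      c.output(c.element());                        // main output tag: PipelineTags.BranchA
    } else {
      c.output(PipelineTags.BranchB, c.element());  // additional output tag: branch B
    }
  }
}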
Example 6: enrichWithCNLP
import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
/**
 * Splits the filtered index summaries into two branches by ratio, enriches one branch
 * with Cloud NLP entities, and returns the merged result.
 *
 * @param filteredIndexes the filtered content index summaries to enrich
 * @param ratio controls how elements are split between the two branches
 * @return the merged collection of enriched and unenriched index summaries
 */
private static PCollection<ContentIndexSummary> enrichWithCNLP(
PCollection<ContentIndexSummary> filteredIndexes, Float ratio) {
PCollectionTuple splitAB = filteredIndexes
.apply(ParDo.of(new SplitAB(ratio))
.withOutputTags(PipelineTags.BranchA,
TupleTagList.of(PipelineTags.BranchB)));
PCollection<ContentIndexSummary> branchACol = splitAB.get(PipelineTags.BranchA);
PCollection<ContentIndexSummary> branchBCol = splitAB.get(PipelineTags.BranchB);
PCollection<ContentIndexSummary> enrichedBCol = branchBCol.apply(
ParDo.of(new EnrichWithCNLPEntities()));
//Merge all collections with WebResource table records
PCollectionList<ContentIndexSummary> contentIndexSummariesList =
PCollectionList.of(branchACol).and(enrichedBCol);
PCollection<ContentIndexSummary> allIndexSummaries =
contentIndexSummariesList.apply(Flatten.<ContentIndexSummary>pCollections());
filteredIndexes = allIndexSummaries;
return filteredIndexes;
}
Author: GoogleCloudPlatform | Project: dataflow-opinion-analysis | Lines: 28 | Source: IndexerPipeline.java
Example 7: filterSoftDuplicates
import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
/**
 * Separates document indexes into unique documents and soft (near-)duplicates.
 *
 * @param indexes the document index summaries to check for duplicates
 * @return a POJO containing 2 PCollections: unique docs, and duplicates
 */
private static ContentDuplicateOrNot filterSoftDuplicates(
PCollection<ContentIndexSummary> indexes) {
//
PCollectionTuple dedupeOrNot = indexes
.apply("Extract Text grouping key",
ParDo.of(new GetContentIndexSummaryKeyFn()))
.apply("Group by Text grouping key",
GroupByKey.<ContentSoftDeduplicationKey, ContentIndexSummary>create())
.apply("Eliminate Text dupes",
ParDo.of(new EliminateTextDupes())
.withOutputTags(PipelineTags.indexedContentNotToDedupeTag,
TupleTagList.of(PipelineTags.indexedContentToDedupeTag)));
PCollection<TableRow> dedupedWebresources =
dedupeOrNot.get(PipelineTags.indexedContentToDedupeTag)
.apply(ParDo.of(new CreateWebresourceTableRowFromDupeIndexSummaryFn()));
ContentDuplicateOrNot contentDuplicateOrNot = new ContentDuplicateOrNot(
dedupeOrNot.get(PipelineTags.indexedContentNotToDedupeTag),
dedupedWebresources);
return contentDuplicateOrNot;
}
Author: GoogleCloudPlatform | Project: dataflow-opinion-analysis | Lines: 28 | Source: IndexerPipeline.java
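GetContentIndexSummaryKeyFn pairs each summary with a soft-deduplication key so that the following GroupByKey can cluster near-identical documents. A minimal sketch is shown below; the fromSummary factory method is invented for illustration, and the real key derivation in the project may differ.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical sketch: emit KV pairs so summaries with the same text key group together.
static class GetContentIndexSummaryKeyFn
    extends DoFn<ContentIndexSummary, KV<ContentSoftDeduplicationKey, ContentIndexSummary>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    ContentIndexSummary summary = c.element();
    // fromSummary(...) is assumed; the actual key construction may differ.
    c.output(KV.of(ContentSoftDeduplicationKey.fromSummary(summary), summary));
  }
}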
Example 8: main
import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
/**
* Runs the DatastoreToGcs dataflow pipeline
*/
public static void main(String[] args) throws IOException, ScriptException {
Options options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(Options.class);
options.setRunner(DataflowRunner.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("IngestEntities",
DatastoreIO.v1().read()
.withProjectId(options.getDatastoreProjectId())
.withLiteralGqlQuery(options.getGqlQuery())
.withNamespace(options.getNamespace()))
.apply("EntityToJson", ParDo.of(EntityToJson.newBuilder()
.setJsTransformPath(options.getJsTransformPath())
.setJsTransformFunctionName(options.getJsTransformFunctionName())
.build()))
.apply("JsonToGcs", TextIO.write().to(options.getSavePath())
.withSuffix(".json"));
pipeline.run();
}
Author: cobookman | Project: teleport | Lines: 28 | Source: DatastoreToGcs.java
Example 9: main
import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
/**
* Runs the GcsToDatastore dataflow pipeline
*/
public static void main(String[] args) throws IOException, ScriptException {
Options options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(Options.class);
options.setRunner(DataflowRunner.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("IngestJson", TextIO.read()
.from(options.getJsonPathPrefix()))
.apply("GcsToEntity", ParDo.of(JsonToEntity.newBuilder()
.setJsTransformPath(options.getJsTransformPath())
.setJsTransformFunctionName(options.getJsTransformFunctionName())
.build()))
.apply(DatastoreIO.v1().write()
.withProjectId(options.getDatastoreProjectId()));
pipeline.run();
}
Author: cobookman | Project: teleport | Lines: 25 | Source: GcsToDatastore.java
Example 10: testCachePipeline
import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testCachePipeline() throws Exception {
final KV<String, String> kv = KV.of("trellis:repository/resource", null);
final Map<String, String> dataConfiguration = singletonMap("repository",
getClass().getResource("/root").toURI().toString());
final PCollection<KV<String, String>> pCollection = pipeline
.apply("Create", Create.of(kv))
.apply(ParDo.of(new CacheWriter(dataConfiguration)));
PAssert.that(pCollection).containsInAnyOrder(asList(kv));
pipeline.run();
}
Author: trellis-ldp-archive | Project: trellis-rosid-file-streaming | Lines: 18 | Source: CacheWriterTest.java
Example 11: testUnableToCachePipeline
import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testUnableToCachePipeline() throws Exception {
final KV<String, String> kv = KV.of("trellis:repository/some-other-resource", null);
final Map<String, String> dataConfiguration = singletonMap("repository",
getClass().getResource("/root").toURI().toString());
final PCollection<KV<String, String>> pCollection = pipeline
.apply("Create", Create.of(kv))
.apply(ParDo.of(new CacheWriter(dataConfiguration)));
PAssert.that(pCollection).empty();
pipeline.run();
}
Author: trellis-ldp-archive | Project: trellis-rosid-file-streaming | Lines: 18 | Source: CacheWriterTest.java
Example 12: testInvalidDirectoryPipeline
import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testInvalidDirectoryPipeline() throws Exception {
final KV<String, String> kv = KV.of("trellis:repository/resource", null);
final Map<String, String> dataConfiguration = singletonMap("foo",
getClass().getResource("/root").toURI().toString());
final PCollection<KV<String, String>> pCollection = pipeline
.apply("Create", Create.of(kv))
.apply(ParDo.of(new CacheWriter(dataConfiguration)));
PAssert.that(pCollection).empty();
pipeline.run();
}
Author: trellis-ldp-archive | Project: trellis-rosid-file-streaming | Lines: 18 | Source: CacheWriterTest.java
Example 13: testInvalidDataPipeline
import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testInvalidDataPipeline() throws Exception {
final String dataset = "<trellis:repository/resource> " +
"<http://purl.org/dc/terms/subject> <trellis:repository/resource/member> " +
"<http://www.w3.org/ns/ldp#PreferConta";
final KV<String, String> kv = KV.of("trellis:repository/resource", dataset);
final Map<String, String> dataConfiguration = singletonMap("repository",
getClass().getResource("/dataDirectory").toURI().toString());
final PCollection<KV<String, String>> pCollection = pipeline
.apply("Create", Create.of(kv))
.apply(ParDo.of(new BeamProcessor(dataConfiguration, LDP.PreferContainment.getIRIString(), false)));
PAssert.that(pCollection).empty();
pipeline.run();
}
Author: trellis-ldp-archive | Project: trellis-rosid-file-streaming | Lines: 21 | Source: BeamProcessorTest.java
Example 14: testInvalidDataPipeline
import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testInvalidDataPipeline() throws Exception {
final String dataset = "<trellis:repository/resource> " +
"<http://purl.org/dc/terms/subject> <trellis:repository/resource/member> " +
"<http://www.w3.org/ns/ldp#PreferConta";
final KV<String, String> kv = KV.of("trellis:repository/resource", dataset);
final Map<String, String> dataConfiguration = singletonMap("repository", "http://localhost/");
final PCollection<KV<String, String>> pCollection = pipeline
.apply("Create", Create.of(kv))
.apply(ParDo.of(new EventProcessor(dataConfiguration)));
PAssert.that(pCollection).empty();
pipeline.run();
}
Author: trellis-ldp-archive | Project: trellis-rosid-file-streaming | Lines: 20 | Source: EventProcessorTest.java
Example 15: main
import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
public static void main(String[] args) throws Exception {
Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(KafkaIO.<String, String>read()
.withBootstrapServers(options.getKafkaBootstrapServer())
.withTopic(options.getTopic())
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withTimestampFn(new SetTimestampFn()))
.apply("Values", ParDo.of(new ValuesFn()))
.apply("FixedWindows", Window.<String>into(FixedWindows.of(FIVE_MINUTES))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(TWO_MINUTES))
.withLateFirings(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(TEN_MINUTES)
.accumulatingFiredPanes())
.apply("TeamScore", new CalculateTeamScores(options.getOutputPrefix()));
pipeline.run();
}
Author: davorbonaci | Project: beam-portability-demo | Lines: 28 | Source: LeaderBoard.java
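ValuesFn reduces each consumed Kafka record to its plain String value before windowing. A minimal sketch follows, assuming the KafkaIO read produces KafkaRecord<String, String> elements; the demo's actual helper may read without metadata and operate on KV pairs instead.

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical sketch: strip Kafka metadata and keep only the message value.
static class ValuesFn extends DoFn<KafkaRecord<String, String>, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(c.element().getKV().getValue());
  }
}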
Example 16: testInvalidDataPipeline
import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testInvalidDataPipeline() throws Exception {
final String dataset = "<trellis:repository/resource> " +
"<http://purl.org/dc/terms/subject> <trellis:repository/resource/member> " +
"<http://www.w3.org/ns/ldp#PreferConta";
final KV<String, String> kv = KV.of("trellis:repository/resource", dataset);
final String dataConfiguration = getClass().getResource("/dataDirectory").toURI().toString();
final PCollection<KV<String, String>> pCollection = pipeline
.apply("Create", Create.of(kv))
.apply(ParDo.of(new BeamProcessor(dataConfiguration, LDP.PreferContainment.getIRIString(), false)));
PAssert.that(pCollection).empty();
pipeline.run();
}
Author: trellis-ldp | Project: trellis-rosid | Lines: 20 | Source: BeamProcessorTest.java
Example 17: testInvalidDataPipeline
import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testInvalidDataPipeline() throws Exception {
final String dataset = "<trellis:repository/resource> " +
"<http://purl.org/dc/terms/subject> <trellis:repository/resource/member> " +
"<http://www.w3.org/ns/ldp#PreferConta";
final KV<String, String> kv = KV.of("trellis:repository/resource", dataset);
final String dataConfiguration = "http://localhost/";
final PCollection<KV<String, String>> pCollection = pipeline
.apply("Create", Create.of(kv))
.apply(ParDo.of(new EventProcessor(dataConfiguration)));
PAssert.that(pCollection).empty();
pipeline.run();
}
Author: trellis-ldp | Project: trellis-rosid | Lines: 20 | Source: EventProcessorTest.java
Example 18: main
import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
public static void main(String[] args) throws Exception {
PipelineOptionsFactory.register(TemplateOptions.class);
TemplateOptions options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(TemplateOptions.class);
options.setAutoscalingAlgorithm(THROUGHPUT_BASED);
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(BigQueryIO.read().from(options.getBigQueryTableName()))
.apply(ParDo.of(new DoFn<TableRow, String>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String commaSep = c.element().values()
.stream()
.map(cell -> cell.toString().trim())
.collect(Collectors.joining("\",\""));
c.output(commaSep);
}
}))
.apply(TextIO.write().to(options.getOutputFile())
.withoutSharding()
.withWritableByteChannelFactory(GZIP)
);
pipeline.run();
}
Author: shinesolutions | Project: bigquery-table-to-one-file | Lines: 26 | Source: BigQueryTableToOneFile.java
Example 19: main
import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
public static void main(String[] args) {
Configuration conf = new Configuration(); // assume defaults on CP
conf.setClass("mapreduce.job.inputformat.class", DwCAInputFormat.class, InputFormat.class);
conf.setStrings("mapreduce.input.fileinputformat.inputdir", "hdfs://ha-nn/tmp/dwca-lep5.zip");
conf.setClass("key.class", Text.class, Object.class);
conf.setClass("value.class", ExtendedRecord.class, Object.class);
Pipeline p = newPipeline(args, conf);
Coders.registerAvroCoders(p, UntypedOccurrence.class, TypedOccurrence.class, ExtendedRecord.class);
PCollection<KV<Text, ExtendedRecord>> rawRecords =
p.apply("Read DwC-A", HadoopInputFormatIO.<Text, ExtendedRecord>read().withConfiguration(conf));
PCollection<UntypedOccurrence> verbatimRecords = rawRecords.apply(
"Convert to Avro", ParDo.of(fromExtendedRecordKVP()));
verbatimRecords.apply(
"Write Avro files", AvroIO.write(UntypedOccurrence.class).to("hdfs://ha-nn/tmp/dwca-lep5.avro"));
LOG.info("Starting the pipeline");
PipelineResult result = p.run();
result.waitUntilFinish();
LOG.info("Pipeline finished with state: {} ", result.getState());
}
Author: gbif | Project: pipelines | Lines: 25 | Source: DwCA2AvroPipeline.java
Example 20: createPredefinedStep
import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
/**
* Returns a Step for a {@link DoFn} by creating and translating a pipeline.
*/
private static Step createPredefinedStep() throws Exception {
DataflowPipelineOptions options = buildPipelineOptions();
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
Pipeline pipeline = Pipeline.create(options);
String stepName = "DoFn1";
pipeline.apply("ReadMyFile", TextIO.read().from("gs://bucket/in"))
.apply(stepName, ParDo.of(new NoOpFn()))
.apply("WriteMyFile", TextIO.write().to("gs://bucket/out"));
DataflowRunner runner = DataflowRunner.fromOptions(options);
runner.replaceTransforms(pipeline);
Job job =
translator
.translate(
pipeline,
runner,
Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(8, job.getSteps().size());
Step step = job.getSteps().get(1);
assertEquals(stepName, getString(step.getProperties(), PropertyNames.USER_NAME));
assertAllStepOutputsHaveUniqueIds(job);
return step;
}
Author: apache | Project: beam | Lines: 28 | Source: DataflowPipelineTranslatorTest.java
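NoOpFn exists only so the translator produces a user-named step, which the assertions then locate by name. A minimal pass-through version is sketched below; the actual helper in the Beam test may be implemented differently.

import org.apache.beam.sdk.transforms.DoFn;

// Identity DoFn: forwards every element unchanged.
static class NoOpFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(c.element());
  }
}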
Note: the org.apache.beam.sdk.transforms.ParDo examples in this article were collected from open-source projects hosted on GitHub, MSDocs, and other source-code and documentation platforms. The snippets come from projects contributed by the open-source community; copyright remains with the original authors, and any redistribution or reuse should follow the corresponding project's license. Please do not republish this article without permission.