
Java ParDo Class Code Examples


This article collects typical usage examples of org.apache.beam.sdk.transforms.ParDo in Java. If you are wondering how the Java ParDo class is used in practice, the selected examples below may help.



The ParDo class belongs to the org.apache.beam.sdk.transforms package. The 20 code examples below are sorted by popularity by default.

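Example 1: filterAlreadyProcessedUrls

import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
/**
 * @param readContent the content read as pipeline input
 * @param pipeline the pipeline used to run the BigQuery read
 * @param options the pipeline options used to build the query
 * @return the content that remains after filtering against the processed URLs
 */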
private static PCollection<InputContent> filterAlreadyProcessedUrls(
		PCollection<InputContent> readContent, Pipeline pipeline, 
		IndexerPipelineOptions options) {
	PCollection<InputContent> contentToProcess;
	String query = IndexerPipelineUtils.buildBigQueryProcessedUrlsQuery(options);
	PCollection<KV<String,Long>> alreadyProcessedUrls = pipeline
		.apply("Get processed URLs",BigQueryIO.read().fromQuery(query))
		.apply(ParDo.of(new GetUrlFn()));

	final PCollectionView<Map<String,Long>> alreadyProcessedUrlsSideInput =
		alreadyProcessedUrls.apply(View.<String,Long>asMap());
	  
	contentToProcess = readContent
		.apply(ParDo.of(new FilterProcessedUrls(alreadyProcessedUrlsSideInput))
			.withSideInputs(alreadyProcessedUrlsSideInput));
	return contentToProcess;
}
 
Developer: GoogleCloudPlatform, Project: dataflow-opinion-analysis, Lines of code: 24, Source: IndexerPipeline.java
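
The example above passes a map of processed URLs to FilterProcessedUrls as a side input, but the body of that DoFn is not shown. As a rough illustration only, the following plain-Java sketch (no Beam dependency) models one plausible behavior: keep an element only if its URL is absent from the map. The class name SideInputFilterSketch, the method filterUnprocessed, and the use of plain String URLs in place of InputContent are all assumptions made for this sketch, not the project's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of filtering against a side-input map.
final class SideInputFilterSketch {

    // Keeps only the URLs that do not appear as keys in alreadyProcessed.
    // Assumption: the real DoFn looks up each element's URL in the map;
    // the meaning of the Long value is not used here.
    static List<String> filterUnprocessed(List<String> urls,
                                          Map<String, Long> alreadyProcessed) {
        List<String> toProcess = new ArrayList<>();
        for (String url : urls) {
            if (!alreadyProcessed.containsKey(url)) {
                toProcess.add(url);
            }
        }
        return toProcess;
    }

    public static void main(String[] args) {
        Map<String, Long> processed = new HashMap<>();
        processed.put("http://example.com/a", 1L);
        List<String> input = Arrays.asList("http://example.com/a", "http://example.com/b");
        System.out.println(filterUnprocessed(input, processed));
    }
}
```

In the Beam version, the map would be read inside the DoFn from the PCollectionView registered with withSideInputs, rather than passed as a method argument.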


Example 2: testUnwritableRemoveContainerPipeline

import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testUnwritableRemoveContainerPipeline() throws Exception {

    final Map<String, String> dataConfiguration = singletonMap("repository",
            getClass().getResource("/dataDirectory2").toURI().toString());

    final File root = new File(getClass().getResource("/dataDirectory2").toURI());

    assumeTrue(root.setReadOnly());

    final PCollection<KV<String, String>> pCollection = pipeline
        .apply("Create", Create.of(CONTAINER_KV))
        .apply(ParDo.of(new BeamProcessor(dataConfiguration, LDP.PreferContainment.getIRIString(), false)));

    PAssert.that(pCollection).empty();

    pipeline.run();
    root.setWritable(true);
}
 
Developer: trellis-ldp-archive, Project: trellis-rosid-file-streaming, Lines of code: 21, Source: BeamProcessorTest.java


Example 3: testUnwritableAddContainerPipeline

import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testUnwritableAddContainerPipeline() throws Exception {

    final Map<String, String> dataConfiguration = singletonMap("repository",
            getClass().getResource("/dataDirectory2").toURI().toString());

    final File root = new File(getClass().getResource("/dataDirectory2").toURI());

    assumeTrue(root.setReadOnly());

    final PCollection<KV<String, String>> pCollection = pipeline
        .apply("Create", Create.of(CONTAINER_KV))
        .apply(ParDo.of(new BeamProcessor(dataConfiguration, LDP.PreferContainment.getIRIString(), true)));

    PAssert.that(pCollection).empty();

    pipeline.run();
    root.setWritable(true);
}
 
Developer: trellis-ldp-archive, Project: trellis-rosid-file-streaming, Lines of code: 21, Source: BeamProcessorTest.java


Example 4: testDefaultCoder

import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testDefaultCoder() throws Exception {
  p.enableAbandonedNodeEnforcement(true);

  // Use MyRecord as input and output types without explicitly specifying
  // a coder (this uses the default coders, which may not be
  // SerializableCoder).
  PCollection<String> output =
      p.apply(Create.of("Hello", "World"))
      .apply(ParDo.of(new StringToRecord()))
      .apply(ParDo.of(new RecordToString()));

  PAssert.that(output)
      .containsInAnyOrder("Hello", "World");

  p.run();
}
 
Developer: apache, Project: beam, Lines of code: 19, Source: SerializableCoderTest.java


Example 5: enrichWithCNLP

import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
/**
 * @param indexes
 * @return
 */
private static PCollection<ContentIndexSummary> enrichWithCNLP(
		PCollection<ContentIndexSummary> indexes, Float ratio) {
	
	PCollectionTuple splitAB = indexes
		.apply(ParDo.of(new SplitAB(ratio))
			.withOutputTags(PipelineTags.BranchA,  
				TupleTagList.of(PipelineTags.BranchB))); 
	
	PCollection<ContentIndexSummary> branchACol = splitAB.get(PipelineTags.BranchA);
	PCollection<ContentIndexSummary> branchBCol = splitAB.get(PipelineTags.BranchB);
	
	PCollection<ContentIndexSummary> enrichedBCol = branchBCol.apply(
		ParDo.of(new EnrichWithCNLPEntities()));
	
	//Merge all collections with WebResource table records
	PCollectionList<ContentIndexSummary> contentIndexSummariesList = 
		PCollectionList.of(branchACol).and(enrichedBCol);
	PCollection<ContentIndexSummary> allIndexSummaries = 
		contentIndexSummariesList.apply(Flatten.<ContentIndexSummary>pCollections());

	indexes = allIndexSummaries;
	return indexes;
}
 
Developer: GoogleCloudPlatform, Project: dataflow-opinion-analysis, Lines of code: 28, Source: FileIndexerPipeline.java


Example 6: enrichWithCNLP

import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
/**
 * @param filteredIndexes
 * @return
 */
private static PCollection<ContentIndexSummary> enrichWithCNLP(
		PCollection<ContentIndexSummary> filteredIndexes, Float ratio) {
	
	PCollectionTuple splitAB = filteredIndexes
		.apply(ParDo.of(new SplitAB(ratio))
			.withOutputTags(PipelineTags.BranchA,  
				TupleTagList.of(PipelineTags.BranchB))); 
	
	PCollection<ContentIndexSummary> branchACol = splitAB.get(PipelineTags.BranchA);
	PCollection<ContentIndexSummary> branchBCol = splitAB.get(PipelineTags.BranchB);
	
	PCollection<ContentIndexSummary> enrichedBCol = branchBCol.apply(
		ParDo.of(new EnrichWithCNLPEntities()));
	
	//Merge all collections with WebResource table records
	PCollectionList<ContentIndexSummary> contentIndexSummariesList = 
		PCollectionList.of(branchACol).and(enrichedBCol);
	PCollection<ContentIndexSummary> allIndexSummaries = 
		contentIndexSummariesList.apply(Flatten.<ContentIndexSummary>pCollections());

	filteredIndexes = allIndexSummaries;
	return filteredIndexes;
}
 
Developer: GoogleCloudPlatform, Project: dataflow-opinion-analysis, Lines of code: 28, Source: IndexerPipeline.java


Example 7: filterSoftDuplicates

import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
/**
 * @param indexes the document indexes
 * @return a POJO containing 2 PCollections: Unique docs, and Duplicates
 */
private static ContentDuplicateOrNot filterSoftDuplicates(
		PCollection<ContentIndexSummary> indexes) {
	// 
	PCollectionTuple dedupeOrNot = indexes
		.apply("Extract Text grouping key", 
			ParDo.of(new GetContentIndexSummaryKeyFn()))
		.apply("Group by Text grouping key", 
			GroupByKey.<ContentSoftDeduplicationKey, ContentIndexSummary>create())
		.apply("Eliminate Text dupes", 
			ParDo.of(new EliminateTextDupes())
				.withOutputTags(PipelineTags.indexedContentNotToDedupeTag, 
					TupleTagList.of(PipelineTags.indexedContentToDedupeTag))); 	
		
	PCollection<TableRow> dedupedWebresources = 
		dedupeOrNot.get(PipelineTags.indexedContentToDedupeTag)
			.apply(ParDo.of(new CreateWebresourceTableRowFromDupeIndexSummaryFn()));
	
	ContentDuplicateOrNot contentDuplicateOrNot = new ContentDuplicateOrNot(
		dedupeOrNot.get(PipelineTags.indexedContentNotToDedupeTag),
		dedupedWebresources);
	
	return contentDuplicateOrNot;
}
 
Developer: GoogleCloudPlatform, Project: dataflow-opinion-analysis, Lines of code: 28, Source: IndexerPipeline.java


Example 8: main

import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
/**
 * Runs the DatastoreToGcs dataflow pipeline
 */
public static void main(String[] args) throws IOException, ScriptException {
  Options options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(Options.class);

  options.setRunner(DataflowRunner.class);

  Pipeline pipeline = Pipeline.create(options);

  pipeline
      .apply("IngestEntities",
          DatastoreIO.v1().read()
              .withProjectId(options.getDatastoreProjectId())
              .withLiteralGqlQuery(options.getGqlQuery())
              .withNamespace(options.getNamespace()))
      .apply("EntityToJson", ParDo.of(EntityToJson.newBuilder()
          .setJsTransformPath(options.getJsTransformPath())
          .setJsTransformFunctionName(options.getJsTransformFunctionName())
          .build()))
      .apply("JsonToGcs", TextIO.write().to(options.getSavePath())
          .withSuffix(".json"));

  pipeline.run();
}
 
Developer: cobookman, Project: teleport, Lines of code: 28, Source: DatastoreToGcs.java


Example 9: main

import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
/**
 * Runs the GcsToDatastore dataflow pipeline
 */
public static void main(String[] args) throws IOException, ScriptException {
  Options options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(Options.class);

  options.setRunner(DataflowRunner.class);

  Pipeline pipeline = Pipeline.create(options);

  pipeline
      .apply("IngestJson", TextIO.read()
          .from(options.getJsonPathPrefix()))
      .apply("GcsToEntity", ParDo.of(JsonToEntity.newBuilder()
          .setJsTransformPath(options.getJsTransformPath())
          .setJsTransformFunctionName(options.getJsTransformFunctionName())
          .build()))
      .apply(DatastoreIO.v1().write()
          .withProjectId(options.getDatastoreProjectId()));

  pipeline.run();
}
 
Developer: cobookman, Project: teleport, Lines of code: 25, Source: GcsToDatastore.java


Example 10: testCachePipeline

import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testCachePipeline() throws Exception {

    final KV<String, String> kv = KV.of("trellis:repository/resource", null);

    final Map<String, String> dataConfiguration = singletonMap("repository",
            getClass().getResource("/root").toURI().toString());

    final PCollection<KV<String, String>> pCollection = pipeline
        .apply("Create", Create.of(kv))
        .apply(ParDo.of(new CacheWriter(dataConfiguration)));

    PAssert.that(pCollection).containsInAnyOrder(asList(kv));

    pipeline.run();
}
 
Developer: trellis-ldp-archive, Project: trellis-rosid-file-streaming, Lines of code: 18, Source: CacheWriterTest.java


Example 11: testUnableToCachePipeline

import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testUnableToCachePipeline() throws Exception {

    final KV<String, String> kv = KV.of("trellis:repository/some-other-resource", null);

    final Map<String, String> dataConfiguration = singletonMap("repository",
            getClass().getResource("/root").toURI().toString());

    final PCollection<KV<String, String>> pCollection = pipeline
        .apply("Create", Create.of(kv))
        .apply(ParDo.of(new CacheWriter(dataConfiguration)));

    PAssert.that(pCollection).empty();

    pipeline.run();
}
 
Developer: trellis-ldp-archive, Project: trellis-rosid-file-streaming, Lines of code: 18, Source: CacheWriterTest.java


Example 12: testInvalidDirectoryPipeline

import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testInvalidDirectoryPipeline() throws Exception {

    final KV<String, String> kv = KV.of("trellis:repository/resource", null);

    final Map<String, String> dataConfiguration = singletonMap("foo",
            getClass().getResource("/root").toURI().toString());

    final PCollection<KV<String, String>> pCollection = pipeline
        .apply("Create", Create.of(kv))
        .apply(ParDo.of(new CacheWriter(dataConfiguration)));

    PAssert.that(pCollection).empty();

    pipeline.run();
}
 
Developer: trellis-ldp-archive, Project: trellis-rosid-file-streaming, Lines of code: 18, Source: CacheWriterTest.java


Example 13: testInvalidDataPipeline

import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testInvalidDataPipeline() throws Exception {

    final String dataset = "<trellis:repository/resource> " +
        "<http://purl.org/dc/terms/subject> <trellis:repository/resource/member> " +
        "<http://www.w3.org/ns/ldp#PreferConta";
    final KV<String, String> kv = KV.of("trellis:repository/resource", dataset);

    final Map<String, String> dataConfiguration = singletonMap("repository",
            getClass().getResource("/dataDirectory").toURI().toString());

    final PCollection<KV<String, String>> pCollection = pipeline
        .apply("Create", Create.of(kv))
        .apply(ParDo.of(new BeamProcessor(dataConfiguration, LDP.PreferContainment.getIRIString(), false)));

    PAssert.that(pCollection).empty();

    pipeline.run();
}
 
Developer: trellis-ldp-archive, Project: trellis-rosid-file-streaming, Lines of code: 21, Source: BeamProcessorTest.java


Example 14: testInvalidDataPipeline

import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testInvalidDataPipeline() throws Exception {

    final String dataset = "<trellis:repository/resource> " +
        "<http://purl.org/dc/terms/subject> <trellis:repository/resource/member> " +
        "<http://www.w3.org/ns/ldp#PreferConta";
    final KV<String, String> kv = KV.of("trellis:repository/resource", dataset);

    final Map<String, String> dataConfiguration = singletonMap("repository", "http://localhost/");

    final PCollection<KV<String, String>> pCollection = pipeline
        .apply("Create", Create.of(kv))
        .apply(ParDo.of(new EventProcessor(dataConfiguration)));

    PAssert.that(pCollection).empty();

    pipeline.run();
}
 
Developer: trellis-ldp-archive, Project: trellis-rosid-file-streaming, Lines of code: 20, Source: EventProcessorTest.java


Example 15: main

import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
public static void main(String[] args) throws Exception {

    Options options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    pipeline
    .apply(KafkaIO.<String, String>read()
        .withBootstrapServers(options.getKafkaBootstrapServer())
        .withTopic(options.getTopic())
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withTimestampFn(new SetTimestampFn()))
    .apply("Values", ParDo.of(new ValuesFn()))

    .apply("FixedWindows", Window.<String>into(FixedWindows.of(FIVE_MINUTES))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(TWO_MINUTES))
            .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(TEN_MINUTES)
        .accumulatingFiredPanes())

    .apply("TeamScore", new CalculateTeamScores(options.getOutputPrefix()));

    pipeline.run();
  }
 
Developer: davorbonaci, Project: beam-portability-demo, Lines of code: 28, Source: LeaderBoard.java
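
The pipeline above assigns elements to FixedWindows.of(FIVE_MINUTES) before scoring. To make the window assignment concrete, here is a plain-Java sketch (no Beam dependency) of the arithmetic that maps an event timestamp to its fixed window. It assumes windows are aligned to the epoch with no offset and that FIVE_MINUTES means a five-minute duration; the constant's definition is not shown in the example. The class and method names are hypothetical, and this is an illustration of the idea rather than Beam's own implementation.

```java
// Hypothetical plain-Java model of fixed-window assignment.
final class FixedWindowSketch {

    // Assumed value of the FIVE_MINUTES constant, in milliseconds.
    static final long FIVE_MINUTES_MS = 5 * 60 * 1000L;

    // Inclusive start of the window containing timestampMs,
    // for windows of sizeMs aligned to the epoch (offset 0).
    static long windowStart(long timestampMs, long sizeMs) {
        // floorMod keeps the result correct for negative timestamps too.
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Exclusive end of the same window.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long ts = 725_000L; // 12 minutes 5 seconds after the epoch
        System.out.println(windowStart(ts, FIVE_MINUTES_MS)); // prints 600000
        System.out.println(windowEnd(ts, FIVE_MINUTES_MS));   // prints 900000
    }
}
```

Every element whose timestamp falls in the half-open range from windowStart to windowEnd lands in the same window; the triggers in the example then control when results for that window are emitted.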


Example 16: testInvalidDataPipeline

import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testInvalidDataPipeline() throws Exception {

    final String dataset = "<trellis:repository/resource> " +
        "<http://purl.org/dc/terms/subject> <trellis:repository/resource/member> " +
        "<http://www.w3.org/ns/ldp#PreferConta";
    final KV<String, String> kv = KV.of("trellis:repository/resource", dataset);

    final String dataConfiguration = getClass().getResource("/dataDirectory").toURI().toString();

    final PCollection<KV<String, String>> pCollection = pipeline
        .apply("Create", Create.of(kv))
        .apply(ParDo.of(new BeamProcessor(dataConfiguration, LDP.PreferContainment.getIRIString(), false)));

    PAssert.that(pCollection).empty();

    pipeline.run();
}
 
Developer: trellis-ldp, Project: trellis-rosid, Lines of code: 20, Source: BeamProcessorTest.java


Example 17: testInvalidDataPipeline

import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testInvalidDataPipeline() throws Exception {

    final String dataset = "<trellis:repository/resource> " +
        "<http://purl.org/dc/terms/subject> <trellis:repository/resource/member> " +
        "<http://www.w3.org/ns/ldp#PreferConta";
    final KV<String, String> kv = KV.of("trellis:repository/resource", dataset);

    final String dataConfiguration = "http://localhost/";

    final PCollection<KV<String, String>> pCollection = pipeline
        .apply("Create", Create.of(kv))
        .apply(ParDo.of(new EventProcessor(dataConfiguration)));

    PAssert.that(pCollection).empty();

    pipeline.run();
}
 
Developer: trellis-ldp, Project: trellis-rosid, Lines of code: 20, Source: EventProcessorTest.java


Example 18: main

import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
public static void main(String[] args) throws Exception {
    PipelineOptionsFactory.register(TemplateOptions.class);
    TemplateOptions options = PipelineOptionsFactory
            .fromArgs(args)
            .withValidation()
            .as(TemplateOptions.class);
    options.setAutoscalingAlgorithm(THROUGHPUT_BASED);
    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply(BigQueryIO.read().from(options.getBigQueryTableName()))
            .apply(ParDo.of(new DoFn<TableRow, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                    String commaSep = c.element().values()
                            .stream()
                            .map(cell -> cell.toString().trim())
                            .collect(Collectors.joining("\",\""));
                    c.output(commaSep);
                }
            }))
            .apply(TextIO.write().to(options.getOutputFile())
                    .withoutSharding()
                    .withWritableByteChannelFactory(GZIP)
            );
    pipeline.run();
}
 
Developer: shinesolutions, Project: bigquery-table-to-one-file, Lines of code: 26, Source: BigQueryTableToOneFile.java
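
The anonymous DoFn above joins the cell values of each row with Collectors.joining("\",\""). The plain-Java sketch below (no Beam dependency) runs the same stream chain on an ordinary collection to show what string that produces: the delimiter-only form puts quotes between cells but not at the start or end of the line. The second method is a variant, offered only as an assumption about what a fully quoted line might look like, using the three-argument Collectors.joining overload. Class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Collectors;

// Hypothetical plain-Java model of the row-joining logic.
final class CsvJoinSketch {

    // Same chain as the DoFn body: trim each cell, join with the "," delimiter.
    // Note: a null cell would throw a NullPointerException at cell.toString().
    static String joinDelimiterOnly(Collection<?> cells) {
        return cells.stream()
                .map(cell -> cell.toString().trim())
                .collect(Collectors.joining("\",\""));
    }

    // Variant: also emit an opening and a closing quote around the line.
    static String joinFullyQuoted(Collection<?> cells) {
        return cells.stream()
                .map(cell -> cell.toString().trim())
                .collect(Collectors.joining("\",\"", "\"", "\""));
    }

    public static void main(String[] args) {
        Collection<Object> row = Arrays.asList(" a ", 42, "c");
        System.out.println(joinDelimiterOnly(row)); // prints a","42","c
        System.out.println(joinFullyQuoted(row));   // prints "a","42","c"
    }
}
```

Whether the missing outer quotes matter depends on how the output file is consumed downstream, which the example does not show.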


Example 19: main

import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
public static void main(String[] args) {
  Configuration conf = new Configuration(); // assume defaults on CP
  conf.setClass("mapreduce.job.inputformat.class", DwCAInputFormat.class, InputFormat.class);
  conf.setStrings("mapreduce.input.fileinputformat.inputdir", "hdfs://ha-nn/tmp/dwca-lep5.zip");
  conf.setClass("key.class", Text.class, Object.class);
  conf.setClass("value.class", ExtendedRecord.class, Object.class);

  Pipeline p = newPipeline(args, conf);
  Coders.registerAvroCoders(p, UntypedOccurrence.class, TypedOccurrence.class, ExtendedRecord.class);

  PCollection<KV<Text, ExtendedRecord>> rawRecords =
    p.apply("Read DwC-A", HadoopInputFormatIO.<Text, ExtendedRecord>read().withConfiguration(conf));

  PCollection<UntypedOccurrence> verbatimRecords = rawRecords.apply(
    "Convert to Avro", ParDo.of(fromExtendedRecordKVP()));

  verbatimRecords.apply(
    "Write Avro files", AvroIO.write(UntypedOccurrence.class).to("hdfs://ha-nn/tmp/dwca-lep5.avro"));

  LOG.info("Starting the pipeline");
  PipelineResult result = p.run();
  result.waitUntilFinish();
  LOG.info("Pipeline finished with state: {} ", result.getState());
}
 
Developer: gbif, Project: pipelines, Lines of code: 25, Source: DwCA2AvroPipeline.java


Example 20: createPredefinedStep

import org.apache.beam.sdk.transforms.ParDo; // import the required package/class
/**
 * Returns a Step for a {@link DoFn} by creating and translating a pipeline.
 */
private static Step createPredefinedStep() throws Exception {
  DataflowPipelineOptions options = buildPipelineOptions();
  DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
  Pipeline pipeline = Pipeline.create(options);
  String stepName = "DoFn1";
  pipeline.apply("ReadMyFile", TextIO.read().from("gs://bucket/in"))
      .apply(stepName, ParDo.of(new NoOpFn()))
      .apply("WriteMyFile", TextIO.write().to("gs://bucket/out"));
  DataflowRunner runner = DataflowRunner.fromOptions(options);
  runner.replaceTransforms(pipeline);
  Job job =
      translator
          .translate(
              pipeline,
              runner,
              Collections.<DataflowPackage>emptyList())
          .getJob();

  assertEquals(8, job.getSteps().size());
  Step step = job.getSteps().get(1);
  assertEquals(stepName, getString(step.getProperties(), PropertyNames.USER_NAME));
  assertAllStepOutputsHaveUniqueIds(job);
  return step;
}
 
Developer: apache, Project: beam, Lines of code: 28, Source: DataflowPipelineTranslatorTest.java



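Note: The org.apache.beam.sdk.transforms.ParDo examples in this article were collected from GitHub, MSDocs, and other source code and documentation hosting platforms. The snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. For distribution and use, refer to the License of the corresponding project. Do not reproduce without permission.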

