Java PipelineOptionsFactory Class Code Examples


This article collects typical usage examples of org.apache.beam.sdk.options.PipelineOptionsFactory in Java. If you are wondering how the Java PipelineOptionsFactory class is used in practice, the selected code examples below may help.



The PipelineOptionsFactory class belongs to the org.apache.beam.sdk.options package. The 20 code examples below are sorted by popularity by default.
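Most of the examples below call PipelineOptionsFactory.fromArgs(args).as(Options.class) without showing how command-line arguments end up behind a getter such as getInput(). The sketch below is a toy illustration of that general idea using only the Java standard library: it parses "--name=value" arguments and exposes them through a dynamic proxy of an options interface. It is not Beam code. The class OptionsProxySketch, its fromArgs and parseArgs methods, and the Options interface are all hypothetical names chosen for illustration, and the real factory additionally handles type conversion, defaults, validation, and runner selection.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

public class OptionsProxySketch {

  /** Hypothetical options interface, shaped like the Options types used in the examples. */
  public interface Options {
    String getInput();

    String getOutputPrefix();
  }

  /** Parses "--name=value" arguments into a map; any other argument shape is rejected. */
  public static Map<String, String> parseArgs(String[] args) {
    Map<String, String> values = new HashMap<>();
    for (String arg : args) {
      int eq = arg.indexOf('=');
      if (!arg.startsWith("--") || eq < 3) {
        throw new IllegalArgumentException("Expected --name=value but got: " + arg);
      }
      values.put(arg.substring(2, eq), arg.substring(eq + 1));
    }
    return values;
  }

  /** Returns a proxy whose getFoo() methods read the "foo" entry of the parsed arguments. */
  public static <T> T fromArgs(Class<T> iface, String... args) {
    Map<String, String> values = parseArgs(args);
    InvocationHandler handler = (proxy, method, methodArgs) -> {
      String name = method.getName();
      if (name.startsWith("get") && name.length() > 3 && method.getParameterCount() == 0) {
        // getOutputPrefix -> outputPrefix
        String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
        return values.get(key); // null when the argument was not supplied
      }
      // This toy supports getters only (no setters, toString, equals, ...).
      throw new UnsupportedOperationException(name);
    };
    return iface.cast(
        Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler));
  }

  public static void main(String[] args) {
    Options options = fromArgs(Options.class, "--input=logs.txt", "--outputPrefix=scores");
    System.out.println(options.getInput() + " -> " + options.getOutputPrefix());
  }
}
```

In a real pipeline the options interface would extend org.apache.beam.sdk.options.PipelineOptions and be passed to PipelineOptionsFactory, as the examples that follow do.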

Example 1: main

import org.apache.beam.sdk.options.PipelineOptionsFactory; // import the required package/class
/** Run a batch pipeline to calculate hourly team scores. */
public static void main(String[] args) throws Exception {

  Options options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  pipeline
      .apply("ReadLogs", TextIO.read().from(options.getInput()))
      .apply("SetTimestamps", WithTimestamps.of(new SetTimestampFn()))
      .apply("FixedWindows", Window.<String>into(FixedWindows.of(ONE_HOUR)))
      .apply("TeamScores", new CalculateTeamScores(options.getOutputPrefix()));

  pipeline.run();
}
 
Developer ID: davorbonaci, Project: beam-portability-demo, Lines of code: 18, Source: HourlyTeamScore.java


Example 2: main

import org.apache.beam.sdk.options.PipelineOptionsFactory; // import the required package/class
public static void main(String[] args) {
  Pipeline p = Pipeline.create(
      PipelineOptionsFactory.fromArgs(args).withValidation().create());

  p.apply(Create.of("Hello", "World"))
  .apply(MapElements.via(new SimpleFunction<String, String>() {
    @Override
    public String apply(String input) {
      return input.toUpperCase();
    }
  }))
  .apply(ParDo.of(new DoFn<String, Void>() {
    @ProcessElement
    public void processElement(ProcessContext c)  {
      LOG.info(c.element());
    }
  }));

  p.run();
}
 
Developer ID: apache, Project: beam, Lines of code: 21, Source: StarterPipeline.java


Example 3: main

import org.apache.beam.sdk.options.PipelineOptionsFactory; // import the required package/class
/**
 * Runs the DatastoreToGcs dataflow pipeline
 */
public static void main(String[] args) throws IOException, ScriptException {
  Options options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(Options.class);

  options.setRunner(DataflowRunner.class);

  Pipeline pipeline = Pipeline.create(options);

  pipeline
      .apply("IngestEntities",
          DatastoreIO.v1().read()
              .withProjectId(options.getDatastoreProjectId())
              .withLiteralGqlQuery(options.getGqlQuery())
              .withNamespace(options.getNamespace()))
      .apply("EntityToJson", ParDo.of(EntityToJson.newBuilder()
          .setJsTransformPath(options.getJsTransformPath())
          .setJsTransformFunctionName(options.getJsTransformFunctionName())
          .build()))
      .apply("JsonToGcs", TextIO.write().to(options.getSavePath())
          .withSuffix(".json"));

  pipeline.run();
}
 
Developer ID: cobookman, Project: teleport, Lines of code: 28, Source: DatastoreToGcs.java


Example 4: main

import org.apache.beam.sdk.options.PipelineOptionsFactory; // import the required package/class
/**
 * Runs the GcsToDatastore dataflow pipeline
 */
public static void main(String[] args) throws IOException, ScriptException {
  Options options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(Options.class);

  options.setRunner(DataflowRunner.class);

  Pipeline pipeline = Pipeline.create(options);

  pipeline
      .apply("IngestJson", TextIO.read()
          .from(options.getJsonPathPrefix()))
      .apply("GcsToEntity", ParDo.of(JsonToEntity.newBuilder()
          .setJsTransformPath(options.getJsTransformPath())
          .setJsTransformFunctionName(options.getJsTransformFunctionName())
          .build()))
      .apply(DatastoreIO.v1().write()
          .withProjectId(options.getDatastoreProjectId()));

  pipeline.run();
}
 
Developer ID: cobookman, Project: teleport, Lines of code: 25, Source: GcsToDatastore.java


Example 5: main

import org.apache.beam.sdk.options.PipelineOptionsFactory; // import the required package/class
public static void main(String[] args) throws Exception {

  Options options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  pipeline
      .apply(KafkaIO.<String, String>read()
          .withBootstrapServers(options.getKafkaBootstrapServer())
          .withTopic(options.getTopic())
          .withKeyDeserializer(StringDeserializer.class)
          .withValueDeserializer(StringDeserializer.class)
          .withTimestampFn(new SetTimestampFn()))
      .apply("Values", ParDo.of(new ValuesFn()))
      .apply("FixedWindows", Window.<String>into(FixedWindows.of(FIVE_MINUTES))
          .triggering(AfterWatermark.pastEndOfWindow()
              .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                  .plusDelayOf(TWO_MINUTES))
              .withLateFirings(AfterPane.elementCountAtLeast(1)))
          .withAllowedLateness(TEN_MINUTES)
          .accumulatingFiredPanes())
      .apply("TeamScore", new CalculateTeamScores(options.getOutputPrefix()));

  pipeline.run();
}
 
Developer ID: davorbonaci, Project: beam-portability-demo, Lines of code: 28, Source: LeaderBoard.java


Example 6: main

import org.apache.beam.sdk.options.PipelineOptionsFactory; // import the required package/class
public static void main(String[] args) throws Exception {
    PipelineOptionsFactory.register(TemplateOptions.class);
    TemplateOptions options = PipelineOptionsFactory
            .fromArgs(args)
            .withValidation()
            .as(TemplateOptions.class);
    options.setAutoscalingAlgorithm(THROUGHPUT_BASED);
    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply(BigQueryIO.read().from(options.getBigQueryTableName()))
            .apply(ParDo.of(new DoFn<TableRow, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                    String commaSep = c.element().values()
                            .stream()
                            .map(cell -> cell.toString().trim())
                            .collect(Collectors.joining("\",\""));
                    c.output(commaSep);
                }
            }))
            .apply(TextIO.write().to(options.getOutputFile())
                    .withoutSharding()
                    .withWritableByteChannelFactory(GZIP)
            );
    pipeline.run();
}
 
Developer ID: shinesolutions, Project: bigquery-table-to-one-file, Lines of code: 26, Source: BigQueryTableToOneFile.java


Example 7: main

import org.apache.beam.sdk.options.PipelineOptionsFactory; // import the required package/class
public static void main(String[] args) {
  PipelineOptions options = PipelineOptionsFactory.create();
  options.setRunner(DirectRunner.class); // forced for this demo
  Pipeline p = Pipeline.create(options);

  // register Avro coders for serializing our messages
  Coders.registerAvroCoders(p, ExtendedRecord.class, UntypedOccurrence.class);

  PCollection<UntypedOccurrence> verbatimRecords = p.apply(
    "Read Avro", AvroIO.read(UntypedOccurrence.class).from("demo/output/data*"));

  verbatimRecords.apply("Write file per Genus",
                        AvroIO.write(UntypedOccurrence.class)
                              .to("demo/output-split/data*") // prefix, is required but overwritten
                              .to(new GenusDynamicAvroDestinations(
                                FileSystems.matchNewResource("demo/output-split/data*", true))));


  LOG.info("Starting the pipeline");
  PipelineResult result = p.run();
  result.waitUntilFinish();
  LOG.info("Pipeline finished with state: {} ", result.getState());
}
 
Developer ID: gbif, Project: pipelines, Lines of code: 24, Source: MultiAvroOutDemo.java


Example 8: testSizes

import org.apache.beam.sdk.options.PipelineOptionsFactory; // import the required package/class
@Test
public void testSizes() throws Exception {
  SolrIOTestUtils.insertTestDocuments(SOLR_COLLECTION, NUM_DOCS, solrClient);

  PipelineOptions options = PipelineOptionsFactory.create();
  SolrIO.Read read =
      SolrIO.read().withConnectionConfiguration(connectionConfiguration).from(SOLR_COLLECTION);
  SolrIO.BoundedSolrSource initialSource = new SolrIO.BoundedSolrSource(read, null);
  // can't use equal assert as Solr collections never have same size
  // (due to internal Lucene implementation)
  long estimatedSize = initialSource.getEstimatedSizeBytes(options);
  LOG.info("Estimated size: {}", estimatedSize);
  assertThat(
      "Wrong estimated size below minimum",
      estimatedSize,
      greaterThan(SolrIOTestUtils.MIN_DOC_SIZE * NUM_DOCS));
  assertThat(
      "Wrong estimated size beyond maximum",
      estimatedSize,
      lessThan(SolrIOTestUtils.MAX_DOC_SIZE * NUM_DOCS));
}
 
Developer ID: apache, Project: beam, Lines of code: 22, Source: SolrIOTest.java


Example 9: testProgressEmptySource

import org.apache.beam.sdk.options.PipelineOptionsFactory; // import the required package/class
@Test
public void testProgressEmptySource() throws IOException {
  PipelineOptions options = PipelineOptionsFactory.create();
  CoarseRangeSource source = new CoarseRangeSource(13, 17, 1, 100);
  try (OffsetBasedReader<Integer> reader = source.createReader(options)) {
    // before starting
    assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
    assertEquals(0, reader.getSplitPointsConsumed());
    assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());

    // confirm empty
    assertFalse(reader.start());

    // after reading empty source
    assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
    assertEquals(0, reader.getSplitPointsConsumed());
    assertEquals(0, reader.getSplitPointsRemaining());
  }
}
 
Developer ID: apache, Project: beam, Lines of code: 20, Source: OffsetBasedSourceTest.java


Example 10: main

import org.apache.beam.sdk.options.PipelineOptionsFactory; // import the required package/class
public static void main(String[] args)
    throws Exception {

  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline p = Pipeline.create(options);

  // Build the table schema for the output table.
  List<TableFieldSchema> fields = new ArrayList<>();
  fields.add(new TableFieldSchema().setName("word").setType("STRING"));
  fields.add(new TableFieldSchema().setName("all_plays").setType("STRING"));
  TableSchema schema = new TableSchema().setFields(fields);

  p.apply(BigQueryIO.readTableRows().from(options.getInput()))
   .apply(new PlaysForWord())
   .apply(BigQueryIO.writeTableRows()
      .to(options.getOutput())
      .withSchema(schema)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

  p.run().waitUntilFinish();
}
 
Developer ID: apache, Project: beam, Lines of code: 23, Source: CombinePerKeyExamples.java


Example 11: testBoundedToUnboundedSourceAdapterCheckpoint

import org.apache.beam.sdk.options.PipelineOptionsFactory; // import the required package/class
private <T> void testBoundedToUnboundedSourceAdapterCheckpoint(
    BoundedSource<T> boundedSource,
    List<T> expectedElements) throws Exception {
  BoundedToUnboundedSourceAdapter<T> unboundedSource =
      new BoundedToUnboundedSourceAdapter<>(boundedSource);

  PipelineOptions options = PipelineOptionsFactory.create();
  BoundedToUnboundedSourceAdapter<T>.Reader reader =
      unboundedSource.createReader(options, null);

  List<T> actual = Lists.newArrayList();
  for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) {
    actual.add(reader.getCurrent());
    // checkpoint every 9 elements
    if (actual.size() % 9 == 0) {
      Checkpoint<T> checkpoint = reader.getCheckpointMark();
      checkpoint.finalizeCheckpoint();
    }
  }
  Checkpoint<T> checkpointDone = reader.getCheckpointMark();
  assertTrue(checkpointDone.getResidualElements() == null
      || checkpointDone.getResidualElements().isEmpty());

  assertEquals(expectedElements.size(), actual.size());
  assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
}
 
Developer ID: apache, Project: beam, Lines of code: 27, Source: UnboundedReadFromBoundedSourceTest.java


Example 12: testTranslate

import org.apache.beam.sdk.options.PipelineOptionsFactory; // import the required package/class
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testTranslate() {
  ReadBoundedTranslator translator = new ReadBoundedTranslator();
  GearpumpPipelineOptions options =
      PipelineOptionsFactory.create().as(GearpumpPipelineOptions.class);
  Read.Bounded transform = mock(Read.Bounded.class);
  BoundedSource source = mock(BoundedSource.class);
  when(transform.getSource()).thenReturn(source);

  TranslationContext translationContext = mock(TranslationContext.class);
  when(translationContext.getPipelineOptions()).thenReturn(options);

  JavaStream stream = mock(JavaStream.class);
  PValue mockOutput = mock(PValue.class);
  when(translationContext.getOutput()).thenReturn(mockOutput);
  when(translationContext.getSourceStream(any(DataSource.class))).thenReturn(stream);

  translator.translate(transform, translationContext);
  verify(translationContext).getSourceStream(argThat(new BoundedSourceWrapperMatcher()));
  verify(translationContext).setOutputStream(mockOutput, stream);
}
 
Developer ID: apache, Project: beam, Lines of code: 23, Source: ReadBoundedTranslatorTest.java


Example 13: testProgress

import org.apache.beam.sdk.options.PipelineOptionsFactory; // import the required package/class
@Test
public void testProgress() throws IOException {
  final int numRecords = 5;
  @SuppressWarnings("deprecation")  // testing CountingSource
  BoundedSource<Long> source = CountingSource.upTo(numRecords);
  try (BoundedReader<Long> reader = source.createReader(PipelineOptionsFactory.create())) {
    // Check preconditions before starting. Note that CountingReader can always give an accurate
    // remaining parallelism.
    assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
    assertEquals(0, reader.getSplitPointsConsumed());
    assertEquals(numRecords, reader.getSplitPointsRemaining());

    assertTrue(reader.start());
    int i = 0;
    do {
      assertEquals(i, reader.getSplitPointsConsumed());
      assertEquals(numRecords - i, reader.getSplitPointsRemaining());
      ++i;
    } while (reader.advance());

    assertEquals(numRecords, i); // exactly numRecords calls to advance()
    assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
    assertEquals(numRecords, reader.getSplitPointsConsumed());
    assertEquals(0, reader.getSplitPointsRemaining());
  }
}
 
Developer ID: apache, Project: beam, Lines of code: 27, Source: CountingSourceTest.java


Example 14: testGetTableSucceeds

import org.apache.beam.sdk.options.PipelineOptionsFactory; // import the required package/class
@Test
public void testGetTableSucceeds() throws Exception {
  TableReference tableRef = new TableReference()
      .setProjectId("projectId")
      .setDatasetId("datasetId")
      .setTableId("tableId");

  Table testTable = new Table();
  testTable.setTableReference(tableRef);

  when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
  when(response.getStatusCode()).thenReturn(403).thenReturn(200);
  when(response.getContent())
      .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
      .thenReturn(toStream(testTable));

  BigQueryServicesImpl.DatasetServiceImpl datasetService =
      new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());

  Table table = datasetService.getTable(tableRef, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);

  assertEquals(testTable, table);
  verify(response, times(2)).getStatusCode();
  verify(response, times(2)).getContent();
  verify(response, times(2)).getContentType();
}
 
Developer ID: apache, Project: beam, Lines of code: 27, Source: BigQueryServicesImplTest.java


Example 15: testMatchingAllCalls

import org.apache.beam.sdk.options.PipelineOptionsFactory; // import the required package/class
@Test
public void testMatchingAllCalls() throws Exception {
  String[] args = new String[] {STORAGE_TRACE};
  GcsOptions options =
      PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
  options.setGcpCredential(new TestCredential());

  assertNotNull(options.getGoogleApiTrace());

  Storage.Objects.Get getRequest =
      Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
  assertEquals("TraceDestination", getRequest.get("$trace"));

  Storage.Objects.List listRequest =
      Transport.newStorageClient(options).build().objects().list("testProjectId");
  assertEquals("TraceDestination", listRequest.get("$trace"));
}
 
Developer ID: apache, Project: beam, Lines of code: 18, Source: GoogleApiDebugOptionsTest.java


Example 16: testPathExistsValidation

import org.apache.beam.sdk.options.PipelineOptionsFactory; // import the required package/class
@Test
public void testPathExistsValidation() {
  String[] args = new String[] {
      "--runner=DataflowRunner",
      "--tempLocation=gs://does/not/exist",
      "--project=test-project",
      "--credentialFactoryClass=" + NoopCredentialFactory.class.getName(),
  };

  try {
    Pipeline.create(PipelineOptionsFactory.fromArgs(args).create()).run();
    fail();
  } catch (RuntimeException e) {
    assertThat(
        Throwables.getStackTraceAsString(e),
        both(containsString("gs://does/not/exist"))
            .and(containsString("does not exist or is not writeable")));
  }
}
 
Developer ID: apache, Project: beam, Lines of code: 20, Source: DataflowRunnerTest.java


Example 17: testIsTableEmptyNoRetryForNotFound

import org.apache.beam.sdk.options.PipelineOptionsFactory; // import the required package/class
@Test
public void testIsTableEmptyNoRetryForNotFound() throws IOException, InterruptedException {
  when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
  when(response.getStatusCode()).thenReturn(404);

  BigQueryServicesImpl.DatasetServiceImpl datasetService =
      new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());

  TableReference tableRef = new TableReference()
      .setProjectId("projectId")
      .setDatasetId("datasetId")
      .setTableId("tableId");

  thrown.expect(IOException.class);
  thrown.expectMessage(String.format("Unable to list table data: %s", tableRef.getTableId()));

  try {
    datasetService.isTableEmpty(tableRef, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
  } finally {
    verify(response, times(1)).getStatusCode();
    verify(response, times(1)).getContent();
    verify(response, times(1)).getContentType();
  }
}
 
Developer ID: apache, Project: beam, Lines of code: 25, Source: BigQueryServicesImplTest.java


Example 18: testInvalidNumberOfWorkerHarnessThreads

import org.apache.beam.sdk.options.PipelineOptionsFactory; // import the required package/class
@Test
public void testInvalidNumberOfWorkerHarnessThreads() throws IOException {
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  FileSystems.setDefaultPipelineOptions(options);
  options.setRunner(DataflowRunner.class);
  options.setProject("foo-12345");

  options.setGcpTempLocation(VALID_TEMP_BUCKET);
  options.setGcsUtil(mockGcsUtil);

  options.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(-1);

  thrown.expect(IllegalArgumentException.class);
  thrown.expectMessage("Number of worker harness threads");
  thrown.expectMessage("Please make sure the value is non-negative.");

  DataflowRunner.fromOptions(options);
}
 
Developer ID: apache, Project: beam, Lines of code: 19, Source: DataflowRunnerTest.java


Example 19: testDifferentWindows

import org.apache.beam.sdk.options.PipelineOptionsFactory; // import the required package/class
@Test(expected = IllegalArgumentException.class)
public void testDifferentWindows() throws Exception {
  String sql = "SELECT "
      + " order_id, site_id, count(*) as cnt "
      + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
      + ", TUMBLE(order_time, INTERVAL '1' HOUR) "
      + " UNION SELECT "
      + " order_id, site_id, count(*) as cnt "
      + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
      + ", TUMBLE(order_time, INTERVAL '2' HOUR) ";

  // use a real pipeline rather than the TestPipeline because we are
  // testing exceptions, the pipeline will not actually run.
  Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create());
  compilePipeline(sql, pipeline1, sqlEnv);
  pipeline.run();
}
 
Developer ID: apache, Project: beam, Lines of code: 18, Source: BeamSetOperatorRelBaseTest.java


Example 20: buildPipelineOptions

import org.apache.beam.sdk.options.PipelineOptionsFactory; // import the required package/class
private static DataflowPipelineOptions buildPipelineOptions(String ... args) throws IOException {
  GcsUtil mockGcsUtil = mock(GcsUtil.class);
  when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() {
    @Override
    public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
      return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
    }
  });
  when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);

  DataflowPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
  options.setRunner(DataflowRunner.class);
  options.setGcpCredential(new TestCredential());
  options.setJobName("some-job-name");
  options.setProject("some-project");
  options.setRegion("some-region");
  options.setTempLocation(GcsPath.fromComponents("somebucket", "some/path").toString());
  options.setFilesToStage(new LinkedList<String>());
  options.setGcsUtil(mockGcsUtil);
  return options;
}
 
Developer ID: apache, Project: beam, Lines of code: 23, Source: BatchStatefulParDoOverridesTest.java



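Note: The org.apache.beam.sdk.options.PipelineOptionsFactory examples in this article were compiled from source code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright in the source code belongs to the original authors. For distribution and use, refer to the License of the corresponding project. Do not reproduce without permission.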

