Java PubsubIO Class Code Examples

This article collects typical usage examples of the Java class org.apache.beam.sdk.io.gcp.pubsub.PubsubIO. If you are wondering how the PubsubIO class is used in practice, the selected examples below may help.



The PubsubIO class belongs to the org.apache.beam.sdk.io.gcp.pubsub package. Ten code examples of the class are shown below, sorted by popularity by default.

Example 1: sourceEventsFromPubsub

import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; // import the required package/class
/**
 * Return source of events from Pubsub.
 */
private PCollection<Event> sourceEventsFromPubsub(Pipeline p, long now) {
  String shortSubscription = shortSubscription(now);
  NexmarkUtils.console("Reading events from Pubsub %s", shortSubscription);

  PubsubIO.Read<PubsubMessage> io =
      PubsubIO.readMessagesWithAttributes().fromSubscription(shortSubscription)
          .withIdAttribute(NexmarkUtils.PUBSUB_ID);
  if (!configuration.usePubsubPublishTime) {
    io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
  }

  return p
    .apply(queryName + ".ReadPubsubEvents", io)
    .apply(queryName + ".PubsubMessageToEvent", ParDo.of(new DoFn<PubsubMessage, Event>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        byte[] payload = c.element().getPayload();
        try {
          Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload);
          c.output(event);
        } catch (CoderException e) {
          LOG.error("Error while decoding Event from Pubsub message: serialization error", e);
        }
      }
    }));
}
 
Developer ID: apache; Project: beam; Lines of code: 30; Source: NexmarkLauncher.java


Example 2: sinkResultsToPubsub

import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; // import the required package/class
/**
 * Send {@code formattedResults} to Pubsub.
 */
private void sinkResultsToPubsub(PCollection<String> formattedResults, long now) {
  String shortTopic = shortTopic(now);
  NexmarkUtils.console("Writing results to Pubsub %s", shortTopic);
  PubsubIO.Write<String> io =
      PubsubIO.writeStrings().to(shortTopic)
          .withIdAttribute(NexmarkUtils.PUBSUB_ID);
  if (!configuration.usePubsubPublishTime) {
    io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
  }
  formattedResults.apply(queryName + ".WritePubsubResults", io);
}
 
Developer ID: apache; Project: beam; Lines of code: 15; Source: NexmarkLauncher.java


Example 3: expand

import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; // import the required package/class
@Override
public PCollection<IndexedRecord> expand(PBegin in) {
    PCollection<PubsubMessage> pubsubMessages = null;
    if (properties.useMaxNumRecords.getValue() || properties.useMaxReadTime.getValue()) {
        pubsubMessages = in.apply(Create.of(dataset.subscription.getValue()))
                .apply(ParDo.of(new BoundedReaderFn(properties)));
    } else {// normal
        PubsubIO.Read<PubsubMessage> pubsubRead = PubsubIO.readMessages().fromSubscription(String
                .format("projects/%s/subscriptions/%s", datastore.projectName.getValue(), dataset.subscription.getValue()));
        // The with* methods return a new transform, so the result must be reassigned.
        if (properties.idLabel.getValue() != null && !"".equals(properties.idLabel.getValue())) {
            pubsubRead = pubsubRead.withIdAttribute(properties.idLabel.getValue());
        }
        if (properties.timestampLabel.getValue() != null && !"".equals(properties.timestampLabel.getValue())) {
            pubsubRead = pubsubRead.withTimestampAttribute(properties.timestampLabel.getValue());
        }

        pubsubMessages = in.apply(pubsubRead);
    }

    switch (dataset.valueFormat.getValue()) {
    case AVRO: {
        Schema schema = new Schema.Parser().parse(dataset.avroSchema.getValue());
        return pubsubMessages.apply(ParDo.of(new ConvertToAvro(schema.toString()))).setCoder(getDefaultOutputCoder());
    }
    case CSV: {
        return (PCollection<IndexedRecord>) pubsubMessages
                .apply(ParDo.of(new ExtractCsvSplit(dataset.fieldDelimiter.getValue())))
                .apply((PTransform) ConvertToIndexedRecord.of());
    }
    default:
        throw new RuntimeException("To be implemented: " + dataset.valueFormat.getValue());

    }
}
 
Developer ID: Talend; Project: components; Lines of code: 35; Source: PubSubInputRuntime.java
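
Example 1 keeps the result of withTimestampAttribute (io = io.with...), which suggests that the with* methods on PubsubIO.Read return a new transform instead of modifying the receiver. The sketch below illustrates that style with a hypothetical ReadConfig class. It is a stand-in written for this illustration, not a Beam type, and its method names are assumptions chosen to resemble the calls above.

```java
// Hypothetical stand-in for an immutable, fluent configuration object.
// It is NOT a Beam class; it only mimics the "withX returns a copy" style.
final class ReadConfig {
    private final String subscription;
    private final String idAttribute; // null means "not set"

    private ReadConfig(String subscription, String idAttribute) {
        this.subscription = subscription;
        this.idAttribute = idAttribute;
    }

    static ReadConfig fromSubscription(String subscription) {
        return new ReadConfig(subscription, null);
    }

    // Returns a new instance; the receiver is left untouched.
    ReadConfig withIdAttribute(String newIdAttribute) {
        return new ReadConfig(subscription, newIdAttribute);
    }

    String subscription() {
        return subscription;
    }

    String idAttribute() {
        return idAttribute;
    }

    public static void main(String[] args) {
        ReadConfig read = ReadConfig.fromSubscription("projects/p/subscriptions/s");
        read.withIdAttribute("id");        // result discarded: 'read' is unchanged
        System.out.println(read.idAttribute());
        read = read.withIdAttribute("id"); // result kept: 'read' now carries the attribute
        System.out.println(read.idAttribute());
    }
}
```

With objects of this kind, a bare call such as read.withIdAttribute("id") has no lasting effect unless its return value is assigned.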


Example 4: expand

import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; // import the required package/class
@Override
public PDone expand(PCollection<IndexedRecord> in) {
    PubSubDatasetProperties dataset = properties.getDatasetProperties();
    PubSubDatastoreProperties datastore = dataset.getDatastoreProperties();

    try {
        createTopicSubscriptionIfNeeded(properties);
    } catch (IOException e) {
        throw TalendRuntimeException.createUnexpectedException(e);
    }

    PubsubIO.Write<PubsubMessage> pubsubWrite = PubsubIO.writeMessages()
            .to(String.format("projects/%s/topics/%s", datastore.projectName.getValue(), dataset.topic.getValue()));

    // The with* methods return a new transform, so the result must be reassigned.
    if (properties.idLabel.getValue() != null && !"".equals(properties.idLabel.getValue())) {
        pubsubWrite = pubsubWrite.withIdAttribute(properties.idLabel.getValue());
    }
    if (properties.timestampLabel.getValue() != null && !"".equals(properties.timestampLabel.getValue())) {
        pubsubWrite = pubsubWrite.withTimestampAttribute(properties.timestampLabel.getValue());
    }

    switch (dataset.valueFormat.getValue()) {
    case CSV: {
        return in.apply(MapElements.via(new FormatCsv(dataset.fieldDelimiter.getValue()))).apply(pubsubWrite);
    }
    case AVRO: {
        return in.apply(MapElements.via(new FormatAvro())).apply(pubsubWrite);
    }
    default:
        throw new RuntimeException("To be implemented: " + dataset.valueFormat.getValue());
    }

}
 
Developer ID: Talend; Project: components; Lines of code: 34; Source: PubSubOutputRuntime.java
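
Examples 3 and 4 build fully qualified Pub/Sub resource names inline with String.format. A minimal sketch of factoring that into a helper is shown below. The class name PubsubPaths and its method names are hypothetical; only the two format strings are taken from the examples.

```java
// Hypothetical helper; the format strings mirror those used in Examples 3 and 4.
final class PubsubPaths {
    private PubsubPaths() {}

    // Builds "projects/<project>/topics/<topic>".
    static String topic(String project, String topic) {
        return String.format("projects/%s/topics/%s", project, topic);
    }

    // Builds "projects/<project>/subscriptions/<subscription>".
    static String subscription(String project, String subscription) {
        return String.format("projects/%s/subscriptions/%s", project, subscription);
    }

    public static void main(String[] args) {
        System.out.println(topic("my-project", "my-topic"));
        System.out.println(subscription("my-project", "my-subscription"));
    }
}
```

The resulting strings are the kind of value passed to to(...) and fromSubscription(...) in the examples above.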


Example 5: main

import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; // import the required package/class
/**
 * <p>Creates a dataflow pipeline that creates the following chain:</p>
 * <ol>
 *   <li> Reads from a Cloud Pubsub topic
 *   <li> Window into fixed windows of 1 minute
 *   <li> Applies word count transform
 *   <li> Creates Puts from each of the word counts in the array
 *   <li> Performs a Bigtable Put on the items
 * </ol>
 *
 * @param args Arguments to use to configure the Dataflow Pipeline.  The first three are required
 *   when running via managed resource in Google Cloud Platform.  Those options should be omitted
 *   for LOCAL runs.  The next four arguments are to configure the Bigtable connection. The last
 *   two items are for Cloud Pubsub.
 *        --runner=BlockingDataflowPipelineRunner
 *        --project=[dataflow project] \\
 *        --stagingLocation=gs://[your google storage bucket] \\
 *        --bigtableProjectId=[bigtable project] \\
 *        --bigtableInstanceId=[bigtable instance id] \\
 *        --bigtableTableId=[bigtable tableName]
 *        --inputFile=[file path on GCS]
 *        --pubsubTopic=projects/[project name]/topics/[topic name]
 */

public static void main(String[] args) throws Exception {
  // CloudBigtableOptions is one way to retrieve the options.  It's not required.
  BigtablePubsubOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtablePubsubOptions.class);

  // CloudBigtableTableConfiguration contains the project, instance and table to connect to.
  CloudBigtableTableConfiguration config =
      new CloudBigtableTableConfiguration.Builder()
      .withProjectId(options.getBigtableProjectId())
      .withInstanceId(options.getBigtableInstanceId())
      .withTableId(options.getBigtableTableId())
      .build();

  // In order to cancel the pipelines automatically,
  // DataflowRunner is forced to be used.
  // Also enables the 2 jobs to run at the same time.
  options.setRunner(DataflowRunner.class);

  options.as(DataflowPipelineOptions.class).setStreaming(true);
  Pipeline p = Pipeline.create(options);

  FixedWindows window = FixedWindows.of(Duration.standardMinutes(options.getWindowSize()));

  p
      .apply(PubsubIO.readStrings().fromTopic(options.getPubsubTopic()))
      .apply(Window.<String> into(window))
      .apply(ParDo.of(new ExtractWordsFn()))
      .apply(Count.<String> perElement())
      .apply(ParDo.of(MUTATION_TRANSFORM))
      .apply(CloudBigtableIO.writeToTable(config));

  p.run().waitUntilFinish();
  // Start a second job to inject messages into a Cloud Pubsub topic
  injectMessages(options);
}
 
Developer ID: GoogleCloudPlatform; Project: cloud-bigtable-examples; Lines of code: 60; Source: PubsubWordCount.java
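
Example 5 groups messages into fixed windows before counting words. The sketch below shows the arithmetic usually behind fixed windowing: each timestamp falls into exactly one interval [start, start + size). It is an illustration under stated assumptions, not Beam's implementation; the class FixedWindowSketch is hypothetical, and it uses java.time.Duration rather than the Duration type seen in the example.

```java
import java.time.Duration;

// Hypothetical illustration of fixed-window assignment (zero offset assumed).
final class FixedWindowSketch {
    private FixedWindowSketch() {}

    // Start (inclusive) of the window containing the timestamp, in epoch millis.
    static long windowStartMillis(long timestampMillis, Duration size) {
        long sizeMillis = size.toMillis();
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // End (exclusive) of the window containing the timestamp, in epoch millis.
    static long windowEndMillis(long timestampMillis, Duration size) {
        return windowStartMillis(timestampMillis, size) + size.toMillis();
    }

    public static void main(String[] args) {
        Duration oneMinute = Duration.ofMinutes(1);
        long timestamp = 125_000L; // 2 minutes 5 seconds after the epoch
        System.out.println(windowStartMillis(timestamp, oneMinute));
        System.out.println(windowEndMillis(timestamp, oneMinute));
    }
}
```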


Example 6: injectMessages

import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; // import the required package/class
private static void injectMessages(BigtablePubsubOptions options) {
  String inputFile = options.getInputFile();
  String topic = options.getPubsubTopic();
  DataflowPipelineOptions copiedOptions = options.as(DataflowPipelineOptions.class);
  copiedOptions.setStreaming(false);
  copiedOptions.setNumWorkers(INJECTORNUMWORKERS);
  copiedOptions.setJobName(copiedOptions.getJobName() + "-injector");
  Pipeline injectorPipeline = Pipeline.create(copiedOptions);
  injectorPipeline.apply(TextIO.read().from(inputFile))
      .apply(ParDo.of(new FilterEmptyStringsFn()))
      .apply(PubsubIO.writeStrings().to(topic));
  injectorPipeline.run().waitUntilFinish();
}
 
Developer ID: GoogleCloudPlatform; Project: cloud-bigtable-examples; Lines of code: 14; Source: PubsubWordCount.java


Example 7: main

import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; // import the required package/class
public static void main(String[] args) {

        MainPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(MainPipelineOptions.class);
        options.setStreaming(true);

        // Set cloud resource settings depending on the situation.
        //options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.NONE); // default: AutoscalingAlgorithmType.THROUGHPUT_BASED
        //options.setWorkerMachineType("n1-standard-4");
        //options.setMaxNumWorkers(30);
        //options.setNumWorkers(1);
        //options.setDiskSizeGb(30);

        Pipeline p = Pipeline.create(options);

        String topicReceipt = buildTopicName(options.getProject(), options.getInputTopicReceipt());
        String topicTrend = buildTopicName(options.getProject(), options.getOutputTopicTrend());
        String topicHeatmap = buildTopicName(options.getProject(), options.getOutputTopicHeatmap());
        String topicConsole = buildTopicName(options.getProject(), options.getOutputTopicConsole());


        // Read input from each source
        PCollection<Receipt> receipts = p
                .apply("Read Receipt JSON From PubSub", PubsubIO.readStrings().fromTopic(topicReceipt))
                .apply("Convert JSON to Entity", ParDo.of(new ConvertJsonToReceiptDoFn()));

        PCollectionView<Map<String, String>> storeMasterView = p
                .apply("Read store master file", TextIO.read().from(options.getInputGCSStoreMaster()))
                .apply("Aggregate store master as map View", Combine.globally(new CSVToMapLineCombineFn()).asSingletonView());

        PCollectionView<Map<String, String>> productMasterView = p
                .apply("Read product master file", TextIO.read().from(options.getInputGCSProductMaster()))
                .apply("Aggregate product master as map View", Combine.globally(new CSVToMapLineCombineFn()).asSingletonView());

        PCollectionView<Map<String, String>> categoryMasterView = p
                .apply("Read category master file", TextIO.read().from(options.getInputGCSCategoryMaster()))
                .apply("Aggregate category master as map View", Combine.globally(new CSVToMapLineCombineFn()).asSingletonView());


        // Execute ETL processes
        double filterRate = options.getFilterRate();
        PCollection<Receipt> filteredReceipts = receipts.apply("Filter receipts with demo shop or hit given rate",
                Filter.by((Receipt receipt) -> receipt.getStoreCode().equals("000") || Math.random() < filterRate));

        PCollection<Receipt> windowedReceipts = receipts
                .apply("Set sliding window", Window.into(SlidingWindows
                        .of(Duration.standardSeconds(options.getWindowSize()))
                        .every(Duration.standardSeconds(options.getWindowInterval()))));
        PCollection<SummaryCombineFn.Summary> combinedReceipts = windowedReceipts
                .apply("Combine Receipts by window", Combine.globally(new SummaryCombineFn()).withoutDefaults());

        TupleTag<String> tagOutputTrend  = new TupleTag<String>(){ private static final long serialVersionUID = 1L; };
        TupleTag<String> tagOutputHeatmap = new TupleTag<String>(){ private static final long serialVersionUID = 1L; };

        PCollectionTuple results = combinedReceipts
                .apply("Convert Summary to JSON", ParDo
                        .of(new ConvertSummaryToJsonDoFn(productMasterView, storeMasterView, categoryMasterView,tagOutputTrend, tagOutputHeatmap, options.getLimitStores()))
                        .withSideInputs(productMasterView, storeMasterView, categoryMasterView)
                        .withOutputTags(tagOutputTrend, TupleTagList.of(tagOutputHeatmap)));


        // Write results to each sink
        receipts.apply("Convert Entity to BQ TableRow",ParDo.of(new ConvertReceiptToBQTableDoFn()))
                .apply("Insert to BigQuery Table", BigQueryIO.writeTableRows().to(options.getOutputBigQueryTable())
                        .withSchema(ConvertReceiptToBQTableDoFn.buildBigQuerySchema())
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

        filteredReceipts
                .apply("Convert Entity to JSON", ParDo.of(new ConvertReceiptToJsonDoFn(storeMasterView, productMasterView))
                        .withSideInputs(productMasterView, storeMasterView))
                .apply("Publish to PubSub topic console", PubsubIO.writeStrings().to(topicConsole));

        results.get(tagOutputTrend).apply("Publish to PubSub topic trend", PubsubIO.writeStrings().to(topicTrend));
        results.get(tagOutputHeatmap).apply("Publish to PubSub topic heatmap", PubsubIO.writeStrings().to(topicHeatmap));

        p.run();
    }
 
Developer ID: topgate; Project: retail-demo; Lines of code: 78; Source: MainPipeline.java
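
Example 7 applies sliding windows defined by a window size and an interval (period), so a single receipt can belong to several overlapping windows. The sketch below shows one common way to enumerate those windows for a timestamp. It is an illustration under stated assumptions (zero offset, whole seconds), not Beam's implementation, and the class SlidingWindowSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of sliding-window assignment (zero offset assumed).
final class SlidingWindowSketch {
    private SlidingWindowSketch() {}

    // Returns the start of every window [start, start + size) that contains the
    // timestamp, latest window first. All values are in seconds.
    static List<Long> windowStartsSeconds(long timestamp, long size, long period) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, period);
        // Walk backwards while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= period) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 60-second windows that start every 20 seconds; element at t = 65 s.
        System.out.println(windowStartsSeconds(65L, 60L, 20L));
    }
}
```

When the size is a multiple of the period, each element lands in size / period windows, which is why the combined output of such a pipeline counts each receipt more than once across windows.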


Example 8: main

import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; // import the required package/class
public static void main(String[] args) {

        // Set up Dataflow options
        StreamingOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(StreamingOptions.class);
        options.setStreaming(true);

        //BQ table setup
        TableSchema bqTableSchema;
        try {
            bqTableSchema = createTableSchema(tableSchema);
        } catch (IOException e){
            e.printStackTrace();
            return;
        }
        
        String tableName = projectId + ":" + datasetId + "." + tableId;
        
        Pipeline p = Pipeline.create(options);

        // Read message from Pub/Sub
        p.apply("ReadFromPubSub", PubsubIO.readStrings()
            .fromTopic(readTopic))
        
        // Format tweets for BigQuery - convert string to table row
        .apply("Format for BigQuery", ParDo.of(new StringToRowConverter()))

        
        // Write tweets to BigQuery
        .apply("write to BQ", BigQueryIO.writeTableRows()
            .to(tableName)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.alwaysRetry())
            .withSchema(bqTableSchema));

        //run pipeline
        PipelineResult result = p.run();
    }
 
Developer ID: yuriatgoogle; Project: basicpipeline; Lines of code: 44; Source: basicpipeline.java


Example 9: main

import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; // import the required package/class
public static void main(String[] args) throws Exception {

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    // Enforce that this pipeline is always run in streaming mode.
    options.setStreaming(true);
    ExampleUtils exampleUtils = new ExampleUtils(options);
    Pipeline pipeline = Pipeline.create(options);

    // Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub
    // data elements, and parse the data.
    PCollection<GameActionInfo> gameEvents = pipeline
        .apply(PubsubIO.readStrings()
            .withTimestampAttribute(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
        .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));

    gameEvents
        .apply(
            "CalculateTeamScores",
            new CalculateTeamScores(
                Duration.standardMinutes(options.getTeamWindowDuration()),
                Duration.standardMinutes(options.getAllowedLateness())))
        // Write the results to BigQuery.
        .apply(
            "WriteTeamScoreSums",
            new WriteWindowedToBigQuery<KV<String, Integer>>(
                options.as(GcpOptions.class).getProject(),
                options.getDataset(),
                options.getLeaderBoardTableName() + "_team",
                configureWindowedTableWrite()));
    gameEvents
        .apply(
            "CalculateUserScores",
            new CalculateUserScores(Duration.standardMinutes(options.getAllowedLateness())))
        // Write the results to BigQuery.
        .apply(
            "WriteUserScoreSums",
            new WriteToBigQuery<KV<String, Integer>>(
                options.as(GcpOptions.class).getProject(),
                options.getDataset(),
                options.getLeaderBoardTableName() + "_user",
                configureGlobalWindowBigQueryWrite()));

    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
    // command line.
    PipelineResult result = pipeline.run();
    exampleUtils.waitToFinish(result);
  }
 
Developer ID: apache; Project: beam; Lines of code: 48; Source: LeaderBoard.java
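
Example 9 reads event timestamps from a message attribute via withTimestampAttribute. To the best of my understanding of the Beam documentation, the attribute value may be either milliseconds since the Unix epoch or an RFC 3339 timestamp string; treat that as an assumption to verify against the SDK version in use. The sketch below shows how such a value could be interpreted with java.time. The class TimestampAttributeSketch is hypothetical and is not how Beam itself parses the attribute.

```java
import java.time.OffsetDateTime;

// Hypothetical parser for a timestamp attribute value (assumed formats:
// epoch milliseconds as digits, or an RFC 3339 / ISO-8601 offset date-time).
final class TimestampAttributeSketch {
    private TimestampAttributeSketch() {}

    static long parseToEpochMillis(String value) {
        try {
            // First attempt: plain number of milliseconds since the epoch.
            return Long.parseLong(value);
        } catch (NumberFormatException notNumeric) {
            // Fallback: date-time with offset, e.g. 2015-10-29T23:41:41.123Z.
            // Throws DateTimeParseException if the text matches neither form.
            return OffsetDateTime.parse(value).toInstant().toEpochMilli();
        }
    }

    public static void main(String[] args) {
        System.out.println(parseToEpochMillis("1446162101123"));
        System.out.println(parseToEpochMillis("2015-10-29T23:41:41.123Z"));
    }
}
```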


Example 10: sinkEventsToPubsub

import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; // import the required package/class
/**
 * Send {@code events} to Pubsub.
 */
private void sinkEventsToPubsub(PCollection<Event> events, long now) {
  String shortTopic = shortTopic(now);
  NexmarkUtils.console("Writing events to Pubsub %s", shortTopic);

  PubsubIO.Write<PubsubMessage> io =
      PubsubIO.writeMessages().to(shortTopic)
          .withIdAttribute(NexmarkUtils.PUBSUB_ID);
  if (!configuration.usePubsubPublishTime) {
    io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
  }

  events.apply(queryName + ".EventToPubsubMessage",
          ParDo.of(new DoFn<Event, PubsubMessage>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              try {
                byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element());
                c.output(new PubsubMessage(payload, new HashMap<String, String>()));
              } catch (CoderException e1) {
                LOG.error("Error while sending Event {} to Pubsub: serialization error",
                    c.element().toString());
              }
            }
          })
      )
      .apply(queryName + ".WritePubsubEvents", io);
}
 
Developer ID: apache; Project: beam; Lines of code: 31; Source: NexmarkLauncher.java



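Note: The org.apache.beam.sdk.io.gcp.pubsub.PubsubIO examples in this article were collected from source code and documentation hosting platforms such as GitHub and MSDocs. The snippets were selected from open-source projects; copyright remains with the original authors, and distribution and use are subject to each project's license. Do not reproduce without permission.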

