This article collects typical usage examples of the Java class PubsubIO from org.apache.beam.sdk.io.gcp.pubsub. If you have been wondering what exactly PubsubIO does and how to use it, the curated examples below should help.
The PubsubIO class belongs to the org.apache.beam.sdk.io.gcp.pubsub package. Ten code examples of the class are presented below, ordered by popularity by default.
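Before the examples, here is a minimal, self-contained sketch of the two PubsubIO entry points the examples below exercise most often: reading string payloads from a subscription and writing strings to a topic. The project, subscription, and topic names are placeholders, not values taken from any example.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class PubsubIOMinimalExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // Read UTF-8 string payloads from a subscription (placeholder name).
    PCollection<String> messages = p.apply("ReadFromPubsub",
        PubsubIO.readStrings()
            .fromSubscription("projects/my-project/subscriptions/my-subscription"));

    // Write each string back out as the payload of a Pub/Sub message (placeholder topic).
    messages.apply("WriteToPubsub",
        PubsubIO.writeStrings().to("projects/my-project/topics/my-topic"));

    p.run();
  }
}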
Example 1: sourceEventsFromPubsub
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; // import the required package/class
/**
 * Return source of events from Pubsub.
 */
private PCollection<Event> sourceEventsFromPubsub(Pipeline p, long now) {
  String shortSubscription = shortSubscription(now);
  NexmarkUtils.console("Reading events from Pubsub %s", shortSubscription);
  PubsubIO.Read<PubsubMessage> io =
      PubsubIO.readMessagesWithAttributes().fromSubscription(shortSubscription)
          .withIdAttribute(NexmarkUtils.PUBSUB_ID);
  if (!configuration.usePubsubPublishTime) {
    io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
  }
  return p
      .apply(queryName + ".ReadPubsubEvents", io)
      .apply(queryName + ".PubsubMessageToEvent", ParDo.of(new DoFn<PubsubMessage, Event>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          byte[] payload = c.element().getPayload();
          try {
            Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload);
            c.output(event);
          } catch (CoderException e) {
            LOG.error("Error while decoding Event from Pubsub message: serialization error");
          }
        }
      }));
}
Source: apache/beam, NexmarkLauncher.java (30 lines)
Example 2: sinkResultsToPubsub
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; // import the required package/class
/**
 * Send {@code formattedResults} to Pubsub.
 */
private void sinkResultsToPubsub(PCollection<String> formattedResults, long now) {
  String shortTopic = shortTopic(now);
  NexmarkUtils.console("Writing results to Pubsub %s", shortTopic);
  PubsubIO.Write<String> io =
      PubsubIO.writeStrings().to(shortTopic)
          .withIdAttribute(NexmarkUtils.PUBSUB_ID);
  if (!configuration.usePubsubPublishTime) {
    io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
  }
  formattedResults.apply(queryName + ".WritePubsubResults", io);
}
Source: apache/beam, NexmarkLauncher.java (15 lines)
Example 3: expand
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; // import the required package/class
@Override
public PCollection<IndexedRecord> expand(PBegin in) {
  PCollection<PubsubMessage> pubsubMessages = null;
  if (properties.useMaxNumRecords.getValue() || properties.useMaxReadTime.getValue()) {
    // Bounded read: wrap the subscription name and consume it with a bounded reader.
    pubsubMessages = in.apply(Create.of(dataset.subscription.getValue()))
        .apply(ParDo.of(new BoundedReaderFn(properties)));
  } else { // normal unbounded read
    PubsubIO.Read<PubsubMessage> pubsubRead = PubsubIO.readMessages().fromSubscription(String
        .format("projects/%s/subscriptions/%s", datastore.projectName.getValue(), dataset.subscription.getValue()));
    // PubsubIO.Read is immutable: withIdAttribute/withTimestampAttribute return new
    // instances, so the result must be reassigned (the original snippet dropped it).
    if (properties.idLabel.getValue() != null && !"".equals(properties.idLabel.getValue())) {
      pubsubRead = pubsubRead.withIdAttribute(properties.idLabel.getValue());
    }
    if (properties.timestampLabel.getValue() != null && !"".equals(properties.timestampLabel.getValue())) {
      pubsubRead = pubsubRead.withTimestampAttribute(properties.timestampLabel.getValue());
    }
    pubsubMessages = in.apply(pubsubRead);
  }
  switch (dataset.valueFormat.getValue()) {
  case AVRO: {
    Schema schema = new Schema.Parser().parse(dataset.avroSchema.getValue());
    return pubsubMessages.apply(ParDo.of(new ConvertToAvro(schema.toString()))).setCoder(getDefaultOutputCoder());
  }
  case CSV: {
    return (PCollection<IndexedRecord>) pubsubMessages
        .apply(ParDo.of(new ExtractCsvSplit(dataset.fieldDelimiter.getValue())))
        .apply((PTransform) ConvertToIndexedRecord.of());
  }
  default:
    throw new RuntimeException("To be implemented: " + dataset.valueFormat.getValue());
  }
}
Source: Talend/components, PubSubInputRuntime.java (35 lines)
Example 4: expand
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; // import the required package/class
@Override
public PDone expand(PCollection<IndexedRecord> in) {
  PubSubDatasetProperties dataset = properties.getDatasetProperties();
  PubSubDatastoreProperties datastore = dataset.getDatastoreProperties();
  try {
    createTopicSubscriptionIfNeeded(properties);
  } catch (IOException e) {
    throw TalendRuntimeException.createUnexpectedException(e);
  }
  PubsubIO.Write<PubsubMessage> pubsubWrite = PubsubIO.writeMessages()
      .to(String.format("projects/%s/topics/%s", datastore.projectName.getValue(), dataset.topic.getValue()));
  // PubsubIO.Write is immutable: withIdAttribute/withTimestampAttribute return new
  // instances, so the result must be reassigned (the original snippet dropped it).
  if (properties.idLabel.getValue() != null && !"".equals(properties.idLabel.getValue())) {
    pubsubWrite = pubsubWrite.withIdAttribute(properties.idLabel.getValue());
  }
  if (properties.timestampLabel.getValue() != null && !"".equals(properties.timestampLabel.getValue())) {
    pubsubWrite = pubsubWrite.withTimestampAttribute(properties.timestampLabel.getValue());
  }
  switch (dataset.valueFormat.getValue()) {
  case CSV: {
    return in.apply(MapElements.via(new FormatCsv(dataset.fieldDelimiter.getValue()))).apply(pubsubWrite);
  }
  case AVRO: {
    return in.apply(MapElements.via(new FormatAvro())).apply(pubsubWrite);
  }
  default:
    throw new RuntimeException("To be implemented: " + dataset.valueFormat.getValue());
  }
}
Source: Talend/components, PubSubOutputRuntime.java (34 lines)
Example 5: main
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; // import the required package/class
/**
 * <p>Creates a dataflow pipeline that creates the following chain:</p>
 * <ol>
 *   <li> Reads from a Cloud Pubsub topic
 *   <li> Windows into fixed windows of 1 minute
 *   <li> Applies the word count transform
 *   <li> Creates Puts from each of the word counts in the array
 *   <li> Performs a Bigtable Put on the items
 * </ol>
 *
 * @param args Arguments used to configure the Dataflow Pipeline. The first three are required
 *   when running via a managed resource in Google Cloud Platform and should be omitted for
 *   LOCAL runs. The next three configure the Bigtable connection; the last two are for
 *   Cloud Pubsub.
 *     --runner=DataflowRunner \\
 *     --project=[dataflow project] \\
 *     --stagingLocation=gs://[your google storage bucket] \\
 *     --bigtableProjectId=[bigtable project] \\
 *     --bigtableInstanceId=[bigtable instance id] \\
 *     --bigtableTableId=[bigtable tableName] \\
 *     --inputFile=[file path on GCS] \\
 *     --pubsubTopic=projects/[project name]/topics/[topic name]
 */
public static void main(String[] args) throws Exception {
  // CloudBigtableOptions is one way to retrieve the options. It's not required.
  BigtablePubsubOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtablePubsubOptions.class);
  // CloudBigtableTableConfiguration contains the project, instance and table to connect to.
  CloudBigtableTableConfiguration config =
      new CloudBigtableTableConfiguration.Builder()
          .withProjectId(options.getBigtableProjectId())
          .withInstanceId(options.getBigtableInstanceId())
          .withTableId(options.getBigtableTableId())
          .build();
  // In order to cancel the pipelines automatically, DataflowRunner is forced to be used.
  // This also enables the two jobs to run at the same time.
  options.setRunner(DataflowRunner.class);
  options.as(DataflowPipelineOptions.class).setStreaming(true);
  Pipeline p = Pipeline.create(options);
  FixedWindows window = FixedWindows.of(Duration.standardMinutes(options.getWindowSize()));
  p
      .apply(PubsubIO.readStrings().fromTopic(options.getPubsubTopic()))
      .apply(Window.<String>into(window))
      .apply(ParDo.of(new ExtractWordsFn()))
      .apply(Count.<String>perElement())
      .apply(ParDo.of(MUTATION_TRANSFORM))
      .apply(CloudBigtableIO.writeToTable(config));
  p.run().waitUntilFinish();
  // Start a second job to inject messages into the Cloud Pubsub topic.
  injectMessages(options);
}
Source: GoogleCloudPlatform/cloud-bigtable-examples, PubsubWordCount.java (60 lines)
Example 6: injectMessages
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; // import the required package/class
private static void injectMessages(BigtablePubsubOptions options) {
  String inputFile = options.getInputFile();
  String topic = options.getPubsubTopic();
  DataflowPipelineOptions copiedOptions = options.as(DataflowPipelineOptions.class);
  copiedOptions.setStreaming(false);
  copiedOptions.setNumWorkers(INJECTORNUMWORKERS);
  copiedOptions.setJobName(copiedOptions.getJobName() + "-injector");
  Pipeline injectorPipeline = Pipeline.create(copiedOptions);
  injectorPipeline.apply(TextIO.read().from(inputFile))
      .apply(ParDo.of(new FilterEmptyStringsFn()))
      .apply(PubsubIO.writeStrings().to(topic));
  injectorPipeline.run().waitUntilFinish();
}
Source: GoogleCloudPlatform/cloud-bigtable-examples, PubsubWordCount.java (14 lines)
Example 7: main
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; // import the required package/class
public static void main(String[] args) {
  MainPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(MainPipelineOptions.class);
  options.setStreaming(true);
  // Set cloud resource settings depending on the situation.
  // options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.NONE); // default: AutoscalingAlgorithmType.THROUGHPUT_BASED
  // options.setWorkerMachineType("n1-standard-4");
  // options.setMaxNumWorkers(30);
  // options.setNumWorkers(1);
  // options.setDiskSizeGb(30);
  Pipeline p = Pipeline.create(options);
  String topicReceipt = buildTopicName(options.getProject(), options.getInputTopicReceipt());
  String topicTrend = buildTopicName(options.getProject(), options.getOutputTopicTrend());
  String topicHeatmap = buildTopicName(options.getProject(), options.getOutputTopicHeatmap());
  String topicConsole = buildTopicName(options.getProject(), options.getOutputTopicConsole());
  // Read input from each source.
  PCollection<Receipt> receipts = p
      .apply("Read Receipt JSON From PubSub", PubsubIO.readStrings().fromTopic(topicReceipt))
      .apply("Convert JSON to Entity", ParDo.of(new ConvertJsonToReceiptDoFn()));
  PCollectionView<Map<String, String>> storeMasterView = p
      .apply("Read store master file", TextIO.read().from(options.getInputGCSStoreMaster()))
      .apply("Aggregate store master as map View", Combine.globally(new CSVToMapLineCombineFn()).asSingletonView());
  PCollectionView<Map<String, String>> productMasterView = p
      .apply("Read product master file", TextIO.read().from(options.getInputGCSProductMaster()))
      .apply("Aggregate product master as map View", Combine.globally(new CSVToMapLineCombineFn()).asSingletonView());
  PCollectionView<Map<String, String>> categoryMasterView = p
      .apply("Read category master file", TextIO.read().from(options.getInputGCSCategoryMaster()))
      .apply("Aggregate category master as map View", Combine.globally(new CSVToMapLineCombineFn()).asSingletonView());
  // Execute ETL processes.
  double filterRate = options.getFilterRate();
  PCollection<Receipt> filteredReceipts = receipts.apply("Filter receipts with demo shop or hit given rate",
      Filter.by((Receipt receipt) -> receipt.getStoreCode().equals("000") || Math.random() < filterRate));
  PCollection<Receipt> windowedReceipts = receipts
      .apply("Set sliding window", Window.into(SlidingWindows
          .of(Duration.standardSeconds(options.getWindowSize()))
          .every(Duration.standardSeconds(options.getWindowInterval()))));
  PCollection<SummaryCombineFn.Summary> combinedReceipts = windowedReceipts
      .apply("Combine Receipts by window", Combine.globally(new SummaryCombineFn()).withoutDefaults());
  TupleTag<String> tagOutputTrend = new TupleTag<String>() { private static final long serialVersionUID = 1L; };
  TupleTag<String> tagOutputHeatmap = new TupleTag<String>() { private static final long serialVersionUID = 1L; };
  PCollectionTuple results = combinedReceipts
      .apply("Convert Summary to JSON", ParDo
          .of(new ConvertSummaryToJsonDoFn(productMasterView, storeMasterView, categoryMasterView, tagOutputTrend, tagOutputHeatmap, options.getLimitStores()))
          .withSideInputs(productMasterView, storeMasterView, categoryMasterView)
          .withOutputTags(tagOutputTrend, TupleTagList.of(tagOutputHeatmap)));
  // Write results to each sink.
  receipts.apply("Convert Entity to BQ TableRow", ParDo.of(new ConvertReceiptToBQTableDoFn()))
      .apply("Insert to BigQuery Table", BigQueryIO.writeTableRows().to(options.getOutputBigQueryTable())
          .withSchema(ConvertReceiptToBQTableDoFn.buildBigQuerySchema())
          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
  filteredReceipts
      .apply("Convert Entity to JSON", ParDo.of(new ConvertReceiptToJsonDoFn(storeMasterView, productMasterView))
          .withSideInputs(productMasterView, storeMasterView))
      .apply("Publish to PubSub topic console", PubsubIO.writeStrings().to(topicConsole));
  results.get(tagOutputTrend).apply("Publish to PubSub topic trend", PubsubIO.writeStrings().to(topicTrend));
  results.get(tagOutputHeatmap).apply("Publish to PubSub topic heatmap", PubsubIO.writeStrings().to(topicHeatmap));
  p.run();
}
Source: topgate/retail-demo, MainPipeline.java (78 lines)
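Example 7 leans on Beam's side-input pattern: each CSV master file is collapsed into a single Map that every worker can consult from inside a ParDo. The original uses the project-specific CSVToMapLineCombineFn together with asSingletonView(); the sketch below reproduces the same idea with only core Beam transforms (View.asMap). The file path, two-column CSV layout, and input values are illustrative assumptions, not taken from the project.

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;

public class SideInputSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Build a Map view from a two-column "key,value" CSV master file (hypothetical path).
    final PCollectionView<Map<String, String>> masterView = p
        .apply("ReadMaster", TextIO.read().from("gs://my-bucket/store_master.csv"))
        .apply("LineToKV", MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via((String line) -> {
              String[] cols = line.split(",", 2);
              return KV.of(cols[0], cols[1]);
            }))
        .apply(View.asMap());

    // Consult the master map from inside a ParDo over the main input.
    p.apply("SomeCodes", Create.of("001", "002"))
        .apply("Enrich", ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            Map<String, String> master = c.sideInput(masterView);
            c.output(master.getOrDefault(c.element(), "unknown"));
          }
        }).withSideInputs(masterView));

    p.run();
  }
}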
Example 8: main
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; // import the required package/class
public static void main(String[] args) {
  // Set up Dataflow options.
  StreamingOptions options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(StreamingOptions.class);
  options.setStreaming(true);
  // BQ table setup.
  TableSchema bqTableSchema;
  try {
    bqTableSchema = createTableSchema(tableSchema);
  } catch (IOException e) {
    e.printStackTrace();
    return;
  }
  String tableName = projectId + ":" + datasetId + "." + tableId;
  Pipeline p = Pipeline.create(options);
  // Read messages from Pub/Sub.
  p.apply("ReadFromPubSub", PubsubIO.readStrings()
      .fromTopic(readTopic))
      // Format tweets for BigQuery: convert each string to a table row.
      .apply("Format for BigQuery", ParDo.of(new StringToRowConverter()))
      // Write tweets to BigQuery.
      .apply("write to BQ", BigQueryIO.writeTableRows()
          .to(tableName)
          .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
          .withWriteDisposition(WriteDisposition.WRITE_APPEND)
          .withFailedInsertRetryPolicy(InsertRetryPolicy.alwaysRetry())
          .withSchema(bqTableSchema));
  // Run the pipeline.
  p.run();
}
Source: yuriatgoogle/basicpipeline, basicpipeline.java (44 lines)
Example 9: main
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; // import the required package/class
public static void main(String[] args) throws Exception {
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  // Enforce that this pipeline is always run in streaming mode.
  options.setStreaming(true);
  ExampleUtils exampleUtils = new ExampleUtils(options);
  Pipeline pipeline = Pipeline.create(options);
  // Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub
  // data elements, and parse the data.
  PCollection<GameActionInfo> gameEvents = pipeline
      .apply(PubsubIO.readStrings()
          .withTimestampAttribute(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
      .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
  gameEvents
      .apply(
          "CalculateTeamScores",
          new CalculateTeamScores(
              Duration.standardMinutes(options.getTeamWindowDuration()),
              Duration.standardMinutes(options.getAllowedLateness())))
      // Write the results to BigQuery.
      .apply(
          "WriteTeamScoreSums",
          new WriteWindowedToBigQuery<KV<String, Integer>>(
              options.as(GcpOptions.class).getProject(),
              options.getDataset(),
              options.getLeaderBoardTableName() + "_team",
              configureWindowedTableWrite()));
  gameEvents
      .apply(
          "CalculateUserScores",
          new CalculateUserScores(Duration.standardMinutes(options.getAllowedLateness())))
      // Write the results to BigQuery.
      .apply(
          "WriteUserScoreSums",
          new WriteToBigQuery<KV<String, Integer>>(
              options.as(GcpOptions.class).getProject(),
              options.getDataset(),
              options.getLeaderBoardTableName() + "_user",
              configureGlobalWindowBigQueryWrite()));
  // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
  // command line.
  PipelineResult result = pipeline.run();
  exampleUtils.waitToFinish(result);
}
Source: apache/beam, LeaderBoard.java (48 lines)
Example 10: sinkEventsToPubsub
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; // import the required package/class
/**
 * Send {@code events} to Pubsub.
 */
private void sinkEventsToPubsub(PCollection<Event> events, long now) {
  String shortTopic = shortTopic(now);
  NexmarkUtils.console("Writing events to Pubsub %s", shortTopic);
  PubsubIO.Write<PubsubMessage> io =
      PubsubIO.writeMessages().to(shortTopic)
          .withIdAttribute(NexmarkUtils.PUBSUB_ID);
  if (!configuration.usePubsubPublishTime) {
    io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
  }
  events.apply(queryName + ".EventToPubsubMessage",
      ParDo.of(new DoFn<Event, PubsubMessage>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          try {
            byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element());
            c.output(new PubsubMessage(payload, new HashMap<String, String>()));
          } catch (CoderException e1) {
            LOG.error("Error while sending Event {} to Pubsub: serialization error",
                c.element().toString());
          }
        }
      }))
      .apply(queryName + ".WritePubsubEvents", io);
}
Source: apache/beam, NexmarkLauncher.java (31 lines)
Note: the org.apache.beam.sdk.io.gcp.pubsub.PubsubIO examples in this article were collected from open-source projects hosted on GitHub, MSDocs, and similar source-code and documentation platforms. Copyright of each snippet remains with its original authors; consult the corresponding project's license before redistributing or reusing the code, and do not republish without permission.