This article collects typical usage examples of the Java class com.google.cloud.dataflow.sdk.io.PubsubIO. If you are wondering what PubsubIO is for, how to use it, or simply want to see it in context, the curated examples below should help.
The PubsubIO class belongs to the com.google.cloud.dataflow.sdk.io package. 11 code examples are shown below, sorted by popularity by default. You can upvote the examples you find useful; your ratings help the system surface better Java samples.
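Before the individual examples, here is a minimal sketch of the read-transform-write pattern they all share, written against the legacy Dataflow SDK 1.x API this page covers. The project and topic paths (my-project, input-topic, output-topic) are placeholders, not names from any of the projects below.

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.MapElements;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;

public class PubsubIoSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read UTF-8 strings from one topic, transform, and write to another.
    p.apply(PubsubIO.Read.named("ReadFromPubsub")
            // Placeholder topic path; replace with your own project/topic.
            .topic("projects/my-project/topics/input-topic"))
        .apply("Uppercase", MapElements.via((String s) -> s.toUpperCase())
            .withOutputType(TypeDescriptor.of(String.class)))
        .apply(PubsubIO.Write.named("WriteToPubsub")
            .topic("projects/my-project/topics/output-topic"));

    p.run();
  }
}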
Example 1: apply

Reads game events either from a text file (when --input is set) or from a Pub/Sub topic, parses them, and attaches event-time timestamps.

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class

@Override
public PCollection<GameEvent> apply(PBegin begin) {
  if (options.getInput() != null && !options.getInput().isEmpty()) {
    // Bounded case: read from a text file and assign event-time timestamps
    // from the parsed GameEvent.
    return begin
        .getPipeline()
        .apply(TextIO.Read.from(options.getInput()))
        .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
        .apply(
            "AddEventTimestamps",
            WithTimestamps.of((GameEvent i) -> new Instant(i.getTimestamp())));
  } else {
    // Unbounded case: read from Pub/Sub, taking each element's timestamp
    // from the message attribute named by TIMESTAMP_ATTRIBUTE.
    return begin
        .getPipeline()
        .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
        .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()));
  }
}

Author: mdvorsky | Project: DataflowSME | Lines: 18 | Source: Exercise3.java
Example 2: main

Reads taxi rides from Pub/Sub as JSON TableRows, windows them into one-second fixed windows, counts rides per key, and publishes the formatted counts back to Pub/Sub.

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class

public static void main(String[] args) {
  CustomPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
  Pipeline p = Pipeline.create(options);

  p.apply(PubsubIO.Read.named("read from PubSub")
          .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
          .timestampLabel("ts")
          .withCoder(TableRowJsonCoder.of()))
      .apply("window 1s", Window.into(FixedWindows.of(Duration.standardSeconds(1))))
      .apply("mark rides", MapElements.via(new MarkRides()))
      .apply("count similar", Count.perKey())
      .apply("format rides", MapElements.via(new TransformRides()))
      .apply(PubsubIO.Write.named("WriteToPubsub")
          .topic(String.format("projects/%s/topics/%s", options.getSinkProject(), options.getSinkTopic()))
          .withCoder(TableRowJsonCoder.of()));

  p.run();
}

Author: googlecodelabs | Project: cloud-dataflow-nyc-taxi-tycoon | Lines: 22 | Source: CountRides.java
Example 3: main

Reads rides from Pub/Sub, keeps only the points inside lower Manhattan via a ParDo filter, and republishes them.

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class

public static void main(String[] args) {
  CustomPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
  Pipeline p = Pipeline.create(options);

  p.apply(PubsubIO.Read.named("read from PubSub")
          .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
          .timestampLabel("ts")
          .withCoder(TableRowJsonCoder.of()))
      .apply("filter lower Manhattan", ParDo.of(new FilterLowerManhattan()))
      .apply(PubsubIO.Write.named("WriteToPubsub")
          .topic(String.format("projects/%s/topics/%s", options.getSinkProject(), options.getSinkTopic()))
          .withCoder(TableRowJsonCoder.of()));

  p.run();
}

Author: googlecodelabs | Project: cloud-dataflow-nyc-taxi-tycoon | Lines: 18 | Source: FilterRides.java
Example 4: main

Parses each ride's ISO-8601 "timestamp" field, takes the maximum timestamp in every one-second window, and publishes it back to Pub/Sub as a TableRow.

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class

public static void main(String[] args) {
  CustomPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
  Pipeline p = Pipeline.create(options);

  p.apply(PubsubIO.Read.named("read from PubSub")
          .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
          .timestampLabel("ts")
          .withCoder(TableRowJsonCoder.of()))
      .apply("window 1s", Window.into(FixedWindows.of(Duration.standardSeconds(1))))
      .apply("parse timestamps",
          MapElements.via(
              (TableRow e) ->
                  Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse(e.get("timestamp").toString())).toEpochMilli())
              .withOutputType(TypeDescriptor.of(Long.class)))
      .apply("max timestamp in window", Max.longsGlobally().withoutDefaults())
      .apply("transform",
          MapElements.via(
              (Long t) -> {
                // Wrap the max epoch-millis value back into a TableRow.
                TableRow ride = new TableRow();
                ride.set("timestamp", Instant.ofEpochMilli(t).toString());
                return ride;
              })
              .withOutputType(TypeDescriptor.of(TableRow.class)))
      .apply(PubsubIO.Write.named("write to PubSub")
          .topic(String.format("projects/%s/topics/%s", options.getSinkProject(), options.getSinkTopic()))
          .withCoder(TableRowJsonCoder.of()));

  p.run();
}

Author: googlecodelabs | Project: cloud-dataflow-nyc-taxi-tycoon | Lines: 35 | Source: TimestampRides.java
Example 5: main

Sums the "meter_increment" dollar amounts over one-minute fixed windows, with speculative early firings one second after the first element in a pane and a late firing for each late element, accumulating fired panes for up to five minutes of allowed lateness.

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class

public static void main(String[] args) {
  CustomPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
  Pipeline p = Pipeline.create(options);

  p.apply(PubsubIO.Read.named("read from PubSub")
          .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
          .timestampLabel("ts")
          .withCoder(TableRowJsonCoder.of()))
      .apply("extract dollars",
          MapElements.via((TableRow x) -> Double.parseDouble(x.get("meter_increment").toString()))
              .withOutputType(TypeDescriptor.of(Double.class)))
      .apply("fixed window", Window.into(FixedWindows.of(Duration.standardMinutes(1))))
      .apply("trigger",
          Window.<Double>triggering(
                  AfterWatermark.pastEndOfWindow()
                      .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(1)))
                      .withLateFirings(AfterPane.elementCountAtLeast(1)))
              .accumulatingFiredPanes()
              .withAllowedLateness(Duration.standardMinutes(5)))
      .apply("sum whole window", Sum.doublesGlobally().withoutDefaults())
      .apply("format rides", ParDo.of(new TransformRides()))
      .apply(PubsubIO.Write.named("WriteToPubsub")
          .topic(String.format("projects/%s/topics/%s", options.getSinkProject(), options.getSinkTopic()))
          .withCoder(TableRowJsonCoder.of()));

  p.run();
}

Author: googlecodelabs | Project: cloud-dataflow-nyc-taxi-tycoon | Lines: 32 | Source: ExactDollarRides.java
Example 6: main

Passes through only the rides whose ride_id starts with a given prefix, logging each accepted point. This is handy for debugging against a small subset of the stream. (The original logging format string had a stray brace, "{}}", which is fixed below.)

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class

public static void main(String[] args) {
  CustomPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
  Pipeline p = Pipeline.create(options);

  p.apply(PubsubIO.Read.named("read from PubSub")
          .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
          .timestampLabel("ts")
          .withCoder(TableRowJsonCoder.of()))
      .apply("filter a few rides",
          Filter.byPredicate(
              (TableRow t) -> {
                String rideId = t.get("ride_id").toString();
                // You can change the filter here to allow more or fewer rides through:
                // rideIds starting with "a" are quite common
                // rideIds starting with "ab" are rarer
                // rideIds starting with "abc" are rarer still
                if (rideId.startsWith("ab")) {
                  LOG.info("Accepted point on ride {} with order number {}, timestamp {}",
                      t.get("ride_id"), t.get("point_idx"), t.get("timestamp"));
                  return true;
                }
                return false;
              }))
      .apply(PubsubIO.Write.named("WriteToPubsub")
          .topic(String.format("projects/%s/topics/%s", options.getSinkProject(), options.getSinkTopic()))
          .withCoder(TableRowJsonCoder.of()));

  p.run();
}

Author: googlecodelabs | Project: cloud-dataflow-nyc-taxi-tycoon | Lines: 34 | Source: DebugFewRides.java
Example 7: main

Keys each ride by ride_id, applies 60-minute session windows with early firings every two seconds, combines the points of each ride down to the latest one, drops the key, and republishes the result.

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class

public static void main(String[] args) {
  CustomPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
  Pipeline p = Pipeline.create(options);

  p.apply(PubsubIO.Read.named("read from PubSub")
          .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
          .timestampLabel("ts")
          .withCoder(TableRowJsonCoder.of()))
      .apply("key rides by rideid",
          MapElements.via((TableRow ride) -> KV.of(ride.get("ride_id").toString(), ride))
              .withOutputType(new TypeDescriptor<KV<String, TableRow>>() {}))
      .apply("session windows on rides with early firings",
          Window.<KV<String, TableRow>>into(
                  Sessions.withGapDuration(Duration.standardMinutes(60)))
              .triggering(
                  AfterWatermark.pastEndOfWindow()
                      .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(2000))))
              .accumulatingFiredPanes()
              .withAllowedLateness(Duration.ZERO))
      .apply("group ride points on same ride", Combine.perKey(new LatestPointCombine()))
      .apply("discard key",
          MapElements.via((KV<String, TableRow> a) -> a.getValue())
              .withOutputType(TypeDescriptor.of(TableRow.class)))
      .apply(PubsubIO.Write.named("WriteToPubsub")
          .topic(String.format("projects/%s/topics/%s", options.getSinkProject(), options.getSinkTopic()))
          .withCoder(TableRowJsonCoder.of()));

  p.run();
}

Author: googlecodelabs | Project: cloud-dataflow-nyc-taxi-tycoon | Lines: 35 | Source: LatestRides.java
Example 8: main

A standalone pipeline launcher: it builds service-account credentials from a P12 key file, configures a streaming Dataflow job programmatically, reads messages from Pub/Sub, converts them to TableRows, and appends them to a BigQuery table.

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class

public static void main(String[] args) throws GeneralSecurityException, IOException, ParseException, ParserConfigurationException, SAXException {
  // Extract the --params argument and initialize configuration from it.
  String params = null;
  for (int i = 0; i < args.length; i++) {
    if (args[i].startsWith("--params="))
      params = args[i].replaceFirst("--params=", "");
  }
  System.out.println(params);
  init(params);

  // Build service-account credentials from the configured P12 key file.
  GoogleCredential credential = new GoogleCredential.Builder()
      .setTransport(new NetHttpTransport())
      .setJsonFactory(new JacksonFactory())
      .setServiceAccountId(accountEmail)
      .setServiceAccountScopes(Arrays.asList(new String[] {"https://www.googleapis.com/auth/cloud-platform"}))
      .setServiceAccountPrivateKeyFromP12File(new File(keyFile))
      .build();

  DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
  options.setRunner(DataflowPipelineRunner.class);
  // Your project ID is required in order to run your pipeline on the Google Cloud.
  options.setProject(projectId);
  // Your Google Cloud Storage path is required for staging local files.
  options.setStagingLocation(workingBucket);
  options.setGcpCredential(credential);
  options.setServiceAccountName(accountEmail);
  options.setServiceAccountKeyfile(keyFile);
  options.setMaxNumWorkers(maxNumWorkers);
  options.setDiskSizeGb(diskSizeGb);
  options.setWorkerMachineType(machineType);
  options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
  options.setZone(zone);
  options.setStreaming(isStreaming);
  options.setJobName(pipelineName);

  // Parse the BigQuery table schema from its JSON string representation.
  Gson gson = new Gson();
  TableSchema schema = gson.fromJson(schemaStr, TableSchema.class);

  Pipeline pipeline = Pipeline.create(options);
  PCollection<String> streamData =
      pipeline.apply(PubsubIO.Read.named("ReadFromPubsub")
          .topic(String.format("projects/%1$s/topics/%2$s", projectId, pubSubTopic)));
  PCollection<TableRow> tableRow = streamData.apply("ToTableRow", ParDo.of(new PrepData.ToTableRow()));
  tableRow.apply(BigQueryIO.Write
      .named("WriteBQTable")
      .to(String.format("%1$s:%2$s.%3$s", projectId, bqDataSet, bqTable))
      .withSchema(schema)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

  System.out.println("Starting pipeline " + pipelineName);
  pipeline.run();
}

Author: bomboradata | Project: pubsub-to-bigquery | Lines: 55 | Source: PubSubToBQPipeline.java
Example 9: main

The SDK's WindowedWordCount walkthrough: it reads from Pub/Sub when the input is unbounded (or from GCS with artificial timestamps otherwise), applies fixed windows, counts words, and writes the results to BigQuery.

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class

public static void main(String[] args) throws IOException {
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  options.setBigQuerySchema(getSchema());
  // DataflowExampleUtils creates the necessary input sources to simplify execution of this
  // Pipeline.
  DataflowExampleUtils exampleDataflowUtils = new DataflowExampleUtils(options,
      options.isUnbounded());
  Pipeline pipeline = Pipeline.create(options);

  /**
   * Concept #1: the Dataflow SDK lets us run the same pipeline with either a bounded or
   * unbounded input source.
   */
  PCollection<String> input;
  if (options.isUnbounded()) {
    LOG.info("Reading from PubSub.");
    /**
     * Concept #3: Read from the PubSub topic. A topic will be created if it wasn't
     * specified as an argument. The data elements' timestamps will come from the pubsub
     * injection.
     */
    input = pipeline
        .apply(PubsubIO.Read.topic(options.getPubsubTopic()));
  } else {
    /** Else, this is a bounded pipeline. Read from the GCS file. */
    input = pipeline
        .apply(TextIO.Read.from(options.getInputFile()))
        // Concept #2: Add an element timestamp, using an artificial time just to show windowing.
        // See AddTimestampFn for more detail on this.
        .apply(ParDo.of(new AddTimestampFn()));
  }

  /**
   * Concept #4: Window into fixed windows. The fixed window size for this example defaults to 1
   * minute (you can change this with a command-line option). See the documentation for more
   * information on how fixed windows work, and for information on the other types of windowing
   * available (e.g., sliding windows).
   */
  PCollection<String> windowedWords = input
      .apply(Window.<String>into(
          FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));

  /**
   * Concept #5: Re-use our existing CountWords transform that does not have knowledge of
   * windows over a PCollection containing windowed values.
   */
  PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());

  /**
   * Concept #6: Format the results for a BigQuery table, then write to BigQuery.
   * The BigQuery output source supports both bounded and unbounded data.
   */
  wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
      .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));

  PipelineResult result = pipeline.run();

  /**
   * To mock unbounded input from PubSub, we'll now start an auxiliary 'injector' pipeline that
   * runs for a limited time, and publishes to the input PubSub topic.
   *
   * With an unbounded input source, you will need to explicitly shut down this pipeline when you
   * are done with it, so that you do not continue to be charged for the instances. You can do
   * this via a ctrl-C from the command line, or from the developer's console UI for Dataflow
   * pipelines. The PubSub topic will also be deleted at this time.
   */
  exampleDataflowUtils.mockUnboundedSource(options.getInputFile(), result);
}

Author: sinmetal | Project: iron-hippo | Lines: 70 | Source: WindowedWordCount.java
Example 10: main

Reads tweets from Pub/Sub, writes the raw stream to one BigQuery table, then filters and annotates tweets with sentiment from the Natural Language API and writes those to a second table.

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class

public static void main(String[] args) {
  // Setup Dataflow options
  DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create().as(DataflowPipelineOptions.class);
  options.setRunner(DataflowPipelineRunner.class);
  options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
  options.setMaxNumWorkers(3);
  String projectId = options.getProject();

  // Create TableSchemas from their String representation
  TableSchema tweetsTableSchema;
  TableSchema annotatedTweetsTableSchema;
  try {
    tweetsTableSchema = createTableSchema(TWEETS_TABLE_SCHEMA);
    annotatedTweetsTableSchema = createTableSchema(ANNOTATED_TWEETS_TABLE_SCHEMA);
  } catch (IOException e) {
    e.printStackTrace();
    return;
  }

  Pipeline p = Pipeline.create(options);

  // Read tweets from Pub/Sub
  PCollection<String> tweets = p.apply(PubsubIO.Read.named("Read tweets from PubSub")
      .topic("projects/" + projectId + "/topics/blackfridaytweets"));

  // Format tweets for BigQuery
  PCollection<TableRow> formattedTweets = tweets.apply(ParDo.named("Format tweets for BigQuery").of(new DoFormat()));

  // Create a TableReference for the destination table
  TableReference tableReference = new TableReference();
  tableReference.setProjectId(projectId);
  tableReference.setDatasetId("black_friday_analytics");
  tableReference.setTableId("tweets_raw");

  // Write tweets to BigQuery
  formattedTweets.apply(BigQueryIO.Write.named("Write tweets to BigQuery")
      .to(tableReference)
      .withSchema(tweetsTableSchema)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
      .withoutValidation());

  // Filter and annotate tweets with their sentiment from NL API
  // Note: if the pipeline is run as a batch pipeline, the filter condition is inverted
  PCollection<String> filteredTweets = tweets.apply(ParDo.named("Filter and annotate tweets").of(new DoFilterAndProcess()));

  // Format tweets for BigQuery
  PCollection<TableRow> filteredFormattedTweets = filteredTweets.apply(ParDo.named("Format annotated tweets for BigQuery").of(new DoFormat()));

  // Create a TableReference for the destination table
  TableReference filteredTableReference = new TableReference();
  filteredTableReference.setProjectId(projectId);
  filteredTableReference.setDatasetId("black_friday_analytics");
  filteredTableReference.setTableId("tweets_sentiment");

  // Write tweets to BigQuery
  filteredFormattedTweets.apply(BigQueryIO.Write.named("Write annotated tweets to BigQuery")
      .to(filteredTableReference)
      .withSchema(annotatedTweetsTableSchema)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

  p.run();
}

Author: LorenzoRidiNoovle | Project: gcp-black-friday-analytics | Lines: 58 | Source: TwitterProcessor.java
Example 11: apply

Reads a single message from the topic (maxNumRecords(1) turns the otherwise unbounded source into a bounded one) and exposes it to the rest of the pipeline as a singleton side input.

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class

@Override
public PCollectionView<String> apply(PInput input) {
  return input.getPipeline()
      .apply(PubsubIO.Read.topic(topic).maxNumRecords(1))
      .apply(View.<String>asSingleton());
}

Author: SunGard-Labs | Project: dataflow-whitepaper | Lines: 7 | Source: PubSubStarter.java
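A view produced this way is typically consumed as a side input elsewhere in the pipeline. The following consumer sketch is hypothetical, not part of the dataflow-whitepaper project: the transform name PubSubStarter, the GCS path, and the tagging DoFn are all illustrative.

// Hypothetical consumer: prefix each text line with the single Pub/Sub
// message captured by the view above (names are illustrative).
PCollectionView<String> config = p.apply(new PubSubStarter(topic));
p.apply(TextIO.Read.from("gs://my-bucket/input.txt"))
    .apply(ParDo.withSideInputs(config).of(new DoFn<String, String>() {
      @Override
      public void processElement(ProcessContext c) {
        c.output(c.sideInput(config) + ": " + c.element());
      }
    }));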
Note: The com.google.cloud.dataflow.sdk.io.PubsubIO examples in this article were collected from open-source projects on GitHub and other code/documentation platforms. The snippets were contributed by the authors of those projects, and copyright remains with them; consult each project's license before reusing or redistributing the code. Do not republish without permission.