本文整理汇总了Java中com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows类的典型用法代码示例。如果您正苦于以下问题:Java FixedWindows类的具体用法?Java FixedWindows怎么用?Java FixedWindows使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
FixedWindows类属于com.google.cloud.dataflow.sdk.transforms.windowing包,在下文中一共展示了FixedWindows类的14个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: apply
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; //导入依赖的package包/类
@Override
public PCollection<KV<String, Integer>> apply(PCollection<GameEvent> infos) {
return infos
.apply(
"LeaderboardTeamFixedWindows",
Window.<GameEvent>into(FixedWindows.of(teamWindowDuration))
// We will get early (speculative) results as well as cumulative
// processing of late data.
.triggering(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_SECONDS))
.withLateFirings(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(THIRTY_SECONDS)))
.withAllowedLateness(allowedLateness)
.accumulatingFiredPanes())
// Extract and sum teamname/score pairs from the event data.
.apply("ExtractTeamScore", new Exercise1.ExtractAndSumScore("team"));
}
开发者ID:mdvorsky,项目名称:DataflowSME,代码行数:21,代码来源:Exercise4.java
示例2: main
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; //导入依赖的package包/类
public static void main(String[] args) {
CustomPipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
Pipeline p = Pipeline.create(options);
p.apply(PubsubIO.Read.named("read from PubSub")
.topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
.timestampLabel("ts")
.withCoder(TableRowJsonCoder.of()))
.apply("window 1s", Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply("mark rides", MapElements.via(new MarkRides()))
.apply("count similar", Count.perKey())
.apply("format rides", MapElements.via(new TransformRides()))
.apply(PubsubIO.Write.named("WriteToPubsub")
.topic(String.format("projects/%s/topics/%s", options.getSinkProject(), options.getSinkTopic()))
.withCoder(TableRowJsonCoder.of()));
p.run();
}
开发者ID:googlecodelabs,项目名称:cloud-dataflow-nyc-taxi-tycoon,代码行数:22,代码来源:CountRides.java
示例3: doVisitTransform
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; //导入依赖的package包/类
@Override
protected <PT extends PTransform<? super PInput, POutput>> void
doVisitTransform(TransformTreeNode node) {
@SuppressWarnings("unchecked")
PT transform = (PT) node.getTransform();
@SuppressWarnings("unchecked")
Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
if (transformClass.isAssignableFrom(Window.Bound.class)) {
WindowFn<?, ?> windowFn = WINDOW_FG.get("windowFn", transform);
if (windowFn instanceof FixedWindows) {
setBatchDuration(((FixedWindows) windowFn).getSize());
} else if (windowFn instanceof SlidingWindows) {
if (((SlidingWindows) windowFn).getOffset().getMillis() > 0) {
throw new UnsupportedOperationException("Spark does not support window offsets");
}
// Sliding window size might as well set the batch duration. Applying the transformation
// will add the "slide"
setBatchDuration(((SlidingWindows) windowFn).getSize());
} else if (!(windowFn instanceof GlobalWindows)) {
throw new IllegalStateException("Windowing function not supported: " + windowFn);
}
}
}
开发者ID:shakamunyi,项目名称:spark-dataflow,代码行数:24,代码来源:StreamingWindowPipelineDetector.java
示例4: testRun
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; //导入依赖的package包/类
@Test
public void testRun() throws Exception {
SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
options.setRunner(SparkPipelineRunner.class);
Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS))
.setCoder(StringUtf8Coder.of());
PCollection<String> windowedWords = inputWords
.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
DataflowAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
EvaluationResult res = SparkPipelineRunner.create().run(p);
res.close();
}
开发者ID:shakamunyi,项目名称:spark-dataflow,代码行数:18,代码来源:WindowedWordCountTest.java
示例5: testRun
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; //导入依赖的package包/类
@Test
public void testRun() throws Exception {
SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
options.setAppName(this.getClass().getSimpleName());
options.setRunner(SparkPipelineRunner.class);
options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval
Pipeline p = Pipeline.create(options);
PCollection<String> inputWords =
p.apply(CreateStream.fromQueue(WORDS_QUEUE)).setCoder(StringUtf8Coder.of());
PCollection<String> windowedWords = inputWords
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
DataflowAssert.thatIterable(output.apply(View.<String>asIterable()))
.containsInAnyOrder(EXPECTED_COUNT_SET);
EvaluationResult res = SparkPipelineRunner.create(options).run(p);
res.close();
DataflowAssertStreaming.assertNoFailures(res);
}
开发者ID:shakamunyi,项目名称:spark-dataflow,代码行数:24,代码来源:SimpleStreamingWordCountTest.java
示例6: main
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; //导入依赖的package包/类
public static void main(String[] args) {
CustomPipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
Pipeline p = Pipeline.create(options);
p.apply(PubsubIO.Read.named("read from PubSub")
.topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
.timestampLabel("ts")
.withCoder(TableRowJsonCoder.of()))
.apply("window 1s", Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply("parse timestamps",
MapElements.via(
(TableRow e) ->
Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse(e.get("timestamp").toString())).toEpochMilli())
.withOutputType(TypeDescriptor.of(Long.class)))
.apply("max timestamp in window", Max.longsGlobally().withoutDefaults())
.apply("transform",
MapElements.via(
(Long t) -> {
TableRow ride = new TableRow();
ride.set("timestamp", Instant.ofEpochMilli(t).toString());
return ride;
})
.withOutputType(TypeDescriptor.of(TableRow.class)))
.apply(PubsubIO.Write.named("write to PubSub")
.topic(String.format("projects/%s/topics/%s", options.getSinkProject(), options.getSinkTopic()))
.withCoder(TableRowJsonCoder.of()));
p.run();
}
开发者ID:googlecodelabs,项目名称:cloud-dataflow-nyc-taxi-tycoon,代码行数:35,代码来源:TimestampRides.java
示例7: generateCompleteWindowData
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; //导入依赖的package包/类
public PCollection<KV<String, TSProto>> generateCompleteWindowData(Pipeline pipeline,
List<KV<String, TSProto>> data, WorkPacketConfig packetConfig) {
LOG.info("Check to see that time streams with missing 'ticks' have been corrected");
PCollection<KV<String, TSProto>> tsData = setupDataInput(pipeline, data);
PCollection<KV<String, TSProto>> windowedData =
tsData.apply("CandleResolutionWindow", Window.<KV<String, TSProto>>into(FixedWindows
.of(Duration.standardSeconds(((FXTimeSeriesPipelineOptions) pipeline.getOptions())
.getCandleResolution()))));
// Determine streams that are missing in this Window and generate values for them
PCollection<KV<String, TSProto>> generatedValues =
windowedData
.apply(
"DetectMissingTimeSeriesValues",
Combine.globally(new DetectMissingTimeSeriesValuesCombiner(packetConfig))
.withoutDefaults()).apply(ParDo.of(new CreateMissingTimeSeriesValuesDoFn()))
.setName("CreateMissingTimeSeriesValues");
// Flatten the live streams and the generated streams together
PCollection<KV<String, TSProto>> completeWindowData =
PCollectionList.of(windowedData).and(generatedValues)
.apply("MergeGeneratedLiveValues", Flatten.<KV<String, TSProto>>pCollections());
return completeWindowData;
}
开发者ID:GoogleCloudPlatform,项目名称:data-timeseries-java,代码行数:33,代码来源:FXTimeSeriesPipelineSRGTests.java
示例8: testRun
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; //导入依赖的package包/类
@Test
public void testRun() throws Exception {
// test read from Kafka
SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
options.setAppName(this.getClass().getSimpleName());
options.setRunner(SparkPipelineRunner.class);
options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval
Pipeline p = Pipeline.create(options);
Map<String, String> kafkaParams = ImmutableMap.of(
"metadata.broker.list", EMBEDDED_KAFKA_CLUSTER.getBrokerList(),
"auto.offset.reset", "smallest"
);
PCollection<KV<String, String>> kafkaInput = p.apply(KafkaIO.Read.from(StringDecoder.class,
StringDecoder.class, String.class, String.class, Collections.singleton(TOPIC),
kafkaParams));
PCollection<KV<String, String>> windowedWords = kafkaInput
.apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(1))));
PCollection<String> formattedKV = windowedWords.apply(ParDo.of(new FormatKVFn()));
DataflowAssert.thatIterable(formattedKV.apply(View.<String>asIterable()))
.containsInAnyOrder(EXPECTED);
EvaluationResult res = SparkPipelineRunner.create(options).run(p);
res.close();
DataflowAssertStreaming.assertNoFailures(res);
}
开发者ID:shakamunyi,项目名称:spark-dataflow,代码行数:31,代码来源:KafkaStreamingTest.java
示例9: testRun
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; //导入依赖的package包/类
@Test
public void testRun() throws Exception {
SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
options.setAppName(this.getClass().getSimpleName());
options.setRunner(SparkPipelineRunner.class);
options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval
Pipeline p = Pipeline.create(options);
PCollection<String> w1 =
p.apply(CreateStream.fromQueue(WORDS_QUEUE_1)).setCoder(StringUtf8Coder.of());
PCollection<String> windowedW1 =
w1.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
PCollection<String> w2 =
p.apply(CreateStream.fromQueue(WORDS_QUEUE_2)).setCoder(StringUtf8Coder.of());
PCollection<String> windowedW2 =
w2.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2);
PCollection<String> union = list.apply(Flatten.<String>pCollections());
DataflowAssert.thatIterable(union.apply(View.<String>asIterable()))
.containsInAnyOrder(EXPECTED_UNION);
EvaluationResult res = SparkPipelineRunner.create(options).run(p);
res.close();
DataflowAssertStreaming.assertNoFailures(res);
}
开发者ID:shakamunyi,项目名称:spark-dataflow,代码行数:28,代码来源:FlattenStreamingTest.java
示例10: main
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
Exercise6Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise6Options.class);
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
// Allow the pipeline to be cancelled automatically.
options.setRunner(DataflowPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);
TableReference sessionsTable = new TableReference();
sessionsTable.setDatasetId(options.getOutputDataset());
sessionsTable.setProjectId(options.getProject());
sessionsTable.setTableId(options.getOutputTableName());
PCollection<GameEvent> rawEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));
// Extract username/score pairs from the event stream
PCollection<KV<String, Integer>> userEvents =
rawEvents.apply(
"ExtractUserScore",
MapElements.via((GameEvent gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
.withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
// Detect user sessions-- that is, a burst of activity separated by a gap from further
// activity. Find and record the mean session lengths.
// This information could help the game designers track the changing user engagement
// as their set of games changes.
userEvents
.apply(
Window.named("WindowIntoSessions")
.<KV<String, Integer>>into(
Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
.withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
// For this use, we care only about the existence of the session, not any particular
// information aggregated over it, so the following is an efficient way to do that.
.apply(Combine.perKey(x -> 0))
// Get the duration per session.
.apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
// Re-window to process groups of session sums according to when the sessions complete.
.apply(
Window.named("WindowToExtractSessionMean")
.<Integer>into(
FixedWindows.of(
Duration.standardMinutes(options.getUserActivityWindowDuration()))))
// Find the mean session duration in each window.
.apply(Mean.<Integer>globally().withoutDefaults())
// Write this info to a BigQuery table.
.apply(ParDo.named("FormatSessions").of(new FormatSessionWindowFn()))
.apply(
BigQueryIO.Write.to(sessionsTable)
.withSchema(FormatSessionWindowFn.getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
// Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
// command line.
PipelineResult result = pipeline.run();
}
开发者ID:mdvorsky,项目名称:DataflowSME,代码行数:60,代码来源:Exercise6.java
示例11: main
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
Exercise5Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise5Options.class);
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
// Allow the pipeline to be cancelled automatically.
options.setRunner(DataflowPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);
TableReference teamTable = new TableReference();
teamTable.setDatasetId(options.getOutputDataset());
teamTable.setProjectId(options.getProject());
teamTable.setTableId(options.getOutputTableName());
PCollection<GameEvent> rawEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));
// Extract username/score pairs from the event stream
PCollection<KV<String, Integer>> userEvents =
rawEvents.apply(
"ExtractUserScore",
MapElements.via((GameEvent gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
.withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
// Calculate the total score per user over fixed windows, and
// cumulative updates for late data.
final PCollectionView<Map<String, Integer>> spammersView =
userEvents
.apply(
Window.named("FixedWindowsUser")
.<KV<String, Integer>>into(
FixedWindows.of(
Duration.standardMinutes(options.getFixedWindowDuration()))))
// Filter out everyone but those with (SCORE_WEIGHT * avg) clickrate.
// These might be robots/spammers.
.apply("CalculateSpammyUsers", new CalculateSpammyUsers())
// Derive a view from the collection of spammer users. It will be used as a side input
// in calculating the team score sums, below.
.apply("CreateSpammersView", View.<String, Integer>asMap());
// Calculate the total score per team over fixed windows,
// and emit cumulative updates for late data. Uses the side input derived above-- the set of
// suspected robots-- to filter out scores from those users from the sum.
// Write the results to BigQuery.
rawEvents
.apply(
Window.named("WindowIntoFixedWindows")
.<GameEvent>into(
FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
// Filter out the detected spammer users, using the side input derived above.
.apply(
ParDo.named("FilterOutSpammers")
.withSideInputs(spammersView)
.of(
new DoFn<GameEvent, GameEvent>() {
@Override
public void processElement(ProcessContext c) {
// If the user is not in the spammers Map, output the data element.
if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
c.output(c.element());
}
}
}))
// Extract and sum teamname/score pairs from the event data.
.apply("ExtractTeamScore", new Exercise1.ExtractAndSumScore("team"))
// Write the result to BigQuery
.apply(ParDo.named("FormatTeamWindows").of(new FormatTeamWindowFn()))
.apply(
BigQueryIO.Write.to(teamTable)
.withSchema(FormatTeamWindowFn.getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
// Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
// command line.
PipelineResult result = pipeline.run();
}
开发者ID:mdvorsky,项目名称:DataflowSME,代码行数:79,代码来源:Exercise5.java
示例12: apply
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; //导入依赖的package包/类
@Override
public PCollection<KV<String, Integer>> apply(PCollection<GameEvent> input) {
return input
.apply(Window.into(FixedWindows.of(duration)))
.apply("ExtractTeamScore", new Exercise1.ExtractAndSumScore("team"));
}
开发者ID:mdvorsky,项目名称:DataflowSME,代码行数:7,代码来源:Exercise2.java
示例13: main
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
Exercise5Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise5Options.class);
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
// Allow the pipeline to be cancelled automatically.
options.setRunner(DataflowPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);
TableReference teamTable = new TableReference();
teamTable.setDatasetId(options.getOutputDataset());
teamTable.setProjectId(options.getProject());
teamTable.setTableId(options.getOutputTableName());
PCollection<GameEvent> rawEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));
// Extract username/score pairs from the event stream
PCollection<KV<String, Integer>> userEvents =
rawEvents.apply(
"ExtractUserScore",
MapElements.via((GameEvent gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
.withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
// Calculate the total score per user over fixed windows, and
// cumulative updates for late data.
final PCollectionView<Map<String, Integer>> spammersView =
userEvents
.apply(
Window.named("FixedWindowsUser")
.<KV<String, Integer>>into(
FixedWindows.of(
Duration.standardMinutes(options.getFixedWindowDuration()))))
// Filter out everyone but those with (SCORE_WEIGHT * avg) clickrate.
// These might be robots/spammers.
.apply("CalculateSpammyUsers", new CalculateSpammyUsers())
// Derive a view from the collection of spammer users. It will be used as a side input
// in calculating the team score sums, below.
.apply("CreateSpammersView", View.<String, Integer>asMap());
// [START EXERCISE 5 PART b]:
// Calculate the total score per team over fixed windows,
// and emit cumulative updates for late data. Uses the side input derived above-- the set of
// suspected robots-- to filter out scores from those users from the sum.
// Write the results to BigQuery.
rawEvents
.apply(
Window.named("WindowIntoFixedWindows")
.<GameEvent>into(
FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
// Filter out the detected spammer users, using the side input derived above.
// Use ParDo with spammersView side input to filter out spammers.
.apply(/* TODO: YOUR CODE GOES HERE */ new ChangeMe<PCollection<GameEvent>, GameEvent>())
// Extract and sum teamname/score pairs from the event data.
.apply("ExtractTeamScore", new Exercise1.ExtractAndSumScore("team"))
// Write the result to BigQuery
.apply(ParDo.named("FormatTeamWindows").of(new FormatTeamWindowFn()))
.apply(
BigQueryIO.Write.to(teamTable)
.withSchema(FormatTeamWindowFn.getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
// [START EXERCISE 5 PART b]:
// Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
// command line.
PipelineResult result = pipeline.run();
}
开发者ID:mdvorsky,项目名称:DataflowSME,代码行数:70,代码来源:Exercise5.java
示例14: main
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; //导入依赖的package包/类
public static void main(String[] args) throws IOException {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setBigQuerySchema(getSchema());
// DataflowExampleUtils creates the necessary input sources to simplify execution of this
// Pipeline.
DataflowExampleUtils exampleDataflowUtils = new DataflowExampleUtils(options,
options.isUnbounded());
Pipeline pipeline = Pipeline.create(options);
/**
* Concept #1: the Dataflow SDK lets us run the same pipeline with either a bounded or
* unbounded input source.
*/
PCollection<String> input;
if (options.isUnbounded()) {
LOG.info("Reading from PubSub.");
/**
* Concept #3: Read from the PubSub topic. A topic will be created if it wasn't
* specified as an argument. The data elements' timestamps will come from the pubsub
* injection.
*/
input = pipeline
.apply(PubsubIO.Read.topic(options.getPubsubTopic()));
} else {
/** Else, this is a bounded pipeline. Read from the GCS file. */
input = pipeline
.apply(TextIO.Read.from(options.getInputFile()))
// Concept #2: Add an element timestamp, using an artificial time just to show windowing.
// See AddTimestampFn for more detail on this.
.apply(ParDo.of(new AddTimestampFn()));
}
/**
* Concept #4: Window into fixed windows. The fixed window size for this example defaults to 1
* minute (you can change this with a command-line option). See the documentation for more
* information on how fixed windows work, and for information on the other types of windowing
* available (e.g., sliding windows).
*/
PCollection<String> windowedWords = input
.apply(Window.<String>into(
FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
/**
* Concept #5: Re-use our existing CountWords transform that does not have knowledge of
* windows over a PCollection containing windowed values.
*/
PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());
/**
* Concept #6: Format the results for a BigQuery table, then write to BigQuery.
* The BigQuery output source supports both bounded and unbounded data.
*/
wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
.apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));
PipelineResult result = pipeline.run();
/**
* To mock unbounded input from PubSub, we'll now start an auxiliary 'injector' pipeline that
* runs for a limited time, and publishes to the input PubSub topic.
*
* With an unbounded input source, you will need to explicitly shut down this pipeline when you
* are done with it, so that you do not continue to be charged for the instances. You can do
* this via a ctrl-C from the command line, or from the developer's console UI for Dataflow
* pipelines. The PubSub topic will also be deleted at this time.
*/
exampleDataflowUtils.mockUnboundedSource(options.getInputFile(), result);
}
开发者ID:sinmetal,项目名称:iron-hippo,代码行数:70,代码来源:WindowedWordCount.java
注:本文中的com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论