This article collects typical usage examples of the Java class com.google.cloud.dataflow.sdk.transforms.MapElements. If you are wondering how the MapElements class is used in practice, the selected examples below may help.
The MapElements class belongs to the com.google.cloud.dataflow.sdk.transforms package. 13 code examples of the class are shown below, sorted by popularity by default.
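Before the SDK examples, it may help to see the underlying idea in isolation: each example applies a function to every element of a collection and emits exactly one output per input. The following is a plain-Java analogy built on java.util.stream, not on the Dataflow SDK; the names `MapElementsAnalogy` and `mapEach` are hypothetical and exist only for this sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java analogy only: this does NOT use the Dataflow SDK.
// MapElementsAnalogy and mapEach are hypothetical names for illustration.
class MapElementsAnalogy {

    // Applies fn to every element and returns one output per input.
    // A List keeps its order; a real PCollection is unordered and distributed.
    static <I, O> List<O> mapEach(List<I> input, Function<I, O> fn) {
        return input.stream().map(fn).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("hi", "there", "sue");
        List<Integer> lengths = mapEach(words, String::length);
        System.out.println(lengths); // prints [2, 5, 3]
    }
}
```

In the SDK examples the same role is played by `MapElements.via(...)`, with the pipeline runner deciding where and when the function is invoked.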
Example 1: loadArtistCreditsByKey
import com.google.cloud.dataflow.sdk.transforms.MapElements; // import the required package/class
@org.junit.Test
public void loadArtistCreditsByKey() {
DirectPipeline p = DirectPipeline.createForTest();
Long artistCreditIds[] = {634509L, 846332L};
PCollection<String> text = p.apply(Create.of(artistCreditLinesOfJson)).setCoder(StringUtf8Coder.of());
PCollection<KV<Long, MusicBrainzDataObject>> artistCredits = MusicBrainzTransforms.loadTableFromText(text, "artist_credit_name", "artist_credit");
PCollection<Long> artistCreditIdPCollection =
artistCredits.apply(MapElements.via((KV<Long, MusicBrainzDataObject> kv) -> {
Long k = kv.getKey();
return k;
})
.withOutputType(new TypeDescriptor<Long>() {
})
);
DataflowAssert.that(artistCreditIdPCollection).containsInAnyOrder(634509L, 846332L);
}
Developer: GoogleCloudPlatform, Project: bigquery-etl-dataflow-sample, Lines of code: 17, Source: MusicBrainzTransformsTest.java
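The example above chains `.withOutputType(new TypeDescriptor<Long>() {})` after passing a lambda to `MapElements.via`. The trailing `{}` creates an anonymous subclass, and that is what allows a generic type argument to be recovered at runtime even though a lambda's own type parameters are erased. The sketch below shows that general Java idiom in isolation. `TypeCapture` is a hypothetical stand-in written for this illustration; it is not the SDK's `TypeDescriptor` and makes no claim about how the SDK implements it.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical stand-in for illustration; not the SDK's TypeDescriptor.
abstract class TypeCapture<T> {
    private final Type type;

    protected TypeCapture() {
        // For an anonymous subclass, the generic superclass is
        // TypeCapture<ActualType>, which is recorded in the class file.
        Type superclass = getClass().getGenericSuperclass();
        if (!(superclass instanceof ParameterizedType)) {
            throw new IllegalStateException(
                "Create as an anonymous subclass: new TypeCapture<X>() {}");
        }
        this.type = ((ParameterizedType) superclass).getActualTypeArguments()[0];
    }

    Type getType() {
        return type;
    }
}

class TypeCaptureDemo {
    public static void main(String[] args) {
        Type simple = new TypeCapture<Long>() {}.getType();
        Type nested = new TypeCapture<List<String>>() {}.getType();
        System.out.println(simple.getTypeName()); // prints java.lang.Long
        System.out.println(nested.getTypeName()); // prints java.util.List<java.lang.String>
    }
}
```

This also suggests why several examples on this page use `TypeDescriptor.of(Long.class)` for simple types but the anonymous-subclass form for parameterized types such as `KV<String, Integer>`: a class literal cannot carry type arguments.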
Example 2: loadArtistsWithMapping
import com.google.cloud.dataflow.sdk.transforms.MapElements; // import the required package/class
@org.junit.Test
public void loadArtistsWithMapping() {
DirectPipeline p = DirectPipeline.createForTest();
PCollection<String> artistText = p.apply("artist", Create.of(artistLinesOfJson)).setCoder(StringUtf8Coder.of());
Map<String, PCollectionView<Map<Long, String>>> maps = new HashMap<>();
PCollection<String> areaMapText = p.apply("area", Create.of(areaLinesOfJson)).setCoder(StringUtf8Coder.of());
PCollectionView<Map<Long, String>> areamap = MusicBrainzTransforms.loadMapFromText(areaMapText, "id", "area");
maps.put("area", areamap);
PCollection<KV<Long, MusicBrainzDataObject>> loadedArtists = MusicBrainzTransforms.loadTableFromText(artistText, "artist", "id", maps);
PCollection<String> areas = loadedArtists.apply("areaLabels", MapElements.via((KV<Long, MusicBrainzDataObject> row) -> {
return (String) row.getValue().getColumnValue("area");
}).withOutputType(new TypeDescriptor<String>() {
}));
DataflowAssert.that(areas).satisfies((areaLabels) -> {
List<String> theList = new ArrayList<>();
areaLabels.forEach(theList::add);
assert (theList.contains("Canada"));
return null;
});
}
Developer: GoogleCloudPlatform, Project: bigquery-etl-dataflow-sample, Lines of code: 27, Source: MusicBrainzTransformsTest.java
Example 3: apply
import com.google.cloud.dataflow.sdk.transforms.MapElements; // import the required package/class
@Override
public PCollection<KV<String, Integer>> apply(PCollection<GameEvent> gameEvents) {
return gameEvents
.apply(
MapElements.via((GameEvent event) -> KV.of(event.getKey(field), event.getScore()))
.withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
.apply(Sum.<String>integersPerKey());
}
Developer: mdvorsky, Project: DataflowSME, Lines of code: 9, Source: Exercise1.java
Example 4: main
import com.google.cloud.dataflow.sdk.transforms.MapElements; // import the required package/class
public static void main(String[] args) {
CustomPipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
Pipeline p = Pipeline.create(options);
p.apply(PubsubIO.Read.named("read from PubSub")
.topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
.timestampLabel("ts")
.withCoder(TableRowJsonCoder.of()))
.apply("window 1s", Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply("parse timestamps",
MapElements.via(
(TableRow e) ->
Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse(e.get("timestamp").toString())).toEpochMilli())
.withOutputType(TypeDescriptor.of(Long.class)))
.apply("max timestamp in window", Max.longsGlobally().withoutDefaults())
.apply("transform",
MapElements.via(
(Long t) -> {
TableRow ride = new TableRow();
ride.set("timestamp", Instant.ofEpochMilli(t).toString());
return ride;
})
.withOutputType(TypeDescriptor.of(TableRow.class)))
.apply(PubsubIO.Write.named("write to PubSub")
.topic(String.format("projects/%s/topics/%s", options.getSinkProject(), options.getSinkTopic()))
.withCoder(TableRowJsonCoder.of()));
p.run();
}
Developer: googlecodelabs, Project: cloud-dataflow-nyc-taxi-tycoon, Lines of code: 35, Source: TimestampRides.java
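The pipeline above uses two MapElements steps that are pure per-element functions: one parses an ISO-8601 timestamp string into epoch milliseconds, the other turns epoch milliseconds back into a string. The sketch below exercises just those two conversions with java.time, outside any pipeline. The class and method names are hypothetical, and the sample timestamp is made up for illustration.

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;

// Stand-alone sketch of the two per-element conversions, without the
// Dataflow SDK. Class and method names are hypothetical.
class TimestampRoundTrip {

    // ISO-8601 text that carries an offset (or 'Z') -> epoch milliseconds.
    static long toEpochMillis(String isoTimestamp) {
        return Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse(isoTimestamp))
                .toEpochMilli();
    }

    // Epoch milliseconds -> ISO-8601 text in UTC.
    static String toIsoString(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        long millis = toEpochMillis("1970-01-01T00:00:01Z");
        System.out.println(millis);              // prints 1000
        System.out.println(toIsoString(millis)); // prints 1970-01-01T00:00:01Z
    }
}
```

Note that `Instant.from` needs an offset or zone in the parsed text; a local timestamp such as `1970-01-01T00:00:01` has no instant and is rejected with a `DateTimeException`.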
Example 5: main
import com.google.cloud.dataflow.sdk.transforms.MapElements; // import the required package/class
public static void main(String[] args) {
CustomPipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
Pipeline p = Pipeline.create(options);
p.apply(PubsubIO.Read.named("read from PubSub")
.topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
.timestampLabel("ts")
.withCoder(TableRowJsonCoder.of()))
.apply("extract dollars",
MapElements.via((TableRow x) -> Double.parseDouble(x.get("meter_increment").toString()))
.withOutputType(TypeDescriptor.of(Double.class)))
.apply("fixed window", Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply("trigger",
Window.<Double>triggering(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(1)))
.withLateFirings(AfterPane.elementCountAtLeast(1)))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.standardMinutes(5)))
.apply("sum whole window", Sum.doublesGlobally().withoutDefaults())
.apply("format rides", ParDo.of(new TransformRides()))
.apply(PubsubIO.Write.named("WriteToPubsub")
.topic(String.format("projects/%s/topics/%s", options.getSinkProject(), options.getSinkTopic()))
.withCoder(TableRowJsonCoder.of()));
p.run();
}
Developer: googlecodelabs, Project: cloud-dataflow-nyc-taxi-tycoon, Lines of code: 32, Source: ExactDollarRides.java
Example 6: main
import com.google.cloud.dataflow.sdk.transforms.MapElements; // import the required package/class
public static void main(String[] args) {
CustomPipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
Pipeline p = Pipeline.create(options);
p.apply(PubsubIO.Read.named("read from PubSub")
.topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
.timestampLabel("ts")
.withCoder(TableRowJsonCoder.of()))
.apply("key rides by rideid",
MapElements.via((TableRow ride) -> KV.of(ride.get("ride_id").toString(), ride))
.withOutputType(new TypeDescriptor<KV<String, TableRow>>() {}))
.apply("session windows on rides with early firings",
Window.<KV<String, TableRow>>into(
Sessions.withGapDuration(Duration.standardMinutes(60)))
.triggering(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(2000))))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.ZERO))
.apply("group ride points on same ride", Combine.perKey(new LatestPointCombine()))
.apply("discard key",
MapElements.via((KV<String, TableRow> a) -> a.getValue())
.withOutputType(TypeDescriptor.of(TableRow.class)))
.apply(PubsubIO.Write.named("WriteToPubsub")
.topic(String.format("projects/%s/topics/%s", options.getSinkProject(), options.getSinkTopic()))
.withCoder(TableRowJsonCoder.of()));
p.run();
}
Developer: googlecodelabs, Project: cloud-dataflow-nyc-taxi-tycoon, Lines of code: 35, Source: LatestRides.java
Example 7: joinArtistCreditsWithRecordings
import com.google.cloud.dataflow.sdk.transforms.MapElements; // import the required package/class
@org.junit.Test
public void joinArtistCreditsWithRecordings() {
DirectPipeline p = DirectPipeline.createForTest();
PCollection<String> artistCreditText = p.apply("artistCredits", Create.of(artistCreditLinesOfJson)).setCoder(StringUtf8Coder.of());
PCollection<KV<Long, MusicBrainzDataObject>> artistCredits = MusicBrainzTransforms.loadTableFromText(artistCreditText, "artist_credit_name", "artist_credit");
PCollection<String> recordingText = p.apply("recordings", Create.of(recordingLinesOfJson)).setCoder(StringUtf8Coder.of());
PCollection<KV<Long, MusicBrainzDataObject>> recordings = MusicBrainzTransforms.loadTableFromText(recordingText, "recording", "artist_credit");
PCollection<MusicBrainzDataObject> joinedRecordings = MusicBrainzTransforms.innerJoin("artist credits with recordings", artistCredits, recordings);
PCollection<Long> recordingIds = joinedRecordings.apply(MapElements.via((MusicBrainzDataObject mbo) -> (Long) mbo.getColumnValue("recording_id")).
withOutputType(new TypeDescriptor<Long>() {
}));
Long bieberRecording = 17069165L;
Long bieberRecording2 = 15508507L;
DataflowAssert.that(recordingIds).satisfies((longs) -> {
List<Long> theList = new ArrayList<Long>();
longs.forEach(theList::add);
assert (theList.contains(bieberRecording));
assert (theList.contains(bieberRecording2));
return null;
});
PCollection<Long> numberJoined = joinedRecordings.apply("count joined recordings", Count.globally());
PCollection<Long> numberOfArtistCredits = artistCredits.apply("count artist credits", Count.globally());
DirectPipelineRunner.EvaluationResults results = p.run();
long joinedRecordingsCount = results.getPCollection(numberJoined).get(0);
assert (448 == joinedRecordingsCount);
}
Developer: GoogleCloudPlatform, Project: bigquery-etl-dataflow-sample, Lines of code: 38, Source: MusicBrainzTransformsTest.java
Example 8: run
import com.google.cloud.dataflow.sdk.transforms.MapElements; // import the required package/class
public static void run() {
DataflowPipelineOptions options = PipelineOptionsFactory.create()
.as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("chrome-oven-144308");
options.setFilesToStage(
detectClassPathResourcesToStage(
DataflowPipelineRunner.class.getClassLoader()
)
);
options.setStagingLocation("gs://dataflow-chrome-oven-144308/stagingForScheduledPipeline");
Pipeline p = Pipeline.create(options);
System.out.println("get here 0");
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
.apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
System.out.println("get here 1");
for (String word : c.element().split("[^a-zA-Z']+")) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}))
.apply(Count.<String>perElement())
.apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
@Override
public String apply(KV<String, Long> input) {
System.out.println("get here 3");
return input.getKey() + ": " + input.getValue();
}
}))
.apply(TextIO.Write.to("gs://dataflow-chrome-oven-144308/scheduled"));
p.run();
}
Developer: viktort, Project: appengine-cron-example, Lines of code: 41, Source: ScheduledMinimalWordCount.java
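The pipeline above splits each line on the regex `[^a-zA-Z']+`, counts occurrences per word, and formats each result as `word: count`. The same logic can be checked locally without a runner. The following is a single-machine sketch under that assumption; `WordCountSketch` and `countAndFormat` are hypothetical names, and the input lines are invented for the demonstration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Single-machine sketch of the split / count / format steps.
// Not the Dataflow SDK; names and sample input are hypothetical.
class WordCountSketch {

    static List<String> countAndFormat(List<String> lines) {
        // TreeMap is used only to make the output order deterministic here.
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            // Same tokenization as the DoFn above: split on non-letter runs.
            for (String word : line.split("[^a-zA-Z']+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        // Same formatting as the SimpleFunction above: "key: value".
        List<String> formatted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            formatted.add(entry.getKey() + ": " + entry.getValue());
        }
        return formatted;
    }

    public static void main(String[] args) {
        List<String> lines =
            Arrays.asList("hi there", "hi hi sue bob", "hi sue", "", "bob hi");
        System.out.println(countAndFormat(lines));
        // prints [bob: 2, hi: 5, sue: 2, there: 1]
    }
}
```

The alphabetical order comes from the `TreeMap`; a PCollection gives no ordering guarantee, so pipeline tests usually compare results as a set.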
Example 9: testText
import com.google.cloud.dataflow.sdk.transforms.MapElements; // import the required package/class
@Test
public void testText() throws Exception {
SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
options.setRunner(SparkPipelineRunner.class);
Pipeline p = Pipeline.create(options);
PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
PCollection<String> output = inputWords.apply(new WordCount.CountWords())
.apply(MapElements.via(new WordCount.FormatAsTextFn()));
output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
EvaluationResult res = SparkPipelineRunner.create().run(p);
res.close();
int count = 0;
Set<String> expected = Sets.newHashSet("hi: 5", "there: 1", "sue: 2", "bob: 2");
for (File f : tmpDir.getRoot().listFiles(new FileFilter() {
@Override public boolean accept(File pathname) {
return pathname.getName().matches("out-.*\\.txt");
}
})) {
count++;
for (String line : Files.readLines(f, Charsets.UTF_8)) {
assertTrue(line + " not found", expected.remove(line));
}
}
assertEquals(3, count);
assertTrue(expected.isEmpty());
}
Developer: shakamunyi, Project: spark-dataflow, Lines of code: 28, Source: NumShardsTest.java
Example 10: main
import com.google.cloud.dataflow.sdk.transforms.MapElements; // import the required package/class
public static void main(String[] args) throws Exception {
Exercise6Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise6Options.class);
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
// Allow the pipeline to be cancelled automatically.
options.setRunner(DataflowPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);
TableReference sessionsTable = new TableReference();
sessionsTable.setDatasetId(options.getOutputDataset());
sessionsTable.setProjectId(options.getProject());
sessionsTable.setTableId(options.getOutputTableName());
PCollection<GameEvent> rawEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));
// Extract username/score pairs from the event stream
PCollection<KV<String, Integer>> userEvents =
rawEvents.apply(
"ExtractUserScore",
MapElements.via((GameEvent gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
.withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
// [START EXERCISE 6]:
// Detect user sessions-- that is, a burst of activity separated by a gap from further
// activity. Find and record the mean session lengths.
// This information could help the game designers track the changing user engagement
// as their set of games changes.
userEvents
// Window the user events into sessions with gap options.getSessionGap() minutes. Make sure
// to use an outputTimeFn that sets the output timestamp to the end of the window. This will
// allow us to compute means on sessions based on their end times, rather than their start
// times.
.apply(
/* TODO: YOUR CODE GOES HERE */
new ChangeMe<PCollection<KV<String, Integer>>, KV<String, Integer>>())
// For this use, we care only about the existence of the session, not any particular
// information aggregated over it, so the following is an efficient way to do that.
.apply(Combine.perKey(x -> 0))
// Get the duration per session.
.apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
// Re-window to process groups of session sums according to when the sessions complete.
// In streaming we don't just ask "what is the mean value" we must ask "what is the mean
// value for some window of time". To compute periodic means of session durations, we
// re-window the session durations.
.apply(
/* TODO: YOUR CODE GOES HERE */
new ChangeMe<PCollection<Integer>, Integer>())
// Find the mean session duration in each window.
.apply(Mean.<Integer>globally().withoutDefaults())
// Write this info to a BigQuery table.
.apply(ParDo.named("FormatSessions").of(new FormatSessionWindowFn()))
.apply(
BigQueryIO.Write.to(sessionsTable)
.withSchema(FormatSessionWindowFn.getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
// [END EXERCISE 6]:
// Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
// command line.
PipelineResult result = pipeline.run();
}
Developer: mdvorsky, Project: DataflowSME, Lines of code: 65, Source: Exercise6.java
Example 11: main
import com.google.cloud.dataflow.sdk.transforms.MapElements; // import the required package/class
public static void main(String[] args) throws Exception {
Exercise6Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise6Options.class);
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
// Allow the pipeline to be cancelled automatically.
options.setRunner(DataflowPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);
TableReference sessionsTable = new TableReference();
sessionsTable.setDatasetId(options.getOutputDataset());
sessionsTable.setProjectId(options.getProject());
sessionsTable.setTableId(options.getOutputTableName());
PCollection<GameEvent> rawEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));
// Extract username/score pairs from the event stream
PCollection<KV<String, Integer>> userEvents =
rawEvents.apply(
"ExtractUserScore",
MapElements.via((GameEvent gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
.withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
// Detect user sessions-- that is, a burst of activity separated by a gap from further
// activity. Find and record the mean session lengths.
// This information could help the game designers track the changing user engagement
// as their set of games changes.
userEvents
.apply(
Window.named("WindowIntoSessions")
.<KV<String, Integer>>into(
Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
.withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
// For this use, we care only about the existence of the session, not any particular
// information aggregated over it, so the following is an efficient way to do that.
.apply(Combine.perKey(x -> 0))
// Get the duration per session.
.apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
// Re-window to process groups of session sums according to when the sessions complete.
.apply(
Window.named("WindowToExtractSessionMean")
.<Integer>into(
FixedWindows.of(
Duration.standardMinutes(options.getUserActivityWindowDuration()))))
// Find the mean session duration in each window.
.apply(Mean.<Integer>globally().withoutDefaults())
// Write this info to a BigQuery table.
.apply(ParDo.named("FormatSessions").of(new FormatSessionWindowFn()))
.apply(
BigQueryIO.Write.to(sessionsTable)
.withSchema(FormatSessionWindowFn.getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
// Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
// command line.
PipelineResult result = pipeline.run();
}
Developer: mdvorsky, Project: DataflowSME, Lines of code: 60, Source: Exercise6.java
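The pipeline above groups each user's events into sessions, described in its comments as bursts of activity separated by a gap. To make that grouping concrete, here is a small single-user sketch in plain Java. It is an approximation written for illustration, not the SDK's windowing code: `SessionSketch` and `sessionize` are hypothetical names, the timestamps are invented, and the choice to start a new session when the silence is at least `gapMillis` is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative gap-based sessionization for one user's event timestamps.
// Not the Dataflow SDK; names and boundary handling are assumptions.
class SessionSketch {

    // Returns one {firstEventMillis, lastEventMillis} pair per session.
    static List<long[]> sessionize(long[] timestamps, long gapMillis) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> sessions = new ArrayList<>();
        for (long ts : sorted) {
            if (!sessions.isEmpty()) {
                long[] current = sessions.get(sessions.size() - 1);
                // Event arrives before the gap has elapsed: extend the session.
                if (ts - current[1] < gapMillis) {
                    current[1] = ts;
                    continue;
                }
            }
            // First event, or silence of at least gapMillis: new session.
            sessions.add(new long[] {ts, ts});
        }
        return sessions;
    }

    public static void main(String[] args) {
        long[] events = {600_000L, 0L, 90_000L, 630_000L, 60_000L};
        for (long[] s : sessionize(events, 300_000L)) {
            System.out.println(s[0] + ".." + s[1]);
        }
        // prints 0..90000
        // prints 600000..630000
    }
}
```

In the pipeline the equivalent grouping happens per key and across workers, and the session duration is then derived from the window bounds rather than from a sorted array.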
Example 12: main
import com.google.cloud.dataflow.sdk.transforms.MapElements; // import the required package/class
public static void main(String[] args) throws Exception {
Exercise5Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise5Options.class);
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
// Allow the pipeline to be cancelled automatically.
options.setRunner(DataflowPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);
TableReference teamTable = new TableReference();
teamTable.setDatasetId(options.getOutputDataset());
teamTable.setProjectId(options.getProject());
teamTable.setTableId(options.getOutputTableName());
PCollection<GameEvent> rawEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));
// Extract username/score pairs from the event stream
PCollection<KV<String, Integer>> userEvents =
rawEvents.apply(
"ExtractUserScore",
MapElements.via((GameEvent gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
.withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
// Calculate the total score per user over fixed windows, and
// cumulative updates for late data.
final PCollectionView<Map<String, Integer>> spammersView =
userEvents
.apply(
Window.named("FixedWindowsUser")
.<KV<String, Integer>>into(
FixedWindows.of(
Duration.standardMinutes(options.getFixedWindowDuration()))))
// Filter out everyone but those with (SCORE_WEIGHT * avg) clickrate.
// These might be robots/spammers.
.apply("CalculateSpammyUsers", new CalculateSpammyUsers())
// Derive a view from the collection of spammer users. It will be used as a side input
// in calculating the team score sums, below.
.apply("CreateSpammersView", View.<String, Integer>asMap());
// Calculate the total score per team over fixed windows,
// and emit cumulative updates for late data. Uses the side input derived above-- the set of
// suspected robots-- to filter out scores from those users from the sum.
// Write the results to BigQuery.
rawEvents
.apply(
Window.named("WindowIntoFixedWindows")
.<GameEvent>into(
FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
// Filter out the detected spammer users, using the side input derived above.
.apply(
ParDo.named("FilterOutSpammers")
.withSideInputs(spammersView)
.of(
new DoFn<GameEvent, GameEvent>() {
@Override
public void processElement(ProcessContext c) {
// If the user is not in the spammers Map, output the data element.
if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
c.output(c.element());
}
}
}))
// Extract and sum teamname/score pairs from the event data.
.apply("ExtractTeamScore", new Exercise1.ExtractAndSumScore("team"))
// Write the result to BigQuery
.apply(ParDo.named("FormatTeamWindows").of(new FormatTeamWindowFn()))
.apply(
BigQueryIO.Write.to(teamTable)
.withSchema(FormatTeamWindowFn.getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
// Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
// command line.
PipelineResult result = pipeline.run();
}
Developer: mdvorsky, Project: DataflowSME, Lines of code: 79, Source: Exercise5.java
Example 13: main
import com.google.cloud.dataflow.sdk.transforms.MapElements; // import the required package/class
public static void main(String[] args) throws Exception {
Exercise5Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise5Options.class);
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
// Allow the pipeline to be cancelled automatically.
options.setRunner(DataflowPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);
TableReference teamTable = new TableReference();
teamTable.setDatasetId(options.getOutputDataset());
teamTable.setProjectId(options.getProject());
teamTable.setTableId(options.getOutputTableName());
PCollection<GameEvent> rawEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));
// Extract username/score pairs from the event stream
PCollection<KV<String, Integer>> userEvents =
rawEvents.apply(
"ExtractUserScore",
MapElements.via((GameEvent gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
.withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
// Calculate the total score per user over fixed windows, and
// cumulative updates for late data.
final PCollectionView<Map<String, Integer>> spammersView =
userEvents
.apply(
Window.named("FixedWindowsUser")
.<KV<String, Integer>>into(
FixedWindows.of(
Duration.standardMinutes(options.getFixedWindowDuration()))))
// Filter out everyone but those with (SCORE_WEIGHT * avg) clickrate.
// These might be robots/spammers.
.apply("CalculateSpammyUsers", new CalculateSpammyUsers())
// Derive a view from the collection of spammer users. It will be used as a side input
// in calculating the team score sums, below.
.apply("CreateSpammersView", View.<String, Integer>asMap());
// [START EXERCISE 5 PART b]:
// Calculate the total score per team over fixed windows,
// and emit cumulative updates for late data. Uses the side input derived above-- the set of
// suspected robots-- to filter out scores from those users from the sum.
// Write the results to BigQuery.
rawEvents
.apply(
Window.named("WindowIntoFixedWindows")
.<GameEvent>into(
FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
// Filter out the detected spammer users, using the side input derived above.
// Use ParDo with spammersView side input to filter out spammers.
.apply(/* TODO: YOUR CODE GOES HERE */ new ChangeMe<PCollection<GameEvent>, GameEvent>())
// Extract and sum teamname/score pairs from the event data.
.apply("ExtractTeamScore", new Exercise1.ExtractAndSumScore("team"))
// Write the result to BigQuery
.apply(ParDo.named("FormatTeamWindows").of(new FormatTeamWindowFn()))
.apply(
BigQueryIO.Write.to(teamTable)
.withSchema(FormatTeamWindowFn.getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
// [END EXERCISE 5 PART b]:
// Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
// command line.
PipelineResult result = pipeline.run();
}
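Developer: mdvorsky, Project: DataflowSME, Lines of code: 70, Source: Exercise5.java
Note: The com.google.cloud.dataflow.sdk.transforms.MapElements examples in this article were collected from GitHub, MSDocs, and other source-code and documentation hosting platforms. The snippets were selected from open-source projects; copyright remains with the original authors, and distribution and use are governed by each project's License. Do not reproduce without permission.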