This article compiles typical usage examples of the Java class com.google.cloud.dataflow.sdk.transforms.Combine. If you are unsure what the Combine class does, how to use it, or simply want to see it in context, the curated code examples below may help.
The Combine class belongs to the com.google.cloud.dataflow.sdk.transforms package. 20 code examples of the Combine class are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your ratings help the system recommend better Java code examples.
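Before the project examples, here is a minimal, self-contained sketch (not taken from any of the projects below) of the two most common entry points of Combine in the Dataflow 1.x SDK: Combine.globally, which reduces an entire PCollection to a single value, and Combine.perKey, which reduces the values of each key independently. Sum.SumIntegerFn is the SDK's built-in integer-summing CombineFn; the pipeline itself, including the CombineBasics class name and the sample data, is illustrative only.
import java.util.Arrays;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class CombineBasics {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // Combine.globally: collapse the whole PCollection<Integer> into a single sum.
    PCollection<Integer> total =
        p.apply(Create.of(Arrays.asList(1, 2, 3, 4))).setCoder(VarIntCoder.of())
            .apply(Combine.globally(new Sum.SumIntegerFn()));

    // Combine.perKey: sum the values of each key independently, yielding one KV per key.
    PCollection<KV<String, Integer>> perKeyTotals =
        p.apply(Create.of(Arrays.asList(KV.of("a", 1), KV.of("a", 2), KV.of("b", 5))))
            .setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
            .apply(Combine.perKey(new Sum.SumIntegerFn()));

    p.run();
  }
}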
Example 1: apply
import com.google.cloud.dataflow.sdk.transforms.Combine; // import the required package/class
@Override
public PCollection<KV<String, DuplicationMetrics>> apply(final PCollection<GATKRead> preads) {
return preads
.apply(Filter.by(
new SerializableFunction<GATKRead, Boolean>() {
private static final long serialVersionUID = 0;
public Boolean apply(GATKRead element) {
return !element.isSecondaryAlignment() && !element.isSupplementaryAlignment();
}
}))
.apply(generateMetricsByLibrary())
.setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(DuplicationMetrics.class)))
// Combine metrics for reads from the same library.
.apply(Combine.<String, DuplicationMetrics>perKey(new CombineMetricsFn()))
// For each library, finalize the metrics by computing derived metrics and dividing paired counters
// by 2.
.apply(finalizeMetrics());
}
Author: broadinstitute, Project: gatk-dataflow, Lines: 20, Source: MarkDuplicatesDataflowUtils.java
Example 2: createAggregator
import com.google.cloud.dataflow.sdk.transforms.Combine; // import the required package/class
/**
* Creates an aggregator and associates it with the specified name.
*
* @param named Name of aggregator.
* @param combineFn Combine function used in aggregation.
* @param <IN> Type of inputs to aggregator.
* @param <INTER> Intermediate data type
* @param <OUT> Type of aggregator outputs.
* @return Specified aggregator
*/
public synchronized <IN, INTER, OUT> Aggregator<IN, OUT> createAggregator(
String named,
Combine.CombineFn<? super IN, INTER, OUT> combineFn) {
@SuppressWarnings("unchecked")
Aggregator<IN, OUT> aggregator = (Aggregator<IN, OUT>) aggregators.get(named);
if (aggregator == null) {
@SuppressWarnings("unchecked")
NamedAggregators.CombineFunctionState<IN, INTER, OUT> state =
new NamedAggregators.CombineFunctionState<>(
(Combine.CombineFn<IN, INTER, OUT>) combineFn,
(Coder<IN>) getCoder(combineFn),
this);
accum.add(new NamedAggregators(named, state));
aggregator = new SparkAggregator<>(named, state);
aggregators.put(named, aggregator);
}
return aggregator;
}
Author: shakamunyi, Project: spark-dataflow, Lines: 29, Source: SparkRuntimeContext.java
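A hypothetical call site for createAggregator, using the SDK's built-in Sum.SumLongFn (one of the combiners recognized by the getCoder helper shown in example 10 below). The runtimeContext and recordsProcessed names are made up for illustration; this is a fragment, assumed to run inside runner code that already holds a SparkRuntimeContext.
// Hypothetical usage: register (or look up) a named aggregator backed by a long-sum CombineFn.
Aggregator<Long, Long> recordsProcessed =
    runtimeContext.createAggregator("recordsProcessed", new Sum.SumLongFn());
recordsProcessed.addValue(1L); // each call feeds the underlying named Spark accumulator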
Example 3: testCSVToMapLineCombineFn
import com.google.cloud.dataflow.sdk.transforms.Combine; // import the required package/class
@Test
public void testCSVToMapLineCombineFn() {
final String[] csv = new String[] {
"000,Tokyo,120",
"001,Osaka,100",
"002,Kyoto,140"
};
final List<String> csvlist = Arrays.asList(csv);
Pipeline p = TestPipeline.create();
PCollection<String> maplines = p.apply(Create.of(csvlist)).setCoder(StringUtf8Coder.of());
PCollectionView<Map<String,String>> mapview = maplines.apply(Combine.globally(new CSVToMapLineCombineFn()).asSingletonView());
final String[] dummy = new String[] {
"000",
"001",
"002"
};
List<String> dummylist = Arrays.asList(dummy);
DoFnTester<String,String> fnTester = DoFnTester.of(new AAA(mapview));
fnTester.setSideInputInGlobalWindow(mapview, csvlist);
//dummylines.apply(ParDo.of(fnTester));
List<String> results = fnTester.processBatch(dummylist);
System.out.println(results);
//p.apply()
}
Author: topgate, Project: retail-demo, Lines: 30, Source: MainPipelineTest.java
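The AAA helper passed to DoFnTester above is not part of this snippet. A plausible sketch, assuming AAA is a DoFn that looks each incoming id up in the side-input map built by CSVToMapLineCombineFn and emits the matching entry, is shown below; the class body is an illustration, not the retail-demo project's actual code.
// Hypothetical reconstruction of AAA: join each input id against the side-input map view.
// Assumes imports of java.util.Map, DoFn, and PCollectionView from the Dataflow 1.x SDK.
static class AAA extends DoFn<String, String> {
  private static final long serialVersionUID = 0;
  private final PCollectionView<Map<String, String>> mapView;

  AAA(PCollectionView<Map<String, String>> mapView) {
    this.mapView = mapView;
  }

  @Override
  public void processElement(ProcessContext c) {
    // The singleton view materializes the combined CSV lines as a Map<String, String>.
    Map<String, String> lookup = c.sideInput(mapView);
    c.output(c.element() + "," + lookup.get(c.element()));
  }
}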
Example 4: main
import com.google.cloud.dataflow.sdk.transforms.Combine; // import the required package/class
public static void main(String[] args) {
CustomPipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
Pipeline p = Pipeline.create(options);
p.apply(PubsubIO.Read.named("read from PubSub")
.topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
.timestampLabel("ts")
.withCoder(TableRowJsonCoder.of()))
.apply("key rides by rideid",
MapElements.via((TableRow ride) -> KV.of(ride.get("ride_id").toString(), ride))
.withOutputType(new TypeDescriptor<KV<String, TableRow>>() {}))
.apply("session windows on rides with early firings",
Window.<KV<String, TableRow>>into(
Sessions.withGapDuration(Duration.standardMinutes(60)))
.triggering(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(2000))))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.ZERO))
.apply("group ride points on same ride", Combine.perKey(new LatestPointCombine()))
.apply("discard key",
MapElements.via((KV<String, TableRow> a) -> a.getValue())
.withOutputType(TypeDescriptor.of(TableRow.class)))
.apply(PubsubIO.Write.named("WriteToPubsub")
.topic(String.format("projects/%s/topics/%s", options.getSinkProject(), options.getSinkTopic()))
.withCoder(TableRowJsonCoder.of()));
p.run();
}
Author: googlecodelabs, Project: cloud-dataflow-nyc-taxi-tycoon, Lines: 35, Source: LatestRides.java
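LatestPointCombine is not included in this snippet. Judging from the step name ("group ride points on same ride") and the accumulating session windows with early firings, a reasonable guess is a CombineFn that keeps, for each ride_id, the most recent point seen so far. The sketch below is that guess, not the codelab's actual implementation; it assumes each TableRow carries a numeric "timestamp" field and would need a null-tolerant accumulator coder in a real pipeline.
// Hypothetical sketch: keep the TableRow whose "timestamp" field is largest.
static class LatestPointCombine extends Combine.CombineFn<TableRow, TableRow, TableRow> {
  private static final long serialVersionUID = 0;

  @Override
  public TableRow createAccumulator() {
    return null; // no point seen yet for this ride
  }

  @Override
  public TableRow addInput(TableRow accumulator, TableRow input) {
    return newer(accumulator, input);
  }

  @Override
  public TableRow mergeAccumulators(Iterable<TableRow> accumulators) {
    TableRow latest = null;
    for (TableRow candidate : accumulators) {
      latest = newer(latest, candidate);
    }
    return latest;
  }

  @Override
  public TableRow extractOutput(TableRow accumulator) {
    return accumulator;
  }

  // Pick the row with the larger (assumed numeric) "timestamp" field, tolerating nulls.
  private static TableRow newer(TableRow a, TableRow b) {
    if (a == null) return b;
    if (b == null) return a;
    double ta = Double.parseDouble(a.get("timestamp").toString());
    double tb = Double.parseDouble(b.get("timestamp").toString());
    return tb >= ta ? b : a;
  }
}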
Example 5: apply
import com.google.cloud.dataflow.sdk.transforms.Combine; // import the required package/class
@Override
public PCollection<T> apply(PCollection<T> input) {
return input
.apply(ParDo.named("BreakFusion").of(new DummyMapFn<T>()))
.apply(Combine.<String, T>perKey(new First<T>()))
.apply(Values.<T>create());
}
Author: googlegenomics, Project: dockerflow, Lines: 8, Source: BreakFusion.java
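DummyMapFn and First are not shown in this snippet. The shape of the transform is the usual fusion-break trick: a ParDo presumably assigns a key to every element, Combine.perKey forces a shuffle boundary and collapses each group back to a single element, and Values drops the temporary keys. A minimal sketch of what First<T> could look like, assuming DummyMapFn gives each element its own key so each group holds exactly one value, is below; it uses the SerializableFunction form of a per-key combiner and is an illustration, not the dockerflow project's actual code.
// Hypothetical sketch of First<T>: each key's group is assumed to hold a single element
// (or interchangeable duplicates), so returning the first value is enough.
static class First<T> implements SerializableFunction<Iterable<T>, T> {
  private static final long serialVersionUID = 0;

  @Override
  public T apply(Iterable<T> values) {
    return values.iterator().next();
  }
}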
Example 6: apply
import com.google.cloud.dataflow.sdk.transforms.Combine; // import the required package/class
@Override
public PCollection<KV<String, WorkflowArgs>> apply(
PCollection<KV<String, WorkflowArgs>> input) {
return input
.apply(ParDo.named("Prepare").of(new Gather(task)))
.apply(Combine.perKey(new SortArgs()))
.apply(ParDo.named("CombineOutputs").of(new CombineArgs()));
}
Author: googlegenomics, Project: dockerflow, Lines: 9, Source: DockerDo.java
Example 7: apply
import com.google.cloud.dataflow.sdk.transforms.Combine; // import the required package/class
@Override
public PCollection<KV<String, WorkflowArgs>> apply(
PCollectionList<KV<String, WorkflowArgs>> input) {
return input
.apply(Flatten.<KV<String, WorkflowArgs>>pCollections())
.apply(Combine.globally(new Merge()));
}
Author: googlegenomics, Project: dockerflow, Lines: 8, Source: MergeBranches.java
Example 8: generateCompleteWindowData
import com.google.cloud.dataflow.sdk.transforms.Combine; // import the required package/class
public PCollection<KV<String, TSProto>> generateCompleteWindowData(Pipeline pipeline,
List<KV<String, TSProto>> data, WorkPacketConfig packetConfig) {
LOG.info("Check to see that time streams with missing 'ticks' have been corrected");
PCollection<KV<String, TSProto>> tsData = setupDataInput(pipeline, data);
PCollection<KV<String, TSProto>> windowedData =
tsData.apply("CandleResolutionWindow", Window.<KV<String, TSProto>>into(FixedWindows
.of(Duration.standardSeconds(((FXTimeSeriesPipelineOptions) pipeline.getOptions())
.getCandleResolution()))));
// Determine streams that are missing in this Window and generate values for them
PCollection<KV<String, TSProto>> generatedValues =
windowedData
.apply(
"DetectMissingTimeSeriesValues",
Combine.globally(new DetectMissingTimeSeriesValuesCombiner(packetConfig))
.withoutDefaults()).apply(ParDo.of(new CreateMissingTimeSeriesValuesDoFn()))
.setName("CreateMissingTimeSeriesValues");
// Flatten the live streams and the generated streams together
PCollection<KV<String, TSProto>> completeWindowData =
PCollectionList.of(windowedData).and(generatedValues)
.apply("MergeGeneratedLiveValues", Flatten.<KV<String, TSProto>>pCollections());
return completeWindowData;
}
Author: GoogleCloudPlatform, Project: data-timeseries-java, Lines: 33, Source: FXTimeSeriesPipelineSRGTests.java
Example 9: grouped
import com.google.cloud.dataflow.sdk.transforms.Combine; // import the required package/class
private static <K, VI, VO> TransformEvaluator<Combine.GroupedValues<K, VI, VO>> grouped() {
return new TransformEvaluator<Combine.GroupedValues<K, VI, VO>>() {
@Override
public void evaluate(Combine.GroupedValues<K, VI, VO> transform, EvaluationContext context) {
Combine.KeyedCombineFn<K, VI, ?, VO> keyed = GROUPED_FG.get("fn", transform);
@SuppressWarnings("unchecked")
JavaRDDLike<WindowedValue<KV<K, Iterable<VI>>>, ?> inRDD =
(JavaRDDLike<WindowedValue<KV<K, Iterable<VI>>>, ?>) context.getInputRDD(transform);
context.setOutputRDD(transform,
inRDD.map(new KVFunction<>(keyed)));
}
};
}
Author: shakamunyi, Project: spark-dataflow, Lines: 14, Source: TransformTranslator.java
Example 10: getCoder
import com.google.cloud.dataflow.sdk.transforms.Combine; // import the required package/class
private Coder<?> getCoder(Combine.CombineFn<?, ?, ?> combiner) {
try {
if (combiner.getClass() == Sum.SumIntegerFn.class) {
return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
} else if (combiner.getClass() == Sum.SumLongFn.class) {
return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
} else if (combiner.getClass() == Sum.SumDoubleFn.class) {
return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
} else if (combiner.getClass() == Min.MinIntegerFn.class) {
return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
} else if (combiner.getClass() == Min.MinLongFn.class) {
return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
} else if (combiner.getClass() == Min.MinDoubleFn.class) {
return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
} else if (combiner.getClass() == Max.MaxIntegerFn.class) {
return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
} else if (combiner.getClass() == Max.MaxLongFn.class) {
return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
} else if (combiner.getClass() == Max.MaxDoubleFn.class) {
return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
} else {
throw new IllegalArgumentException("unsupported combiner in Aggregator: "
+ combiner.getClass().getName());
}
} catch (CannotProvideCoderException e) {
throw new IllegalStateException("Could not determine default coder for combiner", e);
}
}
Author: shakamunyi, Project: spark-dataflow, Lines: 29, Source: SparkRuntimeContext.java
Example 11: CombineFunctionState
import com.google.cloud.dataflow.sdk.transforms.Combine; // import the required package/class
public CombineFunctionState(
Combine.CombineFn<IN, INTER, OUT> combineFn,
Coder<IN> inCoder,
SparkRuntimeContext ctxt) {
this.combineFn = combineFn;
this.inCoder = inCoder;
this.ctxt = ctxt;
this.state = combineFn.createAccumulator();
}
Author: shakamunyi, Project: spark-dataflow, Lines: 10, Source: NamedAggregators.java
Example 12: readObject
import com.google.cloud.dataflow.sdk.transforms.Combine; // import the required package/class
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
ctxt = (SparkRuntimeContext) ois.readObject();
combineFn = (Combine.CombineFn<IN, INTER, OUT>) ois.readObject();
inCoder = (Coder<IN>) ois.readObject();
try {
state = combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
.decode(ois, Coder.Context.NESTED);
} catch (CannotProvideCoderException e) {
throw new IllegalStateException("Could not determine coder for accumulator", e);
}
}
Author: shakamunyi, Project: spark-dataflow, Lines: 13, Source: NamedAggregators.java
Example 13: test
import com.google.cloud.dataflow.sdk.transforms.Combine; // import the required package/class
@Test
public void test() throws Exception {
SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger()));
EvaluationResult res = SparkPipelineRunner.create().run(p);
assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi", Iterables.getOnlyElement(res.get(output)));
res.close();
}
Author: shakamunyi, Project: spark-dataflow, Lines: 12, Source: CombineGloballyTest.java
Example 14: test
import com.google.cloud.dataflow.sdk.transforms.Combine; // import the required package/class
@Test
public void test() throws Exception {
SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
List<String> empty = Collections.emptyList();
PCollection<String> inputWords = p.apply(Create.of(empty)).setCoder(StringUtf8Coder.of());
PCollection<String> output = inputWords.apply(Combine.globally(new ConcatWords()));
EvaluationResult res = SparkPipelineRunner.create().run(p);
assertEquals("", Iterables.getOnlyElement(res.get(output)));
res.close();
}
Author: shakamunyi, Project: spark-dataflow, Lines: 13, Source: EmptyInputTest.java
Example 15: main
import com.google.cloud.dataflow.sdk.transforms.Combine; // import the required package/class
public static void main(String[] args) throws Exception {
Exercise6Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise6Options.class);
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
// Allow the pipeline to be cancelled automatically.
options.setRunner(DataflowPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);
TableReference sessionsTable = new TableReference();
sessionsTable.setDatasetId(options.getOutputDataset());
sessionsTable.setProjectId(options.getProject());
sessionsTable.setTableId(options.getOutputTableName());
PCollection<GameEvent> rawEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));
// Extract username/score pairs from the event stream
PCollection<KV<String, Integer>> userEvents =
rawEvents.apply(
"ExtractUserScore",
MapElements.via((GameEvent gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
.withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
// [START EXERCISE 6]:
// Detect user sessions-- that is, a burst of activity separated by a gap from further
// activity. Find and record the mean session lengths.
// This information could help the game designers track the changing user engagement
// as their set of games changes.
userEvents
// Window the user events into sessions with gap options.getSessionGap() minutes. Make sure
// to use an outputTimeFn that sets the output timestamp to the end of the window. This will
// allow us to compute means on sessions based on their end times, rather than their start
// times.
.apply(
/* TODO: YOUR CODE GOES HERE */
new ChangeMe<PCollection<KV<String, Integer>>, KV<String, Integer>>())
// For this use, we care only about the existence of the session, not any particular
// information aggregated over it, so the following is an efficient way to do that.
.apply(Combine.perKey(x -> 0))
// Get the duration per session.
.apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
// Re-window to process groups of session sums according to when the sessions complete.
// In streaming we don't just ask "what is the mean value" we must ask "what is the mean
// value for some window of time". To compute periodic means of session durations, we
// re-window the session durations.
.apply(
/* TODO: YOUR CODE GOES HERE */
new ChangeMe<PCollection<Integer>, Integer>())
// Find the mean session duration in each window.
.apply(Mean.<Integer>globally().withoutDefaults())
// Write this info to a BigQuery table.
.apply(ParDo.named("FormatSessions").of(new FormatSessionWindowFn()))
.apply(
BigQueryIO.Write.to(sessionsTable)
.withSchema(FormatSessionWindowFn.getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
// [END EXERCISE 6]:
// Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
// command line.
PipelineResult result = pipeline.run();
}
Author: mdvorsky, Project: DataflowSME, Lines: 65, Source: Exercise6.java
Example 16: main
import com.google.cloud.dataflow.sdk.transforms.Combine; // import the required package/class
public static void main(String[] args) throws Exception {
Exercise6Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise6Options.class);
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
// Allow the pipeline to be cancelled automatically.
options.setRunner(DataflowPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);
TableReference sessionsTable = new TableReference();
sessionsTable.setDatasetId(options.getOutputDataset());
sessionsTable.setProjectId(options.getProject());
sessionsTable.setTableId(options.getOutputTableName());
PCollection<GameEvent> rawEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));
// Extract username/score pairs from the event stream
PCollection<KV<String, Integer>> userEvents =
rawEvents.apply(
"ExtractUserScore",
MapElements.via((GameEvent gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
.withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
// Detect user sessions-- that is, a burst of activity separated by a gap from further
// activity. Find and record the mean session lengths.
// This information could help the game designers track the changing user engagement
// as their set of games changes.
userEvents
.apply(
Window.named("WindowIntoSessions")
.<KV<String, Integer>>into(
Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
.withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
// For this use, we care only about the existence of the session, not any particular
// information aggregated over it, so the following is an efficient way to do that.
.apply(Combine.perKey(x -> 0))
// Get the duration per session.
.apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
// Re-window to process groups of session sums according to when the sessions complete.
.apply(
Window.named("WindowToExtractSessionMean")
.<Integer>into(
FixedWindows.of(
Duration.standardMinutes(options.getUserActivityWindowDuration()))))
// Find the mean session duration in each window.
.apply(Mean.<Integer>globally().withoutDefaults())
// Write this info to a BigQuery table.
.apply(ParDo.named("FormatSessions").of(new FormatSessionWindowFn()))
.apply(
BigQueryIO.Write.to(sessionsTable)
.withSchema(FormatSessionWindowFn.getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
// Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
// command line.
PipelineResult result = pipeline.run();
}
Author: mdvorsky, Project: DataflowSME, Lines: 60, Source: Exercise6.java
Example 17: createCompleteAggregates
import com.google.cloud.dataflow.sdk.transforms.Combine; // import the required package/class
public PCollection<KV<String, TSAggValueProto>> createCompleteAggregates(Pipeline pipeline,
List<KV<String, TSProto>> data, WorkPacketConfig packetConfig) {
PCollection<KV<String, TSProto>> completeWindowData =
generateCompleteWindowData(pipeline, data, packetConfig);
PCollection<KV<String, TSAggValueProto>> parital =
completeWindowData.apply("CreatePartialAggregates",
Combine.perKey(new PartialTimeSeriesAggCombiner()));
PCollection<KV<String, TSAggValueProto>> paritalWithWindowBoundary =
parital.apply(ParDo.of(new EmbedWindowTimeIntoAggregateDoFn()));
PCollection<KV<String, TSAggValueProto>> completeAggregationStage1 =
paritalWithWindowBoundary.apply(
"completeAggregationStage1",
Window.<KV<String, TSAggValueProto>>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.accumulatingFiredPanes());
PCollection<KV<String, TSAggValueProto>> completeAggregationStage2 =
completeAggregationStage1.apply("CreateCompleteCandles",
Combine.perKey(new CompleteTimeSeriesAggCombiner())).apply("FlattenIterables",
ParDo.of(new FlattenKVIterableDoFn()));
PCollection<KV<String, TSAggValueProto>> completeAggregationStage3 =
completeAggregationStage2.apply("ResetTimestampsAfterGlobalWindow",
ParDo.of(new DoFn<KV<String, TSAggValueProto>, KV<String, TSAggValueProto>>() {
@Override
public void processElement(
DoFn<KV<String, TSAggValueProto>, KV<String, TSAggValueProto>>.ProcessContext c)
throws Exception {
if (c.timestamp().isBefore(new Instant(32530703764000L))) {
if (c.timestamp().isAfter(
new Instant(c.element().getValue().getCloseState().getTime()))) {
LOG.error("BUG There was a timestamp before current :: "
+ TextFormat.shortDebugString(c.element().getValue()));
} else {
c.outputWithTimestamp(c.element(), new Instant(c.element().getValue()
.getCloseTime()));
}
}
}
}));
return completeAggregationStage3;
}
Author: GoogleCloudPlatform, Project: data-timeseries-java, Lines: 57, Source: FXTimeSeriesPipelineSRGTests.java
Example 18: apply
import com.google.cloud.dataflow.sdk.transforms.Combine; // import the required package/class
@Override
public PCollection<FlagStatus> apply(final PCollection<GATKRead> input) {
return input.apply(Combine.globally(new CombineCounts()));
}
Author: broadinstitute, Project: gatk-dataflow, Lines: 5, Source: FlagStatusDataflowTransform.java
Example 19: createAggregatorInternal
import com.google.cloud.dataflow.sdk.transforms.Combine; // import the required package/class
@Override
public <AI, AO> Aggregator<AI, AO> createAggregatorInternal(
String named,
Combine.CombineFn<AI, ?, AO> combineFn) {
return mRuntimeContext.createAggregator(named, combineFn);
}
Author: shakamunyi, Project: spark-dataflow, Lines: 7, Source: SparkProcessContext.java
Example 20: combineGlobally
import com.google.cloud.dataflow.sdk.transforms.Combine; // import the required package/class
private static <I, A, O> TransformEvaluator<Combine.Globally<I, O>> combineGlobally() {
return new TransformEvaluator<Combine.Globally<I, O>>() {
@Override
public void evaluate(Combine.Globally<I, O> transform, EvaluationContext context) {
final Combine.CombineFn<I, A, O> globally = COMBINE_GLOBALLY_FG.get("fn", transform);
@SuppressWarnings("unchecked")
JavaRDDLike<WindowedValue<I>, ?> inRdd =
(JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform);
final Coder<I> iCoder = context.getInput(transform).getCoder();
final Coder<A> aCoder;
try {
aCoder = globally.getAccumulatorCoder(
context.getPipeline().getCoderRegistry(), iCoder);
} catch (CannotProvideCoderException e) {
throw new IllegalStateException("Could not determine coder for accumulator", e);
}
// Use coders to convert objects in the PCollection to byte arrays, so they
// can be transferred over the network for the shuffle.
JavaRDD<byte[]> inRddBytes = inRdd
.map(WindowingHelpers.<I>unwindowFunction())
.map(CoderHelpers.toByteFunction(iCoder));
/*A*/ byte[] acc = inRddBytes.aggregate(
CoderHelpers.toByteArray(globally.createAccumulator(), aCoder),
new Function2</*A*/ byte[], /*I*/ byte[], /*A*/ byte[]>() {
@Override
public /*A*/ byte[] call(/*A*/ byte[] ab, /*I*/ byte[] ib) throws Exception {
A a = CoderHelpers.fromByteArray(ab, aCoder);
I i = CoderHelpers.fromByteArray(ib, iCoder);
return CoderHelpers.toByteArray(globally.addInput(a, i), aCoder);
}
},
new Function2</*A*/ byte[], /*A*/ byte[], /*A*/ byte[]>() {
@Override
public /*A*/ byte[] call(/*A*/ byte[] a1b, /*A*/ byte[] a2b) throws Exception {
A a1 = CoderHelpers.fromByteArray(a1b, aCoder);
A a2 = CoderHelpers.fromByteArray(a2b, aCoder);
// don't use Guava's ImmutableList.of as values may be null
List<A> accumulators = Collections.unmodifiableList(Arrays.asList(a1, a2));
A merged = globally.mergeAccumulators(accumulators);
return CoderHelpers.toByteArray(merged, aCoder);
}
}
);
O output = globally.extractOutput(CoderHelpers.fromByteArray(acc, aCoder));
Coder<O> coder = context.getOutput(transform).getCoder();
JavaRDD<byte[]> outRdd = context.getSparkContext().parallelize(
// don't use Guava's ImmutableList.of as output may be null
CoderHelpers.toByteArrays(Collections.singleton(output), coder));
context.setOutputRDD(transform, outRdd.map(CoderHelpers.fromByteFunction(coder))
.map(WindowingHelpers.<O>windowFunction()));
}
};
}
Author: shakamunyi, Project: spark-dataflow, Lines: 60, Source: TransformTranslator.java
Note: The com.google.cloud.dataflow.sdk.transforms.Combine examples in this article were collected from GitHub, MSDocs, and other source-code and documentation platforms. The code snippets come from open-source projects contributed by their authors, and copyright remains with the original authors. When distributing or using them, please follow the corresponding project's license; do not repost without permission.