This article collects typical usage examples of the Java class com.google.cloud.dataflow.sdk.transforms.Flatten. If you have been wondering what the Flatten class does and how to use it, the curated examples below should help.
The Flatten class belongs to the com.google.cloud.dataflow.sdk.transforms package. Seven code examples are presented below, sorted by popularity by default.
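As a quick orientation before the examples, here is a minimal self-contained sketch of the most common use of Flatten, written by us against the Dataflow SDK 1.x API rather than taken from any of the projects below: Flatten.pCollections() merges every member of a PCollectionList into a single PCollection.
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.Flatten;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionList;

public class FlattenSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    // Two small in-memory inputs.
    PCollection<String> a = p.apply(Create.of("a", "b"));
    PCollection<String> b = p.apply(Create.of("c", "d"));
    // Flatten.pCollections() merges the members of the PCollectionList
    // into one PCollection containing all four elements.
    PCollection<String> merged =
        PCollectionList.of(a).and(b).apply(Flatten.<String>pCollections());
    p.run();
  }
}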
Example 1: flattenPColl
import com.google.cloud.dataflow.sdk.transforms.Flatten; // import the required package/class
private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
  return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
    @SuppressWarnings("unchecked")
    @Override
    public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
      StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
      PCollectionList<T> pcs = sec.getInput(transform);
      // Collect the DStream backing each input PCollection, then union them into one stream.
      JavaDStream<WindowedValue<T>> first =
          (JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(0));
      List<JavaDStream<WindowedValue<T>>> rest = Lists.newArrayListWithCapacity(pcs.size() - 1);
      for (int i = 1; i < pcs.size(); i++) {
        rest.add((JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(i)));
      }
      JavaDStream<WindowedValue<T>> dstream = sec.getStreamingContext().union(first, rest);
      sec.setStream(transform, dstream);
    }
  };
}
Developer: shakamunyi, Project: spark-dataflow, Lines: 19, Source: StreamingTransformTranslator.java
Example 2: apply
import com.google.cloud.dataflow.sdk.transforms.Flatten; // import the required package/class
@Override
public PCollection<KV<String, String>> apply(PInput input) {
  Pipeline pipeline = input.getPipeline();
  // Create one TextIO.Read transform for each data file
  // and add its output to a PCollectionList.
  PCollectionList<KV<String, String>> filesToLines = PCollectionList.empty(pipeline);
  for (final String fileLocation : files) {
    PTransform<PInput, PCollection<String>> inputSource =
        TextIO.Read.from(fileLocation).named("TextIO.Read(" + fileLocation + ")");
    PCollection<KV<String, String>> oneFileToLines = pipeline
        .apply(inputSource)
        .apply(WithKeys.<String, String>of(fileLocation));
    filesToLines = filesToLines.and(oneFileToLines);
  }
  // Flatten all per-file collections into a single PCollection.
  return filesToLines.apply(Flatten.<KV<String, String>>pCollections())
      .setCoder(getDefaultOutputCoder());
}
Developer: GoogleCloudPlatform, Project: dataflow-precipitation-pipeline, Lines: 24, Source: ReadDataWithFileName.java
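For illustration, applying such a composite transform might look like the following hypothetical sketch; the ReadDataWithFileName constructor and the file paths shown are our assumptions, not the project's actual API.
// Hypothetical usage sketch: the constructor and file paths below are assumed for illustration.
List<String> files = Arrays.asList(
    "gs://my-bucket/data/part-1.txt",
    "gs://my-bucket/data/part-2.txt");
Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
PCollection<KV<String, String>> linesKeyedByFile =
    pipeline.apply(new ReadDataWithFileName(files)); // each line keyed by its source file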
Example 3: apply
import com.google.cloud.dataflow.sdk.transforms.Flatten; // import the required package/class
@Override
public PCollection<KV<String, WorkflowArgs>> apply(
    PCollectionList<KV<String, WorkflowArgs>> input) {
  // Flatten all branches into one PCollection, then combine them into a single merged result.
  return input
      .apply(Flatten.<KV<String, WorkflowArgs>>pCollections())
      .apply(Combine.globally(new Merge()));
}
Developer: googlegenomics, Project: dockerflow, Lines: 8, Source: MergeBranches.java
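The flatten-then-combine shape above is a common way to merge parallel branches into one value. Here is a standalone sketch of the same shape with a trivial combiner; the Joiner-based function is our illustration, not dockerflow's Merge logic, and first/second are assumed PCollection<String> inputs.
// Sketch only: 'first' and 'second' are assumed PCollection<String> inputs.
PCollection<String> merged =
    PCollectionList.of(first).and(second)
        .apply(Flatten.<String>pCollections())
        .apply(Combine.globally(new SerializableFunction<Iterable<String>, String>() {
          @Override
          public String apply(Iterable<String> input) {
            return Joiner.on(",").join(input); // concatenate all flattened elements
          }
        }));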
Example 4: generateCompleteWindowData
import com.google.cloud.dataflow.sdk.transforms.Flatten; // import the required package/class
public PCollection<KV<String, TSProto>> generateCompleteWindowData(Pipeline pipeline,
    List<KV<String, TSProto>> data, WorkPacketConfig packetConfig) {
  LOG.info("Check to see that time streams with missing 'ticks' have been corrected");
  PCollection<KV<String, TSProto>> tsData = setupDataInput(pipeline, data);
  PCollection<KV<String, TSProto>> windowedData =
      tsData.apply("CandleResolutionWindow", Window.<KV<String, TSProto>>into(FixedWindows
          .of(Duration.standardSeconds(((FXTimeSeriesPipelineOptions) pipeline.getOptions())
              .getCandleResolution()))));
  // Determine streams that are missing in this Window and generate values for them
  PCollection<KV<String, TSProto>> generatedValues =
      windowedData
          .apply(
              "DetectMissingTimeSeriesValues",
              Combine.globally(new DetectMissingTimeSeriesValuesCombiner(packetConfig))
                  .withoutDefaults())
          .apply(ParDo.of(new CreateMissingTimeSeriesValuesDoFn()))
          .setName("CreateMissingTimeSeriesValues");
  // Flatten the live streams and the generated streams together
  PCollection<KV<String, TSProto>> completeWindowData =
      PCollectionList.of(windowedData).and(generatedValues)
          .apply("MergeGeneratedLiveValues", Flatten.<KV<String, TSProto>>pCollections());
  return completeWindowData;
}
Developer: GoogleCloudPlatform, Project: data-timeseries-java, Lines: 33, Source: FXTimeSeriesPipelineSRGTests.java
Example 5: flattenPColl
import com.google.cloud.dataflow.sdk.transforms.Flatten; // import the required package/class
private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
  return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
    @SuppressWarnings("unchecked")
    @Override
    public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
      PCollectionList<T> pcs = context.getInput(transform);
      // Collect the RDD backing each input PCollection, then union them into one RDD.
      JavaRDD<WindowedValue<T>>[] rdds = new JavaRDD[pcs.size()];
      for (int i = 0; i < rdds.length; i++) {
        rdds[i] = (JavaRDD<WindowedValue<T>>) context.getRDD(pcs.get(i));
      }
      JavaRDD<WindowedValue<T>> rdd = context.getSparkContext().union(rdds);
      context.setOutputRDD(transform, rdd);
    }
  };
}
Developer: shakamunyi, Project: spark-dataflow, Lines: 16, Source: TransformTranslator.java
Example 6: testRun
import com.google.cloud.dataflow.sdk.transforms.Flatten; // import the required package/class
@Test
public void testRun() throws Exception {
  Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
  PCollection<String> regex = p.apply(Create.of("[^a-zA-Z']+"));
  PCollection<String> w1 = p.apply(Create.of("Here are some words to count", "and some others"));
  PCollection<String> w2 = p.apply(Create.of("Here are some more words", "and even more words"));
  PCollectionList<String> list = PCollectionList.of(w1).and(w2);
  PCollection<String> union = list.apply(Flatten.<String>pCollections());
  PCollectionView<String> regexView = regex.apply(View.<String>asSingleton());
  CountWords countWords = new CountWords(regexView);
  PCollectionTuple luc = union.apply(countWords);
  PCollection<Long> unique = luc.get(lowerCnts).apply(
      ApproximateUnique.<KV<String, Long>>globally(16));
  EvaluationResult res = SparkPipelineRunner.create().run(p);
  Iterable<KV<String, Long>> actualLower = res.get(luc.get(lowerCnts));
  Assert.assertEquals("are", actualLower.iterator().next().getKey());
  Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts));
  Assert.assertEquals("Here", actualUpper.iterator().next().getKey());
  Iterable<Long> actualUniqCount = res.get(unique);
  Assert.assertEquals(9, (long) actualUniqCount.iterator().next());
  int actualTotalWords = res.getAggregatorValue("totalWords", Integer.class);
  Assert.assertEquals(18, actualTotalWords);
  int actualMaxWordLength = res.getAggregatorValue("maxWordLength", Integer.class);
  Assert.assertEquals(6, actualMaxWordLength);
  AggregatorValues<Integer> aggregatorValues = res.getAggregatorValues(countWords
      .getTotalWordsAggregator());
  Assert.assertEquals(18, Iterables.getOnlyElement(aggregatorValues.getValues()).intValue());
  res.close();
}
Developer: shakamunyi, Project: spark-dataflow, Lines: 33, Source: MultiOutputWordCountTest.java
Example 7: testRun
import com.google.cloud.dataflow.sdk.transforms.Flatten; // import the required package/class
@Test
public void testRun() throws Exception {
  SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
  options.setAppName(this.getClass().getSimpleName());
  options.setRunner(SparkPipelineRunner.class);
  options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
  Pipeline p = Pipeline.create(options);
  PCollection<String> w1 =
      p.apply(CreateStream.fromQueue(WORDS_QUEUE_1)).setCoder(StringUtf8Coder.of());
  PCollection<String> windowedW1 =
      w1.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
  PCollection<String> w2 =
      p.apply(CreateStream.fromQueue(WORDS_QUEUE_2)).setCoder(StringUtf8Coder.of());
  PCollection<String> windowedW2 =
      w2.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
  PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2);
  PCollection<String> union = list.apply(Flatten.<String>pCollections());
  DataflowAssert.thatIterable(union.apply(View.<String>asIterable()))
      .containsInAnyOrder(EXPECTED_UNION);
  EvaluationResult res = SparkPipelineRunner.create(options).run(p);
  res.close();
  DataflowAssertStreaming.assertNoFailures(res);
}
Developer: shakamunyi, Project: spark-dataflow, Lines: 28, Source: FlattenStreamingTest.java
Note: the com.google.cloud.dataflow.sdk.transforms.Flatten examples in this article are collected from open-source projects hosted on GitHub, MSDocs, and similar platforms. The snippets were contributed by the respective open-source projects, and copyright remains with the original authors; consult each project's license before distributing or reusing the code. Do not republish without permission.