This article collects typical usage examples of the Java class org.apache.beam.sdk.transforms.Combine. If you are unsure what the Combine class does or how to use it, the curated examples below may help.
The Combine class belongs to the org.apache.beam.sdk.transforms package. 20 code examples of the Combine class are shown below, sorted by popularity by default.
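Before the examples, a minimal sketch of the two most common entry points, Combine.globally and Combine.perKey. The PCollections `numbers` and `keyedNumbers` are assumed placeholders and are not part of the examples below.

import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Combine all elements into a single global total.
PCollection<Long> total =
    numbers.apply("SumGlobally", Combine.globally(Sum.ofLongs()));
// Combine the values of each key independently.
PCollection<KV<String, Long>> perKeyTotals =
    keyedNumbers.apply("SumPerKey", Combine.perKey(Sum.ofLongs()));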
Example 1: testMergingCustomWindows

import org.apache.beam.sdk.transforms.Combine; // import the dependent package/class

@Test
@Category({ValidatesRunner.class, UsesCustomWindowMerging.class})
public void testMergingCustomWindows() {
  Instant startInstant = new Instant(0L);
  List<TimestampedValue<String>> input = new ArrayList<>();
  PCollection<String> inputCollection =
      pipeline.apply(
          Create.timestamped(
              TimestampedValue.of("big", startInstant.plus(Duration.standardSeconds(10))),
              TimestampedValue.of("small1", startInstant.plus(Duration.standardSeconds(20))),
              // This one will be outside of bigWindow and thus not merged
              TimestampedValue.of("small2", startInstant.plus(Duration.standardSeconds(39)))));
  PCollection<String> windowedCollection =
      inputCollection.apply(Window.into(new CustomWindowFn<String>()));
  PCollection<Long> count =
      windowedCollection.apply(Combine.globally(Count.<String>combineFn()).withoutDefaults());
  // "small1" and "big" are merged into bigWindow; "small2" is not merged
  // because its timestamp falls outside bigWindow
  PAssert.that("Wrong number of elements in output collection", count).containsInAnyOrder(2L, 1L);
  pipeline.run();
}

Developer: apache | Project: beam | Lines: 22 | Source: WindowTest.java
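The CustomWindowFn used above is defined elsewhere in WindowTest.java. As a rough, hedged sketch of how such a merging WindowFn could be written (the actual implementation in Beam's WindowTest differs in detail, and the keyed variant in Example 2 would inspect the KV's value rather than the raw element):

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.joda.time.Duration;

static class CustomWindowFn<T> extends WindowFn<T, IntervalWindow> {
  @Override
  public Collection<IntervalWindow> assignWindows(AssignContext c) {
    // Hypothetical rule: a "big" element opens a wide 30s window,
    // every other element gets a narrow 1s window.
    Duration size =
        "big".equals(String.valueOf(c.element()))
            ? Duration.standardSeconds(30)
            : Duration.standardSeconds(1);
    return Collections.singletonList(new IntervalWindow(c.timestamp(), size));
  }

  @Override
  public void mergeWindows(MergeContext c) throws Exception {
    // Fold every window fully contained in a larger window into that window.
    for (IntervalWindow big : c.windows()) {
      List<IntervalWindow> toMerge = new ArrayList<>();
      for (IntervalWindow other : c.windows()) {
        if (!other.equals(big) && big.contains(other)) {
          toMerge.add(other);
        }
      }
      if (!toMerge.isEmpty()) {
        toMerge.add(big);
        c.merge(toMerge, big);
        return; // merge one group per pass; the runner re-invokes as needed
      }
    }
  }

  @Override
  public boolean isCompatible(WindowFn<?, ?> other) {
    return other instanceof CustomWindowFn;
  }

  @Override
  public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
  }

  @Override
  public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
    throw new UnsupportedOperationException("This WindowFn is not used for side inputs");
  }
}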
Example 2: testMergingCustomWindowsKeyedCollection

import org.apache.beam.sdk.transforms.Combine; // import the dependent package/class

@Test
@Category({ValidatesRunner.class, UsesCustomWindowMerging.class})
public void testMergingCustomWindowsKeyedCollection() {
  Instant startInstant = new Instant(0L);
  PCollection<KV<Integer, String>> inputCollection =
      pipeline.apply(
          Create.timestamped(
              TimestampedValue.of(
                  KV.of(0, "big"), startInstant.plus(Duration.standardSeconds(10))),
              TimestampedValue.of(
                  KV.of(1, "small1"), startInstant.plus(Duration.standardSeconds(20))),
              // This element is not contained within bigWindow and is not merged
              TimestampedValue.of(
                  KV.of(2, "small2"), startInstant.plus(Duration.standardSeconds(39)))));
  PCollection<KV<Integer, String>> windowedCollection =
      inputCollection.apply(Window.into(new CustomWindowFn<KV<Integer, String>>()));
  PCollection<Long> count =
      windowedCollection.apply(
          Combine.globally(Count.<KV<Integer, String>>combineFn()).withoutDefaults());
  // "small1" and "big" are merged into bigWindow; "small2" is not merged
  // because it is not contained in bigWindow
  PAssert.that("Wrong number of elements in output collection", count).containsInAnyOrder(2L, 1L);
  pipeline.run();
}

Developer: apache | Project: beam | Lines: 25 | Source: WindowTest.java
Example 3: testHifIOWithElastic

import org.apache.beam.sdk.transforms.Combine; // import the dependent package/class

/**
 * Test to read data from an embedded Elasticsearch instance and verify that the data is read
 * successfully.
 */
@Test
public void testHifIOWithElastic() {
  // The expected hashcode is computed once at insertion time and hardcoded here.
  String expectedHashCode = "a62a85f5f081e3840baf1028d4d6c6bc";
  Configuration conf = getConfiguration();
  PCollection<KV<Text, LinkedMapWritable>> esData =
      pipeline.apply(HadoopInputFormatIO.<Text, LinkedMapWritable>read().withConfiguration(conf));
  PCollection<Long> count = esData.apply(Count.<KV<Text, LinkedMapWritable>>globally());
  // Verify that the count of objects fetched using HIFInputFormat IO is correct.
  PAssert.thatSingleton(count).isEqualTo((long) TEST_DATA_ROW_COUNT);
  PCollection<LinkedMapWritable> values = esData.apply(Values.<LinkedMapWritable>create());
  PCollection<String> textValues = values.apply(transformFunc);
  // Verify the output values using checksum comparison.
  PCollection<String> consolidatedHashcode =
      textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
  PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
  pipeline.run().waitUntilFinish();
}

Developer: apache | Project: beam | Lines: 23 | Source: HIFIOWithElasticTest.java
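HashingFn (org.apache.beam.sdk.io.common.HashingFn in Beam's IO test utilities) produces an order-independent checksum of all elements, which is what keeps the containsInAnyOrder assertion stable across runners. A hedged sketch of such a combiner using Guava's unordered hash combination — the real HashingFn differs in detail:

import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.Combine;

static class UnorderedChecksumFn extends Combine.CombineFn<String, List<HashCode>, String> {
  @Override
  public List<HashCode> createAccumulator() {
    return new ArrayList<>();
  }

  @Override
  public List<HashCode> addInput(List<HashCode> accumulator, String input) {
    accumulator.add(Hashing.murmur3_128().hashString(input, StandardCharsets.UTF_8));
    return accumulator;
  }

  @Override
  public List<HashCode> mergeAccumulators(Iterable<List<HashCode>> accumulators) {
    List<HashCode> merged = new ArrayList<>();
    for (List<HashCode> accumulator : accumulators) {
      merged.addAll(accumulator);
    }
    return merged;
  }

  @Override
  public String extractOutput(List<HashCode> accumulator) {
    // combineUnordered is commutative, so the checksum does not depend on element
    // order; it throws on an empty accumulator, hence withoutDefaults() above.
    // A production version would also override getAccumulatorCoder().
    return Hashing.combineUnordered(accumulator).toString();
  }
}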
Example 4: testHifIOWithElastic

import org.apache.beam.sdk.transforms.Combine; // import the dependent package/class

/**
 * This test reads data from the Elasticsearch instance and verifies that the data is read
 * successfully.
 */
@Test
public void testHifIOWithElastic() throws SecurityException, IOException {
  // The expected hashcode is computed once at insertion time and hardcoded here.
  final long expectedRowCount = 1000L;
  String expectedHashCode = "42e254c8689050ed0a617ff5e80ea392";
  Configuration conf = getConfiguration(options);
  PCollection<KV<Text, LinkedMapWritable>> esData =
      pipeline.apply(HadoopInputFormatIO.<Text, LinkedMapWritable>read().withConfiguration(conf));
  // Verify that the count of objects fetched using HIFInputFormat IO is correct.
  PCollection<Long> count = esData.apply(Count.<KV<Text, LinkedMapWritable>>globally());
  PAssert.thatSingleton(count).isEqualTo(expectedRowCount);
  PCollection<LinkedMapWritable> values = esData.apply(Values.<LinkedMapWritable>create());
  PCollection<String> textValues = values.apply(transformFunc);
  // Verify the output values using checksum comparison.
  PCollection<String> consolidatedHashcode =
      textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
  PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
  pipeline.run().waitUntilFinish();
}

Developer: apache | Project: beam | Lines: 24 | Source: HIFIOElasticIT.java
Example 5: testHIFReadForCassandra

import org.apache.beam.sdk.transforms.Combine; // import the dependent package/class

/**
 * This test reads data from the Cassandra instance and verifies that the data is read
 * successfully.
 */
@Test
public void testHIFReadForCassandra() {
  // The expected hashcode is computed once at insertion time and hardcoded here.
  String expectedHashCode = "1a30ad400afe4ebf5fde75f5d2d95408";
  Long expectedRecordsCount = 1000L;
  Configuration conf = getConfiguration(options);
  PCollection<KV<Long, String>> cassandraData = pipeline.apply(HadoopInputFormatIO
      .<Long, String>read().withConfiguration(conf).withValueTranslation(myValueTranslate));
  PAssert.thatSingleton(cassandraData.apply("Count", Count.<KV<Long, String>>globally()))
      .isEqualTo(expectedRecordsCount);
  PCollection<String> textValues = cassandraData.apply(Values.<String>create());
  // Verify the output values using checksum comparison.
  PCollection<String> consolidatedHashcode =
      textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
  PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
  pipeline.run().waitUntilFinish();
}

Developer: apache | Project: beam | Lines: 21 | Source: HIFIOCassandraIT.java
Example 6: testHIFReadForCassandraQuery

import org.apache.beam.sdk.transforms.Combine; // import the dependent package/class

/**
 * This test reads data from the Cassandra instance based on a query and verifies that the data
 * is read successfully.
 */
@Test
public void testHIFReadForCassandraQuery() {
  String expectedHashCode = "7bead6d6385c5f4dd0524720cd320b49";
  Long expectedNumRows = 1L;
  Configuration conf = getConfiguration(options);
  conf.set("cassandra.input.cql", "select * from " + CASSANDRA_KEYSPACE + "." + CASSANDRA_TABLE
      + " where token(y_id) > ? and token(y_id) <= ? "
      + "and field0 = 'user48:field0:431531'");
  PCollection<KV<Long, String>> cassandraData =
      pipeline.apply(HadoopInputFormatIO.<Long, String>read().withConfiguration(conf)
          .withValueTranslation(myValueTranslate));
  PAssert.thatSingleton(cassandraData.apply("Count", Count.<KV<Long, String>>globally()))
      .isEqualTo(expectedNumRows);
  PCollection<String> textValues = cassandraData.apply(Values.<String>create());
  // Verify the output values using checksum comparison.
  PCollection<String> consolidatedHashcode =
      textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
  PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
  pipeline.run().waitUntilFinish();
}

Developer: apache | Project: beam | Lines: 25 | Source: HIFIOCassandraIT.java
Example 7: testHIFReadForCassandra

import org.apache.beam.sdk.transforms.Combine; // import the dependent package/class

/**
 * Test to read data from an embedded Cassandra instance and verify that the data is read
 * successfully.
 * @throws Exception
 */
@Test
public void testHIFReadForCassandra() throws Exception {
  // The expected hashcode is computed once at insertion time and hardcoded here.
  String expectedHashCode = "1b9780833cce000138b9afa25ba63486";
  Configuration conf = getConfiguration();
  PCollection<KV<Long, String>> cassandraData =
      p.apply(HadoopInputFormatIO.<Long, String>read().withConfiguration(conf)
          .withValueTranslation(myValueTranslate));
  // Verify that the count of data retrieved from Cassandra matches the expected count.
  PAssert.thatSingleton(cassandraData.apply("Count", Count.<KV<Long, String>>globally()))
      .isEqualTo(TEST_DATA_ROW_COUNT);
  PCollection<String> textValues = cassandraData.apply(Values.<String>create());
  // Verify the output values using checksum comparison.
  PCollection<String> consolidatedHashcode =
      textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
  PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
  p.run().waitUntilFinish();
}

Developer: apache | Project: beam | Lines: 24 | Source: HIFIOWithEmbeddedCassandraTest.java
Example 8: testHIFReadForCassandraQuery

import org.apache.beam.sdk.transforms.Combine; // import the dependent package/class

/**
 * Test to read data from an embedded Cassandra instance based on a query and verify that the
 * data is read successfully.
 */
@Test
public void testHIFReadForCassandraQuery() throws Exception {
  Long expectedCount = 1L;
  String expectedChecksum = "f11caabc7a9fc170e22b41218749166c";
  Configuration conf = getConfiguration();
  conf.set("cassandra.input.cql", "select * from " + CASSANDRA_KEYSPACE + "." + CASSANDRA_TABLE
      + " where token(id) > ? and token(id) <= ? and scientist='Faraday1' allow filtering");
  PCollection<KV<Long, String>> cassandraData =
      p.apply(HadoopInputFormatIO.<Long, String>read().withConfiguration(conf)
          .withValueTranslation(myValueTranslate));
  // Verify that the count of data retrieved from Cassandra matches the expected count.
  PAssert.thatSingleton(cassandraData.apply("Count", Count.<KV<Long, String>>globally()))
      .isEqualTo(expectedCount);
  PCollection<String> textValues = cassandraData.apply(Values.<String>create());
  // Verify the output values using checksum comparison.
  PCollection<String> consolidatedHashcode =
      textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
  PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedChecksum);
  p.run().waitUntilFinish();
}

Developer: apache | Project: beam | Lines: 25 | Source: HIFIOWithEmbeddedCassandraTest.java
Example 9: RawCombine

import org.apache.beam.sdk.transforms.Combine; // import the dependent package/class

private RawCombine(RunnerApi.PTransform protoTransform,
    RehydratedComponents rehydratedComponents) throws IOException {
  this.protoTransform = protoTransform;
  this.rehydratedComponents = rehydratedComponents;
  this.spec = protoTransform.getSpec();
  this.payload = CombinePayload.parseFrom(spec.getPayload());
  // Eagerly extract the coder so that a failure surfaces as a clear exception here.
  try {
    this.accumulatorCoder =
        (Coder<AccumT>) rehydratedComponents.getCoder(payload.getAccumulatorCoderId());
  } catch (IOException exc) {
    throw new IllegalArgumentException(
        String.format(
            "Failure extracting accumulator coder with id '%s' for %s",
            payload.getAccumulatorCoderId(), Combine.class.getSimpleName()),
        exc);
  }
}

Developer: apache | Project: beam | Lines: 20 | Source: CombineTranslation.java
Example 10: getAdditionalInputs

import org.apache.beam.sdk.transforms.Combine; // import the dependent package/class

@Override
public Map<TupleTag<?>, PValue> getAdditionalInputs() {
  Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>();
  for (Map.Entry<String, SideInput> sideInputEntry : payload.getSideInputsMap().entrySet()) {
    try {
      additionalInputs.put(
          new TupleTag<>(sideInputEntry.getKey()),
          rehydratedComponents.getPCollection(
              protoTransform.getInputsOrThrow(sideInputEntry.getKey())));
    } catch (IOException exc) {
      throw new IllegalStateException(
          String.format(
              "Could not find input with name %s for %s transform",
              sideInputEntry.getKey(), Combine.class.getSimpleName()));
    }
  }
  return additionalInputs;
}

Developer: apache | Project: beam | Lines: 19 | Source: CombineTranslation.java
Example 11: toProto

import org.apache.beam.sdk.transforms.Combine; // import the dependent package/class

@VisibleForTesting
static CombinePayload toProto(
    AppliedPTransform<?, ?, Combine.PerKey<?, ?, ?>> combine, SdkComponents sdkComponents)
    throws IOException {
  GlobalCombineFn<?, ?, ?> combineFn = combine.getTransform().getFn();
  try {
    Coder<?> accumulatorCoder = extractAccumulatorCoder(combineFn, (AppliedPTransform) combine);
    Map<String, SideInput> sideInputs = new HashMap<>();
    return RunnerApi.CombinePayload.newBuilder()
        .setAccumulatorCoderId(sdkComponents.registerCoder(accumulatorCoder))
        .putAllSideInputs(sideInputs)
        .setCombineFn(toProto(combineFn))
        .build();
  } catch (CannotProvideCoderException e) {
    throw new IllegalStateException(e);
  }
}

Developer: apache | Project: beam | Lines: 18 | Source: CombineTranslation.java
Example 12: extractAccumulatorCoder

import org.apache.beam.sdk.transforms.Combine; // import the dependent package/class

private static <K, InputT, AccumT> Coder<AccumT> extractAccumulatorCoder(
    GlobalCombineFn<InputT, AccumT, ?> combineFn,
    AppliedPTransform<PCollection<KV<K, InputT>>, ?, Combine.PerKey<K, InputT, ?>> transform)
    throws CannotProvideCoderException {
  @SuppressWarnings("unchecked")
  PCollection<KV<K, InputT>> mainInput =
      (PCollection<KV<K, InputT>>)
          Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(transform));
  KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) mainInput.getCoder();
  return AppliedCombineFn.withInputCoder(
          combineFn,
          transform.getPipeline().getCoderRegistry(),
          inputCoder,
          transform.getTransform().getSideInputs(),
          ((PCollection<?>) Iterables.getOnlyElement(transform.getOutputs().values()))
              .getWindowingStrategy())
      .getAccumulatorCoder();
}

Developer: apache | Project: beam | Lines: 19 | Source: CombineTranslation.java
Example 13: canTranslate

import org.apache.beam.sdk.transforms.Combine; // import the dependent package/class

@Override
boolean canTranslate(
    Combine.PerKey<K, InputT, OutputT> transform,
    FlinkStreamingTranslationContext context) {
  // If we have a merging window strategy and side inputs, we cannot translate this
  // as a proper combine; we have to group first and then run the combine over the
  // grouped values.
  PCollection<KV<K, InputT>> input = context.getInput(transform);
  @SuppressWarnings("unchecked")
  WindowingStrategy<?, BoundedWindow> windowingStrategy =
      (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
  return windowingStrategy.getWindowFn().isNonMerging() || transform.getSideInputs().isEmpty();
}

Developer: apache | Project: beam | Lines: 17 | Source: FlinkStreamingTransformTranslators.java
Example 14: translateHelper

import org.apache.beam.sdk.transforms.Combine; // import the dependent package/class

private <K, InputT, OutputT> void translateHelper(
    final CombineGroupedValues<K, InputT, OutputT> primitiveTransform,
    TranslationContext context) {
  Combine.GroupedValues<K, InputT, OutputT> originalTransform =
      primitiveTransform.getOriginalCombine();
  StepTranslationContext stepContext =
      context.addStep(primitiveTransform, "CombineValues");
  translateInputs(
      stepContext,
      context.getInput(primitiveTransform),
      originalTransform.getSideInputs(),
      context);
  AppliedCombineFn<? super K, ? super InputT, ?, OutputT> fn =
      originalTransform.getAppliedFn(
          context.getInput(primitiveTransform).getPipeline().getCoderRegistry(),
          context.getInput(primitiveTransform).getCoder(),
          context.getInput(primitiveTransform).getWindowingStrategy());
  stepContext.addEncodingInput(fn.getAccumulatorCoder());
  stepContext.addInput(
      PropertyNames.SERIALIZED_FN, byteArrayToJsonString(serializeToByteArray(fn)));
  stepContext.addOutput(context.getOutput(primitiveTransform));
}

Developer: apache | Project: beam | Lines: 25 | Source: DataflowPipelineTranslator.java
Example 15: getReplacementTransform

import org.apache.beam.sdk.transforms.Combine; // import the dependent package/class

@Override
public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform(
    AppliedPTransform<
            PCollection<InputT>,
            PValue,
            PTransform<PCollection<InputT>, PValue>> transform) {
  Combine.GloballyAsSingletonView<?, ?> combineTransform =
      (Combine.GloballyAsSingletonView) transform.getTransform();
  return PTransformReplacement.of(
      PTransformReplacements.getSingletonMainInput(transform),
      new BatchViewOverrides.BatchViewAsSingleton(
          runner,
          findCreatePCollectionView(transform),
          (CombineFn) combineTransform.getCombineFn(),
          combineTransform.getFanout()));
}

Developer: apache | Project: beam | Lines: 17 | Source: DataflowRunner.java
Example 16: expand

import org.apache.beam.sdk.transforms.Combine; // import the dependent package/class

@Override
public PDone expand(PCollection<KV<KV<String, String>, Long>> similarPairs) {
  return similarPairs
      .apply(Sum.<KV<String, String>>longsPerKey())
      .apply(Combine.globally(TO_LIST))
      .apply("PCoAAnalysis", ParDo.of(new PCoAnalysis(dataIndices)))
      .apply("FormatGraphData", ParDo
          .of(new DoFn<Iterable<PCoAnalysis.GraphResult>, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
              Iterable<PCoAnalysis.GraphResult> graphResults = c.element();
              for (PCoAnalysis.GraphResult result : graphResults) {
                c.output(result.toString());
              }
            }
          }))
      .apply("WriteCounts", TextIO.write().to(outputFile));
}

Developer: googlegenomics | Project: dataflow-java | Lines: 19 | Source: OutputPCoAFile.java
Example 17: expand

import org.apache.beam.sdk.transforms.Combine; // import the dependent package/class

@Override
public PCollection<TableRow> expand(PCollection<KV<String, LaneInfo>> flowInfo) {
  // stationId, LaneInfo => stationId + max lane flow info
  PCollection<KV<String, LaneInfo>> flowMaxes =
      flowInfo.apply(Combine.<String, LaneInfo>perKey(new MaxFlow()));
  // <stationId, max lane flow info>... => row...
  PCollection<TableRow> results = flowMaxes.apply(ParDo.of(new FormatMaxesFn()));
  return results;
}

Developer: apache | Project: beam | Lines: 14 | Source: TrafficMaxLaneFlow.java
Example 18: expand

import org.apache.beam.sdk.transforms.Combine; // import the dependent package/class

@Override
public PCollection<TableRow> expand(PCollection<TableRow> rows) {
  // row... => <word, play_name> ...
  PCollection<KV<String, String>> words = rows.apply(ParDo.of(new ExtractLargeWordsFn()));
  // word, play_name => word, all_plays ...
  PCollection<KV<String, String>> wordAllPlays =
      words.apply(Combine.<String, String>perKey(new ConcatWords()));
  // <word, all_plays>... => row...
  PCollection<TableRow> results = wordAllPlays.apply(ParDo.of(new FormatShakespeareOutputFn()));
  return results;
}

Developer: apache | Project: beam | Lines: 19 | Source: CombinePerKeyExamples.java
Example 19: expand

import org.apache.beam.sdk.transforms.Combine; // import the dependent package/class

@Override
public PCollection<Long> expand(PCollection<InputT> input) {
  return input
      .apply(
          "Compute HyperLogLog Structure",
          Combine.globally(
              ApproximateDistinctFn.<InputT>create(input.getCoder())
                  .withPrecision(this.precision())
                  .withSparseRepresentation(this.sparsePrecision())))
      .apply("Retrieve Cardinality", ParDo.of(RetrieveCardinality.globally()));
}

Developer: apache | Project: beam | Lines: 12 | Source: ApproximateDistinct.java
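For reference, the transform above comes from Beam's sketching extension, and could be applied roughly like this. The PCollection `words` is an assumed placeholder; higher precision lowers the estimation error at the cost of memory:

import org.apache.beam.sdk.extensions.sketching.ApproximateDistinct;
import org.apache.beam.sdk.values.PCollection;

// Estimate the number of distinct elements with a HyperLogLog++ sketch.
PCollection<Long> distinctCount =
    words.apply(
        "CountDistinct",
        ApproximateDistinct.<String>globally().withPrecision(15));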
Example 20: MovingFunction

import org.apache.beam.sdk.transforms.Combine; // import the dependent package/class

public MovingFunction(long samplePeriodMs, long sampleUpdateMs,
    int numSignificantBuckets, int numSignificantSamples,
    Combine.BinaryCombineLongFn function) {
  this.sampleUpdateMs = sampleUpdateMs;
  this.numSignificantBuckets = numSignificantBuckets;
  this.numSignificantSamples = numSignificantSamples;
  this.function = function;
  int n = (int) (samplePeriodMs / sampleUpdateMs);
  buckets = new long[n];
  Arrays.fill(buckets, function.identity());
  numSamples = new int[n];
  Arrays.fill(numSamples, 0);
  currentMsSinceEpoch = -1;
  currentIndex = -1;
}

Developer: apache | Project: beam | Lines: 16 | Source: MovingFunction.java
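Combine.BinaryCombineLongFn is the primitive-long combiner used here to avoid boxing. Built-in instances such as Sum.ofLongs() can be passed in, or a custom one can be defined with just apply() and identity(). A minimal sketch — the constructor argument values below are illustrative, not from the source:

import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Sum;

// Built-in: Sum.ofLongs() is a Combine.BinaryCombineLongFn, giving a moving sum
// over a hypothetical 60s period sampled every 5s.
MovingFunction movingSum =
    new MovingFunction(60_000L, 5_000L, 3, 10, Sum.ofLongs());

// Custom: track a moving maximum; identity() supplies the "empty bucket" value
// used by Arrays.fill() in the constructor above.
Combine.BinaryCombineLongFn maxFn =
    new Combine.BinaryCombineLongFn() {
      @Override
      public long apply(long left, long right) {
        return Math.max(left, right);
      }

      @Override
      public long identity() {
        return Long.MIN_VALUE;
      }
    };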
Note: The org.apache.beam.sdk.transforms.Combine examples in this article were compiled from source-code and documentation platforms such as GitHub and MSDocs; the snippets were selected from open-source projects contributed by their respective authors. Copyright of the source code remains with the original authors; consult the corresponding project's License before distributing or using it, and do not reproduce without permission.