本文整理汇总了Java中org.apache.beam.sdk.transforms.windowing.Window类的典型用法代码示例。如果您正苦于以下问题：Java Window类的具体用法？Java Window怎么用？Java Window使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
Window类属于org.apache.beam.sdk.transforms.windowing包,在下文中一共展示了Window类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: main
import org.apache.beam.sdk.transforms.windowing.Window; //导入依赖的package包/类
/**
 * Builds and runs a windowed word count over records read from Kafka.
 *
 * <p>Record values from the "beam" topic are assigned to fixed windows whose length in minutes
 * is taken from {@code options.getWindowSize()}, passed through {@code CountWords}, formatted
 * with {@code FormatAsTextFn} and written to the location given by {@code options.getOutput()}.
 *
 * @param args command-line arguments, parsed and validated into {@code Options}
 * @throws Exception declared by the signature; nothing is caught inside this method
 */
public static void main(String[] args) throws Exception {
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  // The runner is set explicitly here, after the arguments have been parsed.
  options.setRunner(FlinkRunner.class);
  Pipeline pipeline = Pipeline.create(options);

  // Kafka source: bootstrap server and topic are hard-coded; consumption starts at "earliest".
  KafkaIO.Read<byte[], String> kafkaSource =
      KafkaIO.read()
          .withBootstrapServers("192.168.99.100:32771")
          .withTopics(Arrays.asList("beam".split(",")))
          .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object) "earliest"))
          .withValueCoder(StringUtf8Coder.of());

  FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(options.getWindowSize()));

  pipeline
      .apply(kafkaSource.withoutMetadata())
      .apply(Values.<String>create())
      .apply(Window.<String>into(windowFn))
      .apply(new CountWords())
      .apply(MapElements.via(new FormatAsTextFn()))
      .apply("WriteCounts", TextIO.Write.to(options.getOutput()));

  pipeline.run();
}
开发者ID:0x0ece,项目名称:beam-starter,代码行数:25,代码来源:StreamWordCount.java
示例2: main
import org.apache.beam.sdk.transforms.windowing.Window; //导入依赖的package包/类
/**
 * Runs a batch pipeline that calculates hourly team scores.
 *
 * <p>Lines are read from {@code options.getInput()}, timestamped by {@code SetTimestampFn},
 * assigned to fixed windows of length {@code ONE_HOUR} and handed to
 * {@code CalculateTeamScores}, which is constructed with {@code options.getOutputPrefix()}.
 *
 * @param args command-line arguments, parsed and validated into {@code Options}
 * @throws Exception declared by the signature; nothing is caught inside this method
 */
public static void main(String[] args) throws Exception {
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline p = Pipeline.create(options);

  FixedWindows hourlyWindows = FixedWindows.of(ONE_HOUR);

  p.apply("ReadLogs", TextIO.read().from(options.getInput()))
      .apply("SetTimestamps", WithTimestamps.of(new SetTimestampFn()))
      .apply("FixedWindows", Window.<String>into(hourlyWindows))
      .apply("TeamScores", new CalculateTeamScores(options.getOutputPrefix()));

  p.run();
}
开发者ID:davorbonaci,项目名称:beam-portability-demo,代码行数:18,代码来源:HourlyTeamScore.java
示例3: main
import org.apache.beam.sdk.transforms.windowing.Window; //导入依赖的package包/类
/**
 * Runs a pipeline that reads string records from Kafka and computes team scores per window.
 *
 * <p>Windowing: fixed windows of length {@code FIVE_MINUTES}. The trigger fires when the
 * watermark passes the end of the window, with early firings {@code TWO_MINUTES} of processing
 * time after the first element in a pane and a late firing for every element. Allowed lateness
 * is {@code TEN_MINUTES} and fired panes accumulate.
 *
 * @param args command-line arguments, parsed and validated into {@code Options}
 * @throws Exception declared by the signature; nothing is caught inside this method
 */
public static void main(String[] args) throws Exception {
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline p = Pipeline.create(options);

  p.apply(
          KafkaIO.<String, String>read()
              .withBootstrapServers(options.getKafkaBootstrapServer())
              .withTopic(options.getTopic())
              .withKeyDeserializer(StringDeserializer.class)
              .withValueDeserializer(StringDeserializer.class)
              .withTimestampFn(new SetTimestampFn()))
      .apply("Values", ParDo.of(new ValuesFn()))
      .apply(
          "FixedWindows",
          Window.<String>into(FixedWindows.of(FIVE_MINUTES))
              .triggering(
                  AfterWatermark.pastEndOfWindow()
                      // Speculative results while the window is still open.
                      .withEarlyFirings(
                          AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TWO_MINUTES))
                      // One firing per late element.
                      .withLateFirings(AfterPane.elementCountAtLeast(1)))
              .withAllowedLateness(TEN_MINUTES)
              .accumulatingFiredPanes())
      .apply("TeamScore", new CalculateTeamScores(options.getOutputPrefix()));

  p.run();
}
开发者ID:davorbonaci,项目名称:beam-portability-demo,代码行数:28,代码来源:LeaderBoard.java
示例4: expand
import org.apache.beam.sdk.transforms.windowing.Window; //导入依赖的package包/类
/**
 * Windows game events into fixed team windows and sums scores per team.
 *
 * <p>The window length is {@code teamWindowDuration}. The trigger fires at the end of the
 * window, with early firings {@code FIVE_MINUTES} and late firings {@code TEN_MINUTES} of
 * processing time after the first element in a pane. Lateness up to {@code allowedLateness} is
 * accepted and fired panes accumulate.
 *
 * @param infos the game events to window and aggregate
 * @return the output of {@code ExtractAndSumScore("team")} applied to the windowed events
 */
@Override
public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> infos) {
  FixedWindows teamWindows = FixedWindows.of(teamWindowDuration);

  return infos
      .apply(
          "LeaderboardTeamFixedWindows",
          Window.<GameActionInfo>into(teamWindows)
              // Early (speculative) panes plus cumulative handling of late data.
              .triggering(
                  AfterWatermark.pastEndOfWindow()
                      .withEarlyFirings(
                          AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES))
                      .withLateFirings(
                          AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_MINUTES)))
              .withAllowedLateness(allowedLateness)
              .accumulatingFiredPanes())
      // Pull team name/score pairs out of the events and sum them.
      .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
}
开发者ID:apache,项目名称:beam,代码行数:17,代码来源:LeaderBoard.java
示例5: testElementsAtAlmostPositiveInfinity
import org.apache.beam.sdk.transforms.windowing.Window; //导入依赖的package包/类
/**
 * Checks that elements timestamped at the global window's maximum timestamp survive windowing
 * into six-hour fixed windows followed by a group-by-key, and land in the window that
 * {@code FixedWindows} assigns to that timestamp.
 */
@Test
public void testElementsAtAlmostPositiveInfinity() throws IOException {
  Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();

  CreateStream<String> stream =
      CreateStream.of(StringUtf8Coder.of(), batchDuration())
          .nextBatch(
              TimestampedValue.of("foo", endOfGlobalWindow),
              TimestampedValue.of("bar", endOfGlobalWindow))
          .advanceNextBatchWatermarkToInfinity();

  FixedWindows sixHourWindows = FixedWindows.of(Duration.standardHours(6));

  // Key everything with the same key, group, then flatten back to individual strings.
  PCollection<String> regrouped =
      p.apply(stream)
          .apply(Window.<String>into(sixHourWindows))
          .apply(WithKeys.<Integer, String>of(1))
          .apply(GroupByKey.<Integer, String>create())
          .apply(Values.<Iterable<String>>create())
          .apply(Flatten.<String>iterables());

  PAssert.that(regrouped)
      .inWindow(sixHourWindows.assignWindow(endOfGlobalWindow))
      .containsInAnyOrder("foo", "bar");

  p.run();
}
开发者ID:apache,项目名称:beam,代码行数:24,代码来源:CreateStreamTest.java
示例6: testTotalFlow
import org.apache.beam.sdk.transforms.windowing.Window; //导入依赖的package包/类
/**
 * Runs {@code TotalFlow("default")} over the timestamped test input in one-minute fixed windows
 * and checks that the formatted results match {@code OUT_ROW_1} and {@code OUT_ROW_2}.
 */
@Test
@Category(ValidatesRunner.class)
public void testTotalFlow() {
  Duration oneMinute = Duration.standardMinutes(1);

  PCollection<TableRow> totalFlow =
      pipeline
          .apply(Create.timestamped(TIME_STAMPED_INPUT))
          .apply(ParDo.of(new ExtractFlowInfo()))
          .apply(Window.<KV<String, Integer>>into(FixedWindows.of(oneMinute)))
          .apply(new TotalFlow("default"));

  PCollection<String> results = totalFlow.apply(ParDo.of(new FormatResults()));

  PAssert.that(results)
      .containsInAnyOrder(canonicalFormat(OUT_ROW_1), canonicalFormat(OUT_ROW_2));

  pipeline.run().waitUntilFinish();
}
开发者ID:apache,项目名称:beam,代码行数:19,代码来源:TriggerExampleTest.java
示例7: expand
import org.apache.beam.sdk.transforms.windowing.Window; //导入依赖的package包/类
/**
 * Collects the elements of {@code input}, each reified together with its window information,
 * into iterables produced by a single-key group-by-key.
 *
 * <p>Before grouping, the collection is re-windowed with an {@code IdentityWindowFn} built from
 * the input WindowFn's window coder, with trigger {@code Never.ever()}, the input's allowed
 * lateness and discarding fired panes. After grouping, the input's original windowing strategy
 * is set on the result.
 *
 * @param input the collection whose elements are gathered
 * @return a collection of iterables of {@code ValueInSingleWindow<T>}
 */
@Override
public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input) {
// Only the window coder of the input's WindowFn is used below.
WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();
return input
// Turn each element into a ValueInSingleWindow.
.apply(Reify.<T>windows())
// Give every element the same key (0) so one GroupByKey collects them together.
.apply(
WithKeys.<Integer, ValueInSingleWindow<T>>of(0)
.withKeyType(new TypeDescriptor<Integer>() {}))
// Re-window with an identity WindowFn and a trigger of Never.ever().
.apply(
Window.into(
new IdentityWindowFn<KV<Integer, ValueInSingleWindow<T>>>(
originalWindowFn.windowCoder()))
.triggering(Never.ever())
.withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
.discardingFiredPanes())
// all values have the same key so they all appear as a single output element
.apply(GroupByKey.<Integer, ValueInSingleWindow<T>>create())
// Drop the constant key, keeping only the grouped values.
.apply(Values.<Iterable<ValueInSingleWindow<T>>>create())
// Put the input's original windowing strategy back on the output.
.setWindowingStrategyInternal(input.getWindowingStrategy());
}
开发者ID:apache,项目名称:beam,代码行数:22,代码来源:GatherAllPanes.java
示例8: window
import org.apache.beam.sdk.transforms.windowing.Window; //导入依赖的package包/类
/**
 * Creates the evaluator used to translate a {@code Window.Assign} transform.
 *
 * <p>The evaluator reads the transform's input RDD from the context and registers an output
 * dataset for the transform: either the same RDD unchanged, or the RDD mapped through a
 * {@code SparkAssignWindowFn} built from the transform's WindowFn.
 *
 * @param <T> element type of the windowed collection
 * @param <W> window type; it is not named anywhere else in this method
 * @return a new evaluator for {@code Window.Assign<T>}
 */
private static <T, W extends BoundedWindow> TransformEvaluator<Window.Assign<T>> window() {
return new TransformEvaluator<Window.Assign<T>>() {
@Override
public void evaluate(Window.Assign<T> transform, EvaluationContext context) {
// The borrowed dataset is cast to BoundedDataset without a check, hence the suppression.
@SuppressWarnings("unchecked")
JavaRDD<WindowedValue<T>> inRDD =
((BoundedDataset<T>) context.borrowDataset(transform)).getRDD();
if (TranslationUtils.skipAssignWindows(transform, context)) {
// Window assignment is skipped: the input RDD is passed through as the output.
context.putDataset(transform, new BoundedDataset<>(inRDD));
} else {
// Otherwise every element is mapped through the transform's WindowFn.
context.putDataset(transform, new BoundedDataset<>(
inRDD.map(new SparkAssignWindowFn<>(transform.getWindowFn()))));
}
}
@Override
public String toNativeString() {
// Fixed textual description of this translation.
return "map(new <windowFn>())";
}
};
}
开发者ID:apache,项目名称:beam,代码行数:23,代码来源:TransformTranslator.java
示例9: testReshuffleAfterSessionsAndGroupByKey
import org.apache.beam.sdk.transforms.windowing.Window; //导入依赖的package包/类
/**
 * Applies {@code Reshuffle} to a collection that was windowed into ten-minute-gap sessions and
 * grouped by key, then checks the reshuffled contents and that the windowing strategy of the
 * output equals that of the input.
 */
@Test
@Category(ValidatesRunner.class)
public void testReshuffleAfterSessionsAndGroupByKey() {
  Sessions tenMinuteSessions = Sessions.withGapDuration(Duration.standardMinutes(10));

  PCollection<KV<String, Iterable<Integer>>> grouped =
      pipeline
          .apply(
              Create.of(GBK_TESTABLE_KVS)
                  .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
          .apply(Window.<KV<String, Integer>>into(tenMinuteSessions))
          .apply(GroupByKey.<String, Integer>create());

  PCollection<KV<String, Iterable<Integer>>> reshuffled =
      grouped.apply(Reshuffle.<String, Iterable<Integer>>of());

  PAssert.that(reshuffled).satisfies(new AssertThatHasExpectedContents());

  // Reshuffle must hand back the windowing strategy it received.
  assertEquals(grouped.getWindowingStrategy(), reshuffled.getWindowingStrategy());

  pipeline.run();
}
开发者ID:apache,项目名称:beam,代码行数:23,代码来源:ReshuffleTest.java
示例10: testReshuffleAfterFixedWindowsAndGroupByKey
import org.apache.beam.sdk.transforms.windowing.Window; //导入依赖的package包/类
/**
 * Applies {@code Reshuffle} to a collection that was windowed into ten-minute fixed windows and
 * grouped by key, then checks the reshuffled contents and that the windowing strategy of the
 * output equals that of the input.
 */
@Test
@Category(ValidatesRunner.class)
public void testReshuffleAfterFixedWindowsAndGroupByKey() {
  FixedWindows tenMinuteWindows = FixedWindows.of(Duration.standardMinutes(10L));

  PCollection<KV<String, Iterable<Integer>>> grouped =
      pipeline
          .apply(
              Create.of(GBK_TESTABLE_KVS)
                  .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
          .apply(Window.<KV<String, Integer>>into(tenMinuteWindows))
          .apply(GroupByKey.<String, Integer>create());

  PCollection<KV<String, Iterable<Integer>>> reshuffled =
      grouped.apply(Reshuffle.<String, Iterable<Integer>>of());

  PAssert.that(reshuffled).satisfies(new AssertThatHasExpectedContents());

  // Reshuffle must hand back the windowing strategy it received.
  assertEquals(grouped.getWindowingStrategy(), reshuffled.getWindowingStrategy());

  pipeline.run();
}
开发者ID:apache,项目名称:beam,代码行数:23,代码来源:ReshuffleTest.java
示例11: testReshuffleAfterSlidingWindowsAndGroupByKey
import org.apache.beam.sdk.transforms.windowing.Window; //导入依赖的package包/类
/**
 * Applies {@code Reshuffle} to a windowed, grouped collection, then checks the reshuffled
 * contents and that the windowing strategy of the output equals that of the input.
 *
 * <p>NOTE(review): the method name mentions sliding windows, but the collection is windowed
 * with {@code FixedWindows} of ten minutes. Confirm whether that is intentional before changing
 * it; elements in overlapping windows could alter what the content assertion sees.
 */
@Test
@Category(ValidatesRunner.class)
public void testReshuffleAfterSlidingWindowsAndGroupByKey() {
  Duration tenMinutes = Duration.standardMinutes(10L);

  PCollection<KV<String, Iterable<Integer>>> grouped =
      pipeline
          .apply(
              Create.of(GBK_TESTABLE_KVS)
                  .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
          .apply(Window.<KV<String, Integer>>into(FixedWindows.of(tenMinutes)))
          .apply(GroupByKey.<String, Integer>create());

  PCollection<KV<String, Iterable<Integer>>> reshuffled =
      grouped.apply(Reshuffle.<String, Iterable<Integer>>of());

  PAssert.that(reshuffled).satisfies(new AssertThatHasExpectedContents());

  // Reshuffle must hand back the windowing strategy it received.
  assertEquals(grouped.getWindowingStrategy(), reshuffled.getWindowingStrategy());

  pipeline.run();
}
开发者ID:apache,项目名称:beam,代码行数:23,代码来源:ReshuffleTest.java
示例12: testReshuffleWithTimestampsStreaming
import org.apache.beam.sdk.transforms.windowing.Window; //导入依赖的package包/类
/**
 * Feeds three timestamped longs through a {@code TestStream}, keys and windows them into
 * ten-minute fixed windows, reshuffles, and checks that all three values come out.
 *
 * <p>The watermark is advanced to 48 days past the epoch before the elements are added; the
 * element timestamps are the epoch, the epoch plus 48 days, and 48 days before
 * {@code BoundedWindow.TIMESTAMP_MAX_VALUE}.
 */
@Test
@Category({ValidatesRunner.class, UsesTestStream.class})
public void testReshuffleWithTimestampsStreaming() {
  Instant epoch = new Instant(0L);
  Duration fortyEightDays = Duration.standardDays(48L);
  Instant epochPlus48Days = epoch.plus(fortyEightDays);

  TestStream<Long> stream =
      TestStream.create(VarLongCoder.of())
          .advanceWatermarkTo(epochPlus48Days)
          .addElements(
              TimestampedValue.of(0L, epoch),
              TimestampedValue.of(1L, epochPlus48Days),
              TimestampedValue.of(2L, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(fortyEightDays)))
          .advanceWatermarkToInfinity();

  FixedWindows tenMinuteWindows = FixedWindows.of(Duration.standardMinutes(10L));

  PCollection<KV<String, Long>> keyed =
      pipeline
          .apply(stream)
          .apply(WithKeys.<String, Long>of(""))
          .apply(Window.<KV<String, Long>>into(tenMinuteWindows));

  PCollection<KV<String, Long>> reshuffled = keyed.apply(Reshuffle.<String, Long>of());

  PAssert.that(reshuffled.apply(Values.<Long>create())).containsInAnyOrder(0L, 1L, 2L);

  pipeline.run();
}
开发者ID:apache,项目名称:beam,代码行数:24,代码来源:ReshuffleTest.java
示例13: testSampleAny
import org.apache.beam.sdk.transforms.windowing.Window; //导入依赖的package包/类
/**
 * Samples two elements per three-second fixed window with {@code Sample.any(2)} and verifies
 * each window's sample against the values expected for that window: 0, 1, 2 for the window
 * starting at 0 ms and 3, 4, 5 for the window starting at 3000 ms.
 */
@Test
@Category(ValidatesRunner.class)
public void testSampleAny() {
  Duration threeSeconds = Duration.standardSeconds(3);

  PCollection<Integer> windowed =
      pipeline
          .apply(
              Create.timestamped(ImmutableList.of(tv(0), tv(1), tv(2), tv(3), tv(4), tv(5)))
                  .withCoder(BigEndianIntegerCoder.of()))
          .apply(Window.<Integer>into(FixedWindows.of(threeSeconds)));

  PCollection<Integer> sampled = windowed.apply(Sample.<Integer>any(2));

  IntervalWindow firstWindow = new IntervalWindow(new Instant(0), threeSeconds);
  PAssert.that(sampled)
      .inWindow(firstWindow)
      .satisfies(new VerifyCorrectSample<>(2, Arrays.asList(0, 1, 2)));

  IntervalWindow secondWindow = new IntervalWindow(new Instant(3000), threeSeconds);
  PAssert.that(sampled)
      .inWindow(secondWindow)
      .satisfies(new VerifyCorrectSample<>(2, Arrays.asList(3, 4, 5)));

  pipeline.run();
}
开发者ID:apache,项目名称:beam,代码行数:20,代码来源:SampleTest.java
示例14: testSampleAnyZero
import org.apache.beam.sdk.transforms.windowing.Window; //导入依赖的package包/类
/**
 * Applies {@code Sample.any(0)} per three-second fixed window and verifies, for the windows
 * starting at 0 ms and 3000 ms, that the sample satisfies
 * {@code VerifyCorrectSample<>(0, EMPTY)}.
 */
@Test
@Category(ValidatesRunner.class)
public void testSampleAnyZero() {
  Duration threeSeconds = Duration.standardSeconds(3);

  PCollection<Integer> input =
      pipeline.apply(
          Create.timestamped(ImmutableList.of(tv(0), tv(1), tv(2), tv(3), tv(4), tv(5)))
              .withCoder(BigEndianIntegerCoder.of()));

  PCollection<Integer> sampled =
      input
          .apply(Window.<Integer>into(FixedWindows.of(threeSeconds)))
          .apply(Sample.<Integer>any(0));

  IntervalWindow firstWindow = new IntervalWindow(new Instant(0), threeSeconds);
  PAssert.that(sampled).inWindow(firstWindow).satisfies(new VerifyCorrectSample<>(0, EMPTY));

  IntervalWindow secondWindow = new IntervalWindow(new Instant(3000), threeSeconds);
  PAssert.that(sampled).inWindow(secondWindow).satisfies(new VerifyCorrectSample<>(0, EMPTY));

  pipeline.run();
}
开发者ID:apache,项目名称:beam,代码行数:20,代码来源:SampleTest.java
示例15: testInvalidWindowsService
import org.apache.beam.sdk.transforms.windowing.Window; //导入依赖的package包/类
/**
 * Expects an {@code IllegalStateException} mentioning an invalid window merge function when a
 * second {@code GroupByKey} is applied to the output of a first one over session-windowed
 * input.
 */
@Test
public void testInvalidWindowsService() {
  Pipeline p = createTestServiceRunner();

  List<KV<String, Integer>> noPairs = Arrays.asList();
  Sessions oneMinuteSessions = Sessions.withGapDuration(Duration.standardMinutes(1));

  PCollection<KV<String, Integer>> sessionWindowed =
      p.apply(
              Create.of(noPairs)
                  .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
          .apply(Window.<KV<String, Integer>>into(oneMinuteSessions));

  // Register the expectation before applying the two group-by-key transforms.
  thrown.expect(IllegalStateException.class);
  thrown.expectMessage("GroupByKey must have a valid Window merge function");

  sessionWindowed
      .apply("GroupByKey", GroupByKey.<String, Integer>create())
      .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
}
开发者ID:apache,项目名称:beam,代码行数:19,代码来源:DataflowGroupByKeyTest.java
示例16: testCompatibleWindowFnPropagation
import org.apache.beam.sdk.transforms.windowing.Window; //导入依赖的package包/类
/**
 * Flattens two collections windowed with {@code Sessions} of different gap durations (one and
 * two minutes) and asserts that the output's WindowFn is compatible with a two-minute
 * {@code Sessions}.
 */
@Test
@Category(NeedsRunner.class)
public void testCompatibleWindowFnPropagation() {
  Duration oneMinute = Duration.standardMinutes(1);
  Duration twoMinutes = Duration.standardMinutes(2);

  PCollection<String> input1 =
      p.apply("CreateInput1", Create.of("Input1"))
          .apply("Window1", Window.<String>into(Sessions.withGapDuration(oneMinute)));

  PCollection<String> input2 =
      p.apply("CreateInput2", Create.of("Input2"))
          .apply("Window2", Window.<String>into(Sessions.withGapDuration(twoMinutes)));

  PCollection<String> flattened =
      PCollectionList.of(input1).and(input2).apply(Flatten.<String>pCollections());

  p.run();

  Assert.assertTrue(
      flattened
          .getWindowingStrategy()
          .getWindowFn()
          .isCompatible(Sessions.withGapDuration(twoMinutes)));
}
开发者ID:apache,项目名称:beam,代码行数:22,代码来源:FlattenTest.java
示例17: testIncompatibleWindowFnPropagationFailure
import org.apache.beam.sdk.transforms.windowing.Window; //导入依赖的package包/类
/**
 * Flattens two collections windowed with {@code FixedWindows} of different sizes (one and two
 * minutes) and expects an {@code IllegalStateException} whose message starts with the
 * incompatible-windowFns text.
 */
@Test
public void testIncompatibleWindowFnPropagationFailure() {
  // Abandoned-node enforcement is switched off for this pipeline; it is never run here.
  p.enableAbandonedNodeEnforcement(false);

  FixedWindows oneMinuteWindows = FixedWindows.of(Duration.standardMinutes(1));
  FixedWindows twoMinuteWindows = FixedWindows.of(Duration.standardMinutes(2));

  PCollection<String> input1 =
      p.apply("CreateInput1", Create.of("Input1"))
          .apply("Window1", Window.<String>into(oneMinuteWindows));

  PCollection<String> input2 =
      p.apply("CreateInput2", Create.of("Input2"))
          .apply("Window2", Window.<String>into(twoMinuteWindows));

  try {
    PCollectionList.of(input1).and(input2).apply(Flatten.<String>pCollections());
    Assert.fail("Exception should have been thrown");
  } catch (IllegalStateException e) {
    String message = e.getMessage();
    Assert.assertTrue(message.startsWith("Inputs to Flatten had incompatible window windowFns"));
  }
}
开发者ID:apache,项目名称:beam,代码行数:21,代码来源:FlattenTest.java
示例18: verifyMergingStatefulParDoRejected
import org.apache.beam.sdk.transforms.windowing.Window; //导入依赖的package包/类
/**
 * Builds a pipeline that applies a stateful {@code DoFn} to session-windowed input and expects
 * running it to fail with an {@code UnsupportedOperationException} whose message contains
 * "merging".
 *
 * @param options the pipeline options used to create the pipeline
 * @throws Exception declared by the signature; nothing is caught inside this method
 */
private void verifyMergingStatefulParDoRejected(PipelineOptions options) throws Exception {
  Pipeline pipeline = Pipeline.create(options);

  // A DoFn that declares state but does nothing when processing an element.
  DoFn<KV<Integer, Integer>, Void> statefulFn =
      new DoFn<KV<Integer, Integer>, Void>() {
        @StateId("fizzle")
        private final StateSpec<ValueState<Void>> voidState = StateSpecs.value();

        @ProcessElement
        public void process() {}
      };

  pipeline
      .apply(Create.of(KV.of(13, 42)))
      .apply(Window.<KV<Integer, Integer>>into(Sessions.withGapDuration(Duration.millis(1))))
      .apply(ParDo.of(statefulFn));

  thrown.expectMessage("merging");
  thrown.expect(UnsupportedOperationException.class);
  pipeline.run();
}
开发者ID:apache,项目名称:beam,代码行数:18,代码来源:DataflowRunnerTest.java
示例19: needToEmit
import org.apache.beam.sdk.transforms.windowing.Window; //导入依赖的package包/类
/**
 * Decides whether a pane should be emitted.
 *
 * <p>A pane is emitted when any of the following holds, checked in this order:
 *
 * <ul>
 *   <li>it is not empty;
 *   <li>its timing is {@code ON_TIME} and the windowing strategy's on-time behavior is
 *       {@code FIRE_ALWAYS};
 *   <li>it is finished and the windowing strategy's closing behavior is {@code FIRE_ALWAYS}.
 * </ul>
 *
 * @param isEmpty whether the pane has no elements
 * @param isFinished whether this is known to be the final pane
 * @param timing the timing of the pane
 * @return {@code true} if the pane should be emitted, {@code false} otherwise
 */
private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) {
  return !isEmpty
      // Empty ON_TIME pane that the strategy always wants fired.
      || (timing == Timing.ON_TIME
          && windowingStrategy.getOnTimeBehavior() == Window.OnTimeBehavior.FIRE_ALWAYS)
      // Empty final pane that the user has asked for anyway.
      || (isFinished && windowingStrategy.getClosingBehavior() == ClosingBehavior.FIRE_ALWAYS);
}
开发者ID:apache,项目名称:beam,代码行数:20,代码来源:ReduceFnRunner.java
示例20: testIdentityWindowFnPropagation
import org.apache.beam.sdk.transforms.windowing.Window; //导入依赖的package包/类
/**
 * Groups an empty, one-minute fixed-windowed collection by key and asserts that the output's
 * WindowFn is compatible with one-minute {@code FixedWindows}.
 */
@Test
@Category(NeedsRunner.class)
public void testIdentityWindowFnPropagation() {
  Duration oneMinute = Duration.standardMinutes(1);
  List<KV<String, Integer>> noPairs = Arrays.asList();

  PCollection<KV<String, Integer>> windowed =
      p.apply(
              Create.of(noPairs)
                  .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
          .apply(Window.<KV<String, Integer>>into(FixedWindows.of(oneMinute)));

  PCollection<KV<String, Iterable<Integer>>> grouped =
      windowed.apply(GroupByKey.<String, Integer>create());

  p.run();

  Assert.assertTrue(
      grouped.getWindowingStrategy().getWindowFn().isCompatible(FixedWindows.of(oneMinute)));
}
开发者ID:apache,项目名称:beam,代码行数:20,代码来源:GroupByKeyTest.java
注：本文中的org.apache.beam.sdk.transforms.windowing.Window类示例整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论