This article collects typical usage examples of the Java class org.apache.flink.streaming.api.functions.windowing.AllWindowFunction. If you are wondering how the AllWindowFunction class is used in practice, the selected code examples below may help.
The AllWindowFunction class belongs to the org.apache.flink.streaming.api.functions.windowing package. A total of 14 code examples of the class are shown below, sorted by popularity by default.
Example 1: main
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
public static void main(String... args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());
    edits
        // Group all edits into non-keyed one-minute windows.
        .timeWindowAll(Time.minutes(1))
        .apply(new AllWindowFunction<WikipediaEditEvent, Tuple3<Date, Long, Long>, TimeWindow>() {
            @Override
            public void apply(TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple3<Date, Long, Long>> collector) throws Exception {
                long count = 0;
                long bytesChanged = 0;
                for (WikipediaEditEvent event : iterable) {
                    count++;
                    bytesChanged += event.getByteDiff();
                }
                // Emit (window end, number of edits, total bytes changed).
                collector.collect(new Tuple3<>(new Date(timeWindow.getEnd()), count, bytesChanged));
            }
        })
        .print();
    env.execute();
}
Developer: mushketyk, Project: flink-examples, Lines of code: 27, Source: NumberOfWikiEditsPerWindow.java
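The main method above needs a running Flink environment and the Wikipedia connector. To show only the computation it performs, here is a minimal plain-Java sketch that does not use Flink at all. The names `TumblingAllWindowSketch`, `Edit`, and `WindowResult` are hypothetical stand-ins, and the sketch assumes every event carries an explicit timestamp and that windows are aligned to multiples of the window size.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TumblingAllWindowSketch {

    /** Hypothetical stand-in for WikipediaEditEvent: a timestamp and a byte diff. */
    static final class Edit {
        final long timestampMs;
        final long byteDiff;

        Edit(long timestampMs, long byteDiff) {
            this.timestampMs = timestampMs;
            this.byteDiff = byteDiff;
        }
    }

    /** Hypothetical stand-in for the Tuple3<Date, Long, Long> emitted per window. */
    static final class WindowResult {
        final long windowEndMs;
        final long count;
        final long bytesChanged;

        WindowResult(long windowEndMs, long count, long bytesChanged) {
            this.windowEndMs = windowEndMs;
            this.count = count;
            this.bytesChanged = bytesChanged;
        }
    }

    static List<WindowResult> aggregate(List<Edit> edits, long windowSizeMs) {
        // Buffer every element under the start of the window it falls into.
        Map<Long, List<Edit>> buffers = new TreeMap<>();
        for (Edit e : edits) {
            long windowStart = e.timestampMs - Math.floorMod(e.timestampMs, windowSizeMs);
            buffers.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(e);
        }
        // Evaluate each window once over its full buffer, as apply(...) does above.
        List<WindowResult> results = new ArrayList<>();
        for (Map.Entry<Long, List<Edit>> window : buffers.entrySet()) {
            long count = 0;
            long bytesChanged = 0;
            for (Edit e : window.getValue()) {
                count++;
                bytesChanged += e.byteDiff;
            }
            results.add(new WindowResult(window.getKey() + windowSizeMs, count, bytesChanged));
        }
        return results;
    }

    public static void main(String[] args) {
        List<Edit> edits = Arrays.asList(
            new Edit(1_000L, 10L), new Edit(59_999L, -4L), new Edit(60_000L, 7L));
        for (WindowResult r : aggregate(edits, 60_000L)) {
            System.out.println(r.windowEndMs + " " + r.count + " " + r.bytesChanged);
        }
    }
}
```

The sketch makes the cost of this style visible: every element of a window is kept in a buffer until the window is evaluated.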
Example 2: aggregate
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Arriving data is incrementally aggregated using the given aggregate function. This means
 * that the window function typically has only a single value to process when called.
 *
 * @param aggFunction The aggregate function that is used for incremental aggregation.
 * @param windowFunction The window function.
 *
 * @return The data stream that is the result of applying the window function to the window.
 *
 * @param <ACC> The type of the AggregateFunction's accumulator
 * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
 * @param <R> The type of the elements in the resulting stream, equal to the
 *            WindowFunction's result type
 */
@PublicEvolving
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
        AggregateFunction<T, ACC, V> aggFunction,
        AllWindowFunction<V, R, W> windowFunction) {
    checkNotNull(aggFunction, "aggFunction");
    checkNotNull(windowFunction, "windowFunction");
    TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType(
            aggFunction, input.getType(), null, false);
    TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
            aggFunction, input.getType(), null, false);
    TypeInformation<R> resultType = getAllWindowFunctionReturnType(windowFunction, aggResultType);
    return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
}
Developer: axbaretto, Project: flink, Lines of code: 37, Source: AllWindowedStream.java
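The Javadoc above says that arriving data is aggregated incrementally, so the window function usually sees a single value. The following plain-Java sketch illustrates that idea without Flink. The interfaces `Agg` and `WindowFn` are hypothetical, simplified analogues of `AggregateFunction` and `AllWindowFunction`, not the real Flink types, and the averaging logic is only an assumed example.

```java
import java.util.Arrays;

class IncrementalAggregateSketch {

    /** Hypothetical, simplified analogue of AggregateFunction<T, ACC, V>. */
    interface Agg<T, ACC, V> {
        ACC createAccumulator();
        ACC add(T value, ACC accumulator);
        V getResult(ACC accumulator);
    }

    /** Hypothetical analogue of the window function: called once with the aggregated value. */
    interface WindowFn<V, R> {
        R apply(long windowEndMs, V aggregated);
    }

    static <T, ACC, V, R> R evaluateWindow(
            Iterable<T> arriving, long windowEndMs, Agg<T, ACC, V> agg, WindowFn<V, R> fn) {
        ACC acc = agg.createAccumulator();
        for (T value : arriving) {
            // Only the accumulator is retained; the elements themselves are not buffered.
            acc = agg.add(value, acc);
        }
        // The window function receives exactly one value: the aggregate's result.
        return fn.apply(windowEndMs, agg.getResult(acc));
    }

    /** Assumed example aggregate: the average of the integers seen so far. */
    static final Agg<Integer, long[], Double> AVERAGE = new Agg<Integer, long[], Double>() {
        @Override
        public long[] createAccumulator() {
            return new long[] {0L, 0L}; // {sum, count}
        }

        @Override
        public long[] add(Integer value, long[] acc) {
            acc[0] += value;
            acc[1]++;
            return acc;
        }

        @Override
        public Double getResult(long[] acc) {
            return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
        }
    };

    public static void main(String[] args) {
        String summary = evaluateWindow(Arrays.asList(2, 4, 9), 1_000L, AVERAGE,
            (end, avg) -> "window ending at " + end + " ms, average " + avg);
        System.out.println(summary);
    }
}
```

Compared with a plain `apply`, the state kept per window here is a two-element accumulator, whatever the number of elements in the window.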
Example 3: testApplyWindowAllState
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
@Test
public void testApplyWindowAllState() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
    DataStream<File> src = env.fromElements(new File("/"));
    SingleOutputStreamOperator<?> result = src
        .timeWindowAll(Time.milliseconds(1000))
        .apply(new AllWindowFunction<File, String, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<File> input, Collector<String> out) {}
        });
    validateListStateDescriptorConfigured(result);
}
Developer: axbaretto, Project: flink, Lines of code: 18, Source: StateDescriptorPassingTest.java
Example 4: main
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    Integer[] array = new Integer[]{1, 2, 4, 3, 4, 3, 4, 5, 4, 6, 7, 3, 3, 6, 1, 1, 3, 2, 4, 6};
    List<Integer> list = Arrays.asList(array);
    DataStream<Tuple2<Integer, Integer>> counts = env.fromCollection(list)
        .windowAll(GlobalWindows.create())
        .trigger(CountTrigger.of(5)).apply(new AllWindowFunction<Integer, Tuple2<Integer, Integer>, GlobalWindow>() {
            @Override
            public void apply(GlobalWindow window, Iterable<Integer> tuples, Collector<Tuple2<Integer, Integer>> out) throws Exception {
                // Count how often each value occurs in the window contents.
                HashMap<Integer, Integer> map = new HashMap<>();
                for (Integer tuple : tuples) {
                    Integer value = 0;
                    if (map.containsKey(tuple)) {
                        value = map.get(tuple);
                    }
                    map.put(tuple, value + 1);
                }
                // Emit one (value, occurrences) pair per distinct value.
                for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
                    out.collect(new Tuple2<>(entry.getKey(), entry.getValue()));
                }
            }
        });
    counts.print();
    env.execute("Stream WordCount");
}
Developer: wangyangjun, Project: StreamBench, Lines of code: 31, Source: CountWindowTest.java
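The job above combines a global window with a count trigger, and what each firing sees depends on whether the trigger purges the window contents. The plain-Java sketch below models both behaviours without using Flink. It rests on the assumption that a bare `CountTrigger` fires without purging (so the buffer keeps growing), whereas a purging variant clears it after each firing; check this against the Flink version in use. The class `CountTriggerSketch` and its `purgeOnFire` flag are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CountTriggerSketch {

    /**
     * Simulates one global window with a count-based trigger and returns,
     * for each firing, the occurrence count of every value in the buffer.
     */
    static List<Map<Integer, Integer>> run(List<Integer> input, int triggerCount, boolean purgeOnFire) {
        List<Integer> buffer = new ArrayList<>();
        List<Map<Integer, Integer>> firings = new ArrayList<>();
        int sinceLastFire = 0;
        for (Integer value : input) {
            buffer.add(value);
            sinceLastFire++;
            if (sinceLastFire == triggerCount) {
                sinceLastFire = 0;
                // Same counting as the apply(...) body above, written with Map.merge.
                Map<Integer, Integer> counts = new TreeMap<>();
                for (Integer buffered : buffer) {
                    counts.merge(buffered, 1, Integer::sum);
                }
                firings.add(counts);
                if (purgeOnFire) {
                    buffer.clear();
                }
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(
            1, 2, 4, 3, 4, 3, 4, 5, 4, 6, 7, 3, 3, 6, 1, 1, 3, 2, 4, 6);
        System.out.println("without purging: " + run(input, 5, false));
        System.out.println("with purging:    " + run(input, 5, true));
    }
}
```

Under the non-purging assumption, the fourth firing counts all 20 input elements, not only the last five, which is worth keeping in mind when reading the printed output of the job.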
Example 5: getAllWindowFunctionReturnType
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
private static <IN, OUT> TypeInformation<OUT> getAllWindowFunctionReturnType(
        AllWindowFunction<IN, OUT, ?> function,
        TypeInformation<IN> inType) {
    return TypeExtractor.getUnaryOperatorReturnType(
        function,
        AllWindowFunction.class,
        0,
        1,
        new int[]{1, 0},
        new int[]{2, 0},
        inType,
        null,
        false);
}
Developer: axbaretto, Project: flink, Lines of code: 15, Source: AllWindowedStream.java
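The helper above asks `TypeExtractor` to work out the output type of a user function; the arguments `0` and `1` appear to select the input and output type parameters of `AllWindowFunction`. How Flink does this internally is not shown here. As a rough illustration of the underlying idea only, the plain-Java sketch below reads generic type arguments from an anonymous class via reflection. The interface `WindowFn` and the class `TypeArgumentSketch` are hypothetical and are not part of Flink.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

class TypeArgumentSketch {

    /** Hypothetical three-parameter interface shaped like AllWindowFunction<IN, OUT, W>. */
    interface WindowFn<IN, OUT, W> {
        OUT apply(W window, Iterable<IN> values);
    }

    /**
     * Returns the type argument at the given index as declared by the function's class,
     * or null when the class does not record a parameterized WindowFn interface.
     */
    static Type typeArgument(WindowFn<?, ?, ?> fn, int index) {
        for (Type implemented : fn.getClass().getGenericInterfaces()) {
            if (implemented instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) implemented;
                if (parameterized.getRawType() == WindowFn.class) {
                    return parameterized.getActualTypeArguments()[index];
                }
            }
        }
        return null;
    }

    /** An anonymous class keeps its type arguments in the class file, so they can be read back. */
    static final WindowFn<Integer, String, Long> SAMPLE = new WindowFn<Integer, String, Long>() {
        @Override
        public String apply(Long window, Iterable<Integer> values) {
            return "window " + window;
        }
    };

    public static void main(String[] args) {
        System.out.println("input type:  " + typeArgument(SAMPLE, 0));
        System.out.println("output type: " + typeArgument(SAMPLE, 1));
    }
}
```

This is one plausible reason the examples on this page implement `AllWindowFunction` as anonymous classes: a lambda generally does not carry the same generic signature, so less type information can be recovered from it.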
Example 6: testApplyEventTime
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
@Test
@SuppressWarnings("rawtypes")
public void testApplyEventTime() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
    DataStream<Tuple2<String, Integer>> window1 = source
        .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
        .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void apply(
                    TimeWindow window,
                    Iterable<Tuple2<String, Integer>> values,
                    Collector<Tuple2<String, Integer>> out) throws Exception {
                for (Tuple2<String, Integer> in : values) {
                    out.collect(in);
                }
            }
        });
    OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
    OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
    Assert.assertTrue(operator instanceof WindowOperator);
    WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
    Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
    Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
    Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
    processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
Developer: axbaretto, Project: flink, Lines of code: 35, Source: AllWindowTranslationTest.java
Example 7: testApplyProcessingTimeTime
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
@Test
@SuppressWarnings("rawtypes")
public void testApplyProcessingTimeTime() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
    DataStream<Tuple2<String, Integer>> window1 = source
        .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
        .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void apply(
                    TimeWindow window,
                    Iterable<Tuple2<String, Integer>> values,
                    Collector<Tuple2<String, Integer>> out) throws Exception {
                for (Tuple2<String, Integer> in : values) {
                    out.collect(in);
                }
            }
        });
    OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
    OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
    Assert.assertTrue(operator instanceof WindowOperator);
    WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
    Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
    Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
    Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
    processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
Developer: axbaretto, Project: flink, Lines of code: 36, Source: AllWindowTranslationTest.java
Example 8: testApplyWithCustomTrigger
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
@Test
@SuppressWarnings("rawtypes")
public void testApplyWithCustomTrigger() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
    DataStream<Tuple2<String, Integer>> window1 = source
        .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
        .trigger(CountTrigger.of(1))
        .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void apply(
                    TimeWindow window,
                    Iterable<Tuple2<String, Integer>> values,
                    Collector<Tuple2<String, Integer>> out) throws Exception {
                for (Tuple2<String, Integer> in : values) {
                    out.collect(in);
                }
            }
        });
    OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
    OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
    Assert.assertTrue(operator instanceof WindowOperator);
    WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
    Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
    Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
    Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
    processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
Developer: axbaretto, Project: flink, Lines of code: 36, Source: AllWindowTranslationTest.java
Example 9: testApplyWithEvictor
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
@Test
@SuppressWarnings("rawtypes")
public void testApplyWithEvictor() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
    DataStream<Tuple2<String, Integer>> window1 = source
        .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
        .trigger(CountTrigger.of(1))
        .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
        .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void apply(
                    TimeWindow window,
                    Iterable<Tuple2<String, Integer>> values,
                    Collector<Tuple2<String, Integer>> out) throws Exception {
                for (Tuple2<String, Integer> in : values) {
                    out.collect(in);
                }
            }
        });
    OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
    OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
    Assert.assertTrue(operator instanceof EvictingWindowOperator);
    EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
    Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
    Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
    Assert.assertTrue(winOperator.getEvictor() instanceof TimeEvictor);
    Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
    processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
Developer: axbaretto, Project: flink, Lines of code: 38, Source: AllWindowTranslationTest.java
Example 10: InternalSingleValueAllWindowFunction
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
public InternalSingleValueAllWindowFunction(AllWindowFunction<IN, OUT, W> wrappedFunction) {
    super(wrappedFunction);
}
Developer: axbaretto, Project: flink, Lines of code: 4, Source: InternalSingleValueAllWindowFunction.java
Example 11: InternalIterableAllWindowFunction
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
public InternalIterableAllWindowFunction(AllWindowFunction<IN, OUT, W> wrappedFunction) {
    super(wrappedFunction);
}
Developer: axbaretto, Project: flink, Lines of code: 4, Source: InternalIterableAllWindowFunction.java
Example 12: reduce
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Arriving data is incrementally aggregated using the given reducer.
 *
 * @param reduceFunction The reduce function that is used for incremental aggregation.
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
@PublicEvolving
public <R> SingleOutputStreamOperator<R> reduce(
        ReduceFunction<T> reduceFunction,
        AllWindowFunction<T, R, W> function) {
    TypeInformation<T> inType = input.getType();
    TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, inType);
    return reduce(reduceFunction, function, resultType);
}
Developer: axbaretto, Project: flink, Lines of code: 22, Source: AllWindowedStream.java
Example 13: fold
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Arriving data is incrementally aggregated using the given fold function.
 *
 * @param initialValue The initial value of the fold.
 * @param foldFunction The fold function that is used for incremental aggregation.
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 *
 * @deprecated use {@link #aggregate(AggregateFunction, ProcessAllWindowFunction)} instead
 */
@PublicEvolving
@Deprecated
public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, AllWindowFunction<ACC, R, W> function) {
    TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
        Utils.getCallLocationName(), true);
    TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, foldAccumulatorType);
    return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType);
}
Developer: axbaretto, Project: flink, Lines of code: 26, Source: AllWindowedStream.java
Example 14: apply
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Note that this function requires that all data in the windows is buffered until the window
 * is evaluated, as the function provides no means of incremental aggregation.
 *
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
    String callLocation = Utils.getCallLocationName();
    function = input.getExecutionEnvironment().clean(function);
    TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, getInputType());
    return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation);
}
Developer: axbaretto, Project: flink, Lines of code: 18, Source: AllWindowedStream.java
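Note: The org.apache.flink.streaming.api.functions.windowing.AllWindowFunction class examples in this article were compiled from source code and documentation hosting platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects contributed by their respective developers, and copyright of the source code belongs to the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.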