This article compiles typical usage examples of the Java class org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor. If you are wondering what BoundedOutOfOrdernessTimestampExtractor does and how to use it, the curated code examples below may help.
The BoundedOutOfOrdernessTimestampExtractor class belongs to the org.apache.flink.streaming.api.functions.timestamps package. Five code examples are shown below, sorted by popularity by default.
Example 1: main
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; // import the required package/class
public static void main(String[] args) throws Exception {
	// parse arguments
	ParameterTool params = ParameterTool.fromPropertiesFile(args[0]);

	// create streaming environment
	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	// enable event time processing
	env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

	// enable fault-tolerance
	env.enableCheckpointing(1000);

	// enable restarts
	env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, 500L));

	env.setStateBackend(new FsStateBackend("file:///home/robert/flink-workdir/flink-streaming-etl/state-backend"));

	// run each operator separately
	env.disableOperatorChaining();

	// get data from Kafka
	Properties kParams = params.getProperties();
	kParams.setProperty("group.id", UUID.randomUUID().toString());
	DataStream<ObjectNode> inputStream = env.addSource(new FlinkKafkaConsumer09<>(params.getRequired("topic"), new JSONDeserializationSchema(), kParams)).name("Kafka 0.9 Source")
		.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ObjectNode>(Time.minutes(1L)) {
			@Override
			public long extractTimestamp(ObjectNode jsonNodes) {
				return jsonNodes.get("timestamp_ms").asLong();
			}
		}).name("Timestamp extractor");

	// filter out records without the 'lang' field
	DataStream<ObjectNode> tweetsWithLang = inputStream.filter(jsonNode -> jsonNode.has("user") && jsonNode.get("user").has("lang")).name("Filter records without 'lang' field");

	// select only lang = "en" tweets
	DataStream<ObjectNode> englishTweets = tweetsWithLang.filter(jsonNode -> jsonNode.get("user").get("lang").asText().equals("en")).name("Select 'lang'=en tweets");

	// write to file system
	RollingSink<ObjectNode> rollingSink = new RollingSink<>(params.get("sinkPath", "/home/robert/flink-workdir/flink-streaming-etl/rolling-sink"));
	rollingSink.setBucketer(new DateTimeBucketer("yyyy-MM-dd-HH-mm")); // create a bucket for each minute
	englishTweets.addSink(rollingSink).name("Rolling FileSystem Sink");

	// build aggregates (count per language) using a 10-second tumbling window:
	DataStream<Tuple3<Long, String, Long>> languageCounts = tweetsWithLang.keyBy(jsonNode -> jsonNode.get("user").get("lang").asText())
		.timeWindow(Time.seconds(10))
		.apply(new Tuple3<>(0L, "", 0L), new JsonFoldCounter(), new CountEmitter()).name("Count per Language (10 seconds tumbling)");

	// write window aggregate to ElasticSearch
	List<InetSocketAddress> transportNodes = ImmutableList.of(new InetSocketAddress(InetAddress.getByName("localhost"), 9300));
	ElasticsearchSink<Tuple3<Long, String, Long>> elasticsearchSink = new ElasticsearchSink<>(params.toMap(), transportNodes, new ESRequest());
	languageCounts.addSink(elasticsearchSink).name("ElasticSearch2 Sink");

	// word-count on the tweet stream
	DataStream<Tuple2<Date, List<Tuple2<String, Long>>>> topWordCount = tweetsWithLang
		// get text from tweets
		.map(tweet -> tweet.get("text").asText()).name("Get text from Tweets")
		// split text into (word, 1) tuples
		.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
			@Override
			public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
				String[] splits = s.split(" ");
				for (String sp : splits) {
					collector.collect(new Tuple2<>(sp, 1L));
				}
			}
		}).name("Tokenize words")
		// group by word
		.keyBy(0)
		// build 1-minute windows, computed every 10 seconds --> count word frequency
		.timeWindow(Time.minutes(1L), Time.seconds(10L)).apply(new WordCountingWindow()).name("Count word frequency (1 min, 10 sec sliding window)")
		// build top N every 10 seconds
		.timeWindowAll(Time.seconds(10L)).apply(new TopNWords(10)).name("TopN Window (10s)");

	// write top Ns to Kafka topic
	topWordCount.addSink(new FlinkKafkaProducer09<>(params.getRequired("wc-topic"), new ListSerSchema(), params.getProperties())).name("Write topN to Kafka");

	env.execute("Streaming ETL");
}
Author: rmetzger, Project: flink-streaming-etl, Lines: 82, Source: StreamingETL.java
Example 2: testInitialFinalAndWatermarkUnderflow
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; // import the required package/class
@Test
public void testInitialFinalAndWatermarkUnderflow() {
	BoundedOutOfOrdernessTimestampExtractor<Long> extractor = new LongExtractor(Time.milliseconds(10L));
	assertEquals(Long.MIN_VALUE, extractor.getCurrentWatermark().getTimestamp());

	extractor.extractTimestamp(Long.MIN_VALUE, -1L);

	// The following two lines check for underflow.
	// We have a max out-of-orderness of 10 millis.
	// We insert an element with a timestamp of Long.MIN_VALUE + 2, which becomes the max timestamp;
	// computing the next watermark as Long.MIN_VALUE + 2 - 10 would underflow,
	// so the watermark must stay at Long.MIN_VALUE.
	extractor.extractTimestamp(Long.MIN_VALUE + 2, -1);
	assertEquals(Long.MIN_VALUE, extractor.getCurrentWatermark().getTimestamp());

	extractor.extractTimestamp(Long.MAX_VALUE, -1L);
	assertEquals(Long.MAX_VALUE - 10, extractor.getCurrentWatermark().getTimestamp());
}
Author: axbaretto, Project: flink, Lines: 20, Source: BoundedOutOfOrdernessTimestampExtractorTest.java
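The underflow guard that the test above exercises can be sketched in plain Java, without any Flink dependency. The class `SimpleBoundedExtractor` below is a hypothetical, simplified mirror of the extractor's watermark arithmetic, not Flink's actual API:

```java
// Minimal, Flink-free sketch of the watermark logic that
// BoundedOutOfOrdernessTimestampExtractor implements. The class name
// SimpleBoundedExtractor is hypothetical.
public class SimpleBoundedExtractor {
    private final long maxOutOfOrderness;                  // allowed lateness in ms
    private long currentMaxTimestamp = Long.MIN_VALUE;     // max event timestamp seen so far

    public SimpleBoundedExtractor(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
    }

    public long extractTimestamp(long elementTimestamp) {
        // track the maximum timestamp observed so far
        currentMaxTimestamp = Math.max(currentMaxTimestamp, elementTimestamp);
        return elementTimestamp;
    }

    public long getCurrentWatermark() {
        // Guard against underflow: near Long.MIN_VALUE the subtraction wraps
        // around to a huge positive value, so only emit the tentative
        // watermark when the subtraction did not wrap.
        long potentialWatermark = currentMaxTimestamp - maxOutOfOrderness;
        return (potentialWatermark <= currentMaxTimestamp) ? potentialWatermark : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        SimpleBoundedExtractor ex = new SimpleBoundedExtractor(10L);
        ex.extractTimestamp(Long.MIN_VALUE + 2);            // would underflow without the guard
        System.out.println(ex.getCurrentWatermark());       // stays at Long.MIN_VALUE
        ex.extractTimestamp(500L);
        System.out.println(ex.getCurrentWatermark());       // 500 - 10 = 490
    }
}
```

The key design point is the same as in the real extractor: the watermark always trails the maximum seen timestamp by the configured out-of-orderness bound, except when that subtraction would wrap below `Long.MIN_VALUE`.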
Example 3: runValidTests
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; // import the required package/class
private void runValidTests(BoundedOutOfOrdernessTimestampExtractor<Long> extractor) {
	assertEquals(new Watermark(Long.MIN_VALUE), extractor.getCurrentWatermark());

	assertEquals(13L, extractor.extractTimestamp(13L, 0L));
	assertEquals(13L, extractor.extractTimestamp(13L, 0L));
	assertEquals(14L, extractor.extractTimestamp(14L, 0L));
	assertEquals(20L, extractor.extractTimestamp(20L, 0L));

	assertEquals(new Watermark(10L), extractor.getCurrentWatermark());

	assertEquals(20L, extractor.extractTimestamp(20L, 0L));
	assertEquals(20L, extractor.extractTimestamp(20L, 0L));
	assertEquals(500L, extractor.extractTimestamp(500L, 0L));

	assertEquals(new Watermark(490L), extractor.getCurrentWatermark());

	assertEquals(Long.MAX_VALUE - 1, extractor.extractTimestamp(Long.MAX_VALUE - 1, 0L));
	assertEquals(new Watermark(Long.MAX_VALUE - 11), extractor.getCurrentWatermark());
}
Author: axbaretto, Project: flink, Lines: 20, Source: BoundedOutOfOrdernessTimestampExtractorTest.java
Example 4: testInitializationAndRuntime
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; // import the required package/class
@Test
public void testInitializationAndRuntime() {
	Time maxAllowedLateness = Time.milliseconds(10L);
	BoundedOutOfOrdernessTimestampExtractor<Long> extractor =
		new LongExtractor(maxAllowedLateness);

	assertEquals(maxAllowedLateness.toMilliseconds(),
		extractor.getMaxOutOfOrdernessInMillis());

	runValidTests(extractor);
}
Author: axbaretto, Project: flink, Lines: 12, Source: BoundedOutOfOrdernessTimestampExtractorTest.java
Example 5: main
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; // import the required package/class
public static void main(String[] args) throws Exception {
	ParameterTool params = ParameterTool.fromArgs(args);
	FlinkPravegaParams helper = new FlinkPravegaParams(params);
	StreamId stream = helper.createStreamFromParam("input", "examples/turbineHeatTest");

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

	// 1. read and decode the sensor events from a Pravega stream
	long startTime = params.getLong("start", 0L);
	FlinkPravegaReader<String> reader = helper.newReader(stream, startTime, String.class);
	DataStream<SensorEvent> events = env.addSource(reader, "input").map(new SensorMapper()).name("events");

	// 2. extract timestamp information to support 'event-time' processing
	SingleOutputStreamOperator<SensorEvent> timestamped = events.assignTimestampsAndWatermarks(
		new BoundedOutOfOrdernessTimestampExtractor<SensorEvent>(Time.seconds(10)) {
			@Override
			public long extractTimestamp(SensorEvent element) {
				return element.getTimestamp();
			}
		});
	timestamped.print();

	// 3. summarize the temperature data for each sensor
	SingleOutputStreamOperator<SensorAggregate> summaries = timestamped
		.keyBy("sensorId")
		.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(8)))
		.fold(null, new SensorAggregator()).name("summaries");

	// 4. save to HDFS and print to stdout. Refer to the TaskManager's 'Stdout' view in the Flink UI.
	summaries.print().name("stdout");
	if (params.has("output")) {
		summaries.writeAsCsv(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE);
	}

	env.execute("TurbineHeatProcessor_" + stream);
}
Author: pravega, Project: pravega-samples, Lines: 38, Source: TurbineHeatProcessor.java
Note: the org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor examples in this article were compiled from open-source projects hosted on GitHub and similar code/documentation platforms. Copyright of each snippet remains with its original authors; consult the corresponding project's License before distributing or reusing the code, and do not republish without permission.