• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala Time类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.apache.flink.streaming.api.windowing.time.Time的典型用法代码示例。如果您正苦于以下问题：Scala Time类的具体用法？Scala Time怎么用？Scala Time使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Time类的8个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: StatisticsQuery

//设置package包名称以及导入依赖的类
package org.hpi.esb.flink.query

import org.apache.flink.api.common.functions.FoldFunction
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time.Time
import org.hpi.esb.flink.utils.Statistics

/**
  * Query that parses every incoming record as a `Long`, folds the values of
  * each 1000 ms window into a `Statistics` accumulator and emits the
  * accumulator's string form.
  */
class StatisticsQuery extends Query[String, String] {

  /**
    * @param stream records whose text is converted with `toLong`
    * @return one string per window, produced by calling `toString` on the folded `Statistics`
    */
  override def execute(stream: DataStream[String]): DataStream[String] = {
    val numericValues = stream.map(record => record.toLong)

    // A fresh Statistics instance is the fold's initial value for the window.
    val windowStatistics = numericValues
      .timeWindowAll(Time.milliseconds(1000))
      .fold(new Statistics(), new StatisticsFoldFunction())

    windowStatistics.map(statistics => statistics.toString)
  }

}

/**
  * `FoldFunction` adapter that forwards each fold step to `Statistics.fold`.
  */
class StatisticsFoldFunction extends FoldFunction[Long, Statistics] {

  /**
    * @param acc   accumulator passed in by the fold
    * @param value next value to fold in
    * @return whatever `Statistics.fold(acc, value)` returns
    */
  override def fold(acc: Statistics, value: Long): Statistics = {
    Statistics.fold(acc, value)
  }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:25,代码来源:StatisticsQuery.scala


示例2: TwitterStream

//设置package包名称以及导入依赖的类
package flink

import flink.parsers.JsonTweetParser
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.twitter.TwitterSource
import org.joda.time.DateTime
import sinks.ElasticsearchSinks

/**
  * Streaming job: reads from a `TwitterSource`, parses the records with
  * `JsonTweetParser`, counts tweets per `userLang` in five-second windows and
  * sends the timestamped counts to `esPopularLanguagesSink`.
  */
object TwitterStream extends App with ElasticsearchSinks {

  val streamEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  // Path handed to TwitterSource; presumably its authentication properties — confirm.
  val streamSource =
    streamEnvironment.addSource(new TwitterSource("config/twitter/twitter-auth.properties"))

  // Parsed tweets, keeping only those whose text is non-empty.
  val tweets = streamSource
    .flatMap(new JsonTweetParser)
    .name("Filtering tweets")
    .filter(tweet => tweet.text.nonEmpty)

  // (language, 1) pairs are keyed on field 0 and summed on field 1 per window;
  // each result is stamped with the current time before it reaches the sink.
  val tweets2 = tweets
    .map(tweet => (tweet.userLang, 1))
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1)
    .map(langCount => (DateTime.now(), langCount._1, langCount._2))
    .addSink(esPopularLanguagesSink)

  streamEnvironment.execute("Twitter Stream")
}
开发者ID:tquiviger,项目名称:twitter-flink,代码行数:30,代码来源:TwitterStream.scala


示例3: ConsoleReporterTestJob

//设置package包名称以及导入依赖的类
package com.jgrier.flinkstuff.jobs

import com.jgrier.flinkstuff.sources.IntegerSource
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._

/**
  * Local Flink job that registers a metrics reporter named "consoleReporter"
  * and runs a one-second windowed sum over an `IntegerSource`.
  */
object ConsoleReporterTestJob {

  /**
    * Builds a local environment with the reporter configuration and runs the job.
    *
    * @param args command-line arguments (not read)
    */
  def main(args: Array[String]): Unit = {
    // Reporter name, implementation class and reporting interval.
    val config = new Configuration()
    config.setString("metrics.reporters", "consoleReporter")
    config.setString("metrics.reporter.consoleReporter.class", "com.jgrier.flinkstuff.metrics.ConsoleReporter")
    config.setString("metrics.reporter.consoleReporter.interval", "10 SECONDS")

    // The LocalStreamEnvironment is constructed directly so that `config` is passed to it.
    val env = new StreamExecutionEnvironment(new LocalStreamEnvironment(config))
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // NOTE(review): the meaning of the argument 100 is defined by IntegerSource — confirm.
    val stream = env.addSource(new IntegerSource(100))

    stream
      .timeWindowAll(Time.seconds(1))
      .sum(0)
      .print()

    env.execute("ConsoleReporterTestJob")
  }
}
开发者ID:jgrier,项目名称:flink-stuff,代码行数:31,代码来源:ConsoleReporterTestJob.scala


示例4: Flink

//设置package包名称以及导入依赖的类
package uk.co.bitcat.streaming.flink

import java.util.Properties

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.util.Collector
import uk.co.bitcat.streaming.flink.domain.{Measurement, MeasurementSchema}
import uk.co.bitcat.streaming.flink.watermark.TwoSecondDelayWatermark

/**
  * Event-time job that reads `Measurement` records from the Kafka topic
  * "pollution", averages `pollution` over ten-second windows and prints every
  * window whose mean is above 75.0.
  */
object Flink {

  /**
    * Builds and runs the pipeline.
    *
    * @param args command-line arguments (not read)
    */
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "flink_consumer")

    env
      .addSource(new FlinkKafkaConsumer09[Measurement]("pollution", new MeasurementSchema(), properties))
      .assignTimestampsAndWatermarks(new TwoSecondDelayWatermark())
      .timeWindowAll(Time.seconds(10))
      .apply(
        // Accumulator: (placeholder for the window end, running sum of pollution, count).
        (0L, 0.0, 0),
        // Fold step: add the measurement to the sum and bump the count.
        (acc: (Long, Double, Int), m: Measurement) => (0L, acc._2 + m.pollution, acc._3 + 1),
        // Window function: emit (window end time, mean, count) from the folded accumulator.
        (window: TimeWindow,
         accs: Iterable[(Long, Double, Int)],
         out: Collector[(Long, Double, Int)]) => {
          val acc = accs.iterator.next()
          out.collect((window.getEnd, acc._2 / acc._3, acc._3))
        }
      )
      .filter(_._2 > 75.0)
      .print()  // Replace with call to custom sink to raise alert for pollution level

    env.execute()
  }

}
开发者ID:dscook,项目名称:streaming-examples,代码行数:48,代码来源:Flink.scala


示例5: EventTimeWithWaterMarkAllowedLateness

//设置package包名称以及导入依赖的类
package com.vishnuviswanath.eventtime
  
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import Util._
import org.apache.flink.streaming.api.TimeCharacteristic
import com.vishnuviswanath.processtime.Util.ValueAndTimestamp

/**
  * Event-time example: counts records per key in 10-second windows sliding
  * every 5 seconds, with an allowed lateness of 5 seconds.
  */
object EventTimeWithWaterMarkAllowedLateness {

  /**
    * Builds and runs the pipeline.
    *
    * @param args command-line arguments (not read)
    */
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // NOTE(review): the comment originally on this line was mis-encoded and could not be
    // recovered; only the word "watermark" survived. Timestamps and watermarks are
    // assigned by TimestampExtractor.
    val text = env
      .addSource(new ValueAndTimestampWithDelay)
      .assignTimestampsAndWatermarks(new TimestampExtractor)

    // The key is the text before the first comma; every record contributes a count of 1.
    val counts = text
      .map { (x: String) => (x.split(",")(0), 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .allowedLateness(Time.seconds(5))
      .sum(1)

    counts.print()

    // NOTE(review): the job name says "ProcessingTime" although event time is configured
    // above; it looks copied from another example — confirm before renaming.
    env.execute("ProcessingTime processing example")

  }
}
开发者ID:Dax1n,项目名称:flinkdevelop,代码行数:28,代码来源:EventTimeWithWaterMarkAllowedLateness.scala


示例6: EventTimeWithWaterMark

//设置package包名称以及导入依赖的类
package com.vishnuviswanath.eventtime

import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import Util._
import org.apache.flink.streaming.api.TimeCharacteristic
import com.vishnuviswanath.processtime.Util.ValueAndTimestamp


/**
  * Event-time example: counts records per key in 10-second windows sliding
  * every 5 seconds, using `TimestampExtractorWithWaterMarkDelay` for timestamps
  * and watermarks.
  */
object EventTimeWithWaterMark {

  /**
    * Builds and runs the pipeline.
    *
    * @param args command-line arguments (not read)
    */
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // NOTE(review): the comment originally on this line was mis-encoded and could not be
    // recovered; only the word "watermark" survived.
    val text = env
      .addSource(new ValueAndTimestampWithDelay)
      .assignTimestampsAndWatermarks(new TimestampExtractorWithWaterMarkDelay)

    // The key is the text before the first comma; every record contributes a count of 1.
    val counts = text
      .map { (x: String) => (x.split(",")(0), 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .sum(1)

    counts.print()

    // NOTE(review): the job name says "ProcessingTime" although event time is configured
    // above; it looks copied from another example — confirm before renaming.
    env.execute("ProcessingTime processing example")

  }
}
开发者ID:Dax1n,项目名称:flinkdevelop,代码行数:29,代码来源:EventTimeWithWaterMark.scala


示例7: EventTime

//设置package包名称以及导入依赖的类
package com.vishnuviswanath.eventtime

import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import Util._
import org.apache.flink.streaming.api.TimeCharacteristic

/**
  * Event-time example: counts records per key in 10-second windows sliding
  * every 5 seconds, using `TimestampExtractor` for timestamps and watermarks.
  */
object EventTime {

  /**
    * Builds and runs the pipeline.
    *
    * @param args command-line arguments (not read)
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // NOTE(review): two mis-encoded comment lines stood here. The surviving fragments
    // list the results "(a,2), (a,3) and (a,1)" and "(a,1), (a,3) and (a,1)" and mention
    // the values 2, 3, 19, 13 and 15-25; presumably expected versus observed window
    // counts — restore the wording from the upstream project.

    val text = env
      .addSource(new ValueAndTimestampWithDelay)
      .assignTimestampsAndWatermarks(new TimestampExtractor)

    // The key is the text before the first comma; every record contributes a count of 1.
    val counts = text
      .map { (x: String) => (x.split(",")(0), 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .sum(1)

    counts.print()

    // NOTE(review): the job name says "ProcessingTime" although event time is configured
    // above; it looks copied from another example — confirm before renaming.
    env.execute("ProcessingTime processing example")

  }
}
开发者ID:Dax1n,项目名称:flinkdevelop,代码行数:30,代码来源:EventTime.scala


示例8: WikipediaAnalysis

//设置package包名称以及导入依赖的类
package phu.quang.le.Stream

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.wikiedits._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

/**
  * Local streaming job: reads Wikipedia edit events, sums `getByteDiff` per
  * user over five-second windows and writes each result's string form to the
  * Kafka topic "wiki_results".
  */
object WikipediaAnalysis extends App {

  val env = StreamExecutionEnvironment.createLocalEnvironment()

  val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource())

  val keyedEdits = edits.keyBy(edit => edit.getUser)

  // The fold starts from ("", 0L); each step takes the event's user and adds its byte diff.
  val result = keyedEdits
    .timeWindow(Time.seconds(5))
    .fold(("", 0L)) { (acc, event) =>
      (event.getUser, acc._2 + event.getByteDiff)
    }

  result
    .map(userAndBytes => userAndBytes.toString)
    .addSink(new FlinkKafkaProducer010("localhost:9092", "wiki_results", new SimpleStringSchema()))

  env.execute()
}
开发者ID:p-le,项目名称:flink-learning,代码行数:22,代码来源:WikipediaAnalysis.scala



注:本文中的org.apache.flink.streaming.api.windowing.time.Time类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala MediaType类代码示例发布时间:2022-05-23
下一篇:
Scala GoogleCredential类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap