• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala Duration类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Scala 中 org.apache.spark.streaming.Duration 的典型用法代码示例。如果您正苦于以下问题：Scala Duration 类的具体用法？Scala Duration 怎么用？Scala Duration 使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Duration类的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: TestUpdateStateByKey

//设置package包名称以及导入依赖的类
package examples.streaming

import org.apache.spark.streaming.{StreamingContext, Duration}
import org.apache.spark.SparkConf


/**
 * Streaming word-count job that keeps a running per-word total with
 * `updateStateByKey`, recovering its `StreamingContext` from a checkpoint
 * directory when one exists.
 */
object TestUpdateStateByKey {
  /** Checkpoint location used both for context recovery and for state checkpointing. */
  val checkpointDir: String = "hdfs://localhost:9000/user/hduser/spark-chkpt"

  /** Recovers (or freshly builds) the streaming context, starts it and blocks until termination. */
  def main(args: Array[String]): Unit = {
    val context = StreamingContext.getOrCreate(checkpointDir, createFunc _)
    context.start()
    context.awaitTermination()
  }

  /**
   * State update function: adds the number of new values seen in this batch
   * to the previously stored count (0 when there is no previous state).
   *
   * @param values values that arrived for a key in the current batch
   * @param state  count stored for that key so far, if any
   * @return the new count, always defined
   */
  def updateFunc(values: Seq[Int], state: Option[Int]): Option[Int] = {
    val arrived = values.size
    val previous = state match {
      case Some(count) => count
      case None => 0
    }
    Some(arrived + previous)
  }

  /**
   * Builds a brand new streaming context with a 2000 ms batch duration and
   * wires up the word-count pipeline reading text from localhost:9999.
   */
  def createFunc(): StreamingContext = {
    val sparkConf = new SparkConf().setAppName("TestUpdateStateByKeyJob")
    val ssc = new StreamingContext(sparkConf, Duration(2000))
    ssc.checkpoint(checkpointDir)

    val lines = ssc.socketTextStream("localhost", 9999)
    val wordOnes = lines
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
    val runningCounts = wordOnes.updateStateByKey(updateFunc _)

    // Checkpoint the state stream every 10000 ms, then print each batch.
    runningCounts.checkpoint(Duration(10000)).print()

    ssc
  }
} 
开发者ID:prithvirajbose,项目名称:spark-dev,代码行数:37,代码来源:TestUpdateStateByKey.scala


示例2: SparkStreamKinesis

//设置package包名称以及导入依赖的类
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream.LATEST
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_2
import org.apache.spark.streaming.kinesis._
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}


/**
 * Minimal Spark Streaming job that reads the "sparrow-ci" Kinesis stream in
 * us-east-1 and prints the received records.
 */
object SparkStreamKinesis {
  /**
   * Builds a local streaming context with 1 second batches, attaches a Kinesis
   * receiver (starting at LATEST, Kinesis checkpoint interval 2000 ms), prints
   * the raw stream and the decoded payloads, then runs until terminated.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark Kinesis").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(1))

    println("Spark Streaming")

    val kinesisStream = KinesisUtils.createStream(ssc, "sparrow-ci",
      "sparrow-ci",
      "kinesis.us-east-1.amazonaws.com",
      "us-east-1",
      LATEST,
      Duration(2000),
      MEMORY_AND_DISK_2)

    kinesisStream.print()

    // Decode each record to ONE string. The previous `flatMap(new String(_))`
    // implicitly treated every string as a sequence of Char and produced a
    // DStream[Char]. The charset is now explicit instead of platform-dependent.
    // NOTE(review): assumes producers write UTF-8 text - confirm.
    kinesisStream
      .map(bytes => new String(bytes, java.nio.charset.StandardCharsets.UTF_8))
      .foreachRDD(_.collect().foreach(print))

    ssc.start()
    ssc.awaitTermination()
  }
} 
开发者ID:noppanit,项目名称:spark-streaming-kinesis-example,代码行数:34,代码来源:main.scala


示例3: CommandStreamProcessor

//设置package包名称以及导入依赖的类
package com.crystal
package processors

// Spark
import org.apache.spark.streaming.kinesis._
import org.apache.spark.streaming.{ Duration, StreamingContext }
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

// JSON Parsing
import scala.util.parsing.json.JSON

/**
 * Consumes the command Kinesis stream and reacts to every command received.
 */
object CommandStreamProcessor {
  /**
   * Attaches the command stream to the given streaming context and prints a
   * marker line for every command record received.
   *
   * @param appConfig    application configuration (stream name, region, intervals)
   * @param streamingCtx streaming context the Kinesis receiver is registered on
   */
  def setup(appConfig: AppConfig, streamingCtx: StreamingContext): Unit = {
    val cmdStream = getCommandStream(appConfig, streamingCtx)

    cmdStream.foreachRDD { rdd =>
      rdd.foreach { _ =>
        println("--- Command Received ---")
      }
    }
  }

  /**
   * Creates the Kinesis receiver for the command stream and decodes every
   * record into a JSON object represented as a `Map[String, Any]`.
   *
   * Records that are not valid JSON, or whose top-level value is not a JSON
   * object, are skipped. Previously `.get` / `asInstanceOf` threw on such
   * records, so a single bad message failed the whole batch.
   */
  private def getCommandStream(
    appConfig: AppConfig,
    streamingCtx: StreamingContext): DStream[Map[String, Any]] = {
    val stream = KinesisUtils.createStream(
      streamingCtx,
      appConfig.commandAppName,
      appConfig.commandStreamName,
      s"kinesis.${appConfig.regionName}.amazonaws.com",
      appConfig.regionName,
      InitialPositionInStream.LATEST,
      Duration(appConfig.checkpointInterval),
      StorageLevel.MEMORY_AND_DISK_2
    )

    stream
      // Explicit charset: the platform default differs between machines.
      // NOTE(review): assumes producers write UTF-8 JSON - confirm.
      .map { byteArray => new String(byteArray, java.nio.charset.StandardCharsets.UTF_8) }
      .flatMap { jsonStr =>
        JSON.parseFull(jsonStr) match {
          // The cast only refines the erased type parameters; the value is
          // already known to be a Map at this point.
          // NOTE(review): relies on parseFull keying JSON objects by String - confirm.
          case Some(obj: Map[_, _]) => List(obj.asInstanceOf[Map[String, Any]])
          case _ => Nil
        }
      }
  }

} 
开发者ID:crystal-project-inc,项目名称:streaming_user_segmentation,代码行数:45,代码来源:CommandStreamProcessor.scala


示例4: Main

//设置package包名称以及导入依赖的类
package com.crystal

// Spark
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Duration, StreamingContext }

// Processors
import processors.CommandStreamProcessor
import processors.SnowplowStreamProcessor

/**
 * Application entry point: loads the configuration, builds a local Spark
 * streaming context and wires the command and Snowplow stream processors.
 *
 * Uses an explicit `main` method rather than `extends App`: Spark's
 * documentation advises against `scala.App` because its delayed
 * initialization may not work correctly with Spark applications.
 */
object Main {
  /**
   * @param args command-line arguments, handed to `AppConfig.setArgs` before
   *             the configuration is loaded
   */
  def main(args: Array[String]): Unit = {
    AppConfig.setArgs(args)

    AppConfig.load() match {
      case Some(appConfig) =>
        val sparkConf = new SparkConf()
          .setMaster("local[*]")
          .setAppName(appConfig.appName)

        // The configured checkpoint interval doubles as the batch duration.
        val streamingCtx = new StreamingContext(
          sparkConf,
          Duration(appConfig.checkpointInterval)
        )

        // Disable noisy logging
        streamingCtx.sparkContext.setLogLevel("ERROR")

        CommandStreamProcessor.setup(appConfig, streamingCtx)
        SnowplowStreamProcessor.setup(appConfig, streamingCtx)

        streamingCtx.start()
        // Waits for at most three intervals.
        // NOTE(review): the context is not stopped after the timeout - confirm this is intended.
        streamingCtx.awaitTerminationOrTimeout(appConfig.checkpointInterval * 3)
      case None => ()
    }
  }
} 
开发者ID:crystal-project-inc,项目名称:streaming_user_segmentation,代码行数:37,代码来源:Main.scala


示例5: CassandraConfig

//设置package包名称以及导入依赖的类
package com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra

import com.microsoft.partnercatalyst.fortis.spark.FortisSettings
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Duration

import scala.util.Properties.envOrElse

/**
 * Fills in default Cassandra connector settings on a `SparkConf`, leaving any
 * value that is already set untouched.
 */
object CassandraConfig {
  private val CassandraUsername = "cassandra"
  // TODO: disable auth; it won't be needed as C* will only be available internally in the cluster
  private val CassandraPassword = "cassandra"

  /**
   * Applies the Cassandra defaults (host, credentials, keep-alive) to `conf`.
   *
   * @param conf           configuration to complete
   * @param batchDuration  streaming batch duration; the keep-alive is set to twice its length in ms
   * @param fortisSettings source of the Cassandra host list
   * @return the configuration returned by the last `setIfMissing` call
   */
  def init(conf: SparkConf, batchDuration: Duration, fortisSettings: FortisSettings): SparkConf = {
    val withHost = conf.setIfMissing("spark.cassandra.connection.host", fortisSettings.cassandraHosts)
    val withUser = withHost.setIfMissing("spark.cassandra.auth.username", CassandraUsername)
    val withAuth = withUser.setIfMissing("spark.cassandra.auth.password", CassandraPassword)

    val keepAliveMs = (batchDuration.milliseconds * 2).toString
    withAuth.setIfMissing("spark.cassandra.connection.keep_alive_ms", keepAliveMs)
  }
} 
开发者ID:CatalystCode,项目名称:project-fortis-spark,代码行数:20,代码来源:CassandraConfig.scala


示例6: RuntimeJobInfo

//设置package包名称以及导入依赖的类
package io.hydrosphere.mist.api

import org.apache.spark.SparkContext
import org.apache.spark.streaming.Duration

/**
 * Identifiers attached to a job at runtime.
 *
 * @param id       identifier of the job
 * @param workerId identifier of the worker associated with the job
 *                 (presumably the worker executing it - TODO confirm)
 */
case class RuntimeJobInfo(
  id: String,
  workerId: String
)

/**
 * Network location of a central logging endpoint.
 *
 * @param host host name or address of the logging endpoint
 * @param port port of the logging endpoint
 */
case class CentralLoggingConf(
  host: String,
  port: Int
)

/**
 * Bundle of everything handed to a job when it is set up.
 *
 * @param context           Spark context the job uses
 * @param streamingDuration duration for streaming jobs
 *                          (presumably the batch interval - TODO confirm)
 * @param info              runtime identifiers of the job
 * @param loggingConf       central logging endpoint, if one is configured
 */
case class SetupConfiguration(
  context: SparkContext,
  streamingDuration: Duration,
  info: RuntimeJobInfo,
  loggingConf: Option[CentralLoggingConf]
) 
开发者ID:Hydrospheredata,项目名称:mist,代码行数:22,代码来源:SetupConfiguration.scala


示例7: NamedContext

//设置package包名称以及导入依赖的类
package io.hydrosphere.mist.worker

import java.io.File

import io.hydrosphere.mist.api.{CentralLoggingConf, RuntimeJobInfo, SetupConfiguration}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.Duration
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
 * Wraps a `SparkContext` under a namespace and exposes the derived contexts
 * and per-job setup information built from it.
 *
 * @param sparkContext      underlying Spark context
 * @param namespace         name of this context; also used as the worker id of jobs set up through it
 * @param streamingDuration duration passed on to jobs (defaults to 40 seconds)
 * @param loggingConf       optional central logging endpoint passed on to jobs
 */
class NamedContext(
  val sparkContext: SparkContext,
  val namespace: String,
  streamingDuration: Duration = Duration(40 * 1000),
  loggingConf: Option[CentralLoggingConf] = None
) {

  // Absolute paths of the jars already handed to the Spark context.
  private val registeredJars = mutable.Buffer.empty[String]

  /** Adds the jar to the Spark context unless its absolute path was added before. */
  def addJar(jarPath: String): Unit = {
    val absolutePath = new File(jarPath).getAbsolutePath
    val alreadyRegistered = registeredJars.contains(absolutePath)
    if (!alreadyRegistered) {
      sparkContext.addJar(jarPath)
      registeredJars += absolutePath
    }
  }

  /** Builds the setup configuration for the given job, using the namespace as worker id. */
  def setupConfiguration(jobId: String): SetupConfiguration = {
    val jobInfo = RuntimeJobInfo(jobId, namespace)
    SetupConfiguration(
      context = sparkContext,
      streamingDuration = streamingDuration,
      info = jobInfo,
      loggingConf = loggingConf
    )
  }

  // TODO: can we call that inside python directly using setupConfiguration?
  /** Python support: configuration of the underlying Spark context. */
  def sparkConf: SparkConf = sparkContext.getConf

  /** Python support: a new Java-API wrapper around the Spark context on every call. */
  def javaContext: JavaSparkContext = new JavaSparkContext(sparkContext)

  /** Python support: a new `SQLContext` on every call. */
  def sqlContext: SQLContext = new SQLContext(sparkContext)

  /** Python support: a new `HiveContext` on every call. */
  def hiveContext: HiveContext = new HiveContext(sparkContext)

  /** Stops the underlying Spark context. */
  def stop(): Unit = {
    sparkContext.stop()
  }

} 
开发者ID:Hydrospheredata,项目名称:mist,代码行数:58,代码来源:NamedContext.scala


示例8: getTopHashtag

//设置package包名称以及导入依赖的类
package es.ucm.fdi.sscheck.spark.demo.twitter

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration

import twitter4j.Status


  /**
   * Produces, for every batch, a stream holding the hashtag with the highest
   * count as computed by `countHashtags`, and prints it.
   *
   * @param batchInterval batch interval forwarded to `countHashtags`
   * @param windowSize    window size forwarded to `countHashtags`
   * @param tweets        input stream of tweets
   * @return a stream with at most one hashtag per batch
   */
  def getTopHashtag(batchInterval: Duration, windowSize: Int)
                  (tweets: DStream[Status]): DStream[String] = {
    val hashtagCounts = countHashtags(batchInterval, windowSize)(tweets)

    // Key by count so the records can be ordered by it.
    val keyedByCount = hashtagCounts.map { case (tag, count) => (count, tag) }

    val topHashtag = keyedByCount.transform { rdd =>
      val descending = rdd.sortByKey(false)
      val winner = descending.take(1).map { case (_, tag) => tag }
      rdd.sparkContext.parallelize(winner)
    }

    topHashtag.foreachRDD { rdd =>
      val shown = rdd.take(1).mkString(",")
      println(s"Top hashtag: ${shown}")
    }

    topHashtag
  }
} 
开发者ID:juanrh,项目名称:sscheck-examples,代码行数:24,代码来源:TweetOps.scala


示例9: compute

//设置package包名称以及导入依赖的类
package org.apache.spark.streaming.scheduler.rate

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Duration

/**
 * Strategy interface for estimating a batch interval from measurements of a
 * batch. Instances are serializable.
 */
private[streaming] trait BatchIntervalEstimator extends Serializable {

  /**
   * Computes a new estimate from the measurements of one batch.
   *
   * NOTE(review): the parameter meanings below are inferred from their names
   * only, as no implementation is visible here - confirm against the
   * implementations.
   *
   * @param time            presumably the timestamp of the batch
   * @param elements        presumably the number of elements in the batch
   * @param processingDelay presumably the time spent processing the batch
   * @param schedulingDelay presumably the time the batch waited before processing
   * @return an estimate, or `None` when no estimate is produced
   *         (presumably a batch interval in milliseconds - TODO confirm)
   */
  def compute(
      time: Long,
      elements: Long,
      processingDelay: Long,
      schedulingDelay: Long): Option[Long]
}

/** Factory for the configured [[BatchIntervalEstimator]] implementation. */
object BatchIntervalEstimator {

  /**
   * Instantiates the estimator selected by
   * `spark.streaming.batchsizecontrol.batchIntervalEstimator` (default `pid`).
   *
   * @param conf             Spark configuration holding the estimator name and its tuning values
   * @param minBatchInterval minimum batch interval; passed to the estimator in milliseconds
   * @throws IllegalArgumentException if the configured estimator name is neither `pid` nor `gradient`
   */
  def create(conf: SparkConf, minBatchInterval: Duration): BatchIntervalEstimator = {
    val estimatorName = conf.get("spark.streaming.batchsizecontrol.batchIntervalEstimator", "pid")

    if (estimatorName == "pid") {
      val proportional = conf.getDouble("spark.streaming.batchsizecontrol.pid.proportional", 1.0)
      val integral = conf.getDouble("spark.streaming.batchsizecontrol.pid.integral", 0.2)
      val derived = conf.getDouble("spark.streaming.batchsizecontrol.pid.derived", 0.0)
      new PIDBatchIntervalEstimator(minBatchInterval.milliseconds, proportional, integral, derived)
    } else if (estimatorName == "gradient") {
      // TODO: RK: Check appropriate threshold
      val threshold = conf.getLong("spark.streaming.batchsizecontrol.gradient.threshold", 25)
      val stepSize = conf.getLong("spark.streaming.batchsizecontrol.gradient.stepSize", 100)
      new GradientBatchIntervalEstimator(minBatchInterval.milliseconds, threshold, stepSize)
    } else {
      throw new IllegalArgumentException(s"Unknown batch size estimator: $estimatorName")
    }
  }
} 
开发者ID:kandu009,项目名称:Apache_Spark,代码行数:35,代码来源:BatchIntervalEstimator.scala


示例10: Main

//设置package包名称以及导入依赖的类
package com.fingerco

// Spark
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Duration, StreamingContext }

// Processors
import processors.PageStreamProcessor

/**
 * Application entry point: loads the configuration, builds a local Spark
 * streaming context and wires the page stream processor.
 *
 * Uses an explicit `main` method rather than `extends App`: Spark's
 * documentation advises against `scala.App` because its delayed
 * initialization may not work correctly with Spark applications.
 */
object Main {
  /**
   * @param args command-line arguments (not used by this entry point)
   */
  def main(args: Array[String]): Unit = {
    AppConfig.load() match {
      case Some(appConfig) =>
        val sparkConf = new SparkConf()
          .setMaster("local[*]")
          .setAppName(appConfig.appName)

        // The configured checkpoint interval doubles as the batch duration.
        val streamingCtx = new StreamingContext(
          sparkConf,
          Duration(appConfig.checkpointInterval)
        )

        // Disable noisy logging
        streamingCtx.sparkContext.setLogLevel("ERROR")

        PageStreamProcessor.setup(appConfig, streamingCtx)

        streamingCtx.start()
        // Waits for at most three intervals.
        // NOTE(review): the context is not stopped after the timeout - confirm this is intended.
        streamingCtx.awaitTerminationOrTimeout(appConfig.checkpointInterval * 3)
      case None => ()
    }
  }
} 
开发者ID:fingerco,项目名称:watcher-page-crawler,代码行数:32,代码来源:Main.scala



注：本文中的 org.apache.spark.streaming.Duration 类示例整理自 GitHub/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala Writable类代码示例发布时间:2022-05-23
下一篇:
Scala RandomForestClassifier类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap