本文整理汇总了Scala中org.apache.spark.streaming.Duration类的典型用法代码示例。如果您正苦于以下问题:Scala Duration类的具体用法?Scala Duration怎么用?Scala Duration使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Duration类的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: TestUpdateStateByKey
//设置package包名称以及导入依赖的类
package examples.streaming
import org.apache.spark.streaming.{StreamingContext, Duration}
import org.apache.spark.SparkConf
object TestUpdateStateByKey {
val checkpointDir: String = "hdfs://localhost:9000/user/hduser/spark-chkpt"
def main(args: Array[String]): Unit = {
val ssc = StreamingContext.getOrCreate(checkpointDir, createFunc _)
ssc.start()
ssc.awaitTermination()
}
def updateFunc(values: Seq[Int], state: Option[Int]): Option[Int] = {
Some(values.size + state.getOrElse(0))
}
def createFunc(): StreamingContext = {
val ssc = new StreamingContext(new SparkConf().setAppName("TestUpdateStateByKeyJob"),
Duration(2000))
ssc.checkpoint(checkpointDir)
ssc.socketTextStream("localhost", 9999)
.flatMap(_.split(" "))
.map((_, 1))
.updateStateByKey(updateFunc _)
.checkpoint(Duration(10000))
.print()
ssc
}
}
开发者ID:prithvirajbose,项目名称:spark-dev,代码行数:37,代码来源:TestUpdateStateByKey.scala
示例2: SparkStreamKinesis
//设置package包名称以及导入依赖的类
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream.LATEST
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_2
import org.apache.spark.streaming.kinesis._
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
object SparkStreamKinesis{
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Spark Kinesis").setMaster("local[4]")
val ssc = new StreamingContext(conf, Seconds(1))
println("Spark Streaming")
val kinesisStream = KinesisUtils.createStream(ssc, "sparrow-ci",
"sparrow-ci",
"kinesis.us-east-1.amazonaws.com",
"us-east-1",
LATEST,
Duration(2000),
MEMORY_AND_DISK_2)
kinesisStream.print()
kinesisStream.flatMap(new String(_))
.foreachRDD(_.collect().foreach(print))
ssc.start()
ssc.awaitTermination()
}
}
开发者ID:noppanit,项目名称:spark-streaming-kinesis-example,代码行数:34,代码来源:main.scala
示例3: CommandStreamProcessor
//设置package包名称以及导入依赖的类
package com.crystal
package processors
// Spark
import org.apache.spark.streaming.kinesis._
import org.apache.spark.streaming.{ Duration, StreamingContext }
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
// JSON Parsing
import scala.util.parsing.json.JSON
object CommandStreamProcessor {
def setup(appConfig: AppConfig, streamingCtx: StreamingContext) = {
val cmdStream = getCommandStream(appConfig, streamingCtx)
cmdStream.foreachRDD { rdd =>
rdd.foreach{ cmd =>
println("--- Command Received ---")
}
}
}
private def getCommandStream(
appConfig: AppConfig,
streamingCtx: StreamingContext): DStream[Map[String, Any]] = {
val stream = KinesisUtils.createStream(
streamingCtx,
appConfig.commandAppName,
appConfig.commandStreamName,
s"kinesis.${appConfig.regionName}.amazonaws.com",
appConfig.regionName,
InitialPositionInStream.LATEST,
Duration(appConfig.checkpointInterval),
StorageLevel.MEMORY_AND_DISK_2
)
stream
.map { byteArray => new String(byteArray) }
.map { jsonStr => JSON.parseFull(jsonStr).get.asInstanceOf[Map[String, Any]] }
}
}
开发者ID:crystal-project-inc,项目名称:streaming_user_segmentation,代码行数:45,代码来源:CommandStreamProcessor.scala
示例4: Main
//设置package包名称以及导入依赖的类
package com.crystal
// Spark
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Duration, StreamingContext }
// Processors
import processors.CommandStreamProcessor
import processors.SnowplowStreamProcessor
object Main extends App {
AppConfig.setArgs(args)
AppConfig.load() match {
case Some(appConfig) =>
val sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName(appConfig.appName)
val streamingCtx = new StreamingContext(
sparkConf,
Duration(appConfig.checkpointInterval)
)
// Disable noisy logging
streamingCtx.sparkContext.setLogLevel("ERROR")
CommandStreamProcessor.setup(appConfig, streamingCtx)
SnowplowStreamProcessor.setup(appConfig, streamingCtx)
streamingCtx.start()
streamingCtx.awaitTerminationOrTimeout(appConfig.checkpointInterval * 3)
case None => ()
}
}
开发者ID:crystal-project-inc,项目名称:streaming_user_segmentation,代码行数:37,代码来源:Main.scala
示例5: CassandraConfig
//设置package包名称以及导入依赖的类
package com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra
import com.microsoft.partnercatalyst.fortis.spark.FortisSettings
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Duration
import scala.util.Properties.envOrElse
object CassandraConfig {
private val CassandraUsername = "cassandra"
private val CassandraPassword = "cassandra"//todo disable auth as we wont need it as C* will only be available internally in the cluster
def init(conf: SparkConf, batchDuration: Duration, fortisSettings: FortisSettings): SparkConf = {
conf.setIfMissing("spark.cassandra.connection.host", fortisSettings.cassandraHosts)
.setIfMissing("spark.cassandra.auth.username", CassandraUsername)
.setIfMissing("spark.cassandra.auth.password", CassandraPassword)
.setIfMissing("spark.cassandra.connection.keep_alive_ms", (batchDuration.milliseconds * 2).toString)
}
}
开发者ID:CatalystCode,项目名称:project-fortis-spark,代码行数:20,代码来源:CassandraConfig.scala
示例6: RuntimeJobInfo
//设置package包名称以及导入依赖的类
package io.hydrosphere.mist.api
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Duration
case class RuntimeJobInfo(
id: String,
workerId: String
)
case class CentralLoggingConf(
host: String,
port: Int
)
case class SetupConfiguration(
context: SparkContext,
streamingDuration: Duration,
info: RuntimeJobInfo,
loggingConf: Option[CentralLoggingConf]
)
开发者ID:Hydrospheredata,项目名称:mist,代码行数:22,代码来源:SetupConfiguration.scala
示例7: NamedContext
//设置package包名称以及导入依赖的类
package io.hydrosphere.mist.worker
import java.io.File
import io.hydrosphere.mist.api.{CentralLoggingConf, RuntimeJobInfo, SetupConfiguration}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.Duration
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
class NamedContext(
val sparkContext: SparkContext,
val namespace: String,
streamingDuration: Duration = Duration(40 * 1000),
loggingConf: Option[CentralLoggingConf] = None
) {
private val jars = mutable.Buffer.empty[String]
def addJar(jarPath: String): Unit = {
val jarAbsolutePath = new File(jarPath).getAbsolutePath
if (!jars.contains(jarAbsolutePath)) {
sparkContext.addJar(jarPath)
jars += jarAbsolutePath
}
}
def setupConfiguration(jobId: String): SetupConfiguration = {
SetupConfiguration(
context = sparkContext,
streamingDuration = streamingDuration,
info = RuntimeJobInfo(jobId, namespace),
loggingConf = loggingConf
)
}
//TODO: can we call that inside python directly using setupConfiguration?
// python support
def sparkConf: SparkConf = sparkContext.getConf
// python support
def javaContext: JavaSparkContext = new JavaSparkContext(sparkContext)
// python support
def sqlContext: SQLContext = new SQLContext(sparkContext)
// python support
def hiveContext: HiveContext = new HiveContext(sparkContext)
def stop(): Unit = {
sparkContext.stop()
}
}
开发者ID:Hydrospheredata,项目名称:mist,代码行数:58,代码来源:NamedContext.scala
示例8: getTopHashtag
//设置package包名称以及导入依赖的类
package es.ucm.fdi.sscheck.spark.demo.twitter
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import twitter4j.Status
def getTopHashtag(batchInterval: Duration, windowSize: Int)
(tweets: DStream[Status]): DStream[String] = {
val counts = countHashtags(batchInterval, windowSize)(tweets)
val topHashtag = counts.map { case(tag, count) => (count, tag) }
.transform(rdd => {
val sorted = rdd.sortByKey(false)
rdd.sparkContext.parallelize(sorted.take(1).map(_._2))
}
)
topHashtag.foreachRDD(rdd =>
println(s"Top hashtag: ${rdd.take(1).mkString(",")}")
)
topHashtag
}
}
开发者ID:juanrh,项目名称:sscheck-examples,代码行数:24,代码来源:TweetOps.scala
示例9: compute
//设置package包名称以及导入依赖的类
package org.apache.spark.streaming.scheduler.rate
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Duration
private[streaming] trait BatchIntervalEstimator extends Serializable {
def compute(
time: Long,
elements: Long,
processingDelay: Long,
schedulingDelay: Long): Option[Long]
}
object BatchIntervalEstimator {
def create(conf: SparkConf, minBatchInterval: Duration): BatchIntervalEstimator =
conf.get("spark.streaming.batchsizecontrol.batchIntervalEstimator", "pid") match {
case "pid" =>
val proportional = conf.getDouble("spark.streaming.batchsizecontrol.pid.proportional", 1.0)
val integral = conf.getDouble("spark.streaming.batchsizecontrol.pid.integral", 0.2)
val derived = conf.getDouble("spark.streaming.batchsizecontrol.pid.derived", 0.0)
new PIDBatchIntervalEstimator(minBatchInterval.milliseconds, proportional, integral, derived)
case "gradient" =>
val threshold = conf.getLong("spark.streaming.batchsizecontrol.gradient.threshold", 25) // TODO: RK: Check appropriate threshold
val stepSize = conf.getLong("spark.streaming.batchsizecontrol.gradient.stepSize", 100)
new GradientBatchIntervalEstimator(minBatchInterval.milliseconds, threshold, stepSize)
case unknown =>
throw new IllegalArgumentException(s"Unknown batch size estimator: $unknown")
}
}
开发者ID:kandu009,项目名称:Apache_Spark,代码行数:35,代码来源:BatchIntervalEstimator.scala
示例10: Main
//设置package包名称以及导入依赖的类
package com.fingerco
// Spark
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Duration, StreamingContext }
// Processors
import processors.PageStreamProcessor
object Main extends App {
AppConfig.load() match {
case Some(appConfig) =>
val sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName(appConfig.appName)
val streamingCtx = new StreamingContext(
sparkConf,
Duration(appConfig.checkpointInterval)
)
// Disable noisy logging
streamingCtx.sparkContext.setLogLevel("ERROR")
PageStreamProcessor.setup(appConfig, streamingCtx)
streamingCtx.start()
streamingCtx.awaitTerminationOrTimeout(appConfig.checkpointInterval * 3)
case None => ()
}
}
开发者ID:fingerco,项目名称:watcher-page-crawler,代码行数:32,代码来源:Main.scala
注:本文中的org.apache.spark.streaming.Duration类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论