本文整理汇总了Scala中org.apache.spark.scheduler.SparkListener类的典型用法代码示例。如果您正苦于以下问题:Scala SparkListener类的具体用法?Scala SparkListener怎么用?Scala SparkListener使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SparkListener类的4个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: MetricsListener
//设置package包名称以及导入依赖的类
package com.groupon.dse.spark.listeners
import org.apache.spark.groupon.metrics.UserMetricsSystem
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorRemoved, SparkListenerStageCompleted, SparkListenerTaskEnd}
/**
 * Spark listener that marks a meter whenever an executor is removed, a stage
 * completes with a failure reason, or a task ends unsuccessfully.
 */
class MetricsListener extends SparkListener {

  // The meters are lazy, so each one is only requested from UserMetricsSystem on first use.
  private lazy val removedExecutors = UserMetricsSystem.meter("baryon.executorRemoved.rate")
  private lazy val stageFailures = UserMetricsSystem.meter("baryon.failedStages.rate")
  private lazy val taskFailures = UserMetricsSystem.meter("baryon.failedTasks.rate")

  /** Marks the executor-removed meter once per removal event. */
  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit =
    removedExecutors.mark()

  /** Marks the failed-stages meter only when the completed stage carries a failure reason. */
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
    stageCompleted.stageInfo.failureReason.foreach(_ => stageFailures.mark())

  /** Marks the failed-tasks meter only when the finished task is not flagged as successful. */
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val succeeded = taskEnd.taskInfo.successful
    if (!succeeded) taskFailures.mark()
  }
}
开发者ID:groupon,项目名称:baryon,代码行数:28,代码来源:MetricsListener.scala
示例2: ParallelizeTest
//设置package包名称以及导入依赖的类
package com.jorgefigueiredo
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Assert._
import org.junit.runner.RunWith
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.junit.JUnitRunner
/**
 * Checks that `SparkContext.parallelize` honours the requested number of partitions.
 * A context is obtained from `SparkContextFactory` before every test and stopped afterwards.
 */
@RunWith(classOf[JUnitRunner])
class ParallelizeTest extends FunSuite with BeforeAndAfter {

  // Assigned in `before`; stays null if obtaining the context failed.
  var sparkContext: SparkContext = _

  before {
    sparkContext = SparkContextFactory.getContext()
  }

  after {
    // Option(...) is None for null, so stop() is only called on a context that was created.
    Option(sparkContext).foreach(_.stop())
  }

  test("Parallelize with 5 partitions") {
    val partitionCount = 5
    val numbers = 1 to 10000

    val rdd = sparkContext.parallelize(numbers, partitionCount)
    val evenCount = rdd.filter(_ % 2 == 0).count()

    assertEquals(partitionCount, rdd.partitions.length)
    // Half of the values in 1..10000 are even.
    assertEquals(numbers.end / 2, evenCount)
  }

  test("Parallelize foreach partition") {
    // Registers a listener that overrides no callbacks.
    val noOpListener = new SparkListener {}
    sparkContext.addSparkListener(noOpListener)

    val partitionCount = 5
    val numbers = 1 to 10000
    val rdd = sparkContext.parallelize(numbers, partitionCount)

    assertEquals(5, rdd.partitions.length)
  }
}
开发者ID:jorgeacf,项目名称:apache-spark-demos,代码行数:51,代码来源:ParallelizeTest.scala
示例3: CleanupUtil
//设置package包名称以及导入依赖的类
package com.hazelcast.spark.connector.util
import com.hazelcast.spark.connector.util.ConnectionUtil.closeAll
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}
/**
 * Registers a Spark listener that records the RDD ids of every started job and, when the
 * job ends, runs a small follow-up job that calls `closeAll` with those ids on each partition.
 */
object CleanupUtil {

  /** RDD ids recorded per job id at job start; the entry is removed again at job end. */
  val jobIds: collection.mutable.Map[Int, Seq[Int]] = collection.mutable.Map[Int, Seq[Int]]()

  /** Name given to the RDD of the cleanup job so that the listener can skip it. */
  val cleanupJobRddName: String = "HazelcastResourceCleanupJob"

  /**
   * Adds the cleanup listener to the given context.
   *
   * @param sc the context whose jobs are tracked and which runs the cleanup job
   */
  def addCleanupListener(sc: SparkContext): Unit = {
    sc.addSparkListener(new SparkListener {

      /** Records the RDD ids of each stage of the job, skipping RDDs named `cleanupJobRddName`. */
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        this.synchronized {
          jobStart.stageInfos.foreach { info =>
            info.rddInfos.foreach { rdd =>
              if (!cleanupJobRddName.equals(rdd.name)) {
                val ids: Seq[Int] = info.rddInfos.map(_.id)
                // Prepend to whatever was already recorded for this job.
                val known: Seq[Int] = jobIds.getOrElse(jobStart.jobId, Seq.empty[Int])
                jobIds.put(jobStart.jobId, ids ++ known)
              }
            }
          }
        }
      }

      /** Runs the cleanup job for a tracked job and drops the job's entry from `jobIds`. */
      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
        this.synchronized {
          // Jobs without an entry (such as the cleanup job itself) are ignored.
          jobIds.get(jobEnd.jobId).foreach { rddIds =>
            try {
              val workers =
                sc.getConf.getInt("spark.executor.instances", sc.getExecutorStorageStatus.length)
              sc.parallelize(1 to workers, workers)
                .setName(cleanupJobRddName)
                .foreachPartition(_ => closeAll(rddIds))
            } catch {
              // Cleanup is best-effort: a failing cleanup job must not break the listener.
              case _: Exception =>
            } finally {
              // Always forget the job, even when the cleanup job failed, so the map cannot grow
              // without bound.
              jobIds -= jobEnd.jobId
            }
          }
        }
      }
    })
  }
}
开发者ID:hazelcast,项目名称:hazelcast-spark,代码行数:53,代码来源:CleanupUtil.scala
示例4: KarpsListener
//设置package包名称以及导入依赖的类
package org.karps
import com.typesafe.scalalogging.slf4j.{StrictLogging => Logging}
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerStageSubmitted}
import org.apache.spark.sql.SparkSession
import org.karps.ops.{HdfsPath, HdfsResourceResult, SourceStamps}
import org.karps.structures.UntypedNodeJson
/**
 * Spark listener that writes a debug log line for every submitted and every completed stage.
 *
 * @param manager accepted by the constructor; not used by the callbacks in this class
 */
class KarpsListener(manager: Manager) extends SparkListener with Logging {

  /** Logs the stage-submitted event at debug level. */
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit =
    logger.debug(s"stage submitted: $stageSubmitted")

  /** Logs the stage-completed event at debug level. */
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
    logger.debug(s"stage completed: $stageCompleted")
}
/**
 * Looks up the status of a computation within a session.
 *
 * @param session     key used to look up the session in `sessions`
 * @param computation computation whose status is requested from that session
 * @return the status reported by the session, or `None` when the session is not found
 *         or the session itself reports none
 */
def statusComputation(
    session: SessionId,
    computation: ComputationId): Option[BatchComputationResult] =
  sessions.get(session).flatMap(_.statusComputation(computation))
/**
 * Returns the stamps of the given paths, provided the session is known.
 *
 * @param session key used to look up the session in `sessions`
 * @param paths   paths handed to `SourceStamps.getStamps`
 * @return the result of `SourceStamps.getStamps`, or an empty sequence when the session
 *         is not found
 */
def resourceStatus(session: SessionId, paths: Seq[HdfsPath]): Seq[HdfsResourceResult] =
  sessions.get(session) match {
    // The looked-up session only gates the call; the stamps come from `sparkSession`.
    case Some(_) => SourceStamps.getStamps(sparkSession, paths)
    case None => Seq.empty
  }
}
开发者ID:krapsh,项目名称:kraps-server,代码行数:36,代码来源:Manager.scala
注:本文中的org.apache.spark.scheduler.SparkListener类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论