本文整理汇总了Scala中org.apache.spark.internal.Logging类的典型用法代码示例。如果您正苦于以下问题：Scala Logging类的具体用法？Scala Logging怎么用？Scala Logging使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Logging类的13个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: ClickHouseSink
//设置package包名称以及导入依赖的类
package io.clickhouse.ext.spark.streaming
import io.clickhouse.ext.ClickHouseUtils
import io.clickhouse.ext.tools.Utils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.streaming.Sink
import scala.reflect.{ClassTag, classTag}
import scala.reflect.runtime.universe.TypeTag
/**
 * Structured Streaming [[Sink]] that inserts every micro-batch into a ClickHouse table.
 *
 * Each non-empty partition of the batch opens its own connection, decodes the internal
 * rows back into `T`, adds them to one prepared insert statement and executes that
 * statement as a single JDBC batch.
 *
 * @param dbName              target database, passed to `ClickHouseUtils.prepareInsertStatement`
 * @param tableName           target table, passed to `ClickHouseUtils.prepareInsertStatement`
 * @param eventDataColumn     event-date column name, passed to `ClickHouseUtils.prepareInsertStatement`
 * @param getConnectionString supplies the `(host, port)` pair to connect to
 * @param partitionFunc       derives a `java.sql.Date` from a row; forwarded to `ClickHouseUtils.batchAdd`
 * @tparam T case class (product type) describing the row schema
 */
class ClickHouseSink[T <: Product: ClassTag](dbName: String, tableName: String, eventDataColumn: String)
                                            (getConnectionString: () => (String, Int)) // -> (host, port)
                                            (partitionFunc: (org.apache.spark.sql.Row) => java.sql.Date)
                                            (implicit tag: TypeTag[T]) extends Sink with Serializable with Logging {

  /**
   * Writes one micro-batch and logs the number of rows reported as inserted.
   *
   * @param batchId id of the micro-batch, used only for logging
   * @param data    the micro-batch; its physical RDD of internal rows is consumed
   */
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val res = data.queryExecution.toRdd.mapPartitions { iter =>
      val stateUpdateEncoder = Encoders.product[T]
      val schema = stateUpdateEncoder.schema
      val exprEncoder = stateUpdateEncoder.asInstanceOf[ExpressionEncoder[T]]

      if (iter.nonEmpty) {
        val clickHouseHostPort = getConnectionString()
        // Binding the encoder to the schema does not depend on the row, so do it once
        // per partition instead of once per record.
        val boundEncoder = exprEncoder.resolveAndBind(
          schema.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
        )

        Utils.using(ClickHouseUtils.createConnection(clickHouseHostPort)) { connection =>
          val insertStatement =
            ClickHouseUtils.prepareInsertStatement(connection, dbName, tableName, eventDataColumn)(schema)

          iter.foreach { internalRow =>
            val caseClassInstance = boundEncoder.fromRow(internalRow)
            val row = org.apache.spark.sql.Row.fromTuple(caseClassInstance)
            ClickHouseUtils.batchAdd(schema, row)(insertStatement)(partitionFunc)
          }

          val inserted = insertStatement.executeBatch().sum
          log.info(s"inserted $inserted -> (${clickHouseHostPort._1}:${clickHouseHostPort._2})")
          Iterator.single(inserted)
        } // end: close connection
      } else {
        Iterator.empty
      }
    } // end: mapPartition

    val insertedCount = res.collect().sum
    log.info(s"Batch $batchId's inserted total: $insertedCount")
  }
}
开发者ID:DmitryBe,项目名称:spark-streaming-clickhouse,代码行数:58,代码来源:ClickHouseSink.scala
示例2: extractCatalogConf
//设置package包名称以及导入依赖的类
package org.apache.spark.sql.crossdata
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.internal.Logging
/** Helper for builders that need to extract the catalog section of a flat option map. */
trait BuilderHelper extends Logging {

  private[this] val ParentConfPrefix = "crossdata-core"
  private[this] val SparkConfPrefix = "spark"
  private[this] val CatalogConfPrefix = "catalog"

  /**
   * Builds a Typesafe [[Config]] from the options whose key starts with
   * `crossdata-core.catalog`, with the leading `crossdata-core.catalog.` removed from
   * each key.
   *
   * The prefix is stripped literally. It was previously used as a regular expression,
   * where `.` matches any character, so a key such as `crossdata-core.catalogs.foo`
   * was mangled into `.foo`.
   *
   * @param options flat key/value options; not modified
   * @return the catalog configuration parsed from the selected entries
   */
  private[crossdata] def extractCatalogConf(options: scala.collection.mutable.HashMap[String, String]): Config = {
    // Explicit converters instead of the deprecated implicit JavaConversions.
    import scala.collection.JavaConverters._

    val catalogPrefix = s"$ParentConfPrefix.$CatalogConfPrefix"
    val catalogEntries = options.collect {
      case (key, value) if key.startsWith(catalogPrefix) =>
        key.stripPrefix(s"$catalogPrefix.") -> value
    }

    ConfigFactory.parseMap(catalogEntries.toMap[String, String].asJava)
  }
}
开发者ID:nagyistge,项目名称:crossdata-spark2,代码行数:28,代码来源:BuilderHelper.scala
示例3: SparkTask
//设置package包名称以及导入依赖的类
package org.apache.spark.scheduler
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.Logging
/**
 * Bookkeeping record for a single task observed by the listener.
 *
 * @param jid  id of the job the task belongs to
 * @param tid  stored as `stageId`
 * @param info scheduler-provided task information
 */
class SparkTask(jid: Int, tid: Int, info: TaskInfo) extends Logging {

  val stageId = tid
  val taskId = info.taskId
  val jobId = jid
  val taskInfo = info

  // Stays null until setTaskMetrics is called for the first time.
  var taskMetrics: TaskMetrics = null

  /** Stores the metrics once; any later call only logs and leaves the stored value untouched. */
  def setTaskMetrics(data: TaskMetrics) = {
    if (this.taskMetrics != null) {
      logInfo("init failed\n")
    } else {
      this.taskMetrics = data
    }
  }
}
object SparkTask {

  /**
   * Renders a one-line summary of a task: its id, stage, executor (`id:host`), status
   * and duration.
   *
   * @param info    task information to summarise
   * @param stageId id of the stage the task ran in
   */
  def getTaskSetStatusDetails(info: TaskInfo, stageId: Int): String = {
    val id = info.id
    val status = info.status
    val executor = s"${info.executorId}:${info.host}"
    val took = info.duration

    s"TaskInfo($id,$stageId);Executor($executor);taskStatus($status);Took($took)"
  }
}
开发者ID:bingrao,项目名称:SparkListener,代码行数:35,代码来源:SparkTask.scala
示例4: SparkJob
//设置package包名称以及导入依赖的类
package org.apache.spark.scheduler
import scala.collection.mutable
import org.apache.spark.internal.Logging
/**
 * Mutable record of one job: its stages, result and start/end timestamps.
 *
 * @param key the job id
 */
class SparkJob(key: Int) extends Logging {

  var stages = new mutable.HashMap[Int, SparkStage]()
  val jobId = key
  var jobResult: JobResult = null

  private var startTime: Long = 0
  private var endTime: Long = 0

  /** Registers (or replaces) the stage stored under `stageId`. */
  def addStage(stageId: Int, sparkStage: SparkStage) = {
    stages += (stageId -> sparkStage)
  }

  /** Records the final result of the job. */
  def updateResult(result: JobResult) = {
    this.jobResult = result
  }

  def setStartTime(time: Long) = {
    this.startTime = time
  }

  def setEndTime(time: Long) = {
    this.endTime = time
  }

  /** Difference between the recorded end and start times. */
  def getRunningTime: Long = endTime - startTime

  /** Number of stages registered so far. */
  def numsStage = stages.size

  /** Looks up a registered stage; throws if no stage is stored under `key`. */
  def getStageWithId(key: Int): SparkStage = stages(key)
}

object SparkJob {
}
开发者ID:bingrao,项目名称:SparkListener,代码行数:28,代码来源:SparkJob.scala
示例5: DirectGraphLoader
//设置package包名称以及导入依赖的类
package org.apache.spark.graphx
import org.apache.spark.graphx.impl.EdgePartitionBuilder
import org.apache.spark.graphx.impl.GraphImpl
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import it.unimi.dsi.fastutil.longs.LongOpenHashBigSet
/** Builds a GraphX graph straight from an RDD of adjacency sets. */
object DirectGraphLoader extends Logging {

  /**
   * Creates a graph with one edge `(vid, dst)` of attribute 1 for every `dst` in the
   * adjacency set of `vid`; all vertices get the default attribute 1.
   *
   * Edges are added to the partition builders directly. The previous version formatted
   * every edge as a `"src\tdst"` string and parsed it back, and carried an orientation
   * flag that was always false; both have been dropped without changing the result.
   *
   * @param e adjacency sets keyed by source vertex id
   * @return the graph, with edges and vertices cached at `MEMORY_ONLY`
   */
  def edgeListFile(e: RDD[(Long, LongOpenHashBigSet)]): Graph[Int, Int] = {
    val edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
    val vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
    val startTime = System.currentTimeMillis

    // One edge partition per input partition.
    val edges = e.mapPartitionsWithIndex { (pid, iter) =>
      val builder = new EdgePartitionBuilder[Int, Int]
      iter.foreach { case (srcId, adjacency) =>
        adjacency.toLongArray().foreach(dstId => builder.add(srcId, dstId, 1))
      }
      Iterator((pid, builder.toEdgePartition))
    }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges")

    // Force materialisation so that the logged time covers the actual load.
    edges.count()
    logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))

    GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
      vertexStorageLevel = vertexStorageLevel)
  } // end of edgeListFile
}
开发者ID:chan150,项目名称:TrillionG,代码行数:50,代码来源:DirectGraphLoader.scala
示例6: DefaultSource
//设置package包名称以及导入依赖的类
package org.apache.spark.sql.sparkcv
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType
import org.bytedeco.javacpp.opencv_core.IplImage
import org.bytedeco.javacpp.opencv_imgcodecs.cvLoadImage
/**
 * Data source entry point that produces an `ImageRelation` for reads (with or without
 * a user-supplied schema) and for writes.
 */
class DefaultSource
  extends RelationProvider
  with SchemaRelationProvider
  with CreatableRelationProvider
  with Logging {

  /** Read path without a user schema: delegates to the schema-aware overload with an empty schema. */
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
    createRelation(sqlContext, parameters, new StructType())
  }

  /**
   * Read path with a user schema.
   *
   * The `path` parameter must be present. An earlier version also loaded a hard-coded
   * sample image here on every call without using or releasing it; that leftover was
   * removed.
   */
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
    assert(parameters.contains("path"), "path parameter is required")
    ImageRelation(sqlContext, parameters, schema)
  }

  /** Write path: builds the relation from the schema of the data being saved. */
  override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
    ImageRelation(sqlContext, parameters, data.schema)
  }
}
开发者ID:miguel0afd,项目名称:sparkCV,代码行数:31,代码来源:DefaultSource.scala
示例7: BLAS
//设置package包名称以及导入依赖的类
package org.apache.spark.mllib.sparselr.Utils
import org.apache.spark.internal.Logging
/** In-place numeric kernels for the sparse vector types of this package. */
object BLAS extends Serializable with Logging {

  /**
   * Multiplies every stored entry of `y` by `a`, in place.
   *
   * Only `HashedSparseVector` is supported; any other vector type is rejected with an
   * `IllegalArgumentException`.
   */
  def dot(a: Double, y: Vector): Unit = y match {
    case hashed: HashedSparseVector =>
      dot(a, hashed)
    case _ =>
      throw new IllegalArgumentException(s"dot doesn't support (Double, ${y.getClass}).")
  }

  /** Overwrites each entry of `y` with `a` times its current value. */
  private def dot(a: Double, y: HashedSparseVector): Unit =
    for (entry <- y.iterator) {
      y(entry._1) = a * entry._2
    }

  /** For each entry `(k, v)` of `x`, sets `y(k)` to `a * v + y(k)`. */
  private def axpy(a: Double, x: CompressedSparseVector, y: HashedSparseVector): Unit =
    for (entry <- x.iterator) {
      y(entry._1) = a * entry._2 + y(entry._1)
    }
}
开发者ID:intel-analytics,项目名称:SparseML,代码行数:29,代码来源:BLAS.scala
示例8: RedisShuffleWriter
//设置package包名称以及导入依赖的类
package org.apache.spark.shuffle.redis
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.{ShuffleHandle, ShuffleWriter}
import org.apache.spark.{SparkEnv, TaskContext}
import redis.clients.jedis.{Jedis, JedisPool}
/**
 * Shuffle writer backed by Redis. Only the shutdown logic is visible in this excerpt.
 *
 * @param handle  shuffle handle; must be a `RedisShuffleHandle`
 * @param mapId   id of the map task
 * @param context context of the running task
 */
class RedisShuffleWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext)
  extends ShuffleWriter[K, V] with Logging {

  private val dep = handle.asInstanceOf[RedisShuffleHandle[Any, Any, Any]].dependency

  private val blockManager = SparkEnv.get.blockManager

  private val jedisPool = new JedisPool()

  private var sorter: RedisSorter[Any, Any, Any] = null

  // Set on the first stop() call. A map task may call stop(success = true) and then
  // stop(success = false) after an exception, and clean-up must not run twice.
  private var stopping = false

  private var mapStatus: MapStatus = null

  /**
   * Finishes the writer. The first call returns the map status on success, or cleans
   * the sorter and returns `None` on failure; later calls return `None`. The Jedis
   * pool is closed on every call.
   */
  override def stop(success: Boolean): Option[MapStatus] =
    try {
      if (stopping) {
        None
      } else {
        stopping = true
        if (success) {
          Option(mapStatus)
        } else {
          if (sorter != null) {
            sorter.clean()
            sorter = null
          }
          None
        }
      }
    } finally {
      jedisPool.close()
    }
}
开发者ID:ambling,项目名称:RedisShuffleManager,代码行数:51,代码来源:RedisShuffleWriter.scala
示例9: MyGraphLoader
//设置package包名称以及导入依赖的类
package org.apache.spark.graphx
import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
/** Loads a whitespace-separated edge list file into a `MyGraph`. */
object MyGraphLoader extends Logging {

  /**
   * Reads `path` as lines of `src dst` pairs and groups them into per-vertex adjacency
   * lists before handing them to `MyGraphImpl.fromEdgeList`.
   *
   * Changes from the previous version:
   *  - with the default `numVertexPartitions = -1` the grouping partitioner was built
   *    with a negative partition count, which `HashPartitioner` rejects; the partition
   *    count of the input is used instead;
   *  - every line is parsed once rather than three times through a union;
   *  - empty lines and lines starting with `#` are skipped instead of failing to parse.
   *
   * @param sc                   Spark context used to read the file
   * @param path                 location of the edge list
   * @param canonicalOrientation currently unused
   * @param numVertexPartitions  partition count for reading and grouping; values `<= 0`
   *                             keep the partitioning chosen by `textFile`
   * @param edgeStorageLevel     forwarded to `MyGraphImpl.fromEdgeList`
   * @param vertexStorageLevel   forwarded to `MyGraphImpl.fromEdgeList`
   */
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      numVertexPartitions: Int = -1,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
    : MyGraph[Int, Int] = {
    val startTime = System.currentTimeMillis

    val lines =
      if (numVertexPartitions > 0) {
        sc.textFile(path, numVertexPartitions).coalesce(numVertexPartitions)
      } else {
        sc.textFile(path)
      }

    // Each edge yields the edge itself plus a -1 sentinel for both endpoints, so that
    // every vertex shows up as a key even when it has no outgoing edge.
    val mid_data = lines.flatMap { line =>
      if (line.isEmpty || line(0) == '#') {
        Seq.empty[(Long, Long)]
      } else {
        val parts = line.split("\\s+")
        val srcId = parts(0).toLong
        val dstId = parts(1).toLong
        Seq((srcId, dstId), (dstId, -1L), (srcId, -1L))
      }
    }

    // HashPartitioner needs a non-negative count; fall back to the input's own.
    val groupPartitions =
      if (numVertexPartitions > 0) numVertexPartitions else lines.getNumPartitions
    val links = mid_data.groupByKey(new HashPartitioner(groupPartitions)).cache()
    println("It took %d ms to group".format(System.currentTimeMillis - startTime))

    // NOTE(review): the original comment here was mis-encoded; it appears to describe
    // vertices without outgoing edges, e.g. (4,()) (5,())... Confirm with the author.
    MyGraphImpl.fromEdgeList(links, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
      vertexStorageLevel = vertexStorageLevel)
  } // end of edgeListFile
}
开发者ID:yuanqingsunny,项目名称:graph,代码行数:54,代码来源:MyGraphLoader.scala
示例10: MyShippableVertexPartition
//设置package包名称以及导入依赖的类
package org.apache.spark.graphx
import org.apache.spark.graphx.impl.VertexIdToIndexMap
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
import org.apache.spark.internal.Logging
import org.apache.spark.util.collection.BitSet
import scala.reflect.ClassTag
/**
 * A vertex partition stored as parallel arrays.
 *
 * @param index        maps a vertex id to its slot in `values`
 * @param local2global maps a slot back to its vertex id
 * @param values       attribute stored in each slot
 * @param mask         marks the slots that currently hold a value
 */
class MyShippableVertexPartition[VD](
    val index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int],
    val local2global: Array[VertexId],
    val values: Array[VD],
    val mask: BitSet) extends Serializable with Logging {

  /** Capacity of the mask, used as the length of freshly allocated value arrays. */
  def size: Int = mask.capacity

  /** Iterates over `(vertexId, value)` for every slot set in the mask. */
  def iterator: Iterator[(VertexId, VD)] =
    for (slot <- mask.iterator) yield (local2global(slot), values(slot))

  /**
   * Aggregates the incoming `(vertexId, value)` pairs per vertex and returns a new
   * partition sharing this partition's index.
   *
   * Pairs whose vertex id is not in `index` are dropped; values that land on the same
   * slot are merged with `reduceFunc`.
   */
  def aggregateUsingIndex[VD2: ClassTag](
      iter: Iterator[Product2[VertexId, VD2]],
      reduceFunc: (VD2, VD2) => VD2): MyShippableVertexPartition[VD2] = {
    val aggregatedMask = new BitSet(size)
    val aggregated = new Array[VD2](size)

    for (entry <- iter) {
      val vertexId = entry._1
      val incoming = entry._2
      val slot = index.getOrElse(vertexId, -1)
      if (slot >= 0) {
        if (!aggregatedMask.get(slot)) {
          // First value seen for this slot: store it as is.
          aggregatedMask.set(slot)
          aggregated(slot) = incoming
        } else {
          aggregated(slot) = reduceFunc(aggregated(slot), incoming)
        }
      }
    }

    new MyShippableVertexPartition(index, local2global, aggregated, aggregatedMask)
  }
}
开发者ID:yuanqingsunny,项目名称:graph,代码行数:49,代码来源:MyShippableVertexPartition.scala
示例11: Jython
//设置package包名称以及导入依赖的类
package org.apache.spark.sql
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.jython.UserDefinedJythonFunction
/**
 * Holder for the implicit extension that lets a `UDFRegistration` register
 * Jython-backed user-defined functions.
 *
 * Created by mariu_000 on 2016-11-09.
 */
object Jython {
/**
 * Implicit wrapper around a `UDFRegistration` that adds `registerJythonUDF`.
 *
 * Created by mariu_000 on 2016-11-09.
 */
implicit class JythonUDFRegistration(udfRegistration: UDFRegistration) extends Logging {
// Reads the field named "functionRegistry" from the wrapped UDFRegistration through
// reflection, forcing it accessible first, and casts the value to FunctionRegistry.
private def functionRegistry: FunctionRegistry = {
val field = this.udfRegistration.getClass.getDeclaredField("functionRegistry")
field.setAccessible(true)
field.get(this.udfRegistration).asInstanceOf[FunctionRegistry]
}
/**
 * Logs the UDF's name and data type at debug level, then registers the UDF's builder
 * under `name` in the function registry obtained above.
 *
 * @param name name under which the function is registered
 * @param udf  the Jython function; its `dataType` is logged and its `builder` registered
 */
protected[sql] def registerJythonUDF(name: String, udf: UserDefinedJythonFunction): Unit = {
log.debug(
s"""
| Registering new JythonUDF:
| name: $name
| dataType: ${udf.dataType}
""".stripMargin)
functionRegistry.registerFunction(name, udf.builder)
}
}
}
开发者ID:mariusvniekerk,项目名称:spark-jython-udf,代码行数:37,代码来源:Jython.scala
示例12: ClickHouseSinkProvider
//设置package包名称以及导入依赖的类
package io.clickhouse.ext.spark.streaming
import io.clickhouse.ext.ClickHouseUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Encoders, SQLContext}
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode
import scala.reflect.{ClassTag, classTag}
import scala.reflect.runtime.universe.TypeTag
/**
 * Base class for stream sink providers that write case-class rows to ClickHouse.
 *
 * Subclasses supply the connection and table settings. `createSink` first issues the
 * "create table if not exists" statement and then returns the sink.
 *
 * @tparam T case class (product type) describing the row schema
 */
abstract class ClickHouseSinkProvider[T <: Product: ClassTag](implicit tag: TypeTag[T])
  extends StreamSinkProvider with Serializable with Logging {

  /** Candidate `(host, port)` pairs; only the first one is used by `getConnectionString`. */
  def clickHouseServers: Seq[(String, Int)]

  def dbName: String

  /** Target table name. Defaults to `None`, which makes `createSink` fail, so it must be overridden. */
  def tableName: Option[String] = None

  def eventDateColumnName: String

  def indexColumns: Seq[String]

  def partitionFunc: (org.apache.spark.sql.Row) => java.sql.Date

  /**
   * Ensures the target table exists and builds the sink.
   *
   * The "table created" message is logged only after the DDL statement has run
   * successfully. It used to be logged from the `finally` block, so it also appeared
   * when the statement had failed.
   *
   * @throws NoSuchElementException if `tableName` is not defined
   */
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): ClickHouseSink[T] = {

    val typeEncoder = Encoders.product[T]
    val schema = typeEncoder.schema

    // Same exception type as the former `tableName.get`, with an actionable message.
    val _tableName = tableName.getOrElse(
      throw new NoSuchElementException(
        "ClickHouseSinkProvider.tableName is not defined; override it to return Some(<table name>)"))

    val createTableSql = ClickHouseUtils.createTableIfNotExistsSql(
      schema,
      dbName,
      _tableName,
      eventDateColumnName,
      indexColumns
    )
    log.info("create new table sql:")
    log.info(createTableSql)

    val connection = ClickHouseUtils.createConnection(getConnectionString())
    try {
      connection.createStatement().execute(createTableSql)
      log.info(s"ClickHouse table ${dbName}.${_tableName} created")
    } finally {
      connection.close()
    }

    log.info("Creating ClickHouse sink")
    new ClickHouseSink[T](dbName, _tableName, eventDateColumnName)(getConnectionString)(partitionFunc)
  }

  /** Returns the first configured server; fails if `clickHouseServers` is empty. */
  def getConnectionString(): (String, Int) = clickHouseServers.head
}
开发者ID:DmitryBe,项目名称:spark-streaming-clickhouse,代码行数:55,代码来源:ClickHouseSinkProvider.scala
示例13: TestBroadCast
//设置package包名称以及导入依赖的类
package org.apache.spark.examples
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import scala.collection.mutable
/** Small driver program that times repeated broadcasts of a sequence. */
object TestBroadCast extends Logging {

  val sparkSession = SparkSession.builder().appName("test BoradCast").getOrCreate()
  val sc = sparkSession.sparkContext

  /**
   * Expects the last two arguments to be `num` (upper bound of the broadcast range)
   * and `times` (number of broadcast rounds). Prints the duration of each round and
   * logs the total.
   */
  def main(args: Array[String]): Unit = {
    // val data = sc.parallelize(Seq(1 until 10000000))
    val num = args(args.length - 2).toInt
    val times = args(args.length - 1).toInt
    println(num)

    val start = System.nanoTime()
    // NOTE(review): this is a one-element Seq holding a Range, not a Seq of ints.
    // Confirm whether `1 until num` on its own was intended.
    val seq = Seq(1 until num)

    (0 until times).foreach { _ =>
      val roundStart = System.nanoTime()
      val bc = sc.broadcast(seq)
      val rdd = sc.parallelize(1 until 10, 5)
      rdd.map(_ => bc.value.take(1)).collect()
      println(s"${(System.nanoTime() - roundStart) / 1e6}ms")
    }

    logInfo(s"${(System.nanoTime() - start) / 1e6}ms")
  }

  /**
   * Defines (but never executes) a `mapPartitions` over `bigRDD`; the resulting RDD is
   * discarded, so no Spark job is triggered.
   *
   * NOTE(review): the closure refers to `smallRDD`, another RDD. Spark does not
   * support using one RDD inside another RDD's transformation, so this would fail if
   * an action were ever run on the result.
   */
  def testMap(): Unit = {
    val smallRDD = sc.parallelize(Seq(1, 2, 3))
    val bigRDD = sc.parallelize(Seq(1 until 20))

    bigRDD.mapPartitions { partition =>
      val lookup = new mutable.HashMap[Int, Int]()
      smallRDD.foreach { ele =>
        lookup(ele) = ele
      }
      // some operation here
      partition
    }
  }
}
开发者ID:CASISCAS,项目名称:asyspark,代码行数:47,代码来源:TestBroadCast.scala
注：本文中的org.apache.spark.internal.Logging类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论