本文整理汇总了 Scala 中 org.apache.spark.sql.Encoders 类的典型用法代码示例。如果您正苦于以下问题：Scala Encoders 类的具体用法？Scala Encoders 怎么用？Scala Encoders 使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Encoders类的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: ClickHouseSink
//设置package包名称以及导入依赖的类
package io.clickhouse.ext.spark.streaming
import io.clickhouse.ext.ClickHouseUtils
import io.clickhouse.ext.tools.Utils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.streaming.Sink
import scala.reflect.{ClassTag, classTag}
import scala.reflect.runtime.universe.TypeTag
/**
 * Structured Streaming sink that inserts every micro-batch into a ClickHouse table.
 *
 * Each non-empty partition opens its own connection, decodes the internal rows into
 * instances of `T`, queues them on a single prepared insert statement and executes
 * that statement as one JDBC batch.
 *
 * @param dbName              database name handed to `ClickHouseUtils.prepareInsertStatement`
 * @param tableName           table name handed to `ClickHouseUtils.prepareInsertStatement`
 * @param eventDataColumn     column name handed to `ClickHouseUtils.prepareInsertStatement`
 * @param getConnectionString supplies the (host, port) pair; invoked once per non-empty partition
 * @param partitionFunc       maps a row to a date; forwarded unchanged to `ClickHouseUtils.batchAdd`
 * @tparam T case class describing the rows of the stream
 */
class ClickHouseSink[T <: Product: ClassTag](dbName: String, tableName: String, eventDataColumn: String)
                                            (getConnectionString: () => (String, Int)) // -> (host, port)
                                            (partitionFunc: (org.apache.spark.sql.Row) => java.sql.Date)
                                            (implicit tag: TypeTag[T]) extends Sink with Serializable with Logging {

  /**
   * Writes one micro-batch and logs the number of inserted rows.
   *
   * @param batchId identifier of the micro-batch (only used for logging here)
   * @param data    the rows of the micro-batch
   */
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val insertedPerPartition = data.queryExecution.toRdd.mapPartitions { iter =>
      if (iter.nonEmpty) {
        val stateUpdateEncoder = Encoders.product[T]
        val schema = stateUpdateEncoder.schema
        // Resolving and binding the encoder depends only on the schema, so it is done
        // once per partition instead of once per row.
        val boundEncoder = stateUpdateEncoder.asInstanceOf[ExpressionEncoder[T]].resolveAndBind(
          schema.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
        )

        val clickHouseHostPort = getConnectionString()
        // NOTE(review): Utils.using presumably closes the connection when the body finishes — confirm.
        Utils.using(ClickHouseUtils.createConnection(clickHouseHostPort)) { connection =>
          val insertStatement =
            ClickHouseUtils.prepareInsertStatement(connection, dbName, tableName, eventDataColumn)(schema)

          iter.foreach { internalRow =>
            val caseClassInstance = boundEncoder.fromRow(internalRow)
            val row = org.apache.spark.sql.Row.fromTuple(caseClassInstance)
            ClickHouseUtils.batchAdd(schema, row)(insertStatement)(partitionFunc)
          }

          val inserted = insertStatement.executeBatch().sum
          log.info(s"inserted $inserted -> (${clickHouseHostPort._1}:${clickHouseHostPort._2})")
          Iterator.single(inserted)
        } // end: close connection
      } else {
        // Nothing to write: do not open a connection for an empty partition.
        Iterator.empty
      }
    } // end: mapPartition

    val insertedCount = insertedPerPartition.collect().sum
    log.info(s"Batch $batchId's inserted total: $insertedCount")
  }
}
开发者ID:DmitryBe,项目名称:spark-streaming-clickhouse,代码行数:58,代码来源:ClickHouseSink.scala
示例2: GladLogLikelihoodAggregator
//设置package包名称以及导入依赖的类
package com.enriquegrodrigo.spark.crowd.aggregators
import com.enriquegrodrigo.spark.crowd.types.GladPartial
import com.enriquegrodrigo.spark.crowd.types.GladParams
import com.enriquegrodrigo.spark.crowd.utils.Functions
import org.apache.spark.sql.{Encoder,Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.broadcast.Broadcast
import scala.math.log
/**
 * Typed aggregator that folds [[GladPartial]] records into a single log-likelihood value.
 *
 * The buffer carries the running sum (`agg`) and the estimate (`est`) of the most recently
 * reduced record (`classProb`); `-1` marks a buffer that has not seen any record yet.
 *
 * @param params broadcast model parameters; `alpha` and `w` are read from it
 */
private[crowd] class GladLogLikelihoodAggregator(params: Broadcast[GladParams])
  extends Aggregator[GladPartial, GladLogLikelihoodAggregatorBuffer, Double]{

  /** Empty buffer: zero accumulated value and the `-1` "no estimate yet" sentinel. */
  def zero: GladLogLikelihoodAggregatorBuffer = GladLogLikelihoodAggregatorBuffer(0,-1)

  /** Adds the contribution of one annotation to the running sum and remembers its estimate. */
  def reduce(b: GladLogLikelihoodAggregatorBuffer, a: GladPartial) : GladLogLikelihoodAggregatorBuffer = {
    val alphaVal = params.value.alpha(a.annotator.toInt)
    val betaVal = a.beta
    val sig = Functions.sigmoid(alphaVal*betaVal)
    val p0 = 1-a.est
    val p1 = a.est
    // `sig` is used when the annotation matches the class, `1 - sig` otherwise.
    val k0 = if (a.value == 0) sig else 1-sig
    val k1 = if (a.value == 1) sig else 1-sig
    GladLogLikelihoodAggregatorBuffer(b.agg + Functions.prodlog(p0,k0)
      + Functions.prodlog(p1,k1), p1)
  }

  /** Sums both partial results; keeps the first estimate that is not the `-1` sentinel. */
  def merge(b1: GladLogLikelihoodAggregatorBuffer, b2: GladLogLikelihoodAggregatorBuffer) : GladLogLikelihoodAggregatorBuffer = {
    GladLogLikelihoodAggregatorBuffer(b1.agg + b2.agg, if (b1.classProb == -1) b2.classProb else b1.classProb)
  }

  /** Adds the two terms built from `params.value.w(0)` and `params.value.w(1)` to the accumulated sum. */
  def finish(reduction: GladLogLikelihoodAggregatorBuffer): Double = {
    val w0 = params.value.w(0)
    val w1 = params.value.w(1)
    reduction.agg + Functions.prodlog((1-reduction.classProb), w0) +
      Functions.prodlog(reduction.classProb, w1)
  }

  def bufferEncoder: Encoder[GladLogLikelihoodAggregatorBuffer] = Encoders.product[GladLogLikelihoodAggregatorBuffer]

  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

/**
 * Intermediate state of [[GladLogLikelihoodAggregator]].
 *
 * @param agg       running sum
 * @param classProb estimate of the last reduced record, or `-1` when none was seen
 */
private[crowd] case class GladLogLikelihoodAggregatorBuffer(agg: Double, classProb: Double)
开发者ID:enriquegrodrigo,项目名称:spark-crowd,代码行数:49,代码来源:GladLogLikelihoodAggregator.scala
示例3: ManchesterSyntaxOWLAxiomsDatasetBuilder
//设置package包名称以及导入依赖的类
package net.sansa_stack.owl.spark.dataset
import com.typesafe.scalalogging.Logger
import net.sansa_stack.owl.common.parsing.ManchesterSyntaxParsing
import org.apache.spark.sql.{Encoders, SparkSession}
import org.semanticweb.owlapi.io.OWLParserException
import org.semanticweb.owlapi.model.{OWLAxiom, OWLRuntimeException}
/**
 * Builds a dataset of OWL axioms from Manchester syntax input.
 *
 * Frames that cannot be parsed are logged as warnings and contribute no axioms.
 */
object ManchesterSyntaxOWLAxiomsDatasetBuilder extends ManchesterSyntaxParsing {
  private val logger = Logger(this.getClass)

  /** Emits the warning shared by both parser failure cases. */
  private def warnParserError(frame: String, failure: Throwable): Unit = {
    val msg = failure.getMessage
    logger.warn("Parser error for frame\n" + frame + "\n\n" + msg)
  }

  /**
   * Reads the expressions and the default prefix from `filePath` and turns them into axioms.
   *
   * @param spark    session used to read the file
   * @param filePath location of the input
   */
  def build(spark: SparkSession, filePath: String): OWLAxiomsDataset = {
    val expressionsAndPrefix =
      ManchesterSyntaxOWLExpressionsDatasetBuilder.buildAndGetDefaultPrefix(spark, filePath)
    build(expressionsAndPrefix._1, expressionsAndPrefix._2)
  }

  // FIXME: It has to be ensured that the expressionsDataset is in functional syntax
  // NOTE(review): the FIXME above mentions "functional syntax" although this builder
  // handles Manchester syntax — possibly copied from the sibling builder; confirm.
  /**
   * Converts every frame that does not start with "Annotations" into axioms.
   *
   * @param expressionsDataset frames to parse
   * @param defaultPrefix      prefix forwarded to `makeAxioms`
   */
  def build(expressionsDataset: OWLExpressionsDataset, defaultPrefix: String): OWLAxiomsDataset = {
    implicit val axiomEncoder: org.apache.spark.sql.Encoder[OWLAxiom] = Encoders.kryo[OWLAxiom]

    val parsableFrames = expressionsDataset.filter(!_.startsWith("Annotations"))
    parsableFrames.flatMap(frame => {
      try {
        makeAxioms(frame, defaultPrefix)
      } catch {
        case parserError: OWLParserException =>
          warnParserError(frame, parserError)
          // parserError.printStackTrace()
          Set.empty[OWLAxiom]
        case runtimeError: OWLRuntimeException =>
          warnParserError(frame, runtimeError)
          runtimeError.printStackTrace()
          Set.empty[OWLAxiom]
      }
    })
  }
}
开发者ID:SANSA-Stack,项目名称:SANSA-OWL,代码行数:43,代码来源:ManchesterSyntaxOWLAxiomsDatasetBuilder.scala
示例4: FunctionalSyntaxOWLAxiomsDatasetBuilder
//设置package包名称以及导入依赖的类
package net.sansa_stack.owl.spark.dataset
import net.sansa_stack.owl.common.parsing.FunctionalSyntaxParsing
import org.apache.spark.sql.{Encoders, SparkSession}
import org.semanticweb.owlapi.model.OWLAxiom
/**
 * Builds a dataset of OWL axioms from functional syntax input.
 */
object FunctionalSyntaxOWLAxiomsDatasetBuilder extends FunctionalSyntaxParsing {

  /**
   * Reads the expressions found at `filePath` and converts them into axioms.
   *
   * @param spark    session used to read the file
   * @param filePath location of the input
   */
  def build(spark: SparkSession, filePath: String): OWLAxiomsDataset = {
    val expressions = FunctionalSyntaxOWLExpressionsDatasetBuilder.build(spark, filePath)
    build(expressions)
  }

  // FIXME: It has to be ensured that the expressionsDataset is in functional syntax
  /**
   * Applies `makeAxiom` to every expression and drops the `null` results.
   *
   * @param expressionsDataset expressions to convert
   */
  def build(expressionsDataset: OWLExpressionsDataset): OWLAxiomsDataset = {
    implicit val axiomEncoder: org.apache.spark.sql.Encoder[OWLAxiom] = Encoders.kryo[OWLAxiom]

    val parsed = expressionsDataset.map(expression => makeAxiom(expression))
    // `null` results from makeAxiom are filtered out of the resulting dataset.
    parsed.filter(_ != null)
  }
}
开发者ID:SANSA-Stack,项目名称:SANSA-OWL,代码行数:20,代码来源:FunctionalSyntaxOWLAxiomsDatasetBuilder.scala
示例5: SQLDataProvider
//设置package包名称以及导入依赖的类
package org.ieee.codemeow.geometric.spark.data
import com.vividsolutions.jts.geom.Geometry
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.ieee.codemeow.geometric.Feature
import org.ieee.codemeow.geometric.spark.LayerConfiguration
/**
 * Data provider that registers JDBC tables as Spark temp views and serves features
 * by running a per-zoom SQL statement against them.
 *
 * Layer options read from `layer.kwargs`: "url", "dbtables", "user", "password", "zooms".
 *
 * @param _spark session forwarded to [[AbstractDataProvider]]
 * @param _layer layer configuration forwarded to [[AbstractDataProvider]]
 */
class SQLDataProvider(_spark: SparkSession, _layer: LayerConfiguration) extends AbstractDataProvider(_spark, _layer){

  /**
   * Looks up a mandatory layer option.
   *
   * @throws NoSuchElementException naming the missing key (the original bare `.get`
   *                                raised the same exception type without any hint)
   */
  private def requiredArg(key: String): Any =
    layer.kwargs.get(key).getOrElse(
      throw new NoSuchElementException(s"Layer option '$key' is required by SQLDataProvider but was not provided"))

  val url = requiredArg("url").asInstanceOf[String]
  // Keys are the Spark view names, values the table names passed to the JDBC "dbtable" option.
  val dbtables = requiredArg("dbtables").asInstanceOf[Map[String, String]]
  val user = requiredArg("user").asInstanceOf[String]
  val password = requiredArg("password").asInstanceOf[String]
  // Keys are zoom levels rendered as strings, values the SQL statement to run for that zoom.
  val zoomConfig = requiredArg("zooms").asInstanceOf[Map[String, String]]

  // load all tables
  dbtables.foreach { case (sparkTableName, realTableName) =>
    val mapDataFrame = spark.read.format("jdbc")
      .option("url", url)
      .option("user", user)
      .option("password", password)
      .option("dbtable", realTableName).load

    mapDataFrame.createOrReplaceTempView(sparkTableName)
  }

  /**
   * Runs the SQL statement configured for `zoom` and converts each result row to a [[Feature]].
   *
   * @param layerName not used by this implementation
   * @param zoom      zoom level; looked up in the "zooms" option by its string form
   * @return `None` when no statement is configured for `zoom`, otherwise the feature dataset
   */
  override def getFeatures(layerName: String, zoom: Long): Option[Dataset[Feature]] ={
    // Ref http://stackoverflow.com/questions/38664972/why-is-unable-to-find-encoder-for-type-stored-in-a-dataset-when-creating-a-dat
    import spark.implicits._
    // Ref http://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-dataset
    implicit val featureEncoder: org.apache.spark.sql.Encoder[Feature] = Encoders.kryo[Feature]

    zoomConfig.get(zoom.toString).map { natureSQL =>
      val rawDF = spark.sql(natureSQL)

      // Every column except the id and geometry columns becomes a feature property.
      // The names depend only on the result schema, so they are computed once, not per row.
      val reservedColumns = Set("__id__", "__geometry__")
      val propertyFields: Seq[String] = rawDF.schema.fieldNames.filterNot(reservedColumns.contains).toSeq

      rawDF.map(row => {
        val id = row.getAs[Long]("__id__")
        val geom = row.getAs[Geometry]("__geometry__")
        val props = row.getValuesMap[String](propertyFields)
        Feature(id, geom, props)
      })
    }
  }
}
开发者ID:codemeow5,项目名称:Vector-Tile-Spark-Process,代码行数:56,代码来源:SQLDataProvider.scala
示例6: GladAlphaAggregator
//设置package包名称以及导入依赖的类
package com.enriquegrodrigo.spark.crowd.aggregators
import com.enriquegrodrigo.spark.crowd.types.GladPartial
import com.enriquegrodrigo.spark.crowd.types.GladParams
import com.enriquegrodrigo.spark.crowd.utils.Functions
import org.apache.spark.sql.{Encoder,Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.broadcast.Broadcast
/**
 * Typed aggregator that produces an updated alpha value from [[GladPartial]] records.
 *
 * The buffer carries the running sum of per-record terms (`agg`) and the alpha value of the
 * most recently reduced record (`alpha`); `-1` marks a buffer that has not seen any record.
 * NOTE(review): a genuine alpha of exactly -1 would be indistinguishable from the sentinel — confirm.
 *
 * @param params       broadcast model parameters; `alpha` is read from it
 * @param learningRate factor applied to the accumulated sum in `finish`
 */
private[crowd] class GladAlphaAggregator(params: Broadcast[GladParams], learningRate: Double)
  extends Aggregator[GladPartial, GladAlphaAggregatorBuffer, Double]{

  /** Empty buffer: zero accumulated value and the `-1` "no alpha yet" sentinel. */
  def zero: GladAlphaAggregatorBuffer = GladAlphaAggregatorBuffer(0,-1)

  /** Adds `(p - sigmoid(alpha * beta)) * beta` for one record and remembers its alpha. */
  def reduce(b: GladAlphaAggregatorBuffer, a: GladPartial) : GladAlphaAggregatorBuffer = {
    val al = params.value.alpha(a.annotator)
    val bet = a.beta
    val sigmoidValue = Functions.sigmoid(al*bet)
    // The estimate is used as-is for value 1 and complemented otherwise.
    val p = if (a.value == 1) a.est else (1-a.est)
    val term = (p - sigmoidValue)*bet
    GladAlphaAggregatorBuffer(b.agg + term, al)
  }

  /** Sums both partial results; keeps the first alpha that is not the `-1` sentinel. */
  def merge(b1: GladAlphaAggregatorBuffer, b2: GladAlphaAggregatorBuffer) : GladAlphaAggregatorBuffer = {
    GladAlphaAggregatorBuffer(b1.agg + b2.agg, if (b1.alpha == -1) b2.alpha else b1.alpha )
  }

  /** Returns the remembered alpha plus `learningRate` times the accumulated sum. */
  def finish(reduction: GladAlphaAggregatorBuffer): Double = {
    reduction.alpha + learningRate * reduction.agg
  }

  def bufferEncoder: Encoder[GladAlphaAggregatorBuffer] = Encoders.product[GladAlphaAggregatorBuffer]

  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

/**
 * Intermediate state of [[GladAlphaAggregator]].
 *
 * @param agg   running sum of per-record terms
 * @param alpha alpha of the last reduced record, or `-1` when none was seen
 */
private[crowd] case class GladAlphaAggregatorBuffer(agg: Double, alpha: Double)
开发者ID:enriquegrodrigo,项目名称:spark-crowd,代码行数:40,代码来源:GladAlphaAggregator.scala
示例7: MulticlassMVAggregator
//设置package包名称以及导入依赖的类
package com.enriquegrodrigo.spark.crowd.aggregators
import com.enriquegrodrigo.spark.crowd.types.MulticlassAnnotation
import org.apache.spark.sql.{Encoder,Encoders}
import org.apache.spark.sql.expressions.Aggregator
/**
 * Typed aggregator that returns the index of the most frequent class among the
 * annotations of a group.
 *
 * @param nClasses length of the per-class counter vector
 */
private[crowd] class MulticlassMVAggregator(nClasses: Int) extends Aggregator[MulticlassAnnotation, MulticlassMVPartial, Int]{

  /**
   * Returns `map` with `pair._2` added to the entry stored under `pair._1`;
   * a missing key starts from the given value.
   */
  def sumKey(map: Map[Int,Long], pair: (Int,Long)) = {
    val (key, value) = pair
    val updatedTotal = map.get(key).fold(value)(_ + value)
    map.updated(key, updatedTotal)
  }

  /** Empty buffer: one zero counter per class and a zero annotation count. */
  def zero: MulticlassMVPartial = MulticlassMVPartial(Vector.fill(nClasses)(0),0)

  /** Increments the counter of the annotated class and the total count. */
  def reduce(b: MulticlassMVPartial, a: MulticlassAnnotation) : MulticlassMVPartial = {
    val annotatedClass = a.value
    MulticlassMVPartial(
      b.aggVect.updated(annotatedClass, b.aggVect(annotatedClass) + 1),
      b.count + 1)
  }

  /** Adds the counters position by position and sums the totals. */
  def merge(b1: MulticlassMVPartial, b2: MulticlassMVPartial) : MulticlassMVPartial = {
    MulticlassMVPartial(
      b1.aggVect.zip(b2.aggVect).map { case (left, right) => left + right },
      b1.count + b2.count)
  }

  /** Index of the first counter holding the maximum value. */
  def finish(reduction: MulticlassMVPartial) = {
    val counters = reduction.aggVect
    counters.indexOf(counters.max)
  }

  def bufferEncoder: Encoder[MulticlassMVPartial] = Encoders.product[MulticlassMVPartial]

  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
开发者ID:enriquegrodrigo,项目名称:spark-crowd,代码行数:38,代码来源:MulticlassMVAggregator.scala
示例8: GladBetaAggregator
//设置package包名称以及导入依赖的类
package com.enriquegrodrigo.spark.crowd.aggregators
import com.enriquegrodrigo.spark.crowd.types.GladPartial
import com.enriquegrodrigo.spark.crowd.types.GladParams
import com.enriquegrodrigo.spark.crowd.utils.Functions
import org.apache.spark.sql.{Encoder,Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.broadcast.Broadcast
/**
 * Typed aggregator that produces an updated beta value from [[GladPartial]] records.
 *
 * The buffer carries the running sum of per-record terms (`agg`) and the beta value of the
 * most recently reduced record (`beta`); `-1` marks a buffer that has not seen any record.
 * NOTE(review): a genuine beta of exactly -1 would be indistinguishable from the sentinel — confirm.
 *
 * @param params       broadcast model parameters; `alpha` is read from it
 * @param learningRate factor applied to the accumulated sum in `finish`
 */
private[crowd] class GladBetaAggregator(params: Broadcast[GladParams], learningRate: Double)
  extends Aggregator[GladPartial, GladBetaAggregatorBuffer, Double]{

  /** Empty buffer: zero accumulated value and the `-1` "no beta yet" sentinel. */
  def zero: GladBetaAggregatorBuffer = GladBetaAggregatorBuffer(0,-1)

  /** Adds `(p - sigmoid(alpha * beta)) * alpha` for one record and remembers its beta. */
  def reduce(b: GladBetaAggregatorBuffer, a: GladPartial) : GladBetaAggregatorBuffer = {
    val al = params.value.alpha(a.annotator)
    val bet = a.beta
    val sigmoidValue = Functions.sigmoid(al*bet)
    // The estimate is used as-is for value 1 and complemented otherwise.
    val p = if (a.value == 1) a.est else (1-a.est)
    val term = (p - sigmoidValue)*al
    GladBetaAggregatorBuffer(b.agg + term, bet)
  }

  /** Sums both partial results; keeps the first beta that is not the `-1` sentinel. */
  def merge(b1: GladBetaAggregatorBuffer, b2: GladBetaAggregatorBuffer) : GladBetaAggregatorBuffer = {
    GladBetaAggregatorBuffer(b1.agg + b2.agg, if (b1.beta == -1) b2.beta else b1.beta)
  }

  /** Returns the remembered beta plus `learningRate` times the accumulated sum. */
  def finish(reduction: GladBetaAggregatorBuffer): Double = {
    reduction.beta + learningRate * reduction.agg
  }

  def bufferEncoder: Encoder[GladBetaAggregatorBuffer] = Encoders.product[GladBetaAggregatorBuffer]

  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

/**
 * Intermediate state of [[GladBetaAggregator]].
 *
 * @param agg  running sum of per-record terms
 * @param beta beta of the last reduced record, or `-1` when none was seen
 */
private[crowd] case class GladBetaAggregatorBuffer(agg: Double, beta: Double)
开发者ID:enriquegrodrigo,项目名称:spark-crowd,代码行数:40,代码来源:GladBetaAggregator.scala
示例9: BinarySoftMVAggregator
//设置package包名称以及导入依赖的类
package com.enriquegrodrigo.spark.crowd.aggregators
import org.apache.spark.sql.{Encoder, Encoders}
import com.enriquegrodrigo.spark.crowd.types.BinaryAnnotation
import org.apache.spark.sql.expressions.Aggregator
/**
 * Typed aggregator that returns the mean of the annotation values of a group
 * (accumulated value divided by the number of annotations).
 */
private[crowd] class BinarySoftMVAggregator extends Aggregator[BinaryAnnotation, BinaryMVPartial, Double]{

  /** Empty buffer: nothing accumulated, no annotations counted. */
  def zero: BinaryMVPartial = BinaryMVPartial(0,0)

  /** Adds the annotation value to the running sum and counts the annotation. */
  def reduce(b: BinaryMVPartial, a: BinaryAnnotation) : BinaryMVPartial =
    BinaryMVPartial(b.aggValue+a.value, b.count + 1)

  /** Sums the accumulated values and the counts of both partial results. */
  def merge(b1: BinaryMVPartial, b2: BinaryMVPartial) : BinaryMVPartial =
    BinaryMVPartial(b1.aggValue + b2.aggValue, b1.count + b2.count)

  /**
   * Divides the accumulated value by the annotation count.
   *
   * @throws IllegalArgumentException when the buffer counted no annotations
   */
  def finish(reduction: BinaryMVPartial): Double = {
    val numerator: Double = reduction.aggValue
    val denominator: Double = reduction.count
    if (denominator == 0)
      throw new IllegalArgumentException(
        "BinarySoftMVAggregator cannot finish an aggregation without annotations (count is 0)")
    numerator / denominator
  }

  def bufferEncoder: Encoder[BinaryMVPartial] = Encoders.product[BinaryMVPartial]

  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
开发者ID:enriquegrodrigo,项目名称:spark-crowd,代码行数:31,代码来源:BinarySoftMVAggregator.scala
示例10: RaykarBinaryStatisticsAggregator
//设置package包名称以及导入依赖的类
package com.enriquegrodrigo.spark.crowd.aggregators
import com.enriquegrodrigo.spark.crowd.types.RaykarBinaryPartial
import com.enriquegrodrigo.spark.crowd.types.RaykarBinaryParams
import com.enriquegrodrigo.spark.crowd.utils.Functions
import org.apache.spark.sql.{Encoder,Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.broadcast.Broadcast
/**
 * Typed aggregator that multiplies per-annotation alpha and beta terms into a pair of products.
 *
 * @param params broadcast model parameters; `alpha` and `beta` are read per annotator
 */
private[crowd] class RaykarBinaryStatisticsAggregator(params: Broadcast[RaykarBinaryParams])
  extends Aggregator[RaykarBinaryPartial, RaykarBinaryStatisticsAggregatorBuffer, (Double,Double)]{

  /** Empty buffer: both products start at the multiplicative identity. */
  def zero: RaykarBinaryStatisticsAggregatorBuffer = RaykarBinaryStatisticsAggregatorBuffer(1,1) //Binary

  /** Multiplies the alpha and beta term of one annotation into the running products. */
  def reduce(b: RaykarBinaryStatisticsAggregatorBuffer, a: RaykarBinaryPartial) : RaykarBinaryStatisticsAggregatorBuffer = {
    val annotatorAlpha = params.value.alpha(a.annotator)
    // Alpha is taken as-is for value 1, complemented otherwise.
    val alphaFactor = if (a.value == 1) annotatorAlpha else 1-annotatorAlpha

    val annotatorBeta = params.value.beta(a.annotator)
    // Beta is taken as-is for value 0, complemented otherwise.
    val betaFactor = if (a.value == 0) annotatorBeta else 1-annotatorBeta

    RaykarBinaryStatisticsAggregatorBuffer(b.a * alphaFactor, b.b * betaFactor)
  }

  /** Multiplies the corresponding products of both partial results. */
  def merge(b1: RaykarBinaryStatisticsAggregatorBuffer, b2: RaykarBinaryStatisticsAggregatorBuffer) : RaykarBinaryStatisticsAggregatorBuffer = {
    val mergedA = b1.a * b2.a
    val mergedB = b1.b*b2.b
    RaykarBinaryStatisticsAggregatorBuffer(mergedA, mergedB)
  }

  /** Exposes the two products as a tuple `(a, b)`. */
  def finish(reduction: RaykarBinaryStatisticsAggregatorBuffer) = {
    (reduction.a,reduction.b)
  }

  def bufferEncoder: Encoder[RaykarBinaryStatisticsAggregatorBuffer] = Encoders.product[RaykarBinaryStatisticsAggregatorBuffer]

  def outputEncoder: Encoder[(Double,Double)] = Encoders.product[(Double,Double)]
}

/**
 * Intermediate state of [[RaykarBinaryStatisticsAggregator]].
 *
 * @param a running product of the alpha terms
 * @param b running product of the beta terms
 */
private[crowd] case class RaykarBinaryStatisticsAggregatorBuffer(a: Double, b: Double)
开发者ID:enriquegrodrigo,项目名称:spark-crowd,代码行数:37,代码来源:RaykarBinaryStatisticsAggregator.scala
示例11: DawidSkeneLogLikelihoodAggregator
//设置package包名称以及导入依赖的类
package com.enriquegrodrigo.spark.crowd.aggregators
import com.enriquegrodrigo.spark.crowd.types.DawidSkenePartial
import com.enriquegrodrigo.spark.crowd.types.DawidSkeneParams
import org.apache.spark.sql.{Encoder,Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.broadcast.Broadcast
import scala.math.log
/**
 * Typed aggregator that folds [[DawidSkenePartial]] records into a single log-likelihood value.
 *
 * The buffer carries the running sum of logs (`agg`) and the estimated class of the most
 * recently reduced record (`predClass`); `-1` marks a buffer that has not seen any record.
 *
 * @param params broadcast model parameters; `pi` and `w` are read from it
 */
private[crowd] class DawidSkeneLogLikelihoodAggregator(params: Broadcast[DawidSkeneParams])
  extends Aggregator[DawidSkenePartial, DawidSkeneLogLikelihoodAggregatorBuffer, Double]{

  /** Empty buffer: zero accumulated value and the `-1` "no class yet" sentinel. */
  def zero: DawidSkeneLogLikelihoodAggregatorBuffer = DawidSkeneLogLikelihoodAggregatorBuffer(0, -1)

  /** Adds `log(pi(annotator)(est)(value))` for one record and remembers its estimated class. */
  def reduce(b: DawidSkeneLogLikelihoodAggregatorBuffer, a: DawidSkenePartial) : DawidSkeneLogLikelihoodAggregatorBuffer = {
    val pival = params.value.pi(a.annotator.toInt)(a.est)(a.value)
    DawidSkeneLogLikelihoodAggregatorBuffer(b.agg + log(pival), a.est)
  }

  /** Sums both partial results; keeps the first class that is not the `-1` sentinel. */
  def merge(b1: DawidSkeneLogLikelihoodAggregatorBuffer, b2: DawidSkeneLogLikelihoodAggregatorBuffer) : DawidSkeneLogLikelihoodAggregatorBuffer = {
    DawidSkeneLogLikelihoodAggregatorBuffer(b1.agg + b2.agg, if (b1.predClass == -1) b2.predClass else b1.predClass)
  }

  /**
   * Adds `log(w(predClass))` to the accumulated sum.
   * NOTE(review): a buffer that never saw a record still holds `predClass == -1`,
   * which is used as an index into `w` here — confirm that cannot happen.
   */
  def finish(reduction: DawidSkeneLogLikelihoodAggregatorBuffer): Double = {
    reduction.agg + log(params.value.w(reduction.predClass))
  }

  def bufferEncoder: Encoder[DawidSkeneLogLikelihoodAggregatorBuffer] = Encoders.product[DawidSkeneLogLikelihoodAggregatorBuffer]

  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

/**
 * Intermediate state of [[DawidSkeneLogLikelihoodAggregator]].
 *
 * @param agg       running sum of logs
 * @param predClass estimated class of the last reduced record, or `-1` when none was seen
 */
private[crowd] case class DawidSkeneLogLikelihoodAggregatorBuffer(agg: Double, predClass: Int)
开发者ID:enriquegrodrigo,项目名称:spark-crowd,代码行数:46,代码来源:DawidSkeneLogLikelihoodAggregator.scala
示例12: DawidSkeneEAggregator
//设置package包名称以及导入依赖的类
package com.enriquegrodrigo.spark.crowd.aggregators
import com.enriquegrodrigo.spark.crowd.types.DawidSkenePartial
import com.enriquegrodrigo.spark.crowd.types.DawidSkeneParams
import org.apache.spark.sql.{Encoder,Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.broadcast.Broadcast
/**
 * Typed aggregator that picks a class for a group of [[DawidSkenePartial]] records.
 *
 * For every class it keeps the product of `pi(annotator)(class)(value)` over all records
 * and finally returns the class whose product, multiplied by `w(class)`, is largest.
 *
 * @param params   broadcast model parameters; `pi` and `w` are read from it
 * @param nClasses number of classes, i.e. the length of the buffer vector
 */
private[crowd] class DawidSkeneEAggregator(params: Broadcast[DawidSkeneParams], nClasses: Int)
  extends Aggregator[DawidSkenePartial, DawidSkeneAggregatorBuffer, Int]{

  /** Empty buffer: every per-class product starts at the multiplicative identity. */
  def zero: DawidSkeneAggregatorBuffer = DawidSkeneAggregatorBuffer(Vector.fill(nClasses)(1.0))

  /** Multiplies each per-class product by that class's `pi` entry for this record. */
  def reduce(b: DawidSkeneAggregatorBuffer, a: DawidSkenePartial) : DawidSkeneAggregatorBuffer = {
    val pi = params.value.pi
    val perClassFactor = Vector.tabulate(nClasses)(cls => pi(a.annotator.toInt)(cls)(a.value))
    val updated = perClassFactor.zip(b.aggVect).map { case (factor, acc) => factor * acc }
    DawidSkeneAggregatorBuffer(updated)
  }

  /** Multiplies the per-class products of both partial results position by position. */
  def merge(b1: DawidSkeneAggregatorBuffer, b2: DawidSkeneAggregatorBuffer) : DawidSkeneAggregatorBuffer =
    DawidSkeneAggregatorBuffer(b1.aggVect.zip(b2.aggVect).map { case (left, right) => left * right })

  /** Index of the class with the largest weighted product. */
  def finish(reduction: DawidSkeneAggregatorBuffer) = {
    val weighted = reduction.aggVect.zipWithIndex
    weighted.maxBy { case (product, cls) => product*params.value.w(cls) }._2
  }

  def bufferEncoder: Encoder[DawidSkeneAggregatorBuffer] = Encoders.product[DawidSkeneAggregatorBuffer]

  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

/**
 * Intermediate state of [[DawidSkeneEAggregator]].
 *
 * @param aggVect running per-class products
 */
private[crowd] case class DawidSkeneAggregatorBuffer(aggVect: scala.collection.Seq[Double])
开发者ID:enriquegrodrigo,项目名称:spark-crowd,代码行数:37,代码来源:DawidSkeneEAggregator.scala
示例13: BinaryMVAggregator
//设置package包名称以及导入依赖的类
package com.enriquegrodrigo.spark.crowd.aggregators
import org.apache.spark.sql.{Encoder, Encoders}
import com.enriquegrodrigo.spark.crowd.types.BinaryAnnotation
import org.apache.spark.sql.expressions.Aggregator
/**
 * Typed aggregator implementing a binary majority vote: returns 1 when the mean of the
 * annotation values is at least 0.5 and 0 otherwise.
 */
private[crowd] class BinaryMVAggregator extends Aggregator[BinaryAnnotation, BinaryMVPartial, Int]{

  /** Empty buffer: nothing accumulated, no annotations counted. */
  def zero: BinaryMVPartial = BinaryMVPartial(0,0)

  /** Adds the annotation value to the running sum and counts the annotation. */
  def reduce(b: BinaryMVPartial, a: BinaryAnnotation) : BinaryMVPartial =
    BinaryMVPartial(b.aggValue+a.value, b.count + 1)

  /** Sums the accumulated values and the counts of both partial results. */
  def merge(b1: BinaryMVPartial, b2: BinaryMVPartial) : BinaryMVPartial =
    BinaryMVPartial(b1.aggValue + b2.aggValue, b1.count + b2.count)

  /**
   * Thresholds the mean annotation value at 0.5 (ties resolve to 1).
   *
   * @throws IllegalArgumentException when the buffer counted no annotations
   */
  def finish(reduction: BinaryMVPartial): Int = {
    val numerator: Double = reduction.aggValue
    val denominator: Double = reduction.count
    if (denominator == 0)
      throw new IllegalArgumentException(
        "BinaryMVAggregator cannot finish an aggregation without annotations (count is 0)")
    else if ( (numerator / denominator) >= 0.5 )
      1
    else
      0
  }

  def bufferEncoder: Encoder[BinaryMVPartial] = Encoders.product[BinaryMVPartial]

  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
开发者ID:enriquegrodrigo,项目名称:spark-crowd,代码行数:32,代码来源:BinaryMVAggregator.scala
示例14: GladEAggregator
//设置package包名称以及导入依赖的类
package com.enriquegrodrigo.spark.crowd.aggregators
import com.enriquegrodrigo.spark.crowd.types.GladPartial
import com.enriquegrodrigo.spark.crowd.types.GladParams
import com.enriquegrodrigo.spark.crowd.utils.Functions
import org.apache.spark.sql.{Encoder,Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.broadcast.Broadcast
import scala.math.{log,exp}
/**
 * Typed aggregator that turns the [[GladPartial]] records of a group into a value
 * `positive / (negative + positive)`.
 *
 * The buffer holds two running sums, one per class (index 0 and index 1). `reduce` adds
 * `Functions.logLim(p)` terms and `finish` exponentiates the sums, so the buffer lives in
 * log space. Consequently the neutral element is 0 and partial buffers are combined by
 * addition. The original started at 1 and multiplied partial sums in `merge`, which
 * distorts the result whenever more than one partial buffer is merged.
 *
 * @param params broadcast model parameters; `alpha` and `w` are read from it
 */
private[crowd] class GladEAggregator(params: Broadcast[GladParams])
  extends Aggregator[GladPartial, GladEAggregatorBuffer, Double]{

  /** Empty buffer: both log-space sums start at the additive identity. */
  def zero: GladEAggregatorBuffer = GladEAggregatorBuffer(Vector.fill(2)(0.0)) //Binary

  /** Adds the `logLim` term of one annotation to each of the two running sums. */
  def reduce(b: GladEAggregatorBuffer, a: GladPartial) : GladEAggregatorBuffer = {
    val alpha = params.value.alpha
    val sigmoidValue = Functions.sigmoid(alpha(a.annotator)*a.beta)
    // The sigmoid is used for the class matching the annotation, its complement for the other.
    val p0 = if (a.value == 0) sigmoidValue else (1 - sigmoidValue)
    val p1 = if (a.value == 1) sigmoidValue else (1 - sigmoidValue)
    GladEAggregatorBuffer(Vector(Functions.logLim(p0) + b.aggVect(0), Functions.logLim(p1) + b.aggVect(1)))
  }

  /** Combines two partial buffers by adding their log-space sums position by position. */
  def merge(b1: GladEAggregatorBuffer, b2: GladEAggregatorBuffer) : GladEAggregatorBuffer = {
    GladEAggregatorBuffer(b1.aggVect.zip(b2.aggVect).map { case (left, right) => left + right })
  }

  /** Exponentiates both sums (each shifted by `logLim(w(class))`) and normalises the positive one. */
  def finish(reduction: GladEAggregatorBuffer): Double = {
    val w = params.value.w
    val negative = exp(reduction.aggVect(0) + Functions.logLim(w(0)))
    val positive = exp(reduction.aggVect(1) + Functions.logLim(w(1)))
    val norm = negative + positive
    positive/norm
  }

  def bufferEncoder: Encoder[GladEAggregatorBuffer] = Encoders.product[GladEAggregatorBuffer]

  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

/**
 * Intermediate state of [[GladEAggregator]].
 *
 * @param aggVect two log-space running sums: index 0 and index 1
 */
private[crowd] case class GladEAggregatorBuffer(aggVect: scala.collection.Seq[Double])
开发者ID:enriquegrodrigo,项目名称:spark-crowd,代码行数:43,代码来源:GladEAggregator.scala
示例15: ClickHouseSinkProvider
//设置package包名称以及导入依赖的类
package io.clickhouse.ext.spark.streaming
import io.clickhouse.ext.ClickHouseUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Encoders, SQLContext}
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode
import scala.reflect.{ClassTag, classTag}
import scala.reflect.runtime.universe.TypeTag
/**
 * Base class for stream sink providers that write case class rows of type `T` to ClickHouse.
 *
 * `createSink` derives the schema from `T`, executes the DDL produced by
 * `ClickHouseUtils.createTableIfNotExistsSql` and returns a [[ClickHouseSink]] for the table.
 *
 * @tparam T case class describing the rows of the stream
 */
abstract class ClickHouseSinkProvider[T <: Product: ClassTag](implicit tag: TypeTag[T]) extends StreamSinkProvider with Serializable with Logging {

  /** Available servers as (host, port) pairs; only the first one is used by `getConnectionString`. */
  def clickHouseServers: Seq[(String, Int)]

  def dbName: String

  /** Target table. Defaults to `None`, in which case `createSink` fails; override with `Some(name)`. */
  def tableName: Option[String] = None

  def eventDateColumnName: String

  def indexColumns: Seq[String]

  /** Maps a row to a date; forwarded unchanged to the created sink. */
  def partitionFunc: (org.apache.spark.sql.Row) => java.sql.Date

  /**
   * Executes the table DDL and builds the sink.
   *
   * `sqlContext`, `parameters`, `partitionColumns` and `outputMode` are not used by this implementation.
   *
   * @throws NoSuchElementException when `tableName` is `None`
   */
  override def createSink(
                           sqlContext: SQLContext,
                           parameters: Map[String, String],
                           partitionColumns: Seq[String],
                           outputMode: OutputMode): ClickHouseSink[T] = {

    val schema = Encoders.product[T].schema

    // Same exception type as the original bare `.get`, but with a message naming the cause.
    val _tableName = tableName.getOrElse(
      throw new NoSuchElementException(
        "ClickHouseSinkProvider.tableName is not defined; override it with Some(<table name>)")
    ) //tableName.getOrElse(classOf[T].getName)

    val createTableSql = ClickHouseUtils.createTableIfNotExistsSql(
      schema,
      dbName,
      _tableName,
      eventDateColumnName,
      indexColumns
    )
    log.info("create new table sql:")
    log.info(createTableSql)

    val connection = ClickHouseUtils.createConnection(getConnectionString())
    try {
      connection.createStatement().execute(createTableSql)
      // Logged only after the statement succeeded; the original logged this from `finally`,
      // i.e. also when the DDL had failed.
      log.info(s"ClickHouse table ${dbName}.${_tableName} created")
    } finally {
      connection.close()
    }

    log.info("Creating ClickHouse sink")
    new ClickHouseSink[T](dbName, _tableName, eventDateColumnName)(getConnectionString)(partitionFunc)
  }

  /** Returns the first configured server; throws if `clickHouseServers` is empty. */
  def getConnectionString(): (String, Int) = clickHouseServers.head
}
开发者ID:DmitryBe,项目名称:spark-streaming-clickhouse,代码行数:55,代码来源:ClickHouseSinkProvider.scala
注：本文中的 org.apache.spark.sql.Encoders 类示例整理自 GitHub/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论