本文整理汇总了Scala中org.apache.spark.sql.expressions.Aggregator类的典型用法代码示例。如果您正苦于以下问题:Scala Aggregator类的具体用法?Scala Aggregator怎么用?Scala Aggregator使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Aggregator类的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: GladLogLikelihoodAggregator
//设置package包名称以及导入依赖的类
package com.enriquegrodrigo.spark.crowd.aggregators
import com.enriquegrodrigo.spark.crowd.types.GladPartial
import com.enriquegrodrigo.spark.crowd.types.GladParams
import com.enriquegrodrigo.spark.crowd.utils.Functions
import org.apache.spark.sql.{Encoder,Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.broadcast.Broadcast
import scala.math.log
/** Typed Spark aggregator that folds [[GladPartial]] rows into one log-likelihood value.
  *
  * The buffer carries a running sum (`agg`) together with the `est` value of the last row
  * reduced (`classProb`); `-1` in `classProb` marks a buffer that has not seen any row yet.
  *
  * @param params broadcast model parameters; `alpha` is read for every row and `w` in `finish`
  */
private[crowd] class GladLogLikelihoodAggregator(params: Broadcast[GladParams])
  extends Aggregator[GladPartial, GladLogLikelihoodAggregatorBuffer, Double] {

  /** Empty buffer: nothing accumulated and `classProb` set to the `-1` "unset" marker. */
  def zero: GladLogLikelihoodAggregatorBuffer = GladLogLikelihoodAggregatorBuffer(0, -1)

  /** Adds the contribution of one row to the running sum and records the row's `est`.
    *
    * @param b buffer accumulated so far
    * @param a row to fold in
    * @return a new buffer; `b` is not modified
    */
  def reduce(b: GladLogLikelihoodAggregatorBuffer, a: GladPartial): GladLogLikelihoodAggregatorBuffer = {
    val alphaVal = params.value.alpha(a.annotator.toInt)
    val betaVal = a.beta
    val sig = Functions.sigmoid(alphaVal * betaVal)
    val p0 = 1 - a.est
    val p1 = a.est
    // `sig` is attributed to the class that matches the row's value, `1 - sig` to the other one.
    val k0 = if (a.value == 0) sig else 1 - sig
    val k1 = if (a.value == 1) sig else 1 - sig
    // NOTE(review): Functions.prodlog is defined elsewhere; presumably x * log(y) -- confirm.
    GladLogLikelihoodAggregatorBuffer(
      b.agg + Functions.prodlog(p0, k0) + Functions.prodlog(p1, k1),
      p1)
  }

  /** Combines two partial buffers: sums are added, and `classProb` is taken from `b1`
    * unless `b1` still holds the `-1` marker, in which case `b2`'s value is used.
    */
  def merge(b1: GladLogLikelihoodAggregatorBuffer, b2: GladLogLikelihoodAggregatorBuffer): GladLogLikelihoodAggregatorBuffer = {
    GladLogLikelihoodAggregatorBuffer(b1.agg + b2.agg, if (b1.classProb == -1) b2.classProb else b1.classProb)
  }

  /** Adds the two `w`-dependent terms to the accumulated sum and returns the total.
    *
    * @param reduction fully merged buffer
    * @return `agg + prodlog(1 - classProb, w(0)) + prodlog(classProb, w(1))`
    */
  def finish(reduction: GladLogLikelihoodAggregatorBuffer): Double = {
    val w = params.value.w
    reduction.agg +
      Functions.prodlog(1 - reduction.classProb, w(0)) +
      Functions.prodlog(reduction.classProb, w(1))
  }

  def bufferEncoder: Encoder[GladLogLikelihoodAggregatorBuffer] = Encoders.product[GladLogLikelihoodAggregatorBuffer]

  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

/** Intermediate state of [[GladLogLikelihoodAggregator]].
  *
  * @param agg       running sum
  * @param classProb `est` of the last reduced row, or `-1` when no row has been reduced
  */
private[crowd] case class GladLogLikelihoodAggregatorBuffer(agg: Double, classProb: Double)
开发者ID:enriquegrodrigo,项目名称:spark-crowd,代码行数:49,代码来源:GladLogLikelihoodAggregator.scala
示例2: GladAlphaAggregator
//设置package包名称以及导入依赖的类
package com.enriquegrodrigo.spark.crowd.aggregators
import com.enriquegrodrigo.spark.crowd.types.GladPartial
import com.enriquegrodrigo.spark.crowd.types.GladParams
import com.enriquegrodrigo.spark.crowd.utils.Functions
import org.apache.spark.sql.{Encoder,Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.broadcast.Broadcast
/** Typed Spark aggregator that produces an updated `alpha` value from [[GladPartial]] rows.
  *
  * The buffer holds a running sum of per-row terms (`agg`) and the `alpha` entry looked up
  * for the last reduced row (`alpha`); `-1` marks a buffer that has not seen any row.
  *
  * @param params       broadcast model parameters; only `alpha` is read here
  * @param learningRate factor applied to the accumulated sum in `finish`
  */
private[crowd] class GladAlphaAggregator(params: Broadcast[GladParams], learningRate: Double)
  extends Aggregator[GladPartial, GladAlphaAggregatorBuffer, Double] {

  /** Empty buffer: zero sum and the `-1` "unset" marker for `alpha`. */
  def zero: GladAlphaAggregatorBuffer = GladAlphaAggregatorBuffer(0, -1)

  /** Adds `(p - sigmoid(alpha * beta)) * beta` for the row to the running sum.
    *
    * @param b buffer accumulated so far
    * @param a row to fold in
    * @return a new buffer carrying the updated sum and the row's `alpha` entry
    */
  def reduce(b: GladAlphaAggregatorBuffer, a: GladPartial): GladAlphaAggregatorBuffer = {
    val al = params.value.alpha(a.annotator)
    val bet = a.beta
    val sigmoidValue = Functions.sigmoid(al * bet)
    // `est` is used when the row's value is 1, its complement otherwise.
    val p = if (a.value == 1) a.est else (1 - a.est)
    val term = (p - sigmoidValue) * bet
    GladAlphaAggregatorBuffer(b.agg + term, al)
  }

  /** Combines two partial buffers: sums are added; `alpha` comes from `b1` unless it is
    * still the `-1` marker.
    */
  def merge(b1: GladAlphaAggregatorBuffer, b2: GladAlphaAggregatorBuffer): GladAlphaAggregatorBuffer = {
    // NOTE(review): a genuine alpha of exactly -1 is indistinguishable from "unset" here;
    // harmless if every row of a group shares one alpha -- confirm against the caller's grouping.
    GladAlphaAggregatorBuffer(b1.agg + b2.agg, if (b1.alpha == -1) b2.alpha else b1.alpha)
  }

  /** @return `alpha + learningRate * agg` for the merged buffer */
  def finish(reduction: GladAlphaAggregatorBuffer): Double = {
    reduction.alpha + learningRate * reduction.agg
  }

  def bufferEncoder: Encoder[GladAlphaAggregatorBuffer] = Encoders.product[GladAlphaAggregatorBuffer]

  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

/** Intermediate state of [[GladAlphaAggregator]].
  *
  * @param agg   running sum of per-row terms
  * @param alpha `alpha` entry of the last reduced row, or `-1` when no row has been reduced
  */
private[crowd] case class GladAlphaAggregatorBuffer(agg: Double, alpha: Double)
开发者ID:enriquegrodrigo,项目名称:spark-crowd,代码行数:40,代码来源:GladAlphaAggregator.scala
示例3: MulticlassMVAggregator
//设置package包名称以及导入依赖的类
package com.enriquegrodrigo.spark.crowd.aggregators
import com.enriquegrodrigo.spark.crowd.types.MulticlassAnnotation
import org.apache.spark.sql.{Encoder,Encoders}
import org.apache.spark.sql.expressions.Aggregator
/** Typed Spark aggregator that picks the most frequent class among [[MulticlassAnnotation]] rows.
  *
  * The buffer keeps one counter per class plus the number of rows reduced.
  *
  * @param nClasses number of counters allocated in the empty buffer
  */
private[crowd] class MulticlassMVAggregator(nClasses: Int)
  extends Aggregator[MulticlassAnnotation, MulticlassMVPartial, Int] {

  /** Returns `map` with the pair's value added to the entry stored under the pair's key;
    * when the key is absent the value is inserted as is.
    *
    * Not called by the aggregation methods of this class.
    */
  def sumKey(map: Map[Int, Long], pair: (Int, Long)): Map[Int, Long] = {
    val (key, increment) = pair
    val total = map.get(key).fold(increment)(_ + increment)
    map.updated(key, total)
  }

  /** Empty buffer: `nClasses` zeroed counters and a row count of 0. */
  def zero: MulticlassMVPartial = MulticlassMVPartial(Vector.fill(nClasses)(0), 0)

  /** Increments the counter at index `a.value` and the row count.
    *
    * @param b buffer accumulated so far
    * @param a annotation whose `value` is used as the counter index
    */
  def reduce(b: MulticlassMVPartial, a: MulticlassAnnotation): MulticlassMVPartial = {
    val votedClass = a.value
    val bumped = b.aggVect.updated(votedClass, b.aggVect(votedClass) + 1)
    MulticlassMVPartial(bumped, b.count + 1)
  }

  /** Adds the counters position by position and sums the row counts. */
  def merge(b1: MulticlassMVPartial, b2: MulticlassMVPartial): MulticlassMVPartial = {
    val summed = b1.aggVect.zip(b2.aggVect).map { case (left, right) => left + right }
    MulticlassMVPartial(summed, b1.count + b2.count)
  }

  /** @return index of the first counter holding the maximum count */
  def finish(reduction: MulticlassMVPartial): Int = {
    val votes = reduction.aggVect
    votes.indexOf(votes.max)
  }

  def bufferEncoder: Encoder[MulticlassMVPartial] = Encoders.product[MulticlassMVPartial]

  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
开发者ID:enriquegrodrigo,项目名称:spark-crowd,代码行数:38,代码来源:MulticlassMVAggregator.scala
示例4: GladBetaAggregator
//设置package包名称以及导入依赖的类
package com.enriquegrodrigo.spark.crowd.aggregators
import com.enriquegrodrigo.spark.crowd.types.GladPartial
import com.enriquegrodrigo.spark.crowd.types.GladParams
import com.enriquegrodrigo.spark.crowd.utils.Functions
import org.apache.spark.sql.{Encoder,Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.broadcast.Broadcast
/** Typed Spark aggregator that produces an updated `beta` value from [[GladPartial]] rows.
  *
  * The buffer holds a running sum of per-row terms (`agg`) and the `beta` of the last
  * reduced row (`beta`); `-1` marks a buffer that has not seen any row.
  *
  * @param params       broadcast model parameters; only `alpha` is read here
  * @param learningRate factor applied to the accumulated sum in `finish`
  */
private[crowd] class GladBetaAggregator(params: Broadcast[GladParams], learningRate: Double)
  extends Aggregator[GladPartial, GladBetaAggregatorBuffer, Double] {

  /** Empty buffer: zero sum and the `-1` "unset" marker for `beta`. */
  def zero: GladBetaAggregatorBuffer = GladBetaAggregatorBuffer(0, -1)

  /** Adds `(p - sigmoid(alpha * beta)) * alpha` for the row to the running sum.
    *
    * @param b buffer accumulated so far
    * @param a row to fold in
    * @return a new buffer carrying the updated sum and the row's `beta`
    */
  def reduce(b: GladBetaAggregatorBuffer, a: GladPartial): GladBetaAggregatorBuffer = {
    val al = params.value.alpha(a.annotator)
    val sigmoidValue = Functions.sigmoid(al * a.beta)
    // `est` is used when the row's value is 1, its complement otherwise.
    val p = if (a.value == 1) a.est else (1 - a.est)
    val term = (p - sigmoidValue) * al
    GladBetaAggregatorBuffer(b.agg + term, a.beta)
  }

  /** Combines two partial buffers: sums are added; `beta` comes from `b1` unless it is
    * still the `-1` marker.
    */
  def merge(b1: GladBetaAggregatorBuffer, b2: GladBetaAggregatorBuffer): GladBetaAggregatorBuffer = {
    // NOTE(review): a genuine beta of exactly -1 is indistinguishable from "unset" here;
    // harmless if every row of a group shares one beta -- confirm against the caller's grouping.
    GladBetaAggregatorBuffer(b1.agg + b2.agg, if (b1.beta == -1) b2.beta else b1.beta)
  }

  /** @return `beta + learningRate * agg` for the merged buffer */
  def finish(reduction: GladBetaAggregatorBuffer): Double = {
    reduction.beta + learningRate * reduction.agg
  }

  def bufferEncoder: Encoder[GladBetaAggregatorBuffer] = Encoders.product[GladBetaAggregatorBuffer]

  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

/** Intermediate state of [[GladBetaAggregator]].
  *
  * @param agg  running sum of per-row terms
  * @param beta `beta` of the last reduced row, or `-1` when no row has been reduced
  */
private[crowd] case class GladBetaAggregatorBuffer(agg: Double, beta: Double)
开发者ID:enriquegrodrigo,项目名称:spark-crowd,代码行数:40,代码来源:GladBetaAggregator.scala
示例5: BinarySoftMVAggregator
//设置package包名称以及导入依赖的类
package com.enriquegrodrigo.spark.crowd.aggregators
import org.apache.spark.sql.{Encoder, Encoders}
import com.enriquegrodrigo.spark.crowd.types.BinaryAnnotation
import org.apache.spark.sql.expressions.Aggregator
/** Typed Spark aggregator returning the mean of the `value` field over
  * [[BinaryAnnotation]] rows (a "soft" vote rather than a thresholded one).
  */
private[crowd] class BinarySoftMVAggregator
  extends Aggregator[BinaryAnnotation, BinaryMVPartial, Double] {

  /** Empty buffer: zero sum, zero rows. */
  def zero: BinaryMVPartial = BinaryMVPartial(0, 0)

  /** Adds the annotation's value to the sum and counts the row. */
  def reduce(b: BinaryMVPartial, a: BinaryAnnotation): BinaryMVPartial = {
    val sumSoFar = b.aggValue + a.value
    BinaryMVPartial(sumSoFar, b.count + 1)
  }

  /** Adds sums and row counts of two partial buffers. */
  def merge(b1: BinaryMVPartial, b2: BinaryMVPartial): BinaryMVPartial = {
    val sum = b1.aggValue + b2.aggValue
    val rows = b1.count + b2.count
    BinaryMVPartial(sum, rows)
  }

  /** Divides the accumulated sum by the number of rows.
    *
    * @throws IllegalArgumentException when the buffer has counted no rows
    */
  def finish(reduction: BinaryMVPartial): Double = {
    val rows: Double = reduction.count
    if (rows != 0) {
      val sum: Double = reduction.aggValue
      sum / rows
    } else {
      throw new IllegalArgumentException()
    }
  }

  def bufferEncoder: Encoder[BinaryMVPartial] = Encoders.product[BinaryMVPartial]

  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
开发者ID:enriquegrodrigo,项目名称:spark-crowd,代码行数:31,代码来源:BinarySoftMVAggregator.scala
示例6: RaykarBinaryStatisticsAggregator
//设置package包名称以及导入依赖的类
package com.enriquegrodrigo.spark.crowd.aggregators
import com.enriquegrodrigo.spark.crowd.types.RaykarBinaryPartial
import com.enriquegrodrigo.spark.crowd.types.RaykarBinaryParams
import com.enriquegrodrigo.spark.crowd.utils.Functions
import org.apache.spark.sql.{Encoder,Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.broadcast.Broadcast
/** Typed Spark aggregator that multiplies per-row `alpha`- and `beta`-derived factors
  * over [[RaykarBinaryPartial]] rows and returns the two products as a pair.
  *
  * @param params broadcast model parameters; `alpha` and `beta` are indexed by annotator
  */
private[crowd] class RaykarBinaryStatisticsAggregator(params: Broadcast[RaykarBinaryParams])
  extends Aggregator[RaykarBinaryPartial, RaykarBinaryStatisticsAggregatorBuffer, (Double, Double)] {

  /** Empty buffer: both products start at the multiplicative identity. */
  def zero: RaykarBinaryStatisticsAggregatorBuffer =
    RaykarBinaryStatisticsAggregatorBuffer(1, 1) //Binary

  /** Multiplies the row's two factors into the running products.
    *
    * The alpha factor is the annotator's `alpha` when the row's value is 1 and its
    * complement otherwise; the beta factor is the annotator's `beta` when the value
    * is 0 and its complement otherwise.
    */
  def reduce(b: RaykarBinaryStatisticsAggregatorBuffer, a: RaykarBinaryPartial): RaykarBinaryStatisticsAggregatorBuffer = {
    val model = params.value
    val annotatorAlpha = model.alpha(a.annotator)
    val alphaFactor =
      if (a.value == 1) annotatorAlpha
      else 1 - annotatorAlpha
    val annotatorBeta = model.beta(a.annotator)
    val betaFactor =
      if (a.value == 0) annotatorBeta
      else 1 - annotatorBeta
    RaykarBinaryStatisticsAggregatorBuffer(b.a * alphaFactor, b.b * betaFactor)
  }

  /** Multiplies the corresponding products of two partial buffers. */
  def merge(b1: RaykarBinaryStatisticsAggregatorBuffer, b2: RaykarBinaryStatisticsAggregatorBuffer): RaykarBinaryStatisticsAggregatorBuffer = {
    val productA = b1.a * b2.a
    val productB = b1.b * b2.b
    RaykarBinaryStatisticsAggregatorBuffer(productA, productB)
  }

  /** @return the pair `(a, b)` held by the merged buffer */
  def finish(reduction: RaykarBinaryStatisticsAggregatorBuffer): (Double, Double) = {
    val RaykarBinaryStatisticsAggregatorBuffer(productA, productB) = reduction
    (productA, productB)
  }

  def bufferEncoder: Encoder[RaykarBinaryStatisticsAggregatorBuffer] =
    Encoders.product[RaykarBinaryStatisticsAggregatorBuffer]

  def outputEncoder: Encoder[(Double, Double)] = Encoders.product[(Double, Double)]
}

/** Intermediate state of [[RaykarBinaryStatisticsAggregator]].
  *
  * @param a running product of alpha factors
  * @param b running product of beta factors
  */
private[crowd] case class RaykarBinaryStatisticsAggregatorBuffer(a: Double, b: Double)
开发者ID:enriquegrodrigo,项目名称:spark-crowd,代码行数:37,代码来源:RaykarBinaryStatisticsAggregator.scala
示例7: DawidSkeneLogLikelihoodAggregator
//设置package包名称以及导入依赖的类
package com.enriquegrodrigo.spark.crowd.aggregators
import com.enriquegrodrigo.spark.crowd.types.DawidSkenePartial
import com.enriquegrodrigo.spark.crowd.types.DawidSkeneParams
import org.apache.spark.sql.{Encoder,Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.broadcast.Broadcast
import scala.math.log
/** Typed Spark aggregator that folds [[DawidSkenePartial]] rows into one log-likelihood value.
  *
  * The buffer carries the running sum of `log(pi(...))` terms (`agg`) and the `est` of the
  * last reduced row (`predClass`); `-1` marks a buffer that has not seen any row.
  *
  * @param params broadcast model parameters; `pi` is read per row and `w` in `finish`
  */
private[crowd] class DawidSkeneLogLikelihoodAggregator(params: Broadcast[DawidSkeneParams])
  extends Aggregator[DawidSkenePartial, DawidSkeneLogLikelihoodAggregatorBuffer, Double] {

  /** Empty buffer: zero sum and the `-1` "unset" marker for `predClass`. */
  def zero: DawidSkeneLogLikelihoodAggregatorBuffer = DawidSkeneLogLikelihoodAggregatorBuffer(0, -1)

  /** Adds `log(pi(annotator)(est)(value))` for the row and records the row's `est`.
    *
    * @param b buffer accumulated so far
    * @param a row to fold in
    */
  def reduce(b: DawidSkeneLogLikelihoodAggregatorBuffer, a: DawidSkenePartial): DawidSkeneLogLikelihoodAggregatorBuffer = {
    val pival = params.value.pi(a.annotator.toInt)(a.est)(a.value)
    DawidSkeneLogLikelihoodAggregatorBuffer(b.agg + log(pival), a.est)
  }

  /** Combines two partial buffers: sums are added; `predClass` comes from `b1` unless it is
    * still the `-1` marker.
    */
  def merge(b1: DawidSkeneLogLikelihoodAggregatorBuffer, b2: DawidSkeneLogLikelihoodAggregatorBuffer): DawidSkeneLogLikelihoodAggregatorBuffer = {
    DawidSkeneLogLikelihoodAggregatorBuffer(b1.agg + b2.agg, if (b1.predClass == -1) b2.predClass else b1.predClass)
  }

  /** @return accumulated sum plus `log(w(predClass))` */
  def finish(reduction: DawidSkeneLogLikelihoodAggregatorBuffer): Double = {
    // predClass is used as an index into w, so it must no longer be the -1 marker here.
    reduction.agg + log(params.value.w(reduction.predClass))
  }

  def bufferEncoder: Encoder[DawidSkeneLogLikelihoodAggregatorBuffer] = Encoders.product[DawidSkeneLogLikelihoodAggregatorBuffer]

  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

/** Intermediate state of [[DawidSkeneLogLikelihoodAggregator]].
  *
  * @param agg       running sum of log terms
  * @param predClass `est` of the last reduced row, or `-1` when no row has been reduced
  */
private[crowd] case class DawidSkeneLogLikelihoodAggregatorBuffer(agg: Double, predClass: Int)
开发者ID:enriquegrodrigo,项目名称:spark-crowd,代码行数:46,代码来源:DawidSkeneLogLikelihoodAggregator.scala
示例8: DawidSkeneEAggregator
//设置package包名称以及导入依赖的类
package com.enriquegrodrigo.spark.crowd.aggregators
import com.enriquegrodrigo.spark.crowd.types.DawidSkenePartial
import com.enriquegrodrigo.spark.crowd.types.DawidSkeneParams
import org.apache.spark.sql.{Encoder,Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.broadcast.Broadcast
/** Typed Spark aggregator that selects a class index for a group of [[DawidSkenePartial]] rows.
  *
  * The buffer holds one running product per class; `finish` weights each product by `w`
  * and returns the index of the largest result.
  *
  * @param params   broadcast model parameters; `pi` is read per row and `w` in `finish`
  * @param nClasses number of classes, i.e. length of the buffer vector
  */
private[crowd] class DawidSkeneEAggregator(params: Broadcast[DawidSkeneParams], nClasses: Int)
  extends Aggregator[DawidSkenePartial, DawidSkeneAggregatorBuffer, Int] {

  /** Empty buffer: every per-class product starts at 1. */
  def zero: DawidSkeneAggregatorBuffer = DawidSkeneAggregatorBuffer(Vector.fill(nClasses)(1.0))

  /** Multiplies each per-class product by `pi(annotator)(c)(value)` for the row.
    *
    * @param b buffer accumulated so far
    * @param a row to fold in
    */
  def reduce(b: DawidSkeneAggregatorBuffer, a: DawidSkenePartial): DawidSkeneAggregatorBuffer = {
    val pi = params.value.pi
    val perClass = Vector.tabulate(nClasses)(c => pi(a.annotator.toInt)(c)(a.value))
    val scaled = perClass.zip(b.aggVect).map { case (factor, acc) => factor * acc }
    DawidSkeneAggregatorBuffer(scaled)
  }

  /** Multiplies the per-class products of two partial buffers position by position. */
  def merge(b1: DawidSkeneAggregatorBuffer, b2: DawidSkeneAggregatorBuffer): DawidSkeneAggregatorBuffer =
    DawidSkeneAggregatorBuffer(
      b1.aggVect.zip(b2.aggVect).map { case (left, right) => left * right })

  /** @return index `c` maximising `aggVect(c) * w(c)` */
  def finish(reduction: DawidSkeneAggregatorBuffer): Int = {
    val (_, bestClass) = reduction.aggVect.zipWithIndex.maxBy {
      case (product, c) => product * params.value.w(c)
    }
    bestClass
  }

  def bufferEncoder: Encoder[DawidSkeneAggregatorBuffer] = Encoders.product[DawidSkeneAggregatorBuffer]

  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

/** Intermediate state of [[DawidSkeneEAggregator]]: one running product per class. */
private[crowd] case class DawidSkeneAggregatorBuffer(aggVect: scala.collection.Seq[Double])
开发者ID:enriquegrodrigo,项目名称:spark-crowd,代码行数:37,代码来源:DawidSkeneEAggregator.scala
示例9: BinaryMVAggregator
//设置package包名称以及导入依赖的类
package com.enriquegrodrigo.spark.crowd.aggregators
import org.apache.spark.sql.{Encoder, Encoders}
import com.enriquegrodrigo.spark.crowd.types.BinaryAnnotation
import org.apache.spark.sql.expressions.Aggregator
/** Typed Spark aggregator implementing a thresholded vote over [[BinaryAnnotation]] rows:
  * the result is 1 when the mean of the `value` field is at least 0.5, and 0 otherwise.
  */
private[crowd] class BinaryMVAggregator
  extends Aggregator[BinaryAnnotation, BinaryMVPartial, Int] {

  /** Empty buffer: zero sum, zero rows. */
  def zero: BinaryMVPartial = BinaryMVPartial(0, 0)

  /** Adds the annotation's value to the sum and counts the row. */
  def reduce(b: BinaryMVPartial, a: BinaryAnnotation): BinaryMVPartial = {
    val sumSoFar = b.aggValue + a.value
    BinaryMVPartial(sumSoFar, b.count + 1)
  }

  /** Adds sums and row counts of two partial buffers. */
  def merge(b1: BinaryMVPartial, b2: BinaryMVPartial): BinaryMVPartial = {
    val sum = b1.aggValue + b2.aggValue
    val rows = b1.count + b2.count
    BinaryMVPartial(sum, rows)
  }

  /** Thresholds the mean value at 0.5 (ties go to 1).
    *
    * @throws IllegalArgumentException when the buffer has counted no rows
    */
  def finish(reduction: BinaryMVPartial): Int = {
    val sum: Double = reduction.aggValue
    val rows: Double = reduction.count
    if (rows == 0) {
      throw new IllegalArgumentException()
    }
    val mean = sum / rows
    if (mean >= 0.5) 1 else 0
  }

  def bufferEncoder: Encoder[BinaryMVPartial] = Encoders.product[BinaryMVPartial]

  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
开发者ID:enriquegrodrigo,项目名称:spark-crowd,代码行数:32,代码来源:BinaryMVAggregator.scala
示例10: GladEAggregator
//设置package包名称以及导入依赖的类
package com.enriquegrodrigo.spark.crowd.aggregators
import com.enriquegrodrigo.spark.crowd.types.GladPartial
import com.enriquegrodrigo.spark.crowd.types.GladParams
import com.enriquegrodrigo.spark.crowd.utils.Functions
import org.apache.spark.sql.{Encoder,Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.broadcast.Broadcast
import scala.math.{log,exp}
/** Typed Spark aggregator that turns [[GladPartial]] rows into the normalised weight of
  * class 1 for a group.
  *
  * The buffer holds two running sums of log terms, index 0 for class 0 and index 1 for
  * class 1. Because `reduce` accumulates additively, the empty buffer is all zeros and
  * `merge` adds element-wise; this keeps the result independent of how Spark splits the
  * rows of a group across partitions.
  *
  * @param params broadcast model parameters; `alpha` is read per row and `w` in `finish`
  */
private[crowd] class GladEAggregator(params: Broadcast[GladParams])
  extends Aggregator[GladPartial, GladEAggregatorBuffer, Double] {

  /** Empty buffer: the additive identity for both log-space sums. */
  def zero: GladEAggregatorBuffer = GladEAggregatorBuffer(Vector.fill(2)(0.0)) //Binary

  /** Adds the row's two log terms to the running sums.
    *
    * @param b buffer accumulated so far
    * @param a row to fold in
    */
  def reduce(b: GladEAggregatorBuffer, a: GladPartial): GladEAggregatorBuffer = {
    val alpha = params.value.alpha
    val sigmoidValue = Functions.sigmoid(alpha(a.annotator) * a.beta)
    // The sigmoid is attributed to the class matching the row's value, its complement to the other.
    val p0 = if (a.value == 0) sigmoidValue else (1 - sigmoidValue)
    val p1 = if (a.value == 1) sigmoidValue else (1 - sigmoidValue)
    // NOTE(review): Functions.logLim is defined elsewhere; presumably a bounded log -- confirm.
    GladEAggregatorBuffer(Vector(Functions.logLim(p0) + b.aggVect(0), Functions.logLim(p1) + b.aggVect(1)))
  }

  /** Adds the log-space sums of two partial buffers position by position. */
  def merge(b1: GladEAggregatorBuffer, b2: GladEAggregatorBuffer): GladEAggregatorBuffer = {
    GladEAggregatorBuffer(b1.aggVect.zip(b2.aggVect).map { case (left, right) => left + right })
  }

  /** Computes `exp(l1) / (exp(l0) + exp(l1))` where `lk = aggVect(k) + logLim(w(k))`.
    *
    * The ratio is evaluated as `1 / (1 + exp(l0 - l1))`, which is algebraically the same
    * but does not degrade to 0/0 when both exponentials underflow.
    */
  def finish(reduction: GladEAggregatorBuffer): Double = {
    val w = params.value.w
    val logNegative = reduction.aggVect(0) + Functions.logLim(w(0))
    val logPositive = reduction.aggVect(1) + Functions.logLim(w(1))
    1.0 / (1.0 + exp(logNegative - logPositive))
  }

  def bufferEncoder: Encoder[GladEAggregatorBuffer] = Encoders.product[GladEAggregatorBuffer]

  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

/** Intermediate state of [[GladEAggregator]]: log-space sums for class 0 and class 1. */
private[crowd] case class GladEAggregatorBuffer(aggVect: scala.collection.Seq[Double])
开发者ID:enriquegrodrigo,项目名称:spark-crowd,代码行数:43,代码来源:GladEAggregator.scala
注:本文中的org.apache.spark.sql.expressions.Aggregator类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论