• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala LabeledPoint类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.apache.spark.mllib.regression.LabeledPoint的典型用法代码示例。如果您正苦于以下问题:Scala LabeledPoint类的具体用法?Scala LabeledPoint怎么用?Scala LabeledPoint使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了LabeledPoint类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: KMeansClusteringApp

//设置package包名称以及导入依赖的类
package org.apress.prospark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD.doubleRDDToDoubleRDDFunctions
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

/**
 * Streaming k-means example.
 *
 * Reads space-separated text records from a socket, keeps a fixed subset of
 * columns (the first selected column becomes the label, the remaining 18 form
 * the feature vector) and trains a `StreamingKMeans` model with k = 3 on one
 * part of every batch while requesting predictions for the other part.
 */
object KMeansClusteringApp {

  /**
   * Entry point.
   *
   * @param args exactly four values: application name, batch interval in
   *             seconds, socket host name and socket port
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println(
        "Usage: KMeansClusteringApp <appname> <batchInterval> <hostname> <port>")
      System.exit(1)
    }
    val Array(appName, batchInterval, hostname, port) = args

    val sparkConf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
    val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt))

    // The first index supplies the label; the other 18 indices are the features.
    val selectedColumns =
      Seq(1, 4, 5, 6, 10, 11, 12, 20, 21, 22, 26, 27, 28, 36, 37, 38, 42, 43, 44)

    // Drop lines containing "NaN" and records whose second field is "0".
    val records = ssc.socketTextStream(hostname, port.toInt)
      .filter(line => !line.contains("NaN"))
      .map(line => line.split(" "))
      .filter(fields => fields(1) != "0")

    val orientationStream = records
      .map(fields => selectedColumns.map(i => fields(i)).toArray)
      .map(columns => columns.map(_.toDouble))
      .filter { values =>
        val label = values(0)
        label == 1.0 || label == 2.0 || label == 3.0
      }
      .map(values => LabeledPoint(values(0), Vectors.dense(values.drop(1))))

    // Per batch: a random split weighted 0.3 is the test set, the rest is training data.
    val test = orientationStream.transform(batch => batch.randomSplit(Array(0.3, 0.7))(0))
    val train = orientationStream
      .transformWith(test, (all: RDD[LabeledPoint], held: RDD[LabeledPoint]) => all.subtract(held))
      .cache()

    val model = new StreamingKMeans()
      .setK(3)
      .setDecayFactor(0)
      .setRandomCenters(18, 0.0)

    model.trainOn(train.map(point => point.features))
    // NOTE(review): no output operation is registered on this stream, so the
    // predictions are presumably never materialised - confirm intent.
    val prediction = model.predictOnValues(test.map(point => (point.label, point.features)))

    ssc.start()
    ssc.awaitTermination()
  }

}
开发者ID:ZubairNabi,项目名称:prosparkstreaming,代码行数:54,代码来源:L9-10KMeans.scala


示例2: MllibLBFGS

//设置package包名称以及导入依赖的类
package optimizers

import breeze.linalg.{DenseVector, Vector}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.optimization.{L1Updater, SimpleUpdater, SquaredL2Updater, Updater}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import utils.Functions._


/**
 * Optimizer that delegates to Spark MLlib's `LogisticRegressionWithLBFGS`.
 *
 * @param data        labelled training points
 * @param loss        loss function handed to the `Optimizer` base class
 * @param regularizer regularizer; its concrete type selects the MLlib updater
 *                    and its `lambda` is used as the regularization parameter
 * @param params      iteration count, convergence tolerance and number of
 *                    corrections for L-BFGS
 */
class MllibLBFGS(val data: RDD[LabeledPoint],
                 loss: LossFunction,
                 regularizer: Regularizer,
                 params: LBFGSParameters
                ) extends Optimizer(loss, regularizer) {

  val opt = new LogisticRegressionWithLBFGS

  // Translate the project's regularizer type into the matching MLlib updater.
  val reg: Updater = (regularizer: Regularizer) match {
    case _: L1Regularizer => new L1Updater
    case _: L2Regularizer => new SquaredL2Updater
    case _: Unregularized => new SimpleUpdater
  }

  // Configure the underlying optimizer once, at construction time.
  opt.optimizer
    .setNumIterations(params.iterations)
    .setConvergenceTol(params.convergenceTol)
    .setNumCorrections(params.numCorrections)
    .setRegParam(regularizer.lambda)
    .setUpdater(reg)

  /**
   * Trains on `data` and returns the fitted weights.
   *
   * @return the model weights as a Breeze dense vector
   */
  override def optimize(): Vector[Double] = {
    val fitted = opt.run(data)
    DenseVector(fitted.weights.toArray)
  }
}
开发者ID:mlbench,项目名称:mlbench,代码行数:38,代码来源:MllibLBFGS.scala


示例3: Classifier

//设置package包名称以及导入依赖的类
package edu.neu.coe.scala.spark.spam

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext


/**
 * Spam classifier example: hashes the words of each line of `spam.txt` and
 * `normal.txt` into 10000-dimensional term-frequency vectors, trains a
 * logistic regression model with SGD on them, and prints the prediction for
 * one spam-like and one normal sample message.
 */
object Classifier extends App {

  val conf = new SparkConf().setAppName("spam")
  val sc = new SparkContext(conf)

  // One e-mail per line in each input file.
  val spam = sc.textFile("spam.txt")
  val norm = sc.textFile("normal.txt")

  // Term-frequency hashing into a fixed number of buckets.
  val tf = new HashingTF(10000)
  val spamFeatures = spam.map { email => tf.transform(email.split(" ")) }
  val normFeatures = norm.map { email => tf.transform(email.split(" ")) }

  // Spam is the positive class (label 1), normal mail the negative class (label 0).
  val posExamples = spamFeatures.map(features => LabeledPoint(1.0, features))
  val negExamples = normFeatures.map(features => LabeledPoint(0.0, features))
  val trainingData = posExamples ++ negExamples
  trainingData.cache()

  val model = new LogisticRegressionWithSGD().run(trainingData)

  val posTest = tf.transform("Subject: Cheap Stuff From: <omg.fu> O M G GET cheap stuff by sending money to Robin Hillyard".split(" "))
  val negTest = tf.transform("Subject: Spark From: Robin Hillyard<[email protected]> Hi Adam, I started studying Spark the other day".split(" "))

  println(s"Prediction for positive test example: ${model.predict(posTest)}")
  println(s"Prediction for negative test example: ${model.predict(negTest)}")
}
开发者ID:menezesl,项目名称:Scala-repo,代码行数:34,代码来源:Classifier.scala


示例4: SVMPipeline

//设置package包名称以及导入依赖的类
package org.stumbleuponclassifier

import org.apache.log4j.Logger
import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint


/**
 * Trains a linear SVM on the tab-separated training file and prints the
 * accuracy measured on that same data.
 */
object SVMPipeline {
  @transient lazy val logger = Logger.getLogger(getClass.getName)

  /**
   * Loads the training file, fits an `SVMWithSGD` model and prints the
   * fraction of training points whose predicted class equals the label.
   *
   * @param sc the Spark context used to read the input file
   */
  def svmPipeline(sc: SparkContext): Unit = {
    val records = sc.textFile("/home/ubuntu/work/ml-resources/spark-ml/train_noheader.tsv").map(line => line.split("\t"))

    val data = records.map { r =>
      // Fields are quoted in the file; strip the quotes before parsing.
      val trimmed = r.map(_.replaceAll("\"", ""))
      // The last column is the label; columns 4 .. n-2 are the features.
      val label = trimmed(r.size - 1).toInt
      // "?" marks a missing value and is replaced by 0.0.
      val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
      LabeledPoint(label, Vectors.dense(features))
    }

    // params for SVM
    val numIterations = 10

    // Run training algorithm to build the model
    val svmModel = SVMWithSGD.train(data, numIterations)

    // The model's threshold is deliberately NOT cleared: with the threshold
    // cleared, predict returns the raw margin instead of a class label, and
    // comparing that margin with the 0/1 label would almost never match.
    val svmTotalCorrect = data.map { point =>
      if (svmModel.predict(point.features) == point.label) 1 else 0
    }.sum()

    // calculate accuracy
    val svmAccuracy = svmTotalCorrect / data.count()
    println(svmAccuracy)
  }

}
开发者ID:PacktPublishing,项目名称:Machine-Learning-with-Spark-Second-Edition,代码行数:42,代码来源:SVMPipeline.scala


示例5: MllibSGD

//设置package包名称以及导入依赖的类
package optimizers

import breeze.linalg.{DenseVector, Vector}
import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, SVMWithSGD}
import org.apache.spark.mllib.optimization.{L1Updater, SimpleUpdater, SquaredL2Updater, Updater}
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
import org.apache.spark.rdd.RDD
import utils.Functions._

import scala.tools.cmd.gen.AnyVals.D




/**
 * Optimizer that delegates to one of Spark MLlib's SGD-based learners.
 *
 * @param data        labelled training points
 * @param loss        loss function handed to the `Optimizer` base class
 * @param regularizer regularizer; its concrete type selects the MLlib updater
 *                    and its `lambda` is used as the regularization parameter
 * @param params      iteration count, mini-batch fraction and step size
 * @param ctype       learner to use: "SVM", "LR" or "Regression"; any other
 *                    value fails with a `MatchError` during construction
 */
class MllibSGD(val data: RDD[LabeledPoint],
               loss: LossFunction,
               regularizer: Regularizer,
               params: SGDParameters,
               ctype: String
              ) extends Optimizer(loss, regularizer) {
  val opt = ctype match {
    case "SVM" => new SVMWithSGD()
    case "LR" => new LogisticRegressionWithSGD()
    case "Regression" => new LinearRegressionWithSGD()
  }

  // Translate the project's regularizer type into the matching MLlib updater.
  val reg: Updater = (regularizer: Regularizer) match {
    case _: L1Regularizer => new L1Updater
    case _: L2Regularizer => new SquaredL2Updater
    case _: Unregularized => new SimpleUpdater
  }

  // Every supported learner exposes its own gradient-descent optimizer; pick
  // it by the learner's runtime type and apply the same settings to it.
  locally {
    val sgd = opt match {
      case svm: SVMWithSGD => svm.optimizer
      case logistic: LogisticRegressionWithSGD => logistic.optimizer
      case linear: LinearRegressionWithSGD => linear.optimizer
    }
    sgd
      .setNumIterations(params.iterations)
      .setMiniBatchFraction(params.miniBatchFraction)
      .setStepSize(params.stepSize)
      .setRegParam(regularizer.lambda)
      .setUpdater(reg)
  }

  /**
   * Trains the selected learner on `data`.
   *
   * @return the model weights as a Breeze dense vector
   */
  override def optimize(): Vector[Double] = {
    val fitted = opt.run(data)
    DenseVector(fitted.weights.toArray)
  }
}
开发者ID:mlbench,项目名称:mlbench,代码行数:60,代码来源:MllibSGD.scala


示例6: CocoaParameters

//设置package包名称以及导入依赖的类
package optimizers

import java.io.Serializable

import breeze.linalg.DenseVector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD


/**
 * Mutable parameter holder for the CoCoA optimizer.
 *
 * @param n             number of training examples
 * @param numRounds     number of rounds
 * @param localIterFrac fraction used to derive the local iteration count
 *                      (see `getLocalIters`)
 * @param lambda        forwarded to `distopt.utils.Params`
 * @param beta          forwarded to `distopt.utils.Params`
 * @param gamma         forwarded to `distopt.utils.Params`
 * @param numParts      number of partitions
 * @param wInit         initial weight vector
 */
class CocoaParameters(var n: Int,
                      var numRounds: Int,
                      var localIterFrac: Double,
                      var lambda: Double,
                      var beta: Double,
                      var gamma: Double,
                      var numParts: Int,
                      var wInit: DenseVector[Double])  extends Serializable  {

  /**
   * Derives defaults from the training set: example count, partition count
   * and a zero weight vector sized like the first example's features.
   * `test` is accepted but not read here.
   */
  def this(train: RDD[LabeledPoint], test: RDD[LabeledPoint]) =
    this(
      n = train.count().toInt,
      numRounds = 200,
      localIterFrac = 1.0,
      lambda = 0.01,
      beta = 1.0,
      gamma = 1.0,
      numParts = train.partitions.length,
      wInit = DenseVector.zeros[Double](train.first().features.size))

  /** Local iterations: `localIterFrac * n / numParts`, truncated to an Int. */
  def getLocalIters() = (localIterFrac * n / numParts).toInt

  /** Packs the current values, with hinge loss, into a `distopt.utils.Params`. */
  def getDistOptPar(): distopt.utils.Params = {
    val hinge = distopt.utils.OptUtils.hingeLoss _
    distopt.utils.Params(hinge, n, wInit, numRounds, getLocalIters, lambda, beta, gamma)
  }

  override def toString = s"CocoaParameters(n: $n, numRounds: $numRounds, localIters: $getLocalIters, " +
    s"lambda: $lambda, beta: $beta, gamma: $gamma, wInit: $wInit)"
}
开发者ID:mlbench,项目名称:mlbench,代码行数:38,代码来源:CocoaParameters.scala


示例7: ProxCocoaParameters

//设置package包名称以及导入依赖的类
package optimizers

import java.io.Serializable

import breeze.linalg.SparseVector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD


/**
 * Mutable parameter holder for the ProxCoCoA optimizer.
 *
 * @param n             number of training examples
 * @param iterations    number of iterations
 * @param localIterFrac fraction used to derive the local iteration count
 *                      (see `getLocalIters`)
 * @param lambda        forwarded to `l1distopt.utils.Params`
 * @param eta           forwarded to `l1distopt.utils.Params`
 * @param numFeature    number of features
 * @param numParts      number of partitions
 * @param alphaInit     initial sparse vector
 */
class ProxCocoaParameters(var n: Int,
                          var iterations: Int,
                          var localIterFrac: Double,
                          var lambda: Double,
                          var eta: Double,
                          var numFeature: Int,
                          var numParts: Int,
                          var alphaInit: SparseVector[Double])  extends Serializable  {

  /**
   * Derives defaults from the training set: example count, feature count of
   * the first example, partition count and a zero sparse vector of that
   * feature size. `test` is accepted but not read here.
   */
  def this(train: RDD[LabeledPoint], test: RDD[LabeledPoint], eta: Double = 0.5) =
    this(
      n = train.count().toInt,
      iterations = 100,
      localIterFrac = 0.9,
      lambda = 0.1,
      eta = eta,
      numFeature = train.first().features.size,
      numParts = train.partitions.length,
      alphaInit = SparseVector.zeros[Double](train.first().features.size))

  /** Local iterations: `localIterFrac * numFeature / numParts` truncated, but at least 1. */
  def getLocalIters = {
    val scaled = (localIterFrac * numFeature / numParts).toInt
    Math.max(1, scaled)
  }

  /** Packs the current values into an `l1distopt.utils.Params`. */
  def getL1DistOptPar(): l1distopt.utils.Params =
    l1distopt.utils.Params(alphaInit, n, iterations, getLocalIters, lambda, eta)

  override def toString = s"ProxCocoaParameters(n: $n, iterations: $iterations, " +
    s"localIters: $getLocalIters, lambda: $lambda, eta: $eta, alphaInit: $alphaInit)"
}
开发者ID:mlbench,项目名称:mlbench,代码行数:37,代码来源:ProxCocoaParameters.scala


示例8: wilcoxon

//设置package包名称以及导入依赖的类
package org.robertdodier

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Rank-sum statistic computed on Spark RDDs of (value, class) pairs, where
 * the class is 0 or 1.
 */
object wilcoxon
{
  /**
   * Computes the scaled rank-sum statistic for class 1.
   *
   * Values are sorted and given zero-based positions; values that occur more
   * than once share the mean of their positions. The positions of the class-1
   * records are summed and turned into U / (n1 * n0).
   *
   * @param XC pairs of (value, class)
   * @return the statistic divided by n1 * n0
   */
  def U (XC: RDD[(Double, Int)]): Double =
  {
    val ordered = XC.sortBy (pair => pair._1)

    // value -> mean zero-based position of all records carrying that value
    val meanPositionByValue = ordered.zipWithIndex ()
      .map { case ((value, cls), position) => (value, (cls, position)) }
      .aggregateByKey ((0L, 0L)) (
        (acc, item) => (acc._1 + 1, acc._2 + item._2),
        (left, right) => (left._1 + right._1, left._2 + right._2))
      .map { case (value, (occurrences, positionSum)) => (value, positionSum/occurrences.toDouble) }

    val ranked = ordered.join (meanPositionByValue)
    val positives = ranked.filter { case (_, (cls, _)) => cls == 1 }
    val rank_sum = positives.aggregate (0.0) (
      (acc, record) => acc + record._2._2,
      (left, right) => left + right)

    val n = ranked.count ()
    val n1 = positives.count ()
    val n0 = n - n1

    // Positions are zero-based, so adding n1 converts their sum to a sum of ranks.
    ((rank_sum + n1) - n1*(n1 + 1.0)/2.0)/(n1 * n0.toDouble)
  }

  /**
   * Applies `U` to every feature column, using the label (as Int) as class.
   *
   * @param CXXX labelled points; the feature count is taken from the first one
   * @return one statistic per feature, in feature order
   */
  def allU (CXXX : RDD[LabeledPoint]): Seq[Double] =
  {
    val featureCount = CXXX.first ().features.size
    (0 until featureCount).map { column =>
      U (CXXX.map (point => (point.features(column), point.label.toInt)))
    }
  }

  /** Creates a Spark context named "wilcoxon" and runs the example. */
  def main (args: Array[String]): Unit = {
    val sc = new SparkContext (new SparkConf ().setAppName ("wilcoxon"))
    run_example (sc)
  }

  /**
   * Prints the statistic for 1000 standard-normal class-0 samples against
   * 1000 class-1 samples shifted by 0.0, 0.5, 1.0 and then -0.5, -1.0, -1.5.
   */
  def run_example (sc: SparkContext) = {
    val rng = new java.util.Random (1L)
    val data0 = (0 until 1000).map (_ => (rng.nextGaussian (), 0))

    val nonNegativeShifts = (0 until 3).map (step => 0.5*step)
    val negativeShifts = (0 until 3).map (step => - 0.5*(step + 1))

    (nonNegativeShifts ++ negativeShifts).foreach { mean_diff =>
      val data1 = (0 until 1000).map (_ => (mean_diff + rng.nextGaussian (), 1))
      val myU = U (sc.parallelize (data0 ++ data1))

      System.out.println (s"difference of means = $mean_diff; U/(n1*n0) = $myU")
    }
  }
}
开发者ID:robert-dodier,项目名称:spark-wilcoxon,代码行数:62,代码来源:wilcoxon.scala


示例9: TitanicBayes

//设置package包名称以及导入依赖的类
import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}


/**
 * Multinomial naive Bayes classifier over the Titanic columns
 * Fare, Pclass, Age, Sex, Parch, SibSp and Embarked.
 */
object TitanicBayes {

  /** The most recently trained model; null until `train` has been called. */
  var naiveBayesModel: NaiveBayesModel = null

  /**
   * Builds the feature vector for one row. Shared by `train` and `predict`
   * so both use the same columns in the same order.
   */
  private def features(row: Row) = Vectors.dense(Array(
    row.getAs[Double]("Fare"),
    row.getAs[Int]("Pclass").toDouble,
    row.getAs[Double]("Age"),
    row.getAs[Int]("Sex").toDouble,
    row.getAs[Int]("Parch").toDouble,
    row.getAs[Int]("SibSp").toDouble,
    row.getAs[Int]("Embarked").toDouble))

  /**
   * Trains a multinomial naive Bayes model (lambda = 1.0) with the
   * "Survived" column as label and stores it in `naiveBayesModel`.
   *
   * @param df data frame containing the label and feature columns
   */
  def train(df: DataFrame): Unit = {
    val labeledData = df.rdd.map { row =>
      LabeledPoint(row.getAs[Int]("Survived").toDouble, features(row))
    }
    naiveBayesModel = NaiveBayes.train(labeledData, lambda = 1.0, modelType = "multinomial")
  }

  /**
   * Predicts the class of every row.
   *
   * @param df data frame containing "PassengerId" and the feature columns
   * @return rows of (PassengerId, predicted class as Int)
   * @throws IllegalStateException if `train` has not been called yet
   */
  def predict(df: DataFrame): RDD[Row] = {
    // Copy the model into a local value so it is shipped with the closure.
    // Reading the singleton's var inside the closure would see the
    // executor-side copy of this object, on which `train` never ran.
    val model = Option(naiveBayesModel).getOrElse(
      throw new IllegalStateException("TitanicBayes.train must be called before predict"))

    df.rdd.map { row =>
      val predicted = model.predict(features(row))
      Row.fromTuple((row.getAs[Int]("PassengerId"), predicted.toInt))
    }
  }


}
开发者ID:digital-thinking,项目名称:spark-titanic,代码行数:37,代码来源:TitanicBayes.scala


示例10: StreamingSimpleModel

//设置package包名称以及导入依赖的类
package com.bigchange.streaming

import breeze.linalg.DenseVector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
 * Streaming linear regression example: trains on labelled events read from a
 * local socket and prints the per-batch MSE and RMSE of the latest model.
 */
object StreamingSimpleModel {

  /**
   * Entry point. Expects events of the form `label<TAB>f1,f2,...` on
   * localhost:9999.
   */
  def main(args: Array[String]): Unit = {

    val ssc = new StreamingContext("local","test",Seconds(10))
    val stream = ssc.socketTextStream("localhost",9999)

    // Start from an all-zero weight vector with 100 entries.
    val numberFeatures = 100
    val initialWeights = Vectors.dense(DenseVector.zeros[Double](numberFeatures).data)
    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(initialWeights)
      .setNumIterations(1)
      .setStepSize(0.01)

    val labeledStream = stream.map { event =>
      val fields = event.split("\t")
      val label = fields(0).toDouble
      val values = fields(1).split(",").map(_.toDouble)
      LabeledPoint(label = label, features = Vectors.dense(values))
    }

    model.trainOn(labeledStream)

    // For every batch, compute (prediction - label) with the latest model.
    val predictAndTrue = labeledStream.transform { rdd =>
      val latestModel = model.latestModel()
      rdd.map(point => latestModel.predict(point.features) - point.label)
    }

    // Report the mean squared error and its square root for every batch.
    predictAndTrue.foreachRDD { rdd =>
      val mse = rdd.map(error => error * error).mean()
      val rmse = math.sqrt(mse)
      println(s"current batch, MSE: $mse, RMSE:$rmse")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
开发者ID:bigchange,项目名称:AI,代码行数:51,代码来源:StreamingSimpleModel.scala


示例11: FeaturesParser

//设置package包名称以及导入依赖的类
package com.evrenio.ml

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD



/** Parsers that turn comma-separated numeric lines into MLlib vectors. */
object FeaturesParser{

  /**
   * Parses every line into a dense vector of all its values.
   *
   * @param rawdata lines of comma-separated numbers
   * @return one dense vector per line
   */
  def parseFeatures(rawdata: RDD[String]): RDD[Vector] = {
    val numbers: RDD[Array[Double]] = rawdata.map(line => line.split(",").map(_.toDouble))
    numbers.map(values => Vectors.dense(values))
  }

  /**
   * Parses every line into a labelled point: the first value is the label,
   * the remaining values are the features.
   *
   * @param cvData lines of comma-separated numbers
   * @return one labelled point per line
   */
  def parseFeaturesWithLabel(cvData: RDD[String]): RDD[LabeledPoint] = {
    val numbers: RDD[Array[Double]] = cvData.map(line => line.split(",").map(_.toDouble))
    numbers.map { values =>
      LabeledPoint(values(0), Vectors.dense(values.drop(1)))
    }
  }
}
开发者ID:weburnit,项目名称:anomaly,代码行数:22,代码来源:FeaturesParser.scala


示例12: DecisionTreeUtil

//设置package包名称以及导入依赖的类
package org.sparksamples.decisiontree

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.rdd.RDD
import org.sparksamples.Util

import scala.collection.Map
import scala.collection.mutable.ListBuffer


/** Helpers for preparing data for, and evaluating, a decision-tree regressor. */
object DecisionTreeUtil {

  /**
   * Loads the records through `Util.getRecords`, converts them to labelled
   * points with `Util.extract_features_dt` and splits them 80/20 with a
   * fixed seed.
   *
   * @return (cached training set, test set)
   */
  def getTrainTestData(): (RDD[LabeledPoint], RDD[LabeledPoint]) = {
    val recordsArray = Util.getRecords()
    val records = recordsArray._1
    // NOTE(review): this value is never read afterwards.
    val first = records.first()
    val numData = recordsArray._2

    println(numData.toString())
    records.cache()
    print(s"Mapping of first categorical feature column: ${Util.get_mapping(records, 2)}")

    // One mapping per column 2..9, and the total number of entries across them.
    val mappings: List[Map[String, Long]] = (2 to 9).map(column => Util.get_mapping(records, column)).toList
    val catLen = mappings.map(_.size).sum

    // NOTE(review): numLen, totalLen and data are not used by the returned split.
    val numLen = records.first().slice(11, 15).size
    val totalLen = catLen + numLen
    val data = records.map { r =>
      LabeledPoint(Util.extractLabel(r), Util.extractFeatures(r, catLen, mappings))
    }

    val data_dt = records.map { r =>
      LabeledPoint(Util.extractLabel(r), Util.extract_features_dt(r))
    }

    val splits = data_dt.randomSplit(Array(0.8, 0.2), seed = 11L)
    (splits(0).cache(), splits(1))
  }

  /**
   * Trains a variance-impurity decision-tree regressor on `train` and scores
   * it on `test`.
   *
   * @param train                   training points
   * @param test                    evaluation points
   * @param categoricalFeaturesInfo passed through to `DecisionTree.trainRegressor`
   * @param maxDepth                passed through to `DecisionTree.trainRegressor`
   * @param maxBins                 passed through to `DecisionTree.trainRegressor`
   * @return square root of the mean of `Util.squaredLogError(label, prediction)`
   */
  def evaluate(train: RDD[LabeledPoint],test: RDD[LabeledPoint],
               categoricalFeaturesInfo: scala.Predef.Map[Int, Int],
                maxDepth :Int, maxBins: Int): Double = {
    val impurity = "variance"
    val tree = DecisionTree.trainRegressor(train, categoricalFeaturesInfo, impurity, maxDepth, maxBins)

    val labelAndPrediction = test.map(point => (point.label, tree.predict(point.features)))
    val meanSquaredLogError = labelAndPrediction
      .map { case (actual, predicted) => Util.squaredLogError(actual, predicted) }
      .mean()
    Math.sqrt(meanSquaredLogError)
  }

}
开发者ID:PacktPublishing,项目名称:Machine-Learning-with-Spark-Second-Edition,代码行数:61,代码来源:DecisionTreeUtil.scala


示例13: SVMPipeline

//设置package包名称以及导入依赖的类
package org.sparksamples.classification.stumbleupon

import org.apache.log4j.Logger
import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint


/**
 * Trains a linear SVM on the tab-separated training file and prints the
 * accuracy measured on that same data.
 */
object SVMPipeline {
  @transient lazy val logger = Logger.getLogger(getClass.getName)

  /**
   * Loads the training file, fits an `SVMWithSGD` model and prints the
   * fraction of training points whose predicted class equals the label.
   *
   * @param sc the Spark context used to read the input file
   */
  def svmPipeline(sc: SparkContext): Unit = {
    val records = sc.textFile("/home/ubuntu/work/ml-resources/spark-ml/train_noheader.tsv").map(line => line.split("\t"))

    val data = records.map { r =>
      // Fields are quoted in the file; strip the quotes before parsing.
      val trimmed = r.map(_.replaceAll("\"", ""))
      // The last column is the label; columns 4 .. n-2 are the features.
      val label = trimmed(r.size - 1).toInt
      // "?" marks a missing value and is replaced by 0.0.
      val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
      LabeledPoint(label, Vectors.dense(features))
    }

    // params for SVM
    val numIterations = 10

    // Run training algorithm to build the model
    val svmModel = SVMWithSGD.train(data, numIterations)

    // The model's threshold is deliberately NOT cleared: with the threshold
    // cleared, predict returns the raw margin instead of a class label, and
    // comparing that margin with the 0/1 label would almost never match.
    val svmTotalCorrect = data.map { point =>
      if (svmModel.predict(point.features) == point.label) 1 else 0
    }.sum()

    // calculate accuracy
    val svmAccuracy = svmTotalCorrect / data.count()
    println(svmAccuracy)
  }

}
开发者ID:PacktPublishing,项目名称:Machine-Learning-with-Spark-Second-Edition,代码行数:42,代码来源:SVMPipeline.scala


示例14: MLLibRandomForestModel

//设置package包名称以及导入依赖的类
package com.asto.dmp.articlecate.biz

import com.asto.dmp.articlecate.base.Props
import com.asto.dmp.articlecate.utils.FileUtils
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import com.asto.dmp.articlecate.biz.ClsFeaturesParser._
import scala.collection._

/**
 * Trains, stores, loads and applies an MLlib random-forest classifier.
 *
 * @param sc        Spark context used for loading data and models
 * @param modelPath location where the model is saved to and loaded from
 */
class MLLibRandomForestModel(val sc: SparkContext, val modelPath: String) extends scala.Serializable with Logging {

  /**
   * Trains a random-forest classifier on a LibSVM file using the settings
   * read from `Props`, replaces whatever is stored at `modelPath` with the
   * new model, and then runs the optional error-rate check.
   *
   * @param svmTrainDataPath path of the LibSVM-formatted training data
   */
  def genRandomForestModel(svmTrainDataPath: String) = {
    // One class per entry of the class-name-to-code map.
    val numClasses = ClsFeaturesParser.clsNameToCodeMap.size
    // No entries in the categorical-features map.
    val categoricalFeaturesInfo = immutable.Map[Int, Int]()
    val numTrees = Props.get("model_numTrees").toInt
    val featureSubsetStrategy = Props.get("model_featureSubsetStrategy")
    val impurity = Props.get("model_impurity")
    val maxDepth = Props.get("model_maxDepth").toInt
    val maxBins = Props.get("model_maxBins").toInt

    val trainingData = MLUtils.loadLibSVMFile(sc, svmTrainDataPath).cache()
    val forest = RandomForest.trainClassifier(
      trainingData,
      numClasses,
      categoricalFeaturesInfo,
      numTrees,
      featureSubsetStrategy,
      impurity,
      maxDepth,
      maxBins)

    FileUtils.deleteFilesInHDFS(modelPath)
    forest.save(sc, modelPath)

    testErrorRate(trainingData, forest)
  }

  /**
   * When the `model_test` property is true, samples the training data at
   * rate `model_sampleRate` (without replacement) and logs the fraction of
   * sampled points the model misclassifies; otherwise only logs a message.
   */
  private def testErrorRate(trainingData: RDD[LabeledPoint], model: RandomForestModel) = {
    // NOTE(review): both log messages look mis-encoded in the source; they
    // are runtime text and are kept exactly as found.
    if (!Props.get("model_test").toBoolean) {
      logInfo(s"???????????")
    } else {
      val testData = trainingData.sample(false, Props.get("model_sampleRate").toDouble)
      val labelAndPreds = testData.map(point => (point.label, model.predict(point.features)))
      val mismatches = labelAndPreds.filter { case (label, predicted) => label != predicted }
      val testError = mismatches.count.toDouble / testData.count()
      logInfo(s"????????????$testError")
    }
  }

  /**
   * Loads the model from `modelPath`, predicts a class for every vector and
   * writes one `className<TAB>line` entry per input to `resultPath`.
   *
   * @param lineAndVectors pairs of (original line, feature vector)
   * @param resultPath     destination handed to `FileUtils.saveFileToHDFS`
   */
  def predictAndSave(lineAndVectors: Array[(String, org.apache.spark.mllib.linalg.Vector)], resultPath: String) = {
    val forest = RandomForestModel.load(sc, modelPath)
    val outputLines = lineAndVectors.map { case (line, vector) =>
      val classCode = forest.predict(vector).toInt.toString
      s"${clsCodeToNameMap(classCode)}\t${line}"
    }
    FileUtils.saveFileToHDFS(resultPath, outputLines.mkString("\n"))
  }
}
开发者ID:luciuschina,项目名称:ArticleCategories,代码行数:56,代码来源:MLLibRandomForestModel.scala


示例15: FeatureExtractionApp

//设置package包名称以及导入依赖的类
package org.apress.prospark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.feature.ChiSqSelector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

/**
 * Streaming feature-selection example: for every batch read from a socket,
 * fits a `ChiSqSelector` keeping 5 features and prints up to 20 transformed
 * points.
 */
object FeatureExtractionApp {

  /**
   * Entry point.
   *
   * @param args exactly four values: application name, batch interval in
   *             seconds, socket host name and socket port
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println(
        "Usage: FeatureExtractionApp <appname> <batchInterval> <hostname> <port>")
      System.exit(1)
    }
    val Array(appName, batchInterval, hostname, port) = args

    val sparkConf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
    val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt))

    // Drop lines containing "NaN" and records whose second field is "0".
    val substream = ssc.socketTextStream(hostname, port.toInt)
      .filter(line => !line.contains("NaN"))
      .map(line => line.split(" "))
      .filter(fields => fields(1) != "0")

    // The first index supplies the label; the other nine are the features.
    val selectedColumns = Seq(1, 4, 5, 6, 20, 21, 22, 36, 37, 38)

    val datastream = substream
      .map(fields => selectedColumns.map(i => fields(i)).toArray)
      .map(columns => columns.map(_.toDouble))
      .map { values =>
        // Every feature value is divided by 2048 before use.
        val scaled = values.drop(1).map(value => value / 2048)
        LabeledPoint(values(0), Vectors.dense(scaled))
      }

    datastream.foreachRDD { rdd =>
      val selectorModel = new ChiSqSelector(5).fit(rdd)
      rdd
        .map(point => LabeledPoint(point.label, selectorModel.transform(point.features)))
        .take(20)
        .foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }

}
开发者ID:ZubairNabi,项目名称:prosparkstreaming,代码行数:48,代码来源:L9-7FeatureExtraction.scala


示例16: PCAApp

//设置package包名称以及导入依赖的类
package org.apress.prospark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.feature.PCA
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

/**
 * Streaming PCA example: for every non-empty batch read from a socket, fits
 * a PCA model that halves the feature count and prints up to 20 projected
 * training points.
 */
object PCAApp {

  /**
   * Entry point.
   *
   * @param args exactly four values: application name, batch interval in
   *             seconds, socket host name and socket port
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println(
        "Usage: PCAApp <appname> <batchInterval> <hostname> <port>")
      System.exit(1)
    }
    val Seq(appName, batchInterval, hostname, port) = args.toSeq

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)

    val ssc = new StreamingContext(conf, Seconds(batchInterval.toInt))

    // Drop lines containing "NaN" and records whose second field is "0".
    val substream = ssc.socketTextStream(hostname, port.toInt)
      .filter(!_.contains("NaN"))
      .map(_.split(" "))
      .filter(f => f(1) != "0")

    // Field 1 is the label; the other nine selected fields are the features.
    val datastream = substream.map(f => Array(f(1), f(4), f(5), f(6), f(20), f(21), f(22), f(36), f(37), f(38)))
      .map(f => f.map(v => v.toDouble))
      .map(f => LabeledPoint(f(0), Vectors.dense(f.slice(1, f.length))))

    datastream.foreachRDD { rdd =>
      // A batch interval may deliver no records at all. Calling first() on an
      // empty RDD throws, which would abort the streaming job, so such
      // batches are skipped.
      rdd.take(1).headOption.foreach { sample =>
        val pca = new PCA(sample.features.size / 2)
          .fit(rdd.map(_.features))
        // Only the part of the split weighted 0.7 is projected and printed.
        val Array(_, trainPart) = rdd.randomSplit(Array(0.3, 0.7))
        val train = trainPart.map(lp => lp.copy(features = pca.transform(lp.features)))
        train.take(20).foreach(println)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

}
开发者ID:ZubairNabi,项目名称:prosparkstreaming,代码行数:50,代码来源:L9-8PCA.scala


示例17: ChiSqApp

//设置package包名称以及导入依赖的类
package org.apress.prospark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

/**
 * Streaming chi-squared example: for every batch read from a socket, runs
 * `Statistics.chiSqTest` on the points labelled 4.0 or 5.0 and prints one
 * result per feature column.
 */
object ChiSqApp {

  /**
   * Entry point.
   *
   * @param args exactly four values: application name, batch interval in
   *             seconds, socket host name and socket port
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println(
        "Usage: ChiSqApp <appname> <batchInterval> <hostname> <port>")
      System.exit(1)
    }
    val Array(appName, batchInterval, hostname, port) = args

    val sparkConf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
    val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt))

    // Drop lines containing "NaN" and records whose second field is "0",
    // then parse every field as a Double.
    val substream = ssc.socketTextStream(hostname, port.toInt)
      .filter(line => !line.contains("NaN"))
      .map(line => line.split(" "))
      .filter(fields => fields(1) != "0")
      .map(fields => fields.map(_.toDouble))

    // Field 1 is the label; fields 2, 4, 5 and 6 are the features.
    val labelled = substream
      .map(values => Array(values(1), values(2), values(4), values(5), values(6)))
      .filter { row =>
        val label = row(0)
        label == 4.0 || label == 5.0
      }
      .map(row => LabeledPoint(row(0), Vectors.dense(row.slice(1, 5))))

    labelled.foreachRDD { rdd =>
      Statistics.chiSqTest(rdd).zipWithIndex.foreach { case (result, column) =>
        println("%s, column no. %d".format(result, column))
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

}
开发者ID:ZubairNabi,项目名称:prosparkstreaming,代码行数:45,代码来源:L9-5ChiSq.scala


示例18: LinearRegressionApp

//设置package包名称以及导入依赖的类
package org.apress.prospark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD.doubleRDDToDoubleRDDFunctions
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

/**
 * Streaming linear regression example: trains on one part of every batch
 * read from a socket and prints the mean squared error on the other part.
 */
object LinearRegressionApp {

  /**
   * Entry point.
   *
   * @param args exactly four values: application name, batch interval in
   *             seconds, socket host name and socket port
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println(
        "Usage: LinearRegressionApp <appname> <batchInterval> <hostname> <port>")
      System.exit(1)
    }
    val Array(appName, batchInterval, hostname, port) = args

    val sparkConf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
    val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt))

    // Drop lines containing "NaN" and records whose second field is "0".
    val substream = ssc.socketTextStream(hostname, port.toInt)
      .filter(line => !line.contains("NaN"))
      .map(line => line.split(" "))
      .filter(fields => fields(1) != "0")

    // Field 2 is the label; fields 3 to 6 are the four features.
    val datastream = substream
      .map(fields => Seq(2, 3, 4, 5, 6).map(i => fields(i).toDouble).toArray)
      .map(values => LabeledPoint(values(0), Vectors.dense(values.slice(1, 5))))

    // Per batch: a random split weighted 0.3 is the test set, the rest is training data.
    val test = datastream.transform(batch => batch.randomSplit(Array(0.3, 0.7))(0))
    val train = datastream
      .transformWith(test, (all: RDD[LabeledPoint], held: RDD[LabeledPoint]) => all.subtract(held))
      .cache()

    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.zeros(4))
      .setStepSize(0.0001)
      .setNumIterations(1)

    model.trainOn(train)

    val labelAndPrediction = model.predictOnValues(test.map(point => (point.label, point.features)))
    labelAndPrediction.foreachRDD { rdd =>
      val mse = rdd.map(pair => math.pow((pair._1 - pair._2), 2)).mean()
      println("MSE: %f".format(mse))
    }

    ssc.start()
    ssc.awaitTermination()
  }

}
开发者ID:ZubairNabi,项目名称:prosparkstreaming,代码行数:52,代码来源:L9-1LinearRegression.scala


示例19: CorrelationApp

//设置package包名称以及导入依赖的类
package org.apress.prospark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

/**
 * Streaming correlation example: for every batch read from a socket, prints
 * the Spearman and Pearson correlation matrices of the features of the
 * points labelled 4.0 or 5.0.
 */
object CorrelationApp {

  /**
   * Entry point.
   *
   * @param args exactly four values: application name, batch interval in
   *             seconds, socket host name and socket port
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println(
        "Usage: CorrelationApp <appname> <batchInterval> <hostname> <port>")
      System.exit(1)
    }
    val Array(appName, batchInterval, hostname, port) = args

    val sparkConf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
    val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt))

    // Drop lines containing "NaN" and records whose second field is "0",
    // then parse every field as a Double.
    val substream = ssc.socketTextStream(hostname, port.toInt)
      .filter(line => !line.contains("NaN"))
      .map(line => line.split(" "))
      .filter(fields => fields(1) != "0")
      .map(fields => fields.map(_.toDouble))

    // Field 1 is the label; fields 2, 4, 5 and 6 are the features.
    val datastream = substream.map(values => Array(values(1), values(2), values(4), values(5), values(6)))

    val walkingOrRunning = datastream
      .filter { row =>
        val label = row(0)
        label == 4.0 || label == 5.0
      }
      .map(row => LabeledPoint(row(0), Vectors.dense(row.slice(1, 5))))

    walkingOrRunning.map(point => point.features).foreachRDD { rdd =>
      val corrSpearman = Statistics.corr(rdd, "spearman")
      val corrPearson = Statistics.corr(rdd, "pearson")
      println("Correlation Spearman: \n" + corrSpearman)
      println("Correlation Pearson: \n" + corrPearson)
    }

    ssc.start()
    ssc.awaitTermination()
  }

}
开发者ID:ZubairNabi,项目名称:prosparkstreaming,代码行数:48,代码来源:L9-4Correlation.scala


示例20: DecisionTreeTest

//设置package包名称以及导入依赖的类
package cn.edu.bjtu


import org.apache.spark.SparkConf
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.sql.SparkSession

/**
 * Trains a binary decision-tree classifier on a LibSVM file and prints the
 * area under the ROC curve, sensitivity, specificity and accuracy measured
 * on a held-out split.
 */
object DecisionTreeTest {

  /** Entry point; command-line arguments are not used. */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("DecisionTreeTest")
      .setMaster("spark://master:7077")
      .setJars(Array("/home/hadoop/DecisionTree.jar"))

    val spark = SparkSession.builder()
      .config(sparkConf)
      .getOrCreate()

    try {
      spark.sparkContext.setLogLevel("WARN")

      // Load and parse the data file.
      val data = MLUtils.loadLibSVMFile(spark.sparkContext, "hdfs://master:9000/sample_formatted.txt")

      // Split the data into training and test sets (30% held out for testing)
      val splits = data.randomSplit(Array(0.7, 0.3))

      val (training, test) = (splits(0), splits(1))

      // Train a DecisionTree model.
      //  Empty categoricalFeaturesInfo indicates all features are continuous.
      val numClasses = 2
      val categoricalFeaturesInfo = Map[Int, Int]()
      val impurity = "entropy" // "gini" is the alternative impurity measure for classification
      val maxDepth = 14
      val maxBins = 16384

      val model = DecisionTree.trainClassifier(training, numClasses, categoricalFeaturesInfo,
        impurity, maxDepth, maxBins)

      // Pairs of (prediction, label). Cached because several separate actions
      // below read it; without caching each of them would re-run the
      // prediction over the whole test split.
      val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
        val prediction = model.predict(features)
        (prediction, label)
      }.cache()

      val metrics = new BinaryClassificationMetrics(predictionAndLabels)
      val auROC = metrics.areaUnderROC()
      println("Area under ROC = " + auROC)
      // Sensitivity: correctly predicted positives over all actual positives.
      println("Sensitivity = " + predictionAndLabels.filter(x => x._1 == x._2 && x._1 == 1.0).count().toDouble / predictionAndLabels.filter(x => x._2 == 1.0).count().toDouble)
      // Specificity: correctly predicted negatives over all actual negatives.
      println("Specificity = " + predictionAndLabels.filter(x => x._1 == x._2 && x._1 == 0.0).count().toDouble / predictionAndLabels.filter(x => x._2 == 0.0).count().toDouble)
      println("Accuracy = " + predictionAndLabels.filter(x => x._1 == x._2).count().toDouble / predictionAndLabels.count().toDouble)
    } finally {
      // Release the session's resources whether or not the run succeeded.
      spark.stop()
    }
  }
}
开发者ID:XiaoyuGuo,项目名称:DataFusionClass,代码行数:55,代码来源:DecisionTreeTest.scala



注:本文中的org.apache.spark.mllib.regression.LabeledPoint类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala StaticAnnotation类代码示例发布时间:2022-05-23
下一篇:
Scala DateTimeZone类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap