• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala Pipeline类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.apache.spark.ml.Pipeline的典型用法代码示例。如果您正苦于以下问题:Scala Pipeline类的具体用法?Scala Pipeline怎么用?Scala Pipeline使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Pipeline类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: LRCV

//设置package包名称以及导入依赖的类
package com.ferhtaydn.rater

import org.apache.spark.SparkContext
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{ StringIndexerModel, VectorAssembler }
import org.apache.spark.ml.tuning.{ CrossValidator, CrossValidatorModel, ParamGridBuilder }
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.sql.{ DataFrame, Row, SQLContext }

class LRCV(sc: SparkContext) {

  implicit val sqlContext = new SQLContext(sc)

  val lr = new LogisticRegression().setMaxIter(10).setFeaturesCol("scaledFeatures")

  val paramGrid = new ParamGridBuilder()
    .addGrid(lr.regParam, Array(0.1, 0.01))
    .build()

  val assembler = new VectorAssembler()
    .setInputCols(Array("gender", "age", "weight", "height", "indexedJob"))
    .setOutputCol("features")

  val pipeline = new Pipeline()
    .setStages(Array(assembler, standardScaler("features"), lr))

  val cv = new CrossValidator()
    .setEstimator(pipeline)
    .setEvaluator(new BinaryClassificationEvaluator)
    .setEstimatorParamMaps(paramGrid)
    .setNumFolds(10)

  def train(df: DataFrame): (StringIndexerModel, CrossValidatorModel, Matrix) = {

    // need to index strings on all data to not missing the job fields.
    // other alternative can be manually assign values for each job like gender.
    val indexerModel = stringIndexer("job").fit(df)
    val indexed = indexerModel.transform(df)

    val splits = indexed.randomSplit(Array(0.8, 0.2))
    val training = splits(0).cache()
    val test = splits(1)

    val cvModel = cv.fit(training)

    val predictionAndLabels = cvModel
      .transform(test)
      .select("label", "prediction").map {
        case Row(label: Double, prediction: Double) ?
          (prediction, label)
      }

    printBinaryMetrics(predictionAndLabels)

    (indexerModel, cvModel, confusionMatrix(predictionAndLabels))

  }

} 
开发者ID:ferhtaydn,项目名称:canceRater,代码行数:62,代码来源:LRCV.scala


示例2: movies

//设置package包名称以及导入依赖的类
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.sql.SparkSession

object movies {

  case class Sentence(sentence: String,label: Double)

  def main(args:Array[String]) {

    val spark = SparkSession
      .builder
      .appName("Movies Reviews")
      .config("spark.master", "local")
      .getOrCreate()


    // Prepare training documents from a list of (id, text, label) tuples.
    val neg = spark.sparkContext.textFile("file:///data/train/neg/").repartition(4)
      .map(w => Sentence(w, 0.0))

    val pos = spark.sparkContext.textFile("file:///data/train/pos/").repartition(4)
      .map(w => Sentence(w, 1.0))

    val test = spark.sparkContext.wholeTextFiles("file:///data/test/").repartition(4)
      .map({case(file,sentence) => (file.split("/").last.split("\\.")(0),sentence)})


    val training=neg.union(pos)
    val trainingDF=spark.createDataFrame(training)
    val testDF=spark.createDataFrame(test)

    // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and Naive Bayes
    val tokenizer = new Tokenizer()
      .setInputCol("sentence")
      .setOutputCol("words")
    val hashingTF = new HashingTF()
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("features")
    val nb = new NaiveBayes()

    val pipeline = new Pipeline()
      .setStages(Array(tokenizer, hashingTF, nb))

    // Fit the pipeline to training documents.
    val model = pipeline.fit(trainingDF)

    // Make predictions on test documents.
    model.transform(testDF).repartition(1)
      .select("file", "prediction")
      .write.format("csv")
      .option("header","true")
      .option("delimiter","\t")
      .save("/tmp/spark-prediction")
    spark.stop()
      }
  } 
开发者ID:evaliotiri,项目名称:NaiveBayes,代码行数:59,代码来源:naiveBayes.scala


示例3: LinearRegressionPipeline

//设置package包名称以及导入依赖的类
package org.sparksamples.regression.bikesharing

import org.apache.log4j.Logger
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorAssembler, VectorIndexer}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.sql.{DataFrame, SparkSession}


object LinearRegressionPipeline {
  @transient lazy val logger = Logger.getLogger(getClass.getName)

  def linearRegressionWithVectorFormat(vectorAssembler: VectorAssembler, vectorIndexer: VectorIndexer, dataFrame: DataFrame) = {
    val lr = new LinearRegression()
      .setFeaturesCol("features")
      .setLabelCol("label")
      .setRegParam(0.1)
      .setElasticNetParam(1.0)
      .setMaxIter(10)

    val pipeline = new Pipeline().setStages(Array(vectorAssembler, vectorIndexer, lr))

    val Array(training, test) = dataFrame.randomSplit(Array(0.8, 0.2), seed = 12345)

    val model = pipeline.fit(training)

    val fullPredictions = model.transform(test).cache()
    val predictions = fullPredictions.select("prediction").rdd.map(_.getDouble(0))
    val labels = fullPredictions.select("label").rdd.map(_.getDouble(0))
    val RMSE = new RegressionMetrics(predictions.zip(labels)).rootMeanSquaredError
    println(s"  Root mean squared error (RMSE): $RMSE")
  }

  def linearRegressionWithSVMFormat(spark: SparkSession) = {
    // Load training data
    val training = spark.read.format("libsvm")
      .load("./src/main/scala/org/sparksamples/regression/dataset/BikeSharing/lsvmHours.txt")

    val lr = new LinearRegression()
      .setMaxIter(10)
      .setRegParam(0.3)
      .setElasticNetParam(0.8)

    // Fit the model
    val lrModel = lr.fit(training)

    // Print the coefficients and intercept for linear regression
    println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

    // Summarize the model over the training set and print out some metrics
    val trainingSummary = lrModel.summary
    println(s"numIterations: ${trainingSummary.totalIterations}")
    println(s"objectiveHistory: ${trainingSummary.objectiveHistory.toList}")
    trainingSummary.residuals.show()
    println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")

    println(s"r2: ${trainingSummary.r2}")
  }
} 
开发者ID:PacktPublishing,项目名称:Machine-Learning-with-Spark-Second-Edition,代码行数:61,代码来源:LinearRegressionPipeline.scala


示例4: preprocess

//设置package包名称以及导入依赖的类
package functions

import config.paramconf.PreprocessParams
import functions.clean.Cleaner
import functions.segment.Segmenter
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{CountVectorizer, IDF, StopWordsRemover, StringIndexer}
import org.apache.spark.sql.DataFrame


  def preprocess(data: DataFrame): Pipeline = {
    val spark = data.sparkSession
    val params = new PreprocessParams

    val indexModel = new StringIndexer()
      .setHandleInvalid(params.handleInvalid)
      .setInputCol("label")
      .setOutputCol("indexedLabel")
      .fit(data)

    val cleaner = new Cleaner()
      .setFanJian(params.fanjian)
      .setQuanBan(params.quanban)
      .setMinLineLen(params.minLineLen)
      .setInputCol("content")
      .setOutputCol("cleand")

    val segmenter = new Segmenter()
      .isAddNature(params.addNature)
      .isDelEn(params.delEn)
      .isDelNum(params.delNum)
      .isNatureFilter(params.natureFilter)
      .setMinTermLen(params.minTermLen)
      .setMinTermNum(params.minTermNum)
      .setSegType(params.segmentType)
      .setInputCol(cleaner.getOutputCol)
      .setOutputCol("segmented")

    val stopwords = spark.sparkContext.textFile(params.stopwordFilePath).collect()
    val remover = new StopWordsRemover()
      .setStopWords(stopwords)
      .setInputCol(segmenter.getOutputCol)
      .setOutputCol("removed")

    val vectorizer = new CountVectorizer()
      .setMinTF(params.minTF)
      .setVocabSize(params.vocabSize)
      .setInputCol(remover.getOutputCol)
      .setOutputCol("vectorized")

    val idf = new IDF()
      .setMinDocFreq(params.minDocFreq)
      .setInputCol(vectorizer.getOutputCol)
      .setOutputCol("features")

    val stages = Array(cleaner, indexModel, segmenter, remover, vectorizer, idf)
    new Pipeline().setStages(stages)
  }
} 
开发者ID:yhao2014,项目名称:CkoocNLP,代码行数:60,代码来源:Preprocessor.scala


示例5: DecisionTreePipeline

//设置package包名称以及导入依赖的类
package org.stumbleuponclassifier

import org.apache.log4j.Logger
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.sql.DataFrame

import scala.collection.mutable


object DecisionTreePipeline {
  @transient lazy val logger = Logger.getLogger(getClass.getName)

  def decisionTreePipeline(vectorAssembler: VectorAssembler, dataFrame: DataFrame) = {
    val Array(training, test) = dataFrame.randomSplit(Array(0.9, 0.1), seed = 12345)

    // Set up Pipeline
    val stages = new mutable.ArrayBuffer[PipelineStage]()

    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
    stages += labelIndexer

    val dt = new DecisionTreeClassifier()
      .setFeaturesCol(vectorAssembler.getOutputCol)
      .setLabelCol("indexedLabel")
      .setMaxDepth(5)
      .setMaxBins(32)
      .setMinInstancesPerNode(1)
      .setMinInfoGain(0.0)
      .setCacheNodeIds(false)
      .setCheckpointInterval(10)

    stages += vectorAssembler
    stages += dt
    val pipeline = new Pipeline().setStages(stages.toArray)

    // Fit the Pipeline
    val startTime = System.nanoTime()
    //val model = pipeline.fit(training)
    val model = pipeline.fit(dataFrame)
    val elapsedTime = (System.nanoTime() - startTime) / 1e9
    println(s"Training time: $elapsedTime seconds")

    //val holdout = model.transform(test).select("prediction","label")
    val holdout = model.transform(dataFrame).select("prediction","label")

    // Select (prediction, true label) and compute test error
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    val mAccuracy = evaluator.evaluate(holdout)
    println("Test set accuracy = " + mAccuracy)
  }
} 
开发者ID:PacktPublishing,项目名称:Machine-Learning-with-Spark-Second-Edition,代码行数:60,代码来源:DecisionTreePipeline.scala


示例6: MLClassification

//设置package包名称以及导入依赖的类
import io.hydrosphere.mist.api._
import io.hydrosphere.mist.api.ml._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.sql.SparkSession


object MLClassification extends MLMistJob {
  def session: SparkSession = SparkSession
    .builder()
    .appName(context.appName)
    .config(context.getConf)
    .getOrCreate()

  def train(): Map[String, Any] = {
    val training = session.createDataFrame(Seq(
      (0L, "a b c d e spark", 1.0),
      (1L, "b d", 0.0),
      (2L, "spark f g h", 1.0),
      (3L, "hadoop mapreduce", 0.0)
    )).toDF("id", "text", "label")

    val tokenizer = new Tokenizer()
      .setInputCol("text")
      .setOutputCol("words")
    val hashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("features")
    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.01)
    val pipeline = new Pipeline()
      .setStages(Array(tokenizer, hashingTF, lr))

    val model = pipeline.fit(training)

    model.write.overwrite().save("regression")

    Map.empty[String, Any]
  }

  def serve(text: List[String]): Map[String, Any] = {
    import LocalPipelineModel._

    val pipeline = PipelineLoader.load(s"regression")
    val data = LocalData(
      LocalDataColumn("text", text)
    )
    val result: LocalData = pipeline.transform(data)
    Map("result" -> result.select("text", "prediction").toMapList)
  }
} 
开发者ID:Hydrospheredata,项目名称:mist,代码行数:55,代码来源:MLClassification.scala


示例7: Train

//设置package包名称以及导入依赖的类
import io.hydrosphere.spark_ml_serving._

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature._
import org.apache.spark.ml.linalg._
import org.apache.spark.sql.SparkSession

object Train extends App {

  val session = SparkSession.builder()
    .master("local")
    .appName("Word Count")
    .getOrCreate()

  val df = session.createDataFrame(Seq(
            (0, Array("a", "b", "c")),
            (1, Array("a", "b", "b", "c", "a"))
         )).toDF("id", "words")

   val cv = new CountVectorizer()
     .setInputCol("words")
     .setOutputCol("features")
     .setVocabSize(3)
     .setMinDF(2)

   val pipeline = new Pipeline().setStages(Array(cv))

   val model = pipeline.fit(df)
   model.write.overwrite().save("./target/countVectorizer")
}

object Serve extends App {

  import LocalPipelineModel._

  val model = PipelineLoader.load("./target/countVectorizer")

  val data = LocalData(List(LocalDataColumn("words", List(
    List("a", "b", "d"),
    List("a", "b", "b", "b")

  ))))
  val result = model.transform(data)

  println(result)
} 
开发者ID:Hydrospheredata,项目名称:spark-ml-serving,代码行数:47,代码来源:Main.scala


示例8: NaiveBayesPipeline

//设置package包名称以及导入依赖的类
package org.stumbleuponclassifier

import org.apache.log4j.Logger
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.sql.DataFrame

import scala.collection.mutable


object NaiveBayesPipeline {
  @transient lazy val logger = Logger.getLogger(getClass.getName)

  def naiveBayesPipeline(vectorAssembler: VectorAssembler, dataFrame: DataFrame) = {
    val Array(training, test) = dataFrame.randomSplit(Array(0.9, 0.1), seed = 12345)

    // Set up Pipeline
    val stages = new mutable.ArrayBuffer[PipelineStage]()

    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
    stages += labelIndexer

    val nb = new NaiveBayes()

    stages += vectorAssembler
    stages += nb
    val pipeline = new Pipeline().setStages(stages.toArray)

    // Fit the Pipeline
    val startTime = System.nanoTime()
    //val model = pipeline.fit(training)
    val model = pipeline.fit(dataFrame)
    val elapsedTime = (System.nanoTime() - startTime) / 1e9
    println(s"Training time: $elapsedTime seconds")

    //val holdout = model.transform(test).select("prediction","label")
    val holdout = model.transform(dataFrame).select("prediction","label")

    // Select (prediction, true label) and compute test error
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    val mAccuracy = evaluator.evaluate(holdout)
    println("Test set accuracy = " + mAccuracy)
  }
} 
开发者ID:PacktPublishing,项目名称:Machine-Learning-with-Spark-Second-Edition,代码行数:52,代码来源:NaiveBayesPipeline.scala


示例9: RandomForestPipeline

//设置package包名称以及导入依赖的类
package org.stumbleuponclassifier

import org.apache.log4j.Logger
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.sql.DataFrame

import scala.collection.mutable


object RandomForestPipeline {
  @transient lazy val logger = Logger.getLogger(getClass.getName)

  def randomForestPipeline(vectorAssembler: VectorAssembler, dataFrame: DataFrame) = {
    val Array(training, test) = dataFrame.randomSplit(Array(0.9, 0.1), seed = 12345)

    // Set up Pipeline
    val stages = new mutable.ArrayBuffer[PipelineStage]()

    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
    stages += labelIndexer

    val rf = new RandomForestClassifier()
      .setFeaturesCol(vectorAssembler.getOutputCol)
      .setLabelCol("indexedLabel")
      .setNumTrees(20)
      .setMaxDepth(5)
      .setMaxBins(32)
      .setMinInstancesPerNode(1)
      .setMinInfoGain(0.0)
      .setCacheNodeIds(false)
      .setCheckpointInterval(10)

    stages += vectorAssembler
    stages += rf
    val pipeline = new Pipeline().setStages(stages.toArray)

    // Fit the Pipeline
    val startTime = System.nanoTime()
    //val model = pipeline.fit(training)
    val model = pipeline.fit(dataFrame)
    val elapsedTime = (System.nanoTime() - startTime) / 1e9
    println(s"Training time: $elapsedTime seconds")

    //val holdout = model.transform(test).select("prediction","label")
    val holdout = model.transform(dataFrame).select("prediction","label")

    // Select (prediction, true label) and compute test error
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    val mAccuracy = evaluator.evaluate(holdout)
    println("Test set accuracy = " + mAccuracy)

  }
} 
开发者ID:PacktPublishing,项目名称:Machine-Learning-with-Spark-Second-Edition,代码行数:62,代码来源:RandomForestPipeline.scala


示例10: DecisionTreePipeline

//设置package包名称以及导入依赖的类
package org.sparksamples.classification.stumbleupon

import org.apache.log4j.Logger
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.sql.DataFrame

import scala.collection.mutable


object DecisionTreePipeline {
  @transient lazy val logger = Logger.getLogger(getClass.getName)

  def decisionTreePipeline(vectorAssembler: VectorAssembler, dataFrame: DataFrame) = {
    val Array(training, test) = dataFrame.randomSplit(Array(0.9, 0.1), seed = 12345)

    // Set up Pipeline
    val stages = new mutable.ArrayBuffer[PipelineStage]()

    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
    stages += labelIndexer

    val dt = new DecisionTreeClassifier()
      .setFeaturesCol(vectorAssembler.getOutputCol)
      .setLabelCol("indexedLabel")
      .setMaxDepth(5)
      .setMaxBins(32)
      .setMinInstancesPerNode(1)
      .setMinInfoGain(0.0)
      .setCacheNodeIds(false)
      .setCheckpointInterval(10)

    stages += vectorAssembler
    stages += dt
    val pipeline = new Pipeline().setStages(stages.toArray)

    // Fit the Pipeline
    val startTime = System.nanoTime()
    //val model = pipeline.fit(training)
    val model = pipeline.fit(dataFrame)
    val elapsedTime = (System.nanoTime() - startTime) / 1e9
    println(s"Training time: $elapsedTime seconds")

    //val holdout = model.transform(test).select("prediction","label")
    val holdout = model.transform(dataFrame).select("prediction","label")

    // Select (prediction, true label) and compute test error
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    val mAccuracy = evaluator.evaluate(holdout)
    println("Test set accuracy = " + mAccuracy)
  }
} 
开发者ID:PacktPublishing,项目名称:Machine-Learning-with-Spark-Second-Edition,代码行数:60,代码来源:DecisionTreePipeline.scala


示例11: NaiveBayesPipeline

//设置package包名称以及导入依赖的类
package org.sparksamples.classification.stumbleupon

import org.apache.log4j.Logger
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.sql.DataFrame

import scala.collection.mutable


object NaiveBayesPipeline {
  @transient lazy val logger = Logger.getLogger(getClass.getName)

  def naiveBayesPipeline(vectorAssembler: VectorAssembler, dataFrame: DataFrame) = {
    val Array(training, test) = dataFrame.randomSplit(Array(0.9, 0.1), seed = 12345)

    // Set up Pipeline
    val stages = new mutable.ArrayBuffer[PipelineStage]()

    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
    stages += labelIndexer

    val nb = new NaiveBayes()

    stages += vectorAssembler
    stages += nb
    val pipeline = new Pipeline().setStages(stages.toArray)

    // Fit the Pipeline
    val startTime = System.nanoTime()
    //val model = pipeline.fit(training)
    val model = pipeline.fit(dataFrame)
    val elapsedTime = (System.nanoTime() - startTime) / 1e9
    println(s"Training time: $elapsedTime seconds")

    //val holdout = model.transform(test).select("prediction","label")
    val holdout = model.transform(dataFrame).select("prediction","label")

    // Select (prediction, true label) and compute test error
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    val mAccuracy = evaluator.evaluate(holdout)
    println("Test set accuracy = " + mAccuracy)
  }
} 
开发者ID:PacktPublishing,项目名称:Machine-Learning-with-Spark-Second-Edition,代码行数:52,代码来源:NaiveBayesPipeline.scala


示例12: RandomForestPipeline

//设置package包名称以及导入依赖的类
package org.sparksamples.classification.stumbleupon

import org.apache.log4j.Logger
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.sql.DataFrame

import scala.collection.mutable


object RandomForestPipeline {
  @transient lazy val logger = Logger.getLogger(getClass.getName)

  def randomForestPipeline(vectorAssembler: VectorAssembler, dataFrame: DataFrame) = {
    val Array(training, test) = dataFrame.randomSplit(Array(0.9, 0.1), seed = 12345)

    // Set up Pipeline
    val stages = new mutable.ArrayBuffer[PipelineStage]()

    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
    stages += labelIndexer

    val rf = new RandomForestClassifier()
      .setFeaturesCol(vectorAssembler.getOutputCol)
      .setLabelCol("indexedLabel")
      .setNumTrees(20)
      .setMaxDepth(5)
      .setMaxBins(32)
      .setMinInstancesPerNode(1)
      .setMinInfoGain(0.0)
      .setCacheNodeIds(false)
      .setCheckpointInterval(10)

    stages += vectorAssembler
    stages += rf
    val pipeline = new Pipeline().setStages(stages.toArray)

    // Fit the Pipeline
    val startTime = System.nanoTime()
    //val model = pipeline.fit(training)
    val model = pipeline.fit(dataFrame)
    val elapsedTime = (System.nanoTime() - startTime) / 1e9
    println(s"Training time: $elapsedTime seconds")

    //val holdout = model.transform(test).select("prediction","label")
    val holdout = model.transform(dataFrame).select("prediction","label")

    // Select (prediction, true label) and compute test error
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    val mAccuracy = evaluator.evaluate(holdout)
    println("Test set accuracy = " + mAccuracy)

  }
} 
开发者ID:PacktPublishing,项目名称:Machine-Learning-with-Spark-Second-Edition,代码行数:62,代码来源:RandomForestPipeline.scala


示例13: NaiveBayesJob

//设置package包名称以及导入依赖的类
import io.hydrosphere.mist.api._
import io.hydrosphere.mist.api.ml._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.{Vector => LVector}
import org.apache.spark.sql.SparkSession


object NaiveBayesJob extends MLMistJob {
  def session: SparkSession = SparkSession
    .builder()
    .appName(context.appName)
    .config(context.getConf)
    .getOrCreate()

  def train(savePath: String): Map[String, Any] = {
    val df = session.createDataFrame(Seq(
      (Vectors.dense(4.0, 0.2, 3.0, 4.0, 5.0), 1.0),
      (Vectors.dense(3.0, 0.3, 1.0, 4.1, 5.0), 1.0),
      (Vectors.dense(2.0, 0.5, 3.2, 4.0, 5.0), 1.0),
      (Vectors.dense(5.0, 0.7, 1.5, 4.0, 5.0), 1.0),
      (Vectors.dense(1.0, 0.1, 7.0, 4.0, 5.0), 0.0),
      (Vectors.dense(8.0, 0.3, 5.0, 1.0, 7.0), 0.0)
    )).toDF("features", "label")

    val nb = new NaiveBayes()

    val pipeline = new Pipeline().setStages(Array(nb))

    val model = pipeline.fit(df)

    model.write.overwrite().save(savePath)
    Map.empty[String, Any]
  }

  def serve(modelPath: String, features: List[List[Double]]): Map[String, Any] = {
    import LocalPipelineModel._

    val pipeline = PipelineLoader.load(modelPath)
    val data = LocalData(LocalDataColumn("features", features))

    val result = pipeline.transform(data)

    val response = result.select("probability", "rawPrediction", "prediction").toMapList.map(rowMap => {
      val mapped = rowMap("probability").asInstanceOf[LVector].toArray
      val one = rowMap + ("probability" -> mapped)

      val mapped2 = one("rawPrediction").asInstanceOf[LVector].toArray
      one + ("rawPrediction" -> mapped2)
    })
    Map("result" -> response)
  }
} 
开发者ID:Hydrospheredata,项目名称:mist,代码行数:55,代码来源:NaiveBayesJob.scala


示例14: StringIndexerJob

//设置package包名称以及导入依赖的类
import io.hydrosphere.mist.api._
import io.hydrosphere.mist.api.ml._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.SparkSession

object StringIndexerJob extends MLMistJob{
  def session: SparkSession = SparkSession
    .builder()
    .appName(context.appName)
    .config(context.getConf)
    .getOrCreate()


  def train(savePath: String): Map[String, Any] = {
    val df = session.createDataFrame(
      Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
    ).toDF("id", "category")

    val indexer = new StringIndexer()
      .setInputCol("category")
      .setOutputCol("categoryIndex")

    val pipeline = new Pipeline().setStages(Array(indexer))

    val model = pipeline.fit(df)

    model.write.overwrite().save(savePath)
    Map.empty[String, Any]
  }

  def serve(modelPath: String, features: List[String]): Map[String, Any] = {
    import LocalPipelineModel._

    val pipeline = PipelineLoader.load(modelPath)
    val data = LocalData(
      LocalDataColumn("category", features)
    )

    val result: LocalData = pipeline.transform(data)
    Map("result" -> result.select("category", "categoryIndex").toMapList)
  }
} 
开发者ID:Hydrospheredata,项目名称:mist,代码行数:44,代码来源:StringIndexerJob.scala


示例15: BinarizerJob

//设置package包名称以及导入依赖的类
import io.hydrosphere.mist.api._
import io.hydrosphere.mist.api.ml._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.Binarizer

import org.apache.spark.sql.SparkSession

object BinarizerJob extends MLMistJob {

  def session: SparkSession = SparkSession
    .builder()
    .appName(context.appName)
    .config(context.getConf)
    .getOrCreate()

  def train(savePath: String): Map[String, Any] = {
    val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
    val dataFrame = session.createDataFrame(data).toDF("id", "feature")

    val binarizer: Binarizer = new Binarizer()
      .setInputCol("feature")
      .setOutputCol("binarized_feature")
      .setThreshold(5.0)

    val pipeline = new Pipeline().setStages(Array(binarizer))

    val model = pipeline.fit(dataFrame)

    model.write.overwrite().save(savePath)
    Map.empty[String, Any]
  }

  def serve(modelPath: String, features: List[Double]): Map[String, Any] = {
    import LocalPipelineModel._

    val pipeline = PipelineLoader.load(modelPath)
    val data = LocalData(LocalDataColumn("feature", features))

    val result: LocalData = pipeline.transform(data)
    Map("result" -> result.select("feature", "binarized_feature").toMapList)
  }
} 
开发者ID:Hydrospheredata,项目名称:mist,代码行数:43,代码来源:BinarizerJob.scala


示例16: StopWordsRemoverJob

//设置package包名称以及导入依赖的类
import io.hydrosphere.mist.api._
import io.hydrosphere.mist.api.ml._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.sql.SparkSession


object StopWordsRemoverJob extends MLMistJob {
  def session: SparkSession = SparkSession
    .builder()
    .appName(context.appName)
    .config(context.getConf)
    .getOrCreate()

  def train(savePath: String): Map[String, Any] = {
    val df = session.createDataFrame(Seq(
      (0, Seq("I", "saw", "the", "red", "balloon")),
      (1, Seq("Mary", "had", "a", "little", "lamb"))
    )).toDF("id", "raw")

    val remover = new StopWordsRemover()
      .setInputCol("raw")
      .setOutputCol("filtered")

    val pipeline = new Pipeline().setStages(Array(remover))

    val model = pipeline.fit(df)

    model.write.overwrite().save(savePath)
    Map.empty[String, Any]
  }

  def serve(modelPath: String, features: List[List[String]]): Map[String, Any] = {
    import LocalPipelineModel._

    val pipeline = PipelineLoader.load(modelPath)
    val data = LocalData(
      LocalDataColumn("raw", features)
    )

    val result: LocalData = pipeline.transform(data)
    Map("result" -> result.select("raw", "filtered").toMapList)
  }
} 
开发者ID:Hydrospheredata,项目名称:mist,代码行数:45,代码来源:StopWordsRemoverJob.scala


示例17: TFIDFJob

//设置package包名称以及导入依赖的类
import io.hydrosphere.mist.api._
import io.hydrosphere.mist.api.ml._
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.mllib.linalg.{Vector => LVector}
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.SparkSession


object TFIDFJob extends MLMistJob {
  def session: SparkSession = SparkSession
    .builder()
    .appName(context.appName)
    .config(context.getConf)
    .getOrCreate()

  def train(savePath: String): Map[String, Any] = {

    val df = session.createDataFrame(Seq(
      (0, "Provectus rocks!"),
      (0, "Machine learning for masses!"),
      (1, "BigData is a hot topick right now")
    )).toDF("label", "sentence")

    val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")

    val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, idf))

    val model = pipeline.fit(df)

    model.write.overwrite().save(savePath)
    Map.empty[String, Any]
  }

  def serve(modelPath: String, sentences: List[String]): Map[String, Any] = {
    import LocalPipelineModel._

    val pipeline = PipelineLoader.load(modelPath)
    val data = LocalData(LocalDataColumn("sentence", sentences))

    val result = pipeline.transform(data)
    val response = result.select("sentence", "features").toMapList.map(rowMap => {
      val conv = rowMap("features").asInstanceOf[LVector].toArray
      rowMap + ("features" -> conv)
    })
    Map("result" -> response)
  }
} 
开发者ID:Hydrospheredata,项目名称:mist,代码行数:50,代码来源:TFIDFJob.scala


示例18: StandardScalerJob

//设置package包名称以及导入依赖的类
import io.hydrosphere.mist.api._
import io.hydrosphere.mist.api.ml._
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.linalg.{Vector => LVector}
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object StandardScalerJob extends MLMistJob {
  def session: SparkSession = SparkSession
    .builder()
    .appName(context.appName)
    .config(context.getConf)
    .getOrCreate()

  def train(savePath: String): Map[String, Any] = {
    val data = Array(
      Vectors.dense(0.0, 10.3, 1.0, 4.0, 5.0),
      Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
      Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
    )
    val df = session.createDataFrame(data.map(Tuple1.apply)).toDF("features")

    val scaler = new StandardScaler()
      .setInputCol("features")
      .setOutputCol("scaledFeatures")
      .setWithStd(true)
      .setWithMean(false)

    val pipeline = new Pipeline().setStages(Array(scaler))

    val model = pipeline.fit(df)

    model.write.overwrite().save(savePath)
    Map.empty[String, Any]
  }

  def serve(modelPath: String, features: List[Array[Double]]): Map[String, Any] = {
    import LocalPipelineModel._

    val pipeline = PipelineLoader.load(modelPath)
    val data = LocalData(
      LocalDataColumn("features", features)
    )

    val result: LocalData = pipeline.transform(data)
    val response = result.select("features", "scaledFeatures").toMapList.map(rowMap => {
      val mapped = rowMap("scaledFeatures").asInstanceOf[LVector].toArray
      rowMap + ("scaledFeatures" -> mapped)
    })
    Map("result" -> response)
  }
} 
开发者ID:Hydrospheredata,项目名称:mist,代码行数:54,代码来源:StandardScalerJob.scala


示例19: NgramJob

//设置package包名称以及导入依赖的类
import io.hydrosphere.mist.api._
import io.hydrosphere.mist.api.ml._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.NGram
import org.apache.spark.sql.SparkSession


object NgramJob extends MLMistJob{
  def session: SparkSession = SparkSession
    .builder()
    .appName(context.appName)
    .config(context.getConf)
    .getOrCreate()

  def train(savePath: String): Map[String, Any] = {
    val df = session.createDataFrame(Seq(
      (0, Array("Provectus", "is", "such", "a", "cool", "company")),
      (1, Array("Big", "data", "rules", "the", "world")),
      (2, Array("Cloud", "solutions", "are", "our", "future"))
    )).toDF("id", "words")

    val ngram = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams")

    val pipeline = new Pipeline().setStages(Array(ngram))

    val model = pipeline.fit(df)

    model.write.overwrite().save(savePath)
    Map.empty[String, Any]
  }

  def serve(modelPath: String, features: List[String]): Map[String, Any] = {

    import LocalPipelineModel._

    val pipeline = PipelineLoader.load(modelPath)
    val data = LocalData(
      LocalDataColumn("words", List(features))
    )

    val result: LocalData = pipeline.transform(data)
    Map("result" -> result.select("words", "ngrams").toMapList)
  }
} 
开发者ID:Hydrospheredata,项目名称:mist,代码行数:45,代码来源:NgramJob.scala


示例20: DTreeRegressionJob

//设置package包名称以及导入依赖的类
import io.hydrosphere.mist.api._
import io.hydrosphere.mist.api.ml._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.sql.SparkSession

object DTreeRegressionJob extends MLMistJob {
  def session: SparkSession = SparkSession
    .builder()
    .appName(context.appName)
    .config(context.getConf)
    .getOrCreate()

  def train(datasetPath: String, savePath: String): Map[String, Any] = {
    val dataset = session.read.format(&qu 

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala Matchers类代码示例发布时间:2022-05-23
下一篇:
Scala MetricRegistry类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap