Scala RDD Class Code Examples


This article collects typical usage examples of org.apache.spark.rdd.RDD in Scala. If you are wondering what the Scala RDD class is for, how to use it, or what real RDD code looks like, the hand-picked class examples below should help.



Twenty code examples of the RDD class are shown below, sorted by popularity by default.

Example 1: Histogram

// Set up the package name and import the required classes
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object Histogram{
	def main(args:Array[String]){
		val conf:SparkConf = new SparkConf().setAppName("Histogram").setMaster("local")
		val sc:SparkContext = new SparkContext(conf)
		val dataset1:RDD[String] = sc.textFile("/home/hadoop/spark/scala/mllib/core/data1")
		val dataset2:RDD[String] = sc.textFile("/home/hadoop/spark/scala/mllib/core/data2");
		val subRDD:RDD[String] = dataset1.subtract(dataset2)
		val keyValueRDD:RDD[(String, String)] = subRDD.map(line => (line.split(",")(1), line.split(",")(0)))
		val hist = keyValueRDD.countByKey
		for((k,v) <- hist){
			println(k + "===>" + v)
		}
	}
} 
Developer: malli3131, Project: SparkApps, Lines: 19, Source: Histogram.scala


Example 2: RddAggregateByKey

// Set up the package name and import the required classes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf,SparkContext}

object RddAggregateByKey{
    def main(args:Array[String]){
	val conf = new SparkConf().setAppName("RDD Aggregate").setMaster("local")
	val sc = new SparkContext(conf)
	val stocks = sc.textFile("./stocks")
	val projdata = stocks.map(line => (line.split("\t")(1), line.split("\t")(7).toInt))
	val volMax = projdata.aggregateByKey(0)(math.max(_,_), math.max(_,_))
	val volMin = projdata.aggregateByKey(100000000)(math.min(_,_), math.min(_,_))
	val aggRdd = volMax ++ volMin
	aggRdd.saveAsTextFile("./voulme")
    }
} 
Developer: malli3131, Project: SparkApps, Lines: 16, Source: RddAggregateByKey.scala
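
For readers unfamiliar with aggregateByKey, here is a minimal local sketch of the same pattern on a small in-memory dataset instead of the tab-separated ./stocks file; the object name and the sample (symbol, volume) pairs are illustrative only.

import org.apache.spark.{SparkConf, SparkContext}

object AggregateByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("AggregateByKeySketch").setMaster("local"))
    // (symbol, volume) pairs standing in for the parsed ./stocks records
    val volumes = sc.parallelize(Seq(("AAPL", 100), ("AAPL", 300), ("GOOG", 200)))
    // aggregateByKey(zero)(seqOp, combOp): seqOp folds values within a partition,
    // combOp merges the per-partition results; both are max here.
    val volMax = volumes.aggregateByKey(0)(math.max(_, _), math.max(_, _))
    volMax.collect().foreach(println) // e.g. (AAPL,300), (GOOG,200)
    sc.stop()
  }
}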


Example 3: StudyRDD

// Set up the package name and import the required classes
package com.study.spark.datasource

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}


class StudyRDD(sqlContext: SQLContext, schema: StructType) extends RDD[Row](sqlContext.sparkContext, deps=Nil) {
  @DeveloperApi
  override def compute(split: Partition, context: TaskContext): Iterator[Row] = new StudyReader(context, schema, split)

  // This data source always exposes exactly two partitions, so at most two
  // tasks (one per partition) will read it in parallel.
  override protected def getPartitions: Array[Partition] = {
    val arr: Array[Partition] = new Array[Partition](2)
    arr.update(0, new Partition() {
      override def index: Int = 0
    })
    arr.update(1, new Partition() {
      override def index: Int = 1
    })
    arr
  }
} 
Developer: hackpupu, Project: LML, Lines: 27, Source: StudyRDD.scala


Example 4: extractTriples

// Set up the package name and import the required classes
package net.sansa_stack.inference.spark.forwardchaining

import scala.collection.mutable

import org.apache.spark.rdd.RDD

import net.sansa_stack.inference.data.RDFTriple
import net.sansa_stack.inference.spark.data.model.RDFGraph
import net.sansa_stack.inference.utils.Profiler


// The method below was extracted from a larger file; a minimal enclosing trait
// is restored here so the snippet compiles on its own.
trait ForwardRuleReasoner {

  def extractTriples(triples: RDD[RDFTriple],
                     subject: Option[String],
                     predicate: Option[String],
                     obj: Option[String]): RDD[RDFTriple] = {
    var extractedTriples = triples

    if(subject.isDefined) {
      extractedTriples = extractedTriples.filter(triple => triple.s == subject.get)
    }

    if(predicate.isDefined) {
      extractedTriples = extractedTriples.filter(triple => triple.p == predicate.get)
    }

    if(obj.isDefined) {
      extractedTriples = extractedTriples.filter(triple => triple.o == obj.get)
    }

    extractedTriples
  }
} 
Developer: SANSA-Stack, Project: SANSA-Inference, Lines: 33, Source: ForwardRuleReasoner.scala


Example 5: KMeansClusteringApp

// Set up the package name and import the required classes
package org.apress.prospark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD.doubleRDDToDoubleRDDFunctions
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

object KMeansClusteringApp {

  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println(
        "Usage: KMeansClusteringApp <appname> <batchInterval> <hostname> <port>")
      System.exit(1)
    }
    val Seq(appName, batchInterval, hostname, port) = args.toSeq

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)

    val ssc = new StreamingContext(conf, Seconds(batchInterval.toInt))

    val substream = ssc.socketTextStream(hostname, port.toInt)
      .filter(!_.contains("NaN"))
      .map(_.split(" "))
      .filter(f => f(1) != "0")

    val orientationStream = substream
      .map(f => Seq(1, 4, 5, 6, 10, 11, 12, 20, 21, 22, 26, 27, 28, 36, 37, 38, 42, 43, 44).map(i => f(i)).toArray)
      .map(arr => arr.map(_.toDouble))
      .filter(f => f(0) == 1.0 || f(0) == 2.0 || f(0) == 3.0)
      .map(f => LabeledPoint(f(0), Vectors.dense(f.slice(1, f.length))))
    val test = orientationStream.transform(rdd => rdd.randomSplit(Array(0.3, 0.7))(0))
    val train = orientationStream.transformWith(test, (r1: RDD[LabeledPoint], r2: RDD[LabeledPoint]) => r1.subtract(r2)).cache()
    val model = new StreamingKMeans()
      .setK(3)
      .setDecayFactor(0)
      .setRandomCenters(18, 0.0)

    model.trainOn(train.map(v => v.features))
    val prediction = model.predictOnValues(test.map(v => (v.label, v.features)))

    ssc.start()
    ssc.awaitTermination()
  }

} 
Developer: ZubairNabi, Project: prosparkstreaming, Lines: 54, Source: L9-10KMeans.scala


Example 6: MedianOfMediansCalculator

// Set up the package name and import the required classes
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

class MedianOfMediansCalculator {

  def calculateMedianOfMediansForFile(hdfsFilePath: String, sc: SparkContext): Double =
    calculateMedianOfMedians(sortAndNumberMedians(calculateMediansPerLine(readFileOfIntegers(hdfsFilePath, sc))))

  def readFileOfIntegers(hdfsFilePath: String, sc: SparkContext): RDD[Array[Int]] = {
    sc.textFile(hdfsFilePath)
      .map(line => line.split("\\D+"))
      .map(lineParts => lineParts.map(number => number.toInt)
        .sorted)
  }

  def calculateMediansPerLine(integerArrayRdd: RDD[Array[Int]]): RDD[Double] = {
    integerArrayRdd.map { lineInts =>
      // Arrays are zero-indexed: average the two middle values for even lengths,
      // take the single middle value for odd lengths.
      if (lineInts.length % 2 == 0)
        (lineInts((lineInts.length / 2) - 1) + lineInts(lineInts.length / 2)) / 2.0
      else
        lineInts(lineInts.length / 2)
    }
  }

  def sortAndNumberMedians(lineMedians: RDD[Double]): RDD[(Long, Double)] = {
    lineMedians
      .sortBy(identity)
      .zipWithIndex
      .keyBy { case (_, index) => index }
      .mapValues { case (value, _) => value }
  }

  def calculateMedianOfMedians(sortedAndNumberedMedians: RDD[(Long, Double)]): Double = {
    // Keys produced by zipWithIndex are zero-based; count once and reuse it.
    val count = sortedAndNumberedMedians.count()
    if (count % 2 == 0)
      (sortedAndNumberedMedians.lookup((count / 2) - 1).head +
        sortedAndNumberedMedians.lookup(count / 2).head) / 2.0
    else
      sortedAndNumberedMedians.lookup(count / 2).head
  }
} 
Developer: qayshp, Project: medianOfMedians, Lines: 40, Source: MedianOfMediansCalculator.scala
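
As a sanity check on the zero-based indexing used above, here is a minimal, Spark-free sketch of the per-line median rule (averaging the two middle values for even-length input); the object name is illustrative.

object MedianSketch {
  // Median of an already-sorted array, same convention as calculateMediansPerLine.
  def median(sorted: Array[Int]): Double = {
    val n = sorted.length
    if (n % 2 == 0) (sorted(n / 2 - 1) + sorted(n / 2)) / 2.0
    else sorted(n / 2)
  }

  def main(args: Array[String]): Unit = {
    println(median(Array(1, 3, 5)))    // 3.0
    println(median(Array(1, 3, 5, 7))) // 4.0
  }
}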


Example 7: PrecipSource

// Set up the package name and import the required classes
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}


case class PrecipSource(sourceId: Int,
                        name: String,
                        countryCode: String,
                        latitude: String,
                        longitude: String,
                        elevation: Int,
                        elementId: String,
                        beginDate: String,
                        endDate: String,
                        participantId: Int,
                        participantName: String
                       )

case class Precipication(stationId: Int,
                         sourceId: Int,
                         date: String,
                         amount: Int,
                         quality: Int
                        )

class Mappers() {

  
  def precipicationDF(spark: SparkSession, sourceFilPath: String): Dataset[Precipication] = {
    import spark.implicits._

    var sourceFile: RDD[String] = spark.sparkContext.textFile(sourceFilPath)

    val header = spark.sparkContext.parallelize(sourceFile.take(20))
    sourceFile = sourceFile.subtract(header)
    header.unpersist()

    var precipitionDF: Dataset[Precipication] = sourceFile
      .map(s => s.split(",")
        .map(_.trim()))
      .map(fields => Precipication(
        stationId = fields(0).toInt,
        sourceId = fields(1).toInt,
        date = fields(2),
        amount = fields(3).toInt,
        quality = fields(4).toInt
      ))
      .toDS()

    precipitionDF.show(false)
    precipitionDF
  }

} 
Developer: luxinator, Project: RainyDay, Lines: 54, Source: Mappers.scala


Example 8: ReadsRDD

// Set up the package name and import the required classes
package org.hammerlab.guacamole.readsets.rdd

import java.io.File

import org.apache.spark.rdd.RDD
import org.hammerlab.guacamole.reads.{MappedRead, PairedRead, Read}
import org.hammerlab.guacamole.readsets.io.Input


case class ReadsRDD(reads: RDD[Read], input: Input) {

  val basename = new File(input.path).getName
  val shortName = basename.substring(0, math.min(100, basename.length))

  lazy val mappedReads =
    reads.flatMap({
      case r: MappedRead                   => Some(r)
      case PairedRead(r: MappedRead, _, _) => Some(r)
      case _                               => None
    }).setName(s"Mapped reads: $shortName")

  lazy val mappedPairedReads: RDD[PairedRead[MappedRead]] =
    reads.flatMap({
      case rp: PairedRead[_] if rp.isMapped => Some(rp.asInstanceOf[PairedRead[MappedRead]])
      case _                                => None
    }).setName(s"Mapped reads: $shortName")
} 
Developer: ryan-williams, Project: scala-pkg-deps, Lines: 28, Source: ReadsRDD.scala


Example 9: Checkpoint

// Set up the package name and import the required classes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf,SparkContext}

object Checkpoint{
    def main(args:Array[String]){
	val conf = new SparkConf().setAppName("RDD Aggregate").setMaster("local")
	val sc = new SparkContext(conf)
	sc.setCheckpointDir("./projdata")
	val stocks = sc.textFile("./stocks")
	val projdata = stocks.map(record => (record.split("\t")(1), record.split("\t")(7).toInt))
	projdata.checkpoint()
	println(projdata.count())
    }
} 
Developer: malli3131, Project: SparkApps, Lines: 15, Source: CheckPoint.scala


Example 10: GroupWith

// Set up the package name and import the required classes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf,SparkContext}

object GroupWith{
    def main(args:Array[String]){
	val conf = new SparkConf().setAppName("RDD Aggregate").setMaster("local")
	val sc = new SparkContext(conf)
	val citi = sc.textFile("./citi")
	val hdfc = sc.textFile("./hdfc")
	val sbi = sc.textFile("./sbi")
	val citiPairRDD = citi.map(row => (row.split("\t")(0), row.split("\t")(1).toInt)) 
	val hdfcPairRDD = hdfc.map(row => (row.split("\t")(0), row.split("\t")(1).toInt)) 
	val sbiPairRDD = sbi.map(row => (row.split("\t")(0), row.split("\t")(1).toInt)) 
	val groupRDD = citiPairRDD.groupWith(hdfcPairRDD, sbiPairRDD)
	groupRDD.collect.foreach{println}
    }
} 
Developer: malli3131, Project: SparkApps, Lines: 18, Source: GroupWith.scala
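
groupWith is an alias for cogroup: for every key it returns one Iterable of values per input RDD. A minimal local sketch of that behaviour follows (the account names and amounts are made up for illustration).

import org.apache.spark.{SparkConf, SparkContext}

object GroupWithSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("GroupWithSketch").setMaster("local"))
    val citi = sc.parallelize(Seq(("alice", 100), ("bob", 200)))
    val hdfc = sc.parallelize(Seq(("alice", 300)))
    val sbi  = sc.parallelize(Seq(("bob", 400), ("carol", 500)))
    // One Iterable per source RDD, grouped by key; missing keys yield empty Iterables.
    citi.groupWith(hdfc, sbi).collect().foreach(println)
    // e.g. (alice,(CompactBuffer(100),CompactBuffer(300),CompactBuffer()))
    sc.stop()
  }
}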


Example 11: Item

// Set up the package name and import the required classes
package com.github.vladminzatu.surfer.persist

import com.github.vladminzatu.surfer.Score
import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.{HttpClientBuilder}
import org.apache.spark.rdd.RDD
import org.json4s.jackson.Serialization.write

case class Item(item:String, score:Double)

class RestPersister extends Persister {

  val url = "http://localhost:8080/items"

  override def persist(scores: RDD[(String, Score)]): Unit = {
    implicit val formats = org.json4s.DefaultFormats
    val payload = write(scores.collect().sortWith((a,b) => a._2.value > b._2.value).map(x => Item(x._1, x._2.value)))
    val client = HttpClientBuilder.create().build();
    client.execute(postRequest(payload))
  }

  private def postRequest(payload: String): HttpPost = {
    val post = new HttpPost(url)
    post.setEntity(new StringEntity(payload))
    post
  }
} 
Developer: VladMinzatu, Project: surfer, Lines: 29, Source: RestPersister.scala


Example 12: MllibLBFGS

// Set up the package name and import the required classes
package optimizers

import breeze.linalg.{DenseVector, Vector}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.optimization.{L1Updater, SimpleUpdater, SquaredL2Updater, Updater}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import utils.Functions._


class MllibLBFGS(val data: RDD[LabeledPoint],
                 loss: LossFunction,
                 regularizer: Regularizer,
                 params: LBFGSParameters
                ) extends Optimizer(loss, regularizer) {

  val opt = new LogisticRegressionWithLBFGS

  val reg: Updater = (regularizer: Regularizer) match {
    case _: L1Regularizer => new L1Updater
    case _: L2Regularizer => new SquaredL2Updater
    case _: Unregularized => new SimpleUpdater
  }

  opt.optimizer.
    setNumIterations(params.iterations).
    setConvergenceTol(params.convergenceTol).
    setNumCorrections(params.numCorrections).
    setRegParam(regularizer.lambda).
    setUpdater(reg)

  override def optimize(): Vector[Double] = {
    val model = opt.run(data)
    val w = model.weights.toArray
    return DenseVector(w)
  }
} 
Developer: mlbench, Project: mlbench, Lines: 38, Source: MllibLBFGS.scala


Example 13: SamplePCA

// Set up the package name and import the required classes
package org.broadinstitute.hail.methods

import org.apache.spark.mllib.linalg.{Matrix, DenseMatrix}
import org.apache.spark.rdd.RDD
import org.broadinstitute.hail.variant.Variant
import org.broadinstitute.hail.variant.VariantDataset

class SamplePCA(k: Int, computeLoadings: Boolean, computeEigenvalues: Boolean) {
  def name = "SamplePCA"

  def apply(vds: VariantDataset): (Matrix, Option[RDD[(Variant, Array[Double])]], Option[Array[Double]])  = {

    val (variants, mat) = ToStandardizedIndexedRowMatrix(vds)
    val sc = vds.sparkContext
    val variantsBc = sc.broadcast(variants)

    val svd = mat.computeSVD(k, computeU = computeLoadings)

    val scores =
      svd.V.multiply(DenseMatrix.diag(svd.s))

    val loadings =
      if (computeLoadings)
        Some(svd.U.rows.map(ir =>
          (variantsBc.value(ir.index.toInt), ir.vector.toArray)))
      else
        None

    val eigenvalues =
      if (computeEigenvalues)
        Some(svd.s.toArray.map(x => x * x))
      else
        None

    (scores, loadings, eigenvalues)
  }
} 
Developer: Sun-shan, Project: Hail_V2, Lines: 38, Source: SamplePCA.scala


Example 14: of

// Set up the package name and import the required classes
package fregata.spark.metrics.classification

import org.apache.spark.rdd.RDD
import fregata.Num


// The method below was extracted from a larger file (AreaUnderRoc.scala); a
// minimal enclosing object is restored here so the snippet compiles on its own.
object AreaUnderRoc {

  def of(rs2:RDD[(Num, Num)]) = {
    val rs = rs2.sortByKey(false)
    val total = rs2.count()
    val (m,sum) = rs.zipWithIndex().map{
      case ((predict,label),rank) =>
        if( label == 1 ) {
          predict -> ( total - rank , 1L , 1 , 0)
        }else{
          predict -> ( total - rank , 1L , 0 , 1)
        }
    }.reduceByKey{
      case ((r1,c1,p1,f1),(r2,c2,p2,f2)) =>
        (r1+r2 ,c1+c2,p1+p2,f1+f2)
    }.map{
      case (score,(rank,count,positive,navigate)) =>
        val avg = rank.toDouble / count
        (positive,avg * positive)
    }.filter( _._1 > 0 ).treeReduce{
      case ((p1,r1),(p2,r2)) => (p1+p2,r1+r2)
    }
    val M = m.toDouble
    if( M == 0 || M == total ) 0.5
    else{
      val N = total - M
      val diff = sum - ( M * ( M + 1 ) / 2 )
      diff / (M * N)
    }
  }
} 
Developer: xiaokekehaha, Project: fregata, Lines: 36, Source: AreaUnderRoc.scala
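
The method above computes the rank-based (Mann-Whitney) form of AUC: sort by score, sum the ranks of the positive examples, and evaluate (rankSum - M(M+1)/2) / (M * N), where M is the number of positives and N the number of negatives, averaging ranks over tied scores. Below is a minimal, Spark-free sketch of the same formula (it ignores tied scores and the M == 0 edge cases; the object name is illustrative).

object RankAucSketch {
  // scored: (predicted score, label) with label 1 for positive, 0 for negative.
  def auc(scored: Seq[(Double, Int)]): Double = {
    val ranked = scored.sortBy(_._1).zipWithIndex.map { case ((_, label), i) => (label, i + 1L) }
    val m = ranked.count(_._1 == 1).toDouble
    val n = ranked.size - m
    val positiveRankSum = ranked.collect { case (1, rank) => rank.toDouble }.sum
    (positiveRankSum - m * (m + 1) / 2) / (m * n)
  }

  def main(args: Array[String]): Unit =
    println(auc(Seq((0.9, 1), (0.8, 0), (0.7, 1), (0.3, 0)))) // 0.75
}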


Example 15: PDF

// Set up the package name and import the required classes
package org.hammerlab.coverage.two_sample

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.hammerlab.coverage
import org.hammerlab.coverage.histogram.JointHistogram.Depth
import spire.algebra.Monoid

case class PDF[C: Monoid](rdd: RDD[((Depth, Depth), C)],
                          filtersBroadcast: Broadcast[(Set[Depth], Set[Depth])],
                          maxDepth1: Depth,
                          maxDepth2: Depth)
  extends coverage.PDF[C]
    with CanDownSampleRDD[C]

case class CDF[C: Monoid](rdd: RDD[((Depth, Depth), C)],
                          filtersBroadcast: Broadcast[(Set[Depth], Set[Depth])])
  extends coverage.CDF[C]
    with CanDownSampleRDD[C] 
Developer: hammerlab, Project: coverage-depth, Lines: 20, Source: PDF.scala


Example 16: WriteRDD

// Set up the package name and import the required classes
package org.hammerlab.coverage.utils

import grizzled.slf4j.Logging
import org.apache.spark.rdd.RDD
import org.hammerlab.csv._
import org.hammerlab.paths.Path

import scala.reflect.runtime.universe.TypeTag

object WriteRDD
  extends Logging {
  def apply[T <: Product : TypeTag](path: Path, rdd: RDD[T], force: Boolean): Unit = {
    val csvLines = rdd.mapPartitions(_.toCSV(includeHeaderLine = false))
    (path.exists, force) match {
      case (true, true) ⇒
        logger.info(s"Removing $path")
        path.delete(recursive = true)
        csvLines.saveAsTextFile(path.toString)
      case (true, false) ⇒
        logger.info(s"Skipping $path, already exists")
      case _ ⇒
        csvLines.saveAsTextFile(path.toString)
    }
  }
} 
Developer: hammerlab, Project: coverage-depth, Lines: 26, Source: WriteRDD.scala


Example 17: MllibSGD

// Set up the package name and import the required classes
package optimizers

import breeze.linalg.{DenseVector, Vector}
import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, SVMWithSGD}
import org.apache.spark.mllib.optimization.{L1Updater, SimpleUpdater, SquaredL2Updater, Updater}
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
import org.apache.spark.rdd.RDD
import utils.Functions._


class MllibSGD(val data: RDD[LabeledPoint],
               loss: LossFunction,
               regularizer: Regularizer,
               params: SGDParameters,
               ctype: String
              ) extends Optimizer(loss, regularizer) {
  val opt = ctype match {
    case "SVM" => new SVMWithSGD()
    case "LR" => new LogisticRegressionWithSGD()
    case "Regression" => new LinearRegressionWithSGD()
  }

  val reg: Updater = (regularizer: Regularizer) match {
    case _: L1Regularizer => new L1Updater
    case _: L2Regularizer => new SquaredL2Updater
    case _: Unregularized => new SimpleUpdater
  }

  ctype match {
    case "SVM" => opt.asInstanceOf[SVMWithSGD].optimizer.
      setNumIterations(params.iterations).
      setMiniBatchFraction(params.miniBatchFraction).
      setStepSize(params.stepSize).
      setRegParam(regularizer.lambda).
      setUpdater(reg)
    case "LR" => opt.asInstanceOf[LogisticRegressionWithSGD].optimizer.
      setNumIterations(params.iterations).
      setMiniBatchFraction(params.miniBatchFraction).
      setStepSize(params.stepSize).
      setRegParam(regularizer.lambda).
      setUpdater(reg)
    case "Regression" => opt.asInstanceOf[LinearRegressionWithSGD].optimizer.
      setNumIterations(params.iterations).
      setMiniBatchFraction(params.miniBatchFraction).
      setStepSize(params.stepSize).
      setRegParam(regularizer.lambda).
      setUpdater(reg)
  }

  override def optimize(): Vector[Double] = {
    val model = opt.run(data)
    val w = model.weights.toArray
    DenseVector(w)
  }
} 
Developer: mlbench, Project: mlbench, Lines: 60, Source: MllibSGD.scala


Example 18: ProxCocoaParameters

// Set up the package name and import the required classes
package optimizers

import java.io.Serializable

import breeze.linalg.SparseVector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD


class ProxCocoaParameters(var n: Int,
                          var iterations: Int,
                          var localIterFrac: Double,
                          var lambda: Double,
                          var eta: Double,
                          var numFeature: Int,
                          var numParts: Int,
                          var alphaInit: SparseVector[Double])  extends Serializable  {
  def this(train: RDD[LabeledPoint], test: RDD[LabeledPoint], eta: Double = 0.5) {
    this(train.count().toInt,
      100,
      0.9,
      0.1,
      eta,
      train.first().features.size,
      train.partitions.size,
      SparseVector.zeros[Double](train.first().features.size))
  }
  def getLocalIters =  Math.max(1, (localIterFrac * numFeature / numParts).toInt)

  def getL1DistOptPar(): l1distopt.utils.Params = {
    return l1distopt.utils.Params(alphaInit, n, iterations, getLocalIters, lambda, eta)
  }

  override def toString = s"ProxCocoaParameters(n: $n, iterations: $iterations, " +
    s"localIters: $getLocalIters, lambda: $lambda, eta: $eta, alphaInit: $alphaInit)"
} 
Developer: mlbench, Project: mlbench, Lines: 37, Source: ProxCocoaParameters.scala


Example 19: SampleTokenize

// Set up the package name and import the required classes
package com.highperformancespark.examples.tokenize

import org.apache.spark.rdd.RDD

object SampleTokenize {
  //tag::DIFFICULT[]
  def difficultTokenizeRDD(input: RDD[String]) = {
    input.flatMap(_.split(" "))
  }
  //end::DIFFICULT[]

  //tag::EASY[]
  def tokenizeRDD(input: RDD[String]) = {
    input.flatMap(tokenize)
  }

  protected[tokenize] def tokenize(input: String) = {
    input.split(" ")
  }
  //end::EASY[]
} 
Developer: gourimahapatra, Project: high-performance-spark, Lines: 22, Source: SampleTokenize.scala
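
One reason to factor tokenize out of the RDD-based method is testability: the string-level logic can be exercised without a SparkContext. A minimal sketch follows; the check object is illustrative and must live in the same package because tokenize is protected[tokenize].

package com.highperformancespark.examples.tokenize

object TokenizeCheck {
  def main(args: Array[String]): Unit = {
    // No SparkContext needed to verify the core splitting logic.
    val tokens = SampleTokenize.tokenize("high performance spark")
    assert(tokens.sameElements(Array("high", "performance", "spark")))
    println(tokens.mkString(", "))
  }
}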


Example 20: run

// Set up the package name and import the required classes
package fregata.spark.model.classification

import fregata._
import fregata.spark.model.{ SparkTrainer}
import fregata.model.classification.{LogisticRegression, CLR => LCLR, CLRModel => LCLRModel}
import org.apache.spark.rdd.RDD


// The method below was extracted from a larger file (CLR.scala); a minimal
// enclosing object is restored here so the snippet compiles on its own.
object CLR {

  def run(data:RDD[(Array[Vector],Num)],
          combines:Array[Array[Int]],
          localEpochNum:Int = 1 ,
          epochNum:Int = 1) = {
    val trainer = new LogisticRegression
    val lengths = combines.map{
      comb => comb.map(i=>data.first._1(i).length).reduce( _ * _ )
    }
    val length = lengths.sum
    val data2 = data.map{
      case (x,label) =>
        LCLR.compactVector(x,combines,lengths,length) -> label
    }
    new SparkTrainer(trainer)
      .run(data2,epochNum,localEpochNum)
    new CLRModel(new LCLRModel(trainer.ps.get(0),combines))
  }
} 
Developer: TalkingData, Project: Fregata, Lines: 27, Source: CLR.scala



Note: the org.apache.spark.rdd.RDD class examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects contributed by various developers; copyright remains with the original authors, and distribution and use are subject to each project's license. Do not republish without permission.

