本文整理汇总了Scala中org.apache.spark.HashPartitioner类的典型用法代码示例。如果您正苦于以下问题：Scala HashPartitioner类的具体用法？Scala HashPartitioner怎么用？Scala HashPartitioner使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了HashPartitioner类的16个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: RelationWithItemToItem
//设置package包名称以及导入依赖的类
package com.bigchange.mllib
import breeze.numerics.{sqrt, pow}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
/**
 * Item-to-item similarity computed from a tab-separated (user, item, rating) file.
 *
 * Each rating is divided by the square root of the item's summed squared ratings,
 * the normalised ratings are self-joined per user, and the products are summed per
 * item pair. The top related items of every item are printed.
 */
object RelationWithItemToItem {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Item to Item")
      .setMaster("local")
    val sc = new SparkContext(conf)

    // Number of related items reported per source item.
    val topK = 2

    // Distinct (user, item, rating) triples; reused twice below, hence cached.
    val userItem = sc.textFile("/rating.dat")
      .map(_.split("\t"))
      .map(fields => (fields(0), fields(1), fields(2)))
      .distinct()
      .cache()

    // item -> (user, rating), hash-partitioned by item ahead of the join.
    val itemUser = userItem
      .map { case (user, item, rating) => (item, (user, rating.toDouble)) }
      .partitionBy(new HashPartitioner(20))

    // item -> sqrt(sum of squared ratings), i.e. the norm of the item's rating vector.
    val itemPowSqrt = userItem
      .map { case (_, item, rating) => (item, pow(rating.toDouble, 2.0)) }
      .reduceByKey(_ + _)
      .mapValues(sumOfSquares => sqrt(sumOfSquares))

    // user -> (item, rating / norm(item))
    val userItemSqrt = itemUser.join(itemPowSqrt).map {
      case (item, ((user, rating), sqrtRatings)) => (user, (item, rating / sqrtRatings))
    }

    // Pair up every two items rated by the same user. An item paired with itself
    // gets -1.0 so its summed score stays negative and is filtered out below.
    val itemToItem = userItemSqrt.join(userItemSqrt).map {
      case (_, ((item1, rating1), (item2, rating2))) =>
        val score = if (item1 == item2) -1.0 else rating1 * rating2
        ((item1, item2), score)
    }

    itemToItem
      .reduceByKey(_ + _)
      .map { case ((source, target), score) => (source, (target, score)) }
      .groupByKey()
      .foreach { case (sourceItem, related) =>
        val topItem = related.toList.filter(_._2 > 0).sortWith(_._2 > _._2).take(topK)
        println(s"item = $sourceItem,topK relative item list:$topItem")
      }

    sc.stop()
  }
}
开发者ID:bigchange,项目名称:AI,代码行数:55,代码来源:RelationWithItemToItem.scala
示例2: DataProcApp
//设置package包名称以及导入依赖的类
package org.apress.prospark
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.json4s.DefaultFormats
import org.json4s.JsonAST.JNothing
import org.json4s.jvalue2extractable
import org.json4s.jvalue2monadic
import org.json4s.native.JsonMethods.parse
import org.json4s.string2JsonInput
/**
 * Streaming job that reads JSON records from a socket and prints, per batch, the
 * average "stars" value for each distinct "attributes" / "Wi-Fi" value.
 */
object DataProcApp {

  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println(
        "Usage: DataProcApp <appname> <batchInterval> <hostname> <port>")
      System.exit(1)
    }
    val Array(appName, batchInterval, hostname, port) = args

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
    val ssc = new StreamingContext(conf, Seconds(batchInterval.toInt))

    // One JSON document per received line.
    val records = ssc.socketTextStream(hostname, port.toInt).map(line => parse(line))

    // Keep only records that actually carry the Wi-Fi attribute.
    val withWifi = records.filter(json => json \ "attributes" \ "Wi-Fi" != JNothing)

    // (wifi value, stars)
    val starsByWifi = withWifi.map { json =>
      implicit val formats = DefaultFormats
      ((json \ "attributes" \ "Wi-Fi").extract[String], (json \ "stars").extract[Int])
    }

    // Accumulate (sum of stars, number of records) per Wi-Fi value.
    val sumAndCount = starsByWifi.combineByKey[(Int, Int)](
      (stars: Int) => (stars, 1),
      (acc: (Int, Int), stars: Int) => (acc._1 + stars, acc._2 + 1),
      (left: (Int, Int), right: (Int, Int)) => (left._1 + right._1, left._2 + right._2),
      new HashPartitioner(ssc.sparkContext.defaultParallelism))

    sumAndCount
      .map { case (wifi, (starSum, count)) => (wifi, starSum / count.toFloat) }
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
开发者ID:ZubairNabi,项目名称:prosparkstreaming,代码行数:57,代码来源:L10-2DataProc.scala
示例3: JoinableRDD
//设置package包名称以及导入依赖的类
package com.kakao.cuesheet.convert
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
/**
 * Wrapper around a pair RDD that offers an inner join built on `cogroup` plus an
 * index-keyed join instead of calling `join` on the original keys.
 *
 * NOTE(review): the re-keying by (key, index) presumably aims at spreading heavily
 * skewed keys across partitions -- confirm against the project's intent.
 *
 * @param rdd the left-hand side pair RDD
 */
class JoinableRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) {
/** Joins the wrapped RDD with itself; delegates to [[fastJoin]]. */
def selfJoin(numPartitions: Int = rdd.partitions.length): RDD[(K, (V, V))] = fastJoin(rdd, numPartitions)
/**
 * Pairs every value of `rdd` with every value of `other` that shares the same key.
 *
 * @param other         right-hand side pair RDD
 * @param numPartitions partition count of the HashPartitioner used for the intermediate
 *                      RDDs; defaults to the wrapped RDD's partition count
 * @return one (k, (v, w)) element per matching left/right value pair
 */
def fastJoin[W](other: RDD[(K, W)], numPartitions: Int = rdd.partitions.length): RDD[(K, (V, W))] = {
val partitioner = new HashPartitioner(numPartitions)
// `grouped` feeds both `left` and `right` below and is not persisted here.
val grouped = rdd cogroup other
// Left side: give each left value of a key its position idx and key it by (k, idx).
val left = grouped.flatMap{
case (k, (vs, ws)) => vs.zipWithIndex.map {
case (v, idx) => ((k, idx), v)
}
}.partitionBy(partitioner)
// Right side: each right value w is first keyed by (k, w.hashCode) and partitioned,
// then replicated once per left index (0 until vs.size) and re-keyed by (k, idx),
// so that every left (k, idx) meets every w of that key in the join below.
val right = grouped.flatMap {
case (k, (vs, ws)) => ws.map { w => ((k, w.hashCode()), (w, vs.size)) }
}.partitionBy(partitioner).flatMap {
case ((k, r), (w, size)) => (0 until size).map(i => ((k, w), i))
}.map {
case ((k, w), idx) => ((k, idx), w)
}
// Join on (k, idx) and drop the helper index from the result.
(left join right).map {
case ((k, idx), (v, w)) => (k, (v, w))
}
}
}
开发者ID:kakao,项目名称:cuesheet,代码行数:36,代码来源:JoinableRDD.scala
示例4: userItem
//设置package包名称以及导入依赖的类
import org.apache.spark.{HashPartitioner, SparkContext, SparkConf}
/**
 * Computes a per-(user, item) weight from a comma-separated behaviour log and prints it.
 *
 * Columns read from each line: 0 = user, 1 = item, 2 = integer behaviour value,
 * 5 = timestamp whose part before the first space is a `yyyy-MM-dd` date.
 */
object userItem {

  /**
   * @param args `args(0)` is the path of the input file
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      println("Usage filePath")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("tianchi Test")
    val sc = new SparkContext(conf)

    val rawData = sc.textFile(args(0))

    // Split every line once instead of once per extracted column.
    // The earlier hash-partitioned, persisted copy of this data was never read by any
    // action, so it had no effect and has been dropped.
    val userItem = rawData.map { line =>
      val fields = line.split(",")
      val behaviour = fields(2).toInt
      val date = fields(5).split(" ")(0)
      // Plain tuple key: `Pair` is deprecated and no longer exists in Scala 2.13.
      ((fields(0), fields(1)), calcu((behaviour, date)))
    }

    // weight(user, item) = sqrt(sum of squared, time-decayed behaviour values)
    val userWeight = userItem.reduceByKey(_ + _).mapValues(sum => Math.sqrt(sum))
    userWeight.collect().foreach(println)
    sc.stop()
  }

  /**
   * Squared, time-decayed behaviour value.
   *
   * @param para (behaviour value, date as `yyyy-MM-dd`); dates are compared as strings,
   *             which matches chronological order for that format
   * @return `(value * rate)^2`, where rate is 0.95 * 0.95 up to 2014-11-27,
   *         0.95 up to 2014-12-07 and 1.0 afterwards
   */
  def calcu(para: (Int, String)): Double = {
    val (br, tm) = para
    val rate =
      if (tm <= "2014-11-27") 0.95 * 0.95
      else if (tm <= "2014-12-07") 0.95
      else 1.00
    Math.pow(br * rate, 2.0)
  }
}
开发者ID:JensenFeng,项目名称:tianchi,代码行数:43,代码来源:userItem.scala
示例5: MD5Functions
//设置package包名称以及导入依赖的类
package com.destrys
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.commons.codec.digest.DigestUtils
import org.apache.commons.codec.digest.DigestUtils._
import org.apache.spark.HashPartitioner
import java.security.MessageDigest
/**
 * Adds an order-independent MD5 checksum operation to an RDD of strings.
 *
 * @param rdd the lines to hash
 */
class MD5Functions(rdd: RDD[String]) extends Logging with Serializable {

  /**
   * Hash-partitions the lines, sorts them inside each partition, hashes every
   * partition, then hashes the sorted list of per-partition hex digests.
   *
   * @param parts number of hash partitions
   * @return the final digest as a base-16 string (BigInteger formatting, so leading
   *         zeros are not padded)
   */
  def md5_partitioned(parts: Int = 100): String = {
    val sortedLines = rdd
      .map(line => (line, 1))
      .repartitionAndSortWithinPartitions(new HashPartitioner(parts))
      .map(_._1)

    // One hex digest per partition, collected to the driver and sorted so the result
    // does not depend on partition order.
    val partitionHashes = sortedLines
      .mapPartitions(lines => Iterator(lines.foldLeft(getMd5Digest())(md5)))
      .map(digest => new java.math.BigInteger(1, digest.digest()).toString(16))
      .collect()
      .sorted

    val combined = partitionHashes.foldLeft(getMd5Digest())(md5)
    new java.math.BigInteger(1, combined.digest()).toString(16)
  }

  /**
   * Feeds the UTF-8 bytes of `in2` into `prev` and returns the same digest instance.
   */
  def md5(prev: MessageDigest, in2: String): MessageDigest = {
    prev.update(in2.getBytes("UTF-8"))
    prev
  }
}
/**
 * Command-line entry point: prints the partitioned MD5 checksum of a text file.
 */
object RddMD5 {

  /** Enriches an `RDD[String]` with the operations of [[MD5Functions]]. */
  implicit def rddToMD5Functions(rdd: RDD[String]): MD5Functions = new MD5Functions(rdd)

  /**
   * @param args `args(0)` is the input path (in local mode the default scheme is
   *             file:///, in yarn mode hdfs:///)
   */
  def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      System.err.println("Usage: RddMD5 <input_path>")
      sys.exit(1)
    }

    val conf = new SparkConf().setAppName("CatExample")
    val sc = new SparkContext(conf)
    try {
      val text = sc.textFile(args(0))
      // The checksum used to be computed and then thrown away; print it.
      println(text.md5_partitioned(10))
    } finally {
      sc.stop()
    }
  }
}
开发者ID:destrys,项目名称:spark-md5,代码行数:54,代码来源:rddmd5.scala
示例6: RDFS3
//设置package包名称以及导入依赖的类
package com.hj.examples
import com.hj.constant.Const
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
/**
 * Joins every triple with the `Const.RDFS_RANGE` declarations of its predicate and
 * emits (object, range) pairs, printing them and saving them as text.
 *
 * Arguments: input/RDFS3.in output/RDFS3.out
 */
object RDFS3 {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.out.println("Arguments are invalid! \nExample: <input_path> <output_path>")
      System.exit(1)
    }
    val Array(inputPath, outputPath) = args

    val conf = new SparkConf().setAppName("RDFS3.in").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // (subject, predicate, object), space separated -- e.g. "input/RDFS3.in"
    val triples = sc.textFile(inputPath).map { line =>
      val fields = line.split(" ")
      (fields(0), fields(1), fields(2))
    }

    // predicate -> declared range, taken from the range-declaring triples
    val range = triples
      .filter { case (_, predicate, _) => predicate.equals(Const.RDFS_RANGE) }
      .map { case (subject, _, obj) => (subject, obj) }

    // predicate -> (subject, object), hash-partitioned by predicate
    val pso = triples
      .map { case (subject, predicate, obj) => (predicate, (subject, obj)) }
      .partitionBy(new HashPartitioner(2))

    // (object, range of the predicate that points at it)
    val res = pso.join(range).map { case (_, ((_, obj), rangeOfPredicate)) => (obj, rangeOfPredicate) }

    res.foreach(pair => println(pair))
    res.saveAsTextFile(outputPath)
  }
}
开发者ID:hualichenxi,项目名称:SparkSRE,代码行数:39,代码来源:RDFS3.scala
示例7: AggregateByKeyTest
//设置package包名称以及导入依赖的类
package com.datawizards.sparklocal.rdd.pair
import com.datawizards.sparklocal.SparkLocalBaseTest
import com.datawizards.sparklocal.rdd.RDDAPI
import org.apache.spark.HashPartitioner
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
/**
 * Checks `aggregateByKey` (default, numPartitions and partitioner overloads) both
 * against a fixed expected result and via the same-result helper of the base test.
 */
@RunWith(classOf[JUnitRunner])
class AggregateByKeyTest extends SparkLocalBaseTest {

  val data = Seq(("a",1),("a",2),("b",2),("b",3),("c",1),("c",1),("c",2))

  // Per-key sums of `data`; a fresh array on every call.
  private def expectedSums: Array[(String, Int)] = Array(("a",3),("b",5),("c",4))

  test("aggregateByKey result") {
    assertRDDOperationResultWithSorted(
      RDDAPI(data).aggregateByKey(0)((acc, value) => acc + value, _ + _)
    )(expectedSums)
  }

  test("aggregateByKey(numPartitions) result") {
    assertRDDOperationResultWithSorted(
      RDDAPI(data).aggregateByKey(0, 2)((acc, value) => acc + value, _ + _)
    )(expectedSums)
  }

  test("aggregateByKey(partitioner) result") {
    assertRDDOperationResultWithSorted(
      RDDAPI(data).aggregateByKey(0, new HashPartitioner(2))((acc, value) => acc + value, _ + _)
    )(expectedSums)
  }

  test("aggregateByKey equal") {
    assertRDDOperationReturnsSameResultWithSorted(data) {
      _.aggregateByKey(0)((acc, value) => acc + value, _ + _)
    }
  }

  test("aggregateByKey(numPartitions) equal") {
    assertRDDOperationReturnsSameResultWithSorted(data) {
      _.aggregateByKey(0, 1)((acc, value) => acc + value, _ + _)
    }
  }

  test("aggregateByKey(partitioner) equal") {
    assertRDDOperationReturnsSameResultWithSorted(data) {
      _.aggregateByKey(0, new HashPartitioner(2))((acc, value) => acc + value, _ + _)
    }
  }
}
开发者ID:piotr-kalanski,项目名称:spark-local,代码行数:57,代码来源:AggregateByKeyTest.scala
示例8: FoldByKeyTest
//设置package包名称以及导入依赖的类
package com.datawizards.sparklocal.rdd.pair
import com.datawizards.sparklocal.SparkLocalBaseTest
import com.datawizards.sparklocal.rdd.RDDAPI
import org.apache.spark.HashPartitioner
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
/**
 * Checks `foldByKey` (default, numPartitions and partitioner overloads) both against
 * a fixed expected result and via the same-result helper of the base test.
 */
@RunWith(classOf[JUnitRunner])
class FoldByKeyTest extends SparkLocalBaseTest {

  val data = Seq(("a",1),("a",2),("b",2),("b",3),("c",1),("c",1),("c",2))

  // Per-key sums of `data`; a fresh array on every call.
  private def expectedSums: Array[(String, Int)] = Array(("a",3),("b",5),("c",4))

  test("foldByKey result") {
    assertRDDOperationResultWithSorted(
      RDDAPI(data).foldByKey(0)((left, right) => left + right)
    )(expectedSums)
  }

  test("foldByKey(numPartitions) result") {
    assertRDDOperationResultWithSorted(
      RDDAPI(data).foldByKey(0, 2)((left, right) => left + right)
    )(expectedSums)
  }

  test("foldByKey(partitioner) result") {
    assertRDDOperationResultWithSorted(
      RDDAPI(data).foldByKey(0, new HashPartitioner(2))((left, right) => left + right)
    )(expectedSums)
  }

  test("foldByKey equal") {
    assertRDDOperationReturnsSameResultWithSorted(data) {
      _.foldByKey(0)((left, right) => left + right)
    }
  }

  test("foldByKey(numPartitions) equal") {
    assertRDDOperationReturnsSameResultWithSorted(data) {
      _.foldByKey(0, 1)((left, right) => left + right)
    }
  }

  test("foldByKey(partitioner) equal") {
    assertRDDOperationReturnsSameResultWithSorted(data) {
      _.foldByKey(0, new HashPartitioner(2))((left, right) => left + right)
    }
  }
}
开发者ID:piotr-kalanski,项目名称:spark-local,代码行数:57,代码来源:FoldByKeyTest.scala
示例9: GroupByKeyTest
//设置package包名称以及导入依赖的类
package com.datawizards.sparklocal.rdd.pair
import com.datawizards.sparklocal.SparkLocalBaseTest
import com.datawizards.sparklocal.rdd.RDDAPI
import org.apache.spark.HashPartitioner
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
/**
 * Checks `groupByKey` (default, numPartitions and partitioner overloads) both against
 * a fixed expected grouping and via the same-result helper of the base test.
 */
@RunWith(classOf[JUnitRunner])
class GroupByKeyTest extends SparkLocalBaseTest {

  val data = Seq(("a",1),("a",2),("b",2),("b",3),("c",1),("c",1),("c",2))

  // The expected grouping is spelled out at each call site so its element type is
  // inferred from the assertion helper exactly as before.

  test("groupByKey result") {
    assertRDDOperationResultWithSorted(RDDAPI(data).groupByKey()) {
      Array(("a",Iterable(1,2)),("b",Iterable(2,3)),("c",Iterable(1,1,2)))
    }
  }

  test("groupByKey(numPartitions) result") {
    assertRDDOperationResultWithSorted(RDDAPI(data).groupByKey(2)) {
      Array(("a",Iterable(1,2)),("b",Iterable(2,3)),("c",Iterable(1,1,2)))
    }
  }

  test("groupByKey(partitioner) result") {
    assertRDDOperationResultWithSorted(RDDAPI(data).groupByKey(new HashPartitioner(2))) {
      Array(("a",Iterable(1,2)),("b",Iterable(2,3)),("c",Iterable(1,1,2)))
    }
  }

  test("groupByKey equal") {
    assertRDDOperationReturnsSameResultWithSorted(data) {
      _.groupByKey()
    }
  }

  test("groupByKey(numPartitions) equal") {
    assertRDDOperationReturnsSameResultWithSorted(data) {
      _.groupByKey(2)
    }
  }

  test("groupByKey(partitioner) equal") {
    assertRDDOperationReturnsSameResultWithSorted(data) {
      _.groupByKey(new HashPartitioner(2))
    }
  }
}
开发者ID:piotr-kalanski,项目名称:spark-local,代码行数:57,代码来源:GroupByKeyTest.scala
示例10: PartitionByTest
//设置package包名称以及导入依赖的类
package com.datawizards.sparklocal.rdd.pair
import com.datawizards.sparklocal.SparkLocalBaseTest
import com.datawizards.sparklocal.rdd.RDDAPI
import org.apache.spark.HashPartitioner
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
/**
 * Checks that `partitionBy` on the local RDD API leaves the data unchanged and agrees
 * with the reference behaviour checked by the base test's same-result helper.
 */
@RunWith(classOf[JUnitRunner])
class PartitionByTest extends SparkLocalBaseTest {

  test("PartitionBy result") {
    val original = RDDAPI(Seq(("a",1),("b",2)))
    val repartitioned = original.partitionBy(new HashPartitioner(2))
    assert(repartitioned == original)
  }

  test("PartitionBy equal") {
    assertRDDOperationReturnsSameResult(Seq(("a",1),("b",2))) {
      _.partitionBy(new HashPartitioner(2)).collectAsMap()
    }
  }
}
开发者ID:piotr-kalanski,项目名称:spark-local,代码行数:24,代码来源:PartitionByTest.scala
示例11: TestJoins
//设置package包名称以及导入依赖的类
package examples
import org.apache.spark.{ SparkConf, SparkContext, HashPartitioner }
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import scala.Iterator
/**
 * Demonstrates inner, left-outer and right-outer joins of two small hash-partitioned
 * pair RDDs, printing the joined rows and the per-partition layout of each result.
 */
object TestJoins {

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("TestJoinJob"))

    val x = sc.parallelize(List((1, 2), (1, 3), (2, 3), (2, 4)))
      .partitionBy(new HashPartitioner(2))
      .cache()
    val y = sc.parallelize(List((2, 5), (2, 6)))
      .partitionBy(new HashPartitioner(2))
      .cache()

    inspectRDD(x)
    inspectRDD(y)

    // Prints the banner first, then builds, caches, dumps and inspects the join result.
    def report[T](banner: String)(joined: => RDD[T]): Unit = {
      println(banner)
      val cached = joined.cache()
      cached.collect().foreach(println)
      inspectRDD(cached)
    }

    report(">>> joining x with y")(x.join(y))
    report(">>> left outer join of x with y")(x.leftOuterJoin(y))
    report(">>> right outer join of x with y")(x.rightOuterJoin(y))
  }

  /** Prints the element count of every partition, followed by every element. */
  def inspectRDD[T](rdd: RDD[T]): Unit = {
    println(">>> Partition length...")
    rdd
      .mapPartitions(partition => Iterator(partition.length), preservesPartitioning = true)
      .foreach(println)
    println(">>> Partition data...")
    rdd.foreachPartition(partition => partition.foreach(println))
  }
}
开发者ID:prithvirajbose,项目名称:spark-dev,代码行数:45,代码来源:TestJoins.scala
示例12: MyGraphLoader
//设置package包名称以及导入依赖的类
package org.apache.spark.graphx
import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
/**
 * Loads a graph from a whitespace-separated edge list file.
 */
object MyGraphLoader extends Logging {

  /**
   * Reads "srcId dstId" lines and builds a [[MyGraph]] from the grouped adjacency lists.
   *
   * Every vertex id that appears as source or destination additionally receives a `-1`
   * marker entry, so vertices without outgoing edges still get a group.
   *
   * @param sc                   Spark context used to read the file
   * @param path                 path of the edge list file
   * @param canonicalOrientation accepted for signature compatibility; not read here
   * @param numVertexPartitions  number of partitions for reading and grouping; when it is
   *                             not positive (the default), the partition count of the
   *                             input file is used for grouping
   * @param edgeStorageLevel     storage level passed on for edges
   * @param vertexStorageLevel   storage level passed on for vertices
   */
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      numVertexPartitions: Int = -1,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
    : MyGraph[Int, Int] =
  {
    val startTime = System.currentTimeMillis

    // Parse the edge data table directly into edge partitions
    val lines =
      if (numVertexPartitions > 0) {
        sc.textFile(path, numVertexPartitions).coalesce(numVertexPartitions)
      } else {
        sc.textFile(path)
      }

    val edges = lines.map { line =>
      val parts = line.split("\\s+")
      (parts(0).toLong, parts(1).toLong)
    }
    // Real edges, followed by a -1 marker for every destination and every source id.
    val mid_data = edges ++
      edges.map { case (_, dst) => (dst, -1L) } ++
      edges.map { case (src, _) => (src, -1L) }

    // HashPartitioner rejects a negative partition count, so the -1 default must not
    // be passed through; fall back to the input's own partition count.
    val groupPartitions =
      if (numVertexPartitions > 0) numVertexPartitions else lines.partitions.length
    val links = mid_data.groupByKey(new HashPartitioner(groupPartitions)).cache()

    // groupByKey is lazy: this measures building the lineage, not running the shuffle.
    println("It took %d ms to group".format(System.currentTimeMillis - startTime))

    // NOTE(review): the comment that stood here was lost to an encoding error; it
    // appeared to refer to groups such as (4,()) (5,()) -- confirm with the author.
    MyGraphImpl.fromEdgeList(links, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
      vertexStorageLevel = vertexStorageLevel)
  } // end of edgeListFile
}
开发者ID:yuanqingsunny,项目名称:graph,代码行数:54,代码来源:MyGraphLoader.scala
示例13: MyEdgeRDDImpl
//设置package包名称以及导入依赖的类
package org.apache.spark.graphx
import org.apache.spark.{HashPartitioner, OneToOneDependency}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.reflect.ClassTag
/**
 * Edge RDD backed by an RDD of (partition id, edge partition) pairs.
 *
 * @param partitionsRDD      the underlying RDD of (PartitionID, MyEdgePartition) pairs
 * @param targetStorageLevel storage level carried along to derived instances;
 *                           defaults to MEMORY_ONLY
 * @tparam ED the edge attribute type
 */
class MyEdgeRDDImpl[ED: ClassTag] private[graphx]
(
@transient override val partitionsRDD: RDD[(PartitionID, MyEdgePartition[ED])],
val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
extends MyEdgeRDD[ED](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
// Reuse the partitioner of the underlying RDD; when it has none, fall back to a
// HashPartitioner sized to this RDD's partition count.
override val partitioner =
partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitions.length)))
/** Applies `f` to the edges by mapping every edge partition. */
override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): MyEdgeRDDImpl[ED2] =
mapEdgePartitions((pid, part) => part.map(f))
/**
 * Transforms the edge partition of each RDD partition with `f`, keeping the partition id
 * and the partitioning.
 *
 * Only the first element of each partition iterator is read; empty partitions stay empty.
 * NOTE(review): this presumably relies on one edge partition per RDD partition -- confirm.
 */
def mapEdgePartitions[ED2: ClassTag](
f: (PartitionID, MyEdgePartition[ED]) => MyEdgePartition[ED2]): MyEdgeRDDImpl[ED2] = {
this.withPartitionsRDD[ED2](partitionsRDD.mapPartitions({ iter =>
if (iter.hasNext) {
val (pid, ep) = iter.next()
Iterator(Tuple2(pid, f(pid, ep)))
} else {
Iterator.empty
}
}, preservesPartitioning = true))
}
/** Wraps a new partitions RDD in a MyEdgeRDDImpl that keeps this instance's target storage level. */
private[graphx] def withPartitionsRDD[ED2: ClassTag]( partitionsRDD: RDD[(PartitionID, MyEdgePartition[ED2])]): MyEdgeRDDImpl[ED2] = {
new MyEdgeRDDImpl(partitionsRDD, this.targetStorageLevel)
}
/** Returns a copy over the same partitions RDD with a different target storage level. */
override def withTargetStorageLevel(storageLevel: StorageLevel): MyEdgeRDD[ED] = {
new MyEdgeRDDImpl(this.partitionsRDD, storageLevel)
}
}
开发者ID:yuanqingsunny,项目名称:graph,代码行数:42,代码来源:MyEdgeRDDImpl.scala
示例14: PageRank
//设置package包名称以及导入依赖的类
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.Logging
import org.apache.spark.HashPartitioner
/**
 * Simplified PageRank: every page starts with rank 1.0 and, on each iteration, splits
 * its rank evenly among the pages of its adjacency list.
 *
 * Arguments: `args(0)` input file ("page neighbour neighbour ..." per line, integer ids),
 * `args(1)` number of iterations.
 */
object PageRank extends Logging {

  def main(args: Array[String]): Unit = {
    // input parameters
    val fileName = args(0)
    val numIterations = args(1).toInt

    // SparkContext
    val sc = new SparkContext(new SparkConf().setAppName("PageRank"))

    val lines = sc.textFile(fileName)

    // (page, adjacency list); cached because the links are reused on every iteration
    val links = lines
      .map(line => line.split("\\s+").map(token => token.toInt))
      .map(ids => (ids(0), ids.drop(1)))
      .partitionBy(new HashPartitioner(lines.partitions.length))
      .cache()

    // (page, 1.0)
    val initialRanks = links.mapValues(_ => 1.0)

    val ranks = (1 to numIterations).foldLeft(initialRanks) { (current, _) =>
      links.join(current)
        .flatMap { case (url, (adjList, rank)) =>
          adjList.map(dest => (dest, rank / adjList.size))
        }
        .reduceByKey(_ + _)
    }

    logInfo(s"Ranking sample (4) = ${ranks.take(4).mkString(",")}")
    sc.stop()
  }
}
开发者ID:eubr-bigsea,项目名称:simple-spark-app,代码行数:41,代码来源:PageRank.scala
示例15: OuterRddsJoin
//设置package包名称以及导入依赖的类
package hr.fer.ztel.thesis.multiplication.outer
import hr.fer.ztel.thesis.datasource.MatrixDataSource._
import hr.fer.ztel.thesis.spark.SparkSessionHandler
import hr.fer.ztel.thesis.sparse_linalg.SparseVectorOperators._
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
/**
 * Recommender job: joins the item-user matrix with the item-item similarity matrix,
 * accumulates a utility per (user, item) from the outer products, and writes the
 * top-K items each user has not seen yet.
 */
object OuterRddsJoin {

  def main(args: Array[String]): Unit = {
    val handler = new SparkSessionHandler(args)
    implicit val spark = handler.getSparkSession

    // Keep the concrete partitioner at hand instead of unwrapping the Option later.
    val hashPartitioner = new HashPartitioner(16)
    val partitioner = Some(hashPartitioner)

    val itemUserMatrix: RDD[(Int, Array[Int])] = // (item, [user])
      readItemUserMatrix(handler.userItemPath, partitioner)

    // to reduce join (shuffle size), discarding all unmatched rows/cols in item-item matrix
    val broadcastedBoughtItems = spark.sparkContext.broadcast(itemUserMatrix.keys.collect)

    val boughtItemItemMatrix: RDD[(Int, Map[Int, Double])] = // (item, [item, sim])
      readBoughtItemItemMatrix(handler.itemItemPath, handler.measure, handler.normalize, broadcastedBoughtItems, partitioner)

    // join should produce at most 40k out of 67k tuples in shuffle-u
    val join = itemUserMatrix.join(boughtItemItemMatrix, hashPartitioner)

    val itemSeenByUsersBroadcast = spark.sparkContext.broadcast(
      itemUserMatrix.mapValues(_.toSet).collectAsMap.toMap
    )

    // Read on the driver so the task closure captures only this value, not `handler`.
    val topK = handler.topK

    val recommendations = join
      .flatMap { case (_, (userVector, itemVector)) => outer(userVector, itemVector) }
      // by (user, item) key
      .reduceByKey(_ + _)
      .map { case ((user, item), utility) => (user, (item, utility)) }
      // by (user) key
      .groupByKey
      .mapPartitions { users =>
        // Dereference the broadcast inside the task. Reading `.value` before the
        // function literal would run on the driver and ship the whole map in every
        // task closure, which defeats the broadcast.
        val localItemSeenByUsers = itemSeenByUsersBroadcast.value
        users.map { case (user, items) =>
          // An item absent from the map has been seen by nobody, so it is unseen.
          val unSeenItems = items.filterNot { case (item, _) =>
            localItemSeenByUsers.get(item).exists(_.contains(user))
          }
          val unSeenTopKItems = argTopK(unSeenItems.toArray, topK)
          s"$user:${unSeenTopKItems.mkString(",")}"
        }
      }

    recommendations.saveAsTextFile(handler.recommendationsPath)
    println(s"Recommendations saved in: ${handler.recommendationsPath}.")
  }
}
开发者ID:fpopic,项目名称:master_thesis,代码行数:56,代码来源:OuterRddsJoin.scala
示例16: OuterMatrixEntry
//设置package包名称以及导入依赖的类
package hr.fer.ztel.thesis.multiplication.outer
import hr.fer.ztel.thesis.datasource.MatrixEntryDataSource._
import hr.fer.ztel.thesis.spark.SparkSessionHandler
import hr.fer.ztel.thesis.sparse_linalg.SparseVectorOperators.argTopK
import org.apache.spark.HashPartitioner
/**
 * Recommender job on matrix entries: joins item-user entries with item-item similarity
 * entries on the shared item, sums the similarities per (user, item), and writes the
 * top-K items each user has not seen yet.
 */
object OuterMatrixEntry {

  def main(args: Array[String]): Unit = {
    val handler = new SparkSessionHandler(args)
    implicit val spark = handler.getSparkSession

    val partitioner = new HashPartitioner(128)

    // item -> (user, value)
    val itemUserEntries = readItemUserEntries(handler.userItemPath)
      .map(x => (x.i.toInt, (x.j.toInt, x.value)))
      .partitionBy(partitioner)

    // item -> (other item, similarity)
    val itemItemEntries = readItemItemEntries(handler.itemItemPath, handler.measure, handler.normalize)
      .map(x => (x.i.toInt, (x.j.toInt, x.value)))
      .partitionBy(partitioner)

    // item -> set of users that have seen it
    val itemSeenByUsersBroadcast = spark.sparkContext.broadcast(
      itemUserEntries.groupByKey.mapValues(_.map(_._1).toSet).collectAsMap.toMap
    )

    // Read on the driver so the task closure captures only this value, not `handler`.
    val topK = handler.topK

    val recommendations = itemUserEntries.join(itemItemEntries, partitioner)
      .map { case (_, ((user, _), (item, similarity))) => ((user, item), similarity) }
      // by (user, item) key
      .reduceByKey(_ + _)
      .map { case ((user, item), utility) => (user, (item, utility)) }
      // by (user) key
      .groupByKey
      .mapPartitions { users =>
        // Dereference the broadcast inside the task. Reading `.value` before the
        // function literal would run on the driver and ship the whole map in every
        // task closure, which defeats the broadcast.
        val localItemSeenByUsers = itemSeenByUsersBroadcast.value
        users.map { case (user, items) =>
          // Candidate items come from the similarity matrix and may never have been
          // seen by anyone; a plain map lookup would throw NoSuchElementException.
          val unSeenItems = items.filterNot { case (item, _) =>
            localItemSeenByUsers.get(item).exists(_.contains(user))
          }
          val unSeenTopKItems = argTopK(unSeenItems.toArray, topK)
          s"$user:${unSeenTopKItems.mkString(",")}"
        }
      }

    recommendations.saveAsTextFile(handler.recommendationsPath)
    println(s"Recommendations saved in: ${handler.recommendationsPath}.")
  }
}
开发者ID:fpopic,项目名称:master_thesis,代码行数:52,代码来源:OuterMatrixEntry.scala
注：本文中的org.apache.spark.HashPartitioner类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论