本文整理汇总了Scala中org.apache.spark.graphx.Edge类的典型用法代码示例。如果您正苦于以下问题:Scala Edge类的具体用法?Scala Edge怎么用?Scala Edge使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Edge类的12个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: PCAClustering
//设置package包名称以及导入依赖的类
package graph
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{EdgeDirection, Edge, Graph}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.{DenseVector, Vector, Matrix, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD
import scala.collection.mutable
/**
 * Clusters graph vertices by running PCA on the row-normalised out-adjacency
 * matrix and then k-means on the rows of the principal-component matrix
 * (one row per vertex).
 */
class PCAClustering {

  /**
   * Distributes a local matrix as an RDD with one dense vector per matrix row.
   *
   * `Matrix.toArray` is column-major, so the flat array is cut into columns of
   * `numRows` values and then transposed to recover the rows in order.
   *
   * @param sc Spark context used to parallelise the rows
   * @param m  local matrix to distribute
   * @return one `DenseVector` per row of `m`, in row order
   */
  def matrixToRDD(sc: SparkContext, m: Matrix): RDD[Vector] = {
    val columns = m.toArray.grouped(m.numRows)
    val rows = columns.toSeq.transpose // Skip this if you want a column-major RDD.
    val vectors = rows.map(row => new DenseVector(row.toArray))
    sc.parallelize(vectors)
  }

  /**
   * Assigns every vertex of `inputGraph` to one of `clusterNum` clusters.
   *
   * @param inputGraph    graph to cluster; its vertex ids may be arbitrary longs
   * @param clusterNum    number of k-means clusters
   * @param eigsNum       number of principal components kept
   * @param sc            Spark context used to distribute the principal components
   * @param maxIterations maximum number of k-means iterations (defaults to the previous fixed 100)
   * @return a graph with the original vertex ids, each carrying its cluster index,
   *         and the original edges
   */
  def run(inputGraph: Graph[Any, Any], clusterNum: Int, eigsNum: Int, sc: SparkContext,
          maxIterations: Int = 100): Graph[Int, Any] = {
    // Dense reindexing 0 until numNode so vertex ids can be used as matrix column indices.
    // `verticeIds(i)` is the original id of reindexed vertex i (the reverse mapping).
    val verticeIds = inputGraph.vertices.map(_._1).collect()
    val numNode = verticeIds.length
    val mapping: Map[Long, Int] = verticeIds.zipWithIndex.toMap

    val nVertices = inputGraph.vertices.map { case (vid, attr) => (mapping(vid).toLong, attr) }
    val nEdges = inputGraph.edges.map(e => Edge(mapping(e.srcId).toLong, mapping(e.dstId).toLong, e.attr))
    val ngraph = Graph(nVertices, nEdges)

    // One sparse row per vertex with weight 1/outDegree on each out-neighbour.
    val output = ngraph.collectNeighborIds(EdgeDirection.Out)
    val spvec = output.mapValues { neighbours =>
      // Sparse vectors expect increasing indices; neighbour order from GraphX is unspecified.
      // NOTE(review): parallel edges still produce repeated indices, as before — confirm inputs are simple graphs.
      val indices = neighbours.map(_.toInt).sorted
      Vectors.sparse(numNode, indices, Array.fill(indices.length)(1.0 / indices.length))
    }
    val rows = spvec.map(_._2)
    val order = spvec.map(_._1)

    // The PC matrix has one row per matrix column, i.e. per reindexed vertex.
    val pc = new RowMatrix(rows).computePrincipalComponents(eigsNum)
    val pcRDD = matrixToRDD(sc, pc)
    val clusters = KMeans.train(pcRDD, clusterNum, maxIterations)
    val clusterArray = pcRDD.map(p => clusters.predict(p)).collect()

    val assignedClusters = order.map(o => (o, clusterArray(o.toInt)))
    val origVertexRDD = assignedClusters.map { case (vid, cluster) => (verticeIds(vid.toInt), cluster) }
    Graph(origVertexRDD, inputGraph.edges)
  }
}
开发者ID:HPCL,项目名称:GalacticSpark,代码行数:56,代码来源:PCAClustering.scala
示例2: Main
//设置package包名称以及导入依赖的类
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}
object Main extends App {
  // Local Spark setup; only ERROR-level log messages are printed.
  val sparkConf = new SparkConf()
    .setAppName("Simple Application")
    .setMaster("local[*]")
  val sparkContext = new SparkContext(sparkConf)
  sparkContext.setLogLevel("ERROR")

  // Vertices 1..6, each starting with attribute 0.
  val vertices = sparkContext.makeRDD((1L to 6L).map(vertexId => (vertexId, 0)))

  // Adjacency list: each (source, target) pair becomes one edge with an empty
  // attribute, emitted in the order listed here.
  private val adjacency = Seq(
    (1L, Seq(2L, 3L, 6L)),
    (2L, Seq(3L, 4L, 5L)),
    (3L, Seq(5L)),
    (4L, Seq(6L)),
    (5L, Seq(6L)))

  val edges = sparkContext.makeRDD(
    for {
      (source, targets) <- adjacency
      target <- targets
    } yield Edge(source, target, ""))

  val graph = Graph(vertices, edges)
}
开发者ID:Seyzz,项目名称:GraphColoring,代码行数:24,代码来源:Main.scala
示例3: EdgeProviders
//设置package包名称以及导入依赖的类
package ml.sparkling.graph.loaders.csv.providers
import ml.sparkling.graph.loaders.csv.types.CSVTypes.EdgeAttributeExtractor
import ml.sparkling.graph.loaders.csv.types.Types.ToVertexId
import ml.sparkling.graph.loaders.csv.types.{CSVTypes, Types}
import ml.sparkling.graph.loaders.csv.utils.DefaultTransformers
import ml.sparkling.graph.loaders.csv.utils.DefaultTransformers.{defaultEdgeAttribute, numberToVertexId}
import org.apache.spark.graphx.Edge
import org.apache.spark.sql.Row
import scala.reflect.ClassTag
object EdgeProviders {

  /** Function shape that turns two columns of a `Row` into a sequence of edges. */
  type TwoColumnsMakeEdgeProvider[VD, ED] = (Int, Int, Row, ToVertexId[VD], EdgeAttributeExtractor[ED]) => Seq[Edge[ED]]

  /**
   * Builds one edge from two columns of a row.
   *
   * @param id1                   index of the column holding the source vertex value
   * @param id2                   index of the column holding the target vertex value
   * @param row                   row to read the columns from
   * @param columnToId            turns a column value into the vertex id used by `Edge`
   * @param edgeAttributeProvider derives the edge attribute from the whole row
   * @return a single-element sequence: the edge from column `id1`'s vertex to column `id2`'s vertex
   */
  def twoColumnsMakesEdge[VD: ClassTag, ED: ClassTag](id1: Int,
                                                      id2: Int, row: Row,
                                                      columnToId: ToVertexId[VD],
                                                      edgeAttributeProvider: EdgeAttributeExtractor[ED]): Seq[Edge[ED]] = {
    val sourceId = columnToId(row.getAs(id1))
    val targetId = columnToId(row.getAs(id2))
    val attribute = edgeAttributeProvider(row)
    Seq(Edge(sourceId, targetId, attribute))
  }

  /**
   * Convenience overload using the default converters `numberToVertexId` and
   * `defaultEdgeAttribute` (the resulting attribute type is `Double`).
   *
   * @param id1 index of the column holding the source vertex value
   * @param id2 index of the column holding the target vertex value
   * @param row row to read the columns from
   */
  def twoColumnsMakesEdge[VD: ClassTag](id1: Int,
                                        id2: Int,
                                        row: Row): Seq[Edge[Double]] = {
    twoColumnsMakesEdge(id1, id2, row, numberToVertexId _, defaultEdgeAttribute _)
  }
}
开发者ID:sparkling-graph,项目名称:sparkling-graph,代码行数:32,代码来源:EdgeProviders.scala
示例4: GraphProviders
//设置package包名称以及导入依赖的类
package ml.sparkling.graph.loaders.csv.providers
import ml.sparkling.graph.loaders.csv.types.Types
import ml.sparkling.graph.loaders.csv.types.Types.ToVertexId
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SparkSession;
import scala.reflect.ClassTag
object GraphProviders {
  /** Storage level used for vertices and edges unless the caller overrides it. */
  val defaultStorageLevel = StorageLevel.MEMORY_ONLY

  /**
   * Builds a graph by applying row-level providers to every row of a DataFrame.
   *
   * @param defaultVertex      attribute for vertices referenced by edges but not produced
   *                           by `vertexProvider`; when `None`, GraphX's own default is used
   * @param vertexProvider     produces zero or more vertices from a row
   * @param edgeProvider       produces zero or more edges from a row
   * @param edgeStorageLevel   storage level for the edge RDD
   * @param vertexStorageLevel storage level for the vertex RDD
   * @param dataFrame          source rows
   * @return the assembled graph
   */
  def simpleGraphBuilder[VD: ClassTag, ED: ClassTag](defaultVertex: Option[VD] = None,
                                                     vertexProvider: Row => Seq[(VertexId, VD)],
                                                     edgeProvider: Row => Seq[Edge[ED]],
                                                     edgeStorageLevel: StorageLevel = defaultStorageLevel,
                                                     vertexStorageLevel: StorageLevel = defaultStorageLevel)
                                                    (dataFrame: DataFrame): Graph[VD, ED] = {
    // A plain flatMap is equivalent to the former mapPartitionsWithIndex whose index was ignored.
    val vertices: RDD[(VertexId, VD)] = dataFrame.rdd.flatMap(vertexProvider)
    val edges: RDD[Edge[ED]] = dataFrame.rdd.flatMap(edgeProvider)
    defaultVertex match {
      case None => Graph(vertices, edges, edgeStorageLevel = edgeStorageLevel, vertexStorageLevel = vertexStorageLevel)
      case Some(defaultVertexValue) => Graph(vertices, edges, defaultVertexValue, edgeStorageLevel, vertexStorageLevel)
    }
  }

  /**
   * Like [[simpleGraphBuilder]], but vertex ids are generated by indexing the
   * distinct values found in `columnsToIndex`.
   *
   * Providers receive an id-lookup function mapping a column value to its generated id.
   *
   * @param columnsToIndex indices of the columns whose values become vertices
   */
  def indexedGraphBuilder[VD: ClassTag, ED: ClassTag](defaultVertex: Option[VD] = None,
                                                      vertexProvider: (Row, ToVertexId[VD]) => Seq[(VertexId, VD)],
                                                      edgeProvider: (Row, ToVertexId[VD]) => Seq[Edge[ED]],
                                                      columnsToIndex: Seq[Int],
                                                      edgeStorageLevel: StorageLevel = defaultStorageLevel,
                                                      vertexStorageLevel: StorageLevel = defaultStorageLevel)
                                                     (dataFrame: DataFrame): Graph[VD, ED] = {
    // NOTE(review): the whole value->id index is collected to the driver and captured in
    // every task closure; for large graphs a broadcast variable may be preferable.
    val index = dataFrame.rdd.flatMap(row => columnsToIndex.map(row(_))).distinct().zipWithUniqueId().collect().toMap
    def extractIdFromIndex(vertex: VD) = index(vertex)
    simpleGraphBuilder(defaultVertex,
      vertexProvider(_: Row, extractIdFromIndex _),
      edgeProvider(_: Row, extractIdFromIndex _),
      edgeStorageLevel,
      vertexStorageLevel)(dataFrame)
  }
}
开发者ID:sparkling-graph,项目名称:sparkling-graph,代码行数:54,代码来源:GraphProviders.scala
示例5: StoryBatchDedup
//设置package包名称以及导入依赖的类
package io.gzet.story
import io.gzet.story.model.{Content, Article}
import org.apache.spark.graphx.{Graph, Edge}
import org.apache.spark.{Logging, SparkConf, SparkContext}
import io.gzet.story.util.SimhashUtils._
import com.datastax.spark.connector._
object StoryBatchDedup extends SimpleConfig with Logging {

  /**
   * Groups near-duplicate articles from Cassandra table `gzet.articles`.
   *
   * Each article's simhash is XOR-ed with every mask in `searchmasks`. Articles
   * that share a masked value are linked by an edge. Each article in the
   * resulting graph is printed as `componentId<TAB>title`, sorted by component id.
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Story Extractor")
    val sc = new SparkContext(sparkConf)
    try {
      // Cached so both consumers below reuse the ids assigned by zipWithIndex.
      // Without caching, the Cassandra scan would be re-run and may not return rows in the same order.
      val simhashRDD = sc.cassandraTable[Article]("gzet", "articles").zipWithIndex().map({ case (a, id) =>
        ((id, Content(a.url, a.title, a.body)), a.hash)
      }).cache()

      // Bucket article ids by every masked variant of their simhash.
      val duplicateTupleRDD = simhashRDD.flatMap({ case ((id, _), simhash) =>
        searchmasks.map({ mask =>
          (simhash ^ mask, id)
        })
      }).groupByKey()

      // Link every pair of distinct ids that landed in the same bucket.
      val edgeRDD = duplicateTupleRDD.values.flatMap({ it =>
        val list = it.toList
        for (x <- list; y <- list) yield (x, y)
      }).filter({ case (x, y) =>
        x != y
      }).distinct().map({ case (x, y) =>
        Edge(x, y, 0)
      })

      val duplicateRDD = Graph.fromEdges(edgeRDD, 0L)
        .connectedComponents()
        .vertices
        .join(simhashRDD.keys)
        .values

      duplicateRDD.sortBy(_._1).collect().foreach({ case (story, content) =>
        println(story + "\t" + content.title)
      })
    } finally {
      sc.stop()
    }
  }
}
开发者ID:PacktPublishing,项目名称:Mastering-Spark-for-Data-Science,代码行数:49,代码来源:StoryBatchDedup.scala
示例6: GodwinTest
//设置package包名称以及导入依赖的类
package io.gzet.timeseries.graph
import io.gzet.test.SparkFunSuite
import org.apache.log4j.{Logger, Level}
import org.apache.spark.graphx.{Graph, Edge}
import org.apache.spark.rdd.RDD
import scala.io.Source
class GodwinTest extends SparkFunSuite {

  Logger.getLogger("akka").setLevel(Level.OFF)
  Logger.getLogger("org").setLevel(Level.OFF)

  /**
   * Reads `/edges.csv` from the test classpath as weighted edges.
   *
   * The first (header) line is skipped. Each remaining line must be
   * `source,target,weight`. The underlying stream is closed once all lines
   * have been materialised.
   */
  def buildEdges(): List[Edge[Double]] = {
    val source = Source.fromInputStream(getClass.getResourceAsStream("/edges.csv"))
    try {
      source.getLines().drop(1).map { line =>
        val Array(src, target, weight) = line.split(",")
        Edge(src.toLong, target.toLong, weight.toDouble)
      }.toList // force evaluation before the stream is closed
    } finally {
      source.close()
    }
  }

  localTest("Test Random Walks") { sc =>
    val edges: RDD[Edge[Double]] = sc.parallelize(buildEdges(), 1)
    val godwin = new Godwin(Seq(16))
    val walks = godwin.randomWalks(Graph.fromEdges(edges, 0L), 4).collect().sortBy(_._2)
    println(walks.map(_._1).mkString(" -> "))
    walks.last._1 should be(16)
  }
}
开发者ID:PacktPublishing,项目名称:Mastering-Spark-for-Data-Science,代码行数:31,代码来源:GodwinTest.scala
示例7:
//设置package包名称以及导入依赖的类
import java.io.{File, FileWriter}
import ml.sparkling.graph.operators.OperatorsDSL._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.graphx.Edge
import scala.collection.mutable
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
val edges = forwardEdges.union(forwardEdges.map(x => Edge(x.dstId, x.srcId, 0)))
// Get vertex RDD to initialize the graph
val vertices = ctx.parallelize(forwardEdges.map(x => Set(x.srcId, x.dstId)).reduce(_ ++ _).map(x => (x, 0)).toArray.toSeq)
// Construct Graph
val g = Graph(vertices, edges)
// Graph where each vertex is associated with its component identifier
val components = g.PSCAN(epsilon=0.5).vertices.map(x=>(x._2,mutable.Set(x._1))).reduceByKey(_++_).collectAsMap()
// Dump to output file
val keys = components.keys.toList.sorted
val writer = new FileWriter(new File(outfile))
for(key <- keys){
val out = components(key).toList.sorted
//println(components(key).toString())
writer.write("["+out.head)
for(i <- 1 until out.length) yield writer.write(","+out(i))
writer.write("]\n")
}
writer.close()
}
}
开发者ID:mit2nil,项目名称:data_mining_assignments,代码行数:38,代码来源:Nilay_Chheda_bonus.scala
示例8: FindInfluencer
//设置package包名称以及导入依赖的类
package com.knoldus.spark.graphx.example
import org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object FindInfluencer {

  /**
   * Loads follower relationships from a text file and prints the user with the
   * most distinct names collected by a two-superstep Pregel run.
   *
   * Each input line is split on commas into four fields:
   * followee name, followee id, follower name, follower id.
   * Stray parentheses are stripped from each field.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Twittter Influencer").setMaster("local[*]")
    val sparkContext = new SparkContext(conf)
    try {
      sparkContext.setLogLevel("ERROR")
      val twitterData = sparkContext.textFile("src/main/resources/twitter-graph-data.txt")

      // Parse every line once; the three RDDs below all derive from this cached result.
      val records: RDD[(VertexId, String, VertexId, String)] = twitterData.map(_.split(",")).map { arr =>
        val followee = arr(0).replace("((", "")
        val followeeId = arr(1).replace(")", "").toLong
        val follower = arr(2).replace("(", "")
        val followerId = arr(3).replace("))", "").toLong
        (followeeId, followee, followerId, follower)
      }.cache()

      val followeeVertices: RDD[(VertexId, String)] = records.map { case (id, user, _, _) => (id, user) }
      val followerVertices: RDD[(VertexId, String)] = records.map { case (_, _, id, user) => (id, user) }
      val vertices = followeeVertices.union(followerVertices)
      val edges: RDD[Edge[String]] = records.map { case (followeeId, _, followerId, _) =>
        Edge(followeeId, followerId, "follow")
      }

      val defaultUser = ""
      val graph = Graph(vertices, edges, defaultUser)
      val subGraph = graph.pregel("", 2, EdgeDirection.In)((_, attr, msg) =>
        attr + "," + msg,
        triplet => Iterator((triplet.srcId, triplet.dstAttr)),
        (a, b) => (a + "," + b))

      // NOTE(review): "- 2" presumably discounts the vertex's own name and the empty initial message; confirm.
      val (topUserId, influencerCount) = subGraph.vertices
        .map(vertex => (vertex._1, vertex._2.split(",").distinct.length - 2))
        .max()(Ordering.by[(VertexId, Int), Int](_._2))

      val userId = graph.vertices.filter(_._1 == topUserId).map(_._2).collect().head
      println(userId + " has maximum influence on network with " + influencerCount + " influencers.")
    } finally {
      sparkContext.stop()
    }
  }
}
开发者ID:knoldus,项目名称:spark-graphx-twitter,代码行数:54,代码来源:FindInfluencer.scala
示例9: PairwiseBPSuite
//设置package包名称以及导入依赖的类
package sparkle.graph
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.scalatest.FunSuite
import sparkle.util.LocalSparkContext
class PairwiseBPSuite extends FunSuite with LocalSparkContext {

  // Exact marginals (P(x=0), P(x=1)) per vertex id for the numerical example in
  // EECS course 6.869 (Bill Freeman and Antonio Torralba), chapter 7.3.5.
  private val trueProbabilities = Seq(
    1L -> (1.0 / 2.09 * 1.09, 1.0 / 2.09 * 1.0),
    2L -> (1.0 / 1.1 * 1.0, 1.0 / 1.1 * 0.1),
    3L -> (1.0 / 1.21 * 0.2, 1.0 / 1.21 * 1.01),
    4L -> (1.0, 0.0)).sortBy { case (vid, _) => vid }

  // Absolute tolerance per probability (10e-5 == 1e-4).
  private val eps = 10e-5

  /**
   * Asserts that the computed marginals match `trueProbabilities`.
   *
   * Vertex ids must match exactly. Each probability must be within `eps`
   * in either direction; the previous one-sided check let arbitrarily large
   * values pass.
   *
   * @param calculated (vertex id, (P(x=0), P(x=1))) for every vertex
   */
  private def assertMarginals(calculated: Seq[(Long, (Double, Double))]): Unit = {
    val actual = calculated.sortBy { case (vid, _) => vid }
    assert(actual.map(_._1) === trueProbabilities.map(_._1))
    actual.zip(trueProbabilities).foreach {
      case ((vid, (p0, p1)), (_, (trueP0, trueP1))) =>
        assert(math.abs(trueP0 - p0) < eps && math.abs(trueP1 - p1) < eps,
          s"vertex $vid: expected ($trueP0, $trueP1) but got ($p0, $p1)")
    }
  }

  test("Pairwise BP test") {
    // test from the lectures EECS course 6.869, Bill Freeman and Antonio Torralba.
    // Chapter 7.3.5 Numerical example.
    withSpark { sc =>
      val vertices: RDD[(Long, PVertex)] = sc.parallelize(Seq(
        (1L, PVertex(Variable(Array(0.0, 0.0)), Variable(Array(1.0, 1.0).map(math.log)))),
        (2L, PVertex(Variable(Array(0.0, 0.0)), Variable(Array(1.0, 1.0).map(math.log)))),
        (3L, PVertex(Variable(Array(0.0, 0.0)), Variable(Array(1.0, 1.0).map(math.log)))),
        (4L, PVertex(Variable(Array(0.0, 0.0)), Variable(Array(1.0, 0.0).map(math.log)))))
      )
      val edges = sc.parallelize(Seq(
        Edge(1L, 2L, PEdge(Factor(Array(2, 2), Array(1.0, 0.9, 0.9, 1.0).map(math.log)), Variable(Array(0.0, 0.0)), Variable(Array(0.0, 0.0)))),
        Edge(2L, 3L, PEdge(Factor(Array(2, 2), Array(0.1, 1.0, 1.0, 0.1).map(math.log)), Variable(Array(0.0, 0.0)), Variable(Array(0.0, 0.0)))),
        Edge(2L, 4L, PEdge(Factor(Array(2, 2), Array(1.0, 0.1, 0.1, 1.0).map(math.log)), Variable(Array(0.0, 0.0)), Variable(Array(0.0, 0.0))))
      ))
      val graph = Graph(vertices, edges)
      val bpGraph = PairwiseBP(graph)
      val calculated = bpGraph.vertices.collect().toSeq.map { case (vid, vertex) =>
        val belief = vertex.belief.exp()
        (vid, (belief.cloneValues(0), belief.cloneValues(1)))
      }
      assertMarginals(calculated)
    }
  }

  test("Pariwise BP test with file") {
    withSpark { sc =>
      val graph = PairwiseBP.loadPairwiseGraph(sc, "data/vertex4.txt", "data/edge4.txt")
      val bpGraph = PairwiseBP(graph)
      val calculated = bpGraph.vertices.collect().toSeq.map { case (vid, vertex) =>
        val belief = vertex.belief.exp()
        (vid, (belief.cloneValues(0), belief.cloneValues(1)))
      }
      assertMarginals(calculated)
    }
  }
}
开发者ID:HewlettPackard,项目名称:sandpiper,代码行数:61,代码来源:PairwiseBPSuite.scala
示例10: WikiPageLink
//设置package包名称以及导入依赖的类
package dataset
import org.apache.spark.graphx.{Graph, Edge}
import org.apache.spark.{SparkContext, SparkConf}
import utils.{MetaFile, FileHandler, FileLogger}
import scala.collection.mutable
object WikiPageLink {

  /**
   * Converts a page-link dump into GraphX vertex and edge object files.
   *
   * Arguments:
   *  - `args(0)`: input file path
   *  - `args(1)`: output metadata path
   *  - `args(2)`: log file
   *  - `args(3)`: URI type used to name the output files
   *
   * Errors are logged through `FileLogger` rather than rethrown.
   */
  def main(args: Array[String]): Unit = {
    val filepath = args(0)
    val output = args(1)
    val log = args(2)
    val uriType = args(3)
    FileLogger.open(log)
    try {
      val sc = new SparkContext(new SparkConf().setAppName("lr"))
      try {
        val pageLinks = sc.textFile(filepath)
        // Lines starting with '#' are skipped; the first two space-separated tokens are (head, tail).
        // NOTE(review): for N-Triples input the second token would be the predicate — confirm the format.
        val uriEdge = pageLinks.filter(l => !l.startsWith("#")).map { l =>
          val uris = l.split(" ")
          (uris(0), uris(1))
        }
        val uriHead = uriEdge.map(_._1)
        val uriTail = uriEdge.map(_._2)
        // Cached: ids from zipWithUniqueId after a shuffle may differ if the RDD is re-evaluated,
        // and both the collected mapping and vertexRDD must agree on them.
        val allURIs = uriHead.union(uriTail).distinct().zipWithUniqueId().cache()
        val uriMapping: Map[String, Long] = allURIs.collect().toMap

        // Edge from the head URI's id to the tail URI's id (previously both ends used the head).
        val edgeRDD = uriEdge.map { case (h, t) => Edge(uriMapping(h), uriMapping(t), 1) }
        val vertexRDD = allURIs.map(_.swap)
        FileLogger.println("Number of verteces: " + vertexRDD.count())
        FileLogger.println("Number of edges: " + edgeRDD.count())

        val outVerFile = FileHandler.getFullName(sc, uriType, "vertices")
        val outEdgeFile = FileHandler.getFullName(sc, uriType, "edges")
        vertexRDD.saveAsObjectFile(outVerFile)
        edgeRDD.saveAsObjectFile(outEdgeFile)

        val verObj = new MetaFile("VertexRDD[(Long,String)]", uriType, outVerFile)
        val edgeObj = new MetaFile("EdgeRDD[Int]", uriType, outEdgeFile)
        FileHandler.dumpOutput(output, Array(verObj, edgeObj))
      } finally {
        sc.stop()
      }
    } catch {
      case e: Exception => FileLogger.println("ERROR. tool unsuccessful:" + e)
    } finally {
      FileLogger.close()
    }
  }
}
开发者ID:HPCL,项目名称:GalacticSpark,代码行数:56,代码来源:WikiPageLink.scala
示例11: TestData
//设置package包名称以及导入依赖的类
package com.knoldus.spark.graphx
import org.apache.spark.graphx.{ Edge, Graph, VertexId }
import org.apache.spark.rdd.RDD
import org.apache.spark.{ SparkConf, SparkContext }
import org.scalatest.FunSuite
object TestData {
  /** Local (single worker thread) SparkContext backing the fixtures below. */
  val sparkContext = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))

  /** Four users as (vertex id, (username, role)). */
  val users: RDD[(VertexId, (String, String))] = sparkContext.parallelize(Array(
    (3L, ("rxin", "student")),
    (7L, ("jgonzal", "postdoc")),
    (5L, ("franklin", "prof")),
    (2L, ("istoica", "prof"))))

  /** Four labelled relationships between the users above. */
  val relationships: RDD[Edge[String]] = sparkContext.parallelize(Array(
    Edge(3L, 7L, "collab"),
    Edge(5L, 3L, "advisor"),
    Edge(2L, 5L, "colleague"),
    Edge(5L, 7L, "pi")))

  /** Attribute given to vertices that appear in edges but not in `users`. */
  val defaultUser = ("John Doe", "Missing")
}
class PropertyGraphTest extends FunSuite {
  import com.knoldus.spark.graphx.TestData._

  // Component under test, built on the SparkContext from TestData.
  val propertyGraph = new PropertyGraph(sparkContext)

  /** Fresh graph assembled from the TestData fixtures. */
  private def fixtureGraph = Graph(users, relationships, defaultUser)

  test("property graph returns graph") {
    val built = propertyGraph.getGraph(users, relationships, defaultUser)
    assert(built.edges.count() === 4)
  }

  test("property graph returns triplets in a graph") {
    val triplets = propertyGraph.getTripletView(fixtureGraph)
    assert(triplets.count() === 4)
  }

  test("property graph returns indegree of a graph") {
    val inDegrees = propertyGraph.getInDegree(fixtureGraph)
    assert(inDegrees.count() === 3)
  }

  test("property graph returns subgraph of a graph") {
    val extendedUsers: RDD[(VertexId, (String, String))] = sparkContext.parallelize(Array(
      (3L, ("rxin", "student")),
      (7L, ("jgonzal", "postdoc")),
      (5L, ("franklin", "prof")),
      (2L, ("istoica", "prof")),
      (4L, ("peter", "student"))))
    // Vertex 0 is referenced by two edges but never declared, so it receives the default attribute.
    val extendedRelationships: RDD[Edge[String]] = sparkContext.parallelize(Array(
      Edge(3L, 7L, "collab"),
      Edge(5L, 3L, "advisor"),
      Edge(2L, 5L, "colleague"),
      Edge(5L, 7L, "pi"),
      Edge(4L, 0L, "student"),
      Edge(5L, 0L, "colleague")))
    val isKnownUser = (vertexId: Long, attr: (String, String)) => attr._2 != "Missing"
    val subGraph = propertyGraph.getSubGraph(Graph(extendedUsers, extendedRelationships, defaultUser), isKnownUser)
    assert(subGraph.edges.count() === 4)
  }
}
开发者ID:knoldus,项目名称:spark-graphx-cassandra,代码行数:51,代码来源:PropertyGraphTest.scala
示例12: EmployeeRelationship
//设置package包名称以及导入依赖的类
package examples.graphx
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.{ Edge, Graph }
object EmployeeRelationship {

  /**
   * Builds a small employee graph and prints a few example queries over it:
   * a vertex filter, Robert's outgoing and incoming connections, and all
   * "Technical Mentor" relationships. The SparkContext is always stopped.
   */
  def main(args: Array[String]): Unit = {
    // vertex format: vertex_id, (name, designation)
    val vertexArray = Array(
      (1L, ("John", "Software Developer")),
      (2L, ("Robert", "Technical Leader")),
      (3L, ("Charlie", "Software Architect")),
      (4L, ("David", "Software Developer")),
      (5L, ("Edward", "Software Development Manager")),
      (6L, ("Francesca", "Software Development Manager")))
    // edge format: from_vertex_id, to_vertex_id, relationship
    val edgeArray = Array(
      Edge(2L, 1L, "Technical Mentor"),
      Edge(2L, 4L, "Technical Mentor"),
      Edge(3L, 2L, "Collaborator"),
      Edge(6L, 3L, "Team Member"),
      Edge(4L, 1L, "Peers"),
      Edge(5L, 2L, "Team Member"),
      Edge(5L, 3L, "Team Member"),
      Edge(5L, 6L, "Peers"))

    val sc = new SparkContext(new SparkConf().setAppName("EmployeeRelationshipJob"))
    try {
      val vertexRDD: RDD[(Long, (String, String))] = sc.parallelize(vertexArray)
      val edgeRDD: RDD[Edge[String]] = sc.parallelize(edgeArray)
      val graph: Graph[(String, String), String] = Graph(vertexRDD, edgeRDD)

      // Vanilla query
      println(">>> Showing the names of people who are Software Developers")
      graph.vertices.filter { case (id, (name, designation)) => designation.equals("Software Developer") }
        .collect()
        .foreach { case (id, (name, designation)) => println(s"... Name: $name, Designation: $designation") }

      // Connection analysis (vertex 2 is Robert)
      println(">>> People connected to Robert (Technical Leader) -> ")
      graph.triplets.filter(_.srcId == 2).collect()
        .foreach { item => println("... " + item.dstAttr._1 + ", " + item.dstAttr._2) }

      println(">>> Robert (Technical Leader) connected to -> ")
      graph.triplets.filter(_.dstId == 2).collect()
        .foreach { item => println("... " + item.srcAttr._1 + ", " + item.srcAttr._2) }

      println(">>> Technical Mentoring Analysis -> ")
      graph.triplets.filter(_.attr.equals("Technical Mentor")).collect()
        .foreach { item => println("... " + item.srcAttr._1 + " mentoring " + item.dstAttr._1) }
    } finally {
      sc.stop()
    }
  }
}
开发者ID:prithvirajbose,项目名称:spark-dev,代码行数:58,代码来源:EmployeeRelationship.scala
注:本文中的org.apache.spark.graphx.Edge类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论