本文整理汇总了Scala中org.apache.spark.graphx.VertexId类的典型用法代码示例。如果您正苦于以下问题:Scala VertexId类的具体用法?Scala VertexId怎么用?Scala VertexId使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了VertexId类的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: GraphProviders
//设置package包名称以及导入依赖的类
package ml.sparkling.graph.loaders.csv.providers
import ml.sparkling.graph.loaders.csv.types.Types
import ml.sparkling.graph.loaders.csv.types.Types.ToVertexId
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SparkSession;
import scala.reflect.ClassTag
object GraphProviders {
val defaultStorageLevel=StorageLevel.MEMORY_ONLY
def simpleGraphBuilder[VD: ClassTag, ED: ClassTag](defaultVertex: Option[VD]=None,
vertexProvider: Row => Seq[(VertexId, VD)],
edgeProvider: Row => Seq[Edge[ED]],
edgeStorageLevel: StorageLevel = defaultStorageLevel,
vertexStorageLevel: StorageLevel =defaultStorageLevel)
(dataFrame: DataFrame): Graph[VD, ED] = {
def mapRows[MT: ClassTag](mappingFunction: (Row) => Seq[MT]): RDD[MT] = {
dataFrame.rdd.mapPartitionsWithIndex((id, rowIterator) => {
rowIterator.flatMap { case row => mappingFunction(row) }
})
}
val vertices: RDD[(VertexId, VD)] = mapRows(vertexProvider)
val edges: RDD[Edge[ED]] = mapRows(edgeProvider)
defaultVertex match{
case None => Graph(vertices,edges,edgeStorageLevel=edgeStorageLevel,vertexStorageLevel=vertexStorageLevel)
case Some(defaultVertexValue)=> Graph(vertices,edges,defaultVertexValue,edgeStorageLevel,vertexStorageLevel)
}
}
def indexedGraphBuilder[VD:ClassTag, ED: ClassTag](defaultVertex: Option[VD]=None,
vertexProvider: (Row, ToVertexId[VD]) => Seq[(VertexId, VD)],
edgeProvider: (Row, ToVertexId[VD]) => Seq[Edge[ED]],
columnsToIndex: Seq[Int],
edgeStorageLevel: StorageLevel = defaultStorageLevel,
vertexStorageLevel: StorageLevel = defaultStorageLevel)
(dataFrame: DataFrame): Graph[VD, ED] = {
val index = dataFrame.rdd.flatMap(row => columnsToIndex.map(row(_))).distinct().zipWithUniqueId().collect().toMap
def extractIdFromIndex(vertex: VD) = index(vertex)
simpleGraphBuilder(defaultVertex,
vertexProvider(_: Row, extractIdFromIndex _),
edgeProvider(_: Row, extractIdFromIndex _),
edgeStorageLevel,
vertexStorageLevel)(dataFrame)
}
}
开发者ID:sparkling-graph,项目名称:sparkling-graph,代码行数:54,代码来源:GraphProviders.scala
示例2: ShortestPathLengthsFromCSV
//设置package包名称以及导入依赖的类
package ml.sparkling.graph.examples
import ml.sparkling.graph.api.operators.algorithms.shortestpaths.ShortestPathsTypes
import ml.sparkling.graph.api.operators.algorithms.shortestpaths.ShortestPathsTypes._
import ml.sparkling.graph.operators.algorithms.shortestpaths.ShortestPathsAlgorithm
import ml.sparkling.graph.operators.algorithms.shortestpaths.pathprocessors.fastutils.FastUtilWithDistance.DataMap
import ml.sparkling.graph.operators.predicates.AllPathPredicate
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.graphx.{Graph, VertexId}
import scala.collection.JavaConversions._
object ShortestPathLengthsFromCSV extends ExampleApp {
def body()={
val shortestPaths =if(bucketSize == -1l)
ShortestPathsAlgorithm.computeShortestPathsLengths(partitionedGraph,AllPathPredicate,treatAsUndirected)
else
ShortestPathsAlgorithm.computeShortestPathsLengthsIterative(partitionedGraph,(g:Graph[_,_])=>bucketSize,treatAsUndirected)
val size: Broadcast[VertexId] =ctx.broadcast(partitionedGraph.numVertices)
partitionedGraph.outerJoinVertices(shortestPaths.vertices)(Util.dataTransformFunction(size) _).vertices.values.saveAsTextFile(out)
ctx.stop()
}
}
private object Util{
def dataTransformFunction(size: Broadcast[VertexId])(vId: VertexId,oldValue: String,pathsOption: Option[_ >: DataMap <: JMap[JLong, JDouble]])={
pathsOption.flatMap((paths)=>{
var entries=paths.entrySet().toList.sortBy(_.getKey)
val out=new StringBuilder()
out++=s"${oldValue},"
var a = 0l
while (a < size.value) {
if (entries.size > 0 && a == entries.head.getKey) {
out ++= s"${entries.head.getValue},"
entries = entries.drop(1)
}
else {
out ++= "0,"
}
a += 1l
}
out.setLength(out.length - 1)
Option(out.toString())
}).getOrElse(oldValue)
}
}
开发者ID:sparkling-graph,项目名称:sparkling-graph,代码行数:48,代码来源:ShortestPathLengthsFromCSV.scala
示例3: PSCANConnectedComponents
//设置package包名称以及导入依赖的类
package ml.sparkling.graph.operators.algorithms.community.pscan
import org.apache.spark.graphx.{EdgeTriplet, Graph, Pregel, VertexId}
class PSCANConnectedComponents(maxWeight:Double) extends Serializable{
def run[VD,ED](graph:Graph[VertexId,Double]):Graph[VertexId,Double]={
val initialMessage = Long.MaxValue
Pregel(graph, initialMessage)(
vprog = (_, attr, msg) => math.min(attr, msg),
sendMsg = sendMessage,
mergeMsg = (a, b) => math.min(a, b))
}
def sendMessage(edge: EdgeTriplet[VertexId, Double]): Iterator[(VertexId, VertexId)] = {
if(edge.attr > maxWeight){
if(edge.srcAttr<edge.dstAttr){
Iterator((edge.dstId,edge.srcAttr))
}else if(edge.dstAttr<edge.srcAttr){
Iterator((edge.srcId,edge.dstAttr))
}else{
Iterator.empty
}
}else{
Iterator.empty
}
}
}
开发者ID:sparkling-graph,项目名称:sparkling-graph,代码行数:31,代码来源:PSCANConnectedComponents.scala
示例4: FastUtilWithDistance
//设置package包名称以及导入依赖的类
package ml.sparkling.graph.operators.algorithms.shortestpaths.pathprocessors.fastutils
import it.unimi.dsi.fastutil.longs._
import ml.sparkling.graph.api.operators.algorithms.shortestpaths.ShortestPathsTypes
import ml.sparkling.graph.api.operators.algorithms.shortestpaths.ShortestPathsTypes._
import ml.sparkling.graph.operators.algorithms.shortestpaths.pathprocessors.PathProcessor
import ml.sparkling.graph.operators.algorithms.shortestpaths.pathprocessors.fastutils.FastUtilWithDistance.DataMap
import org.apache.spark.graphx.VertexId
import scala.collection.JavaConversions._
class FastUtilWithDistance[VD, ED]() extends PathProcessor[VD, ED, DataMap] {
def EMPTY_CONTAINER = getNewContainerForPaths()
def getNewContainerForPaths() = {
new DataMap(16,0.5f)
}
def putNewPath(map: DataMap, to: VertexId, weight: ED)(implicit num: Numeric[ED]): DataMap = {
val out=map.asInstanceOf[DataMap].clone()
out.put(to, num.toDouble(weight))
out
}
def mergePathContainers(map1: DataMap, map2: DataMap)(implicit num: Numeric[ED]):DataMap = {
val out=map1.clone()
map2.foreach{case (key,inValue)=>{
val map1Value: JDouble =Option(map1.get(key)).getOrElse(inValue)
val map2Value: JDouble = inValue
val value: JDouble = min(map1Value, map2Value);
out.put(key, value)
}}
out
}
def min(d1:JDouble,d2:JDouble):JDouble={
if(d1<d2){
d1
}else{
d2
}
}
def extendPaths(targetVertexId:VertexId,map: DataMap, vertexId: VertexId, distance: ED)(implicit num: Numeric[ED]):DataMap = {
val out=map.clone()
val toAdd=num.toDouble(distance)
map.keySet().foreach{ (key: JLong) => {
out.addTo(key, toAdd)
}}
out.remove(targetVertexId)
out
}
}
object FastUtilWithDistance{
type DataMap=Long2DoubleOpenHashMap
}
开发者ID:sparkling-graph,项目名称:sparkling-graph,代码行数:60,代码来源:FastUtilWithDistance.scala
示例5: SingleVertexProcessor
//设置package包名称以及导入依赖的类
package ml.sparkling.graph.operators.algorithms.shortestpaths.pathprocessors
import org.apache.spark.graphx.VertexId
class SingleVertexProcessor[VD, ED](computedVertexId:VertexId) extends PathProcessor[VD, ED, Double] {
def EMPTY_CONTAINER = 0d
override def getNewContainerForPaths(): Double = 0d
override def extendPaths(targetVertexId: VertexId, currentValue: Double, vertexId: VertexId, distance: ED)(implicit num: Numeric[ED]): Double = {
if(targetVertexId==computedVertexId || currentValue == 0)
0d
else
currentValue+num.toDouble(distance)
}
override def mergePathContainers(map1: Double, map2: Double)(implicit num: Numeric[ED]): Double = {
(map1,map2) match{
case (0d,_)=> map2
case (_,0d)=> map1
case _ =>Math.min(map1,map2)
}
}
override def putNewPath(map: Double, to: VertexId, weight: ED)(implicit num: Numeric[ED]): Double = {
num.toDouble(weight)
}
}
开发者ID:sparkling-graph,项目名称:sparkling-graph,代码行数:31,代码来源:SingleVertexProcessor.scala
示例6: WithPathProcessor
//设置package包名称以及导入依赖的类
package ml.sparkling.graph.operators.algorithms.shortestpaths.pathprocessors
import org.apache.spark.graphx.VertexId
class WithPathProcessor[VD,ED]() extends PathProcessor[VD,ED,Map[VertexId,(ED,Set[List[VertexId]])]]{
private type PathsSet=(ED,Set[List[VertexId]])
private type PathsMap=Map[VertexId,PathsSet]
def EMPTY_CONTAINER=Map.empty[VertexId,PathsSet]
def getNewContainerForPaths() ={
EMPTY_CONTAINER
}
def putNewPath(map:PathsMap,to:VertexId,weight:ED)(implicit num:Numeric[ED]): PathsMap={
(map + (to -> (weight,Set(to::Nil)))).map(identity)
}
def mergePathContainers(map1:PathsMap,map2:PathsMap)(implicit num:Numeric[ED]):PathsMap={
(map1.keySet ++ map2.keySet).map(vId=>(vId,mergePathSets(map1.get(vId),map2.get(vId)))).toMap.map(identity)
}
def extendPaths(targetVertexId:VertexId,map:PathsMap,vertexId:VertexId,distance:ED)(implicit num:Numeric[ED]): PathsMap ={
map.filterKeys(_!=targetVertexId).mapValues(extendPathsSet(_,vertexId,distance)).map(identity)
}
private def extendPathsSet(pathSet:PathsSet,vertexId:VertexId,distance:ED)(implicit num:Numeric[ED]):PathsSet={
pathSet match{
case (edge,set) => (num.plus(distance,edge),set.map(vertexId :: _))
}
}
private def mergePathSets(pathSet1:Option[PathsSet],pathSet2:Option[PathsSet])(implicit num:Numeric[ED]): PathsSet ={
(pathSet1 :: pathSet2 :: Nil).flatten[PathsSet].reduce[PathsSet]{
case ((edge1,set1),(edge2,set2))=>
num.compare(edge1,edge2).signum match{
case 0=> (edge1,set1++set2)
case 1=>(edge2,set2)
case -1=>(edge1,set1)
}
}
}
}
开发者ID:sparkling-graph,项目名称:sparkling-graph,代码行数:47,代码来源:WithPathProcessor.scala
示例7: NOInitBFSPredicate
//设置package包名称以及导入依赖的类
package ml.sparkling.graph.operators.measures.vertex.betweenness.hua.predicate
import ml.sparkling.graph.operators.algorithms.bfs.predicate.BFSPredicate
import ml.sparkling.graph.operators.measures.vertex.betweenness.hua.struct.NOVertex
import ml.sparkling.graph.operators.measures.vertex.betweenness.hua.struct.messages.NOMessage
import org.apache.spark.graphx.VertexId
class NOInitBFSPredicate extends BFSPredicate[NOVertex, List[NOMessage[VertexId]]] {
override def getInitialData(vertexId: VertexId, attr: NOVertex): (VertexId) => NOVertex =
(id: VertexId) => if (id == vertexId) attr.setParent(id) else attr
override def applyMessages(vertexId: VertexId, vertex: NOVertex, message: List[NOMessage[VertexId]]): NOVertex =
if (vertex.isCompleted) vertex else updateVertex(vertex, message)
def updateVertex(vertex: NOVertex, messages: List[NOMessage[VertexId]]) = {
val parent = extractParrent(vertex, messages)
val succ = extractSuccessors(vertex, messages)
vertex.setPredecessorAndSuccessors(parent, succ)
}
def extractParrent(vertex: NOVertex, messages: List[NOMessage[VertexId]]) = {
vertex.pred match {
case Some(pred) => vertex.pred
case None =>
val expandMsg = messages.filter(_.isExpand).map(_.content)
expandMsg.headOption
}
}
def extractSuccessors(vertex: NOVertex, messages: List[NOMessage[VertexId]]) =
vertex.succ match {
case Some(arr) => vertex.succ
case None =>
val confirmMsg = messages.filter(_.isConfirm).map(_.content)
if (confirmMsg.nonEmpty) Some(confirmMsg.toArray) else None
}
}
开发者ID:sparkling-graph,项目名称:sparkling-graph,代码行数:41,代码来源:NOInitBFSPredicate.scala
示例8: NOVertex
//设置package包名称以及导入依赖的类
package ml.sparkling.graph.operators.measures.vertex.betweenness.hua.struct
import ml.sparkling.graph.operators.measures.vertex.betweenness.hua.struct.messages.DFSPointer
import org.apache.spark.graphx.VertexId
class NOVertex(val vertexId: VertexId,
val bfsMap: Map[VertexId, NOBFSVertex],
val pred: Option[VertexId],
val succ: Option[Array[VertexId]],
val dfsPointer: Option[DFSPointer],
val bc: Double) extends Serializable {
def setParent(idParent: VertexId) = NOVertex(vertexId, bfsMap, Some(idParent), succ, dfsPointer, bc)
def setPredecessorAndSuccessors(newPred: Option[VertexId], newSucc: Option[Array[VertexId]]) =
NOVertex(vertexId, bfsMap, newPred, newSucc, dfsPointer, bc)
val isCompleted = pred.nonEmpty && succ.nonEmpty
val leaf = succ.isEmpty
lazy val bfsRoot = bfsMap.contains(vertexId)
lazy val lowestSucc = succ.getOrElse(Array.empty).sorted.headOption
lazy val eccentricity = if (bfsMap.isEmpty) 0 else bfsMap.map({ case (id, v) => v.distance}).max
def withDfsPointer(pointer: Option[DFSPointer]) =
NOVertex(vertexId, bfsMap, pred, succ, pointer, bc)
def update(bfsMap: Map[VertexId, NOBFSVertex] = bfsMap, succ: Option[Array[VertexId]] = succ, dfsPointer: Option[DFSPointer] = dfsPointer, bcInc: Double = 0) =
NOVertex(vertexId, bfsMap, pred, succ, dfsPointer, bc + bcInc)
}
object NOVertex extends Serializable {
def apply(vertexId: VertexId,
bfsMap: Map[VertexId, NOBFSVertex] = Map.empty,
pred: Option[VertexId] = None,
succ: Option[Array[VertexId]] = None,
dfsPointer: Option[DFSPointer] = None,
bc: Double = .0): NOVertex = new NOVertex(vertexId, bfsMap, pred, succ, dfsPointer, bc)
}
开发者ID:sparkling-graph,项目名称:sparkling-graph,代码行数:43,代码来源:NOVertex.scala
示例9: NOInitBFSProcessor
//设置package包名称以及导入依赖的类
package ml.sparkling.graph.operators.measures.vertex.betweenness.hua.processor
import ml.sparkling.graph.operators.algorithms.bfs.processor.BFSProcessor
import ml.sparkling.graph.operators.measures.vertex.betweenness.hua.struct.NOVertex
import ml.sparkling.graph.operators.measures.vertex.betweenness.hua.struct.messages.{BFSConfirmMessage, BFSExpandMessage, NOMessage}
import org.apache.spark.graphx.{EdgeTriplet, VertexId}
class NOInitBFSProcessor[ED] extends BFSProcessor[NOVertex, ED, List[NOMessage[VertexId]]] {
override def initialMessage: List[NOMessage[VertexId]] = List.empty
override def mergeMessages(msg1: List[NOMessage[VertexId]], msg2: List[NOMessage[VertexId]]): List[NOMessage[VertexId]] = {
val allMessages = msg1 ++ msg2
val expandMessageList = allMessages.filter(_.isExpand)
val expandMessage = expandMessageList.headOption
val succMessages = allMessages.filter(_.isConfirm)
expandMessage match {
case Some(m) => succMessages :+ m
case None => succMessages
}
}
override def sendMessage(triplet: EdgeTriplet[NOVertex, ED]): Iterator[(VertexId, List[NOMessage[VertexId]])] = {
def createExpandMsg(dstId: VertexId) = {
val dstAttr = triplet.vertexAttr(dstId)
val srcAttr = triplet.otherVertexAttr(dstId)
if (dstAttr.pred.isEmpty && srcAttr.pred.nonEmpty) Iterator((dstId, List(BFSExpandMessage(triplet.otherVertexId(dstId))))) else Iterator.empty
}
def createConfirmMsg(dstId: VertexId) = {
val dstAttr = triplet.vertexAttr(dstId)
val srcAttr = triplet.otherVertexAttr(dstId)
if (!dstAttr.isCompleted && srcAttr.pred.exists(_ == dstId)) Iterator((dstId, List(BFSConfirmMessage(triplet.otherVertexId(dstId))))) else Iterator.empty
}
val confirmMsg = createConfirmMsg(triplet.srcId) ++ createConfirmMsg(triplet.dstId)
val expandMsg = createExpandMsg(triplet.srcId) ++ createExpandMsg(triplet.dstId)
confirmMsg ++ expandMsg
}
}
开发者ID:sparkling-graph,项目名称:sparkling-graph,代码行数:43,代码来源:NOInitBFSProcessor.scala
示例10: CFBCFlow
//设置package包名称以及导入依赖的类
package ml.sparkling.graph.operators.measures.vertex.betweenness.flow.struct
import org.apache.spark.graphx.VertexId
class CFBCFlow(val src: VertexId, val dst: VertexId, val potential: Double, val completed: Boolean, val aliveThrough: Int) extends Serializable {
def supplyValue(vertexId: VertexId) = vertexId match {
case src => 1
case dst => -1
case _ => 0
}
val key = (src, dst)
val removable = completed && aliveThrough <= 0
def countdownVitality = if (aliveThrough > 0) CFBCFlow(src, dst, potential, completed, aliveThrough - 1) else this
}
object CFBCFlow extends Serializable {
def apply(src: VertexId,
dst: VertexId,
potential: Double = 1.0,
completed: Boolean = false,
aliveThrough: Int = 3
): CFBCFlow = new CFBCFlow(src, dst, potential, completed, aliveThrough)
def updatePotential(flow: CFBCFlow, newPotential: Double, eps: Double = 0.0) = {
val completed = Math.abs(flow.potential - newPotential) > eps
CFBCFlow(flow.src, flow.dst, newPotential, completed, flow.aliveThrough)
}
def empty(key: (VertexId, VertexId)) = key match { case (src, dst) => CFBCFlow(src, dst, 0.0) }
}
开发者ID:sparkling-graph,项目名称:sparkling-graph,代码行数:35,代码来源:CFBCFlow.scala
示例11: CFBCVertex
//设置package包名称以及导入依赖的类
package ml.sparkling.graph.operators.measures.vertex.betweenness.flow.struct
import org.apache.spark.graphx.VertexId
class CFBCVertex(
val id: VertexId,
val degree: Int,
val bc: Double,
val sampleVertices: Array[VertexId],
val flows: (Array[CFBCFlow], Iterable[CFBCNeighbourFlow]),
val processedFlows: Int) extends Serializable {
lazy val relatedFlows = vertexFlows.filter(f => f.dst == id || f.src == id)
lazy val availableSamples = sampleVertices
lazy val vertexPhi = vertexFlows.count(_.src == id)
lazy val flowsMap = vertexFlows.map(f => ((f.src, f.dst), f)).toMap
val (vertexFlows, neighboursFlows) = flows
def isFinalized(k: Int) = sampleVertices.isEmpty || processedFlows >= k
def getFlow(key: (VertexId, VertexId)) = flowsMap.getOrElse(key, CFBCFlow.empty(key))
def updateBC(currentFlowing: Double) = {
val newBC = (processedFlows * bc + currentFlowing) / (processedFlows + 1)
new CFBCVertex(id, degree, newBC, sampleVertices, flows, processedFlows + 1)
}
def updateBC(currentFlowing: Seq[Double]) = {
val newBC = if (currentFlowing.isEmpty) bc else (processedFlows * bc + currentFlowing.sum) / (processedFlows + currentFlowing.length)
new CFBCVertex(id, degree, newBC, sampleVertices, flows, processedFlows + currentFlowing.length)
}
def addNewFlow(flow: CFBCFlow) =
new CFBCVertex(id, degree, bc, sampleVertices.filterNot(_ == flow.dst), (vertexFlows :+ flow, neighboursFlows), processedFlows)
def updateFlows(fls: Array[CFBCFlow]) =
new CFBCVertex(id, degree, bc, sampleVertices, (fls, neighboursFlows), processedFlows)
def removeFlows(toRemove: Seq[CFBCFlow]) = {
val newFlows = vertexFlows.diff(toRemove).map(_.countdownVitality)
new CFBCVertex(id, degree, bc, sampleVertices, (newFlows, neighboursFlows), processedFlows)
}
def applyNeighbourFlows(nbhFlows: Iterable[CFBCNeighbourFlow]) =
new CFBCVertex(id, degree, bc, sampleVertices, (vertexFlows, nbhFlows), processedFlows)
}
object CFBCVertex extends Serializable {
def apply(id: VertexId,
degree: Int,
bc: Double = 0.0,
sampleVertices: Array[VertexId] = Array.empty,
flows: (Array[CFBCFlow], Iterable[CFBCNeighbourFlow]) = (Array.empty, Iterable.empty)
): CFBCVertex = new CFBCVertex(id, degree, bc, sampleVertices, flows, 0)
}
开发者ID:sparkling-graph,项目名称:sparkling-graph,代码行数:59,代码来源:CFBCVertex.scala
示例12: EdmondsMessage
//设置package包名称以及导入依赖的类
package ml.sparkling.graph.operators.measures.vertex.betweenness.edmonds.struct.messages
import org.apache.spark.graphx.VertexId
class EdmondsMessage(val preds: List[VertexId], val sigma: Int, val depth: Int) extends Serializable {
def merge(other: EdmondsMessage): EdmondsMessage = {
require(depth == other.depth)
EdmondsMessage(preds ++ other.preds, sigma + other.sigma, depth)
}
}
object EdmondsMessage extends Serializable {
def apply(preds: List[VertexId], sigma: Int, depth: Int): EdmondsMessage =
new EdmondsMessage(preds, sigma, depth)
def empty = apply(List.empty, -1, -1)
}
开发者ID:sparkling-graph,项目名称:sparkling-graph,代码行数:19,代码来源:EdmondsMessage.scala
示例13: PSCANBasedPartitioning
//设置package包名称以及导入依赖的类
package ml.sparkling.graph.operators.partitioning
import ml.sparkling.graph.api.operators.algorithms.community.CommunityDetection.ComponentID
import ml.sparkling.graph.operators.algorithms.community.pscan.PSCAN
import org.apache.log4j.Logger
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Graph, VertexId}
import scala.collection.mutable
import scala.reflect.ClassTag
object PSCANBasedPartitioning {
@transient
val logger=Logger.getLogger(PSCANBasedPartitioning.getClass())
def partitionGraphBy[VD:ClassTag,ED:ClassTag](graph:Graph[VD,ED],numberOfPartitions:Int)(implicit sc:SparkContext): Graph[VD, ED] ={
val (numberOfCommunities: VertexId, coarsedVertexMap: Map[VertexId, Int], coarsedNumberOfPartitions: Int, strategy: ByComponentIdPartitionStrategy) = buildPartitioningStrategy(graph, numberOfPartitions)
logger.info(s"Partitioning graph using coarsed map with ${coarsedVertexMap.size} entries and ${coarsedNumberOfPartitions} partitions (before ${numberOfCommunities})")
val out=new CustomGraphPartitioningImplementation[VD,ED](graph).partitionBy(strategy)
out.edges.foreachPartition((_)=>{})
graph.unpersist(false)
out
}
def buildPartitioningStrategy[ED: ClassTag, VD: ClassTag](graph: Graph[VD, ED], numberOfPartitions: Int)(implicit sc:SparkContext) = {
val (numberOfCommunities: VertexId, coarsedVertexMap: Map[VertexId, Int], coarsedNumberOfPartitions: Int) = precomputePartitions(graph, numberOfPartitions)
val strategy = ByComponentIdPartitionStrategy(coarsedVertexMap, coarsedNumberOfPartitions)
(numberOfCommunities, coarsedVertexMap, coarsedNumberOfPartitions, strategy)
}
def precomputePartitions[ED: ClassTag, VD: ClassTag](graph: Graph[VD, ED], numberOfPartitions: Int)(implicit sc:SparkContext) = {
logger.info("Computing components using PSCAN")
val (communities, numberOfCommunities): (Graph[ComponentID, ED], VertexId) = PSCAN.computeConnectedComponentsUsing(graph, numberOfPartitions)
logger.info("Components computed!")
communities.unpersist(false)
val (coarsedVertexMap, coarsedNumberOfPartitions) = ParallelPartitioningUtils.coarsePartitions(numberOfPartitions, numberOfCommunities, communities.vertices)
(numberOfCommunities, coarsedVertexMap, coarsedNumberOfPartitions)
}
}
开发者ID:sparkling-graph,项目名称:sparkling-graph,代码行数:43,代码来源:PSCANBasedPartitioning.scala
示例14: FindInfluencer
//设置package包名称以及导入依赖的类
package com.knoldus.spark.graphx.example
import org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object FindInfluencer {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Twittter Influencer").setMaster("local[*]")
val sparkContext = new SparkContext(conf)
sparkContext.setLogLevel("ERROR")
val twitterData = sparkContext.textFile("src/main/resources/twitter-graph-data.txt")
val followeeVertices: RDD[(VertexId, String)] = twitterData.map(_.split(",")).map { arr =>
val user = arr(0).replace("((", "")
val id = arr(1).replace(")", "")
(id.toLong, user)
}
val followerVertices: RDD[(VertexId, String)] = twitterData.map(_.split(",")).map { arr =>
val user = arr(2).replace("(", "")
val id = arr(3).replace("))", "")
(id.toLong, user)
}
val vertices = followeeVertices.union(followerVertices)
val edges: RDD[Edge[String]] = twitterData.map(_.split(",")).map { arr =>
val followeeId = arr(1).replace(")", "").toLong
val followerId = arr(3).replace("))", "").toLong
Edge(followeeId, followerId, "follow")
}
val defaultUser = ("")
val graph = Graph(vertices, edges, defaultUser)
val subGraph = graph.pregel("", 2, EdgeDirection.In)((_, attr, msg) =>
attr + "," + msg,
triplet => Iterator((triplet.srcId, triplet.dstAttr)),
(a, b) => (a + "," + b))
val lengthRDD = subGraph.vertices.map(vertex => (vertex._1, vertex._2.split(",").distinct.length - 2)).max()(new Ordering[Tuple2[VertexId, Int]]() {
override def compare(x: (VertexId, Int), y: (VertexId, Int)): Int =
Ordering[Int].compare(x._2, y._2)
})
val userId = graph.vertices.filter(_._1 == lengthRDD._1).map(_._2).collect().head
println(userId + " has maximum influence on network with " + lengthRDD._2 + " influencers.")
sparkContext.stop()
}
}
开发者ID:knoldus,项目名称:spark-graphx-twitter,代码行数:54,代码来源:FindInfluencer.scala
示例15: TestData
//设置package包名称以及导入依赖的类
package com.knoldus.spark.graphx
import org.apache.spark.graphx.{ Edge, Graph, VertexId }
import org.apache.spark.rdd.RDD
import org.apache.spark.{ SparkConf, SparkContext }
import org.scalatest.FunSuite
object TestData {
val sparkContext = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
val users: RDD[(VertexId, (String, String))] =
sparkContext.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
val relationships: RDD[Edge[String]] =
sparkContext.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
val defaultUser = ("John Doe", "Missing")
}
class PropertyGraphTest extends FunSuite {
import com.knoldus.spark.graphx.TestData._
val propertyGraph = new PropertyGraph(sparkContext)
test("property graph returns graph") {
val graph = propertyGraph.getGraph(users, relationships, defaultUser)
assert(graph.edges.count() === 4)
}
test("property graph returns triplets in a graph") {
val graph = propertyGraph.getTripletView(Graph(users, relationships, defaultUser))
assert(graph.count() === 4)
}
test("property graph returns indegree of a graph") {
val graph = propertyGraph.getInDegree(Graph(users, relationships, defaultUser))
assert(graph.count() === 3)
}
test("property graph returns subgraph of a graph") {
val users: RDD[(VertexId, (String, String))] =
sparkContext.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
(4L, ("peter", "student"))))
val relationships: RDD[Edge[String]] =
sparkContext.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"), Edge(4L, 0L, "student"),
Edge(5L, 0L, "colleague")))
val defaultUser = ("John Doe", "Missing")
val subGraph = propertyGraph.getSubGraph(Graph(users, relationships, defaultUser), { (id: Long, attr: (String, String)) => attr._2 != "Missing" })
assert(subGraph.edges.count() === 4)
}
}
开发者ID:knoldus,项目名称:spark-graphx-cassandra,代码行数:51,代码来源:PropertyGraphTest.scala
注:本文中的org.apache.spark.graphx.VertexId类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论