
Scala VertexId Code Examples


This article compiles typical usage examples of org.apache.spark.graphx.VertexId in Scala. If you are wondering what VertexId is and how it is used in practice, the curated examples below should help.



Below are 15 code examples that use VertexId, ordered by popularity.
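
As background for all of the examples: VertexId is not a regular class but a type alias defined in the org.apache.spark.graphx package object as type VertexId = Long, so any 64-bit integer can act as a vertex identifier. A minimal, self-contained sketch (local-mode Spark here is an assumption for demonstration):

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object VertexIdBasics {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("vertexid-basics"))

    // VertexId = Long, so plain Long literals serve as vertex ids.
    val vertices: RDD[(VertexId, String)] = sc.parallelize(Seq((1L, "a"), (2L, "b")))
    val edges: RDD[Edge[Int]] = sc.parallelize(Seq(Edge(1L, 2L, 1)))

    val graph: Graph[String, Int] = Graph(vertices, edges)
    println(graph.vertices.count()) // 2
    sc.stop()
  }
}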

Example 1: GraphProviders

// Package declaration and imported dependencies
package ml.sparkling.graph.loaders.csv.providers

import ml.sparkling.graph.loaders.csv.types.Types
import ml.sparkling.graph.loaders.csv.types.Types.ToVertexId
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SparkSession
import scala.reflect.ClassTag


object GraphProviders {
  val defaultStorageLevel = StorageLevel.MEMORY_ONLY
  def simpleGraphBuilder[VD: ClassTag, ED: ClassTag](defaultVertex: Option[VD] = None,
                                                     vertexProvider: Row => Seq[(VertexId, VD)],
                                                     edgeProvider: Row => Seq[Edge[ED]],
                                                     edgeStorageLevel: StorageLevel = defaultStorageLevel,
                                                     vertexStorageLevel: StorageLevel = defaultStorageLevel)
                                                    (dataFrame: DataFrame): Graph[VD, ED] = {

    def mapRows[MT: ClassTag](mappingFunction: Row => Seq[MT]): RDD[MT] = {
      dataFrame.rdd.mapPartitionsWithIndex((_, rowIterator) =>
        rowIterator.flatMap(mappingFunction))
    }

    val vertices: RDD[(VertexId, VD)] = mapRows(vertexProvider)
    val edges: RDD[Edge[ED]] = mapRows(edgeProvider)
    defaultVertex match {
      case None => Graph(vertices, edges, edgeStorageLevel = edgeStorageLevel, vertexStorageLevel = vertexStorageLevel)
      case Some(defaultVertexValue) => Graph(vertices, edges, defaultVertexValue, edgeStorageLevel, vertexStorageLevel)
    }

  }

  def indexedGraphBuilder[VD:ClassTag, ED: ClassTag](defaultVertex: Option[VD]=None,
                                                      vertexProvider: (Row, ToVertexId[VD]) => Seq[(VertexId, VD)],
                                                      edgeProvider: (Row, ToVertexId[VD]) => Seq[Edge[ED]],
                                                      columnsToIndex: Seq[Int],
                                                      edgeStorageLevel: StorageLevel = defaultStorageLevel,
                                                      vertexStorageLevel: StorageLevel = defaultStorageLevel)
                                                     (dataFrame: DataFrame): Graph[VD, ED] = {
    val index = dataFrame.rdd.flatMap(row => columnsToIndex.map(row(_))).distinct().zipWithUniqueId().collect().toMap
    def extractIdFromIndex(vertex: VD) = index(vertex)
    simpleGraphBuilder(defaultVertex,
      vertexProvider(_: Row, extractIdFromIndex _),
      edgeProvider(_: Row, extractIdFromIndex _),
      edgeStorageLevel,
      vertexStorageLevel)(dataFrame)

  }
} 
Developer: sparkling-graph | Project: sparkling-graph | Lines: 54 | Source: GraphProviders.scala
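
To make the builder concrete, here is a hedged usage sketch: a two-column DataFrame of (src, dst) Long ids with trivially derived vertices. The column layout and attribute values are illustrative assumptions, not part of the library.

import org.apache.spark.graphx.{Edge, VertexId}
import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("provider-demo").getOrCreate()
import spark.implicits._

// Hypothetical edge list: each row holds (srcId, dstId).
val df = Seq((1L, 2L), (2L, 3L)).toDF("src", "dst")

val graph = GraphProviders.simpleGraphBuilder[String, Int](
  defaultVertex = Some("unknown"),
  vertexProvider = (row: Row) =>
    Seq((row.getLong(0): VertexId, "v"), (row.getLong(1): VertexId, "v")),
  edgeProvider = (row: Row) => Seq(Edge(row.getLong(0), row.getLong(1), 1))
)(df)

println(graph.numVertices) // 3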


Example 2: ShortestPathLengthsFromCSV

// Package declaration and imported dependencies
package ml.sparkling.graph.examples

import ml.sparkling.graph.api.operators.algorithms.shortestpaths.ShortestPathsTypes
import ml.sparkling.graph.api.operators.algorithms.shortestpaths.ShortestPathsTypes._
import ml.sparkling.graph.operators.algorithms.shortestpaths.ShortestPathsAlgorithm
import ml.sparkling.graph.operators.algorithms.shortestpaths.pathprocessors.fastutils.FastUtilWithDistance.DataMap
import ml.sparkling.graph.operators.predicates.AllPathPredicate
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.graphx.{Graph, VertexId}

import scala.collection.JavaConversions._

object ShortestPathLengthsFromCSV extends ExampleApp {
def body() = {
  val shortestPaths = if (bucketSize == -1L)
    ShortestPathsAlgorithm.computeShortestPathsLengths(partitionedGraph, AllPathPredicate, treatAsUndirected)
  else
    ShortestPathsAlgorithm.computeShortestPathsLengthsIterative(partitionedGraph, (g: Graph[_, _]) => bucketSize, treatAsUndirected)
  val size: Broadcast[VertexId] = ctx.broadcast(partitionedGraph.numVertices)
  partitionedGraph.outerJoinVertices(shortestPaths.vertices)(Util.dataTransformFunction(size) _).vertices.values.saveAsTextFile(out)
  ctx.stop()
}
}


private object Util{
  def dataTransformFunction(size: Broadcast[VertexId])(vId: VertexId, oldValue: String, pathsOption: Option[_ >: DataMap <: JMap[JLong, JDouble]]) = {
    pathsOption.flatMap((paths) => {
      var entries = paths.entrySet().toList.sortBy(_.getKey)
      val out = new StringBuilder()
      out ++= s"${oldValue},"
      var a = 0L
      while (a < size.value) {
        if (entries.nonEmpty && a == entries.head.getKey) {
          out ++= s"${entries.head.getValue},"
          entries = entries.drop(1)
        } else {
          out ++= "0,"
        }
        a += 1L
      }
      out.setLength(out.length - 1)
      Option(out.toString())
    }).getOrElse(oldValue)
  }
} 
Developer: sparkling-graph | Project: sparkling-graph | Lines: 48 | Source: ShortestPathLengthsFromCSV.scala
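
For orientation, the core call used above, reduced to a hedged sketch (the argument order follows the usage in this example; the final Boolean is the treat-as-undirected flag):

import ml.sparkling.graph.operators.algorithms.shortestpaths.ShortestPathsAlgorithm
import ml.sparkling.graph.operators.predicates.AllPathPredicate

// graph: any Graph[VD, ED] with numeric edge attributes, e.g. Graph[Int, Double]
val withLengths = ShortestPathsAlgorithm.computeShortestPathsLengths(graph, AllPathPredicate, true)

// Each vertex attribute now maps reachable vertex ids to shortest path lengths.
withLengths.vertices.take(5).foreach(println)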


Example 3: PSCANConnectedComponents

// Package declaration and imported dependencies
package ml.sparkling.graph.operators.algorithms.community.pscan

import org.apache.spark.graphx.{EdgeTriplet, Graph, Pregel, VertexId}


class PSCANConnectedComponents(maxWeight: Double) extends Serializable {

  def run(graph: Graph[VertexId, Double]): Graph[VertexId, Double] = {
    val initialMessage = Long.MaxValue
    Pregel(graph, initialMessage)(
      vprog = (_, attr, msg) => math.min(attr, msg),
      sendMsg = sendMessage,
      mergeMsg = (a, b) => math.min(a, b))
  }

  def sendMessage(edge: EdgeTriplet[VertexId, Double]): Iterator[(VertexId, VertexId)] = {
    if (edge.attr > maxWeight) {
      if (edge.srcAttr < edge.dstAttr) {
        Iterator((edge.dstId, edge.srcAttr))
      } else if (edge.dstAttr < edge.srcAttr) {
        Iterator((edge.srcId, edge.dstAttr))
      } else {
        Iterator.empty
      }
    } else {
      Iterator.empty
    }
  }
} 
Developer: sparkling-graph | Project: sparkling-graph | Lines: 31 | Source: PSCANConnectedComponents.scala
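
A hedged sketch of driving this Pregel wrapper directly: seed every vertex attribute with its own id, then minimum ids propagate across edges whose weight exceeds maxWeight (the toy graph and the 0.5 threshold are assumptions):

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("pscan-cc"))

val edges = sc.parallelize(Seq(
  Edge(1L, 2L, 0.9), // above the threshold: joins vertices 1 and 2
  Edge(2L, 3L, 0.1)  // at or below the threshold: no message flows
))
// Each vertex starts labeled with its own id.
val graph: Graph[VertexId, Double] = Graph.fromEdges(edges, 0L).mapVertices((id, _) => id)

val components = new PSCANConnectedComponents(0.5).run(graph)
components.vertices.collect().sorted.foreach(println) // (1,1), (2,1), (3,3)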


Example 4: FastUtilWithDistance

// Package declaration and imported dependencies
package ml.sparkling.graph.operators.algorithms.shortestpaths.pathprocessors.fastutils

import it.unimi.dsi.fastutil.longs._
import ml.sparkling.graph.api.operators.algorithms.shortestpaths.ShortestPathsTypes
import ml.sparkling.graph.api.operators.algorithms.shortestpaths.ShortestPathsTypes._
import ml.sparkling.graph.operators.algorithms.shortestpaths.pathprocessors.PathProcessor
import ml.sparkling.graph.operators.algorithms.shortestpaths.pathprocessors.fastutils.FastUtilWithDistance.DataMap
import org.apache.spark.graphx.VertexId

import scala.collection.JavaConversions._



class FastUtilWithDistance[VD, ED]() extends PathProcessor[VD, ED, DataMap] {
  def EMPTY_CONTAINER = getNewContainerForPaths()

  def getNewContainerForPaths() = {
    new DataMap(16, 0.5f)
  }

  def putNewPath(map: DataMap, to: VertexId, weight: ED)(implicit num: Numeric[ED]): DataMap = {
    val out = map.clone()
    out.put(to, num.toDouble(weight))
    out
  }

  def mergePathContainers(map1: DataMap, map2: DataMap)(implicit num: Numeric[ED]): DataMap = {
    val out = map1.clone()
    map2.foreach { case (key, inValue) =>
      val map1Value: JDouble = Option(map1.get(key)).getOrElse(inValue)
      val map2Value: JDouble = inValue
      val value: JDouble = min(map1Value, map2Value)
      out.put(key, value)
    }
    out
  }

  def min(d1:JDouble,d2:JDouble):JDouble={
    if(d1<d2){
      d1
    }else{
      d2
    }
  }

  def extendPaths(targetVertexId:VertexId,map: DataMap, vertexId: VertexId, distance: ED)(implicit num: Numeric[ED]):DataMap = {
    val out=map.clone()
    val toAdd=num.toDouble(distance)
    map.keySet().foreach{ (key: JLong) => {
        out.addTo(key, toAdd)
    }}
    out.remove(targetVertexId)
    out
  }

}

object FastUtilWithDistance{
  type DataMap=Long2DoubleOpenHashMap
} 
Developer: sparkling-graph | Project: sparkling-graph | Lines: 60 | Source: FastUtilWithDistance.scala
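
A small trace of the container operations, assuming Double edge weights (so an implicit Numeric[Double] is in scope):

val processor = new FastUtilWithDistance[String, Double]()

val empty = processor.getNewContainerForPaths()
val one   = processor.putNewPath(empty, to = 2L, weight = 1.5) // {2 -> 1.5}
val other = processor.putNewPath(empty, to = 2L, weight = 0.5) // {2 -> 0.5}

// Merging keeps the smaller distance per target vertex.
val merged = processor.mergePathContainers(one, other)         // {2 -> 0.5}

// Extending adds the hop distance to every entry, then drops the target itself.
val extended = processor.extendPaths(2L, merged, 1L, 1.0)      // {}
println(merged)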


Example 5: SingleVertexProcessor

// Package declaration and imported dependencies
package ml.sparkling.graph.operators.algorithms.shortestpaths.pathprocessors

import org.apache.spark.graphx.VertexId


class SingleVertexProcessor[VD, ED](computedVertexId:VertexId) extends PathProcessor[VD, ED, Double] {
  def EMPTY_CONTAINER = 0d

  override def getNewContainerForPaths(): Double = 0d

  override def extendPaths(targetVertexId: VertexId, currentValue: Double, vertexId: VertexId, distance: ED)(implicit num: Numeric[ED]): Double = {
    if(targetVertexId==computedVertexId || currentValue == 0)
      0d
    else
      currentValue+num.toDouble(distance)
  }

  override def mergePathContainers(map1: Double, map2: Double)(implicit num: Numeric[ED]): Double = {
    (map1,map2) match{
      case (0d,_)=> map2
      case (_,0d)=> map1
      case _ =>Math.min(map1,map2)
    }
  }

  override def putNewPath(map: Double, to: VertexId, weight: ED)(implicit num: Numeric[ED]): Double = {
   num.toDouble(weight)
  }

} 
Developer: sparkling-graph | Project: sparkling-graph | Lines: 31 | Source: SingleVertexProcessor.scala
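
Here the whole path container is a single Double distance toward one target vertex; a brief trace with toy values:

val sp = new SingleVertexProcessor[String, Double](5L) // computedVertexId = 5

val started  = sp.putNewPath(0d, to = 5L, weight = 2.0) // 2.0
val extended = sp.extendPaths(7L, started, 5L, 1.5)     // 2.0 + 1.5 = 3.5
println(sp.mergePathContainers(extended, 10.0))         // 3.5: the minimum wins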


Example 6: WithPathProcessor

// Package declaration and imported dependencies
package ml.sparkling.graph.operators.algorithms.shortestpaths.pathprocessors

import org.apache.spark.graphx.VertexId



class WithPathProcessor[VD, ED]() extends PathProcessor[VD, ED, Map[VertexId, (ED, Set[List[VertexId]])]] {
  private type PathsSet = (ED, Set[List[VertexId]])
  private type PathsMap = Map[VertexId, PathsSet]
  def EMPTY_CONTAINER = Map.empty[VertexId, PathsSet]

  def getNewContainerForPaths() ={
    EMPTY_CONTAINER
  }

  def putNewPath(map:PathsMap,to:VertexId,weight:ED)(implicit num:Numeric[ED]): PathsMap={
    (map + (to -> (weight,Set(to::Nil)))).map(identity)
  }

  def mergePathContainers(map1:PathsMap,map2:PathsMap)(implicit num:Numeric[ED]):PathsMap={
    (map1.keySet ++ map2.keySet).map(vId=>(vId,mergePathSets(map1.get(vId),map2.get(vId)))).toMap.map(identity)
  }


  def extendPaths(targetVertexId:VertexId,map:PathsMap,vertexId:VertexId,distance:ED)(implicit num:Numeric[ED]): PathsMap ={
   map.filterKeys(_!=targetVertexId).mapValues(extendPathsSet(_,vertexId,distance)).map(identity)
  }

  private def extendPathsSet(pathSet:PathsSet,vertexId:VertexId,distance:ED)(implicit num:Numeric[ED]):PathsSet={
    pathSet match{
      case (edge,set) =>  (num.plus(distance,edge),set.map(vertexId :: _))
    }

  }

  private def mergePathSets(pathSet1:Option[PathsSet],pathSet2:Option[PathsSet])(implicit num:Numeric[ED]): PathsSet ={
    (pathSet1 :: pathSet2 :: Nil).flatten[PathsSet].reduce[PathsSet]{
      case ((edge1,set1),(edge2,set2))=>
        num.compare(edge1,edge2).signum match{
          case 0=> (edge1,set1++set2)
          case 1=>(edge2,set2)
          case -1=>(edge1,set1)
        }
    }
  }
} 
Developer: sparkling-graph | Project: sparkling-graph | Lines: 47 | Source: WithPathProcessor.scala
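
A hedged sketch exercising the map-based container with Double weights; equal-length alternatives would have their path sets unioned, while here the shorter path wins:

val proc = new WithPathProcessor[String, Double]()

val viaA = proc.putNewPath(proc.getNewContainerForPaths(), to = 5L, weight = 2.0)
val viaB = proc.putNewPath(proc.getNewContainerForPaths(), to = 5L, weight = 1.0)

val merged = proc.mergePathContainers(viaA, viaB)
println(merged) // Map(5 -> (1.0, Set(List(5))))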


Example 7: NOInitBFSPredicate

// Package declaration and imported dependencies
package ml.sparkling.graph.operators.measures.vertex.betweenness.hua.predicate

import ml.sparkling.graph.operators.algorithms.bfs.predicate.BFSPredicate
import ml.sparkling.graph.operators.measures.vertex.betweenness.hua.struct.NOVertex
import ml.sparkling.graph.operators.measures.vertex.betweenness.hua.struct.messages.NOMessage
import org.apache.spark.graphx.VertexId


class NOInitBFSPredicate extends BFSPredicate[NOVertex, List[NOMessage[VertexId]]] {

  override def getInitialData(vertexId: VertexId, attr: NOVertex): (VertexId) => NOVertex =
    (id: VertexId) => if (id == vertexId) attr.setParent(id) else attr

  override def applyMessages(vertexId: VertexId, vertex: NOVertex, message: List[NOMessage[VertexId]]): NOVertex =
    if (vertex.isCompleted) vertex else updateVertex(vertex, message)


  def updateVertex(vertex: NOVertex, messages: List[NOMessage[VertexId]]) = {
    val parent = extractParent(vertex, messages)
    val succ = extractSuccessors(vertex, messages)
    vertex.setPredecessorAndSuccessors(parent, succ)
  }

  def extractParent(vertex: NOVertex, messages: List[NOMessage[VertexId]]) = {
    vertex.pred match {
      case Some(_) => vertex.pred
      case None =>
        val expandMsg = messages.filter(_.isExpand).map(_.content)
        expandMsg.headOption
    }
  }

  def extractSuccessors(vertex: NOVertex, messages: List[NOMessage[VertexId]]) =
    vertex.succ match {
      case Some(arr) => vertex.succ
      case None =>
        val confirmMsg = messages.filter(_.isConfirm).map(_.content)
        if (confirmMsg.nonEmpty) Some(confirmMsg.toArray) else None
    }
} 
Developer: sparkling-graph | Project: sparkling-graph | Lines: 41 | Source: NOInitBFSPredicate.scala
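
A quick illustration of the initialization contract, assuming NOVertex from Example 8 below is on the classpath: only the root vertex (id == vertexId) becomes its own parent.

val predicate = new NOInitBFSPredicate()
val init = predicate.getInitialData(1L, NOVertex(1L))

println(init(1L).pred) // Some(1): the root is its own parent
println(init(2L).pred) // None: other vertices keep their attribute unchanged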


Example 8: NOVertex

// Package declaration and imported dependencies
package ml.sparkling.graph.operators.measures.vertex.betweenness.hua.struct

import ml.sparkling.graph.operators.measures.vertex.betweenness.hua.struct.messages.DFSPointer
import org.apache.spark.graphx.VertexId


class NOVertex(val vertexId: VertexId,
               val bfsMap: Map[VertexId, NOBFSVertex],
               val pred: Option[VertexId],
               val succ: Option[Array[VertexId]],
               val dfsPointer: Option[DFSPointer],
               val bc: Double) extends Serializable {
  def setParent(idParent: VertexId) = NOVertex(vertexId, bfsMap, Some(idParent), succ, dfsPointer, bc)

  def setPredecessorAndSuccessors(newPred: Option[VertexId], newSucc: Option[Array[VertexId]]) =
    NOVertex(vertexId, bfsMap, newPred, newSucc, dfsPointer, bc)

  val isCompleted = pred.nonEmpty && succ.nonEmpty

  val leaf = succ.isEmpty

  lazy val bfsRoot = bfsMap.contains(vertexId)

  lazy val lowestSucc = succ.getOrElse(Array.empty).sorted.headOption

  lazy val eccentricity = if (bfsMap.isEmpty) 0 else bfsMap.map({ case (id, v) => v.distance}).max

  def withDfsPointer(pointer: Option[DFSPointer]) =
    NOVertex(vertexId, bfsMap, pred, succ, pointer, bc)

  def update(bfsMap: Map[VertexId, NOBFSVertex] = bfsMap, succ: Option[Array[VertexId]] = succ, dfsPointer: Option[DFSPointer] = dfsPointer, bcInc: Double = 0) =
    NOVertex(vertexId, bfsMap, pred, succ, dfsPointer, bc + bcInc)
}

object NOVertex extends Serializable {
  def apply(vertexId: VertexId,
            bfsMap: Map[VertexId, NOBFSVertex] = Map.empty,
            pred: Option[VertexId] = None,
            succ: Option[Array[VertexId]] = None,
            dfsPointer: Option[DFSPointer] = None,
            bc: Double = 0.0): NOVertex = new NOVertex(vertexId, bfsMap, pred, succ, dfsPointer, bc)
} 
Developer: sparkling-graph | Project: sparkling-graph | Lines: 43 | Source: NOVertex.scala
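
A brief sketch of the immutable update style (toy ids):

val v = NOVertex(vertexId = 1L)
val withParent = v.setParent(7L)

println(withParent.pred)        // Some(7)
println(withParent.isCompleted) // false: successors are still unknown
println(v.pred)                 // None: the original vertex is untouched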


Example 9: NOInitBFSProcessor

// Package declaration and imported dependencies
package ml.sparkling.graph.operators.measures.vertex.betweenness.hua.processor

import ml.sparkling.graph.operators.algorithms.bfs.processor.BFSProcessor
import ml.sparkling.graph.operators.measures.vertex.betweenness.hua.struct.NOVertex
import ml.sparkling.graph.operators.measures.vertex.betweenness.hua.struct.messages.{BFSConfirmMessage, BFSExpandMessage, NOMessage}
import org.apache.spark.graphx.{EdgeTriplet, VertexId}


class NOInitBFSProcessor[ED] extends BFSProcessor[NOVertex, ED, List[NOMessage[VertexId]]] {
  override def initialMessage: List[NOMessage[VertexId]] = List.empty

  override def mergeMessages(msg1: List[NOMessage[VertexId]], msg2: List[NOMessage[VertexId]]): List[NOMessage[VertexId]] = {
    val allMessages = msg1 ++ msg2
    val expandMessageList = allMessages.filter(_.isExpand)
    val expandMessage = expandMessageList.headOption
    val succMessages = allMessages.filter(_.isConfirm)

    expandMessage match {
      case Some(m) => succMessages :+ m
      case None => succMessages
    }
  }

  override def sendMessage(triplet: EdgeTriplet[NOVertex, ED]): Iterator[(VertexId, List[NOMessage[VertexId]])] = {

    def createExpandMsg(dstId: VertexId) = {
      val dstAttr = triplet.vertexAttr(dstId)
      val srcAttr = triplet.otherVertexAttr(dstId)
      if (dstAttr.pred.isEmpty && srcAttr.pred.nonEmpty) Iterator((dstId, List(BFSExpandMessage(triplet.otherVertexId(dstId))))) else Iterator.empty
    }

    def createConfirmMsg(dstId: VertexId) = {
      val dstAttr = triplet.vertexAttr(dstId)
      val srcAttr = triplet.otherVertexAttr(dstId)
      if (!dstAttr.isCompleted && srcAttr.pred.exists(_ == dstId)) Iterator((dstId, List(BFSConfirmMessage(triplet.otherVertexId(dstId))))) else Iterator.empty
    }

    val confirmMsg = createConfirmMsg(triplet.srcId) ++ createConfirmMsg(triplet.dstId)
    val expandMsg = createExpandMsg(triplet.srcId) ++ createExpandMsg(triplet.dstId)
    confirmMsg ++ expandMsg
  }
} 
Developer: sparkling-graph | Project: sparkling-graph | Lines: 43 | Source: NOInitBFSProcessor.scala
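
A hedged check of the merge rule, assuming the message classes take a VertexId content as they are used above: all confirm messages survive, but at most one expand message is kept.

import ml.sparkling.graph.operators.measures.vertex.betweenness.hua.struct.messages.{BFSConfirmMessage, BFSExpandMessage, NOMessage}
import org.apache.spark.graphx.VertexId

val proc = new NOInitBFSProcessor[Int]()
val expands: List[NOMessage[VertexId]]  = List(BFSExpandMessage(1L), BFSExpandMessage(2L))
val confirms: List[NOMessage[VertexId]] = List(BFSConfirmMessage(3L))

val merged = proc.mergeMessages(expands, confirms)
println(merged.size) // 2: one confirm plus the first expand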


Example 10: CFBCFlow

// Package declaration and imported dependencies
package ml.sparkling.graph.operators.measures.vertex.betweenness.flow.struct

import org.apache.spark.graphx.VertexId


class CFBCFlow(val src: VertexId, val dst: VertexId, val potential: Double, val completed: Boolean, val aliveThrough: Int) extends Serializable {
  // Backticked src/dst compare against the constructor fields; without backticks
  // the first case would bind a fresh variable and always match.
  def supplyValue(vertexId: VertexId) = vertexId match {
    case `src` => 1
    case `dst` => -1
    case _ => 0
  }

  val key = (src, dst)

  val removable = completed && aliveThrough <= 0

  def countdownVitality = if (aliveThrough > 0) CFBCFlow(src, dst, potential, completed, aliveThrough - 1) else this
}

object CFBCFlow extends Serializable {
  def apply(src: VertexId,
            dst: VertexId,
            potential: Double = 1.0,
            completed: Boolean = false,
            aliveThrough: Int = 3
           ): CFBCFlow = new CFBCFlow(src, dst, potential, completed, aliveThrough)

  def updatePotential(flow: CFBCFlow, newPotential: Double, eps: Double = 0.0) = {
    // A flow is complete once its potential has converged (change within eps).
    val completed = Math.abs(flow.potential - newPotential) <= eps
    CFBCFlow(flow.src, flow.dst, newPotential, completed, flow.aliveThrough)
  }

  def empty(key: (VertexId, VertexId)) = key match { case (src, dst) =>  CFBCFlow(src, dst, 0.0) }
} 
Developer: sparkling-graph | Project: sparkling-graph | Lines: 35 | Source: CFBCFlow.scala
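
With the backticked patterns above, supplyValue now really compares against the src/dst fields; a toy check:

val flow = CFBCFlow(src = 1L, dst = 2L)

println(flow.supplyValue(1L)) //  1: the source supplies flow
println(flow.supplyValue(2L)) // -1: the destination absorbs it
println(flow.supplyValue(9L)) //  0: every other vertex is neutral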


Example 11: CFBCVertex

// Package declaration and imported dependencies
package ml.sparkling.graph.operators.measures.vertex.betweenness.flow.struct

import org.apache.spark.graphx.VertexId


class CFBCVertex(
                  val id: VertexId,
                  val degree: Int,
                  val bc: Double,
                  val sampleVertices: Array[VertexId],
                  val flows: (Array[CFBCFlow], Iterable[CFBCNeighbourFlow]),
                  val processedFlows: Int) extends Serializable {
  lazy val relatedFlows = vertexFlows.filter(f => f.dst == id || f.src == id)
  lazy val availableSamples = sampleVertices

  lazy val vertexPhi = vertexFlows.count(_.src == id)

  lazy val flowsMap = vertexFlows.map(f => ((f.src, f.dst), f)).toMap

  val (vertexFlows, neighboursFlows) = flows

  def isFinalized(k: Int) = sampleVertices.isEmpty || processedFlows >= k

  def getFlow(key: (VertexId, VertexId)) = flowsMap.getOrElse(key, CFBCFlow.empty(key))

  def updateBC(currentFlowing: Double) = {
    val newBC = (processedFlows * bc + currentFlowing) / (processedFlows + 1)
    new CFBCVertex(id, degree, newBC, sampleVertices, flows, processedFlows + 1)
  }

  def updateBC(currentFlowing: Seq[Double]) = {
    val newBC = if (currentFlowing.isEmpty) bc else (processedFlows * bc + currentFlowing.sum) / (processedFlows + currentFlowing.length)
    new CFBCVertex(id, degree, newBC, sampleVertices, flows, processedFlows + currentFlowing.length)
  }

  def addNewFlow(flow: CFBCFlow) =
    new CFBCVertex(id, degree, bc, sampleVertices.filterNot(_ == flow.dst), (vertexFlows :+ flow, neighboursFlows), processedFlows)

  def updateFlows(fls: Array[CFBCFlow]) =
    new CFBCVertex(id, degree, bc, sampleVertices, (fls, neighboursFlows), processedFlows)

  def removeFlows(toRemove: Seq[CFBCFlow]) = {
    val newFlows = vertexFlows.diff(toRemove).map(_.countdownVitality)
    new CFBCVertex(id, degree, bc, sampleVertices, (newFlows, neighboursFlows), processedFlows)
  }

  def applyNeighbourFlows(nbhFlows: Iterable[CFBCNeighbourFlow]) =
    new CFBCVertex(id, degree, bc, sampleVertices, (vertexFlows, nbhFlows), processedFlows)
}

object CFBCVertex extends Serializable {
  def apply(id: VertexId,
            degree: Int,
            bc: Double = 0.0,
            sampleVertices: Array[VertexId] = Array.empty,
            flows: (Array[CFBCFlow], Iterable[CFBCNeighbourFlow]) = (Array.empty, Iterable.empty)
           ): CFBCVertex = new CFBCVertex(id, degree, bc, sampleVertices, flows, 0)
} 
Developer: sparkling-graph | Project: sparkling-graph | Lines: 59 | Source: CFBCVertex.scala
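
updateBC keeps a running average of betweenness contributions: with n flows already processed and current estimate bc, the next value is (n * bc + x) / (n + 1). A toy trace (constructor arguments are illustrative):

val v0 = CFBCVertex(id = 1L, degree = 2) // bc = 0.0, processedFlows = 0
val v1 = v0.updateBC(4.0)                // (0 * 0.0 + 4.0) / 1 = 4.0
val v2 = v1.updateBC(2.0)                // (1 * 4.0 + 2.0) / 2 = 3.0
println((v1.bc, v2.bc))                  // (4.0,3.0)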


Example 12: EdmondsMessage

// Package declaration and imported dependencies
package ml.sparkling.graph.operators.measures.vertex.betweenness.edmonds.struct.messages

import org.apache.spark.graphx.VertexId


class EdmondsMessage(val preds: List[VertexId], val sigma: Int, val depth: Int) extends Serializable {
  def merge(other: EdmondsMessage): EdmondsMessage = {
    require(depth == other.depth)
    EdmondsMessage(preds ++ other.preds, sigma + other.sigma, depth)
  }
}

object EdmondsMessage extends Serializable {
  def apply(preds: List[VertexId], sigma: Int, depth: Int): EdmondsMessage =
    new EdmondsMessage(preds, sigma, depth)

  def empty = apply(List.empty, -1, -1)
} 
Developer: sparkling-graph | Project: sparkling-graph | Lines: 19 | Source: EdmondsMessage.scala
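
merge concatenates predecessor lists and sums the shortest-path counts (sigma) for messages arriving at the same BFS depth; a toy illustration:

val m1 = EdmondsMessage(List(1L), sigma = 2, depth = 3)
val m2 = EdmondsMessage(List(4L), sigma = 1, depth = 3)

val merged = m1.merge(m2) // require(depth == other.depth) holds
println(merged.preds)     // List(1, 4)
println(merged.sigma)     // 3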


Example 13: PSCANBasedPartitioning

// Package declaration and imported dependencies
package ml.sparkling.graph.operators.partitioning

import ml.sparkling.graph.api.operators.algorithms.community.CommunityDetection.ComponentID
import ml.sparkling.graph.operators.algorithms.community.pscan.PSCAN
import org.apache.log4j.Logger
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Graph, VertexId}

import scala.collection.mutable
import scala.reflect.ClassTag


object PSCANBasedPartitioning {

  @transient
  val logger=Logger.getLogger(PSCANBasedPartitioning.getClass())

  def partitionGraphBy[VD:ClassTag,ED:ClassTag](graph:Graph[VD,ED],numberOfPartitions:Int)(implicit sc:SparkContext): Graph[VD, ED] ={
    val (numberOfCommunities: VertexId, coarsedVertexMap: Map[VertexId, Int], coarsedNumberOfPartitions: Int, strategy: ByComponentIdPartitionStrategy) = buildPartitioningStrategy(graph, numberOfPartitions)
    logger.info(s"Partitioning graph using coarsened map with ${coarsedVertexMap.size} entries and ${coarsedNumberOfPartitions} partitions (before: ${numberOfCommunities})")
    val out=new CustomGraphPartitioningImplementation[VD,ED](graph).partitionBy(strategy)
    out.edges.foreachPartition((_)=>{})
    graph.unpersist(false)
    out
  }


  def buildPartitioningStrategy[ED: ClassTag, VD: ClassTag](graph: Graph[VD, ED], numberOfPartitions: Int)(implicit sc:SparkContext) = {
    val (numberOfCommunities: VertexId, coarsedVertexMap: Map[VertexId, Int], coarsedNumberOfPartitions: Int) = precomputePartitions(graph, numberOfPartitions)
    val strategy = ByComponentIdPartitionStrategy(coarsedVertexMap, coarsedNumberOfPartitions)
    (numberOfCommunities, coarsedVertexMap, coarsedNumberOfPartitions, strategy)
  }

  def precomputePartitions[ED: ClassTag, VD: ClassTag](graph: Graph[VD, ED], numberOfPartitions: Int)(implicit sc:SparkContext) = {
    logger.info("Computing components using PSCAN")
    val (communities, numberOfCommunities): (Graph[ComponentID, ED], VertexId) = PSCAN.computeConnectedComponentsUsing(graph, numberOfPartitions)
    logger.info("Components computed!")
    communities.unpersist(false)
    val (coarsedVertexMap, coarsedNumberOfPartitions) = ParallelPartitioningUtils.coarsePartitions(numberOfPartitions, numberOfCommunities, communities.vertices)
    (numberOfCommunities, coarsedVertexMap, coarsedNumberOfPartitions)
  }
} 
Developer: sparkling-graph | Project: sparkling-graph | Lines: 43 | Source: PSCANBasedPartitioning.scala
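
A hedged sketch of the entry point; it only needs an implicit SparkContext and any Graph[VD, ED] (the toy graph below is an assumption):

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.{SparkConf, SparkContext}

implicit val sc: SparkContext =
  new SparkContext(new SparkConf().setMaster("local[*]").setAppName("pscan-partitioning"))

val graph = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 1.0), Edge(4L, 5L, 1.0))), 0)

// Repartition edges so that vertices of one PSCAN community stay co-located.
val partitioned = PSCANBasedPartitioning.partitionGraphBy(graph, numberOfPartitions = 2)
println(partitioned.edges.getNumPartitions)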


Example 14: FindInfluencer

// Package declaration and imported dependencies
package com.knoldus.spark.graphx.example

import org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FindInfluencer {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Twittter Influencer").setMaster("local[*]")
    val sparkContext = new SparkContext(conf)
    sparkContext.setLogLevel("ERROR")

    val twitterData = sparkContext.textFile("src/main/resources/twitter-graph-data.txt")

    val followeeVertices: RDD[(VertexId, String)] = twitterData.map(_.split(",")).map { arr =>
      val user = arr(0).replace("((", "")
      val id = arr(1).replace(")", "")
      (id.toLong, user)
    }

    val followerVertices: RDD[(VertexId, String)] = twitterData.map(_.split(",")).map { arr =>
      val user = arr(2).replace("(", "")
      val id = arr(3).replace("))", "")
      (id.toLong, user)
    }

    val vertices = followeeVertices.union(followerVertices)
    val edges: RDD[Edge[String]] = twitterData.map(_.split(",")).map { arr =>
      val followeeId = arr(1).replace(")", "").toLong
      val followerId = arr(3).replace("))", "").toLong
      Edge(followeeId, followerId, "follow")
    }

    val defaultUser = ("")
    val graph = Graph(vertices, edges, defaultUser)

    val subGraph = graph.pregel("", 2, EdgeDirection.In)((_, attr, msg) =>
      attr + "," + msg,
      triplet => Iterator((triplet.srcId, triplet.dstAttr)),
      (a, b) => (a + "," + b))

    val lengthRDD = subGraph.vertices.map(vertex => (vertex._1, vertex._2.split(",").distinct.length - 2)).max()(new Ordering[Tuple2[VertexId, Int]]() {
      override def compare(x: (VertexId, Int), y: (VertexId, Int)): Int =
        Ordering[Int].compare(x._2, y._2)
    })

    val userId = graph.vertices.filter(_._1 == lengthRDD._1).map(_._2).collect().head
    println(userId + " has maximum influence on network with " + lengthRDD._2 + " influencers.")

    sparkContext.stop()
  }
} 
Developer: knoldus | Project: spark-graphx-twitter | Lines: 54 | Source: FindInfluencer.scala
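
The replace calls above imply each input line has the shape ((followeeName,followeeId),(followerName,followerId)); that format, and the sample below, are inferred assumptions rather than documented by the project.

// Hypothetical contents of src/main/resources/twitter-graph-data.txt:
// ((Alice,1),(Bob,2))
// ((Alice,1),(Carol,3))
// ((Bob,2),(Carol,3))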


Example 15: TestData

// Package declaration and imported dependencies
package com.knoldus.spark.graphx

import org.apache.spark.graphx.{ Edge, Graph, VertexId }
import org.apache.spark.rdd.RDD
import org.apache.spark.{ SparkConf, SparkContext }
import org.scalatest.FunSuite

object TestData {
  val sparkContext = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))

  val users: RDD[(VertexId, (String, String))] =
    sparkContext.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
  val relationships: RDD[Edge[String]] =
    sparkContext.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
  val defaultUser = ("John Doe", "Missing")
}

class PropertyGraphTest extends FunSuite {
  import com.knoldus.spark.graphx.TestData._

  val propertyGraph = new PropertyGraph(sparkContext)

  test("property graph returns graph") {
    val graph = propertyGraph.getGraph(users, relationships, defaultUser)
    assert(graph.edges.count() === 4)
  }

  test("property graph returns triplets in a graph") {
    val graph = propertyGraph.getTripletView(Graph(users, relationships, defaultUser))
    assert(graph.count() === 4)
  }

  test("property graph returns indegree of a graph") {
    val graph = propertyGraph.getInDegree(Graph(users, relationships, defaultUser))
    assert(graph.count() === 3)
  }

  test("property graph returns subgraph of a graph") {
    val users: RDD[(VertexId, (String, String))] =
      sparkContext.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
        (4L, ("peter", "student"))))
    val relationships: RDD[Edge[String]] =
      sparkContext.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"), Edge(4L, 0L, "student"),
        Edge(5L, 0L, "colleague")))
    val defaultUser = ("John Doe", "Missing")

    val subGraph = propertyGraph.getSubGraph(Graph(users, relationships, defaultUser), { (id: Long, attr: (String, String)) => attr._2 != "Missing" })
    assert(subGraph.edges.count() === 4)
  }
} 
Developer: knoldus | Project: spark-graphx-cassandra | Lines: 51 | Source: PropertyGraphTest.scala
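
PropertyGraph itself is not shown in this example; judging purely from how the tests call it, a minimal implementation could look like the following sketch (the method names come from the test, the bodies are assumptions):

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph, VertexId}
import org.apache.spark.rdd.RDD

class PropertyGraph(sc: SparkContext) extends Serializable {

  // Builds the property graph from vertex and edge RDDs.
  def getGraph(users: RDD[(VertexId, (String, String))],
               relationships: RDD[Edge[String]],
               defaultUser: (String, String)): Graph[(String, String), String] =
    Graph(users, relationships, defaultUser)

  // Returns the triplet view: one record per edge with both endpoint attributes.
  def getTripletView(graph: Graph[(String, String), String]): RDD[EdgeTriplet[(String, String), String]] =
    graph.triplets

  // In-degree per vertex; vertices with no incoming edges are absent.
  def getInDegree(graph: Graph[(String, String), String]): RDD[(VertexId, Int)] =
    graph.inDegrees

  // Restricts the graph to vertices satisfying the predicate.
  def getSubGraph(graph: Graph[(String, String), String],
                  predicate: (VertexId, (String, String)) => Boolean): Graph[(String, String), String] =
    graph.subgraph(vpred = predicate)
}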



Note: The org.apache.spark.graphx.VertexId examples above were compiled from open-source projects hosted on GitHub and similar code/documentation platforms. Copyright in each snippet remains with its original authors; consult the corresponding project's license before reusing or redistributing the code. Please do not repost without permission.

