This article collects typical usage examples of the Scala class org.apache.spark.graphx.Graph. If you are wondering what the Graph class does, or how and where to use it, the curated examples below should help.
The following sections present 20 code examples of the Graph class, ordered by popularity.
Example 1: PCAClustering
// Package declaration and imports
package graph
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{EdgeDirection, Edge, Graph}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.{DenseVector, Vector, Matrix, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD
import scala.collection.mutable
class PCAClustering {
def matrixToRDD(sc:SparkContext, m: Matrix): RDD[Vector] = {
val columns = m.toArray.grouped(m.numRows)
val rows = columns.toSeq.transpose // Skip this if you want a column-major RDD.
val vectors = rows.map(row => new DenseVector(row.toArray))
sc.parallelize(vectors)
}
def run(inputGraph: Graph[Any, Any], clusterNum: Int, eigsNum: Int, sc: SparkContext): Graph[Int, Any] = {
val numNode = inputGraph.numVertices.toInt
val mapping = new mutable.HashMap[Long,Int]()
val revMapping = new mutable.HashMap[Int, Long]()
val vertexIds = inputGraph.vertices.map(_._1).collect()
for (i <- 0 until numNode) {
mapping.put(vertexIds(i), i)
revMapping.put(i, vertexIds(i))
}
// Reindex the vertices from 0 to numNode - 1
val nVertices = inputGraph.vertices.map( u=> (mapping.apply(u._1).toLong, u._2))
val nEdges = inputGraph.edges.map(e=> Edge(mapping.apply(e.srcId).toLong, mapping.apply(e.dstId).toLong, e.attr))
val ngraph = Graph(nVertices, nEdges)
val output = ngraph.collectNeighborIds(EdgeDirection.Out)
val spvec = output.mapValues(r => Vectors.sparse(numNode, r.map(_.toInt), r.map(_ => 1.0 / r.length)))
val rows = spvec.map(v=>v._2)
val order = spvec.map(v=>v._1)
val mat = new RowMatrix(rows)
val pc = mat.computePrincipalComponents(eigsNum)
val pcRDD = matrixToRDD(sc, pc)
val clusters = KMeans.train(pcRDD, clusterNum, 100)
val clusterArray = pcRDD.map(p=> clusters.predict(p) ).collect()
val assignedClusters = order.map( o => (o, clusterArray.apply(o.toInt)))
val origVertexRDD = assignedClusters.map{ case (vid, value) => (revMapping.apply(vid.toInt), value) }
Graph(origVertexRDD, inputGraph.edges)
}
}
Developer: HPCL, Project: GalacticSpark, Lines: 56, Source: PCAClustering.scala
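For orientation, a minimal driver sketch follows. It is not part of the original project: the SparkContext setup and the tiny edge list are invented for illustration, and the clusterNum/eigsNum values are arbitrary.
// Hypothetical driver for the PCAClustering class above; all data is made up.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
object PCAClusteringDemo extends App {
  val sc = new SparkContext(new SparkConf().setAppName("pca-clustering-demo").setMaster("local[*]"))
  // Two loosely coupled groups; vertex and edge attributes are ignored by the algorithm.
  val vertices: RDD[(VertexId, Any)] = sc.parallelize((1L to 6L).map(id => (id, 0: Any)))
  val edges: RDD[Edge[Any]] = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L), (4L, 5L), (5L, 6L))
    .map { case (s, d) => Edge(s, d, 1: Any) })
  val clustered = new PCAClustering().run(Graph(vertices, edges), clusterNum = 2, eigsNum = 2, sc = sc)
  clustered.vertices.collect().foreach { case (id, c) => println(s"vertex $id -> cluster $c") }
  sc.stop()
}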
Example 2: Main
// Package declaration and imports
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}
object Main extends App {
val sparkConf = new SparkConf()
.setAppName("Simple Application")
.setMaster("local[*]")
val sparkContext = new SparkContext(sparkConf)
sparkContext.setLogLevel("ERROR")
val vertices = sparkContext.makeRDD(Array((1L, 0), (2L, 0), (3L, 0), (4L, 0), (5L, 0), (6L, 0)))
val edges = sparkContext.makeRDD(Array(
Edge(1L, 2L, ""), Edge(1L, 3L, ""), Edge(1L, 6L, ""),
Edge(2L, 3L, ""), Edge(2L, 4L, ""), Edge(2L, 5L, ""),
Edge(3L, 5L, ""),
Edge(4L, 6L, ""),
Edge(5L, 6L, "")))
val graph = Graph(vertices, edges)
}
Developer: Seyzz, Project: GraphColoring, Lines: 24, Source: Main.scala
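Main builds the graph but never queries it. As a hedged follow-up (core GraphX accessors only, nothing project-specific), the object could be extended with a quick sanity check:
// Appended at the end of Main above; prints the basic structure of the graph.
println(s"vertices: ${graph.numVertices}, edges: ${graph.numEdges}")
graph.degrees.collect().sortBy(_._1).foreach { case (id, deg) => println(s"vertex $id has degree $deg") }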
Example 3: GraphProviders
// Package declaration and imports
package ml.sparkling.graph.loaders.csv.providers
import ml.sparkling.graph.loaders.csv.types.Types
import ml.sparkling.graph.loaders.csv.types.Types.ToVertexId
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SparkSession
import scala.reflect.ClassTag
object GraphProviders {
val defaultStorageLevel=StorageLevel.MEMORY_ONLY
def simpleGraphBuilder[VD: ClassTag, ED: ClassTag](defaultVertex: Option[VD]=None,
vertexProvider: Row => Seq[(VertexId, VD)],
edgeProvider: Row => Seq[Edge[ED]],
edgeStorageLevel: StorageLevel = defaultStorageLevel,
vertexStorageLevel: StorageLevel =defaultStorageLevel)
(dataFrame: DataFrame): Graph[VD, ED] = {
def mapRows[MT: ClassTag](mappingFunction: (Row) => Seq[MT]): RDD[MT] = {
dataFrame.rdd.mapPartitionsWithIndex((id, rowIterator) => {
rowIterator.flatMap { case row => mappingFunction(row) }
})
}
val vertices: RDD[(VertexId, VD)] = mapRows(vertexProvider)
val edges: RDD[Edge[ED]] = mapRows(edgeProvider)
defaultVertex match{
case None => Graph(vertices,edges,edgeStorageLevel=edgeStorageLevel,vertexStorageLevel=vertexStorageLevel)
case Some(defaultVertexValue)=> Graph(vertices,edges,defaultVertexValue,edgeStorageLevel,vertexStorageLevel)
}
}
def indexedGraphBuilder[VD:ClassTag, ED: ClassTag](defaultVertex: Option[VD]=None,
vertexProvider: (Row, ToVertexId[VD]) => Seq[(VertexId, VD)],
edgeProvider: (Row, ToVertexId[VD]) => Seq[Edge[ED]],
columnsToIndex: Seq[Int],
edgeStorageLevel: StorageLevel = defaultStorageLevel,
vertexStorageLevel: StorageLevel = defaultStorageLevel)
(dataFrame: DataFrame): Graph[VD, ED] = {
val index = dataFrame.rdd.flatMap(row => columnsToIndex.map(row(_))).distinct().zipWithUniqueId().collect().toMap
def extractIdFromIndex(vertex: VD) = index(vertex)
simpleGraphBuilder(defaultVertex,
vertexProvider(_: Row, extractIdFromIndex _),
edgeProvider(_: Row, extractIdFromIndex _),
edgeStorageLevel,
vertexStorageLevel)(dataFrame)
}
}
Developer: sparkling-graph, Project: sparkling-graph, Lines: 54, Source: GraphProviders.scala
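A hedged usage sketch of simpleGraphBuilder follows. The SparkSession setup, the two-column (src, dst) DataFrame, and the attribute values are assumptions made for illustration, not part of the loader's API:
// Hypothetical caller; builds a Graph[String, Double] from rows of (src, dst) ids.
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.{Row, SparkSession}
val spark = SparkSession.builder().appName("graph-providers-demo").master("local[*]").getOrCreate()
import spark.implicits._
val df = Seq((1L, 2L), (2L, 3L)).toDF("src", "dst")
val graph: Graph[String, Double] = GraphProviders.simpleGraphBuilder[String, Double](
  defaultVertex = Some("unknown"),
  vertexProvider = (row: Row) => Seq((row.getLong(0), "v"), (row.getLong(1), "v")),
  edgeProvider = (row: Row) => Seq(Edge(row.getLong(0), row.getLong(1), 1.0))
)(df)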
Example 4: GraphFromGraphML
// Package declaration and imports
package ml.sparkling.graph.loaders.graphml
import ml.sparkling.graph.api.loaders.GraphLoading.LoadGraph
import ml.sparkling.graph.loaders.LoaderTest
import ml.sparkling.graph.loaders.graphml.GraphFromGraphML.{GraphML, GraphProperties}
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
class GraphFromGraphML$Test(implicit sc:SparkContext) extends LoaderTest {
"GraphML with standard format" should "be loaded by default" in{
Given("XML in GraphML format path")
val filePath = getClass.getResource("/simpleGraphML.xml").toString
When("Loads graph")
val graph = LoadGraph.from(GraphML(filePath)).load()
Then("Graph should be loaded correctly")
graph.vertices.count() should equal(2)
graph.edges.count() should equal(1)
}
"GraphML with standard format and multiple edges" should "be loaded by default" in{
Given("XML in GraphML format path")
val filePath = getClass.getResource("/simpleGraphML2.xml").toString
When("Loads graph")
val graph = LoadGraph.from(GraphML(filePath)).load()
Then("Graph should be loaded correctly")
graph.vertices.count() should equal(3)
graph.edges.count() should equal(2)
}
"GraphML with vertices attributes" should "be loaded by default" in{
Given("XML in GraphML format path")
val filePath = getClass.getResource("/withValuesGraphML.xml").toString
When("Loads graph")
val graph: Graph[GraphProperties, GraphProperties] = LoadGraph.from(GraphML(filePath)).load()
Then("Graph should be loaded correctly")
graph.vertices.count() should equal(4)
graph.edges.count() should equal(2)
graph.vertices.map{
case (vId, properties) => (vId, properties("name").asInstanceOf[String])
}.collect().sorted should equal(List((0L, "name0"), (1L, "name1"), (2L, "name2"), (3L, "name3")))
graph.vertices.flatMap{
case (vId, properties) => properties.get("type").asInstanceOf[Option[String]].map((vId, _))
}.collect().sorted should equal(List((0L, "type0")))
}
}
Developer: sparkling-graph, Project: sparkling-graph, Lines: 52, Source: GraphFromGraphML$Test.scala
Example 5: ShortestPathLengthsFromCSV
// Package declaration and imports
package ml.sparkling.graph.examples
import ml.sparkling.graph.api.operators.algorithms.shortestpaths.ShortestPathsTypes
import ml.sparkling.graph.api.operators.algorithms.shortestpaths.ShortestPathsTypes._
import ml.sparkling.graph.operators.algorithms.shortestpaths.ShortestPathsAlgorithm
import ml.sparkling.graph.operators.algorithms.shortestpaths.pathprocessors.fastutils.FastUtilWithDistance.DataMap
import ml.sparkling.graph.operators.predicates.AllPathPredicate
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.graphx.{Graph, VertexId}
import scala.collection.JavaConversions._
object ShortestPathLengthsFromCSV extends ExampleApp {
def body()={
val shortestPaths = if (bucketSize == -1L)
ShortestPathsAlgorithm.computeShortestPathsLengths(partitionedGraph,AllPathPredicate,treatAsUndirected)
else
ShortestPathsAlgorithm.computeShortestPathsLengthsIterative(partitionedGraph,(g:Graph[_,_])=>bucketSize,treatAsUndirected)
val size: Broadcast[VertexId] =ctx.broadcast(partitionedGraph.numVertices)
partitionedGraph.outerJoinVertices(shortestPaths.vertices)(Util.dataTransformFunction(size) _).vertices.values.saveAsTextFile(out)
ctx.stop()
}
}
private object Util{
def dataTransformFunction(size: Broadcast[VertexId])(vId: VertexId,oldValue: String,pathsOption: Option[_ >: DataMap <: JMap[JLong, JDouble]])={
pathsOption.flatMap((paths)=>{
var entries=paths.entrySet().toList.sortBy(_.getKey)
val out = new StringBuilder()
out ++= s"${oldValue},"
var a = 0L
while (a < size.value) {
if (entries.nonEmpty && a == entries.head.getKey) {
out ++= s"${entries.head.getValue},"
entries = entries.drop(1)
} else {
out ++= "0,"
}
a += 1L
}
}
out.setLength(out.length - 1)
Option(out.toString())
}).getOrElse(oldValue)
}
}
Developer: sparkling-graph, Project: sparkling-graph, Lines: 48, Source: ShortestPathLengthsFromCSV.scala
Example 6: BasicLinkPredictor
// Package declaration and imports
package ml.sparkling.graph.operators.algorithms.link
import ml.sparkling.graph.api.operators.algorithms.link.MeasureBasedLnkPredictor
import ml.sparkling.graph.api.operators.measures.EdgeMeasure
import org.apache.spark.graphx.Graph
import scala.reflect.ClassTag
object BasicLinkPredictor extends MeasureBasedLnkPredictor {
override def predictLinks[V: ClassTag, E: ClassTag, EV: ClassTag, EO: ClassTag](graph: Graph[V, E],
edgeMeasure: EdgeMeasure[EO, EV],
threshold: EO,
treatAsUndirected:Boolean=false)(implicit num: Numeric[EO]) = {
val preprocessedGraph=edgeMeasure.preprocess(graph,treatAsUndirected)
val allPossibleEdges = preprocessedGraph.vertices.cartesian(preprocessedGraph.vertices).filter{
case ((vId1,data1),(vId2,data2))=>vId1!=vId2
}
val edgesAboveThreshold=allPossibleEdges.map{
case ((vId1,data1),(vId2,data2))=>(edgeMeasure.computeValue(data1,data2,treatAsUndirected),(vId1,vId2))
}.filter(t=>num.gt(t._1,threshold)).map(t=>(t._2,0))
val existingEdgesTuples = graph.edges.map(e => ((e.srcId, e.dstId), 0))
val newEdges = edgesAboveThreshold.leftOuterJoin(existingEdgesTuples).filter{
case (k, (_, option)) => option.isEmpty
}.map(_._1)
if(treatAsUndirected){
newEdges.map{
case (vId1,vId2)=>(Math.min(vId1,vId2),Math.max(vId1,vId2))
}.distinct()
}else{
newEdges
}
}
}
Developer: sparkling-graph, Project: sparkling-graph, Lines: 37, Source: BasicLinkPredictor.scala
Example 7: PSCANConnectedComponents
// Package declaration and imports
package ml.sparkling.graph.operators.algorithms.community.pscan
import org.apache.spark.graphx.{EdgeTriplet, Graph, Pregel, VertexId}
class PSCANConnectedComponents(maxWeight:Double) extends Serializable{
def run[VD,ED](graph:Graph[VertexId,Double]):Graph[VertexId,Double]={
val initialMessage = Long.MaxValue
Pregel(graph, initialMessage)(
vprog = (_, attr, msg) => math.min(attr, msg),
sendMsg = sendMessage,
mergeMsg = (a, b) => math.min(a, b))
}
def sendMessage(edge: EdgeTriplet[VertexId, Double]): Iterator[(VertexId, VertexId)] = {
if(edge.attr > maxWeight){
if(edge.srcAttr<edge.dstAttr){
Iterator((edge.dstId,edge.srcAttr))
}else if(edge.dstAttr<edge.srcAttr){
Iterator((edge.srcId,edge.dstAttr))
}else{
Iterator.empty
}
}else{
Iterator.empty
}
}
}
Developer: sparkling-graph, Project: sparkling-graph, Lines: 31, Source: PSCANConnectedComponents.scala
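A hedged usage sketch: Pregel propagates the minimum vertex id across edges whose weight exceeds maxWeight, so heavier edges merge components. The toy weights and the 0.5 threshold are invented:
// Hypothetical caller; edges with attr > maxWeight connect their endpoints.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
val sc = new SparkContext(new SparkConf().setAppName("pscan-cc-demo").setMaster("local[*]"))
val edges = sc.parallelize(Seq(Edge(1L, 2L, 0.9), Edge(2L, 3L, 0.8), Edge(4L, 5L, 0.2)))
// Seed every vertex with its own id so vprog keeps the minimum id seen so far.
val weighted: Graph[VertexId, Double] = Graph.fromEdges(edges, 0L).mapVertices((id, _) => id)
val components = new PSCANConnectedComponents(0.5).run(weighted)
// Expected here: vertices 1, 2, 3 end up labelled 1; the light 4-5 edge leaves 4 and 5 alone.
components.vertices.collect().sortBy(_._1).foreach(println)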
Example 8: compute
// Package declaration and imports
package ml.sparkling.graph.operators.measures.vertex.clustering
import it.unimi.dsi.fastutil.longs.LongOpenHashSet
import ml.sparkling.graph.api.operators.measures.{VertexMeasure, VertexMeasureConfiguration}
import ml.sparkling.graph.operators.measures.utils.CollectionsUtils._
import ml.sparkling.graph.operators.measures.utils.{CollectionsUtils, NeighboursUtils}
import ml.sparkling.graph.operators.predicates.AllPathPredicate
import org.apache.spark.graphx.Graph
import scala.reflect.ClassTag
// The scraped snippet begins mid-file; the enclosing object below is reconstructed
// from the source file name (LocalClustering.scala) and the VertexMeasure import.
object LocalClustering extends VertexMeasure[Double] {
override def compute[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
vertexMeasureConfiguration: VertexMeasureConfiguration[VD,ED])
(implicit num: Numeric[ED]) = {
val firstLevelNeighboursGraph = NeighboursUtils.getWithNeighbours(graph, vertexMeasureConfiguration.treatAsUndirected, AllPathPredicate)
val localClusteringSums=firstLevelNeighboursGraph.aggregateMessages[Double](
sendMsg=edgeContext=>{
def messageCreator=(neighbours1:LongOpenHashSet,neighbours2:LongOpenHashSet)=>{
intersectSize(neighbours1,neighbours2)
}
val message=messageCreator(edgeContext.srcAttr,edgeContext.dstAttr)
edgeContext.sendToSrc(message)
if(vertexMeasureConfiguration.treatAsUndirected){
edgeContext.sendToDst(message)
}
},
mergeMsg=(a,b)=>a+b)
firstLevelNeighboursGraph.outerJoinVertices(localClusteringSums)((vId,oldValue,newValue)=>(newValue.getOrElse(0d),oldValue)).mapVertices {
case (vId, (sum, neighbours)) => {
val possibleConnections = neighbours.size * (neighbours.size - 1)
if (possibleConnections == 0) 0d else sum / possibleConnections
}
}
}
}
Developer: sparkling-graph, Project: sparkling-graph, Lines: 37, Source: LocalClustering.scala
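A hedged usage sketch, assuming the enclosing LocalClustering object reconstructed above. The edge-list path is an assumption, and the explicit type parameters on VertexMeasureConfiguration (mirroring the apply calls in Example 20) are a guess about its signature:
// Hypothetical caller; computes a local clustering coefficient per vertex.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.GraphLoader
val sc = new SparkContext(new SparkConf().setAppName("local-clustering-demo").setMaster("local[*]"))
val graph = GraphLoader.edgeListFile(sc, "data/edge_list.txt") // illustrative path
val coefficients = LocalClustering.compute(graph, VertexMeasureConfiguration[Int, Int](treatAsUndirected = true))
coefficients.vertices.collect().foreach { case (id, c) => println(s"vertex $id: $c") }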
Example 9: Modularity
// Package declaration and imports
package ml.sparkling.graph.operators.measures.graph
import ml.sparkling.graph.api.operators.algorithms.community.CommunityDetection.ComponentID
import ml.sparkling.graph.api.operators.measures.{VertexDependentGraphMeasure, GraphIndependentMeasure}
import org.apache.spark.graphx.{EdgeTriplet, VertexRDD, Graph}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
object Modularity extends VertexDependentGraphMeasure[Double,ComponentID]{
def compute[V<:ComponentID:ClassTag,E:ClassTag](graph: Graph[V, E]): Double = {
val edgesNum = graph.numEdges.toDouble
val edgesCounts: RDD[(V, (Int, Int))] = graph.triplets.flatMap(triplet => {
if (triplet.srcAttr == triplet.dstAttr) {
Iterator((triplet.srcAttr, (1, 0)))
} else {
Iterator((triplet.srcAttr, (0, 1)))
}
})
edgesCounts.aggregateByKey((0, 0))(
(agg: (Int, Int), data: (Int, Int)) =>
(agg, data) match {
case ((a1, b1), (a2, b2)) => (a1 + a2, b1 + b2)
},
(agg1: (Int, Int), agg2: (Int, Int)) => {
(agg1, agg2) match {
case ((a1, b1), (a2, b2)) => (a1 + a2, b1 + b2)
}
}
).treeAggregate(0.0)(
(agg:Double,data:(V,(Int,Int)))=>{
data match{
case (_,(edgesFull,edgesSome))=>
agg+(edgesFull/edgesNum)-Math.pow(edgesSome/edgesNum,2)
}
},
(agg1,agg2)=>agg1+agg2
)
}
}
Developer: sparkling-graph, Project: sparkling-graph, Lines: 45, Source: Modularity.scala
Example 10: PSCANBasedPartitioning
// Package declaration and imports
package ml.sparkling.graph.operators.partitioning
import ml.sparkling.graph.api.operators.algorithms.community.CommunityDetection.ComponentID
import ml.sparkling.graph.operators.algorithms.community.pscan.PSCAN
import org.apache.log4j.Logger
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Graph, VertexId}
import scala.collection.mutable
import scala.reflect.ClassTag
object PSCANBasedPartitioning {
@transient
val logger=Logger.getLogger(PSCANBasedPartitioning.getClass())
def partitionGraphBy[VD:ClassTag,ED:ClassTag](graph:Graph[VD,ED],numberOfPartitions:Int)(implicit sc:SparkContext): Graph[VD, ED] ={
val (numberOfCommunities: VertexId, coarsedVertexMap: Map[VertexId, Int], coarsedNumberOfPartitions: Int, strategy: ByComponentIdPartitionStrategy) = buildPartitioningStrategy(graph, numberOfPartitions)
logger.info(s"Partitioning graph using coarsed map with ${coarsedVertexMap.size} entries and ${coarsedNumberOfPartitions} partitions (before ${numberOfCommunities})")
val out=new CustomGraphPartitioningImplementation[VD,ED](graph).partitionBy(strategy)
out.edges.foreachPartition((_)=>{})
graph.unpersist(false)
out
}
def buildPartitioningStrategy[ED: ClassTag, VD: ClassTag](graph: Graph[VD, ED], numberOfPartitions: Int)(implicit sc:SparkContext) = {
val (numberOfCommunities: VertexId, coarsedVertexMap: Map[VertexId, Int], coarsedNumberOfPartitions: Int) = precomputePartitions(graph, numberOfPartitions)
val strategy = ByComponentIdPartitionStrategy(coarsedVertexMap, coarsedNumberOfPartitions)
(numberOfCommunities, coarsedVertexMap, coarsedNumberOfPartitions, strategy)
}
def precomputePartitions[ED: ClassTag, VD: ClassTag](graph: Graph[VD, ED], numberOfPartitions: Int)(implicit sc:SparkContext) = {
logger.info("Computing components using PSCAN")
val (communities, numberOfCommunities): (Graph[ComponentID, ED], VertexId) = PSCAN.computeConnectedComponentsUsing(graph, numberOfPartitions)
logger.info("Components computed!")
communities.unpersist(false)
val (coarsedVertexMap, coarsedNumberOfPartitions) = ParallelPartitioningUtils.coarsePartitions(numberOfPartitions, numberOfCommunities, communities.vertices)
(numberOfCommunities, coarsedVertexMap, coarsedNumberOfPartitions)
}
}
Developer: sparkling-graph, Project: sparkling-graph, Lines: 43, Source: PSCANBasedPartitioning.scala
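A hedged usage sketch; partitionGraphBy takes the SparkContext implicitly, and the edge-list path is an assumption:
// Hypothetical caller; repartitions edges along PSCAN community boundaries.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.GraphLoader
implicit val sc: SparkContext = new SparkContext(new SparkConf().setAppName("pscan-partitioning-demo").setMaster("local[*]"))
val graph = GraphLoader.edgeListFile(sc, "data/edge_list.txt") // illustrative path
val partitioned = PSCANBasedPartitioning.partitionGraphBy(graph, numberOfPartitions = 4)
println(s"edge partitions: ${partitioned.edges.getNumPartitions}")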
Example 11: MeasureTest
// Package declaration and imports
package ml.sparkling.graph.operators
import org.apache.log4j.Logger
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Graph, GraphLoader}
import org.scalatest._
// Note: beforeEach(testData: TestData) below needs the TestData variant of the trait.
abstract class MeasureTest(implicit sc:SparkContext) extends FlatSpec with BeforeAndAfterAll with GivenWhenThen with Matchers with BeforeAndAfterEachTestData {
def time[T](str: String)(thunk: => T): (T,Long) = {
logger.info(s"$str...")
val t1 = System.currentTimeMillis
val x = thunk
val t2 = System.currentTimeMillis
val diff=t2 - t1
logger.info(s"$diff ms")
(x,diff)
}
val logger=Logger.getLogger(this.getClass)
def loadGraph(file: String) = {
val out: Graph[Int, Int] = GraphLoader.edgeListFile(sc, file)
out.vertices.setName(s"Graph vertices ${file}")
out.edges.setName(s"Graph edges ${file}")
out.triplets.setName(s"Graph triplets ${file}")
out
}
override def beforeEach(testData: TestData) = {
logger.info(s"${Console.GREEN} Running test ${testData.name} ${Console.RESET} ")
}
}
Developer: sparkling-graph, Project: sparkling-graph, Lines: 38, Source: MeasureTest.scala
Example 12: BasicLinkPredictor
// Package declaration and imports
package ml.sparkling.graph.operators.algorithms.link
import ml.sparkling.graph.operators.MeasureTest
import ml.sparkling.graph.operators.measures.edge.CommonNeighbours
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import ml.sparkling.graph.operators.OperatorsDSL._
class BasicLinkPredictor$Test (implicit sc:SparkContext) extends MeasureTest {
"In open triad" should " propose to close it" in{
Given("graph")
val filePath = getClass.getResource("/graphs/3_nodes_directed")
val graph:Graph[Int,Int]=loadGraph(filePath.toString)
When("Computes new links")
val links = BasicLinkPredictor.predictLinks(graph,CommonNeighbours,0,true)
Then("Should compute links correctly")
links.collect() should equal(Array((1,3)))
graph.unpersist(true)
}
"In open 4 nodes graph" should " propose to close it fully" in{
Given("graph")
val filePath = getClass.getResource("/graphs/4_nodes_open")
val graph:Graph[Int,Int]=loadGraph(filePath.toString)
When("Computes new links")
val links = graph.predictLinks(CommonNeighbours,1,true)
Then("Should compute links correctly")
links.collect().toSet should equal(Set((1,3),(2,4)))
graph.unpersist(true)
}
}
Developer: sparkling-graph, Project: sparkling-graph, Lines: 34, Source: BasicLinkPredictor$Test.scala
Example 13: BetweennessHua
// Package declaration and imports
package ml.sparkling.graph.operators.measures.vertex.betweenness.hua
import java.nio.file.Files
import ml.sparkling.graph.operators.MeasureTest
import ml.sparkling.graph.operators.measures.vertex.betweenness.edmonds.EdmondsBC
import org.apache.commons.io.FileUtils
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.scalatest.tagobjects.Slow
class BetweennessHua$Test (implicit sc: SparkContext) extends MeasureTest {
val tempDir = Files.createTempDirectory("spark-checkpoint")
override def beforeAll() = {
sc.setCheckpointDir(tempDir.toAbsolutePath.toString)
}
override def afterAll() = {
FileUtils.deleteDirectory(tempDir.toFile)
}
"Hua betweenness centrality for random graph" should "be correctly calculated" in {
Given("graph")
val filePath = getClass.getResource("/graphs/graph_ER_15")
val graph: Graph[Int, Int] = loadGraph(filePath.toString)
When("Computes betweenness")
val result = HuaBC.computeBC(graph)
Then("Should calculate betweenness correctly")
val bcFile = getClass.getResource("/graphs/graph_ER_15_bc")
val bcCorrectValues = sc.textFile(bcFile.getPath)
.filter(_.nonEmpty)
.map(l => { val t = l.split("\t", 2); (t(0).toInt, t(1).toDouble) })
.sortBy({ case (vId, data) => vId })
.map({ case (vId, data) => data}).collect()
val bcValues = result.sortBy({ case (vId, data) => vId })
.map({ case (vId, data) => data }).collect()
bcCorrectValues.zip(bcValues).foreach({ case (a, b) =>
a should be(b +- 1e-5)
})
result.unpersist(false)
}
"Hua betweenness centrality for random graph" should "take no longer then Edmonds" taggedAs(Slow) in {
Given("graph")
val filePath = getClass.getResource("/graphs/graph_ER_15")
val graph: Graph[Int, Int] = loadGraph(filePath.toString)
When("computes betwenness centrality")
val (_, edmondsTime) = time("Edmonds algorithm for betweenness centrality")(EdmondsBC.computeBC(graph))
val (_, huaTime) = time("Hua algorithm for betweenness centrality")(HuaBC.computeBC(graph))
Then("Hua algorithm should be faster")
huaTime should be <= edmondsTime
}
}
Developer: sparkling-graph, Project: sparkling-graph, Lines: 58, Source: BetweennessHua$Test.scala
Example 14: BetweennessEdmonds
// Package declaration and imports
package ml.sparkling.graph.operators.measures.vertex.betweenness.edmonds
import java.nio.file.Files
import ml.sparkling.graph.operators.MeasureTest
import org.apache.commons.io.FileUtils
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Graph, VertexRDD}
class BetweennessEdmonds$Test(implicit sc: SparkContext) extends MeasureTest {
val tempDir = Files.createTempDirectory("spark-checkpoint")
override def beforeAll() = {
sc.setCheckpointDir(tempDir.toAbsolutePath.toString)
}
override def afterAll() = {
FileUtils.deleteDirectory(tempDir.toFile)
}
"Edmonds betweenness centrality for random graph" should "be correctly calculated" in {
Given("graph")
val filePath = getClass.getResource("/graphs/graph_ER_15")
val graph: Graph[Int, Int] = loadGraph(filePath.toString)
When("Computes betweenness")
val result = EdmondsBC.computeBC(graph)
Then("Should calculate betweenness correctly")
val bcFile = getClass.getResource("/graphs/graph_ER_15_bc")
val bcCorrectValues = sc.textFile(bcFile.getPath)
.filter(_.nonEmpty)
.map(l => { val t = l.split("\t", 2); (t(0).toInt, t(1).toDouble) })
.sortBy({ case (vId, data) => vId })
.map({ case (vId, data) => data}).collect()
val bcValues = result.sortBy({ case (vId, data) => vId })
.map({ case (vId, data) => data }).collect()
bcCorrectValues.zip(bcValues).foreach({ case (a, b) =>
a should be(b +- 1e-5)
})
result.unpersist(false)
}
}
Developer: sparkling-graph, Project: sparkling-graph, Lines: 45, Source: BetweennessEdmonds$Test.scala
Example 15: AdamicAdar
// Package declaration and imports
package ml.sparkling.graph.operators.measures.edge
import ml.sparkling.graph.operators.MeasureTest
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import ml.sparkling.graph.operators.OperatorsDSL._
class AdamicAdar$Test(implicit sc:SparkContext) extends MeasureTest {
"Adamic/Adar for star graph" should "be 0 for each node" in{
Given("graph")
val filePath = getClass.getResource("/graphs/6_nodes_star")
val graph:Graph[Int,Int]=loadGraph(filePath.toString)
When("Computes Adamic/Adar")
val result=AdamicAdar.computeWithPreprocessing(graph)
Then("Should calculate Adamic/Adar")
val resultValues=result.edges.map(_.attr).distinct().collect()
resultValues(0) should equal(0)
resultValues.size should equal(1)
graph.unpersist(true)
}
"Adamic/Adar for full graph using DSL" should "be 1.8205 for each node" in{
Given("graph")
val filePath = getClass.getResource("/graphs/4_nodes_full")
val graph:Graph[Int,Int]=loadGraph(filePath.toString)
When("Computes Adamic/Adar")
val result=graph.adamicAdar(true)
Then("Should calculate Adamic/Adar")
val resultValues=result.edges.map(_.attr).distinct().collect()
resultValues(0) should equal(1.82047 +- 1e-5)
resultValues.size should equal(1)
graph.unpersist(true)
}
}
Developer: sparkling-graph, Project: sparkling-graph, Lines: 39, Source: AdamicAdar$Test.scala
Example 16: CommonNeighbours
// Package declaration and imports
package ml.sparkling.graph.operators.measures.edge
import ml.sparkling.graph.operators.MeasureTest
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import ml.sparkling.graph.operators.OperatorsDSL._
class CommonNeighbours$Test (implicit sc:SparkContext) extends MeasureTest {
"Common neighbours for star graph" should "be 0 for each node" in{
Given("graph")
val filePath = getClass.getResource("/graphs/6_nodes_star")
val graph:Graph[Int,Int]=loadGraph(filePath.toString)
When("Computes common neighbours")
val result=CommonNeighbours.computeWithPreprocessing(graph)
Then("Should calculate common neighbours")
val resultValues=result.edges.map(_.attr).distinct().collect()
resultValues(0) should equal(0)
resultValues.size should equal(1)
}
"Common neighbours for full graph using DSL" should "be 2 for each node" in{
Given("graph")
val filePath = getClass.getResource("/graphs/4_nodes_full")
val graph:Graph[Int,Int]=loadGraph(filePath.toString)
When("Computes common neighbours")
val result=graph.commonNeighbours(true)
Then("Should calculate common neighbours")
val resultValues=result.edges.map(_.attr).distinct().collect()
resultValues(0) should equal(2)
resultValues.size should equal(1)
}
}
Developer: sparkling-graph, Project: sparkling-graph, Lines: 37, Source: CommonNeighbours$Test.scala
Example 17: FreemanCentrality
// Package declaration and imports
package ml.sparkling.graph.operators.measures.graph
import ml.sparkling.graph.operators.MeasureTest
import ml.sparkling.graph.operators.OperatorsDSL._
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
class FreemanCentrality$Test (implicit sc:SparkContext) extends MeasureTest {
"Freeman Centrality for star graph" should "be 1" in{
Given("graph")
val filePath = getClass.getResource("/graphs/6_nodes_star")
val graph:Graph[Int,Int]=loadGraph(filePath.toString)
When("Computes Freemans Centrality")
val result=FreemanCentrality.compute(graph)
Then("Should calculate Freemans Centrality")
result should be (1)
graph.unpersist(true)
}
"Freeman Centrality for star graph" should "be 1 when calculated using DSL" in{
Given("graph")
val filePath = getClass.getResource("/graphs/6_nodes_star")
val graph:Graph[Int,Int]=loadGraph(filePath.toString)
When("Computes Freemans Centrality")
val result=graph.freemanCentrality()
Then("Should calculate Freemans Centrality")
result should be (1)
graph.unpersist(true)
}
"Freeman Centrality for 5 node line graph" should "be 0.167" in{
Given("graph")
val filePath = getClass.getResource("/graphs/5_nodes_directed")
val graph:Graph[Int,Int]=loadGraph(filePath.toString)
When("Computes Freemans Centrality")
val result=FreemanCentrality.compute(graph)
Then("Should calculate Freemans Centrality")
result should be (0.16666666 +- 1e-5)
graph.unpersist(true)
}
}
Developer: sparkling-graph, Project: sparkling-graph, Lines: 46, Source: FreemanCentrality$Test.scala
Example 18: Modularity
// Package declaration and imports
package ml.sparkling.graph.operators.measures.graph
import ml.sparkling.graph.operators.MeasureTest
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import ml.sparkling.graph.operators.OperatorsDSL._
class Modularity$Test (implicit sc:SparkContext) extends MeasureTest{
"Modularity for star graph" should "be 1" in{
Given("graph")
val filePath = getClass.getResource("/graphs/6_nodes_star")
val graph:Graph[Int,Int]=loadGraph(filePath.toString)
val graphComponents=graph.PSCAN(epsilon = 0)
When("Computes Freemans Centrality")
val result=Modularity.compute(graphComponents)
Then("Should calculate Freemans Centrality")
result should be (1)
graph.unpersist(true)
}
"Modularity for star graph" should "be 1 when calculated using DSL" in{
Given("graph")
val filePath = getClass.getResource("/graphs/6_nodes_star")
val graph:Graph[Int,Int]=loadGraph(filePath.toString)
val graphComponents=graph.PSCAN(epsilon = 0)
When("Computes Freemans Centrality")
val result=graphComponents.modularity()
Then("Should calculate Freemans Centrality")
result should be (1)
graph.unpersist(true)
}
"Modularity for all single components" should "be -1 " in{
Given("graph")
val filePath = getClass.getResource("/graphs/6_nodes_star")
val graph:Graph[Int,Int]=loadGraph(filePath.toString)
val graphComponents=graph.PSCAN(epsilon=1)
When("Computes Freemans Centrality")
val result=graphComponents.modularity()
Then("Should calculate Freemans Centrality")
result should be (-1)
graph.unpersist(true)
}
}
Developer: sparkling-graph, Project: sparkling-graph, Lines: 50, Source: Modularity$Test.scala
Example 19: GraphLoading
// Package declaration and imports
package ml.sparkling.graph.api.loaders
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import scala.reflect.ClassTag
object GraphLoading {
trait GraphLoader[VD,ED]{
def load(parameters:List[Parameter])(implicit sc:SparkContext):Graph[VD,ED]
}
trait TypedGraphLoader[VD2,ED2] extends GraphLoader[VD2,ED2]{
def load[VD:ClassTag,ED:ClassTag](parameters:List[Parameter])(implicit sc:SparkContext):Graph[VD,ED]
}
trait FromPathLoader[VD,ED] {
def apply(path:String):GraphLoader[VD,ED]
}
object LoadGraph{
def from[VD:ClassTag,ED:ClassTag](graphLoader: GraphLoader[VD,ED]):GraphLoaderConfigurator[VD,ED]={
GraphLoaderConfigurator(List.empty,graphLoader)
}
}
case class GraphLoaderConfigurator[VD:ClassTag,ED:ClassTag](parameters:List[Parameter], loader:GraphLoader[_,_]){
def using(parameter:Parameter)={
GraphLoaderConfigurator[VD,ED](parameter::parameters,loader)
}
def load[VD:ClassTag,ED:ClassTag]()(implicit sc:SparkContext): Graph[VD,ED] ={
loader match{
case typed:TypedGraphLoader[_,_]=>typed.load[VD,ED](parameters)
case normal:GraphLoader[VD @unchecked,ED @unchecked] => normal.load(parameters)
}
}
}
trait Parameter
trait WithValueParameter[V] extends Parameter{
def value:V
}
}
Developer: sparkling-graph, Project: sparkling-graph, Lines: 50, Source: GraphLoading.scala
Example 20: VertexMeasureConfigurationTest
// Package declaration and imports
package ml.sparkling.graph.api.operators.measures
import ml.sparkling.graph.api.operators.IterativeComputation.BucketSizeProvider
import org.apache.spark.graphx.Graph
import org.scalatest.{FlatSpec, GivenWhenThen}
class VertexMeasureConfigurationTest extends FlatSpec with GivenWhenThen {
"Creation without parameters" should "be possible" in{
VertexMeasureConfiguration()
}
"Creation with undirected flag" should "be possible" in{
Given("Directed flag")
val flag=false
When("Configuration creation")
VertexMeasureConfiguration(treatAsUndirected = flag )
}
"Creation with bucket size provider" should "be possible" in{
Given("Bucker size provider")
val provider:BucketSizeProvider[Long,Long]=(g:Graph[Long,Long])=>1l
When("Configuration creation")
VertexMeasureConfiguration(bucketSizeProvider = provider)
}
"Creation with bucket size provider and directed flag" should "be possible" in{
Given("Bucker size provider")
val provider:BucketSizeProvider[Long,Long]=(g:Graph[Long,Long])=>1l
When("Configuration creation")
VertexMeasureConfiguration( false, provider)
}
}
Developer: sparkling-graph, Project: sparkling-graph, Lines: 36, Source: VertexMeasureConfigurationTest.scala
Note: The org.apache.spark.graphx.Graph examples above were collected from open-source projects hosted on GitHub, MSDocs, and similar source-code and documentation platforms. Copyright of each snippet remains with its original authors; consult the corresponding project's license before using or redistributing the code, and do not republish without permission.