Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
730 views
in Technique[技术] by (71.8m points)

apache spark - How to group by common element in array?

I am trying to find the solution in spark to group data with a common element in an array.

 key                            value
[k1,k2]                         v1
[k2]                            v2
[k3,k2]                         v3
[k4]                            v4

If any element matches in key, we have to assign same groupid to that.(Groupby common element)

Result:

key                             value  GroupID
[k1,k2]                           v1    G1
[k2]                              v2    G1
[k3,k2]                           v3    G1 
[k4]                              v4    G2

Some suggestions are already given with Spark Graphx, but at this moment learning curve will be more to implement this for a single feature.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Include graphframes (the latest supported Spark version is 2.1, but it should support 2.2 as well, if you use newer you'll have to build your own with 2.3 patch) replacing XXX with Spark version and YYY with Scala version:

spark.jars.packages  graphframes:graphframes:0.5.0-sparkXXX-s_YYY

Add explode keys:

import org.apache.spark.sql.functions._

val df = Seq(
   (Seq("k1", "k2"), "v1"), (Seq("k2"), "v2"),
   (Seq("k3", "k2"), "v3"), (Seq("k4"), "v4")
).toDF("key", "value")

val edges = df.select(
  explode($"key") as "src", $"value" as "dst")

Convert to graphframe:

import org.graphframes._

val gf = GraphFrame.fromEdges(edges)

Set checkpoint directory (if not set):

import org.apache.spark.sql.SparkSession

val path: String = ???
val spark: SparkSession = ???
spark.sparkContext.setCheckpointDir(path)

Find connected components:

val components = GraphFrame.fromEdges(edges).connectedComponents.setAlgorithm("graphx").run

Join result with input data:

 val result = components.where($"id".startsWith("v")).toDF("value", "group").join(df, Seq("value"))

Check result:

result.show

// +-----+------------+--------+
// |value|       group|     key|
// +-----+------------+--------+
// |   v3|489626271744|[k3, k2]|
// |   v2|489626271744|    [k2]|
// |   v4|532575944704|    [k4]|
// |   v1|489626271744|[k1, k2]|
// +-----+------------+--------+

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...