// Set the package name and import the classes this example depends on
package com.jjzhk.sparkexamples.sql
import org.apache.spark.sql.functions.{countDistinct, sum}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
object SparkSQLAgg {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SparkSQLAgg").master("local").getOrCreate()
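    // Sample access-log lines in the format: date,userId,url,amount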
    val userData = Array(
      "2016-03-27,001,http://spark.apache.org/,1000",
      "2016-03-27,001,http://hadoop.apache.org/,2000",
      "2016-03-27,002,http://flink.apache.org/,3000",
      "2016-03-28,003,http://kafka.apache.org/,1500",
      "2016-03-28,004,http://spark.apache.org/,6000",
      "2016-03-28,002,http://hive.apache.org/,1200",
      "2016-03-28,001,http://parquet.apache.org/,1800",
      "2016-03-28,001,http://spark.apache.org/,1040"
    )
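    // Distribute the raw strings as an RDD and parse each line into a Row,
    // converting the amount field to Int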
    val userDataRDD = spark.sparkContext.parallelize(userData)
    val userDataRow = userDataRDD.map(row => {
      val fields = row.split(",")
      Row(fields(0), fields(1), fields(2), fields(3).toInt)
    })
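    // Explicit schema matching the four parsed Row fields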
    val structTypes = StructType(Seq(
      StructField("time", DataTypes.StringType, true),
      StructField("id", DataTypes.StringType, true),
      StructField("url", DataTypes.StringType, true),
      StructField("amount", DataTypes.IntegerType, true)
    ))
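    // Build the DataFrame from the Row RDD and the schema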
    val userDataDF = spark.createDataFrame(userDataRow, structTypes)
    import spark.implicits._ // enables the $"column" syntax used below
    // Number of distinct users per day
    userDataDF.groupBy("time").agg(countDistinct($"id")).show() // select time, count(distinct id) from *** group by time
    // Total amount per day
    userDataDF.groupBy("time").agg(sum($"amount")).show() // select time, sum(amount) from *** group by time
    spark.stop()
  }
}