Usually there is no need for this, and it is better to use UDFs, but here you go:
How should I define a function to pass to df.rdd.mapPartitions if I want to create new Rows with a few additional columns?
It should take Iterator[Row] and return Iterator[T], so in your case you should use something like this:
import org.apache.spark.sql.Row
def transformRows(iter: Iterator[Row]): Iterator[Row] = ???
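One case where mapPartitions does something a per-row UDF cannot is when the function needs to see the partition as a whole. The following is only a sketch under that assumption; the name transformRowsWithIndex and the appended position column are hypothetical and not part of the question:

```scala
import org.apache.spark.sql.Row

// Hypothetical example: append each row's position within its partition.
// The iterator is consumed lazily, so the partition is never materialized.
def transformRowsWithIndex(iter: Iterator[Row]): Iterator[Row] =
  iter.zipWithIndex.map { case (row, i) =>
    Row.fromSeq(row.toSeq :+ i) // original values plus the Int index
  }
```

Because the function only deals with a plain Iterator[Row], it can be tried locally without a cluster, for example with transformRowsWithIndex(Iterator(Row(1.0, 2.0))).toList.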
How can I add a few columns to a Row object (or create a new one)?
There are multiple ways of accessing Row values, including the Row.get* methods, Row.toSeq, etc. A new Row can be created using Row.apply, Row.fromSeq, Row.fromTuple, or RowFactory. For example:
def transformRow(row: Row): Row = Row.fromSeq(row.toSeq ++ Array[Any](-1, 1))
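The accessors and constructors listed above can be tried without a cluster, since Row is an ordinary local object. This is a small sketch, assuming a Scala REPL or spark-shell with Spark SQL on the classpath; the sample values are made up for illustration:

```scala
import org.apache.spark.sql.Row

// Row.apply: build a Row from individual values
val row = Row(1.0, "a")

// Typed accessors by position
val x: Double = row.getDouble(0)
val s: String = row.getString(1)

// Untyped view of all values
val values: Seq[Any] = row.toSeq

// Row.fromSeq and Row.fromTuple: build a new, longer Row
val fromSeq = Row.fromSeq(values :+ 42)
val fromTuple = Row.fromTuple((1.0, "a", 42))
```

Rows are immutable, so "adding a column" always means building a new Row from the old values, as transformRow does.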
How do I create a DataFrame from the resulting RDD?
If you have an RDD[Row], you can use SQLContext.createDataFrame and provide a schema.
Putting this all together:
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
// toDF needs the SQLContext implicits (already in scope in spark-shell)
val df = sc.parallelize(Seq(
  (1.0, 2.0), (0.0, -1.0),
  (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")
// Apply transformRow to every row of a partition
def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)
// Original schema plus the two appended integer columns
val newSchema = StructType(df.schema.fields ++ Array(
  StructField("z", IntegerType, false), StructField("v", IntegerType, false)))
sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show
// +---+----+---+---+
// |  x|   y|  z|  v|
// +---+----+---+---+
// |1.0| 2.0| -1|  1|
// |0.0|-1.0| -1|  1|
// |3.0| 4.0| -1|  1|
// |6.0|-2.3| -1|  1|
// +---+----+---+---+
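For comparison, the opening remark that UDFs are usually the better choice can be illustrated as follows. This is a sketch rather than a drop-in replacement: the helper names addConstantColumns, addSumColumn and sumXY are hypothetical, and it assumes a DataFrame with Double columns x and y like the one above.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, udf}

// Constant columns need no UDF: lit wraps a literal value as a Column.
def addConstantColumns(df: DataFrame): DataFrame =
  df.withColumn("z", lit(-1)).withColumn("v", lit(1))

// Hypothetical UDF for illustration: derives a value from existing columns.
val sumXY = udf((x: Double, y: Double) => x + y)

def addSumColumn(df: DataFrame): DataFrame =
  df.withColumn("sum", sumXY(col("x"), col("y")))
```

With this approach the schema is derived automatically, so there is no need to build a StructType by hand or to go through df.rdd.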