java - Evolving a schema with Spark DataFrame

I'm working with a Spark DataFrame which could be loading data from one of three different schema versions:

// Original
{ "A": {"B": 1 } }
// Addition "C"
{ "A": {"B": 1 }, "C": 2 }
// Additional "A.D"
{ "A": {"B": 1, "D": 3 }, "C": 2 }

I can handle the additional "C" by checking whether the schema contains a field "C" and, if not, adding a new column to the DataFrame. However, I can't work out how to create a field for the nested sub-object.

public void evolvingSchema() {
    String versionOne = "{ \"A\": {\"B\": 1 } }";
    String versionTwo = "{ \"A\": {\"B\": 1 }, \"C\": 2 }";
    String versionThree = "{ \"A\": {\"B\": 1, \"D\": 3 }, \"C\": 2 }";

    process(spark.getContext(), "1", versionOne);
    process(spark.getContext(), "2", versionTwo);
    process(spark.getContext(), "3", versionThree);
}

private static void process(JavaSparkContext sc, String version, String data) {
    SQLContext sqlContext = new SQLContext(sc);
    DataFrame df = sqlContext.read().json(sc.parallelize(Arrays.asList(data)));
    if(!Arrays.asList(df.schema().fieldNames()).contains("C")) {
        df = df.withColumn("C", org.apache.spark.sql.functions.lit(null));
    }
    // Not sure what to put here. fieldNames() does not contain "A.D".

    try {
        df.select("C").collect();
    } catch(Exception e) {
        System.out.println("Failed to C for " + version);
    }
    try {
        df.select("A.D").collect();
    } catch(Exception e) {
        System.out.println("Failed to A.D for " + version);
    }
}

1 Answer


JSON sources are not very well suited for data with an evolving schema (consider Avro or Parquet instead), but a simple solution is to use the same schema for all sources and make the new fields optional / nullable:

import org.apache.spark.sql.types.{StructType, StructField, LongType}

val schema = StructType(Seq(
  StructField("A", StructType(Seq(
    StructField("B", LongType, true), 
    StructField("D", LongType, true)
  )), true),
  StructField("C", LongType, true)))

You can pass schema like this to DataFrameReader:

val rddV1 = sc.parallelize(Seq("""{ "A": {"B": 1 } }"""))
val df1 = sqlContext.read.schema(schema).json(rddV1)

val rddV2 = sc.parallelize(Seq("""{ "A": {"B": 1 }, "C": 2 }"""))
val df2 = sqlContext.read.schema(schema).json(rddV2)

val rddV3 = sc.parallelize(Seq("""{ "A": {"B": 1, "D": 3 }, "C": 2 }"""))
val df3 = sqlContext.read.schema(schema).json(rddV3)

and you'll get a consistent structure independent of the variant:

require(df1.schema == df2.schema && df2.schema == df3.schema)

with missing columns automatically set to null:

df1.printSchema
// root
//  |-- A: struct (nullable = true)
//  |    |-- B: long (nullable = true)
//  |    |-- D: long (nullable = true)
//  |-- C: long (nullable = true)

df1.show
// +--------+----+
// |       A|   C|
// +--------+----+
// |[1,null]|null|
// +--------+----+

df2.show
// +--------+---+
// |       A|  C|
// +--------+---+
// |[1,null]|  2|
// +--------+---+

df3.show
// +-----+---+
// |    A|  C|
// +-----+---+
// |[1,3]|  2|
// +-----+---+
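
Since the question's process method uses the Java API, here is a minimal Java sketch of the same approach (readWithSchema is just an illustrative helper name; it assumes the Spark 1.x SQLContext API from the question):

import java.util.Arrays;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

private static DataFrame readWithSchema(JavaSparkContext sc, String data) {
    SQLContext sqlContext = new SQLContext(sc);

    // Shared schema with every field nullable, mirroring the Scala definition above.
    StructType schema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("A", DataTypes.createStructType(Arrays.asList(
            DataTypes.createStructField("B", DataTypes.LongType, true),
            DataTypes.createStructField("D", DataTypes.LongType, true)
        )), true),
        DataTypes.createStructField("C", DataTypes.LongType, true)
    ));

    // Pass the explicit schema instead of letting the reader infer one per input,
    // so "C" and "A.D" exist (as null) even when the JSON does not contain them.
    return sqlContext.read()
        .schema(schema)
        .json(sc.parallelize(Arrays.asList(data)));
}

With this, both df.select("C") and df.select("A.D") should succeed for all three schema versions.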

Note:

This solution is data-source dependent. It may or may not work with other sources, and could even result in malformed records.

