
scala - Spark DeltaLake Upsert (merge) is throwing "org.apache.spark.sql.AnalysisException"

In the code below I'm trying to merge a dataframe into a Delta table. I first join the new dataframe with the Delta table, then transform the joined data to match the Delta table's schema, and finally merge that into the Delta table.

But I'm getting an AnalysisException:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Resolved attribute(s) id#514 missing from _file_name_#872,age#516,id#879,name#636,age#881,name#880,city#882,id#631,_row_id_#866L,city#641 in operator !Join Inner, (id#514 = id#631). Attribute(s) with the same name appear in the operation: id. Please check if the right attribute(s) are used.;;
!Join Inner, (id#514 = id#631)
:- SubqueryAlias deltaData
:  +- Project [id#631, name#636, age#516, city#641]
:     +- Project [age#516, id#631, name#636, new_city#510 AS city#641]
:        +- Project [age#516, id#631, new_name#509 AS name#636, new_city#510]
:           +- Project [age#516, new_id#508 AS id#631, new_name#509, new_city#510]
:              +- Project [age#516, new_id#508, new_name#509, new_city#510]
:                 +- Join Inner, (id#514 = new_id#508)
:                    :- Relation[id#514,name#515,age#516,city#517] parquet
:                    +- LocalRelation [new_id#508, new_name#509, new_city#510]
+- Project [id#879, name#880, age#881, city#882, _row_id_#866L, input_file_name() AS _file_name_#872]
   +- Project [id#879, name#880, age#881, city#882, monotonically_increasing_id() AS _row_id_#866L]
      +- Project [id#854 AS id#879, name#855 AS name#880, age#856 AS age#881, city#857 AS city#882]
         +- Relation[id#854,name#855,age#856,city#857] parquet

My setup is Spark 3.0.0, Delta Lake 0.7.0, and Hadoop 2.7.4.

However, the code below runs fine on the Databricks 7.4 runtime, and the new dataframe gets merged into the delta table.

Code Snippet:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{SaveMode, SparkSession}

object CodePen extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()

  val deltaPath = "<delta-path>"
  val oldEmployee = Seq(
    Employee(10, "Django", 22, "Bangalore"),
    Employee(11, "Stephen", 30, "Bangalore"),
    Employee(12, "Calvin", 25, "Hyderabad"))

  val newEmployee = Seq(EmployeeNew(10, "Django", "Bangkok"))
  spark.createDataFrame(oldEmployee).write.format("delta").mode(SaveMode.Overwrite).save(deltaPath) // Saving the data to a delta table
  val newDf = spark.createDataFrame(newEmployee)

  val deltaTable = DeltaTable.forPath(deltaPath)
  val joinedDf = deltaTable.toDF.join(newDf, col("id") === col("new_id"), "inner")

  joinedDf.show()
  val cols = newDf.columns
  // Transforming the joined Dataframe to match the schema of the delta table
  // Drop the delta-table-side columns (id, name, city), then strip the "new_" prefix from the remaining ones
  var intDf = joinedDf.drop(cols.map(removePrefix): _*)
  for (column <- newDf.columns)
    intDf = intDf.withColumnRenamed(column, removePrefix(column))

  // Reorder the columns to match the delta table's schema
  intDf = intDf.select(deltaTable.toDF.columns.map(col): _*)

  deltaTable.toDF.show()
  intDf.show()

  deltaTable.as("oldData")
    .merge(
      intDf.as("deltaData"),
      col("oldData.id") === col("deltaData.id"))
    .whenMatched()
    .updateAll()
    .execute()

  deltaTable.toDF.show()

  def removePrefix(column: String) = {
    column.replace("new_", "")
  }
}

case class Employee(id: Int, name: String, age: Int, city: String)

case class EmployeeNew(new_id: Int, new_name: String, new_city: String)

Below are the outputs of the dataframes.

New Dataframe:

+---+------+-------+
| id|  name|   city|
+---+------+-------+
| 10|Django|Bangkok|
+---+------+-------+

Joined Dataframe:

+---+------+---+---------+------+--------+--------+
| id|  name|age|     city|new_id|new_name|new_city|
+---+------+---+---------+------+--------+--------+
| 10|Django| 22|Bangalore|    10|  Django| Bangkok|
+---+------+---+---------+------+--------+--------+

Delta Table Data:

+---+-------+---+---------+
| id|   name|age|     city|
+---+-------+---+---------+
| 11|Stephen| 30|Bangalore|
| 12| Calvin| 25|Hyderabad|
| 10| Django| 22|Bangalore|
+---+-------+---+---------+

Transformed New Dataframe:

+---+------+---+-------+
| id|  name|age|   city|
+---+------+---+-------+
| 10|Django| 22|Bangkok|
+---+------+---+-------+
Question from: https://stackoverflow.com/questions/66048584/spark-deltalake-upsert-merge-is-throwing-org-apache-spark-sql-analysisexcepti


1 Answer


You are getting this AnalysisException because the schemas of deltaTable and intDf are slightly different:

deltaTable.toDF.printSchema()
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)



intDf.printSchema()
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)
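
To confirm the mismatch programmatically, you can compare the two schemas field by field. A minimal sketch, reusing deltaTable and intDf from the question (only the nullable flag of the id field should differ):

// Print every field on which the delta table schema and the intDf schema disagree.
// This relies on the columns being in the same order, which holds here because
// intDf was reordered with select(...) to match the delta table's columns.
deltaTable.toDF.schema.fields.zip(intDf.schema.fields).foreach {
  case (deltaField, intField) if deltaField != intField =>
    println(s"mismatch: delta=$deltaField vs intDf=$intField")
  case _ => () // fields match, nothing to report
}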

Because intDf is the result of an inner join in which the column "id" is used as a key, the join forces that column to be non-nullable.
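
One common workaround is to rebuild the DataFrame against a copy of its schema with the nullable flag adjusted. Below is a minimal sketch; the setNullable helper and the fixedIntDf name are not part of the question's code, just an illustration:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType}

// Return a copy of df in which the given columns carry the desired nullability;
// the data itself is unchanged.
def setNullable(df: DataFrame, columns: Set[String], nullable: Boolean): DataFrame = {
  val adjustedSchema = StructType(df.schema.map {
    case StructField(name, dataType, _, metadata) if columns.contains(name) =>
      StructField(name, dataType, nullable, metadata)
    case other => other
  })
  // Rebuilding from the underlying RDD lets us attach the adjusted schema;
  // a plain select or withColumn keeps the original nullability flags.
  df.sparkSession.createDataFrame(df.rdd, adjustedSchema)
}

// Make intDf's "id" column nullable again before the merge:
val fixedIntDf = setNullable(intDf, Set("id"), nullable = true)

Merging fixedIntDf instead of intDf should then produce the desired output.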

If you change the nullable property (for example along the lines of the sketch above), you will get the desired output:

+---+-------+---+---------+
| id|   name|age|     city|
+---+-------+---+---------+
| 11|Stephen| 30|Bangalore|
| 12| Calvin| 25|Hyderabad|
| 10| Django| 22|  Bangkok|
+---+-------+---+---------+

Tested with Spark 3.0.1 and Delta 0.7.0.

