In the code below I'm trying to merge a dataframe into a Delta table.
I join the new dataframe with the Delta table, transform the joined data to match the Delta table's schema, and then merge the result into the Delta table.
But I'm getting an AnalysisException:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Resolved attribute(s) id#514 missing from _file_name_#872,age#516,id#879,name#636,age#881,name#880,city#882,id#631,_row_id_#866L,city#641 in operator !Join Inner, (id#514 = id#631). Attribute(s) with the same name appear in the operation: id. Please check if the right attribute(s) are used.;;
!Join Inner, (id#514 = id#631)
:- SubqueryAlias deltaData
: +- Project [id#631, name#636, age#516, city#641]
: +- Project [age#516, id#631, name#636, new_city#510 AS city#641]
: +- Project [age#516, id#631, new_name#509 AS name#636, new_city#510]
: +- Project [age#516, new_id#508 AS id#631, new_name#509, new_city#510]
: +- Project [age#516, new_id#508, new_name#509, new_city#510]
: +- Join Inner, (id#514 = new_id#508)
: :- Relation[id#514,name#515,age#516,city#517] parquet
: +- LocalRelation [new_id#508, new_name#509, new_city#510]
+- Project [id#879, name#880, age#881, city#882, _row_id_#866L, input_file_name() AS _file_name_#872]
+- Project [id#879, name#880, age#881, city#882, monotonically_increasing_id() AS _row_id_#866L]
+- Project [id#854 AS id#879, name#855 AS name#880, age#856 AS age#881, city#857 AS city#882]
+- Relation[id#854,name#855,age#856,city#857] parquet
My setup is Spark 3.0.0, Delta Lake 0.7.0 and Hadoop 2.7.4.
However, the same code runs fine on the Databricks 7.4 runtime, where the new dataframe is merged into the Delta table as expected.
Code Snippet:
import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{SaveMode, SparkSession}

object CodePen extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  val deltaPath = "<delta-path>"

  val oldEmployee = Seq(
    Employee(10, "Django", 22, "Bangalore"),
    Employee(11, "Stephen", 30, "Bangalore"),
    Employee(12, "Calvin", 25, "Hyderabad"))
  val newEmployee = Seq(EmployeeNew(10, "Django", "Bangkok"))

  // Saving the data to a delta table
  spark.createDataFrame(oldEmployee).write.format("delta").mode(SaveMode.Overwrite).save(deltaPath)

  val newDf = spark.createDataFrame(newEmployee)
  val deltaTable = DeltaTable.forPath(deltaPath)

  val joinedDf = deltaTable.toDF.join(newDf, col("id") === col("new_id"), "inner")
  joinedDf.show()

  val cols = newDf.columns

  // Transforming the joined Dataframe to match the schema of the delta table:
  // drop the original columns, strip the "new_" prefix and reorder the columns
  var intDf = joinedDf.drop(cols.map(removePrefix): _*)
  for (column <- newDf.columns)
    intDf = intDf.withColumnRenamed(column, removePrefix(column))
  intDf = intDf.select(deltaTable.toDF.columns.map(col): _*)

  deltaTable.toDF.show()
  intDf.show()

  deltaTable.as("oldData")
    .merge(
      intDf.as("deltaData"),
      col("oldData.id") === col("deltaData.id"))
    .whenMatched()
    .updateAll()
    .execute()

  deltaTable.toDF.show()

  def removePrefix(column: String) = {
    column.replace("new_", "")
  }
}
case class Employee(id: Int, name: String, age: Int, city: String)
case class EmployeeNew(new_id: Int, new_name: String, new_city: String)
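Looking at the plan, the merge source intDf is derived from deltaTable.toDF, i.e. from the same table the merge targets, which I suspect is why the analyzer complains about duplicate id attributes. Below is a minimal sketch of a variant I'm considering (names reused from the snippet above, not yet verified on my setup): building the join from a fresh read of the Delta path so the source does not share lineage with the target.

// Sketch only (reuses spark, newDf and deltaPath from the snippet above):
// build the merge source from a fresh read of the Delta path instead of
// deltaTable.toDF, so the source is not derived from the merge target.
val snapshotDf = spark.read.format("delta").load(deltaPath)
val joinedDf2 = snapshotDf.join(newDf, col("id") === col("new_id"), "inner")
// ...then apply the same drop/rename/select transformation and merge as above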
Below is the output of the dataframes.
New Dataframe:
+---+------+-------+
| id| name| city|
+---+------+-------+
| 10|Django|Bangkok|
+---+------+-------+
Joined Dataframe:
+---+------+---+---------+------+--------+--------+
| id| name|age| city|new_id|new_name|new_city|
+---+------+---+---------+------+--------+--------+
| 10|Django| 22|Bangalore| 10| Django| Bangkok|
+---+------+---+---------+------+--------+--------+
Delta Table Data:
+---+-------+---+---------+
| id| name|age| city|
+---+-------+---+---------+
| 11|Stephen| 30|Bangalore|
| 12| Calvin| 25|Hyderabad|
| 10| Django| 22|Bangalore|
+---+-------+---+---------+
Transformed New Dataframe:
+---+------+---+-------+
| id| name|age| city|
+---+------+---+-------+
| 10|Django| 22|Bangkok|
+---+------+---+-------+
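As far as I can tell, the join is only there to carry the existing age value over into the merge source. Another variant I'm considering (not tested on this setup) is to skip the join and the schema transformation entirely and let the merge update only the columns present in the new dataframe via updateExpr:

// Sketch only: merge newDf directly and update just the columns it carries,
// so no join against the target table is needed.
deltaTable.as("oldData")
  .merge(newDf.as("newData"), col("oldData.id") === col("newData.new_id"))
  .whenMatched()
  .updateExpr(Map(
    "name" -> "newData.new_name",
    "city" -> "newData.new_city"))
  .execute()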