Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


python - PySpark: sum fields that are embedded in an array within an array

I would like to sum up a field that is in an array within an array. Here is an example of the structure:

records = '[{"_main_object":{"parent_array":[{"child_array":[{"amount_set":{"presentment_money":{"amount":"2.73","currency_code":"USD"},"shop_money":{"amount":"2.73","currency_code":"USD"}}}],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""},{"child_array":[{"amount_set":{"presentment_money":{"amount":"2.27","currency_code":"USD"},"shop_money":{"amount":"2.27","currency_code":"USD"}}}],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""},{"child_array":[],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""},{"child_array":[],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""}]}},{"_main_object":{"parent_array":[{"child_array":[],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""},{"child_array":[{"amount_set":{"presentment_money":{"amount":"2.20","currency_code":"USD"},"shop_money":{"amount":"2.20","currency_code":"USD"}}}],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""},{"child_array":[],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""},{"child_array":[{"amount_set":{"presentment_money":{"amount":"2.80","currency_code":"USD"},"shop_money":{"amount":"2.80","currency_code":"USD"}}}],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""}]}}]'
df = spark.read.json(sc.parallelize([records]))
df.show()
df.printSchema()

Here is the schema:

root
 |-- _main_object: struct (nullable = true)
 |    |-- parent_array: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- child_array: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- amount_set: struct (nullable = true)
 |    |    |    |    |    |    |-- presentment_money: struct (nullable = true)
 |    |    |    |    |    |    |    |-- amount: string (nullable = true)
 |    |    |    |    |    |    |    |-- currency_code: string (nullable = true)
 |    |    |    |    |    |    |-- shop_money: struct (nullable = true)
 |    |    |    |    |    |    |    |-- amount: string (nullable = true)
 |    |    |    |    |    |    |    |-- currency_code: string (nullable = true)
 |    |    |    |-- rollup_total_presentment_money_amount: string (nullable = true)
 |    |    |    |-- rollup_total_shop_money_amount: string (nullable = true)

I am finding it challenging to sum the values in _main_object.parent_array[*].child_array[*].amount_set.presentment_money.amount and store the result, per parent element, in _main_object.parent_array[*].rollup_total_presentment_money_amount.
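As a reference point, here is a minimal plain-Python sketch (no Spark involved) of the rollup described above, run on a trimmed-down copy of the first sample record. The helper name `rollup_presentment` is hypothetical and only there for illustration, and `shop_money` is omitted to keep the sample short.

```python
import json

# Trimmed-down version of the first sample record (shop_money omitted).
record = json.loads("""
{"_main_object": {"parent_array": [
  {"child_array": [{"amount_set": {"presentment_money": {"amount": "2.73", "currency_code": "USD"}}}],
   "rollup_total_presentment_money_amount": ""},
  {"child_array": [{"amount_set": {"presentment_money": {"amount": "2.27", "currency_code": "USD"}}}],
   "rollup_total_presentment_money_amount": ""},
  {"child_array": [], "rollup_total_presentment_money_amount": ""}
]}}
""")


def rollup_presentment(record):
    """Hypothetical helper: store, on each parent, the sum of its children's amounts."""
    for parent in record["_main_object"]["parent_array"]:
        # The amounts are strings in the JSON, so convert before summing.
        amounts = [
            float(child["amount_set"]["presentment_money"]["amount"])
            for child in parent["child_array"]
        ]
        # An empty child_array yields the start value 0.0.
        parent["rollup_total_presentment_money_amount"] = sum(amounts, 0.0)
    return record


rollups = [
    parent["rollup_total_presentment_money_amount"]
    for parent in rollup_presentment(record)["_main_object"]["parent_array"]
]
print(rollups)
```

Each parent gets its own total, and a parent with no children gets 0.0. That is the per-element behaviour the Spark version needs to reproduce.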

Based on the recommendation from @mck, I came up with the following:

from pyspark.sql.functions import expr, struct

# Rebuild _main_object so that each parent_array element carries the sum of
# its own child_array amounts; aggregate() returns the start value double(0)
# when child_array is empty.
df4 = df.withColumn("_main_object", struct(
    expr("""
        transform(_main_object.parent_array, p -> struct(
            p.child_array as child_array,
            p.rollup_total_shop_money_amount as rollup_total_shop_money_amount,
            aggregate(
                transform(p.child_array, c -> double(c.amount_set.presentment_money.amount)),
                double(0),
                (acc, x) -> acc + x
            ) as rollup_total_presentment_money_amount
        ))
    """).alias("parent_array")
))
df4.printSchema()
df4.select("_main_object.parent_array").show(truncate=False)

The schema looks right:

root
 |-- _main_object: struct (nullable = false)
 |    |-- parent_array: array (nullable = true)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- child_array: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- amount_set: struct (nullable = true)
 |    |    |    |    |    |    |-- presentment_money: struct (nullable = true)
 |    |    |    |    |    |    |    |-- amount: string (nullable = true)
 |    |    |    |    |    |    |    |-- currency_code: string (nullable = true)
 |    |    |    |    |    |    |-- shop_money: struct (nullable = true)
 |    |    |    |    |    |    |    |-- amount: string (nullable = true)
 |    |    |    |    |    |    |    |-- currency_code: string (nullable = true)
 |    |    |    |-- rollup_total_shop_money_amount: string (nullable = true)
 |    |    |    |-- rollup_total_presentment_money_amount: double (nullable = true)

The output:

df4.select("_main_object.parent_array").show(truncate=False)

+--------------------------------------------------------------------------------------------------------------+
|parent_array                                                                                                  |
+--------------------------------------------------------------------------------------------------------------+
|[[[[[[2.73, USD], [2.73, USD]]]], , 2.73], [[[[[2.27, USD], [2.27, USD]]]], , 2.27], [[], , 0.0], [[], , 0.0]]|
|[[[], , 0.0], [[[[[2.20, USD], [2.20, USD]]]], , 2.2], [[], , 0.0], [[[[[2.80, USD], [2.80, USD]]]], , 2.8]]  |
+--------------------------------------------------------------------------------------------------------------+
Question from: https://stackoverflow.com/questions/65907795/pyspark-sum-fields-that-are-imbedded-in-a-array-within-an-array


1 Answer


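This needs Spark >= 2.4, where the aggregate, transform and flatten functions were introduced. The following computes one grand total per row by flattening the amounts from every child_array before summing: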

df2 = df.selectExpr("""
    aggregate(
        flatten(
            transform(
                _main_object.parent_array,
                p -> transform(
                    p.child_array,
                    c -> double(c.amount_set.presentment_money.amount)
                )
            )
        ),
        double(0),
        (acc, x) -> acc + x
    ) AS total
""")

df2.show()
+-----+
|total|
+-----+
|  5.0|
|  5.0|
+-----+
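
If the nested higher-order functions are hard to read, it may help to see the same three steps in plain Python. This is only an analogy for the Spark SQL expression above, not how Spark evaluates it. The list `parent_array` is a simplified stand-in for the first sample row, holding just the amount strings of each child_array.

```python
from functools import reduce
from itertools import chain

# Simplified stand-in for the first row: the amount strings per parent element.
parent_array = [["2.73"], ["2.27"], [], []]

# transform(parent_array, p -> transform(p.child_array, c -> double(...)))
nested = [[float(amount) for amount in child_array] for child_array in parent_array]

# flatten(...): collapse the array of arrays into a single array
flat = list(chain.from_iterable(nested))

# aggregate(flat, double(0), (acc, x) -> acc + x): fold with a start value
total = reduce(lambda acc, x: acc + x, flat, 0.0)

print(nested)
print(flat)
print(total)
```

Dropping the flatten step and folding each inner list on its own gives one total per parent element instead of one per row, which is the shape used for rollup_total_presentment_money_amount in the question.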
