The following code works well with PySpark 3.2.1
df.withColumn(
    "total_amount",
    f.aggregate(f.col("taxes"), f.lit(0.00), lambda acc, x: acc + x["amount"]),
)
I've downgraded to PySpark 3.0.3. How can I change the above code to something like this:
df.withColumn(
    "total_amount",
    # f.aggregate(f.col("taxes"), f.lit(0.00), lambda acc, x: acc + x["amount"]),
    f.lit(expr("aggregate(taxes,0,(acc,x)->acc+x['amount'])"))
)
The x['amount'] part does not work in our case! Is there something wrong with the expression, or do I have to change taxes to be a list of numbers?
2nd case
df.withColumn(
    "total_amount_2",
    f.aggregate(
        f.filter(
            "lines",
            lambda x: (
                x["id"].isNotNull()
                & (x["code"].isin(["CODE1", "CODE2"]) == False)
            ),
        ),
        f.lit(0.00),
        lambda acc, x: acc + x["amount"],
    ),
)
How can I refactor these cases using the Spark SQL expr function?
Try the following. I'm certain that one can access struct fields using dot notation too. I'm just not sure about the data type that you use (0.00), as this should be of the same data type as before. I have added the D letter, which indicates it's a double.
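A sketch of what that could look like for the 1st case (it assumes taxes is an array of structs with an amount field, as in your example):

import pyspark.sql.functions as f

df = df.withColumn(
    "total_amount",
    # dot notation to reach the struct field; 0.00D makes the start value a double
    f.expr("aggregate(taxes, 0.00D, (acc, x) -> acc + x.amount)"),
)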
Regarding the 2nd case, review the following test case. I have tested it using Spark 3.0.3.
Input df:
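(The original input isn't reproduced here; the snippet below builds a comparable one. The lines schema with id, code and amount fields and the sample values are assumptions for illustration.)

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()

# hypothetical input: one order with three line structs (id, code, amount)
df = spark.createDataFrame(
    [(1, [(1, "CODE1", 10.0), (2, "CODE3", 20.0), (None, "CODE4", 5.0)])],
    "order_id int, lines array<struct<id:int, code:string, amount:double>>",
)
df.show(truncate=False)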
Script:
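(A sketch rather than the original script; it relies on the SQL filter and aggregate higher-order functions, which are available through expr in Spark 3.0.3.)

df_result = df.withColumn(
    "total_amount_2",
    f.expr("""
        aggregate(
            filter(lines, x -> x.id is not null and not x.code in ('CODE1', 'CODE2')),
            0.00D,
            (acc, x) -> acc + x.amount
        )
    """),
)
df_result.show(truncate=False)

With the hypothetical input above, only the CODE3 line survives the filter, so total_amount_2 comes out as 20.0.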