PySpark Hudi table partial update with org.apache.hudi.common.model.PartialUpdateAvroPayload not working


I have two tables in S3: tableA with columns id, col1, col2, and col3, and tableB with columns id, col4, and col5. I want to write this data to another S3 location in Hudi format as tableC with columns id, col1, col2, col3, col4, col5.

I do not want to join the data frames, because in my real use case I will receive incremental data and may not always have matching records in both tables. Instead, I am trying to use the org.apache.hudi.common.model.PartialUpdateAvroPayload payload class.
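My understanding of that payload class (illustrative values, my own example) is that fields the incoming record does not set should keep their existing values:

# existing row in tableC (written from tableA):
#   {"id": 1, "col1": "a", "col2": "b", "col3": "c", "col4": None, "col5": None}
# incoming row (from tableB):
#   {"id": 1, "col4": "d", "col5": "e"}
# expected merged row in tableC:
#   {"id": 1, "col1": "a", "col2": "b", "col3": "c", "col4": "d", "col5": "e"}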

Below is the script for the first run. It inserts the tableA records first, adding empty values for tableB's col4 and col5 so that the full schema exists when the table is created, and it works fine.

from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

# Read tableA from its Hudi location in S3
tablea_df = (spark.read
       .format("hudi")
       .load(s3_path_tablea))

# Write options for the combined table; the payload class is what should
# enable the partial (column-level) updates
additional_options = {
    "hoodie.table.name": "hudiTableAplusB",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.precombine.field": "id",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.parquet.compression.codec": "gzip",
    "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.PartialUpdateAvroPayload",
    "hoodie.datasource.write.reconcile.schema": "true",
    "hoodie.bloom.index.update.partition.path": "false",
}
# Placeholder empty strings for tableB's columns so the initial write
# creates the full tableC schema
tablea_df = tablea_df.withColumn("col4", lit('').cast(StringType()))
tablea_df = tablea_df.withColumn("col5", lit('').cast(StringType()))

tablea_df.write.format("hudi").options(
    **additional_options
).mode("append").save(
    s3_path_curated
)

# Read tableB from its Hudi location in S3
tableb_df = (spark.read
       .format("hudi")
       .load(s3_path_tableb))

# Same additional_options as the tableA write above; PartialUpdateAvroPayload
# should merge tableB's col4/col5 into the rows already written from tableA
tableb_df.write.format("hudi").options(
    **additional_options
).mode("append").save(
    s3_path_curated
)

On subsequent runs I do not want to add empty values to col4 and col5, because I want to retain the existing values. So I removed the two lines below from the code above and tried a partial-update upsert (the resulting tableA write is sketched after the snippet).

tablea_df = tablea_df.withColumn("col4", lit('').cast(StringType()))
tablea_df = tablea_df.withColumn("col5", lit('').cast(StringType()))
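
In other words, the second-run write for tableA is the same code as above, minus the placeholder columns (same additional_options):

tablea_df = (spark.read
       .format("hudi")
       .load(s3_path_tablea))

# No placeholder col4/col5 this time; the payload class should preserve
# the values already present in the curated table
tablea_df.write.format("hudi").options(
    **additional_options
).mode("append").save(
    s3_path_curated
)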

I get the following error:

Caused by: java.lang.NullPointerException: null of string in field col4 of hoodie.hudiTableAplusB.hudiTableAplusB_record
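
As a sanity check (not conclusive), the curated table can be read back after the first run to inspect the schema Hudi recorded; I would expect col4 and col5 to show up as nullable strings:

curated_df = spark.read.format("hudi").load(s3_path_curated)
curated_df.printSchema()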

Note: the strange part is that if, on the second run, I read and upsert the data from tableB only (removing the tableA write logic), it works fine. It just does not work for tableA.

I am using the following jar files. Hudi: hudi-spark3.3-bundle_2.12-0.14.0.jar, Spark Avro: spark-avro_2.13-3.3.0.jar.
