Custom payload class in Python for preCombine and combineAndGetUpdateValue in Apache Hudi and PySpark

We are migrating our code base from Spark (Java) to PySpark. In the Java code, we handle custom aggregations when merging data by implementing preCombine() and combineAndGetUpdateValue() in a custom payload class. Example below:

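package com.paytm.sparkjobs.utils.hudi;

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.BaseAvroPayload;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
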
public class MergeMdrPayloadAndPersist extends BaseAvroPayload implements HoodieRecordPayload<MergeMdrPayloadAndPersist> {

    public static final Logger logger = LoggerFactory.getLogger(MergeMdrPayloadAndPersist.class);

    private GenericRecord record = null;

    public MergeMdrPayloadAndPersist(GenericRecord record, Comparable orderingVal) {
        super(record, orderingVal);
        this.record = record;
    }

    @Override
    public MergeMdrPayloadAndPersist preCombine(MergeMdrPayloadAndPersist mergeMdrPayloadAndPersist) {
        //custom logic for aggregations
        return new MergeMdrPayloadAndPersist(mergeMdrPayloadAndPersist.record, mergeMdrPayloadAndPersist.orderingVal);
    }

    @Override
    public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord indexedRecord, Schema schema) throws IOException {

        //custom logic for aggregations

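        // Placeholder: the original snippet referenced an undefined `inputPayload`;
        // here the incoming payload (this) is passed through unchanged.
        MergeMdrPayloadAndPersist mergedDoc = new MergeMdrPayloadAndPersist(this.record, this.orderingVal);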

        return mergedDoc.getInsertValue(schema);
    }

    @Override
    public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
        if (this.recordBytes.length == 0) {
            return Option.empty();
        } else {
            IndexedRecord indexedRecord = HoodieAvroUtils.bytesToAvro(this.recordBytes, schema);
            return this.isDeleteRecord((GenericRecord)indexedRecord) ? Option.empty() : Option.of(indexedRecord);
        }
    }

    private boolean isDeleteRecord(GenericRecord genericRecord) {
        Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
        return deleteMarker instanceof Boolean && (Boolean)deleteMarker;
    }

}

How can we write a custom payload class or function in Python to handle our aggregation and merging logic? Some code examples would help.

There is 1 best solution below


This cannot be done in pure PySpark. Hudi has no Python API of its own; PySpark talks to Hudi's Java/Scala classes through py4j, and py4j cannot define a new Java class. Hudi loads the payload class by name on the JVM, so the class must already be compiled and on the classpath before the job runs.

The best approach is to build a small Java JAR containing your payload classes and add it to your pyspark shell or spark-submit command (for example, with --jars).
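
As a rough sketch of the PySpark side, the job only needs to name the compiled class in the Hudi write options. The payload class name comes from the question; the table name, column names, JAR path, and the helper `build_hudi_options` are hypothetical and only for illustration.

```python
# Fully qualified name of the Java payload class from the question.
PAYLOAD_CLASS = "com.paytm.sparkjobs.utils.hudi.MergeMdrPayloadAndPersist"


def build_hudi_options(table_name, record_key, precombine_field,
                       payload_class=PAYLOAD_CLASS):
    """Return Hudi write options that point at a JVM payload class.

    Hypothetical helper: it only assembles a dict of option keys.
    """
    return {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.precombine.field": precombine_field,
        "hoodie.datasource.write.payload.class": payload_class,
    }


# Hypothetical table and column names, for illustration only.
hudi_options = build_hudi_options("mdr_table", "id", "updated_at")

# Assuming the job is launched with the JAR on the classpath, e.g.
#   spark-submit --jars /path/to/custom-payload.jar job.py
# the write itself would look roughly like:
#   df.write.format("hudi").options(**hudi_options).mode("append").save(base_path)
```

The merge logic itself stays in Java; Python only passes the class name as a string, which is why the JAR must be supplied when the session starts.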