How to get failed insert records for file-load insertion in BigQuery

I'm using Apache Beam (Java SDK) to insert records into BigQuery using the batch load method (FILE_LOADS). I want to retrieve the records that fail during insertion.

Is it possible to have a retry policy on failed records?

Below is my code:

public static void insertToBigQueryDataLake(
        final PCollectionTuple dataStoresCollectionTuple,
        final TupleTag<KV<DataLake, PayloadSpecs>> dataLakeValidTag,
        final Long loadJobTriggerFrequency,
        final Integer loadJobNumShard) {


    WriteResult writeResult = dataStoresCollectionTuple
            .get(dataLakeValidTag)
            .apply(TRANSFORMATION_NAME, DataLakeTableProcessor.dataLakeTableProcessorTransform())
            .apply(
                    WRITING_EVENTS_NAME,
                    BigQueryIO.<KV<DataLake, TableRowSpecs>>write()
                            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                            .withTriggeringFrequency(Duration.standardMinutes(loadJobTriggerFrequency))
                            .withNumFileShards(loadJobNumShard)
                            .to(new DynamicTableRowDestinations<>(IS_DATA_LAKE))
                            .withFormatFunction(BigQueryServiceImpl::dataLakeTableRow));

    writeResult.getFailedInserts().apply(ParDo.of(new DoFn<TableRow, Void>() {
        @ProcessElement
        public void processElement(final ProcessContext processContext) throws IOException {
            System.out.println("Table Row : " + processContext.element().toPrettyString());
        }
    }));

}

There is 1 solution below

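Using the getFailedInsertsWithErr() method, you can write the failed inserts, together with their errors, to another table for root cause analysis (RCA). This method requires withExtendedErrorInfo() to be set on the BigQueryIO write. As far as I know, BigQueryIO only reports per-row failures for streaming inserts (STREAMING_INSERTS), which is also where withFailedInsertRetryPolicy() applies; a load job (FILE_LOADS) succeeds or fails as a whole, so this approach may not return any rows with that method.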

Example:
// write failed rows with their error to error table                
writeResult
        .getFailedInsertsWithErr()
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))))
        .apply("BQ-insert-error-extract",
                ParDo.of(new BigQueryInsertErrorExtractFn(tableRowToInsertView))
                        .withSideInputs(tableRowToInsertView))
        .apply("BQ-insert-error-write", BigQueryIO.writeTableRows()
                .to(errTableSpec)
                .withJsonSchema(errSchema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
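
The BigQueryInsertErrorExtractFn used above is not shown in the answer. As a rough sketch of what such a step might produce, the plain-Java stand-in below flattens a failed row and its error reasons into a single error-table record. Everything in it is an assumption made for illustration: the class name ErrorRowSketch, the column names, and the use of Map in place of Beam's TableRow and BigQueryInsertError.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the shaping logic of an error-extract step.
// It uses only the JDK; a real DoFn would read these values from Beam types.
class ErrorRowSketch {

    // Builds one error-table record from a failed row and its error reasons.
    // The column names below are illustrative and not taken from the answer.
    public static Map<String, Object> toErrorRow(
            String destinationTable,
            Map<String, Object> failedRow,
            List<String> reasons) {
        Map<String, Object> errorRow = new LinkedHashMap<>();
        errorRow.put("destination_table", destinationTable);
        // Store the original payload as text so it can be kept even when
        // the row did not match the destination schema.
        errorRow.put("payload", failedRow.toString());
        errorRow.put("error_reasons", String.join(",", reasons));
        return errorRow;
    }

    public static void main(String[] args) {
        Map<String, Object> failedRow = new LinkedHashMap<>();
        failedRow.put("id", 42);
        failedRow.put("name", "example");

        Map<String, Object> errorRow =
                toErrorRow("project:dataset.events", failedRow, Arrays.asList("invalid"));
        System.out.println(errorRow);
    }
}
```

In an actual pipeline, the same shaping would sit inside the @ProcessElement method of the DoFn, which would take the row, the error, and the table from each BigQueryInsertError element and emit a TableRow matching errSchema.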