Inserting into BigQuery via load jobs (not streaming)


I'm looking to use Dataflow to load data into BigQuery tables using BQ load jobs - not streaming (streaming would cost too much for our use case). I see that the Dataflow SDK has built-in support for inserting data via BQ streaming, but I wasn't able to find anything in the Dataflow SDK that supports load jobs out of the box.

Some questions:

1) Does the Dataflow SDK have OOTB support for BigQuery load job inserts? If not, is it planned?

2) If I need to roll my own, what are some good approaches?

If I have to roll my own, performing a BQ load job from Google Cloud Storage is a multi-step process: write the file to GCS, submit the load job via the BQ API, and (optionally) poll the status until the job completes (or fails). I'd hope I could use the existing TextIO.write() functionality to write to GCS, but I'm not sure how I'd compose that step with the subsequent call to the BQ API to submit the load job (and optionally the subsequent calls to check the status of the job until it's complete).
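For context, this is roughly the shape of the load-job submission I'd need to make after TextIO.write() finishes (just a sketch using the BigQuery Java client; the dataset, table, and GCS path below are placeholders):

    // Sketch only: classes are from com.google.cloud.bigquery.
    // Submit a load job over files already written to GCS, then block until it finishes.
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    TableId table = TableId.of("my_dataset", "my_table");                       // placeholder
    LoadJobConfiguration config =
        LoadJobConfiguration.newBuilder(table, "gs://my-bucket/output/*.json")  // placeholder
            .setFormatOptions(FormatOptions.json())
            .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
            .build();
    Job job = bigquery.create(JobInfo.of(config));
    job = job.waitFor();                                                         // poll until done
    if (job.getStatus().getError() != null) {
        throw new RuntimeException("Load job failed: " + job.getStatus().getError());
    }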

Also, I'd be using Dataflow in streaming mode, with windows of 60 seconds - so I'd want to do the load job every 60 seconds as well.

Suggestions?


2 Answers


BigQueryIO.write() always uses BigQuery load jobs when the input PCollection is bounded. If you'd like it to also use load jobs when the input is unbounded, specify .withMethod(FILE_LOADS).withTriggeringFrequency(...).
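For example, a minimal sketch for an unbounded PCollection<TableRow> (the destination table, schema variable, and the 60-second frequency matching the question's windowing are placeholders):

    // Sketch: file loads triggered roughly every 60 seconds on an unbounded input.
    tableRows.apply(
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table")                   // placeholder destination
            .withSchema(tableSchema)                                // placeholder TableSchema
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withTriggeringFrequency(Duration.standardSeconds(60))  // org.joda.time.Duration
            .withNumFileShards(1));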


I'm not sure which version of Apache Beam you are using, but it's now possible to take a micro-batching approach in a streaming pipeline. If you decide to go that route, you can use something like this:

.apply("Saving in batches", BigQueryIO.writeTableRows()
                    .to(destinationTable(options))
                    .withMethod(Method.FILE_LOADS)
                    .withJsonSchema(myTableSchema)
                    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .withExtendedErrorInfo()
                    .withTriggeringFrequency(Duration.standardMinutes(2))
                    .withNumFileShards(1);
                    .optimizedWrites());

Things to keep in mind

  1. There are 2 different methods: FILE_LOADS and STREAMING_INSERTS. If you use the first one, you need to include withTriggeringFrequency and withNumFileShards. From my experience it's better to express the triggering frequency in minutes, and the exact value will depend on your data throughput: if you receive a lot of data, try to keep it small, as I have seen "stuck" errors when it's increased too much. The number of shards mostly affects your GCS billing; too many shards means more files created per table per x amount of minutes.
  2. If your input data volume is not that big, streaming inserts can work really well and the cost shouldn't be a big deal. In that scenario you can use the STREAMING_INSERTS method and remove withTriggeringFrequency and withNumFileShards. You can also add withFailedInsertRetryPolicy, e.g. InsertRetryPolicy.retryTransientErrors(), so no rows are lost (keep in mind that idempotency is not guaranteed with STREAMING_INSERTS, so duplicates are possible); see the sketch after this list.
  3. You can check your jobs in BigQuery and validate that everything is working. Keep in mind BigQuery's load-job quotas (I think it's 1,000 jobs per table per day) when you define the triggering frequency and number of shards.
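A rough sketch of the STREAMING_INSERTS variant mentioned in point 2, reusing the same placeholder table and schema as the snippet above:

    // Sketch: streaming inserts with a retry policy for transient errors.
    .apply("Streaming inserts", BigQueryIO.writeTableRows()
                    .to(destinationTable(options))
                    .withMethod(Method.STREAMING_INSERTS)
                    .withJsonSchema(myTableSchema)
                    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .withExtendedErrorInfo()
                    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));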

Note: You can always read this article about efficient aggregation pipelines https://cloud.google.com/blog/products/data-analytics/how-to-efficiently-process-both-real-time-and-aggregate-data-with-dataflow