Does Apache Beam's BigQuery IO Support JSON Datatype Fields for Streaming Inserts?

I'm currently working with Apache Beam (Dataflow) to process and insert data into a Google BigQuery table. The table has a field with the JSON datatype (test_jsontype_field), and I'm encountering failures when writing with BigQueryIO.Write.Method.STREAMING_INSERTS.

Issue Description: My pipeline reads JSON messages from GCP Pub/Sub and writes them to BigQuery using streaming inserts. I use TableRowJsonCoder to decode each JSON input string into a TableRow object. However, when Beam tries to insert the data into BigQuery, the insert fails with the error: This field: <FIELD> is not a record. The <FIELD> in question is declared with the JSON datatype in the BigQuery schema.
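
To make the failure concrete, here is a minimal sketch of the row shape I believe TableRowJsonCoder hands to the sink. The payload and values below are made up for illustration; only the field name test_jsontype_field matches my real schema. The nested JSON object is decoded into a Map, which the streaming-insert path then appears to treat as a RECORD rather than a JSON value:

import com.google.api.services.bigquery.model.TableRow;
import java.util.Map;

public class JsonFieldShape {
    public static void main(String[] args) {
        // Hypothetical input message: {"id": 1, "test_jsontype_field": {"key": "value"}}
        // After TableRowJsonCoder.of().decode(...), the row is equivalent to:
        TableRow row = new TableRow()
                .set("id", 1)
                .set("test_jsontype_field", Map.of("key", "value")); // a nested Map, not a JSON String
        System.out.println(row);
    }
}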

Question: Is there known support for JSON datatype fields in BigQuery when using Apache Beam for streaming inserts? If so, are there specific configurations or coding practices required to ensure compatibility?

I would appreciate any insights or guidance on how to handle JSON datatype fields effectively in this scenario with Apache Beam and BigQuery.

Code

import com.google.api.services.bigquery.model.TableRow;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;

// Decodes a raw JSON message into a TableRow via Beam's TableRowJsonCoder.
// (logger is an SLF4J Logger field on the enclosing class.)
static TableRow convertJsonToTableRow(String json) {
    TableRow row;
    try (InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8))) {
        row = TableRowJsonCoder.of().decode(inputStream, Coder.Context.OUTER);
        logger.debug("message {}", row.toPrettyString());
    } catch (IOException e) {
        throw new RuntimeException("failed to deserialize json to table row: " + json, e);
    }
    return row;
}
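
For context, the write step is wired roughly as follows. This is a simplified sketch: the project, subscription, and table names are placeholders, and convertJsonToTableRow is the method shown above (assumed to live in the same class):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class PubSubToBigQueryPipeline {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        pipeline
            // Read raw JSON strings from a Pub/Sub subscription (placeholder name).
            .apply("ReadFromPubSub", PubsubIO.readStrings()
                .fromSubscription("projects/my-project/subscriptions/my-subscription"))
            // Decode each JSON string into a TableRow with the method above.
            .apply("JsonToTableRow", MapElements
                .into(TypeDescriptor.of(TableRow.class))
                .via((String json) -> convertJsonToTableRow(json)))
            // Stream the rows into the existing table (placeholder name).
            .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table")
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
        pipeline.run();
    }
}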

Error

"org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: java.io.IOException: Insert failed: [{"errors":[{"debugInfo":"","location":"test_jsontype_field","message":"This field: test_jsontype_field is not a record.","reason":"invalid"}],"index":0}]
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1776)
    at org.apache.beam.fn.harness.data.PTransformFunctionRegistry.lambda$register$0(PTransformFunctionRegistry.java:116)
    at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:560)
    at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
    at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.RuntimeException: java.io.IOException: Insert failed: [{"errors":[{"debugInfo":"","location":"test_jsontype_field","message":"This field: test_jsontype_field is not a record.","reason":"invalid"}],"index":0}]
    at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.flushRows(BatchedStreamingWrite.java:416)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.access$900(BatchedStreamingWrite.java:67)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements.finishBundle(BatchedStreamingWrite.java:286)
Caused by: java.io.IOException: Insert failed: [{"errors":[{"debugInfo":"","location":"test_jsontype_field","message":"This field: test_jsontype_field is not a record.","reason":"invalid"}],"index":0}]
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:1262)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:1281)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.flushRows(BatchedStreamingWrite.java:403)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.access$900(BatchedStreamingWrite.java:67)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements.finishBundle(BatchedStreamingWrite.java:286)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1776)
    at org.apache.beam.fn.harness.data.PTransformFunctionRegistry.lambda$register$0(PTransformFunctionRegistry.java:116)
    at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:560)
    at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
    at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)