Writing to BigQuery using Apache Beam intermittently throws an error


I am working on a use case where I receive data on Pub/Sub. I am reading that data using Beam's ReadFromPubSub method.

The published data looks like this:

b"('A', 'Stream2', 10)"
b"('B', 'Stream1', 14)"
b"('D', 'Stream3', 16)"

I want to write the Stream1 data to one table and the Stream2/Stream3 data to another table, so I have used side outputs to achieve this.

Below is my code:

import ast

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.internal.clients import bigquery

table_spec1 = bigquery.TableReference(
    projectId=<PROJECT_ID>,
    datasetId='training',
    tableId='dflow_stream1')

table_spec23 = bigquery.TableReference(
    projectId=<PROJECT_ID>,
    datasetId='training',
    tableId='dflow_stream23')



SCHEMA = {
    "fields": [
        {"name": "name", "type": "STRING"},
        {"name": "stream", "type": "STRING"},
        {"name": "salary", "type": "INT64", "mode": "NULLABLE"},
    ]
}

pipeline_options = PipelineOptions(streaming=True)

class ProcessWords(beam.DoFn):
  def process(self, ele):
    # ast.literal_eval safely parses the published tuple string.
    name, stream, salary = ast.literal_eval(ele)
    row = {"name": name, "stream": stream, "salary": salary}  # keys match the schema
    if stream == "Stream1":
      yield row  # main output
    else:
      yield beam.pvalue.TaggedOutput("Stream23", row)



class WordSplit(beam.DoFn):
  # Not used in the pipeline below.
  def process(self, ele):
    name, stream, salary = ast.literal_eval(ele)
    yield {"name": name, "stream": stream, "salary": salary}
with beam.Pipeline(options=pipeline_options) as p:
  out = (
      p
      | "Read from Pub/Sub subscription" >> beam.io.ReadFromPubSub(
          subscription="projects/<PROJECT_ID>/subscriptions/Test-sub")
      | "Decode" >> beam.Map(lambda element: element.decode("utf-8"))
      | "Formatting" >> beam.ParDo(ProcessWords()).with_outputs("Stream23", main="Stream1")
  )
  s1 = out.Stream1
  s23 = out.Stream23

  s1 | "Table1" >> beam.io.WriteToBigQuery(
      table=table_spec1,
      schema=SCHEMA,
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
      triggering_frequency=5,
      with_auto_sharding=True,
  )

  s23 | "Table23" >> beam.io.WriteToBigQuery(
      table=table_spec23,
      schema=SCHEMA,
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
      triggering_frequency=5,
      with_auto_sharding=True,
  )

# The with-block runs the pipeline on exit, so no separate p.run() is needed.

I am able to achieve what I wanted: the pipeline reads data from Pub/Sub, creates the tables in the dataset, and appends data to the respective tables. But I don't know why it intermittently throws the error below while running:

WARNING:apache_beam.io.gcp.bigquery:There were errors inserting to BigQuery. Will retry. Errors were [{'index': 0, 'errors': [{'message': 'POST https://bigquery.googleapis.com/bigquery/v2/projects/<PROJECT_ID>/datasets/training/tables/dflow_stream23/insertAll?prettyPrint=false: Table 431017404487:training.dflow_stream23 not found.', 'reason': 'Not Found'}]}]

The implementation above works end to end. I just want to understand why that error shows up.
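
For what it's worth, a workaround I have in mind (not part of the pipeline above) is to pre-create both tables before starting the job, so that CREATE_IF_NEEDED never has to create a table while inserts are in flight. A minimal sketch, assuming the google-cloud-bigquery client library:

from google.cloud import bigquery as bq

client = bq.Client(project="<PROJECT_ID>")
schema = [
    bq.SchemaField("name", "STRING"),
    bq.SchemaField("stream", "STRING"),
    bq.SchemaField("salary", "INT64", mode="NULLABLE"),
]
for table_id in ("dflow_stream1", "dflow_stream23"):
    table = bq.Table(f"<PROJECT_ID>.training.{table_id}", schema=schema)
    # exists_ok=True makes this a no-op when the table already exists.
    client.create_table(table, exists_ok=True)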
