I am working on a use case where I receive data on Pub/Sub. I am reading that data using Beam's ReadFromPubSub transform.
The published data looks like this:
b"('A', 'Stream2', 10)"
b"('B', 'Stream1', 14)"
b"('D', 'Stream3', 16)"
I want to write the Stream1 data to one table and the Stream2/Stream3 data to another table, so I have used tagged (side) outputs to achieve this.
Below is my code:
import json
import os
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.io.gcp.internal.clients import bigquery
table_spec1 = bigquery.TableReference(
    projectId=<PROJECT_ID>,
    datasetId='training',
    tableId='dflow_stream1')

table_spec23 = bigquery.TableReference(
    projectId=<PROJECT_ID>,
    datasetId='training',
    tableId='dflow_stream23')
SCHEMA = {
    "fields": [
        {
            "name": 'name',
            "type": "STRING",
        },
        {
            "name": 'stream',
            "type": "STRING"
        },
        {
            "name": 'salary',
            "type": "INT64",
            "mode": "NULLABLE"
        }
    ]
}
pipeline_options = PipelineOptions(streaming=True)
class ProcessWords(beam.DoFn):
    def process(self, ele):
        # Each element is the string repr of a tuple, e.g. "('A', 'Stream2', 10)"
        Name, Stream, Salary = eval(ele)
        if Stream == "Stream1":
            # Main output: Stream1 records
            yield {"Name": Name, "Stream": Stream, "Salary": Salary}
        else:
            # Tagged output: Stream2 and Stream3 records
            yield beam.pvalue.TaggedOutput('Stream23', {"Name": Name, "Stream": Stream, "Salary": Salary})
class word_split(beam.DoFn):
    def process(self, ele):
        Name, Stream, Salary = eval(ele)
        # print(eval(ele))
        # print(type(eval(ele)))
        yield {"Name": Name, "Stream": Stream, "Salary": Salary}
with beam.Pipeline(options=pipeline_options) as p:
    out = (
        p
        | "Read from Pub/Sub subscription" >> beam.io.ReadFromPubSub(
            subscription="projects/<PROJECT_ID>/subscriptions/Test-sub")
        | "Decode bytes" >> beam.Map(lambda element: element.decode("utf-8"))
        | "Formatting" >> beam.ParDo(ProcessWords()).with_outputs("Stream23", main="Stream1")
    )
    # Main output holds Stream1 records, the tagged output holds Stream2/Stream3 records.
    s1 = out.Stream1
    s23 = out.Stream23

    s1 | "Table1" >> beam.io.WriteToBigQuery(
        table=table_spec1,
        dataset='training',
        schema=SCHEMA,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        triggering_frequency=5,
        with_auto_sharding=True
    )

    s23 | "Table23" >> beam.io.WriteToBigQuery(
        table=table_spec23,
        dataset='training',
        schema=SCHEMA,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        triggering_frequency=5,
        with_auto_sharding=True
    )
    # No explicit p.run() needed here: the with-block runs the pipeline on exit.
I am able to achieve what I wanted: the pipeline reads data from Pub/Sub, creates the tables in the dataset, and appends data to the respective tables. But I don't know why it throws the error below while running:
WARNING:apache_beam.io.gcp.bigquery:There were errors inserting to BigQuery. Will retry. Errors were [{'index': 0, 'errors': [{'message': 'POST https://bigquery.googleapis.com/bigquery/v2/projects/<PROJECT_ID>/datasets/training/tables/dflow_stream23/insertAll?prettyPrint=false: Table 431017404487:training.dflow_stream23 not found.', 'reason': 'Not Found'}]}]
I have tried the above implementation and it works; I just want to understand why that error appears.
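For reference, a standalone check like the one below (using the google-cloud-bigquery client, outside the pipeline; same placeholders as above) could be used to see whether the table actually exists at a given moment:

# Standalone sanity check, separate from the Beam pipeline: does the
# destination table exist right now?
from google.api_core.exceptions import NotFound
from google.cloud import bigquery

client = bigquery.Client(project="<PROJECT_ID>")
try:
    table = client.get_table("<PROJECT_ID>.training.dflow_stream23")
    print("Table exists, rows so far:", table.num_rows)
except NotFound:
    print("Table not found yet")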