I am working on a use case where I am reading from PubSub and I want to write aggregated values to bigquery.
Here is my PubSub input which I am writing to that Topic:
b"('B', 'Stream1', 77)"
b"('C', 'Stream3', 11)"
Here is my code:
import json
import os
import typing
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.transforms.sql import SqlTransform
table_spec1 = bigquery.TableReference(
projectId='phonic-vortex-417406',
datasetId='training',
tableId='dflow_agg')
SCHEMA = {
"fields": [
{
"name": 'name',
"type": "STRING",
},
{
"name": 'stream',
"type": "STRING"
},
{
"name": 'salary',
"type": "INT64",
"mode": "NULLABLE"
}
]
}
pipeline_options = PipelineOptions( streaming=True)
class ProcessWords(beam.DoFn):
def process(self, ele):
yield eval(ele)
def run():
with beam.Pipeline(options=pipeline_options) as p:
out= (
p
| "Read from Pub/Sub subscription" >> beam.io.ReadFromPubSub(subscription="projects/<PROJECT_ID>/subscriptions/Test-sub")
| "Decode and parse Json" >> beam.Map(lambda element: element.decode("utf-8"))
|"Formatting " >> beam.ParDo(ProcessWords()) #.with_output_types(CommonLog)
| "Create beam Row" >> beam.Map(lambda x: beam.Row(Name=str(x[0]),Stream=str(x[1]),Salary=int(x[2])))
|"window" >> beam.WindowInto(beam.window.FixedWindows(30))
| SqlTransform(
"""
SELECT
Name,
AVG(Salary) AS avg_sal
FROM PCOLLECTION
GROUP BY Name
""")
# SqlTransform yields python objects with attributes corresponding to
# the outputs of the query.
# Collect those attributes, as well as window information, into a dict
| "Assemble Dictionary" >> beam.Map(
lambda row,
window=beam.DoFn.WindowParam: {
"Name": row.Name,
"avg_sal": row.avg_sal,
"window_start": window.start.to_rfc3339(),
"window_end": window.end.to_rfc3339()
})
# | "Write to BigQuery" >> beam.io.WriteToBigQuery(
# output_table,
# schema=SCHEMA,
# create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
# write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
# )
# |beam.MapTuple(lambda N,S,M : {"Name":N,"Stream":S,"Marks":M})
| beam.Map(print)
)
p.run()
run()
I am running this script using below command"
python3 stream_dflow.py --runner=DataflowRunner --project="<PROJECT_ID>" --region="us-east1" --temp_location="gs://datafllow-stg/staging" --save_main_session=True
But, I am getting this error.
NameError: name 'beam' is not defined [while running 'Create beam Row-ptransform-36']
but, beam is already defined.
Does anyone know what's the issue?
I have tried above all things, but still getting error
For the pipeline options,
--save_main_sessionis a boolean and specifying--save_main_sessionis sufficient enough. you don't need to pass true(--save_main_session=True).Let's structure the code to call your run method as
since beam python pickles main and somehow not running your function
run()under main is causing an issue?