When I try to pass the schema to `WriteToParquet` as a side input, Beam raises an error.
Each element of `query_result` is a dict of the form {"schema": arrow_table.schema, "data": arrow_table.to_pydict()}:
...
# Build the pipeline: run each query, split every result into its schema and
# its data, then write the data to Parquet.
with beam.Pipeline(options=options) as pipeline:
# Create a PCollection from `view_queries` and apply the
# FetchDataFromPostgresToArrow DoFn to each element. As described above, each
# resulting element is a dict with "schema" and "data" keys.
query_result = ( pipeline
| 'Create Query' >> beam.Create(view_queries)
| 'Fetch Data Into Arrow Table' >> beam.ParDo(FetchDataFromPostgresToArrow())
)
# Keep only the "schema" entry of each result.
schema = (query_result
| 'Get Schema' >> beam.Map(lambda x: x["schema"])
)
# Keep only the "data" entry of each result.
data = (query_result
| 'Get Data' >> beam.Map(lambda x: x["data"])
)
# Write the data to Parquet, passing the schema wrapped as an AsSingleton
# side input.
# NOTE(review): the traceback below shows parquetio reading `schema.names`
# inside an `__init__`, and the AsSingleton wrapper has no `names` attribute.
# Presumably `schema` must be a concrete pyarrow schema known when the
# pipeline is constructed -- confirm against the Beam documentation.
_ = (data
| 'Write To Parquet' >> beam.io.parquetio.WriteToParquet(
file_path_prefix=known_args.output,
schema=beam.pvalue.AsSingleton(schema),
num_shards=1)
)
...
Running this fails with the following traceback:
...
File "/Users/maininformer/Library/Caches/pypoetry/virtualenvs/beam-pipelines-uv3pWvyN-py3.10/lib/python3.10/site-packages/apache_beam/io/parquetio.py", line 104, in __init__
self._buffer = [[] for _ in range(len(schema.names))]
AttributeError: 'AsSingleton' object has no attribute 'names'