Is it possible to pass the schema as a side input to beam.io.parquetio.WriteToParquet?


When I attempt to provide the schema as a side input, Beam errors out.

Each element produced by the query step is a dict of the form {"schema": arrow_table.schema, "data": arrow_table.to_pydict()}:

...
    with beam.Pipeline(options=options) as pipeline:
        query_result = (
            pipeline
            | 'Create Query' >> beam.Create(view_queries)
            | 'Fetch Data Into Arrow Table' >> beam.ParDo(FetchDataFromPostgresToArrow())
        )

        schema = (
            query_result
            | 'Get Schema' >> beam.Map(lambda x: x["schema"])
        )

        data = (
            query_result
            | 'Get Data' >> beam.Map(lambda x: x["data"])
        )

        _ = (
            data
            | 'Write To Parquet' >> beam.io.parquetio.WriteToParquet(
                file_path_prefix=known_args.output,
                schema=beam.pvalue.AsSingleton(schema),
                num_shards=1)
        )
...

The pipeline fails with:

...
  File "/Users/maininformer/Library/Caches/pypoetry/virtualenvs/beam-pipelines-uv3pWvyN-py3.10/lib/python3.10/site-packages/apache_beam/io/parquetio.py", line 104, in __init__
    self._buffer = [[] for _ in range(len(schema.names))]
AttributeError: 'AsSingleton' object has no attribute 'names'
