BigQuery Storage API: what happened to AppendRowsStream


Seeing no Errors but no Data Submitted

I am trying to use the new Python BigQuery Storage API (google.cloud.bigquery_storage instead of google.cloud.bigquery_storage_v1 or google.cloud.bigquery_storage_v1beta2), as can be seen here.

But there is no end-to-end documentation or example showing how to write to BigQuery using the new API (which seems to be version 2, though it is not explicitly referred to as version 2 anywhere other than in that migration document).

If I look at version 1 examples like this one or this one, they use the AppendRowsStream helper. That class does not exist anywhere (that I can find) in version 2.

I am trying to use version 2, as it seems the right one to use now that it is fully released, but without much documentation, and with breaking changes that I cannot understand, I'm unable to get a publish to work. I am running code that raises no errors, both an async and a sync version, but neither publishes anything. They just run -- without errors -- and complete with no data pushed to the table.

Code

The code is quite long, and I can post it, but for now I will just describe the high-level flow (I am writing a batch using a PENDING-type stream):

  1. Create a bigquery client: client = google.cloud.bigquery_storage.BigQueryWriteClient()
  2. Create a write stream request: request = google.cloud.bigquery_storage.CreateWriteStreamRequest(parent="<string_with_full_table_path>", write_stream=google.cloud.bigquery_storage.WriteStream(type_=google.cloud.bigquery_storage.WriteStream.Type.PENDING))
  3. Create a write stream: stream = client.create_write_stream(request=request)
  4. Create an iterator of append rows requests. This is just a generator function (def / for / yield) wrapping the append rows request shown in the next step.
  5. Create an append rows request: request = google.cloud.bigquery_storage.AppendRowsRequest(write_stream=stream.name, proto_rows=proto_data)
    • I've skipped describing proto_data, but I could inspect it and it was built correctly. It is a google.cloud.bigquery_storage.AppendRowsRequest.ProtoData object with the proper writer_schema and rows attached to it (a sketch of how such an object is typically assembled follows this list).
  6. Iterate through the iterator. From the examples above, it seems all that is necessary is to iterate through the requests.
  7. Finalize the stream: client.finalize_write_stream(name=stream.name)
  8. Create a batch write stream request: batch_commit_request = google.cloud.bigquery_storage.BatchCommitWriteStreamsRequest( parent=table_path, write_streams=[stream.name] )
  9. Batch commit the stream: commit_response = client.batch_commit_write_streams(request=batch_commit_request)
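
For completeness, proto_data in step 5 is assembled roughly like this (a simplified sketch; row_pb2.Row is a stand-in for my real compiled protobuf message, not the exact code I am running):

from google.cloud import bigquery_storage
from google.protobuf import descriptor_pb2

import row_pb2  # stand-in for the compiled protobuf module matching the table schema

# Describe the row message so BigQuery knows how to decode the serialized rows.
proto_schema = bigquery_storage.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
row_pb2.Row.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor

# Serialize each row message and collect it into a ProtoRows batch.
proto_rows = bigquery_storage.ProtoRows()
proto_rows.serialized_rows.append(row_pb2.Row(name="example").SerializeToString())

# This is the object attached to the AppendRowsRequest in step 5.
proto_data = bigquery_storage.AppendRowsRequest.ProtoData(
    writer_schema=proto_schema,
    rows=proto_rows,
)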

Analysis

I have sprinkled in logs, and I can see in the logs that the data is being generated and is within the Protobufs and the Rows. I am also getting proper responses from the client object. But I am not getting any data pushed.

The only thing I can see that I am doing differently from the examples I pointed to earlier is that I am not using the AppendRowsStream object. For instance, one of the examples I linked to has these sections:

from google.cloud.bigquery_storage_v1 import writer
...
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
...
response_future_1 = append_rows_stream.send(request)
...
append_rows_stream.close()

This is the only reason I can see why my data might not be arriving in the table: since the stream is never closed, I am "committing" something that was never finished. Maybe the commit is a two-step process, so that when I commit an unclosed stream I am committing something that is effectively empty, because the API is trying to ensure ACID compliance.

But I cannot find a way to create -- or therefore submit data to or close -- such an AppendRowsStream. This is the only wrinkle I can see that might explain why my version is not working.

Summary

Two questions:

  1. Is there an AppendRowsStream in the new google.cloud.bigquery_storage API? If so, where is it hidden?
  2. Is there anything else you can see that I might be doing wrong?

Answer

There is no AppendRowsStream in the new google.cloud.bigquery_storage API. Instead, you call the append_rows method directly. The following code shows how to use append_rows to write data to a BigQuery table with a PENDING stream:

from google.cloud import bigquery_storage

client = bigquery_storage.BigQueryWriteClient()

# Create a PENDING write stream on the target table.
request = bigquery_storage.CreateWriteStreamRequest(
    parent="<string_with_full_table_path>",
    write_stream=bigquery_storage.WriteStream(
        type_=bigquery_storage.WriteStream.Type.PENDING
    ),
)
stream = client.create_write_stream(request=request)

# Build an iterator of AppendRowsRequest messages. proto_data is the
# AppendRowsRequest.ProtoData object (writer_schema plus serialized rows)
# you already construct in your code.
def append_rows_iter():
    yield bigquery_storage.AppendRowsRequest(
        write_stream=stream.name, proto_rows=proto_data
    )

# append_rows is a bidirectional streaming call: pass the request iterator in
# and consume the response iterator so the requests are actually sent.
for response in client.append_rows(requests=append_rows_iter()):
    print(response)

# Finalize the stream so no further appends are accepted.
client.finalize_write_stream(name=stream.name)

# Commit the finalized stream so the rows become visible in the table.
batch_commit_request = bigquery_storage.BatchCommitWriteStreamsRequest(
    parent="<string_with_full_table_path>", write_streams=[stream.name]
)
commit_response = client.batch_commit_write_streams(request=batch_commit_request)

The main difference between this code and the code you posted is that the request iterator is actually passed to the append_rows method, which is a bidirectional streaming call, and its response iterator is consumed; iterating over your own generator locally never sends anything to the server. append_rows is how you write data through the top-level google.cloud.bigquery_storage package. The AppendRowsStream helper you saw in the examples lives in the google.cloud.bigquery_storage_v1.writer module and is not exposed from the top-level package.
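
As a sanity check (a small addition beyond the code above), you can inspect the commit response, which carries a commit_time and any per-stream errors:

# Verify the batch commit actually succeeded before assuming the rows are visible.
if commit_response.stream_errors:
    for error in commit_response.stream_errors:
        print(f"Stream commit failed: {error}")
else:
    print(f"Streams committed at {commit_response.commit_time}")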

I hope this helps!