How to use BigQuery Storage API to concurrently read streams in Python threads


I have a large table (external to BigQuery; the data lives in Google Cloud Storage). I want to scan the table through BigQuery and pull the rows down to a client machine. For throughput, I fetch multiple streams concurrently, in multiple threads.

From all I can tell, the concurrency is not working: there is actually some penalty when using multiple threads.


import concurrent.futures
import logging
import queue
import threading
import time

from google.cloud.bigquery_storage import types
from google.cloud import bigquery_storage

PROJECT_ID = 'abc'
CREDENTIALS = {....}


def main():
    table = "projects/{}/datasets/{}/tables/{}".format(PROJECT_ID, 'db', 'tb')

    requested_session = types.ReadSession()
    requested_session.table = table
    requested_session.data_format = types.DataFormat.AVRO
    requested_session.read_options.selected_fields = ["a", "b"]

    client = bigquery_storage.BigQueryReadClient(credentials=CREDENTIALS)
    session = client.create_read_session(
        parent="projects/{}".format(PROJECT_ID),
        read_session=requested_session,
        max_stream_count=0,  # 0 lets the server decide how many streams to create
    )

    if not session.streams:
        return

    n_streams = len(session.streams)
    print("Total streams", n_streams)  # this prints 1000

    q_out = queue.Queue(1024)
    concurrency = 4

    with concurrent.futures.ThreadPoolExecutor(concurrency) as pool:
        # Re-wrap the client's existing gRPC channel in a new transport in
        # each worker, so every thread shares the one channel.
        tasks = [
            pool.submit(download_row,
                        client._transport.__class__,
                        client._transport._grpc_channel,
                        s.name,
                        q_out)
            for s in session.streams
        ]

        t0 = time.perf_counter()
        ntotal = 0
        ndone = 0
        while True:
            page = q_out.get()
            if page is None:  # sentinel: one worker finished its stream
                ndone += 1
                if ndone == len(tasks):
                    break
            else:
                for row in page:
                    ntotal += 1
                    if ntotal % 10000 == 0:
                        qps = int(ntotal / (time.perf_counter() - t0))
                        print(f'QPS so far:  {qps}')

        for t in tasks:
            t.result()


def download_row(transport_cls, channel, stream_name, q_out):
    try:
        # Build a per-thread client on top of the shared gRPC channel.
        transport = transport_cls(channel=channel)
        client = bigquery_storage.BigQueryReadClient(transport=transport)
        reader = client.read_rows(stream_name)
        for page in reader.rows().pages:
            q_out.put(page)
    finally:
        q_out.put(None)  # sentinel: tell the consumer this stream is done


if __name__ == '__main__':
    main()

The Google BigQuery Storage API docs and multiple sources claim that one can fetch multiple "streams" concurrently for higher throughput, yet I couldn't find a working example. I've followed the advice to share a gRPC "channel" across the threads.
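
For reference, here is the simpler variant without the transport re-wrapping: gRPC channels are thread-safe, so each thread should be able to call read_rows on the one shared client directly. A minimal sketch (hypothetical helper name, same queue/sentinel protocol as above):

def download_row_shared_client(client, stream_name, q_out):
    # All threads call read_rows on the same client; the underlying
    # gRPC channel is thread-safe, so no per-thread transport is built.
    try:
        reader = client.read_rows(stream_name)
        for page in reader.rows().pages:
            q_out.put(page)
    finally:
        q_out.put(None)  # sentinel, as in download_row above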

The data items are large. The QPS I got was roughly:

150, concurrency=1
120, concurrency=2
140, concurrency=4

Each "page" contains about 200 rows.

Thoughts:

  1. BigQuery quota? I only saw a request-rate limit; I did not see any limit on the volume of data traffic per second. The quotas do not appear to be limiting in my case.

  2. BigQuery server-side options? This doesn't seem relevant; BigQuery should accept concurrent requests with enough capacity.

  3. gRPC usage? I think this is the main direction for digging, but I don't know what's wrong in my code (one experiment is a separate channel per thread; see the sketch right after this list).
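
Along the lines of item 3, here is the experiment I have in mind (a sketch only, with a hypothetical helper name and the same CREDENTIALS placeholder as above): build a completely separate client, and therefore a separate channel and HTTP/2 connection, in each thread, to rule out all streams being multiplexed over a single connection.

def download_row_own_channel(stream_name, q_out):
    # A fresh client per thread: its own transport, channel, and HTTP/2
    # connection, so the streams are not multiplexed together.
    client = bigquery_storage.BigQueryReadClient(credentials=CREDENTIALS)
    try:
        reader = client.read_rows(stream_name)
        for page in reader.rows().pages:
            q_out.put(page)
    finally:
        q_out.put(None)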

Can anyone shed some light on this? Thanks.

1 Answer

Python threads do not run in parallel because of the GIL.

You are creating threads, not multiprocesses, and because of the GIL, Python is effectively single-core: only one thread executes Python bytecode at a time.

ThreadPoolExecutor has been available since Python 3.2, but it is not widely used, perhaps because of misunderstandings of the capabilities and limitations of threads in Python, which the Global Interpreter Lock ("GIL") enforces.

Look into using the multiprocessing module instead.
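
A minimal sketch of that direction (hypothetical helper names; each worker process must build its own client, because gRPC channels cannot be shared across a fork):

import multiprocessing as mp

from google.cloud import bigquery_storage


def read_streams(stream_names):
    # Runs in a worker process: build a private client (application
    # default credentials here) and drain the assigned streams.
    client = bigquery_storage.BigQueryReadClient()
    n = 0
    for name in stream_names:
        for page in client.read_rows(name).rows().pages:
            for _row in page:
                n += 1  # replace with real row handling
    return n


def parallel_read(stream_names, workers=4):
    # Round-robin the session's streams across worker processes.
    chunks = [stream_names[i::workers] for i in range(workers)]
    with mp.Pool(workers) as pool:
        return sum(pool.map(read_streams, chunks))

Call it with the stream names from the session, e.g. parallel_read([s.name for s in session.streams]).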

UPDATE:

Also, in your code you need one more param: requested_streams.

n_streams = 2
session = client.create_read_session(
    table_ref,
    parent,
    requested_streams=n_streams,
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.BALANCED),
)
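
Note that the snippet above uses the older v1beta1 surface. On the v1 client used in the question, the rough equivalent of requested_streams is the max_stream_count argument (a sketch reusing requested_session from the question; 0 means the server decides):

session = client.create_read_session(
    parent="projects/{}".format(PROJECT_ID),
    read_session=requested_session,
    max_stream_count=2,  # v1 analogue of requested_streams
)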