I have a large table (external to BigQuery, since the data is in Google Cloud Storage). I want to read the whole table to a client machine using the BigQuery Storage Read API. For throughput, I read multiple streams concurrently in multiple threads.
From all I can tell, concurrency is not working; there is actually a small penalty when using multiple threads.
import concurrent.futures
import logging
import queue
import threading
import time

from google.cloud import bigquery_storage
from google.cloud.bigquery_storage import types

PROJECT_ID = 'abc'
CREDENTIALS = {....}  # elided; BigQueryReadClient expects a google.auth credentials object here


def main():
    table = "projects/{}/datasets/{}/tables/{}".format(PROJECT_ID, 'db', 'tb')

    requested_session = types.ReadSession()
    requested_session.table = table
    requested_session.data_format = types.DataFormat.AVRO
    requested_session.read_options.selected_fields = ["a", "b"]

    client = bigquery_storage.BigQueryReadClient(credentials=CREDENTIALS)
    session = client.create_read_session(
        parent="projects/{}".format(PROJECT_ID),
        read_session=requested_session,
        max_stream_count=0,  # 0 lets the server choose the number of streams
    )
    if not session.streams:
        return

    n_streams = len(session.streams)
    print("Total streams", n_streams)  # this prints 1000

    q_out = queue.Queue(1024)
    concurrency = 4
    with concurrent.futures.ThreadPoolExecutor(concurrency) as pool:
        # One task per stream; every task reuses the client's gRPC channel.
        tasks = [
            pool.submit(download_row,
                        client._transport.__class__,
                        client._transport._grpc_channel,
                        s.name,
                        q_out)
            for s in session.streams
        ]

        # Consume inside the `with` block: leaving it waits for all tasks,
        # which would block forever once the bounded queue fills up.
        t0 = time.perf_counter()
        ntotal = 0
        ndone = 0
        while True:
            page = q_out.get()
            if page is None:
                # Each task puts exactly one None when it finishes.
                ndone += 1
                if ndone == len(tasks):
                    break
            else:
                for row in page:
                    ntotal += 1
                    if ntotal % 10000 == 0:
                        qps = int(ntotal / (time.perf_counter() - t0))
                        print(f'QPS so far: {qps}')  # rows per second

        # Re-raise any exception that happened inside a worker.
        for t in tasks:
            t.result()


def download_row(transport_cls, channel, stream_name, q_out):
    try:
        # Build a per-thread client on top of the shared gRPC channel.
        transport = transport_cls(channel=channel)
        client = bigquery_storage.BigQueryReadClient(
            transport=transport,
        )
        reader = client.read_rows(stream_name)
        for page in reader.rows().pages:
            q_out.put(page)
    finally:
        # Always signal completion, even if reading the stream failed.
        q_out.put(None)


if __name__ == '__main__':
    main()
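The fan-in pattern in `main()` (a bounded queue plus one `None` sentinel per task) can be checked in isolation, without BigQuery. This is a minimal stdlib-only sketch under stated assumptions: `fake_stream` and `consume` are hypothetical stand-ins for `download_row` and the consumer loop, and `time.sleep` stands in for waiting on the network.

```python
import concurrent.futures
import queue
import time


def fake_stream(stream_id, n_pages, q_out):
    """Hypothetical stand-in for download_row(): emits n_pages pages, then None."""
    try:
        for p in range(n_pages):
            time.sleep(0.01)  # simulated wait on the network
            q_out.put([(stream_id, p, i) for i in range(3)])  # a "page" of 3 rows
    finally:
        q_out.put(None)  # exactly one sentinel per task, even on error


def consume(n_streams, n_pages, concurrency=4):
    """Fan-in loop in the style of main(); returns the number of rows seen."""
    q_out = queue.Queue(maxsize=8)  # bounded, like queue.Queue(1024) in main()
    total = 0
    with concurrent.futures.ThreadPoolExecutor(concurrency) as pool:
        tasks = [pool.submit(fake_stream, s, n_pages, q_out) for s in range(n_streams)]
        # Drain inside the `with` block: exiting it calls shutdown(wait=True),
        # which would hang once workers block on a full queue.
        done = 0
        while done < len(tasks):
            page = q_out.get()
            if page is None:
                done += 1
            else:
                total += len(page)
        for t in tasks:
            t.result()  # re-raise any exception from a worker
    return total


if __name__ == "__main__":
    print(consume(n_streams=10, n_pages=5))
```

Running it prints 150 (10 streams × 5 pages × 3 rows). This only checks that the sentinel counting terminates correctly; it says nothing about where the time goes in the real client.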
The Google BigQuery Storage API docs and multiple other sources say you can read multiple "streams" concurrently for higher throughput, yet I couldn't find a working example. I've followed the advice to share a gRPC channel across the threads.
The data items are large. The throughput I got (the "QPS" printed by the script, i.e. rows per second) is roughly:
150 with concurrency=1
120 with concurrency=2
140 with concurrency=4
Each "page" contains about 200 rows.
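Putting the numbers above side by side: a small calculation of pages per second, speedup over one thread, and parallel efficiency (speedup divided by thread count), using only the rounded rates and the approximate 200 rows per page reported here. `scaling_table` is an illustrative helper, not part of any library.

```python
# Rows per second reported above, keyed by thread count.
RATES = {1: 150, 2: 120, 4: 140}
ROWS_PER_PAGE = 200  # "about 200 rows" per page


def scaling_table(rates, rows_per_page):
    """Return {threads: (pages_per_second, speedup, efficiency)}."""
    base = rates[1]
    table = {}
    for n, rate in sorted(rates.items()):
        speedup = rate / base
        table[n] = (rate / rows_per_page, speedup, speedup / n)
    return table


for n, (pps, speedup, eff) in scaling_table(RATES, ROWS_PER_PAGE).items():
    print(f"{n} thread(s): {pps:.2f} pages/s, speedup {speedup:.2f}, efficiency {eff:.0%}")
```

Linear scaling would give speedups of 2 and 4; the measured ones are 0.80 and 0.93, and the client as a whole handles fewer than one 200-row page per second at every thread count.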
Thoughts:
BigQuery quota? I only saw request rate limits and did not see a limit on data volume per second. The quotas do not appear to be limiting in my case.
BigQuery server-side options? These don't seem relevant; BigQuery should handle concurrent requests with enough capacity.
gRPC usage? I think this is the main direction to dig into, but I don't know what's wrong in my code.
Can anyone shed some light on this? Thanks.
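Python threads do not run Python code in parallel because of the GIL.
You are creating threads, not processes. In CPython only one thread can execute Python bytecode at a time, so CPU-bound Python work (such as decoding rows) effectively runs on a single core no matter how many threads you start.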
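A minimal sketch of what the GIL does and does not serialize, assuming a standard (not free-threaded) CPython build: `time.sleep`, like a blocking socket read, releases the GIL, so sleeping tasks overlap across threads, while a pure-Python loop holds it. `io_like`, `cpu_bound` and `timed_map` are illustrative names only, and absolute timings will vary by machine.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def io_like(_):
    # time.sleep releases the GIL, much like waiting on a network socket.
    time.sleep(0.2)
    return 1


def cpu_bound(n):
    # A pure-Python loop holds the GIL for the whole computation.
    total = 0
    for i in range(n):
        total += i * i
    return total


def timed_map(fn, args, workers):
    """Run fn over args on a thread pool; return (results, elapsed seconds)."""
    t0 = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(fn, args))
    return results, time.perf_counter() - t0


if __name__ == "__main__":
    for fn, args in [(io_like, range(4)), (cpu_bound, [1_000_000] * 4)]:
        _, t1 = timed_map(fn, args, workers=1)
        _, t4 = timed_map(fn, args, workers=4)
        print(f"{fn.__name__}: 1 thread {t1:.2f}s, 4 threads {t4:.2f}s")
```

Expect the sleeping tasks to finish roughly four times faster with four threads and the CPU-bound loop to show little or no speedup. If most of the time in the question's script goes into Python-level row decoding rather than network waits, that would match the flat numbers reported.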
Look into using the multiprocessing module; a good read is here.
UPDATE:
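Also, your code needs one more parameter:
requested_streams
(requested_streams is the name in the older v1beta1 API. In the v1 API used in the question, the equivalent is max_stream_count, which the code already passes as 0 so that the server chooses the number of streams.)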
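Following the multiprocessing suggestion above, here is a minimal sketch of splitting streams across processes with `concurrent.futures.ProcessPoolExecutor`. Assumptions: `read_streams` is a hypothetical worker and `fake_decode` is a CPU-bound stand-in for reading and decoding one stream, so the sketch runs without BigQuery. In real code each worker process would build its own `BigQueryReadClient` (rather than receiving a gRPC channel from the parent) and would return aggregated results instead of every row, since pickling rows back to the parent has its own cost.

```python
from concurrent.futures import ProcessPoolExecutor


def partition(items, n):
    """Round-robin split of items into at most n non-empty lists."""
    return [chunk for chunk in (items[i::n] for i in range(n)) if chunk]


def fake_decode(stream_name, n_rows=200):
    """Hypothetical CPU-bound stand-in for reading and decoding one stream."""
    rows = 0
    for _ in range(n_rows):
        sum(i * i for i in range(500))  # simulated per-row decode cost
        rows += 1
    return rows


def read_streams(stream_names):
    """Hypothetical worker: in real code, create a BigQueryReadClient here
    (inside the child process) and read every stream in stream_names."""
    return sum(fake_decode(name) for name in stream_names)


def run(stream_names, n_procs=4):
    chunks = partition(stream_names, n_procs)
    with ProcessPoolExecutor(max_workers=n_procs) as pool:
        # Each process sends back only a row count, not the rows themselves.
        return sum(pool.map(read_streams, chunks))


if __name__ == "__main__":
    names = [f"stream-{i}" for i in range(20)]  # placeholder stream names
    print("rows read:", run(names))
```

With the 20 placeholder streams it prints `rows read: 4000` (20 streams × 200 fake rows). The `if __name__ == "__main__":` guard matters here: with the spawn start method (the default on Windows and macOS), child processes re-import the main module.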