Apologies in advance for the wall of text.
I am working on a PoC to see whether we can expose data from our data warehouse to a browser client in a reasonably efficient way.
Data warehouse: BigQuery
API: BigQuery Storage Read API
One of the approaches I'm trying to get working uses BigQuery's Storage Read API, which lets us open a stream in the Arrow IPC RecordBatch format.
So far I have created a simple Python API that invokes the BigQuery Storage Read API and streams the response back to the client (a browser client running DuckDB-Wasm).
Note: from some smaller tests I can see that the stream from the BigQuery Storage API only contains the schema as part of the first message, which I understand aligns with the Arrow streaming format.
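To illustrate what I mean, here is a minimal, self-contained pyarrow sketch (made-up columns) of that layout: the schema travels once, up front, and every message after it is a bare record batch.

import pyarrow as pa

# Minimal sketch of the Arrow IPC streaming format (made-up columns):
# the writer emits one schema message first, then only record batches.
batch = pa.RecordBatch.from_pydict({"store_identifier": ["1003446"], "qty": [1]})
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)

# A reader resolves the schema from the first message before any batches.
reader = pa.ipc.open_stream(sink.getvalue())
print(reader.schema)
print(reader.read_all().num_rows)  # 1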
DuckDB-Wasm is, according to the docs, capable of loading data from an Arrow IPC stream, so that's what I'm looking to use.
However, I'm currently getting this error in the client when making the request to the Python (streaming) API:
async_bindings.ts:150 Uncaught Error: Expected IPC message of type schema but got record batch
I don't quite understand what I'm doing wrong here; any help would be appreciated.
Python API
bq_storage_reader.py
from google.cloud.bigquery_storage_v1 import BigQueryReadClient, types
from google.cloud.bigquery_storage_v1.reader import ReadRowsStream
from google.cloud.bigquery_storage_v1.types import ReadSession

# Path templates expected by the Storage Read API.
BIGQUERY_TABLE_TEMPLATE = 'projects/{}/datasets/{}/tables/{}'
PARENT_PROJECT = 'projects/{}'


def bq_storage_reader(
        client: BigQueryReadClient,
        source_model: BQStorageQuery,  # our own request model (import omitted)
        streams: int = 1
) -> tuple[ReadSession, list[ReadRowsStream]]:
    table = source_model.source.table
    dataset = source_model.source.dataset
    project = source_model.source.project

    read_session = types.ReadSession()
    read_session.table = BIGQUERY_TABLE_TEMPLATE.format(project, dataset, table)
    read_session.data_format = types.DataFormat.ARROW
    read_session.read_options.selected_fields = source_model.fields
    read_session.read_options.row_restriction = ' AND '.join(
        source_model.record_filters)

    session = client.create_read_session(
        parent=PARENT_PROJECT.format(project),
        read_session=read_session,
        max_stream_count=streams,
    )
    # message_reader = pa.ipc.open_stream(session.arrow_schema.serialized_schema)
    # schema = message_reader.schema
    readers = [client.read_rows(stream.name) for stream in session.streams]
    return session, readers
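For completeness, the commented-out experiment above parses the schema out of the session; a sketch of the same idea using read_schema (with the session returned above):

import pyarrow as pa

# Sketch: session.arrow_schema.serialized_schema is an IPC schema message,
# so pyarrow can parse it directly into a Schema object.
schema = pa.ipc.read_schema(pa.py_buffer(session.arrow_schema.serialized_schema))
print(schema)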
get_record_batches_async.py
async def get_record_batches_async(session: ReadSession, stream: ReadRowsStream):
    # only the first message from the Iterable[stream] has the schema
    for record_batch in stream:
        yield record_batch.arrow_record_batch.serialized_record_batch
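To double-check what each message carries, this is how I'd expect the responses to decode server-side with pyarrow (a sketch; session and stream as returned by bq_storage_reader above):

import pyarrow as pa

# Sketch: each ReadRowsResponse carries a bare serialized record batch,
# which can only be decoded together with the session-level schema.
schema = pa.ipc.read_schema(pa.py_buffer(session.arrow_schema.serialized_schema))
for response in stream:
    batch = pa.ipc.read_record_batch(
        pa.py_buffer(response.arrow_record_batch.serialized_record_batch),
        schema,
    )
    print(batch.num_rows)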
main.py
import logging

from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from google.cloud import bigquery_storage_v1

# BQStorageQuery and BaseSource are our own request models (imports omitted)
logger = logging.getLogger(__name__)
client = bigquery_storage_v1.BigQueryReadClient()
router = APIRouter()


@router.get('/stores/{store_id}/stream', tags=["pos"])
async def get_stream(
        store_id: str,
        from_date: str = None,
        to_date: str = None):
    logger.info({'store': store_id, 'from': from_date, 'to': to_date})
    bsq = BQStorageQuery(
        fields=None,
        record_filters=[f"store_identifier in ('{store_id}')",
                        f"d_date_sale between '{from_date}' AND '{to_date}'"],
        source=BaseSource(
            table='<tablename>',
            dataset='<dataset>',
            project='<gcp-project>',
            partition_cols=['<partition_col>']
        )
    )
    session, streams = bq_storage_reader(client, bsq)
    if streams is None or len(streams) == 0:
        raise HTTPException(status_code=404, detail="data not found")
    return StreamingResponse(
        get_record_batches_async(session=session, stream=streams[0])
    )
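As a sanity check outside the browser, I can fetch the endpoint from Python and try to open the raw bytes as an IPC stream (a sketch, assuming the API runs locally on port 8081 as in the client below):

import pyarrow as pa
import requests

# Sketch: fetch the streamed bytes and let pyarrow try to parse them.
# If this raises, the payload is not a valid Arrow IPC stream either.
resp = requests.get(
    "http://localhost:8081/stores/1003446/stream",
    params={"from_date": "2022-04-08", "to_date": "2022-07-08"},
)
reader = pa.ipc.open_stream(pa.py_buffer(resp.content))
print(reader.schema)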
Client (DuckDB-Wasm)
const bundle = await duckdb.selectBundle(MANUAL_BUNDLES);

// Instantiate the asynchronous version of DuckDB-Wasm
const worker = new Worker(bundle.mainWorker);
const logger = new duckdb.ConsoleLogger();
const db = new duckdb.AsyncDuckDB(logger, worker);
await db.instantiate(bundle.mainModule, bundle.pthreadWorker);
const conn = await db.connect(); // Connect to db

const queryParams = {
  from_date: "2022-04-08",
  to_date: "2022-07-08",
};
const url = `http://localhost:8081/stores/1003446/stream?${new URLSearchParams(
  queryParams
)}`;

const streamResponse = await fetch(url);
const streamReader = streamResponse.body.getReader();
const streamInserts = [];
let createTable = true;

while (true) {
  const { value, done } = await streamReader.read();
  if (done) break;
  // Insert each received chunk as (part of) an Arrow IPC stream;
  // only the first insert creates the table.
  streamInserts.push(
    conn.insertArrowFromIPCStream(value, {
      name: "streamed",
      create: createTable,
    })
  );
  if (createTable) {
    createTable = false;
  }
}
console.log(streamInserts.length);
await Promise.all(streamInserts);

// Basic query
console.log("Basic query");
const q = await conn.query(`select count(*)::integer as cnt from streamed`); // Returns v = 101
console.log("Query result (Arrow Table):", q);