Problems streaming Arrow to DuckDB-Wasm


Apologies in advance for the wall of text.

I am working on a PoC to see whether we can expose data from our data warehouse to a browser client in a relatively efficient way.

Data warehouse: BigQuery

API: BigQuery Storage Read API

One of the approaches I'm trying to get working uses BigQuery's Storage Read API, which allows us to open a stream (in Arrow IPC RecordBatch format).

Currently I have created a simple Python API that invokes the BigQuery Storage Read API and streams the response back to the client (i.e. a browser client running DuckDB-Wasm).

Note: I've run some smaller tests and I can see that the stream from the BigQuery Storage API only contains the schema as part of the first message, which I understand aligns with the Arrow streaming format.

DuckDB-Wasm is, according to the docs, capable of loading data from an Arrow IPC stream, so that's what I am looking to use.
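For reference, my understanding of the Arrow IPC streaming format is that a stream begins with a single schema message, followed by the record batch messages. A minimal pyarrow sketch of that layout (the table and columns here are made up purely for illustration):

import pyarrow as pa

# Illustrative only: a tiny in-memory table standing in for the BigQuery data.
table = pa.table({"store_identifier": ["1003446"], "d_date_sale": ["2022-04-08"]})

sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:  # writes the schema message first
    for batch in table.to_batches():
        writer.write_batch(batch)                      # then one message per record batch

# A reader of the stream expects that leading schema message.
reader = pa.ipc.open_stream(sink.getvalue())
print(reader.schema)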

However, I am currently getting this error in the client when making the request to the Python (streaming) API:

async_bindings.ts:150 Uncaught Error: Expected IPC message of type schema but got record batch

I don't quite understand what I am doing wrong here; any help would be appreciated.

Python API

bq_storage_reader.py

def bq_storage_reader(
    client: BigQueryReadClient,
    source_model: BQStorageQuery,
    streams: int = 1
) -> tuple[ReadSession, list[ReadRowsStream]]:

    table = source_model.source.table
    dataset = source_model.source.dataset
    project = source_model.source.project

    read_session = types.ReadSession()
    read_session.table = BIGQUERY_TABLE_TEMPLATE.format(
        project, dataset, table)
    read_session.data_format = types.DataFormat.ARROW

    read_session.read_options.selected_fields = source_model.fields
    read_session.read_options.row_restriction = ' AND '.join(
        source_model.record_filters)

    session = client.create_read_session(
        parent=PARENT_PROJECT.format(project),
        read_session=read_session,
        max_stream_count=streams,
    )

    # message_reader = pa.ipc.open_stream(session.arrow_schema.serialized_schema)
    # schema = message_reader.schema

    readers = [client.read_rows(stream.name) for stream in session.streams]

    return session, readers
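As an aside, the commented-out lines above are how I checked that the session does carry a serialized Arrow schema; an equivalent check with pyarrow's schema reader (assuming pyarrow is available as pa) would be something like:

# Quick local check, not part of the API: parse the serialized schema
# that BigQuery attaches to the read session.
schema = pa.ipc.read_schema(pa.py_buffer(session.arrow_schema.serialized_schema))
print(schema)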

get_record_batches_async.py

async def get_record_batches_async(session: ReadSession, stream: ReadRowsStream):
    # Only the first message from the stream carries the schema; each response's
    # arrow_record_batch holds just the serialized record batch bytes.
    for response in stream:
        yield response.arrow_record_batch.serialized_record_batch
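For completeness, this is how I understand what each message in the stream contains; a rough inspection sketch (treat the field names as an assumption on my part, based on my reading of the v1 ReadRowsResponse message):

# Rough inspection sketch; assumes `stream` is a ReadRowsStream as above and that
# ReadRowsResponse exposes arrow_schema / arrow_record_batch.
for i, response in enumerate(stream):
    print(
        i,
        len(response.arrow_schema.serialized_schema),              # non-zero only when a schema is attached
        len(response.arrow_record_batch.serialized_record_batch),  # raw record batch bytes
    )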
 

main.py

client = bigquery_storage_v1.BigQueryReadClient()
    
router = APIRouter()

@router.get('/stores/{store_id}/stream', tags=["pos"])
async def get_stream(
        store_id: str,
        from_date: str = None,
        to_date: str = None):

    logger.info({'store': store_id, 'from': from_date, 'to': to_date})

    bsq = BQStorageQuery(
        fields=None,
        record_filters=[f"store_identifier in ('{store_id}')",
                        f"d_date_sale between '{from_date}' AND '{to_date}'"],
        source=BaseSource(
            table='<tablename>',
            dataset='<dataset>',
            project='<gcp-project>',
            partition_cols=['<partition_col>']
        )
    )

    session, streams = bq_storage_reader(client, bsq)

    if streams is None or len(streams) == 0:
        raise HTTPException(status_code=404, detail="data not found")

    return StreamingResponse(
        get_record_batches_async(session=session, stream=streams[0])
    )
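To rule out the HTTP layer, a throwaway check I can run from Python is to fetch the whole response and try to parse it as an Arrow IPC stream (the URL and dates are the same ones the browser client uses below):

import pyarrow as pa
import requests

# Throwaway check: fetch the full response body and parse it as an Arrow IPC stream.
url = "http://localhost:8081/stores/1003446/stream?from_date=2022-04-08&to_date=2022-07-08"
payload = requests.get(url).content

reader = pa.ipc.open_stream(payload)  # complains if the stream does not start with a schema message
for batch in reader:
    print(batch.num_rows)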

Client (DuckDB-Wasm)

const bundle = await duckdb.selectBundle(MANUAL_BUNDLES);

// Instantiate the asynchronous version of DuckDB-Wasm
const worker = new Worker(bundle.mainWorker);
const logger = new duckdb.ConsoleLogger();
const db = new duckdb.AsyncDuckDB(logger, worker);
await db.instantiate(bundle.mainModule, bundle.pthreadWorker);

const conn = await db.connect(); // Connect to db

const queryParams = {
  from_date: "2022-04-08",
  to_date: "2022-07-08",
};

const url =
  "http://localhost:8081/stores/1003446/stream?from_date=2022-04-08&to_date=2022-07-08";

const streamResponse = await fetch(url);
const streamReader = streamResponse.body.getReader();
const streamInserts = [];
let createTable = true;
while (true) {
  const { value, done } = await streamReader.read();
  if (done) break;
  streamInserts.push(
    conn.insertArrowFromIPCStream(value, {
      name: "streamed",
      create: createTable,
    })
  );
  if (createTable) {
    createTable = false;
  }
}
console.log(streamInserts.length);
await Promise.all(streamInserts);

// Basic query
console.log("Basic query");
let q = await conn.query(`select count(*)::integer as cnt from streamed`); // Returns v = 101
console.log("Query result (Arrow Table):", q);