Cloudflare R2 returning zeros in stream using workers and pages


I am looking to use R2 as a store for a data structure that is a 2D array of Uint32, but when I read the stored object from my worker, I am receiving unexpected results.

The goal is to create a worker that receives a series of updates as JSON, reads the existing object as a stream, and applies the changes as it writes the result back to the R2 object. I implemented this as a TransformStream:

// read the existing R2 object, pipe it through the transform, and write it back
let transformer = DataShapeStreamer(fields, req.doc);
let doc = await context.env.facts.get(req.key);
let stm = doc.body.pipeThrough(transformer);
let rtn = await context.env.facts.put(req.key, stm);
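
For context, req, fields and context in that snippet come from the surrounding Pages Function; its rough, simplified shape is below (buildFieldList is just a placeholder for the real field setup, and validation/error handling are elided):

// Rough shape of the surrounding Pages Function (simplified; buildFieldList is a
// placeholder for the real field setup, and validation/error handling are elided).
export async function onRequestPost(context) {
  const req = await context.request.json();          // the updates arrive as JSON
  const fields = buildFieldList(req);                // placeholder
  const transformer = DataShapeStreamer(fields, req.doc);

  const doc = await context.env.facts.get(req.key);  // existing R2 object
  const stm = doc.body.pipeThrough(transformer);     // apply the changes while streaming
  await context.env.facts.put(req.key, stm);         // write the transformed stream back

  return new Response('OK');
}
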

DataShapeStreamer generates a TransformStream and declares an object (params in the code below) to track the buffer, the fields, and the inbound data. The stream should contain a header row of 3303 Uint32 elements, starting with 1, ending with 0, and with unique, sorted values in between.
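
To make that concrete, a header row should pass a check roughly like this (illustrative only; isValidHeaderRow is not part of the actual code):

// Illustrative only: the invariant a header row is expected to satisfy.
function isValidHeaderRow(row) { // row: Uint32Array of length 3303
  if (row.length !== 3303) return false;
  if (row[0] !== 1 || row[row.length - 1] !== 0) return false;
  // the values between the first and last must be unique and sorted ascending
  for (let i = 2; i < row.length - 1; i++) {
    if (row[i] <= row[i - 1]) return false;
  }
  return true;
}
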

Usually, when I read from the stream, the byte array contains the expected values until about element 884 (byte position 3534), at which point every byte is 0 for a while, then holds what appear to be random values through to the end of the chunk.
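
To check whether the zeros are in the stored object itself or only show up in the streamed chunks, I can read the whole object in one go and scan it, along the lines of this diagnostic sketch (findFirstZeroRun is not part of the real code):

// Diagnostic sketch only (not part of the worker): read the stored object in one
// go and report the element index where the first long run of zero values starts.
async function findFirstZeroRun(env, key, minRun = 8) {
  const obj = await env.facts.get(key);
  const words = new Uint32Array(await obj.arrayBuffer());
  let runStart = -1;
  let runLen = 0;
  for (let i = 0; i < words.length; i++) {
    if (words[i] === 0) {
      if (runLen === 0) runStart = i;
      runLen++;
      if (runLen >= minRun) return runStart;
    } else {
      runLen = 0;
    }
  }
  return -1; // no run of minRun consecutive zero elements
}
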

Some error detection I put in throws an error on the invalid row.

new TransformStream({
  async transform(inboundBuffer, controller) {
    // make sure it is a Uint8Array
    inboundBuffer = new Uint8Array(inboundBuffer.buffer);
    // if this buffer is already partially processed, remove the front
    params.buffChunk = params.buffChunk.slice(params.buffPos);
    params.buffPos = 0;

    // append the inbound data to the existing buffer
    let newBuff = new Uint8Array(inboundBuffer.length + params.buffChunk.length);
    newBuff.set(params.buffChunk);
    newBuff.set(inboundBuffer, params.buffChunk.length);

    // set that as our new buffer of data
    params.buffChunk = newBuff;

    // process one full row at a time (3303 Uint32 values = 13212 bytes)
    while (params.buffChunk.length - params.buffPos >= 13212) {
      let chunk = new Uint32Array(params.buffChunk.buffer, params.buffPos, 3303);
      params.buffPos += 13212;
      if (chunk.at(-1) !== 0 || chunk.at(0) !== 1) {
        throw new Error('Issue with field list in old Dataset.');
      }
      //...
      //... Do the actual transform, producing result
      //...
      result = new Uint32Array(result);
      result = new Uint8Array(result.buffer);
      controller.enqueue(result);
    }
  }
})

The above is simplified, as the full code with the data integration is long.

Am I doing something wrong with my read? Am I misunderstanding how TransformStream works?

NOTES

  • I'm pretty sure something is wrong with the read. When I pipe the stream directly to a Response object, I can parse the blob into an HTML report that looks as expected (see the sketch after this list).
  • When the transform function is called, it appears to receive more than a single row's worth of data in the first read (so it has not needed to buffer anything yet).
  • The issue does not occur when running under wrangler; it only appears once the worker is deployed to Cloudflare.
  • Full Code
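
The direct-pipe check from the first note is roughly the following (a sketch only; how the key reaches the handler is simplified here):

// Sketch of the check in the first note: return the object body without the
// transform, and the client-side report built from it looks correct.
export async function onRequestGet(context) {
  const key = new URL(context.request.url).searchParams.get('key'); // simplified key lookup
  const obj = await context.env.facts.get(key);
  if (obj === null) return new Response('Not found', { status: 404 });
  return new Response(obj.body); // pipe the stream straight through to the response
}
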
