I am looking to use R2 as a store for a data structure that is a 2D array of Uint32, but when I read the stored object from my worker, I am receiving unexpected results.
The goal is to create a worker that receives a series of updates as JSON, reads the existing stream, and applies the changes as it writes them back to the R2 object. I implemented this as a `TransformStream`:

```js
let transformer = DataShapeStreamer(fields, req.doc);
let doc = await context.env.facts.get(req.key);
let stm = doc.body.pipeThrough(transformer);
let rtn = await context.env.facts.put(req.key, stm);
```
`DataShapeStreamer` generates a `TransformStream` and declares an object to track the buffer, the fields, and the inbound data. The stream should contain a header row of 3303 elements starting with `1`, ending with `0`, and having unique, sorted values in between.
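For reference, a valid header row looks something like this (the field IDs here are illustrative; only the shape matters):

```js
// Illustrative only: a valid header row is 3303 Uint32 values,
// starting with 1, ending with 0, with unique ascending IDs in between.
const header = new Uint32Array(3303);
header[0] = 1;
for (let i = 1; i < 3302; i++) header[i] = header[i - 1] + 2; // placeholder ascending IDs
header[3302] = 0;
// 3303 elements * 4 bytes = 13212 bytes, the per-row size the transform below waits for.
```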
Usually, when I read from the stream, the byte array returns expected values until about element 884 (byte position 3534), at which point it returns `0` in every byte for a while, then what appear to be random values through the end of the chunk.
Some error detection I put in throws an error on the invalid row.
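The state object the transform closes over looks roughly like this (simplified; the real one also tracks the fields and the inbound data):

```js
// Simplified sketch of the state shared with the transform below:
let params = {
  buffChunk: new Uint8Array(0), // bytes carried over between transform() calls
  buffPos: 0                    // current read offset into buffChunk
  // ...plus the fields and inbound data, omitted here
};
```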
```js
new TransformStream({
  async transform(inboundBuffer, controller) {
    // make sure it is a Uint8Array
    inboundBuffer = new Uint8Array(inboundBuffer.buffer);
    // if this buffer is already partially processed, remove the front
    params.buffChunk = params.buffChunk.slice(params.buffPos);
    params.buffPos = 0;
    // append the inbound to the existing buffer
    let newBuff = new Uint8Array(inboundBuffer.length + params.buffChunk.length);
    newBuff.set(params.buffChunk);
    newBuff.set(inboundBuffer, params.buffChunk.length);
    // set that as our new buffer of data
    params.buffChunk = newBuff;
    while (params.buffChunk.length - params.buffPos >= 13212) {
      let chunk = new Uint32Array(params.buffChunk.buffer, params.buffPos, 3303);
      params.buffPos += 13212;
      if (chunk.at(-1) !== 0 || chunk.at(0) !== 1) {
        throw new Error('Issue with field list in old Dataset.');
      }
      //...
      //... Do the actual transform, producing `result`
      //...
      result = new Uint32Array(result);
      result = new Uint8Array(result.buffer);
      controller.enqueue(result);
    }
  }
})
```
The above is simplified, since the full code with the data integration is long. Am I doing something wrong with my read? Am I misunderstanding how `TransformStream` works?
NOTES
- I'm pretty sure something is wrong with the read. When I pipe the transformed stream directly to a Response object (roughly as in the sketch after these notes), I can parse the blob into an HTML report that looks as expected.
- When the transform function is called, it appears to receive more than a single row's worth of data in the first read (so it hasn't needed to buffer anything).
- The issue does not occur when using `wrangler` locally; it only appears once deployed to Cloudflare.
- Full Code
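For the first note, the "pipe directly to a response" check looks roughly like this (simplified; it skips the R2 put entirely):

```js
// Sanity check from the notes: return the transformed stream directly instead
// of writing it back to R2; parsing this response produces the expected report.
const doc = await context.env.facts.get(req.key);
const stm = doc.body.pipeThrough(DataShapeStreamer(fields, req.doc));
return new Response(stm);
```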