splice/shift first n bytes of AsyncIterable<Uint8Array>


Reading the first n bytes of a byte stream (in the form of an AsyncIterable) feels cumbersome and error-prone.

Is there a better way to implement this?

async function shift(
  length: number,
  stream: AsyncIterable<Uint8Array>
): Promise<[Uint8Array, AsyncIterable<Uint8Array>]> {
  const prefix = new Uint8Array(length);

  let offset = 0;

  const iterator = stream[Symbol.asyncIterator]();

  while (true) {
    const { done, value } = await iterator.next();

    if (done) {
      throw new Error("Buffer underflow");
    } else {
      const chunk = value;
      if (chunk.length < length - offset) {
        prefix.set(chunk, offset);
        offset += chunk.length;
      } else {
        const slice = chunk.slice(0, length - offset);
        prefix.set(slice, offset);

        return [prefix, prepend(chunk.slice(slice.length), stream)];
      }
    }
  }
}

async function* prepend(
  prefix: Uint8Array,
  stream: AsyncIterable<Uint8Array>
) {
  yield prefix;
  yield* stream;
}

Mulan On BEST ANSWER

stream primitives

We'll start by defining stream primitives -

flatten<T>(t: AsyncIterable<Iterable<T>>): AsyncIterable<T>
take<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T>
skip<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T>
toArray<T>(t: AsyncIterable<T>): Promise<Array<T>>
async function *flatten<T>(t: AsyncIterable<Iterable<T>>): AsyncIterable<T> {
  for await (const a of t) {
    yield *a
  }
}

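async function *take<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T> {
  if (n <= 0) return
  for await (const v of t) {
    yield v
    // stop as soon as n values are out, without pulling an extra one from the source
    if (--n <= 0) return
  }
  // the source ended before n values were produced
  throw Error("buffer underflow")
}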

async function *skip<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T> {
  for await (const v of t) {
    if (n-- > 0) continue
    yield v
  }
  if (n > 0) throw Error("buffer underflow")
}

async function toArray<T>(t: AsyncIterable<T>): Promise<Array<T>> {
  const r = []
  for await (const v of t) r.push(v)
  return r
}

shift

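Using these stream primitives, we can write shift compactly. Note that it iterates stream twice (once for take, once for skip), so it assumes the iterable can be replayed from the start each time it is iterated -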

shift(stream: AsyncIterable<Uint8Array>, count: number): Promise<[Uint8Array, AsyncIterable<number>]>
async function shift(stream: AsyncIterable<Uint8Array>, count: number) {
  return [
    new Uint8Array(await toArray(take(flatten(stream), count))),
    skip(flatten(stream), count)
  ] as const
}

Let's create a mock buffer and test it -

const buffer: AsyncIterable<Uint8Array> = {
  async *[Symbol.asyncIterator]() {
    for (const v of [[0,1,2],[3,4],[5,6,7,8],[9]]) {
      yield new Uint8Array(v)
      await new Promise(r => setTimeout(r, 100))
    }
  }
}

async function main() {
  const [first, rest] = await shift(buffer, 4)
  console.log({
    first: Array.from(first),
    rest: await toArray(rest)
  })
}

main().catch(console.error)
{
  first: [0, 1, 2, 3],
  rest: [4, 5, 6, 7, 8, 9]
}
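
One property of this composition is easy to miss: shift calls flatten(stream) twice, so the source is iterated once for the prefix and once more for the rest. The sketch below contrasts a source that restarts on every iteration with a generator object that can only be consumed once. The names `replayable`, `oneShot`, `data` and `demo` are illustrative and not part of the answer; the primitives are repeated in condensed form only so the snippet runs on its own.

```typescript
// Primitives and shift as in the answer, condensed.
async function* flatten<T>(t: AsyncIterable<Iterable<T>>): AsyncIterable<T> {
  for await (const a of t) yield* a
}
async function* take<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T> {
  for await (const v of t) {
    if (n-- <= 0) return
    yield v
  }
  if (n > 0) throw Error("buffer underflow")
}
async function* skip<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T> {
  for await (const v of t) {
    if (n-- > 0) continue
    yield v
  }
  if (n > 0) throw Error("buffer underflow")
}
async function toArray<T>(t: AsyncIterable<T>): Promise<T[]> {
  const r: T[] = []
  for await (const v of t) r.push(v)
  return r
}
async function shift(stream: AsyncIterable<Uint8Array>, count: number) {
  return [
    new Uint8Array(await toArray(take(flatten(stream), count))),
    skip(flatten(stream), count),
  ] as const
}

const data = [[0, 1, 2], [3, 4], [5, 6, 7, 8], [9]]

// Hypothetical replayable source: every iteration starts a fresh generator.
const replayable: AsyncIterable<Uint8Array> = {
  async *[Symbol.asyncIterator]() {
    for (const v of data) yield new Uint8Array(v)
  },
}

// Hypothetical one-shot source: a generator object can be consumed only once.
async function* oneShot(): AsyncGenerator<Uint8Array> {
  for (const v of data) yield new Uint8Array(v)
}

async function demo() {
  const [first, rest] = await shift(replayable, 4)
  const replayed = { first: Array.from(first), rest: await toArray(rest) }

  const [, restOnce] = await shift(oneShot(), 4)
  // take() returned early, which closed the generator; skip() finds nothing left.
  const oneShotError = await toArray(restOnce).then(
    () => null,
    (e: Error) => e.message,
  )
  return { replayed, oneShotError }
}
```

With the replayable source the result matches the output above. With the one-shot generator, consuming the rest rejects with "buffer underflow", and even a replayable source is read from the beginning a second time, which may matter if producing it is expensive.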

demo

Run and verify the result on the TypeScript Playground

Bergi On

I think the iterator logic itself can be simplified by using a notClosing helper and normal iteration:

async function shift(
  length: number,
  stream: AsyncIterable<Uint8Array>
): Promise<[Uint8Array, AsyncIterable<Uint8Array>]> {
  const prefix = new Uint8Array(length);
  const iter = stream[Symbol.asyncIterator]();
  let offset = 0;
  for await (const chunk of notClosing(iter)) {
    if (chunk.length < length - offset) {
      prefix.set(chunk, offset);
      offset += chunk.length;
    } else {
      const slice = chunk.slice(0, length - offset);
      prefix.set(slice, offset);
      return [prefix, prepend(chunk.slice(slice.length), iter)];
    }
  }
  throw new Error("Buffer underflow");
}

Unless you want to convert the stream from an iterator of chunks into a much less efficient iterator of individual bytes, there's nothing you can further simplify about the offset logic.
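
To put a rough number on that efficiency remark, the sketch below counts how many values a consumer has to await when the same data is seen per chunk versus per byte. The names `chunks`, `bytes`, `count` and `compare`, and the 4 x 1024-byte source, are assumptions made up for this illustration.

```typescript
// Hypothetical source: 4 chunks of 1024 bytes each.
async function* chunks(): AsyncGenerator<Uint8Array> {
  for (let i = 0; i < 4; i++) yield new Uint8Array(1024)
}

// Byte-wise view of a chunked stream: one yielded value per byte.
async function* bytes(t: AsyncIterable<Uint8Array>): AsyncGenerator<number> {
  for await (const chunk of t) yield* chunk
}

// Counts how many values the consumer has to await.
async function count(t: AsyncIterable<unknown>): Promise<number> {
  let n = 0
  for await (const _ of t) n++
  return n
}

async function compare() {
  return {
    perChunk: await count(chunks()),
    perByte: await count(bytes(chunks())),
  }
}
// compare() resolves to { perChunk: 4, perByte: 4096 }
```

Each awaited value costs at least one promise resolution, so the per-byte view does about a thousand times more scheduling work here for the same payload.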

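// %AsyncIteratorPrototype% supplies [Symbol.asyncIterator]() { return this };
// it is reached here by walking up from an async generator's prototype.
const AsyncIteratorPrototype = Object.getPrototypeOf(Object.getPrototypeOf(async function*(){}.prototype)) as AsyncIterator<any>;

// Yields `val` first, then hands over to the remaining items of `iter`.
function prepend<T>(val: T, iter: AsyncIterator<T>): AsyncIterable<T> & AsyncIterator<T> {
  return Object.assign(Object.create(AsyncIteratorPrototype), {
    first: true,
    async next() { // async, so that every call returns a promise
      if (this.first) {
        const res = {done: false, value: val};
        val = undefined!; // GC
        this.first = false;
        return res;
      }
      return iter.next();
    },
    // forward an early exit to the underlying iterator so it can clean up
    return: iter.return ? () => iter.return!() : undefined,
  });
}

// Exposes only next(): with no return() method, leaving a for await loop early does not close `iter`.
function notClosing<T>(iter: AsyncIterator<T>): AsyncIterable<T> & AsyncIterator<T> {
  return Object.assign(Object.create(AsyncIteratorPrototype), {
    next: iter.next.bind(iter),
  });
}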
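
The reason a wrapper like notClosing is needed at all: when a for await loop is left early (via return, break or a thrown error), it calls return() on the iterator, which finishes a generator for good. The following sketch shows the difference. It uses a simplified stand-in for notClosing built from a plain object literal rather than the prototype-based version above, and `numbers`, `firstOf` and `demo` are illustrative names.

```typescript
// Simplified stand-in for the helper above: forwards next(), exposes no return().
function notClosing<T>(iter: AsyncIterator<T>): AsyncIterableIterator<T> {
  return {
    next: () => iter.next(),
    [Symbol.asyncIterator]() {
      return this
    },
  }
}

async function* numbers(): AsyncGenerator<number> {
  yield 1
  yield 2
  yield 3
}

// Leaves the loop after the first value, like shift does once the prefix is full.
async function firstOf(it: AsyncIterable<number>): Promise<number | undefined> {
  for await (const v of it) return v
  return undefined
}

async function demo() {
  const a = numbers()
  await firstOf(a) // the early exit calls a.return(), closing the generator
  const afterPlain = await a.next() // { value: undefined, done: true }

  const b = numbers()
  await firstOf(notClosing(b)) // the wrapper has no return(), so b stays open
  const afterWrapped = await b.next() // { value: 2, done: false }

  return { afterPlain, afterWrapped }
}
```

This is why shift can hand the same iterator to prepend after returning from inside the loop: the remaining chunks are still available.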