Custom MessageChannel bizarre behavior in Node.js 19


As the title says, I'm seeing behavior I find pretty strange in my own re-implementation of the MessageChannel provided by Node.js.

The goal of my implementation is to provide better performance for sending UTF-16 strings to another thread.

QUICK DISCLAIMER: This is NOT meant for production use (at least for now). I am just experimenting with an algorithm/proof-of-concept and want to implement it myself. Therefore, I won't accept answers along the lines of "why are you re-inventing the wheel" or "there is already this npm package that perfectly fits your use-case".

The algorithm

My idea is to have two handles: one for writing, which is managed and used by the main thread, and the other one for reading, which I hand to a Worker.

They communicate over a SharedArrayBuffer using two typed arrays: a Uint16Array, which spans the entire buffer except its last 8 bytes, and an Int32Array, which spans those last 8 bytes and stores the writer and reader indexes, respectively, to provide a synchronization mechanism.
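To make the layout concrete, here is a minimal sketch of how the two views can be carved out of one SharedArrayBuffer, using the same arithmetic as the constructors shown further down: the Uint16Array covers (byteLength - 8) / 2 cells and the Int32Array covers the final 8 bytes. `createViews`, `StreamViews` and `STATE_BYTES` are hypothetical names used only for illustration, and the alignment check is an addition of this sketch, not part of the question's code.

```typescript
/** Bytes reserved at the end of the buffer for the two Int32 indexes (writer, reader). */
export const STATE_BYTES = 2 * Int32Array.BYTES_PER_ELEMENT // 8

/** Hypothetical pair of views over one SharedArrayBuffer. */
export interface StreamViews {
  data: Uint16Array // ring buffer of UTF-16 code units
  state: Int32Array // [0] = writer index, [1] = reader index
}

/** Splits `buffer` the same way the question's constructors do. */
export function createViews(buffer: SharedArrayBuffer): StreamViews {
  const { byteLength } = buffer
  if (byteLength < STATE_BYTES) {
    throw new RangeError(`buffer must be at least ${STATE_BYTES} bytes`)
  }
  // An Int32Array view needs a 4-byte-aligned offset, so the total size must be a multiple of 4.
  if (byteLength % Int32Array.BYTES_PER_ELEMENT !== 0) {
    throw new RangeError("buffer size must be a multiple of 4")
  }
  const dataBytes = byteLength - STATE_BYTES
  return {
    data: new Uint16Array(buffer, 0, dataBytes / Uint16Array.BYTES_PER_ELEMENT),
    state: new Int32Array(buffer, dataBytes, 2),
  }
}
```

With the 1024-byte buffer used in the wrap-around test, this yields a 508-cell ring buffer, with the two indexes starting at byte offset 1016.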

The writing handle enqueues the strings it is asked to write by pushing them into an array. After setImmediate fires, it flushes the queue: for each string, it writes the string's length into the shared buffer at the current writer index and then encodes the string itself one UTF-16 code unit at a time. This repeats until the queue is empty, wrapping around to the start of the buffer when necessary.

This means that if one were to write "Hello" and " world!" as two separate strings, at the end of the flush operation the buffer would look like this: [5, 72, 101, 108, 108, 111, 7, 32, 119, 111, 114, 108, 100, 33, ..., 14, 0], where the first element (5) is the length of "Hello", the following 5 numbers are its UTF-16 code units, the next element (7) is the length of " world!", the following 7 numbers are again the encoded string, and the last two numbers (14 and 0) are the writer and reader indexes, respectively. After this flush, the next write will start at index 14, the 15th element of the buffer.
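A standalone sketch of the flush step, assuming a plain Uint16Array ring buffer and no capacity check (the question handles that separately in canWrite). `encodeInto` is a hypothetical name; it follows the same length-prefix-then-code-units layout, so encoding "Hello" and " world!" from index 0 into a 16-cell buffer should reproduce the example above and return 14 as the new writer index.

```typescript
/**
 * Hypothetical standalone flush: writes each string as [length, ...code units]
 * into `ring`, starting at writer index `w` and wrapping to 0 at the end.
 * Returns the new writer index. No capacity check is performed, and lengths above
 * 65535 would not fit in a Uint16 cell (the question caps them at 10_000).
 */
export function encodeInto(ring: Uint16Array, w: number, strings: readonly string[]): number {
  const n = ring.length
  for (const s of strings) {
    ring[w] = s.length // length prefix, counted in UTF-16 code units
    w = (w + 1) % n
    for (let i = 0; i < s.length; i++) {
      ring[w] = s.charCodeAt(i) // one code unit per cell; a surrogate pair takes two
      w = (w + 1) % n
    }
  }
  return w
}
```

Starting near the end of a small buffer shows the wrap-around: writing "ab" at index 2 of a 4-cell buffer puts the length in cell 2, "a" in cell 3, "b" in cell 0, and returns 1.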

The reading handle is just a generator that waits to be notified of a change in the writer's index, which happens after the queue is flushed. When this happens, it reads the value at the current reader index as the length of the next string, then decodes that many code units and yields the result (on the first pass, the first string of the original queue). The process goes on until the reading handle catches up with the writing handle, at which point it goes to sleep and waits to be notified to start reading again.
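The matching read step, again as a hypothetical standalone function (`decodeAt`) over a plain Uint16Array rather than the question's class: it reads the length prefix at the reader index, collects that many code units while wrapping at the end of the buffer, and returns the string together with the next reader index. Waiting with Atomics and publishing the reader index are deliberately left out of this sketch.

```typescript
/**
 * Hypothetical standalone read step: decodes the length-prefixed string that
 * starts at reader index `r` and returns it with the index of the next record.
 */
export function decodeAt(ring: Uint16Array, r: number): { value: string; next: number } {
  const n = ring.length
  const length = ring[r] // length prefix in UTF-16 code units
  r = (r + 1) % n
  let value = ""
  for (let i = 0; i < length; i++) {
    value += String.fromCharCode(ring[r]) // one code unit per cell
    r = (r + 1) % n // wrap around the end of the ring buffer
  }
  return { value, next: r }
}
```

Reading the example buffer from index 0 should give "Hello" with next index 6, and reading again from 6 should give " world!" with next index 14, which is where the writer stopped.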

The implementation

import {nextTick} from "node:process"
import {isMainThread, Worker, workerData, type WorkerOptions as _WO} from "node:worker_threads"

/**
 * Key used to access the {@link SharedArrayBuffer} in worker data.
 */
const kSharedBuffer = "__$SharedBuffer$" as const

export class WorkerStream {
    ////////////////////////////////////////////////////////////////////////////
    // region Buffer
    ////////////////////////////////////////////////////////////////////////////

    /**
     * The writable section of the {@link SharedArrayBuffer}.
     */
    readonly #buffer: Uint16Array

    /**
     * The section of the {@link SharedArrayBuffer} used to store the atomic W/R indexes
     * of the {@link WorkerStream} and the {@link WorkerStreamHandle} respectively.
     */
    readonly #stateBuffer: Int32Array

    /**
     * Wraps an index around the buffer.
     *
     * @throws {RangeError} If the index is negative.
     * @throws {RangeError} If the index needs to be wrapped around more than once.
     */
    private wrapIndex(index: number): number {
        if (index < 0) {
            throw new RangeError("Index cannot be negative")
        }

        const {length} = this.#buffer
        if (index >= length * 2) {
            throw new RangeError("Index cannot be wrapped around more than once")
        }

        return (index >= length) ? index - length : index
    }

    ////////////////////////////////////////////////////////////////////////////
    // endregion
    ////////////////////////////////////////////////////////////////////////////

    ////////////////////////////////////////////////////////////////////////////
    // region Data Queue
    ////////////////////////////////////////////////////////////////////////////

    /**
     * The maximum length of a single string in UTF-16 code units.
     */
    static readonly MAX_STRING_LENGTH = 10_000

    /**
     * The queue of strings to be written to the {@link WorkerStreamHandle}.
     */
    readonly #dataQueue: string[] = []

    /**
     * The number of cells in the buffer required to flush the data queue.
     */
    #dataQueueSize = 0

    /**
     * Flushes the queue of strings to the {@link buffer}.
     */
    flush() {
        const {length: bufferLength} = this.#buffer

        let [w] = this.#stateBuffer
        while (this.#dataQueue.length > 0) {
            const s = this.#dataQueue.shift()!

            this.#buffer[w] = s.length
            for (let i = 0; i < s.length; i++) {
                if (++w === bufferLength) {
                    w = 0
                }

                this.#buffer[w] = s.charCodeAt(i)
            }
            if (++w === bufferLength) {
                w = 0
            }
        }

        this.#dataQueueSize = 0
        Atomics.store(this.#stateBuffer, 0, w)
        Atomics.notify(this.#stateBuffer, 0)
    }

    ////////////////////////////////////////////////////////////////////////////
    // endregion
    ////////////////////////////////////////////////////////////////////////////

    /**
     * @throws {RangeError} If the buffer size is less than 2 * {@link Int32Array.BYTES_PER_ELEMENT}.
     */
    constructor(
        workerPath: URL | string,
        workerOptions: Omit<_WO, "workerData"> & { workerData?: Record<string | symbol, any> } = {},
        unrefWorker = true,
        bufferSize: number = 4 * 1024 * 1024,
    ) {
        if (bufferSize < 2 * Int32Array.BYTES_PER_ELEMENT) {
            throw new RangeError("Buffer size must be at least 2 * Int32Array.BYTES_PER_ELEMENT")
        }

        const buffer = new SharedArrayBuffer(bufferSize)
        this.#buffer = new Uint16Array(
            buffer,
            0,
            (bufferSize - 2 * Int32Array.BYTES_PER_ELEMENT) / Uint16Array.BYTES_PER_ELEMENT,
        )
        this.#stateBuffer = new Int32Array(buffer, bufferSize - 2 * Int32Array.BYTES_PER_ELEMENT)

        workerOptions.workerData ??= {}
        workerOptions.workerData[kSharedBuffer] = buffer
        const worker = new Worker(workerPath, workerOptions)
        worker.once("exit", () => {
            throw new Error("Worker exited unexpectedly")
        })
        worker.once("error", (err) => {
            throw err
        })
        if (unrefWorker) {
            worker.unref()
        }
    }

    ////////////////////////////////////////////////////////////////////////////
    // region Streaming
    ////////////////////////////////////////////////////////////////////////////

    /**
     * Writes a string to the {@link WorkerStream}.
     *
     * # Summary
     *
     * This method enqueues a string to be streamed to the {@link WorkerStreamHandle}
     * as soon as {@link setImmediate} allows it.
     *
     * The string is encoded using UTF-16, so it can contain any Unicode character,
     * and its length is limited to {@link MAX_STRING_LENGTH}.
     *
     * If there is not enough space in the buffer to write the string,
     * calling this method will return `false` and the string will not be enqueued.
     * You are advised to check with {@link canWrite} before calling this method.
     */
    write(s: string): boolean {
        if (!this.canWrite(s)) {
            return false
        }

        this.#dataQueue.push(s)
        if (this.#dataQueueSize === 0) {
            this.#dataQueueSize++
            setImmediate(() => this.flush())
        }
        this.#dataQueueSize += s.length + 1
        return true
    }

    /**
     * Determines whether there is enough space in the buffer to write a string.
     */
    canWrite({length}: string): boolean {
        if (length > WorkerStream.MAX_STRING_LENGTH) {
            return false
        }

        const w = this.#stateBuffer[0], r = Atomics.load(this.#stateBuffer, 1)
        const end = this.wrapIndex(w + length + 1)
        const didNotWrapAroundBuffer = end > w
        const endsBehindReader = end < r
        const wasAlreadyBehindReader = w < r

        if (wasAlreadyBehindReader) {
            return endsBehindReader && didNotWrapAroundBuffer
        } else {
            return endsBehindReader || didNotWrapAroundBuffer
        }
    }

    ////////////////////////////////////////////////////////////////////////////
    // endregion
    ////////////////////////////////////////////////////////////////////////////
}

export class WorkerStreamHandle {
    ////////////////////////////////////////////////////////////////////////////
    // region Buffer
    ////////////////////////////////////////////////////////////////////////////

    /**
     * The readable section of the {@link SharedArrayBuffer}.
     */
    readonly #buffer: Uint16Array

    /**
     * The portion of the {@link SharedArrayBuffer} used to store the atomic W/R indexes
     * of the {@link WorkerStream} and the {@link WorkerStreamHandle} respectively.
     */
    readonly #stateBuffer: Int32Array

    ////////////////////////////////////////////////////////////////////////////
    // endregion
    ////////////////////////////////////////////////////////////////////////////

    /**
     * @throws {Error} If instantiated from the main thread.
     * @throws {Error} If the {@link SharedArrayBuffer} is not found in {@link workerData}.
     */
    constructor() {
        if (isMainThread) {
            throw new Error("WorkerStreamHandle can only be used inside a worker")
        }

        const buffer = workerData[kSharedBuffer]
        if (!(buffer instanceof SharedArrayBuffer)) {
            throw new Error("SharedBuffer not found in workerData; did you instantiate WorkerStreamHandle from the right worker?")
        }
        delete workerData[kSharedBuffer]

        const {byteLength: bufferSize} = buffer
        this.#buffer = new Uint16Array(
            buffer,
            0,
            (bufferSize - 2 * Int32Array.BYTES_PER_ELEMENT) / Uint16Array.BYTES_PER_ELEMENT,
        )
        this.#stateBuffer = new Int32Array(buffer, bufferSize - 2 * Int32Array.BYTES_PER_ELEMENT)
    }

    ////////////////////////////////////////////////////////////////////////////
    // region Streaming
    ////////////////////////////////////////////////////////////////////////////

    #isStreaming = false

    async* [Symbol.asyncIterator](): AsyncIterableIterator<string> {
        if (this.#isStreaming) {
            throw new Error("Already streaming data")
        }
        this.#isStreaming = true

        const {length: bufferLength} = this.#buffer
        while (true) {
            let r = this.#stateBuffer[1]
            await new Promise<void>((resolve) => {
                nextTick(() => {
                    Atomics.wait(this.#stateBuffer, 0, r)
                    resolve()
                })
            })

            const length = this.#buffer[r]
            const buffer = new Uint16Array(length)
            for (let i = 0; i < length; i++) {
                if (++r >= bufferLength) {
                    r = 0
                }

                buffer[i] = this.#buffer[r]
            }

            this.#stateBuffer[1] = (++r >= bufferLength) ? 0 : r
            yield String.fromCharCode(...buffer)
        }
    }

    static async* incoming(): AsyncIterableIterator<string> {
        yield* new WorkerStreamHandle()
    }

    ////////////////////////////////////////////////////////////////////////////
    // endregion
    ////////////////////////////////////////////////////////////////////////////
}

PS: I'm using private fields to make the buffer inaccessible from the outside and prevent malicious interactions with the underlying buffer.

The issue

This implementation worked fine until yesterday: to make sure that the underlying worker was receiving the stream of text, I've written these tests:

logger.test.ts

import {equal, ok, throws} from "node:assert/strict"
import {execSync} from "node:child_process"
import {MessageChannel, Worker} from "node:worker_threads"
import {WorkerStream} from "../src/logger/stream"

const testingWorkerPath = new URL("./data/worker.js", import.meta.url)

describe("WorkerStream", () => {
    before(() => {
        // Suppress the "Worker exited unexpectedly" error
        // so that we can manually close Workers to stop tests
        // without having Mocha crying about uncaught exceptions
        const original = process.listeners("uncaughtException").pop()!
        process.removeListener("uncaughtException", original)
        process.on("uncaughtException", (err) => {
            if (err.message !== "Worker exited unexpectedly") {
                original(err, "uncaughtException")
            }
        })
    })

    it("writes data to the worker", async () => {
        const {port1: workerPort, port2: parentPort} = new MessageChannel()
        const stream = new WorkerStream(
            testingWorkerPath,
            {workerData: {parentPort}, transferList: [parentPort]},
        )
        stream.write("Hello World!")

        const receivedMessage = await new Promise((resolve) => {
            workerPort.on("message", resolve)
        })
        equal(receivedMessage, "Hello World!")

        stream.write("done")
    })

    it("wraps around when it reaches the end of the buffer", async () => {
        const DATA = [
            "ab".repeat(15),
            "bc".repeat(25),
            "cd".repeat(35),
            "de".repeat(35),
            "ef".repeat(35),
            "fg".repeat(35),
        ]

        const {port1: workerPort, port2: parentPort} = new MessageChannel()
        const stream = new WorkerStream(
            testingWorkerPath,
            {workerData: {parentPort}, transferList: [parentPort]},
            true,
            1024,
        )

        for (const data of DATA) {
            const iterations = Math.floor(1024 / (data.length * 2) - 1)
            for (let _ = 0; _ < 15; _++) {
                for (let i = 0; i < iterations; i++) {
                    ok(stream.write(data))
                }
                for (let i = 0; i < iterations; i++) {
                    equal(
                        await new Promise((resolve) => {
                            workerPort.on("message", resolve)
                        }),
                        data,
                    )
                }
            }
        }

        stream.write("done")
    })
})

data/worker.js

import {isMainThread, workerData} from "node:worker_threads"
import {WorkerStreamHandle} from "../../src/logger/stream.js"

if (isMainThread) {
    throw new Error("This file must be run as a worker")
}

/**
 * @type {import("node:worker_threads").MessagePort}
 */
const parentPort = workerData.parentPort

/**
 * Incoming stream of text.
 */
for await (const chunk of WorkerStreamHandle.incoming()) {
    if (chunk === "done") {
        break
    }

    parentPort.postMessage(chunk)
}

All the tests above pass like a charm. Every message that goes into a worker is always reported correctly. No data corruption, no data races, no issues whatsoever even when wrapping around the buffer.

The issue arises when I try to use the WorkerStream in a more "realistic" environment, like this:

import {isMainThread} from "node:worker_threads"
import {WorkerStreamHandle} from "./logger/stream"

if (isMainThread) {
    throw new Error("This file must be run as a worker")
}

for await (const chunk of WorkerStreamHandle.incoming()) {
    console.log(chunk)
}

When I tried to run this worker by sending it 2 messages, only 1 seemed to make it to the other side. In short, the issue is that in any setup other than the testing one only the first string ever makes it to the other side of the channel; all other strings seem to "disappear".

Things I tried

  • Initially, I thought that maybe the worker needed more time to receive and log the second message, but even when ref()-ing the worker or keeping the main thread artificially alive via promises/timers/sockets/other streams, nothing changed.
  • I then tried to make the reader's generator synchronous; nothing changed.
  • Then it was time to add some console logs to see if I could quickly identify the culprit, and this is where the bizarre part comes in: not all debug messages were logged, even those that were literally one after the other with no code whatsoever in between.
  • I finally decided to open the debugger and the profiler, only to see that with the debugger open and at least one breakpoint set, everything worked fine (!??!?)
  • I tried to ditch generators in favor of a next() method and, while I was "dismounting" the generator, I tried calling its next() method twice manually in the worker instead of using a loop statement (for await (of)) and, to my extreme surprise, both messages were logged successfully.
  • I then swapped the for await (of) loop for a while (true) loop that manually awaits the next value... AND THIS TIME IT DIDN'T WORK!

Finally, after this pretty long introduction, my questions are:

How in the world does the reading handle hang/block/paralyze the worker thread when used inside a for/while loop, yielding nothing after the first string, yet work like a charm when iterated and awaited manually without any loop? Why does it work while being debugged or during tests? Is there a synchronization issue, or is it something much worse related to the algorithm's design itself? Should I drain the buffer all at once because processing one string at a time is the wrong approach, or am I right to think that it should work even when decoding and reading one string at a time?

Best answer

I finally figured it out!

The "culprit" was the backpressure mechanism built into Node.js streams.
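The answer doesn't spell out where backpressure enters the picture. One plausible reading, not verified against Node's source here: inside a worker, console.log writes to process.stdout, which forwards each chunk to the main thread and, as far as I can tell, holds back further chunks until the main thread replies with a message. The reader in the question calls Atomics.wait from a process.nextTick callback, and the for await loop resumes through promise jobs, so control never returns to the worker's event loop and that reply is never processed: the first string gets printed, later ones stay queued in the worker. The tests report through a user MessagePort with postMessage, which does not wait for any reply, and that would fit with them passing. The sketch below only demonstrates the event-loop part of this: a chain of steps rescheduled with process.nextTick keeps a MessagePort message from being delivered until the chain ends, while the same chain rescheduled with setImmediate lets it through between steps. `stepsBeforeMessage` and `Scheduler` are hypothetical names, and the message merely stands in for the main thread's reply.

```typescript
import { MessageChannel } from "node:worker_threads"

/** Hypothetical: any function that runs `fn` later (process.nextTick, setImmediate, ...). */
export type Scheduler = (fn: () => void) => void

/**
 * Runs `total` steps, each one scheduling the next via `schedule`, and reports how
 * many steps had completed when a MessagePort message (posted before the first step)
 * was delivered. A result equal to `total` means the message could not get through
 * until the whole chain had finished.
 */
export function stepsBeforeMessage(schedule: Scheduler, total: number): Promise<number> {
  const { port1, port2 } = new MessageChannel()
  let completed = 0

  // Stand-in for the main thread's "send me more stdout data" reply.
  const delivered = new Promise<number>((resolve) => {
    port2.on("message", () => resolve(completed))
  })

  // Stand-in for the reader loop: each step queues the next one.
  const finished = new Promise<void>((resolve) => {
    const step = (): void => {
      completed++
      if (completed < total) schedule(step)
      else resolve()
    }
    schedule(step)
  })

  port1.postMessage("ack")
  return Promise.all([delivered, finished]).then(([seenAt]) => {
    port1.close() // closes both ends so the process can exit
    return seenAt
  })
}
```

With process.nextTick as the scheduler the result should equal `total`; with setImmediate it should be well below it. The exact number of setImmediate steps that run before delivery depends on where in the event loop the call starts, so only the inequality is worth relying on.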

All I needed to do was transform the reading loop in the worker into a recursive function proxied by setImmediate.
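The answer doesn't include the final code, so the following is only a sketch of the shape it describes: a function that handles one item and then reschedules itself with setImmediate instead of looping with for await. `drainWithImmediate` is a hypothetical name; it accepts either a synchronous or an asynchronous iterator, and returning `false` from the callback stops the loop (standing in for the "done" sentinel used by the test worker). A blocking Atomics.wait inside the iterator still freezes the thread while it waits; what changes is that the event loop gets a full turn between items.

```typescript
/**
 * Hypothetical helper: pulls one item from `source`, hands it to `onItem`, then
 * reschedules itself with setImmediate so the event loop gets a full turn
 * (timers, I/O, MessagePort messages) before the next item is requested.
 * Returning `false` from `onItem` stops the loop early.
 */
export function drainWithImmediate<T>(
  source: Iterator<T> | AsyncIterator<T>,
  onItem: (item: T) => boolean | void,
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    const step = async (): Promise<void> => {
      const result = await source.next() // works for sync and async iterators
      if (result.done) {
        resolve()
        return
      }
      if (onItem(result.value) === false) {
        resolve()
        return
      }
      setImmediate(run) // yield to the event loop instead of looping in place
    }
    const run = (): void => {
      step().catch(reject)
    }
    setImmediate(run)
  })
}
```

In the worker, a call such as `drainWithImmediate(WorkerStreamHandle.incoming(), (chunk) => { console.log(chunk) })` would take the place of the `for await` loop, assuming the handle from the question is otherwise unchanged.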