NextJS 14 SSE with TransformStream() sending messages in a single response

I am trying to implement SSE in NextJS 14 to update the user while processing some data. Since the data is provided by the user, it needs to be a POST request, so I can't use EventSource but have to use fetch().

I got the client side working like this:

"use client";

import { useState } from "react";

export default function Home() {
    const [message, setMessage] = useState("");

    async function onClick() {
        const response = await fetch("/api/test2", {
            method: "POST",
            headers: {
                "Content-Type": "application/json",
            },
            body: JSON.stringify({}),
        });

        const reader = response.body?.getReader();
        if (!reader) return;

        let decoder = new TextDecoder();
        while (true) {
            const { done, value } = await reader.read();
            if (done) break;
            if (!value) continue;
            const lines = decoder.decode(value);
            const text = lines
                .split("\n")
                .filter((line) => line.startsWith("data:"))[0]
                .replace("data:", "")
                .trim();

            setMessage((prev) => prev + text);
        }
    }

    return (
        <div>
            <button onClick={onClick}>START</button>
            <p>{message}</p>
        </div>
    );
}
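
One caveat with the read loop above: a single `reader.read()` may deliver several SSE events at once, or only part of one, and taking `[0]` keeps just the first `data:` line of each chunk. A minimal sketch of a buffered parser follows, assuming events are separated by `\n\n` as in the server code here; `createSseParser` and `SseEvent` are hypothetical names, not part of any library.

```typescript
// Hypothetical helper: incremental parser for text/event-stream chunks.
// Assumes "\n" line endings and "\n\n" between events.
interface SseEvent {
  event: string;
  data: string;
}

function createSseParser(): (chunk: string) => SseEvent[] {
  let buffer = ""; // holds any incomplete event carried over between chunks

  return function feed(chunk: string): SseEvent[] {
    buffer += chunk;
    const events: SseEvent[] = [];
    let boundary: number;

    // Consume every complete event currently in the buffer.
    while ((boundary = buffer.indexOf("\n\n")) !== -1) {
      const block = buffer.slice(0, boundary);
      buffer = buffer.slice(boundary + 2);

      let event = "message"; // default event name when none is given
      const dataLines: string[] = [];
      for (const line of block.split("\n")) {
        if (line.startsWith("event:")) {
          event = line.slice("event:".length).trim();
        } else if (line.startsWith("data:")) {
          // Drop only the single optional space after the colon,
          // so a payload that is itself a space survives.
          dataLines.push(line.slice("data:".length).replace(/^ /, ""));
        }
      }
      if (dataLines.length > 0) {
        events.push({ event, data: dataLines.join("\n") });
      }
    }
    return events;
  };
}
```

Inside the loop this would be used roughly as `for (const e of feed(decoder.decode(value, { stream: true }))) setMessage((prev) => prev + e.data);`, where `{ stream: true }` keeps multi-byte characters intact across chunk boundaries.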

For the server side, I found some code online that works like this:

import { NextRequest, NextResponse } from "next/server";

export async function POST(req: NextRequest, res: NextResponse) {
    const { readable, writable } = new TransformStream();
    const writer = writable.getWriter();

    const text = "Some test text";

    let index = 0;
    const interval = setInterval(() => {
        if (index < text.length) {
            writer.write(`event: message\ndata: ${text[index]}\n\n`);
            index++;
        } else {
            writer.write(`event: message\ndata: [DONE]\n\n`);
            clearInterval(interval);
            writer.close();
        }
    }, 1);

    return new NextResponse(readable, {
        headers: {
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            Connection: "keep-alive",
        },
    });
}

The problem is that I need to send each message after a function has finished processing, not on an interval.

Something like:

import { NextRequest, NextResponse } from "next/server";

export async function POST(req: NextRequest, res: NextResponse) {
    const { readable, writable } = new TransformStream();
    const writer = writable.getWriter();

    writer.write(`event: "start"\ndata:"Process 1"\n\n`)
    await processThatTakesTime(); //something like clicking a button with puppeteer
    writer.write(`event: "done"\ndata:"Process 1"\n\n`)

    writer.write(`event: "start"\ndata:"Process 2"\n\n`)
    await anotherProcess(); //something like clicking a button with puppeteer
    writer.write(`event: "done"\ndata:"Process 2"\n\n`)

    writer.close();

    return new NextResponse(readable, {
        headers: {
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            Connection: "keep-alive",
        },
    });
}

This code just sends all the messages in a single response, or even worse: closes the writer before even writing the messages to it.

I have tried the following, but nothing seemed to work:

  1. Adding a sleep (await new Promise((resolve) => setTimeout(resolve, ms));) between the writes and the close.
  2. Awaiting all the writes and the close.
  3. Wrapping each write in a promise, like:
await new Promise<void>((resolve) => {
    setTimeout(() => {
        writer.write(`event: "start"\ndata:"Process 1"\n\n`);
        resolve();
    }, 100);
});

There is 1 best solution below

It seems the key issue in your approach is how the asynchronous processing functions (processThatTakesTime() and anotherProcess()) interact with the stream. Your handler awaits them before it returns the response, so nothing starts reading from the stream until every message has already been written and the writer has been closed. This is why all your messages seem to be sent at once, or the connection closes unexpectedly.

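To manage this, make sure your processing functions return Promises so that you can await them, and connect the readable side of the stream to the response before the processing starts. The server then waits for each process to complete before sending the next message, while the client receives each message as it is written. Let's refactor your server-side code with proper async handling (note that this version is written as a Pages Router API route, i.e. a default-exported handler under pages/api):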

import type { NextApiRequest, NextApiResponse } from 'next';
import { Writable } from 'node:stream';

// Stand-in for a long-running process; resolves after a simulated delay
function processThatTakesTime(): Promise<string> {
  return new Promise((resolve) => {
    setTimeout(() => resolve('Process 1 Complete'), 1000);
  });
}

// Stand-in for another long-running process
function anotherProcess(): Promise<string> {
  return new Promise((resolve) => {
    setTimeout(() => resolve('Process 2 Complete'), 1000);
  });
}

export default async function handler(req: NextApiRequest, res: NextApiResponse) {
  if (req.method !== 'POST') {
    res.status(405).send('Method Not Allowed');
    return;
  }

  const { readable, writable } = new TransformStream<string, string>();
  const writer = writable.getWriter();

  // Set headers for SSE
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  // Immediately pipe the readable stream to the response object.
  // res.socket.writable is only a boolean flag, so the Node.js response is
  // wrapped with Writable.toWeb() (Node 17+) to get a web WritableStream.
  // The cast bridges the Node and DOM stream typings.
  const piping = readable
    .pipeTo(Writable.toWeb(res) as unknown as WritableStream<string>)
    .catch((error) => console.error('Error while piping event stream:', error));

  // Async processing: each message is written only after the previous step finishes
  try {
    await writer.write(`event: start\ndata: Process 1\n\n`);
    await processThatTakesTime();
    await writer.write(`event: done\ndata: Process 1\n\n`);

    await writer.write(`event: start\ndata: Process 2\n\n`);
    await anotherProcess();
    await writer.write(`event: done\ndata: Process 2\n\n`);
  } catch (error) {
    console.error('Error during event stream:', error);
  } finally {
    // Closing the writer ends the pipe, which in turn ends the response
    await writer.close().catch(() => {});
    await piping;
  }

  // Do not send any other response here because it has already been streamed
}
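
Since the handler in the question is an App Router route (it exports POST), the same idea can be sketched there too: start the work without awaiting it and return the response right away, so the client begins reading while the steps are still running. This is a minimal sketch rather than a definitive implementation. It uses only the standard Request/Response/TransformStream globals, `processThatTakesTime` and `anotherProcess` are placeholder delays standing in for the real work, and `sse` is a hypothetical formatting helper.

```typescript
// app/api/test2/route.ts (path assumed from the fetch("/api/test2") call in the question)

// Placeholder for real work such as a puppeteer step (assumption).
function processThatTakesTime(ms = 50): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Second placeholder step (assumption).
function anotherProcess(ms = 50): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Hypothetical helper: format one SSE frame.
function sse(event: string, data: string): string {
  return `event: ${event}\ndata: ${data}\n\n`;
}

export async function POST(_req: Request): Promise<Response> {
  const { readable, writable } = new TransformStream<Uint8Array, Uint8Array>();
  const writer = writable.getWriter();
  const encoder = new TextEncoder();
  const send = (event: string, data: string) =>
    writer.write(encoder.encode(sse(event, data)));

  // Start the work but do NOT await it here. Awaiting would delay the
  // return below, and nothing reads from `readable` until the response
  // has been handed back.
  void (async () => {
    try {
      await send("start", "Process 1");
      await processThatTakesTime();
      await send("done", "Process 1");

      await send("start", "Process 2");
      await anotherProcess();
      await send("done", "Process 2");
    } catch (error) {
      // A write rejects if the client disconnects or the stream errors.
      console.error("Error during event stream:", error);
    } finally {
      await writer.close().catch(() => {});
    }
  })();

  // Returned before any processing has finished.
  return new Response(readable, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
    },
  });
}
```

Awaiting each write is safe here only because the response is returned first: each write then completes as the client consumes the stream, instead of waiting on a reader that never arrives.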