T3 Stack: Stream error: TRPCClientError: Subscriptions should use wsLink

I am trying to use tRPC over WebSockets and have everything set up except subscriptions with useSubscription. I am getting the following error:

Stream error: TRPCClientError: Subscriptions should use wsLink

Here is my ws file that I import on the client: src/utils/ws.ts

import { createWSClient, loggerLink, wsLink } from '@trpc/client';
import { createTRPCNext } from '@trpc/next';
import getConfig from 'next/config';
import superjson from 'superjson';
import { AppRouterWS } from '~/server/api/root';

const { publicRuntimeConfig } = getConfig();

const { WS_URL } = publicRuntimeConfig;

const client = createWSClient({
  url: WS_URL,
});

export const ws = createTRPCNext<AppRouterWS>({
  ssr: false,
  config() {
    return {
      links: [
        loggerLink({
          enabled: (opts) =>
            (process.env.NODE_ENV === 'development' && typeof window !== 'undefined') ||
            (opts.direction === 'down' && opts.result instanceof Error),
        }),
        wsLink({
          client,
        }),
      ],
      transformer: superjson,
    };
  },
});

Here is the code where I call useSubscription:

import { ws } from '~/utils/ws';

ws.chatWS.stream.useSubscription(undefined, {
    onStarted() {
      console.log('STARTING');
    },
    onData(data: { message: any; status: 'STREAMING' | 'ERROR' }) {
      console.log(data);
      setMessages((prev) => [
        ...prev,
        {
          id: currentMessageId,
          type: 'response',
          content: data.message,
          status: data.status === 'STREAMING' ? 'streaming' : data.status,
          error: data.status === 'ERROR',
        },
      ]);
    },
    onError(error) {
      console.log('ASDASD');
      console.error('Stream error:', error);
    },
  });

Here is my backend router: src/server/api/routers/websocket/chatWSRouter.ts

import { observable } from '@trpc/server/observable';
import { z } from 'zod';
import { createTRPCRouter, publicProcedure } from '../../trpc';
import awsWsClient from '~/server/wssAwsServer';

export const chatWSRouter = createTRPCRouter({
  invokeChatbot: publicProcedure
    .input(z.object({ prompt: z.string() }))
    .mutation(async ({ input }) => {
      if (awsWsClient.isConnected()) {
        awsWsClient.send({ action: 'streamChatbot', message: input.prompt });
      } else {
        console.error('WebSocket is not connected.');
      }
    }),
  invokeAgent: publicProcedure
    .input(z.object({ prompt: z.string(), agentAliasId: z.string() }))
    .mutation(async ({ input }) => {}),
  stream: publicProcedure.subscription(() => {
    return observable<{ message: string; status: 'STREAMING' | 'ERROR' }>((emit) => {
      const onMessage = (data: { message: string; status: 'STREAMING' | 'ERROR' }) => {
        console.log(data);
        emit.next(data);
      };
      awsWsClient.on('stream_chat', onMessage);
    });
  }),
});

Here is my root.ts file: src/server/api/root.ts

import { fieldRouter } from './routers/rest/fieldRouter';
import { formRouter } from './routers/rest/formRouter';
import { formTemplateRouter } from './routers/rest/formTemplate';
import { transcriptionRouter } from './routers/rest/transcriptionRouter';
import { userGroupRouter } from './routers/rest/userGroupRouter';
import { userRouter } from './routers/rest/userRouter';
import { narrativeRouter } from './routers/rest/narrativeRouter';
import { createTRPCRouter, mergeRouters } from './trpc';
import { narrativeTemplateRouter } from './routers/rest/narrativeTemplate';
import { eventRouter } from './routers/rest/eventRouter';
import { chatWSRouter } from './routers/websocket/chatWSRouter';
import { formWSRouter } from './routers/websocket/formWSRouter';
import { narrativeWSRouter } from './routers/websocket/narrativeWSRouter';
import { transcribeWSRouter } from './routers/websocket/transcribeWSRouter';

/**
 * This is the primary router for your server.
 *
 * All routers added in /api/routers should be manually added here.
 */
export const appRouter = createTRPCRouter({
  formTemplate: formTemplateRouter,
  field: fieldRouter,
  form: formRouter,
  transcription: transcriptionRouter,
  user: userRouter,
  userGroup: userGroupRouter,
  narrative: narrativeRouter,
  narrativeTemplate: narrativeTemplateRouter,
  events: eventRouter,
});

export const appRouterWS = createTRPCRouter({
  chatWS: chatWSRouter,
  formWS: formWSRouter,
  narrativeWS: narrativeWSRouter,
  transcibreWS: transcribeWSRouter,
});

export const combinedRouter = mergeRouters(appRouter, appRouterWS);

// export type definition of API
export type AppRouter = typeof appRouter;

export type AppRouterWS = typeof appRouterWS;

export type CombinedRouter = typeof combinedRouter;

When I invoke streamChatbot it works and returns the result, and I can print it through my AWS WebSocket server. But when I try to pick up the emitted data within chatWSRouter, nothing arrives, I imagine because I am getting this in my client console:

>> subscription #1 chatWS.stream Object
gpt.tsx:212 Stream error: TRPCClientError: Subscriptions should use wsLink
    at TRPCClientError.from (TRPCClientError-38f9a32a.mjs:44:16)
    at eval (httpUtils-b9d0cb48.mjs:139:81)

I am using the T3 stack. I cannot find any information on this, and I do have wsLink set up, so I have no idea why useSubscription is not working. Here are the tRPC versions from my package.json:

"@trpc/client": "10.45.0",
"@trpc/next": "10.45.0",
"@trpc/react-query": "10.45.0",
"@trpc/server": "10.45.0",

I have tried every solution I could find, including changing package versions, but nothing works.

There is 1 solution below

J K On

Have you checked whether the WS_URL in your .env points to the correct URL (for example ws://localhost:3001)?
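
One quick way to rule out a misconfigured URL is to check its scheme before passing it to createWSClient. The following is only a minimal sketch: isWebSocketUrl is a hypothetical helper, not part of tRPC, and it only verifies that the value parses as a URL with a ws: or wss: scheme. It does not check that a server is actually listening there.

```typescript
// Hypothetical helper (not part of tRPC): returns true only when the value
// parses as a URL whose scheme is ws: or wss:.
function isWebSocketUrl(value: string | undefined): boolean {
  if (!value) return false;
  try {
    const { protocol } = new URL(value);
    return protocol === 'ws:' || protocol === 'wss:';
  } catch {
    // new URL throws on strings it cannot parse as a URL
    return false;
  }
}
```

Logging isWebSocketUrl(WS_URL) once in the browser would at least show whether publicRuntimeConfig delivers a ws:// or wss:// value to the client.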

You may also have to use splitLink (wsLink for subscriptions and httpBatchLink for queries and mutations).

For example:

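import { createWSClient, httpBatchLink, loggerLink, splitLink, wsLink } from '@trpc/client';
import { createTRPCNext } from '@trpc/next';
// AppRouter is the type of your server's router; import it from wherever it is defined.

export const trpc = createTRPCNext<AppRouter>({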
  config() {
    return {
      links: [
        loggerLink({
          enabled: (opts) =>
            process.env.NODE_ENV === 'development' || (opts.direction === 'down' && opts.result instanceof Error)
        }),
        splitLink({
          condition: (op) => op.type === 'subscription',
          false: httpBatchLink({ url: 'http://localhost:3001/trpc' }),
          true: wsLink({ client: createWSClient({ url: 'ws://localhost:3001/trpc' }) })
        })
      ],
      queryClientConfig: {
        defaultOptions: {
          queries: {
            refetchOnWindowFocus: false
          }
        }
      }
    }
  },
  ssr: false
})
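
The routing decision that splitLink makes in the config above can be modelled in a few lines. This is a simplified sketch rather than tRPC's implementation: OperationType and pickTransport are hypothetical names used for illustration only. As far as I can tell, the error in the question is what the client raises when a subscription ends up on an HTTP link, which would be consistent with the httpUtils frame in the stack trace.

```typescript
// Simplified model of the three tRPC operation types.
type OperationType = 'query' | 'mutation' | 'subscription';

// Hypothetical stand-in for splitLink's decision: 'ws' means the operation
// would be handed to wsLink, 'http' means it would go to httpBatchLink.
function pickTransport(op: { type: OperationType }): 'ws' | 'http' {
  // Same predicate as the `condition` option in the config above.
  const isSubscription = op.type === 'subscription';
  return isSubscription ? 'ws' : 'http';
}
```

With this model, only subscriptions take the WebSocket branch, while queries and mutations keep using HTTP batching.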