I am trying to use tRPC over WebSockets and have everything set up except subscriptions (`useSubscription`). I am getting the following error:
Stream error: TRPCClientError: Subscriptions should use wsLink
Here is my ws file that I import on the client: src/utils/ws.ts
```ts
import { createWSClient, loggerLink, wsLink } from '@trpc/client';
import { createTRPCNext } from '@trpc/next';
import getConfig from 'next/config';
import superjson from 'superjson';
import { AppRouterWS } from '~/server/api/root';

const { publicRuntimeConfig } = getConfig();
const { WS_URL } = publicRuntimeConfig;

const client = createWSClient({
  url: WS_URL,
});

export const ws = createTRPCNext<AppRouterWS>({
  ssr: false,
  config() {
    return {
      links: [
        loggerLink({
          enabled: (opts) =>
            (process.env.NODE_ENV === 'development' && typeof window !== 'undefined') ||
            (opts.direction === 'down' && opts.result instanceof Error),
        }),
        wsLink({
          client,
        }),
      ],
      transformer: superjson,
    };
  },
});
```
Here is the code where I call `useSubscription`:
```ts
import { ws } from '~/utils/ws';

ws.chatWS.stream.useSubscription(undefined, {
  onStarted() {
    console.log('STARTING');
  },
  onData(data: { message: any; status: 'STREAMING' | 'ERROR' }) {
    console.log(data);
    setMessages((prev) => [
      ...prev,
      {
        id: currentMessageId,
        type: 'response',
        content: data.message,
        status: data.status === 'STREAMING' ? 'streaming' : data.status,
        error: data.status === 'ERROR',
      },
    ]);
  },
  onError(error) {
    console.log('ASDASD');
    console.error('Stream error:', error);
  },
});
```
Here is my backend router: src/server/api/routers/websocket/chatWSRouter.ts
```ts
import { observable } from '@trpc/server/observable';
import { z } from 'zod';
import { createTRPCRouter, publicProcedure } from '../../trpc';
import awsWsClient from '~/server/wssAwsServer';

export const chatWSRouter = createTRPCRouter({
  invokeChatbot: publicProcedure
    .input(z.object({ prompt: z.string() }))
    .mutation(async ({ input }) => {
      if (awsWsClient.isConnected()) {
        awsWsClient.send({ action: 'streamChatbot', message: input.prompt });
      } else {
        console.error('WebSocket is not connected.');
      }
    }),
  invokeAgent: publicProcedure
    .input(z.object({ prompt: z.string(), agentAliasId: z.string() }))
    .mutation(async ({ input }) => {}),
  stream: publicProcedure.subscription(() => {
    return observable<{ message: string; status: 'STREAMING' | 'ERROR' }>((emit) => {
      const onMessage = (data: { message: string; status: 'STREAMING' | 'ERROR' }) => {
        console.log(data);
        emit.next(data);
      };
      awsWsClient.on('stream_chat', onMessage);
    });
  }),
});
```
Here is my root.ts file: src/server/api/root.ts
```ts
import { fieldRouter } from './routers/rest/fieldRouter';
import { formRouter } from './routers/rest/formRouter';
import { formTemplateRouter } from './routers/rest/formTemplate';
import { transcriptionRouter } from './routers/rest/transcriptionRouter';
import { userGroupRouter } from './routers/rest/userGroupRouter';
import { userRouter } from './routers/rest/userRouter';
import { narrativeRouter } from './routers/rest/narrativeRouter';
import { createTRPCRouter, mergeRouters } from './trpc';
import { narrativeTemplateRouter } from './routers/rest/narrativeTemplate';
import { eventRouter } from './routers/rest/eventRouter';
import { chatWSRouter } from './routers/websocket/chatWSRouter';
import { formWSRouter } from './routers/websocket/formWSRouter';
import { narrativeWSRouter } from './routers/websocket/narrativeWSRouter';
import { transcribeWSRouter } from './routers/websocket/transcribeWSRouter';

/**
 * This is the primary router for your server.
 *
 * All routers added in /api/routers should be manually added here.
 */
export const appRouter = createTRPCRouter({
  formTemplate: formTemplateRouter,
  field: fieldRouter,
  form: formRouter,
  transcription: transcriptionRouter,
  user: userRouter,
  userGroup: userGroupRouter,
  narrative: narrativeRouter,
  narrativeTemplate: narrativeTemplateRouter,
  events: eventRouter,
});

export const appRouterWS = createTRPCRouter({
  chatWS: chatWSRouter,
  formWS: formWSRouter,
  narrativeWS: narrativeWSRouter,
  transcibreWS: transcribeWSRouter,
});

export const combinedRouter = mergeRouters(appRouter, appRouterWS);

// export type definition of API
export type AppRouter = typeof appRouter;
export type AppRouterWS = typeof appRouterWS;
export type CombinedRouter = typeof combinedRouter;
```
When I invoke `streamChatbot` it works and returns the result, and I can print it from my AWS WebSocket server. But when I try to pick up the emission inside `chatWSRouter`, nothing arrives. I imagine that is because of this error in my client console:
```
>> subscription #1 chatWS.stream Object
gpt.tsx:212 Stream error: TRPCClientError: Subscriptions should use wsLink
    at TRPCClientError.from (TRPCClientError-38f9a32a.mjs:44:16)
    at eval (httpUtils-b9d0cb48.mjs:139:81)
```
I am using the T3 stack. I cannot find any information on this error, and since I have `wsLink` set up I have no idea why `useSubscription` is not working. Here are the tRPC versions from my package.json:
```
"@trpc/client": "10.45.0",
"@trpc/next": "10.45.0",
"@trpc/react-query": "10.45.0",
"@trpc/server": "10.45.0",
```
Tried finding any and all solutions, changing versions of packages, etc. but nothing works.
Have you checked whether your `WS_URL` in `.env` points to the correct URL (for example `ws://localhost:3001`)?

You may also have to use `splitLink` (`wsLink` for subscriptions and `httpBatchLink` for queries and mutations).
For example
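
A minimal sketch of that split for tRPC v10, not a drop-in fix. Everything marked as an assumption is a placeholder rather than something taken from the question: the two URLs, the file name `src/utils/api.ts`, and the idea that both the HTTP handler and the WebSocket server expose `combinedRouter`. The stack trace in the question points at `httpUtils`, which suggests the subscription is being sent through an HTTP link. One possible cause is that the hooks read their client from whichever `withTRPC` provider wraps the app, so a single client that carries both links may be easier to reason about.

```typescript
// src/utils/api.ts (hypothetical file name)
import { createWSClient, httpBatchLink, splitLink, wsLink } from '@trpc/client';
import { createTRPCNext } from '@trpc/next';
import superjson from 'superjson';
import type { CombinedRouter } from '~/server/api/root';

// Assumptions: placeholder URLs, and both servers expose combinedRouter.
const HTTP_URL = '/api/trpc';
const WS_URL = 'ws://localhost:3001';

function getEndingLink() {
  // During server-side rendering there is no browser WebSocket,
  // so only the HTTP link is created there.
  if (typeof window === 'undefined') {
    return httpBatchLink({ url: HTTP_URL });
  }

  const client = createWSClient({ url: WS_URL });

  return splitLink({
    // Subscriptions go over the socket; queries and mutations stay on HTTP.
    condition: (op) => op.type === 'subscription',
    true: wsLink({ client }),
    false: httpBatchLink({ url: HTTP_URL }),
  });
}

export const api = createTRPCNext<CombinedRouter>({
  ssr: false,
  config() {
    return {
      transformer: superjson,
      links: [getEndingLink()],
    };
  },
});
```

With a setup like this, `_app.tsx` would export `api.withTRPC(MyApp)` and the component would call `api.chatWS.stream.useSubscription(...)`, so the hook and the provider come from the same client.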