Is it possible to broadcast with RSockets?

524 Views Asked by At

My question:
I have implemented RSockets on my back-end (java spring boot) and on my front-end (React.js).
My front-end and back-end are able to communicate. However, I would like to know if it is possible to broadcast messages to multiple connected clients from my back-end.
Is this possible?

1

There are 1 best solutions below

0
On

Yes it is possible.

Here is an example of a rsocket endpoint using spring webflux & kotlin coroutines which multicasts events to all it's observers.

This uses MutableSharedFlow, but this can be achieved with a PublishSubject or a Flowable in rxjava3.

In this example a rsocket server broadcasts a timestamp every 30 seconds to all the connected clients.

Controller:

@Controller
class ApiResource(
    private val chatManager: ChatManager
) {
    @MessageMapping("broadcast")
    suspend fun broadcast(): Flow<Message> =
        chatManager.broadcastStream()
}

Stream manager as well as a demo publisher:

@Service
class ChatManager {
    val broadcast = MutableSharedFlow<Message>()
    fun broadcastStream() = broadcast

    @Scheduled(fixedRate = 30 * 1000)
    fun sendBroadcast() = runBlocking{
        broadcast.emit(
            Message(
                "SpringServer",
                System.currentTimeMillis().toString(),
                "broadcast"
            )
        )
    }
}

React component BroadcastApp.tsx:

import React, {useEffect, useState} from 'react';
import {
    BufferEncoders, encodeAndAddCustomMetadata, encodeAndAddWellKnownMetadata,
    MESSAGE_RSOCKET_COMPOSITE_METADATA, MESSAGE_RSOCKET_ROUTING,
    RSocketClient,
    toBuffer, createBuffer
} from 'rsocket-core';
import RSocketWebSocketClient from 'rsocket-websocket-client';

const metadataMimeType = MESSAGE_RSOCKET_COMPOSITE_METADATA.string // message/x.rsocket.composite-metadata.v0
const dataMimeType = 'application/json';
const websocketEndpoint = 'ws://localhost:5051/rsocket';
const transport = new RSocketWebSocketClient({
    url: websocketEndpoint,
}, BufferEncoders);

const endpoint = 'broadcast';

interface Message {
    senderId: string;
    text: string;
    channelId: string;
}

type CONNECT_STATUS = 'disconnected' | 'connecting' | 'connected';

function BroadcastApp() {
    const defaultMetadata = encodeAndAddWellKnownMetadata(
        createBuffer(0),
        MESSAGE_RSOCKET_ROUTING,
        toBuffer(String.fromCharCode(endpoint.length) + endpoint)
    )

    const constructMetadataWithChannelId = (cid: string) => encodeAndAddCustomMetadata(
        defaultMetadata,
        'messaging/x.chat.client-id',
        toBuffer(String.fromCharCode(cid.length) + cid)
    )

    const client = new RSocketClient({
        setup: {
            keepAlive: 60000,
            lifetime: 180000,
            dataMimeType,
            metadataMimeType,
        },
        transport,
    });

    const [connectStatus, setConnectStatus] = useState<CONNECT_STATUS>('disconnected');
    const [msgs, setMsgs] = useState<string[]>([]);

    useEffect(() => {
        if (connectStatus === 'connecting') {
            console.log(`rsocket client connecting...`);
            client
                .connect()
                .subscribe({
                    onError: error => {
                        console.log(`error: client connect: ${error}`)
                        setConnectStatus('disconnected');
                    },
                    onSubscribe: cancel => {},
                    onComplete: (sock) => {
                        sock.requestStream({
                            metadata: constructMetadataWithChannelId('broadcast')
                        }).subscribe({
                            onSubscribe: (subscription) => {
                                console.log(`rsocket client connected ✅`);
                                setConnectStatus('connected');
                                subscription.request(1000)
                            },
                            onNext: (event:any) => {
                                console.log(`received event from channel: ${JSON.stringify(event)}`);
                                const value = JSON.parse(event.data) as Message;
                                setMsgs(prev => [value.text, ...prev]);
                            },
                            onError: (error) => {
                                console.log(`err with rsocket subscribe: ${error}`)
                            }
                        });
                    }
                });
        }
    }, [connectStatus])
    const handleConnect = () => {
        setConnectStatus('connecting');
    };
    const handleDisconnect = () => {
        alert('todo: implement disconnect');
    }
    return (
        <div style={{ padding: 20}}>

            <div>
                {(connectStatus === 'connected') ? (
                    <div style={{margin: 10, fontSize: 18}}>
                        <button
                            onClick={handleDisconnect}
                            style={{
                                padding: 10,
                                borderRadius: 10,
                                fontSize: 28,
                                margin: 10,
                            }}
                        >
                            Disconnect
                        </button>
                    </div>
                ) : (<div>
                    {connectStatus === 'disconnected' ? (<button
                        onClick={handleConnect}
                        style={{
                            padding: 10,
                            borderRadius: 10,
                            fontSize: 28,
                        }}
                    >
                        Connect
                    </button>) : (
                        <></>
                    )}
                </div>)}

            </div>
            <div>
                {msgs.map(item => (
                    <div
                        style={{
                            backgroundColor: 'lightgreen',
                            fontSize: 18,
                            width: 300,
                            padding: 10,
                            borderRadius: 10,
                            margin: 10,
                        }}>
                        {item}
                    </div>
                ))}
            </div>
        </div>
    );
}

export default BroadcastApp;