FlowableProcessor rsocket-js typescript


I'm trying to build a chat frontend with rsocket-websocket-client. I'm able to send messages from the frontend using requestChannel(new Flowable(source...)) and receive messages using requestChannel(Flowable.just({metadata})).

I was trying to use FlowableProcessor to reduce two invocations of requestChannel into one.

Couldn't find documentation on FlowableProcessor for rsocket.

Here is my attempt:

const processor = new FlowableProcessor(
    new Flowable(source => {
        source.onSubscribe({
            cancel: () => {},
            request: n => {}
        });
        source.onNext({
            metadata: constructMetadataWithChannelId(channelId),
        });
    })
);
sock.requestChannel(processor.map(item => item))
    .subscribe({
        onComplete: () => {
            console.log(
                `completed subscribe`,
            );
        },
        onError: error1 => {
            console.log(
                `subscriber err: ${error1}`,
            );
        },
        onSubscribe: subscription => {
            console.log(
                `onSubscribe`,
            );
            setConnectStatus('connected');
            setChannelIdDone(true);
            subscription.request(1000);
        },
        onNext: (val: any) => {
            const value = JSON.parse(val) as Message;
            console.log(
                `received event from channel: ${JSON.stringify(
                                            value,
                                        )}`,
            );
        }
    })

I understand it's a type issue, but I can't figure out why processor.map(item => item) is erroring out.

TS2345: Argument of type 'IPublisher<unknown>' is not assignable to parameter of type 'Flowable<Payload<Buffer, Buffer>>'.
Type 'IPublisher<unknown>' is missing the following properties from type 'Flowable<Payload<Buffer, Buffer>>': lift, take

There is 1 solution below

The error is straightforward: FlowableProcessor cannot be used here because it does not implement the same interface as Flowable.

At the moment rsocket-js is not well polished and has some flaws, one of which is inconsistent type usage. As far as I understand, the IPublisher and ISubscriber interfaces were meant to be used throughout the public API, but (I guess for the authors' convenience) the concrete Flowable and Single types are used instead.

According to the source code, FlowableProcessor does not extend Flowable. It implements the IPublisher, ISubscriber and ISubscription interfaces itself and does not provide the lift and take methods that Flowable has. So it cannot be passed where a Flowable is expected, even though it is meant to be used as an IPublisher.

In your sample I see no reason to use FlowableProcessor. Instead, you can take the Flowable that you currently pass to the FlowableProcessor constructor and pass it to requestChannel directly:

const requestSource = new Flowable(source => {
    source.onSubscribe({
        cancel: () => {},
        request: n => {}
    });
    source.onNext({
        metadata: constructMetadataWithChannelId(channelId),
    });
});
sock.requestChannel(requestSource.map(item => item))
    ...

If you really need to use FlowableProcessor in this piece of code, you can force-cast it to Flowable, but this can be a source of unexpected errors in the future:

sock.requestChannel(processor.map(item => item) as unknown as Flowable<Payload<Buffer, Buffer>>)
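
A less fragile alternative to the double cast may be to wrap the publisher in a real Flowable that simply forwards the subscription. The sketch below shows the idea with simplified local stand-ins: Publisher, Subscriber, Subscription, FlowableLike, fromArray and toFlowable are all hypothetical names, not rsocket-js types. It assumes the real Flowable constructor accepts a source function of the same shape, as the new Flowable(source => ...) calls in the question suggest.

```typescript
// Simplified stand-ins for the Reactive Streams shapes (hypothetical, not the rsocket-js types).
interface Subscription {
  request(n: number): void;
  cancel(): void;
}
interface Subscriber<T> {
  onSubscribe(subscription: Subscription): void;
  onNext(value: T): void;
  onComplete(): void;
}
interface Publisher<T> {
  subscribe(subscriber: Subscriber<T>): void;
}

// Stand-in for Flowable: built from a source function and offering `map`.
class FlowableLike<T> implements Publisher<T> {
  constructor(readonly source: (subscriber: Subscriber<T>) => void) {}

  subscribe(subscriber: Subscriber<T>): void {
    this.source(subscriber);
  }

  map<R>(fn: (value: T) => R): FlowableLike<R> {
    return new FlowableLike<R>((downstream) =>
      this.subscribe({
        onSubscribe: (s) => downstream.onSubscribe(s),
        onNext: (value) => downstream.onNext(fn(value)),
        onComplete: () => downstream.onComplete(),
      }),
    );
  }
}

// Stand-in for a plain publisher such as a processor: no `map`, but it honours request(n).
function fromArray<T>(items: T[]): Publisher<T> {
  return {
    subscribe(subscriber: Subscriber<T>): void {
      let index = 0;
      let done = false;
      subscriber.onSubscribe({
        cancel: () => { done = true; },
        request: (n: number) => {
          let remaining = n;
          while (!done && remaining > 0 && index < items.length) {
            remaining--;
            subscriber.onNext(items[index++]);
          }
          if (!done && index === items.length) {
            done = true;
            subscriber.onComplete();
          }
        },
      });
    },
  };
}

// The adapter: a FlowableLike whose source just forwards the subscriber to the wrapped publisher.
function toFlowable<T>(publisher: Publisher<T>): FlowableLike<T> {
  return new FlowableLike<T>((subscriber) => publisher.subscribe(subscriber));
}

// Usage: the wrapped publisher now has the operators the consumer expects.
const received: number[] = [];
toFlowable(fromArray([1, 2, 3]))
  .map((x) => x * 10)
  .subscribe({
    onSubscribe: (s) => s.request(5),
    onNext: (value) => { received.push(value); },
    onComplete: () => {},
  });
```

With rsocket-flowable the equivalent would presumably be new Flowable(subscriber => processor.subscribe(subscriber)), but I have not tested that against the library.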

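Please also note that you are using Flowable incorrectly: you send data on subscription, before any data has been requested. This violates the RSocket contract, under which a publisher may emit only as many items as the subscriber has requested. A proper implementation should look something like this: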

    let requestsSink: {
        sendRequest(myRequest: unknown): void,
        complete(): void
    };
    const requestsSource = new Flowable((requestsSubscriber) => {
        // Number of the requests requested by subscriber.
        let requestedRequests = 0;
        // Buffer for requests which should be sent but not requested yet.
        const pendingRequests: unknown[] = [];
        let completed = false;

        requestsSink = {
            sendRequest(myRequest: unknown) {
                if (completed) {
                    // It's completed, nobody expects this request.
                    return;
                }
                if (requestedRequests > 0) {
                    --requestedRequests;
                    requestsSubscriber.onNext(myRequest);
                } else {
                    pendingRequests.push(myRequest);
                }
            },
            complete() {
                if (!completed) {
                    completed = true;
                    requestsSubscriber.onComplete();
                }
            },
        };

        requestsSubscriber.onSubscribe({
            cancel: () => {
                // TODO: Should be handled somehow.
            },
            request(n: number) {
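                // Take at most the first n buffered requests (splice(n) would remove everything after index n instead).
                const toSend = pendingRequests.splice(0, n);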
                requestedRequests += n - toSend.length;
                for (const pending of toSend) {
                    requestsSubscriber.onNext(pending);
                }
            }
        });
    });

    sock.requestChannel(requestsSource.map(item => item))
        ...
    
    // Somewhere else the data is provided:
    if (requestsSink != null) {
        requestsSink.sendRequest({});
        requestsSink.sendRequest({});
        requestsSink.sendRequest({});
        requestsSink.sendRequest({});
        requestsSink.complete();
    }
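
The demand bookkeeping is the part that is easiest to get wrong, so it may help to isolate it and check it without a socket. Below is a minimal sketch, assuming a hypothetical helper class DemandBuffer (not part of rsocket-js) that receives an emit callback in place of the subscriber's onNext. It takes the first n buffered values with splice(0, n) and carries any unused demand over to later values.

```typescript
// Hypothetical helper (not part of rsocket-js): buffers values until they are requested.
class DemandBuffer<T> {
  // How many values the subscriber has requested but not yet received.
  private demand = 0;
  // Values offered by the producer that nobody has requested yet.
  private readonly pending: T[] = [];

  constructor(private readonly emit: (value: T) => void) {}

  // Called by the producer whenever it has a value to send.
  offer(value: T): void {
    if (this.demand > 0) {
      this.demand--;
      this.emit(value);
    } else {
      this.pending.push(value);
    }
  }

  // Called from the subscription's request(n).
  request(n: number): void {
    // Remove and return at most the first n buffered values.
    const toSend = this.pending.splice(0, n);
    // Whatever part of n was not satisfied from the buffer stays as open demand.
    this.demand += n - toSend.length;
    for (const value of toSend) {
      this.emit(value);
    }
  }
}

// Usage: nothing is emitted before it has been requested.
const sent: string[] = [];
const buffer = new DemandBuffer<string>((value) => { sent.push(value); });

buffer.offer("a");
buffer.offer("b");
buffer.offer("c"); // all three are buffered, no demand yet
buffer.request(2); // emits "a" and "b"
buffer.request(3); // emits "c", two units of demand remain
buffer.offer("d"); // emitted immediately against the remaining demand
```

In the Flowable above, emit would correspond to requestsSubscriber.onNext, offer to sendRequest, and request to the subscription's request handler. Completion and cancellation are left out of this sketch.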