I'm trying to build a chat frontend with rsocket-websocket-client. I'm able to send messages from the frontend using requestChannel(new Flowable(source...)) and receive messages using requestChannel(new Flowable.just({metadata})).
I was trying to use FlowableProcessor to reduce the two invocations of requestChannel to one, but I couldn't find documentation on FlowableProcessor for rsocket.
Here is my attempt:
const processor = new FlowableProcessor(
  new Flowable(source => {
    source.onSubscribe({
      cancel: () => {},
      request: n => {}
    });
    source.onNext({
      metadata: constructMetadataWithChannelId(channelId),
    });
  })
);
sock.requestChannel(processor.map(item => item))
  .subscribe({
    onComplete: () => {
      console.log(
        `completed subscribe`,
      );
    },
    onError: error1 => {
      console.log(
        `subscriber err: ${error1}`,
      );
    },
    onSubscribe: subscription => {
      console.log(
        `onSubscribe`,
      );
      setConnectStatus('connected');
      setChannelIdDone(true);
      subscription.request(1000);
    },
    onNext: (val: any) => {
      const value = JSON.parse(val) as Message;
      console.log(
        `received event from channel: ${JSON.stringify(
          value,
        )}`,
      );
    }
  })
I understand it's a type issue, but I can't figure out why processor.map(item => item) is erroring out.
TS2345: Argument of type 'IPublisher<unknown>' is not assignable to parameter of type 'Flowable<Payload<Buffer, Buffer>>'.
Type 'IPublisher<unknown>' is missing the following properties from type 'Flowable<Payload<Buffer, Buffer>>': lift, take
The error is trivial.
FlowableProcessor cannot be used here because it does not implement the same interface as Flowable.

At the moment rsocket-js is not well polished and has some flaws. One of them is inconsistent type usage. As far as I understand, the IPublisher and ISubscriber interfaces are supposed to be used in all other public interfaces, but (I guess for the writers' simplicity) they have been replaced with the Flowable and Single types.

According to the source code, FlowableProcessor does not extend Flowable. Instead, it implements the IPublisher, ISubscriber and ISubscription interfaces itself, and it does not implement the lift and take methods that Flowable provides. So it cannot be used directly in place of Flowable, although it is supposed to be usable as an IPublisher.

In your sample I see no reason to use FlowableProcessor. Instead, you can pass the Flowable that you use as the constructor argument of FlowableProcessor directly to the requestChannel method.

If you really need to use FlowableProcessor in this piece of code, you can force-cast it to Flowable, but that can become a source of unexpected errors in the future.

Please also note that you use Flowable incorrectly: you send data upon subscription, when no data has been requested yet. This violates the RSocket contract. A proper implementation should emit the payload only from within request, once the subscriber has asked for it.
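
A minimal sketch of a source that respects demand is shown here. It is not the rsocket-flowable API itself: the Subscription and Subscriber interfaces are simplified local stand-ins, and channelSetupSource and the Payload shape are hypothetical names introduced only for illustration. The source emits the setup payload once request(n) is called with n > 0, and never more than once.

```typescript
// Simplified stand-ins for the reactive-streams style types (assumption:
// the real rsocket-flowable interfaces are richer than these).
interface Subscription {
  request(n: number): void;
  cancel(): void;
}

interface Subscriber<T> {
  onSubscribe(subscription: Subscription): void;
  onNext(value: T): void;
  onComplete(): void;
  onError(error: Error): void;
}

// Hypothetical payload shape, used only for this sketch.
interface Payload {
  metadata: string;
}

// Builds a source function that hands out a subscription immediately but
// emits the single setup payload only after the subscriber requests items.
function channelSetupSource(metadata: string): (subscriber: Subscriber<Payload>) => void {
  return subscriber => {
    let sent = false;
    let cancelled = false;
    subscriber.onSubscribe({
      cancel: () => {
        cancelled = true;
      },
      request: n => {
        // Ignore non-positive demand, cancelled streams, and repeated requests.
        if (cancelled || sent || n <= 0) {
          return;
        }
        sent = true;
        subscriber.onNext({ metadata });
        // No onComplete here: the stream stays open for later messages.
      },
    });
  };
}
```

With rsocket-flowable, a function of this shape is what the Flowable constructor receives, so the result could presumably be wrapped as new Flowable(channelSetupSource(...)) and passed to requestChannel. Treat that wiring as an assumption to check against the library version you use. The stream is deliberately left open so that further messages can be sent on the same channel.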