I'm trying to build a chat frontend with rsocket-websocket-client. I'm able to send messages from the frontend using requestChannel(new Flowable(source...)) and receive messages using requestChannel(new Flowable.just({metadata})).
I was trying to use FlowableProcessor to reduce two invocations of requestChannel into one.
Couldn't find documentation on FlowableProcessor for rsocket.
Here is my attempt:
const processor = new FlowableProcessor(
  new Flowable(source => {
    source.onSubscribe({
      cancel: () => {},
      request: n => {}
    });
    source.onNext({
      metadata: constructMetadataWithChannelId(channelId),
    });
  })
);
sock.requestChannel(processor.map(item => item))
  .subscribe({
    onComplete: () => {
      console.log(
        `completed subscribe`,
      );
    },
    onError: error1 => {
      console.log(
        `subscriber err: ${error1}`,
      );
    },
    onSubscribe: subscription => {
      console.log(
        `onSubscribe`,
      );
      setConnectStatus('connected');
      setChannelIdDone(true);
      subscription.request(1000);
    },
    onNext: (val: any) => {
      const value = JSON.parse(val) as Message;
      console.log(
        `received event from channel: ${JSON.stringify(
          value,
        )}`,
      );
    }
  })
I understand it's a type issue, but I can't figure out why processor.map(item => item) is erroring out.
TS2345: Argument of type 'IPublisher<unknown>' is not assignable to parameter of type 'Flowable<Payload<Buffer, Buffer>>'.
Type 'IPublisher<unknown>' is missing the following properties from type 'Flowable<Payload<Buffer, Buffer>>': lift, take
The error is trivial.
`FlowableProcessor` cannot be used because it does not implement the same interface as `Flowable`.

At the moment, `rsocket-js` is not well polished and has some flaws. One such flaw is inconsistent type usage. AFAIU, the `IPublisher` and `ISubscriber` interfaces are supposed to be used in all other public interfaces, but for the author's simplicity (I guess) they were replaced with the `Flowable` and `Single` types.

According to the source code, `FlowableProcessor` does not extend `Flowable`. Rather, it implements the `IPublisher`, `ISubscriber` and `ISubscription` interfaces itself and does not implement the `lift` and `take` methods that `Flowable` implements. So it cannot be used directly in place of `Flowable`, although it is supposed to be usable as an `IPublisher`.

In your sample I see no reason to use `FlowableProcessor`. Instead, you can pass the `Flowable` that you use as the argument for constructing the `FlowableProcessor` to the `requestChannel` method directly.

If you really, really need to use `FlowableProcessor` in this piece of code, you can force-cast it to `Flowable`, but that can become a source of unexpected errors in the future.

Please also note that you use `Flowable` incorrectly. You send data upon subscription, when no data has been requested yet. This violates the RSocket contract. The proper implementation should be something like:
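A minimal sketch of a demand-driven source, under some assumptions: the `Subscription` and `Subscriber` interfaces below are local stand-ins that I assume mirror `ISubscription` and `ISubscriber` from rsocket-js, and `demandDrivenSource` is a hypothetical helper name, not part of the library. The point it illustrates is that `onNext` is only called from inside `request(n)`, never directly at subscription time.

```typescript
// Local stand-ins (assumption: same shape as ISubscription / ISubscriber
// in rsocket-js), so the sketch compiles without any imports.
interface Subscription {
  cancel(): void;
  request(n: number): void;
}

interface Subscriber<T> {
  onSubscribe(subscription: Subscription): void;
  onNext(value: T): void;
  onComplete(): void;
  onError(error: Error): void;
}

// Hypothetical helper: builds a source function that emits `items` only
// when the subscriber has requested them, then completes once drained.
function demandDrivenSource<T>(items: T[]): (subscriber: Subscriber<T>) => void {
  return subscriber => {
    let index = 0;     // next item to emit
    let done = false;  // set on cancel or after completion

    subscriber.onSubscribe({
      cancel: () => {
        done = true;
      },
      request: n => {
        // Emit at most `n` items for this request; stop early if cancelled.
        let remaining = n;
        while (!done && remaining > 0 && index < items.length) {
          remaining--;
          subscriber.onNext(items[index++]);
        }
        // Signal completion exactly once, after the last item went out.
        if (!done && index >= items.length) {
          done = true;
          subscriber.onComplete();
        }
      },
    });
  };
}
```

Assuming the `Flowable` constructor accepts a source function of this shape (as it does in the question), the result could probably be passed along as `sock.requestChannel(new Flowable(demandDrivenSource([{ metadata: constructMetadataWithChannelId(channelId) }])))`. Whether to call `onComplete` after the last payload is a design choice: omit it if the outbound half of the channel should stay open for later messages.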