How to apply backpressure with rsocket-go?


I know I must be missing something basic here. I'm familiar with the Rx Request(n) semantics for backpressure, but I'm struggling to make them work with rsocket-go. What I tried was calling DoOnNext and then blocking inside the callback, but rsocket appears to just keep buffering and filling up memory. How do I prevent it from receiving further messages until I've processed the ones I already have?

anderspitman On BEST ANSWER

I was able to solve this by following the test case here.

Basically:

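// su holds the subscription so the DoOnNext callback can request more items.
var su rx.Subscription

f.DoOnNext(func(input payload.Payload) error {
        // Process input here, then ask for exactly one more item.
        su.Request(1)
        return nil
}).
Subscribe(context.Background(), rx.OnSubscribe(func(ctx context.Context, s rx.Subscription) {
        // Keep the subscription and request only the first item.
        su = s
        su.Request(1)
}))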
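
To see why requesting one item at a time works where blocking in the callback did not, here is a minimal sketch of the Request(n) credit idea in plain Go. It does not use rsocket-go at all: creditSource, drain, and deliver are hypothetical names made up for illustration, and the real library's internals may well differ. The point is only that the source emits an item when it holds unused credit, so the subscriber controls the flow by deciding when to call Request.

```go
package main

import "fmt"

// creditSource is a hypothetical stand-in for a publisher that honors
// Request(n): it emits an item only while it holds unused credit.
type creditSource struct {
	next     int         // next value to emit
	limit    int         // total number of values available
	credits  int         // demand granted by the subscriber, not yet used
	draining bool        // guards against re-entrant drain calls
	onNext   func(v int) // subscriber callback
}

// Request grants n more credits and emits as many items as credit allows.
func (s *creditSource) Request(n int) {
	s.credits += n
	s.drain()
}

// drain emits items while there is both credit and data left.
func (s *creditSource) drain() {
	if s.draining {
		// Request was called from inside onNext; the outer loop
		// will pick up the new credit on its next iteration.
		return
	}
	s.draining = true
	defer func() { s.draining = false }()
	for s.credits > 0 && s.next < s.limit {
		s.credits--
		v := s.next
		s.next++
		s.onNext(v)
	}
}

// deliver subscribes to a source of `limit` items, requests `initial`
// items up front, and optionally requests one more after each item.
func deliver(limit, initial int, rerequest bool) []int {
	var received []int
	src := &creditSource{limit: limit}
	src.onNext = func(v int) {
		received = append(received, v) // "process" the item
		if rerequest {
			src.Request(1) // ask for the next item only when done
		}
	}
	src.Request(initial)
	return received
}

func main() {
	fmt.Println(deliver(5, 1, false)) // [0]
	fmt.Println(deliver(5, 1, true))  // [0 1 2 3 4]
}
```

With a single initial credit and no follow-up request, the source stops after one item. Requesting one more at the end of each callback, as in the snippet above, keeps at most one item outstanding at a time.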