How to model asynchronous callback in functional reactive programming?

As I understand it, in FRP (Functional Reactive Programming), we model the system as a component that receives some input signals and generates some output signals:

                ,------------.
--- input1$ --> |            | -- output1$ -->
                |   System   | -- output2$ -->
--- input2$ --> |            | -- output3$ -->
                `------------'

In this way, if we have multiple subsystems, we can plumb them together as long as we can provide operators that pipe outputs into inputs.

Now I'm building an app that processes video frames asynchronously. The actual processing logic is abstracted away and can be provided as an argument. In a non-FRP way of thinking, I can construct the app as

new App(async (frame) => {
    return await processFrame(frame)
})

The App is responsible for establishing communication with the underlying video pipeline. It repeatedly gets video frames and passes each frame to the given callback; once the callback resolves, App sends back the processed frame.

Now I want to model the App in an FRP way so I can flexibly design the frame processing.

const processedFrameSubject = new Subject()
const { frame$ } = createApp(processedFrameSubject)
frame$.pipe(
    map(toRGB),
    mergeMap(processRGBFrame),
    map(toYUV)
).subscribe(processedFrameSubject)

The benefit is that it enables the consumer of createApp to define the processing pipeline declaratively.

However, in createApp, given a processedFrame, I need to work out which frame it relates to. Since frame$ and processedFrameSubject are now separate, it's really hard for createApp to do that. This was quite easy in the non-FRP implementation because the frame and processedFrame were in the same closure.

There is 1 best solution below

olivarra1 On

In functional reactive programming, you avoid side effects as much as possible. That means avoiding .subscribe(), tap(() => subject.next()), and the like. With FRP, you declare how your state works and how it's wired up, but nothing executes until someone needs it and performs the side effect.

So I think that the following API would still be considered FRP:

function createApp(
  processFrame: (frame: Frame) => Observable<ProcessedFrame>
): Observable<void>

const app$ = createApp(frame => of(frame).pipe(
  map(toRGB),
  mergeMap(processRGBFrame),
  map(toYUV)
));

// `app$` is an Observable that can be consumed by composing it with other
// observables, or by "executing the side effect" by calling .subscribe() on it

// possible implementation of createApp for this API
function createApp(
  processFrame: (frame: Frame) => Observable<ProcessedFrame>
) {
  return new Observable<void>(() => {
    const stopVideoHandler = registerVideoFrameHandler(
      (frame: Frame) => firstValueFrom(processFrame(frame))
    );

    return () => {
      // teardown
      stopVideoHandler()
    }
  });
}

Something worth noting is that createApp is returning a new Observable. Inside new Observable( we can escape from FRP because it's the only way we can integrate with external parties, and all the side effects we have written won't be called until someone .subscribe()s to the observable.
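To make the laziness concrete, here is a minimal sketch assuming RxJS 7. `createLazyApp` and the `events` log are hypothetical names used only for illustration; the pushes into `events` stand in for registering and removing the video frame handler.

```typescript
import { Observable } from "rxjs";

// Records the order in which things happen, so the laziness is visible.
const events: string[] = [];

// Hypothetical stand-in for createApp: the function passed to
// `new Observable` is where the side effects live.
function createLazyApp(): Observable<void> {
  return new Observable<void>(() => {
    events.push("handler registered"); // side effect, runs on subscribe
    return () => {
      events.push("handler removed"); // teardown, runs on unsubscribe
    };
  });
}

const app$ = createLazyApp();
events.push("app$ created"); // nothing has been registered at this point

const subscription = app$.subscribe();
subscription.unsubscribe();

console.log(events);
```

Creating `app$` registers nothing; the handler is only registered once `.subscribe()` is called, and it is removed again on `.unsubscribe()`.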

This API is simple and would still be FRP, but it has one limitation: the processFrame callback can only process each frame independently of the others.

If you need an API where processing one frame can depend on other frames, then you need to expose frame$. Again, this is done through a projection function passed to createApp:

function createApp(
  projectFn: (frame$: Observable<Frame>) => Observable<ProcessedFrame>
): Observable<void>

const app$ = createApp(frame$ => frame$.pipe(
  map(toRGB),
  mergeMap(processRGBFrame),
  map(toYUV)
));

// possible implementation of createApp
function createApp(
  projectFn: (frame$: Observable<Frame>) => Observable<ProcessedFrame>
) {
  return new Observable<void>(() => {
    const frame$ = new Subject<Frame>();
    const processedFrame$ = connectable(frame$.pipe(projectFn));
    const processedSub = processedFrame$.connect();

    const stopVideoHandler = registerVideoFrameHandler(
      (frame: Frame) => {
        // We need to create the promise _before_ we send in the next `frame$`,
        // in case it's processed synchronously
        const resultFrame = firstValueFrom(processedFrame$);

        frame$.next(frame);

        return resultFrame;
      }
    );

    return () => {
      // teardown
      stopVideoHandler()
      processedSub.unsubscribe();
    }
  });
}
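The ordering in the handler matters because a Subject does not replay values to late subscribers. A minimal sketch of that behaviour, assuming RxJS 7 (the string frames and the `received` array are made up for illustration):

```typescript
import { Subject } from "rxjs";

const frame$ = new Subject<string>();
const received: string[] = [];

// No subscriber exists yet, so this value is simply dropped.
frame$.next("frame-0");

// From here on, values are delivered to the subscriber.
const sub = frame$.subscribe((frame) => received.push(frame));
frame$.next("frame-1");
sub.unsubscribe();

console.log(received);
```

Only "frame-1" is received. In the same way, if `firstValueFrom(processedFrame$)` were called after `frame$.next(frame)` and the frame happened to be processed synchronously, the result would already have been emitted and the promise would wait for a later frame instead.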

I'm assuming here that registerVideoFrameHandler calls the function one frame at a time, without overlap. If calls can overlap, you need to track the frame number in some way. If the SDK doesn't give you any option for that, try something like:

// Assuming `projectFn` will emit frames in order. If not, then the API
// should change to be able to match them
const processedFrame$ = connectable(frame$.pipe(
  projectFn,
  map((result, index) => ({ result, index }))
));
const processedSub = processedFrame$.connect();

let frameIdx = 0;
const stopVideoHandler = registerVideoFrameHandler(
  (frame: Frame) => {
    const thisIdx = frameIdx;
    frameIdx++;

    const resultFrame = firstValueFrom(processedFrame$.pipe(
      filter(({ index }) => index === thisIdx),
      map(({ result }) => result)
    ));

    frame$.next(frame);

    return resultFrame;
  }
);
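The index matching can be tried in isolation with the sketch below, assuming RxJS 7. Here `result$` is a hypothetical stand-in for `frame$.pipe(projectFn)`, pushed by hand so that two frames are in flight at once, and `awaitResult` is a made-up helper that mirrors the handler body. It uses `take(1)` with a plain subscription instead of `firstValueFrom`, only to keep the example synchronous.

```typescript
import { Subject, connectable, filter, map, take } from "rxjs";

// Stand-in for `frame$.pipe(projectFn)`: results are emitted by hand.
const result$ = new Subject<string>();

// The index is assigned once, inside the shared (connected) pipeline,
// so every waiter sees the same numbering.
const processedFrame$ = connectable(
  result$.pipe(map((result, index) => ({ result, index })))
);
const processedSub = processedFrame$.connect();

const delivered = new Map<number, string>();

// Hypothetical helper: wait for the result whose index matches this call.
function awaitResult(thisIdx: number): void {
  processedFrame$
    .pipe(
      filter(({ index }) => index === thisIdx),
      map(({ result }) => result),
      take(1)
    )
    .subscribe((result) => delivered.set(thisIdx, result));
}

// Two overlapping calls are waiting before any result arrives.
awaitResult(0);
awaitResult(1);

result$.next("processed-A");
result$.next("processed-B");
processedSub.unsubscribe();

console.log(delivered);
```

Each waiter receives only the result with its own index, which is what lets overlapping handler calls resolve with the right frame. As in the snippet above, this relies on results being emitted in the same order as the frames arrived.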