Skip to content
Advertisement

RxJs: Abort a deferred and shared observable when all unsubscribe

I want to create an rxjs Observable that runs a long polling operation. Each iteration emits intermediate results. When isComplete returns true, the Observable completes.

This function should behave as follows

  1. It should start only when there’s at least one subscriber
  2. It should allow multiple subscribers to share the results
  3. It should abort the polling and call cancel if there are no subscribers left

The following code works properly and satisfies conditions (1) and (2):

function longPollingAction(fetch: () => Promise<Response>, cancel: () => {}): Observable<Response> {
   return defer(() => { // defer to start running when there's a single subscriber
     return from(fetch()).pipe(
         expand(() => timer(1000).pipe(switchMap(fetch))),
         takeWhile<Response>(isComplete, false),
    );
   }).pipe(share()); // share to allow multiple subscribers
}

function isComplete(r: Response): boolean {
   // returns true if r is complete. 
}

How can I modify this code to satisfy (3) as well? With the current implementation the polling stops, but how do I call cancel?

Advertisement

Answer

Using Finalize

You can call cancel using finalize. Here’s how that might look:

function longPollingAction(
  fetch: () => Promise<Response>,
  cancel: () => void
): Observable<Response> {
  // defer to turn eager promise into lazy observable
  return defer(fetch).pipe( 
    expand(() => timer(1000).pipe(switchMap(fetch))),
    takeWhile<Response>(isComplete, false),
    finalize(cancel),
    share() // share to allow multiple subscribers
  );
}

function isComplete(r: Response): boolean {
   // returns true if r is complete. 
}

Callback on complete

The tap operator has access to next, error, and complete emissions. For a callback: () => void, that’s good enough.

function longPollingAction(
  fetch: () => Promise<Response>,
  cancel: () => void
): Observable<Response> {
  // defer to turn eager promise into lazy observable
  return defer(fetch).pipe( 
    expand(() => timer(1000).pipe(switchMap(fetch))),
    takeWhile<Response>(isComplete, false),
    tap({
      complete: cancel
    }),
    share() // share to allow multiple subscribers
  );
}

function isComplete(r: Response): boolean {
   // returns true if r is complete. 
}

Callback on unsubscribe

I don’t think such an operator exists, but we can make one easily enough. This operator will only fire the callback if unsubscribed. It will ignore error, and complete.

function onUnsubscribe<T>(
  fn: () => void
): MonoTypeOperatorFunction<T> {
  return s => new Observable(observer => {
    const bindOn = name => observer[name].bind(observer);
    const sub = s.subscribe({
      next: bindOn("next"),
      error: bindOn("error"),
      complete: bindOn("complete")
    });
   
    return {
      unsubscribe: () => {
        fn();
        sub.unsubscribe()
      }
    };
  });
}

Then you can use it like this:

function longPollingAction(
  fetch: () => Promise<Response>,
  cancel: () => void
): Observable<Response> {
  // defer to turn eager promise into lazy observable
  return defer(fetch).pipe( 
    expand(() => timer(1000).pipe(switchMap(fetch))),
    takeWhile<Response>(isComplete, false),
    onUnsubscribe(cancel),
    share() // share to allow multiple subscribers
  );
}

function isComplete(r: Response): boolean {
   // returns true if r is complete. 
}

Since share is managing your subscriptions and share will only unsubscribe once refCount < 1, then the only way to call cancel in this case is to have no subscribers.

User contributions licensed under: CC BY-SA
10 People found this is helpful
Advertisement