I want to create an rxjs Observable that runs a long polling operation.
Each iteration emits intermediate results.
When isComplete returns true, the Observable completes.
This function should behave as follows:
1. It should start only when there’s at least one subscriber
2. It should allow multiple subscribers to share the results
3. It should abort the polling and call cancel if there are no subscribers left
The following code works properly and satisfies conditions (1) and (2):
import { Observable, defer, from, timer } from "rxjs";
import { expand, share, switchMap, takeWhile } from "rxjs/operators";

function longPollingAction(fetch: () => Promise<Response>, cancel: () => void): Observable<Response> {
  return defer(() => { // defer so polling starts only on the first subscription
    return from(fetch()).pipe(
      expand(() => timer(1000).pipe(switchMap(fetch))), // poll again 1 s after each response
      takeWhile<Response>(r => !isComplete(r), false), // keep polling until isComplete(r) is true
    );
  }).pipe(share()); // share to allow multiple subscribers
}
function isComplete(r: Response): boolean {
  // returns true if r is complete.
}
How can I modify this code to satisfy (3) as well? With the current implementation the polling stops, but how do I call cancel?
Answer
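Using finalize
You can call cancel using finalize, which runs its callback when the source completes, errors, or is unsubscribed. Because it sits before share, cancel runs once when the shared polling ends for any of those reasons, including when the last subscriber leaves. Here’s how that might look: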
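import { Observable, defer, timer } from "rxjs";
import { expand, finalize, share, switchMap, takeWhile } from "rxjs/operators";

function longPollingAction(
  fetch: () => Promise<Response>,
  cancel: () => void
): Observable<Response> {
  // defer to turn eager promise into lazy observable
  return defer(fetch).pipe(
    expand(() => timer(1000).pipe(switchMap(fetch))),
    takeWhile<Response>(r => !isComplete(r), false), // keep polling until isComplete(r) is true
    finalize(cancel), // runs on complete, error, or unsubscribe
    share() // share to allow multiple subscribers
  );
}
function isComplete(r: Response): boolean {
  // returns true if r is complete.
}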
Callback on complete
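The tap operator can observe next, error, and complete notifications. For a callback: () => void, that’s good enough. Note that this variant calls cancel only when polling completes on its own, not when the subscribers unsubscribe early.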
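import { Observable, defer, timer } from "rxjs";
import { expand, share, switchMap, takeWhile, tap } from "rxjs/operators";

function longPollingAction(
  fetch: () => Promise<Response>,
  cancel: () => void
): Observable<Response> {
  // defer to turn eager promise into lazy observable
  return defer(fetch).pipe(
    expand(() => timer(1000).pipe(switchMap(fetch))),
    takeWhile<Response>(r => !isComplete(r), false), // keep polling until isComplete(r) is true
    tap({
      complete: cancel // runs only on the complete notification
    }),
    share() // share to allow multiple subscribers
  );
}
function isComplete(r: Response): boolean {
  // returns true if r is complete.
}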
Callback on unsubscribe
I don’t think such an operator exists, but we can make one easily enough. This operator fires the callback only when the subscription is torn down by an explicit unsubscribe; it ignores error and complete.
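import { MonoTypeOperatorFunction, Observable } from "rxjs";

function onUnsubscribe<T>(
  fn: () => void
): MonoTypeOperatorFunction<T> {
  return s => new Observable<T>(observer => {
    // Teardown also runs after error/complete, so track termination explicitly.
    let terminated = false;
    const sub = s.subscribe({
      next: v => observer.next(v),
      error: e => { terminated = true; observer.error(e); },
      complete: () => { terminated = true; observer.complete(); }
    });
    return () => {
      if (!terminated) fn(); // only fire on an early unsubscribe
      sub.unsubscribe();
    };
  });
}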
Then you can use it like this:
function longPollingAction(
  fetch: () => Promise<Response>,
  cancel: () => void
): Observable<Response> {
  // defer to turn eager promise into lazy observable
  return defer(fetch).pipe(
    expand(() => timer(1000).pipe(switchMap(fetch))),
    takeWhile<Response>(r => !isComplete(r), false), // keep polling until isComplete(r) is true
    onUnsubscribe(cancel),
    share() // share to allow multiple subscribers
  );
}
function isComplete(r: Response): boolean {
  // returns true if r is complete.
}
Because share manages the subscription to the source and unsubscribes from it only once its refCount drops below 1, cancel is called in this case only when no subscribers are left.
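To make the reference counting concrete, here is a minimal RxJS-free sketch of the idea. It is not how share is actually implemented; refCounted, Listener, and the log array are hypothetical names made up for this illustration. The source is started on the first subscription and torn down (the stand-in for cancel) only when the last subscriber leaves.

```typescript
// Hypothetical sketch of ref-counted sharing; not RxJS's real implementation.
type Listener<T> = (value: T) => void;

function refCounted<T>(start: (emit: Listener<T>) => () => void) {
  const listeners = new Set<Listener<T>>();
  let stop: (() => void) | null = null;

  return function subscribe(listener: Listener<T>): () => void {
    // Wrap so the same callback can be subscribed more than once.
    const wrapped: Listener<T> = value => listener(value);
    listeners.add(wrapped);
    if (listeners.size === 1) {
      // First subscriber: start the source and fan values out to everyone.
      stop = start(value => listeners.forEach(l => l(value)));
    }
    let active = true;
    return () => {
      if (!active) return; // unsubscribing twice is a no-op
      active = false;
      listeners.delete(wrapped);
      if (listeners.size === 0 && stop !== null) {
        // Last subscriber left: tear the source down (the "cancel" step).
        stop();
        stop = null;
      }
    };
  };
}

// Usage: a manually driven source that records what happens.
const log: string[] = [];
let emit: Listener<number> = () => {};
const subscribe = refCounted<number>(e => {
  log.push("start");
  emit = e;
  return () => { log.push("cancel"); };
});

const unsubA = subscribe(v => { log.push(`A:${v}`); });
const unsubB = subscribe(v => { log.push(`B:${v}`); });
emit(1);  // delivered to A and B
unsubA(); // one subscriber remains, so no cancel yet
emit(2);  // delivered to B only
unsubB(); // last subscriber leaves, so cancel runs
console.log(log.join(" ")); // prints "start A:1 B:1 B:2 cancel"
```

In the RxJS versions above, finalize or onUnsubscribe placed before share plays the role of the teardown function returned from start.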