I want to create an rxjs
Observable
that runs a long polling operation.
Each iteration emits intermediate results.
When isComplete
returns true, the Observable
completes.
This function should behave as follows
- It should start only when there’s at least one subscriber
- It should allow multiple subscribers to share the results
- It should abort the polling and call cancel if there are no subscribers left
The following code works properly and satisfies conditions (1) and (2):
function longPollingAction(fetch: () => Promise<Response>, cancel: () => {}): Observable<Response> { return defer(() => { // defer to start running when there's a single subscriber return from(fetch()).pipe( expand(() => timer(1000).pipe(switchMap(fetch))), takeWhile<Response>(isComplete, false), ); }).pipe(share()); // share to allow multiple subscribers } function isComplete(r: Response): boolean { // returns true if r is complete. }
How can I modify this code to satisfy (3) as well? With the current implementation the polling stops, but how do I call cancel
?
Advertisement
Answer
Using Finalize
You can call cancel using finalize
. Here’s how that might look:
function longPollingAction( fetch: () => Promise<Response>, cancel: () => void ): Observable<Response> { // defer to turn eager promise into lazy observable return defer(fetch).pipe( expand(() => timer(1000).pipe(switchMap(fetch))), takeWhile<Response>(isComplete, false), finalize(cancel), share() // share to allow multiple subscribers ); } function isComplete(r: Response): boolean { // returns true if r is complete. }
Callback on complete
The tap operator has access to next
, error
, and complete
emissions. For a callback: () => void
, that’s good enough.
function longPollingAction( fetch: () => Promise<Response>, cancel: () => void ): Observable<Response> { // defer to turn eager promise into lazy observable return defer(fetch).pipe( expand(() => timer(1000).pipe(switchMap(fetch))), takeWhile<Response>(isComplete, false), tap({ complete: cancel }), share() // share to allow multiple subscribers ); } function isComplete(r: Response): boolean { // returns true if r is complete. }
Callback on unsubscribe
I don’t think such an operator exists, but we can make one easily enough. This operator will only fire the callback if unsubscribed. It will ignore error
, and complete
.
function onUnsubscribe<T>( fn: () => void ): MonoTypeOperatorFunction<T> { return s => new Observable(observer => { const bindOn = name => observer[name].bind(observer); const sub = s.subscribe({ next: bindOn("next"), error: bindOn("error"), complete: bindOn("complete") }); return { unsubscribe: () => { fn(); sub.unsubscribe() } }; }); }
Then you can use it like this:
function longPollingAction( fetch: () => Promise<Response>, cancel: () => void ): Observable<Response> { // defer to turn eager promise into lazy observable return defer(fetch).pipe( expand(() => timer(1000).pipe(switchMap(fetch))), takeWhile<Response>(isComplete, false), onUnsubscribe(cancel), share() // share to allow multiple subscribers ); } function isComplete(r: Response): boolean { // returns true if r is complete. }
Since share
is managing your subscriptions and share will only unsubscribe once refCount < 1
, then the only way to call cancel in this case is to have no subscribers.