Skip to content
Advertisement

Passing pipeline item to promise argument in `takeUntil`

I have code with a similar control flow to this example (obviously, the predicate below doesn’t need to be async, but it’s an example):

const items [1,2,3,4,5];
const predicate = async (i) => i < 3;
const pipeline = from(items).pipe(
  takeUntil(predicate),
);

pipeline.subscribe(console.log);

But this throws a TypeError with the message “You can provide an Observable, Promise, ReadableStream, Array, AsyncIterable, or Iterable.”

I’ve tried making predicate a promise (new Promise(...), and using takeWhile in place of takeUntil but neither worked as expected (the promise always returned true – I am assuming it is coerced to be truthy). Is this some sort of misinterpretation on my behalf of how takeUntil/takeWhile works?

As a workaround, I’m currently using this:

const takeWhileAsync = (predicate = tautology) => {
  const resultSymbol = Symbol('predicateResult');
  const valueSymbol = Symbol('value');

  const predicateResolver = item => of(item).pipe(
    concatMap(async (i) => {
      const predicateResult = await predicate(i);
      return {[resultSymbol]: predicateResult, [valueSymbol]: i};
    }),
  );

  return pipe(
    concatMap(predicateResolver),
    takeWhile(({[resultSymbol]: predicateResult}) => predicateResult),
    pluck(valueSymbol),
  );
};

Advertisement

Answer

Idiomatic RxJS

Most RxJS operators (concatMap, mergeMap, switchMap, ect…) take ObservableInput as the returned values (meaning they work with Promises natively).

This is a take on @’Nick Bull’s answer that doesn’t do any promise (async/await) stuff. This way you can use promises or (as is probably advisable), stick entirely with Observables.

function takeWhileConcat<T>(genPred: (v:T) => ObservableInput<Boolean>): MonoTypeOperatorFunction<T>{
  return pipe(
    concatMap((payload: T) => from(genPred(payload)).pipe(
      take(1),
      map((pass: boolean) => ({payload, pass}))
    )),
    takeWhile(({pass}) => pass),
    map(({payload}) => payload)
  );
}

const items = [1,2,3,4,5];
const predicate = async (i) => i < 3;
const pipeline = from(items).pipe(
  takeWhileConcat(predicate),
);

pipeline.subscribe(console.log);

Now, if you wanted to, you can replace predicate with an observable:

const predicate = i => of(i < 3);

and not change anything else. Which is nice, because observables and promises sometimes don’t play as nicely as you’d expect.

Consider that promises are eager and observables are lazy and you can get some strange execution orders that are tough to debug.


This solution doesn’t allow for non-promise predicates!

So, you’re right. This solution requires you to return an ObservableInput (Any iterable, promise, or observable). Actually, any ES6 Iterable, so Arrays, Generators, Maps, HashMaps, Vectors, custom iterables, you name it. They’ll all work.

  • Observable: predicate = value => of(value > 3)
  • Iterable: predicate = value => [value > 3]
  • Promise: predicate = value => Promise.resolve(value > 3)
  • Syntactic sugar for promise: predicate = async value => value > 3

What it does not allow is anything that isn’t an ObservableInput. This matches how every other RxJS operator that takes an ObservableInput functions. Of course, we could martial any value as an observable using of, but this was decided against as it’s much more likely to be a foot-gun than to be useful.

In a dynamically-typed language, it can be difficult to decide what your API allows vs where it should throw an error. I like that RxJS doesn’t treat values as Observables by default. I think the RxJS api is much clearer for it.

Operators do a better job of making their intentions clear. Imagine these two were the same:

map(x => x + 1)
mergeMap(x = x + 1)

That second one could turn the returned value into an observable and merge that observable, but this requires a lot of specialized knowledge about this operator. Map, on the other hand, works exactly the same way it does on the other iterators/collections we’re already familiar with.

How to accept a non-promise predicate

Anyway, all this to say, you can change my answer to accept a standard predicate (v => boolean) as well as an async predicate (v => ObservableInput<boolean>) if you so choose. Just supply a value and inspect what is returned.

I’m just not convinced this is desirable behavior.

What if the input items are an infinite generator?

Here’s an generator that generates integers forever.

const range = function*() { 
  for (let i = 0; true; i++) yield i; 
}

from(range()) doesn’t know when to stop calling the generator (or even that the generator is infinite). from(range()).subscribe(console.log) will print numbers to the console indefinitely.

The key here is that in such a case, the code that stops us from calling back to the generator must run synchronously.

For example:

from(range()).pipe(
  take(5)
).subscribe(console.log);

will print the numbers 0 – 4 to the console.

This is true for our custom operator as well. Code that still works:

from(range()).pipe(
  takeWhileConcat(v => of(v < 10))
).subscribe(console.log);

// or 

from(range()).pipe(
  takeWhileConcat(v => [v < 10])
).subscribe(console.log);

code that doesn’t halt:

from(range()).pipe(
  takeWhileConcat(v => of(v < 10).pipe(
    delay(0)
  ))
).subscribe(console.log);

// or

from(range()).pipe(
  takeWhileConcat(async v => v < 10)
).subscribe(console.log);

That’s a consequence of how javascript engine deals with async behavior. Any current code is run to completion before the engine looks at the event queue. Every promise is put in the event queue, and async observables are put on the event queue as well (which is why delay(0) is basically the same as a promise that resolves right away)

concatMap does have built-in backpressure, but the asynchronous part of the code never gets to run since the synchronous part of the code has created an infinite loop.

This is one of the downsides of a push-based streaming library (Like RxJS). If it were pull-based (like generators are), this wouldn’t be an issue, but other issues emerge instead. You can google pull/push based streaming for lots of articles on the topic.

There are safe ways to interface pull-based and push-based streams, but it takes a bit of work.

User contributions licensed under: CC BY-SA
2 People found this is helpful
Advertisement