RxJS observable of results from chain of functions

I have a function getC which depends on the output of getB which depends on the output of getA. I want to create an observable stream, so that when I subscribe to it I get the output of each function, as shown in the snippet.

Is there an operator in RxJS that will give me the same behaviour as in the snippet? Or is nesting switchMap, concat and of the only way?

const { concat, from, of } = rxjs
const { switchMap } = rxjs.operators


const getA = async() => new Promise(resolve => setTimeout(() => resolve({
  a: 'A'
}), 500));

const getB = async value => new Promise(resolve => setTimeout(() => resolve({
  a: value.a,
  b: 'B',
}), 500));

const getC = async value => new Promise(resolve => setTimeout(() => resolve({
  a: value.a,
  b: value.b,
  c: 'C',
}), 500));

const stream = from(getA()).pipe(
  switchMap(a => concat(
    of(a),
    from(getB(a)).pipe(
      switchMap(b => concat(
          of(b),
          getC(b)
        )
      )
    )
  ))
);

stream.subscribe(value => {
  console.log(value);
});
<script src="https://unpkg.com/rxjs@7.3.0/dist/bundles/rxjs.umd.min.js"></script>

Answer

If you know your dependencies in advance, you can hardcode them in the individual streams:

  1. c$ requires the latest emission from b$
  2. b$ requires the latest emission from a$
  3. Run the streams in order with concat while respecting their dependencies, so you end up with three emissions, each (except the first) depending on the previous one.

const { of, concat } = rxjs;
const { delay, combineLatestWith, map } = rxjs.operators;

// a$ has no dependencies; b$ waits for a$, and c$ waits for b$.
const a$ = of('A').pipe(delay(600));
const b$ = of('B').pipe(delay(400), combineLatestWith(a$), map(([b, a]) => a + b));
const c$ = of('C').pipe(delay(200), combineLatestWith(b$), map(([c, b]) => b + c));

// concat subscribes to a$, b$ and c$ one after another, so the values arrive as A, AB, ABC.
// These streams are cold: b$ and c$ each re-subscribe to (and re-run) their dependencies.
concat(a$, b$, c$).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.3.0/rxjs.umd.min.js" integrity="sha512-y3JTS47nnpKORJX8Jn1Rlm+QgRIIZHtu3hWxal0e81avPrqUH48yk+aCi+gprT0RMAcpYa0WCkapxe+bpBHD6g==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>
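
The snippet above uses fixed strings rather than the getA, getB and getC functions from the question. As a possible alternative, not part of the original answer, here is a minimal sketch that chains promise-returning functions with an async generator, so each function runs once and every intermediate result is emitted. The `wait`, `chain` and `collect` helpers are hypothetical names introduced for illustration, and the delays are shortened.

```javascript
// Hypothetical helper: resolve `value` after `ms` milliseconds.
const wait = (value, ms) => new Promise(resolve => setTimeout(() => resolve(value), ms));

// Stand-ins for the question's functions (same shapes, shorter delays).
const getA = () => wait({ a: 'A' }, 50);
const getB = value => wait({ a: value.a, b: 'B' }, 50);
const getC = value => wait({ a: value.a, b: value.b, c: 'C' }, 50);

// Hypothetical helper: calls each function with the previous result
// and yields every intermediate result as soon as it resolves.
async function* chain(...fns) {
  let value;
  for (const fn of fns) {
    value = await fn(value);
    yield value;
  }
}

// Plain consumption with for await; no RxJS needed for this part.
async function collect() {
  const results = [];
  for await (const value of chain(getA, getB, getC)) {
    results.push(value);
  }
  return results;
}

collect().then(results => console.log(results));
```

RxJS 7's `from` accepts async iterables, so `from(chain(getA, getB, getC))` should produce an observable that emits the same three values in order. Wrapping that call in `defer` would start a fresh chain for each subscriber.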