I have a function getC which depends on the output of getB, which in turn depends on the output of getA. I want to create an observable stream so that, when I subscribe to it, I get the output of each function, as shown in the snippet.
Is there an operator in RxJS that will give me the same behaviour as in the snippet? Or is nesting switchMap, concat and of the only way?
const { concat, from, of } = rxjs;
const { switchMap } = rxjs.operators;

const getA = async () =>
  new Promise(resolve => setTimeout(() => resolve({ a: 'A' }), 500));

const getB = async value =>
  new Promise(resolve => setTimeout(() => resolve({
    a: value.a,
    b: 'B',
  }), 500));

const getC = async value =>
  new Promise(resolve => setTimeout(() => resolve({
    a: value.a,
    b: value.b,
    c: 'C',
  }), 500));

// Emit a, then b (built from a), then c (built from b).
const stream = from(getA()).pipe(
  switchMap(a => concat(
    of(a),
    from(getB(a)).pipe(
      switchMap(b => concat(
        of(b),
        getC(b)
      ))
    )
  ))
);

stream.subscribe(value => {
  console.log(value);
});
<script src="https://unpkg.com/rxjs@7.3.0/dist/bundles/rxjs.umd.min.js"></script>
Answer
If you know your dependencies in advance you can hardcode them in the different streams:
- c$ requires the latest emission from b$
- b$ requires the latest emission from a$
- Run the streams in “order” whilst respecting their dependencies so you end up with three emissions, each (except for the first) depending on the previous one.
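const a$ = of('A').pipe(delay(600));

// b$ waits for the latest value of a$ and combines it with its own.
const b$ = of('B').pipe(
  delay(400),
  combineLatestWith(a$),
  map(([b, a]) => a + b)
);

// c$ waits for the latest value of b$ and combines it with its own.
const c$ = of('C').pipe(
  delay(200),
  combineLatestWith(b$),
  map(([c, b]) => b + c)
);

// Subscribe to the streams one after another.
concat(a$, b$, c$).subscribe(x => console.log(x));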
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.3.0/rxjs.umd.min.js" integrity="sha512-y3JTS47nnpKORJX8Jn1Rlm+QgRIIZHtu3hWxal0e81avPrqUH48yk+aCi+gprT0RMAcpYa0WCkapxe+bpBHD6g==" crossorigin="anonymous" referrerpolicy="no-referrer"></script> <script> const {of, concat} = rxjs; const {delay, combineLatestWith, map} = rxjs.operators; </script>
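To make the expected emissions concrete for the promise-returning getA, getB and getC from the question, here is a minimal library-free sketch of the same "each step depends on the previous one" chain, written as an async generator. The helpers wait, chain and collect are hypothetical names introduced only for this illustration, and the delays are shortened to 10 ms. RxJS 7's from is documented to accept async iterables, so from(chain(getA, getB, getC)) should give an equivalent observable, but treat that as an assumption to verify against your version.

```javascript
// Hypothetical helper: resolve `value` after `ms` milliseconds.
const wait = (value, ms = 10) =>
  new Promise(resolve => setTimeout(() => resolve(value), ms));

// Stand-ins for the question's functions (same shapes, shorter delay).
const getA = () => wait({ a: 'A' });
const getB = value => wait({ a: value.a, b: 'B' });
const getC = value => wait({ a: value.a, b: value.b, c: 'C' });

// Yield the result of every step; each step receives the previous result.
async function* chain(first, ...rest) {
  let value = await first();
  yield value;
  for (const step of rest) {
    value = await step(value);
    yield value;
  }
}

// Hypothetical helper: gather everything an async iterable yields.
async function collect(iterable) {
  const values = [];
  for await (const value of iterable) values.push(value);
  return values;
}

// Assumption: with RxJS 7 loaded, from(chain(getA, getB, getC)).subscribe(...)
// would receive the same three values one by one.
collect(chain(getA, getB, getC)).then(values => console.log(values));
```

In this sketch each function is called exactly once and the steps run strictly in sequence, which is the behaviour the nested switchMap/concat version in the question produces.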