Implement for-await-of statement in RxJS

Tags: , , , ,



I have the following statement:

 for await (const blob of client.list()) {
    console.log('t', blob.name);
}

client.list() returns an async iterable iterator, and expects the use of for await…of to resolve the promises. I would like to incorporate the code into an existing rxjs pipe that instantiates the client.

I looked everywhere and I couldn’t figure out how to do so without resolving the promise inside the pipe rather than converting into observables.

Any help would be appreciated!

Answer

I can’t find an existing rxjs operator, but it doesn’t seem too difficult to make your own. When integrating other APIs with observables you can interact with the API within the function passed to the observable constructor. This allows a lot of flexibility when triggering next/error/complete.

Edit – I’ve added a second option for doing this, using rxjs operators and avoiding explicitly calling next/error/complete.

const {
  Observable,
  operators,
  from
} = rxjs;
const {take, takeWhile, expand, map, filter} = operators;

const asyncGen = async function*(x = -1) {
  while(x++ < 5) {
    yield x;
  }
};

const fromAsyncIter = iterable => new Observable(subscriber => {
  let unsubscribed = false;
  const iterate = async () => {
    try {
      for await (let n of iterable) {
        console.log('await', n);
        subscriber.next(n);
        if (unsubscribed) return;
      }
      subscriber.complete();
    } catch (e) {
      subscriber.error(e);
    }
  }
  iterate();
  return () => unsubscribed = true;
});

const fromAsyncIter2 = iterable =>
  from(iterable.next()).pipe(
    expand(() => iterable.next()),
    takeWhile(x => !x.done),
    map(x => x.value)
  );

// const source = fromAsyncIter(asyncGen()).pipe(take(2));
const source = fromAsyncIter2(asyncGen()).pipe(take(2));

source.subscribe({
  next: x => console.log('next', x),
  error: e => console.error(e),
  complete: () => console.log('complete')
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.4/rxjs.umd.js"></script>


Source: stackoverflow