Skip to content
Advertisement

How to convert node readable stream to RX observable

If I have a Node js stream, say for example from something like process.stdin or from fs.createReadStream, how can I convert this to be an RxJs Observable stream using RxJs5?

I see that RxJs-Node has a fromReadableStream method, but that looks like it hasn’t been updated in close to a year.

Advertisement

Answer

For anyone looking for this, following Mark’s recommendation, I adapted rx-node fromStream implementation for rxjs5.

import { Observable } from 'rxjs';

// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') {
  stream.pause();

  return new Observable((observer) => {
    function dataHandler(data) {
      observer.next(data);
    }

    function errorHandler(err) {
      observer.error(err);
    }

    function endHandler() {
      observer.complete();
    }

    stream.addListener(dataEventName, dataHandler);
    stream.addListener('error', errorHandler);
    stream.addListener(finishEventName, endHandler);

    stream.resume();

    return () => {
      stream.removeListener(dataEventName, dataHandler);
      stream.removeListener('error', errorHandler);
      stream.removeListener(finishEventName, endHandler);
    };
  }).share();
}

Note that it intrinsically breaks all back pressure functionalities of streams. Observables’ are a push technology. All input chunks are going to be read and pushed to the observer as quickly as possible. Depending on your case, it might not be the best solution.

User contributions licensed under: CC BY-SA
4 People found this is helpful
Advertisement