How to convert a Node.js readable stream to an RxJS Observable

If I have a Node.js stream, for example from process.stdin or fs.createReadStream, how can I convert it to an RxJS Observable using RxJS 5?

I see that RxJs-Node has a fromReadableStream method, but that looks like it hasn’t been updated in close to a year.



For anyone looking for this: following Mark’s recommendation, I adapted the rx-node fromStream implementation for RxJS 5.

import { Observable } from 'rxjs';

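// Adapted from the rx-node fromStream implementation
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') {

  return new Observable((observer) => {
    // Forward each chunk to the observer as soon as it arrives.
    function dataHandler(data) {
      observer.next(data);
    }

    // Propagate stream errors to the observer.
    function errorHandler(err) {
      observer.error(err);
    }

    // Complete the observable when the stream finishes.
    function endHandler() {
      observer.complete();
    }

    stream.addListener(dataEventName, dataHandler);
    stream.addListener('error', errorHandler);
    stream.addListener(finishEventName, endHandler);

    // Teardown: detach the listeners when the subscriber unsubscribes.
    return () => {
      stream.removeListener(dataEventName, dataHandler);
      stream.removeListener('error', errorHandler);
      stream.removeListener(finishEventName, endHandler);
    };
  });
}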

Note that this approach intrinsically bypasses the backpressure mechanism of Node.js streams. Observables are a push technology: every input chunk is read and pushed to the observer as quickly as possible, regardless of how fast the observer can process it. Depending on your use case, it might not be the best solution.
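
To make the backpressure point concrete, here is a minimal sketch that compares a plain 'data' listener (the mechanism fromStream relies on) with async iteration over the same kind of stream. It uses only Node's built-in stream module and `require`, so it runs as a plain script without RxJS. The names slowProcess, pushStyle and pullStyle are hypothetical helpers invented for this illustration, and the 5 ms delay is an arbitrary stand-in for slow work.

```javascript
const { Readable } = require('stream');

// Hypothetical slow consumer: pretends each chunk takes 5 ms to process.
const slowProcess = (chunk) =>
  new Promise((resolve) => setTimeout(() => resolve(chunk * 2), 5));

// Push style: a 'data' listener receives chunks as fast as the stream
// emits them, whether or not earlier chunks have finished processing.
function pushStyle(stream) {
  return new Promise((resolve, reject) => {
    const pending = [];
    let inFlight = 0;
    let maxInFlight = 0;
    stream.on('data', (chunk) => {
      inFlight += 1;
      maxInFlight = Math.max(maxInFlight, inFlight);
      pending.push(slowProcess(chunk).then((result) => {
        inFlight -= 1;
        return result;
      }));
    });
    stream.on('error', reject);
    stream.on('end', () => {
      Promise.all(pending).then(
        (results) => resolve({ results, maxInFlight }),
        reject
      );
    });
  });
}

// Pull style: async iteration asks for the next chunk only after the
// loop body has finished, so the stream is never ahead of the consumer.
async function pullStyle(stream) {
  const results = [];
  let inFlight = 0;
  let maxInFlight = 0;
  for await (const chunk of stream) {
    inFlight += 1;
    maxInFlight = Math.max(maxInFlight, inFlight);
    results.push(await slowProcess(chunk));
    inFlight -= 1;
  }
  return { results, maxInFlight };
}

async function main() {
  const push = await pushStyle(Readable.from([1, 2, 3, 4]));
  const pull = await pullStyle(Readable.from([1, 2, 3, 4]));
  console.log('push: max chunks in flight =', push.maxInFlight);
  console.log('pull: max chunks in flight =', pull.maxInFlight);
  return { push, pull };
}

const demo = main();
```

Both variants produce the same results, but the push variant has several chunks being processed at once, while the pull variant never has more than one. If the consumer is slower than the source and the input is large, the pull style (or piping through a Transform stream) may be a better fit than converting to an Observable.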
