If I have a Node js stream, say for example from something like process.stdin
or from fs.createReadStream
, how can I convert this to be an RxJs Observable stream using RxJs5?
I see that RxJs-Node has a fromReadableStream
method, but that looks like it hasn’t been updated in close to a year.
Advertisement
Answer
For anyone looking for this, following Mark’s recommendation, I adapted rx-node fromStream
implementation for rxjs5.
JavaScript
x
33
33
1
import { Observable } from 'rxjs';
2
3
// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
4
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') {
5
stream.pause();
6
7
return new Observable((observer) => {
8
function dataHandler(data) {
9
observer.next(data);
10
}
11
12
function errorHandler(err) {
13
observer.error(err);
14
}
15
16
function endHandler() {
17
observer.complete();
18
}
19
20
stream.addListener(dataEventName, dataHandler);
21
stream.addListener('error', errorHandler);
22
stream.addListener(finishEventName, endHandler);
23
24
stream.resume();
25
26
return () => {
27
stream.removeListener(dataEventName, dataHandler);
28
stream.removeListener('error', errorHandler);
29
stream.removeListener(finishEventName, endHandler);
30
};
31
}).share();
32
}
33
Note that it intrinsically breaks all back pressure functionalities of streams. Observables’ are a push technology. All input chunks are going to be read and pushed to the observer as quickly as possible. Depending on your case, it might not be the best solution.