I have two ReadableStreams, and I want to pipe them into one WritableStream, where any data that comes through the ReadableStreams goes directly into the WritableStream right then.
I can do the opposite, by using ReadableStream.prototype.tee()
to split one ReadableStream into two, but I do not know how to combine two into one.
JavaScript
x
21
21
1
const textarea = document.querySelector("textarea");
2
3
4
// This is a ReadableStream which says "Mom! " every 1 second.
5
const momReadableStream = new ReadableStream({ start: controller => {
6
const sayMom = () => controller.enqueue("Mom! ");
7
setInterval(sayMom, 1000);
8
}});
9
10
// This is a ReadableStream which says "Lois! " every 0.7 seconds.
11
const loisReadableStream = new ReadableStream({ start: controller => {
12
const sayLois = () => controller.enqueue("Lois! ");
13
setInterval(sayLois, 700);
14
}});
15
16
// This is a WritableStream which displays what it receives in a textarea.
17
const writableStream = new WritableStream({ write: (chunk, controller) => textarea.value += chunk });
18
19
20
momReadableStream.pipeTo(writableStream).catch(console.error); // Works fine, words display
21
loisReadableStream.pipeTo(writableStream).catch(console.error); // Words do not display, and Errors with "Failed to execute 'pipeTo' on 'ReadableStream': Cannot pipe to a locked stream"
JavaScript
1
1
1
<textarea readonly></textarea>
Advertisement
Answer
Manually, by racing the most recent read from each reader to produce the overall read and initiating those reads as necessary:
JavaScript
1
38
38
1
const never = new Promise(() => {});
2
3
const mergeStreams = streams => {
4
const readers = streams.map(s => s.getReader());
5
const reads = streams.map(() => null);
6
const dones = [];
7
const allDone = Promise.all(streams.map(s => new Promise(resolve => {
8
dones.push(resolve);
9
})));
10
11
return new ReadableStream({
12
start: controller => {
13
allDone.then(() => {
14
controller.close();
15
});
16
},
17
pull: controller =>
18
Promise.race(
19
readers.map((r, i) =>
20
reads[i] ??= r.read().then(({value, done}) => {
21
if (done) {
22
dones[i]();
23
return never;
24
}
25
26
controller.enqueue(value);
27
reads[i] = null;
28
})
29
)
30
),
31
cancel: reason => {
32
for (const reader of readers) {
33
reader.cancel(reason);
34
}
35
},
36
});
37
};
38
JavaScript
1
62
62
1
const textarea = document.querySelector("textarea");
2
3
4
const never = new Promise(() => {});
5
6
const mergeStreams = streams => {
7
const readers = streams.map(s => s.getReader());
8
const reads = streams.map(() => null);
9
const dones = [];
10
const allDone = Promise.all(streams.map(s => new Promise(resolve => {
11
dones.push(resolve);
12
})));
13
14
return new ReadableStream({
15
start: controller => {
16
allDone.then(() => {
17
controller.close();
18
});
19
},
20
pull: controller =>
21
Promise.race(
22
readers.map((r, i) =>
23
reads[i] ??= r.read().then(({value, done}) => {
24
if (done) {
25
dones[i]();
26
return never;
27
}
28
29
controller.enqueue(value);
30
reads[i] = null;
31
})
32
)
33
),
34
cancel: reason => {
35
for (const reader of readers) {
36
reader.cancel(reason);
37
}
38
},
39
});
40
};
41
42
43
// This is a ReadableStream which says "Mom! " every 1 second.
44
const momReadableStream = new ReadableStream({ start: controller => {
45
const sayMom = () => controller.enqueue("Mom! ");
46
setInterval(sayMom, 1000);
47
}});
48
49
// This is a ReadableStream which says "Lois! " every 0.7 seconds.
50
const loisReadableStream = new ReadableStream({ start: controller => {
51
const sayLois = () => controller.enqueue("Lois! ");
52
setInterval(sayLois, 700);
53
}});
54
55
// This is a WritableStream which displays what it receives in a textarea.
56
const writableStream = new WritableStream({ write: (chunk, controller) => textarea.value += chunk });
57
58
59
mergeStreams([
60
momReadableStream,
61
loisReadableStream,
62
]).pipeTo(writableStream).catch(console.error);
JavaScript
1
1
1
<textarea readonly></textarea>