I have two ReadableStreams, and I want to pipe them into one WritableStream, where any data that comes through the ReadableStreams goes directly into the WritableStream right then.
I can do the opposite, by using ReadableStream.prototype.tee()
to split one ReadableStream into two, but I do not know how to combine two into one.
const textarea = document.querySelector("textarea"); // This is a ReadableStream which says "Mom! " every 1 second. const momReadableStream = new ReadableStream({ start: controller => { const sayMom = () => controller.enqueue("Mom! "); setInterval(sayMom, 1000); }}); // This is a ReadableStream which says "Lois! " every 0.7 seconds. const loisReadableStream = new ReadableStream({ start: controller => { const sayLois = () => controller.enqueue("Lois! "); setInterval(sayLois, 700); }}); // This is a WritableStream which displays what it receives in a textarea. const writableStream = new WritableStream({ write: (chunk, controller) => textarea.value += chunk }); momReadableStream.pipeTo(writableStream).catch(console.error); // Works fine, words display loisReadableStream.pipeTo(writableStream).catch(console.error); // Words do not display, and Errors with "Failed to execute 'pipeTo' on 'ReadableStream': Cannot pipe to a locked stream"
<textarea readonly></textarea>
Advertisement
Answer
Manually, by racing the most recent read from each reader to produce the overall read and initiating those reads as necessary:
const never = new Promise(() => {}); const mergeStreams = streams => { const readers = streams.map(s => s.getReader()); const reads = streams.map(() => null); const dones = []; const allDone = Promise.all(streams.map(s => new Promise(resolve => { dones.push(resolve); }))); return new ReadableStream({ start: controller => { allDone.then(() => { controller.close(); }); }, pull: controller => Promise.race( readers.map((r, i) => reads[i] ??= r.read().then(({value, done}) => { if (done) { dones[i](); return never; } controller.enqueue(value); reads[i] = null; }) ) ), cancel: reason => { for (const reader of readers) { reader.cancel(reason); } }, }); };
const textarea = document.querySelector("textarea"); const never = new Promise(() => {}); const mergeStreams = streams => { const readers = streams.map(s => s.getReader()); const reads = streams.map(() => null); const dones = []; const allDone = Promise.all(streams.map(s => new Promise(resolve => { dones.push(resolve); }))); return new ReadableStream({ start: controller => { allDone.then(() => { controller.close(); }); }, pull: controller => Promise.race( readers.map((r, i) => reads[i] ??= r.read().then(({value, done}) => { if (done) { dones[i](); return never; } controller.enqueue(value); reads[i] = null; }) ) ), cancel: reason => { for (const reader of readers) { reader.cancel(reason); } }, }); }; // This is a ReadableStream which says "Mom! " every 1 second. const momReadableStream = new ReadableStream({ start: controller => { const sayMom = () => controller.enqueue("Mom! "); setInterval(sayMom, 1000); }}); // This is a ReadableStream which says "Lois! " every 0.7 seconds. const loisReadableStream = new ReadableStream({ start: controller => { const sayLois = () => controller.enqueue("Lois! "); setInterval(sayLois, 700); }}); // This is a WritableStream which displays what it receives in a textarea. const writableStream = new WritableStream({ write: (chunk, controller) => textarea.value += chunk }); mergeStreams([ momReadableStream, loisReadableStream, ]).pipeTo(writableStream).catch(console.error);
<textarea readonly></textarea>