Skip to content

Commit 9e5a829

Browse files
committed
stream: pipeline should drain empty readable
This simplifies some cases where the last stream is a Duplex without any expected output. await pipeline(readable, duplex)
1 parent e937662 commit 9e5a829

File tree

2 files changed

+31
-2
lines changed

2 files changed

+31
-2
lines changed

lib/internal/streams/pipeline.js

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@ const {
3333
isIterable,
3434
isReadableNodeStream,
3535
isNodeStream,
36+
isReadableFinished,
3637
} = require('internal/streams/utils');
3738
const { AbortController } = require('internal/abort_controller');
39+
const console = require('console');
3840

3941
let PassThrough;
4042
let Readable;
@@ -229,7 +231,14 @@ function pipelineImpl(streams, callback, opts) {
229231

230232
if (isNodeStream(stream)) {
231233
finishCount++;
232-
destroys.push(destroyer(stream, reading, writing, finish));
234+
destroys.push(destroyer(stream, reading, writing, (err) => {
235+
if (!err && !reading && isReadableFinished(stream, false)) {
236+
stream.read(0);
237+
destroyer(stream, true, writing, finish);
238+
} else {
239+
finish(err);
240+
}
241+
}));
233242
}
234243

235244
if (i === 0) {

test/parallel/test-stream-pipeline.js

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1027,7 +1027,7 @@ const tsp = require('timers/promises');
10271027
const src = new PassThrough();
10281028
const dst = new PassThrough();
10291029
pipeline(src, dst, common.mustSucceed(() => {
1030-
assert.strictEqual(dst.destroyed, false);
1030+
assert.strictEqual(dst.destroyed, true);
10311031
}));
10321032
src.end();
10331033
}
@@ -1447,3 +1447,23 @@ const tsp = require('timers/promises');
14471447
assert.strictEqual(text, 'Hello World!');
14481448
}));
14491449
}
1450+
1451+
{
1452+
const pipelinePromise = promisify(pipeline);
1453+
1454+
async function run() {
1455+
const read = new Readable({
1456+
read() {}
1457+
});
1458+
1459+
const duplex = new PassThrough();
1460+
1461+
read.push(null);
1462+
1463+
await pipelinePromise(read, duplex);
1464+
1465+
assert.strictEqual(duplex.destroyed, true);
1466+
}
1467+
1468+
run();
1469+
}

0 commit comments

Comments
 (0)