pipeline deadlock with readable tail? #40685

Open
@ronag

#40653 (comment)

@lpinca feel free to edit issue and fill out with your concern. Otherwise I will dig into this at some later point.


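The pipeline() callback might never be called if the last stream is a Transform stream whose readable side is never consumed: once its buffers fill up, backpressure stalls the source and the stream never finishes. Here is an example.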

const stream = require('stream');

const chunk = Buffer.alloc(1024);
let bytesRead = 0;

const readable = new stream.Readable({
  read() {
    if (bytesRead === 100 * 1024) {
      readable.push(null);
    } else {
      bytesRead += chunk.length;
      readable.push(chunk);
    }
  }
});

const passThrough = new stream.PassThrough();

stream.pipeline(readable, passThrough, function (err) {
  if (err) {
    throw err;
  }

  console.log('done');
});
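The stall comes from backpressure on the unread PassThrough. As a rough illustration (not part of the original report), the sketch below writes into a PassThrough that nobody reads until write() returns false. The explicit highWaterMark is an assumption made only so the behaviour does not depend on the Node.js default buffer size.

```javascript
const { PassThrough } = require('stream');

// Assumption: a small, explicit highWaterMark so the result does not depend
// on the default buffer size of the running Node.js version.
const passThrough = new PassThrough({ highWaterMark: 4 * 1024 });
const chunk = Buffer.alloc(1024);

// Write 1 KiB chunks until write() reports backpressure. Nothing reads the
// readable side, so both internal buffers fill up after a few writes.
let writes = 0;
let backpressure = false;
while (writes < 100 && !backpressure) {
  backpressure = !passThrough.write(chunk);
  writes += 1;
}

console.log({
  writes,
  backpressure,
  readableLength: passThrough.readableLength,
  writableLength: passThrough.writableLength
});
```

Once write() returns false, a piped source waits for a 'drain' event that never comes unless something reads from the PassThrough, so 'finish' is never emitted and the pipeline() callback is never invoked.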

In the callback version this is not a big issue because pipeline() returns the last stream, so the user can resume it.

pipeline(readable, passThrough, fn).resume();
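A fuller version of that one-liner might look like the sketch below. makeSource() and runResumed() are hypothetical helpers introduced only for this illustration, and the explicit highWaterMark is an assumption that forces backpressure regardless of the default buffer size.

```javascript
const { Readable, PassThrough, pipeline } = require('stream');

// Hypothetical helper: a source that emits `total` bytes in 1 KiB chunks.
function makeSource(total) {
  const chunk = Buffer.alloc(1024);
  let bytesRead = 0;
  return new Readable({
    read() {
      if (bytesRead >= total) {
        this.push(null);
      } else {
        bytesRead += chunk.length;
        this.push(chunk);
      }
    }
  });
}

// Hypothetical helper: runs the callback variant and resumes the returned
// tail so that the PassThrough is drained and can finish.
function runResumed(total) {
  return new Promise((resolve, reject) => {
    const tail = pipeline(
      makeSource(total),
      // Assumption: small highWaterMark to make the stall reproducible.
      new PassThrough({ highWaterMark: 1024 }),
      (err) => {
        if (err) {
          reject(err);
        } else {
          resolve('done');
        }
      }
    );

    // Without this call the callback above should never run.
    tail.resume();
  });
}

runResumed(100 * 1024).then(console.log, console.error);
```

Note that resume() discards the data, so this only makes sense when the output of the last stream is not needed.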

In the promisified variant, pipeline() is just a wrapper that returns a promise that is settled when the callback of the callback variant is called.

However, the user might not be able to resume the returned stream because they might not have direct access to it, for example when the last entry is a generator.

const stream = require('stream');
const streamPromises = require('stream/promises');

const chunk = Buffer.alloc(1024);
let bytesRead = 0;

const readable = new stream.Readable({
  read() {
    if (bytesRead === 100 * 1024) {
      readable.push(null);
    } else {
      bytesRead += chunk.length;
      readable.push(chunk);
    }
  }
});

async function* passThrough(source) {
  for await (const chunk of source) {
    yield chunk;
  }
}

async function run() {
  await streamPromises.pipeline(readable, passThrough);
}

run()
  .then(function () {
    console.log('done');
  })
  .catch(console.error);

In this case, stream.pipeline() returns an internal PassThrough proxy that the user cannot resume, so the promise returned by streamPromises.pipeline() might never be fulfilled.

A possible workaround in core is to resume the returned stream in the promisified variant.

diff --git a/lib/stream/promises.js b/lib/stream/promises.js
index 0db01a8b20..5bbadd43c5 100644
--- a/lib/stream/promises.js
+++ b/lib/stream/promises.js
@@ -23,13 +23,17 @@ function pipeline(...streams) {
       signal = options.signal;
     }
 
-    pl(streams, (err, value) => {
+    const stream = pl(streams, (err, value) => {
       if (err) {
         reject(err);
       } else {
         resolve(value);
       }
     }, { signal });
+
+    if (stream.readable) {
+      stream.resume();
+    }
   });
 }
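For experimenting without patching core, roughly the same idea can be tried in userland by wrapping the callback variant. pipelineAndDrain() and makeSource() are hypothetical names, and unlike the real streamPromises.pipeline() this sketch does not handle an options object such as { signal }.

```javascript
const stream = require('stream');

// Hypothetical userland wrapper that mirrors the patch: promisify the
// callback variant and resume the returned stream if it is readable.
// Assumption: no trailing options object is passed.
function pipelineAndDrain(...streams) {
  return new Promise((resolve, reject) => {
    const tail = stream.pipeline(...streams, (err, value) => {
      if (err) {
        reject(err);
      } else {
        resolve(value);
      }
    });

    // Same guard as in the diff: only a readable tail needs draining.
    if (tail.readable) {
      tail.resume();
    }
  });
}

// Hypothetical helper: a source that emits `total` bytes in 1 KiB chunks.
function makeSource(total) {
  const chunk = Buffer.alloc(1024);
  let bytesRead = 0;
  return new stream.Readable({
    read() {
      if (bytesRead >= total) {
        this.push(null);
      } else {
        bytesRead += chunk.length;
        this.push(chunk);
      }
    }
  });
}

async function* passThrough(source) {
  for await (const chunk of source) {
    yield chunk;
  }
}

pipelineAndDrain(makeSource(100 * 1024), passThrough)
  .then(() => console.log('done'))
  .catch(console.error);
```

As with the patch, the data reaching the tail is discarded, which seems acceptable here because the caller has no way to read it anyway.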

Labels: stream (Issues and PRs related to the stream subsystem.)
