Skip to content

Commit 76823e6

Browse files
committed
stream: catch and re-throw objectMode incompatible error
1 parent d24c731 commit 76823e6

File tree

5 files changed

+62
-4
lines changed

5 files changed

+62
-4
lines changed

doc/api/errors.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2625,6 +2625,13 @@ An attempt was made to call [`stream.pipe()`][] on a [`Writable`][] stream.
26252625
A stream method was called that cannot complete because the stream was
26262626
destroyed using `stream.destroy()`.
26272627

2628+
<a id="ERR_STREAM_INCOMPATIBLE_OBJECT_MODE"></a>
2629+
2630+
### `ERR_STREAM_INCOMPATIBLE_OBJECT_MODE`
2631+
2632+
An attempt was made when piping from object mode to non-object mode while the
2633+
chunk is an `object`.
2634+
26282635
<a id="ERR_STREAM_NULL_VALUES"></a>
26292636

26302637
### `ERR_STREAM_NULL_VALUES`

lib/internal/errors.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1716,6 +1716,7 @@ E('ERR_STREAM_ALREADY_FINISHED',
17161716
Error);
17171717
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
17181718
E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error);
1719+
E('ERR_STREAM_INCOMPATIBLE_OBJECT_MODE', 'Cannot pipe from objectMode to non-objectMode', Error);
17191720
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
17201721
E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error);
17211722
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);

lib/internal/streams/readable.js

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ const {
7373
kErrored,
7474
kConstructed,
7575
kOnConstructed,
76+
isObjectModeCompatible,
7677
} = require('internal/streams/utils');
7778

7879
const {
@@ -82,6 +83,7 @@ const {
8283
ERR_INVALID_ARG_TYPE,
8384
ERR_METHOD_NOT_IMPLEMENTED,
8485
ERR_OUT_OF_RANGE,
86+
ERR_STREAM_INCOMPATIBLE_OBJECT_MODE,
8587
ERR_STREAM_PUSH_AFTER_EOF,
8688
ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
8789
ERR_UNKNOWN_ENCODING,
@@ -1006,10 +1008,20 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
10061008
src.on('data', ondata);
10071009
function ondata(chunk) {
10081010
debug('ondata');
1009-
const ret = dest.write(chunk);
1010-
debug('dest.write', ret);
1011-
if (ret === false) {
1012-
pause();
1011+
try {
1012+
const ret = dest.write(chunk);
1013+
debug('dest.write', ret);
1014+
if (ret === false) {
1015+
pause();
1016+
}
1017+
} catch (err) {
1018+
if (
1019+
(typeof chunk === 'object') && !isObjectModeCompatible(state, dest)
1020+
) {
1021+
dest.destroy(new ERR_STREAM_INCOMPATIBLE_OBJECT_MODE());
1022+
} else {
1023+
dest.destroy(err);
1024+
}
10131025
}
10141026
}
10151027

lib/internal/streams/utils.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,15 @@ function isErrored(stream) {
315315
));
316316
}
317317

318+
function isObjectModeCompatible(readable, writable) {
319+
if ((readable[kState] & kObjectMode) !== 0 &&
320+
(writable._writableState[kState] & kObjectMode) === 0) {
321+
return false;
322+
}
323+
324+
return true;
325+
}
326+
318327
module.exports = {
319328
kOnConstructed,
320329
isDestroyed,
@@ -332,6 +341,7 @@ module.exports = {
332341
isDuplexNodeStream,
333342
isFinished,
334343
isIterable,
344+
isObjectModeCompatible,
335345
isReadableNodeStream,
336346
isReadableStream,
337347
isReadableEnded,
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('node:assert');
5+
const { Readable, Transform } = require('node:stream');
6+
7+
{
8+
const objectReadable = Readable.from([
9+
{ hello: 'hello' },
10+
{ world: 'world' },
11+
]);
12+
13+
const passThrough = new Transform({
14+
transform(chunk, _encoding, cb) {
15+
this.push(chunk);
16+
cb(null);
17+
},
18+
});
19+
20+
passThrough.on('error', common.mustCall());
21+
22+
objectReadable.pipe(passThrough);
23+
24+
assert.rejects(async () => {
25+
// eslint-disable-next-line no-unused-vars
26+
for await (const _ of passThrough);
27+
}, /ERR_STREAM_INCOMPATIBLE_OBJECT_MODE/).then(common.mustCall());
28+
}

0 commit comments

Comments
 (0)