Skip to content

Commit c8427ab

Browse files
Add backpressure functionality to row stream (#505)
* Add backpressure functionality to row stream * Fix indexing of row buffer * Increment rowIndex before if statement * Add tests for highWaterMark * Rename test suite name to follow naming convention * Add try/catch and pass error to callback * Refactor tests into a loop * Add check to ensure rowIndex is greater than 0 before decrement
1 parent 33214d2 commit c8427ab

File tree

3 files changed

+128
-3
lines changed

3 files changed

+128
-3
lines changed

lib/connection/result/row_stream.js

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,37 @@ function RowStream(statement, context, options)
173173
// add the next row to the read queue
174174
process.nextTick(function ()
175175
{
176-
self.push(externalizeRow(row, columns, mapColumnIdToExtractFnName));
176+
// check if there are still rows available in the rowBuffer
177+
if (rowBuffer && rowIndex > 0)
178+
{
179+
rowIndex--; // decrement the index to include the previous row in the while loop
180+
181+
// push() data to readable stream until highWaterMark threshold is reached or all rows are pushed
182+
while (rowIndex < rowBuffer.length)
183+
{
184+
row = rowBuffer[rowIndex++];
185+
186+
// if buffer has reached the threshold based on the highWaterMark value then
187+
// push() will return false and pause sending data to the buffer until the data is read from the buffer
188+
if (!self.push(externalizeRow(row, columns, mapColumnIdToExtractFnName)))
189+
{
190+
break;
191+
}
192+
}
193+
194+
// check if all rows in rowBuffer has been pushed to the readable stream
195+
if (rowIndex === rowBuffer.length)
196+
{
197+
// reset the buffer and index
198+
rowBuffer = null;
199+
rowIndex = 0;
200+
}
201+
}
202+
else // No more rows left in the buffer
203+
{
204+
// Push the last row in the buffer
205+
self.push(externalizeRow(row, columns, mapColumnIdToExtractFnName));
206+
}
177207
});
178208
};
179209

lib/connection/statement.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -678,9 +678,15 @@ function invokeStatementComplete(statement, context)
678678
// array to the complete callback as the last argument
679679
var rows = [];
680680
statement.streamRows()
681-
.on('data', function (row)
681+
.on('readable', function () // read only when data is available
682682
{
683-
rows.push(row);
683+
let row;
684+
685+
// while there are rows available to read, push row to results array
686+
while ((row = this.read()) !== null)
687+
{
688+
rows.push(row);
689+
}
684690
})
685691
.on('end', function ()
686692
{

test/integration/testStreamRows.js

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
* Copyright (c) 2015-2019 Snowflake Computing Inc. All rights reserved.
33
*/
44
var assert = require('assert');
5+
var async = require('async');
56
var testUtil = require('./testUtil');
67
require('events').EventEmitter.prototype._maxListeners = 100;
78

@@ -372,4 +373,92 @@ describe('Test Stream Rows API', function ()
372373
}
373374
});
374375
});*/
376+
377+
});
378+
379+
describe('Test Stream Rows HighWaterMark', function ()
380+
{
381+
this.timeout(300000);
382+
383+
before(function (done)
384+
{
385+
connection = testUtil.createConnection();
386+
testUtil.connect(connection, done);
387+
});
388+
389+
after(function (done)
390+
{
391+
testUtil.destroyConnection(connection, done);
392+
});
393+
394+
var testingFunc = function (highWaterMark, expectedRowCount, callback)
395+
{
396+
async.series(
397+
[
398+
function (callback)
399+
{
400+
// select table with row count equal to expectedRowCount
401+
var statement = connection.execute({
402+
sqlText: `SELECT seq8() FROM table(generator(rowCount => ${expectedRowCount}));`,
403+
streamResult: true,
404+
complete: function ()
405+
{
406+
var actualRowCount = 0;
407+
var rowIndex;
408+
409+
var stream = statement.streamRows();
410+
stream.on('error', function (err)
411+
{
412+
callback(err);
413+
});
414+
stream.on('readable', function ()
415+
{
416+
rowIndex = 0;
417+
418+
while (this.read() !== null)
419+
{
420+
actualRowCount++;
421+
rowIndex++;
422+
}
423+
424+
// assert the amount of rows read per loop never exceeds the highWaterMark threshold
425+
try
426+
{
427+
assert.ok(rowIndex <= highWaterMark);
428+
}
429+
catch (err)
430+
{
431+
stream.destroy(err); // passes error to the stream error event
432+
}
433+
});
434+
stream.on('end', function ()
435+
{
436+
try
437+
{
438+
// assert the total number of rows is equal to the specified row count
439+
assert.strictEqual(actualRowCount, expectedRowCount);
440+
callback();
441+
}
442+
catch (err)
443+
{
444+
callback(err);
445+
}
446+
});
447+
}
448+
});
449+
}
450+
],
451+
callback
452+
);
453+
};
454+
455+
const highWaterMarkValue = 10; // default parameter value is 10 (based on PARAM_ROW_STREAM_HIGH_WATER_MARK)
456+
457+
[1000, 10000, 100000, 1000000].forEach(rowCount =>
458+
{
459+
it(`test ${rowCount} rows`, done =>
460+
{
461+
testingFunc(highWaterMarkValue, rowCount, done);
462+
});
463+
});
375464
});

0 commit comments

Comments
 (0)