Skip to content

Commit 6309822

Browse files
committed
feat: close connections that exceeds opts.idleTimeout
fixes #148, closes #60 with a mocked Pool and Connection for sqlite driver
1 parent b77da6b commit 6309822

File tree

10 files changed

+230
-53
lines changed

10 files changed

+230
-53
lines changed

src/drivers/abstract/index.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,33 @@ module.exports = class AbstractDriver {
1212
constructor(opts = {}) {
1313
const { logger } = opts;
1414
this.logger = logger instanceof Logger ? logger : new Logger(logger);
15+
this.idleTimeout = opts.idleTimeout || 60;
16+
this.options = opts;
17+
}
18+
19+
closeConnection(connection) {
20+
throw new Error('not implemented');
21+
}
22+
23+
recycleConnections() {
24+
const acquiredAt = new Map();
25+
const timeout = this.idleTimeout * 1000;
26+
27+
this.pool.on('acquire', function onAcquire(connection) {
28+
acquiredAt.set(connection, Date.now());
29+
});
30+
31+
const checkIdle = () => {
32+
const now = Date.now();
33+
for (const [ connection, timestamp ] of acquiredAt) {
34+
if (now - timestamp > timeout) {
35+
this.closeConnection(connection);
36+
acquiredAt.delete(connection);
37+
}
38+
}
39+
setTimeout(checkIdle, timeout);
40+
};
41+
checkIdle();
1542
}
1643

1744
/**

src/drivers/mysql/index.js

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,31 @@ class MysqlDriver extends AbstractDriver {
2121
* @param {boolean} opts.stringifyObjects - stringify object value in dataValues
2222
*/
2323
constructor(opts = {}) {
24+
super(opts);
25+
this.type = 'mysql';
26+
this.pool = this.createPool(opts);
27+
this.escape = this.pool.escape.bind(this.pool);
28+
this.recycleConnections();
29+
}
30+
31+
get escapeId() {
32+
return this.pool.escapeId;
33+
}
34+
35+
createPool(opts) {
36+
// some RDMS use appName to locate the database instead of the actual db, though the table_schema stored in infomation_schema.columns is still the latter one.
37+
const database = opts.appName || opts.database;
2438
const client = opts.client || 'mysql';
39+
const {
40+
host, port, user, password,
41+
connectionLimit, charset, stringifyObjects = false,
42+
} = opts;
43+
2544
if (client !== 'mysql' && client !== 'mysql2') {
2645
throw new Error(`Unsupported mysql client ${client}`);
2746
}
28-
const { host, port, user, password, connectionLimit, charset, stringifyObjects = false } = opts;
29-
// some RDMS use appName to locate the database instead of the actual db, though the table_schema stored in infomation_schema.columns is still the latter one.
30-
const database = opts.appName || opts.database;
31-
super(opts);
32-
this.type = 'mysql';
33-
this.database = database;
34-
this.pool = require(client).createPool({
47+
48+
return require(client).createPool({
3549
connectionLimit,
3650
host,
3751
port,
@@ -41,12 +55,6 @@ class MysqlDriver extends AbstractDriver {
4155
charset,
4256
stringifyObjects,
4357
});
44-
45-
this.escape = this.pool.escape.bind(this.pool);
46-
}
47-
48-
get escapeId() {
49-
return this.pool.escapeId;
5058
}
5159

5260
getConnection() {
@@ -61,11 +69,16 @@ class MysqlDriver extends AbstractDriver {
6169
});
6270
}
6371

72+
closeConnection(connection) {
73+
connection.release();
74+
connection.destroy();
75+
}
76+
6477
async query(query, values, opts = {}) {
65-
const { pool, logger } = this;
66-
const { connection } = opts;
78+
const { logger } = this;
79+
const connection = opts.connection || await this.getConnection();
6780
const promise = new Promise((resolve, reject) => {
68-
(connection || pool).query(query, values, (err, results, fields) => {
81+
connection.query(query, values, (err, results, fields) => {
6982
if (err) {
7083
reject(err);
7184
} else {
@@ -82,6 +95,8 @@ class MysqlDriver extends AbstractDriver {
8295
} catch (err) {
8396
logger.logQueryError(sql, err, Date.now() - start, opts);
8497
throw err;
98+
} finally {
99+
if (!opts.connection) connection.release();
85100
}
86101

87102
logger.logQuery(sql, Date.now() - start, opts);

src/drivers/mysql/schema.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ module.exports = {
5858
* @param {string} newColumn the new column name
5959
*/
6060
async renameColumn(table, name, newName) {
61-
const { database, escapeId } = this;
61+
const { escapeId } = this;
62+
const { database } = this.options;
6263
const { columnName } = new Attribute(name);
6364
const schemaInfo = await this.querySchemaInfo(database, table);
6465
const { columnName: _, ...columnInfo } = schemaInfo[table].find(entry => {

src/drivers/postgres/index.js

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,21 +107,30 @@ function parameterize(sql, values) {
107107
class PostgresDriver extends AbstractDriver {
108108
constructor(opts = {}) {
109109
super(opts);
110-
const { host, port, user, password, database } = opts;
111-
112110
this.type = 'postgres';
113-
this.pool = new Pool({ host, port, user, password, database });
111+
this.pool = this.createPool(opts);
112+
this.recycleConnections();
113+
}
114+
115+
createPool(opts) {
116+
const { host, port, user, password, database } = opts;
117+
return new Pool({ host, port, user, password, database });
114118
}
115119

116120
async getConnection() {
117121
return await this.pool.connect();
118122
}
119123

124+
async closeConnection(client) {
125+
client.release();
126+
await client.end();
127+
}
128+
120129
async query(query, values, spell = {}) {
121130
const { sql, nestTables } = typeof query === 'string' ? { sql: query } : query;
122131
const { text } = parameterize(sql, values);
123132
const { logger } = this;
124-
const client = spell && spell.connection || this.pool;
133+
const connection = spell.connection || await this.getConnection();
125134
const command = sql.slice(0, sql.indexOf(' ')).toLowerCase();
126135

127136
async function tryQuery(...args) {
@@ -130,10 +139,12 @@ class PostgresDriver extends AbstractDriver {
130139
let result;
131140

132141
try {
133-
result = await client.query(...args);
142+
result = await connection.query(...args);
134143
} catch (err) {
135144
logger.logQueryError(formatted, err, Date.now() - start, spell);
136145
throw err;
146+
} finally {
147+
if (!spell.connection) connection.release();
137148
}
138149

139150
logger.logQuery(formatted, Date.now() - start, spell);

src/drivers/sqlite/index.js

Lines changed: 73 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
'use strict';
22

3+
const EventEmitter = require('events');
34
const strftime = require('strftime');
45

56
const AbstractDriver = require('../abstract');
@@ -34,11 +35,11 @@ function nest(rows, fields, spell) {
3435
}
3536

3637
class Connection {
37-
constructor({ client, database, mode, logger }) {
38+
constructor({ client, database, mode, pool }) {
3839
const { Database, OPEN_READWRITE, OPEN_CREATE } = client;
3940
if (mode == null) mode = OPEN_READWRITE | OPEN_CREATE;
4041
this.database = new Database(database, mode);
41-
this.logger = logger;
42+
this.pool = pool;
4243
}
4344

4445
async query(query, values, spell) {
@@ -48,15 +49,14 @@ class Connection {
4849
const result = await this.all(sql, values);
4950
if (nestTables) return nest(result.rows, result.fields, spell);
5051
return result;
51-
} else {
52-
return await this.run(sql, values);
5352
}
53+
return await this.run(sql, values);
5454
}
5555

5656
all(sql, values) {
5757
return new Promise((resolve, reject) => {
5858
this.database.all(sql, values, (err, rows, fields) => {
59-
if (err) reject(new Error(err.stack));
59+
if (err) reject(err);
6060
else resolve({ rows, fields });
6161
});
6262
});
@@ -70,39 +70,84 @@ class Connection {
7070
});
7171
});
7272
}
73+
74+
release() {
75+
if (this.pool) this.pool.releaseConnection(this);
76+
}
77+
78+
async destroy() {
79+
return await new Promise((resolve, reject) => {
80+
this.database.close(function(err) {
81+
if (err) reject(err);
82+
resolve();
83+
});
84+
});
85+
}
86+
}
87+
88+
class Pool extends EventEmitter {
89+
constructor(opts) {
90+
super(opts);
91+
this.options = {
92+
connectionLimit: 10,
93+
...opts,
94+
client: opts.client || 'sqlite3',
95+
};
96+
this.client = require(this.options.client);
97+
this.connections = [];
98+
this.queue = [];
99+
}
100+
101+
async getConnection() {
102+
const { connections, queue, client, options } = this;
103+
for (const connection of connections) {
104+
if (connection.idle) {
105+
connection.idle = false;
106+
this.emit('acquire', connection);
107+
return connection;
108+
}
109+
}
110+
if (connections.length < options.connectionLimit) {
111+
const connection = new Connection({ ...options, client, pool: this });
112+
connections.push(connection);
113+
this.emit('acquire', connection);
114+
return connection;
115+
}
116+
await new Promise(resolve => queue.push(resolve));
117+
return await this.getConnection();
118+
}
119+
120+
releaseConnection(connection) {
121+
connection.idle = true;
122+
this.emit('release', connection);
123+
124+
const { queue } = this;
125+
while (queue.length > 0) {
126+
const task = queue.shift();
127+
task();
128+
}
129+
}
73130
}
74131

75132
class SqliteDriver extends AbstractDriver {
76133
constructor(opts = {}) {
77134
super(opts);
78-
const { logger } = this;
79-
const client = require(opts.client || 'sqlite3');
80135
this.type = 'sqlite';
81-
this.connections = [ new Connection({ ...opts, client, logger }) ];
82-
this.callbacks = [];
136+
this.pool = this.createPool(opts);
137+
this.recycleConnections();
83138
}
84139

85-
async getConnection() {
86-
const { connections, callbacks } = this;
87-
88-
if (connections.length > 0) {
89-
const connection = connections.shift();
90-
return Object.assign(connection, {
91-
release() {
92-
connections.push(connection);
93-
while (callbacks.length > 0) {
94-
const callback = callbacks.shift();
95-
callback();
96-
}
97-
},
98-
});
99-
}
140+
createPool(opts) {
141+
return new Pool(opts);
142+
}
100143

101-
await new Promise((resolve) => {
102-
callbacks.push(resolve);
103-
});
144+
async getConnection() {
145+
return await this.pool.getConnection();
146+
}
104147

105-
return this.getConnection();
148+
async closeConnection(connection) {
149+
connection.release();
150+
await connection.destroy();
106151
}
107152

108153
async query(query, values, opts = {}) {

test/unit/adapters/sequelize.test.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,9 @@ describe('=> Sequelize adapter', () => {
531531
const post = await Post.find(posts[1].id);
532532
assert.equal(post.title, 'Tyrael');
533533

534-
const post2 = await Post.find({ title: 'Leah' });
534+
const post2 = await Post.find({
535+
where: { title: 'Leah' },
536+
});
535537
assert.equal(post2.title, 'Leah');
536538
});
537539

test/unit/drivers/abstract/index.test.js

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
'use strict';
22

3+
const EventEmitter = require('events');
34
const assert = require('assert').strict;
45
const dayjs = require('dayjs');
56
const { Logger } = require('../../../..');
@@ -55,3 +56,23 @@ describe('=> AbstractDriver#logger', function() {
5556
assert.ok(driver.logger instanceof CustomLogger);
5657
});
5758
});
59+
60+
describe('=> AbstractDriver#recycleConnections', function() {
61+
it('should close idle connections', async function() {
62+
const driver = new AbstractDriver({ idleTimeout: 0.01 });
63+
driver.pool = new EventEmitter();
64+
let released;
65+
let destroyed;
66+
driver.recycleConnections();
67+
driver.closeConnection = function() {
68+
released = true;
69+
destroyed = true;
70+
};
71+
driver.pool.emit('acquire', {});
72+
assert.ok(!released);
73+
assert.ok(!destroyed);
74+
await new Promise(resolve => setTimeout(resolve, 30));
75+
assert.ok(released);
76+
assert.ok(destroyed);
77+
});
78+
});

0 commit comments

Comments
 (0)