@@ -71,7 +71,7 @@ enum class httpclient_errorcode_context
71
71
close
72
72
};
73
73
74
- class asio_connection : public std ::enable_shared_from_this<asio_connection>
74
+ class asio_connection
75
75
{
76
76
friend class asio_client ;
77
77
public:
@@ -222,7 +222,7 @@ class asio_connection : public std::enable_shared_from_this<asio_connection>
222
222
bool m_keep_alive;
223
223
};
224
224
225
- // / <summary>Implements a connection pool with adaptive connection removal and callback notification on idle </summary>
225
+ // / <summary>Implements a connection pool with adaptive connection removal</summary>
226
226
// / <remarks>
227
227
// / The timeout mechanism is based on the `uint64_t m_epoch` member. Every 30 seconds,
228
228
// / the lambda in `start_epoch_interval` fires, triggering the cleanup of any
@@ -247,20 +247,18 @@ class asio_connection : public std::enable_shared_from_this<asio_connection>
247
247
// / }
248
248
// / </code>
249
249
// /
250
- // / Additionally, idle notification is only triggered when two cleanup phases have
251
- // / occurred with no calls to `acquire()` between them. This prevents a race
252
- // / condition where the cleanup might occur when the pool happens to be
253
- // / instantaneously empty but still under heavy load.
250
+ // / Additionally, when two cleanup phases have occurred with no calls to `release()`
251
+ // / between them, the internal self-reference is cleared. If there are no active
252
+ // / `http_client`s keeping the pool alive, this will cause the pool to expire upon
253
+ // / cleanup handler termination. Whenever a new call to `release()` arrives, the self
254
+ // / reference is re-applied to keep the pool alive.
254
255
// / </remarks>
255
256
class asio_connection_pool : public std ::enable_shared_from_this<asio_connection_pool>
256
257
{
257
258
public:
258
- asio_connection_pool (const std::function<void (asio_connection_pool*)>& m_cb = {})
259
- : m_pool_epoch_timer(crossplat::threadpool::shared_instance().service()),
260
- m_parent_pool_cb (m_cb)
261
- {
262
- start_epoch_interval (shared_from_this ());
263
- }
259
+ asio_connection_pool ()
260
+ : m_pool_epoch_timer(crossplat::threadpool::shared_instance().service())
261
+ {}
264
262
265
263
std::shared_ptr<asio_connection> acquire ()
266
264
{
@@ -283,48 +281,66 @@ class asio_connection_pool : public std::enable_shared_from_this<asio_connection
283
281
return ;
284
282
285
283
std::lock_guard<std::mutex> lock (m_lock);
284
+ if (m_self_reference == nullptr )
285
+ {
286
+ auto sptr = this ->shared_from_this ();
287
+ m_self_reference = sptr;
288
+ start_epoch_interval (sptr);
289
+ }
290
+
286
291
m_epoch++;
287
292
m_connections.emplace_back (m_epoch, connection);
288
293
}
289
294
290
295
private:
296
+ // Note: must be called under m_lock
291
297
static void start_epoch_interval (const std::shared_ptr<asio_connection_pool>& pool) {
298
+ _ASSERTE (pool.get () != nullptr );
299
+ _ASSERTE (pool->m_self_reference != nullptr );
300
+
301
+ auto & self = *pool;
292
302
std::weak_ptr<asio_connection_pool> weak_pool = pool;
293
- auto prev_epoch = pool->m_epoch ;
303
+
304
+ self.m_prev_epoch = self.m_epoch ;
294
305
pool->m_pool_epoch_timer .expires_from_now (boost::posix_time::seconds (30 ));
295
- pool->m_pool_epoch_timer .async_wait ([prev_epoch, weak_pool](const boost::system::error_code& ec) {
306
+ pool->m_pool_epoch_timer .async_wait ([weak_pool](const boost::system::error_code& ec) {
296
307
if (ec)
297
308
return ;
298
309
299
310
auto pool = weak_pool.lock ();
300
311
if (!pool)
301
312
return ;
313
+ auto & self = *pool;
302
314
303
- std::lock_guard<std::mutex> lock (pool->m_lock );
304
- if (prev_epoch == pool->m_epoch && pool->m_parent_pool_cb )
315
+ std::lock_guard<std::mutex> lock (self.m_lock );
316
+ _ASSERTE (self.m_self_reference != nullptr );
317
+ if (self.m_prev_epoch == self.m_epoch )
305
318
{
306
- assert (pool-> m_connections .empty ());
307
- pool-> m_parent_pool_cb (pool. get ()) ;
319
+ assert (self. m_connections .empty ());
320
+ self. m_self_reference = nullptr ;
308
321
}
309
322
else
310
323
{
311
- auto erase_end = std::find_if (pool->m_connections .begin (), pool->m_connections .end (), [prev_epoch](std::pair<uint64_t , std::shared_ptr<asio_connection>>& p)
324
+ auto prev_epoch = self.m_prev_epoch ;
325
+ auto erase_end = std::find_if (self.m_connections .begin (), self.m_connections .end (),
326
+ [prev_epoch](std::pair<uint64_t , std::shared_ptr<asio_connection>>& p)
312
327
{
313
328
return p.first > prev_epoch;
314
329
});
315
330
316
- pool->m_connections .erase (pool->m_connections .begin (), erase_end);
331
+ self.m_connections .erase (self.m_connections .begin (), erase_end);
332
+ start_epoch_interval (pool);
317
333
}
318
- start_epoch_interval (pool);
319
334
});
320
335
}
321
336
322
337
std::mutex m_lock;
323
338
boost::asio::deadline_timer m_pool_epoch_timer;
324
339
std::deque<std::pair<uint64_t , std::shared_ptr<asio_connection>>> m_connections;
325
340
uint64_t m_epoch = 0 ;
341
+ uint64_t m_prev_epoch = 0 ;
326
342
327
- const std::function< void ( asio_connection_pool*)> m_parent_pool_cb ;
343
+ std::shared_ptr< asio_connection_pool> m_self_reference ;
328
344
};
329
345
330
346
class asio_shared_connection_pool : public std ::enable_shared_from_this<asio_shared_connection_pool>
@@ -336,26 +352,27 @@ class asio_shared_connection_pool : public std::enable_shared_from_this<asio_sha
336
352
337
353
std::lock_guard<std::mutex> lock (m_lock);
338
354
auto it = m_pools.find (pool_key);
339
- if (it = = m_pools.end ())
355
+ if (it ! = m_pools.end ())
340
356
{
341
- std::weak_ptr<asio_shared_connection_pool> weak_this = shared_from_this ();
342
- ret = std::make_shared<asio_connection_pool>([pool_key, weak_this](asio_connection_pool* inner_pool_ptr )
357
+ ret = it-> second . lock ();
358
+ if ( ret == nullptr )
343
359
{
344
- auto self = weak_this.lock ();
345
- if (!self)
346
- return ;
347
-
348
- std::lock_guard<std::mutex> lock (self->m_lock );
349
- auto it = self->m_pools .find (pool_key);
350
- if (it != self->m_pools .end () && it->second .get () == inner_pool_ptr)
351
- self->m_pools .erase (it);
352
- });
353
- m_pools.emplace (pool_key, ret);
360
+ // Previous pool expired
361
+ ret = std::make_shared<asio_connection_pool>();
362
+ it->second = ret;
363
+ }
354
364
}
355
365
else
356
366
{
357
- ret = it->second ;
367
+ if (m_pools.empty ())
368
+ {
369
+ // If transitioning from empty to having a single element, restart the timer.
370
+ start_timer (shared_from_this ());
371
+ }
372
+ ret = std::make_shared<asio_connection_pool>();
373
+ m_pools.emplace (pool_key, ret);
358
374
}
375
+
359
376
assert (ret != nullptr );
360
377
return ret;
361
378
}
@@ -367,9 +384,39 @@ class asio_shared_connection_pool : public std::enable_shared_from_this<asio_sha
367
384
return s_instance;
368
385
}
369
386
387
+ asio_shared_connection_pool () : m_timer(crossplat::threadpool::shared_instance().service()) {}
388
+
370
389
private:
390
+ static void start_timer (const std::shared_ptr<asio_shared_connection_pool>& self)
391
+ {
392
+ self->m_timer .expires_from_now (boost::posix_time::seconds (60 ));
393
+ std::weak_ptr<asio_shared_connection_pool> weak_this = self;
394
+ self->m_timer .async_wait ([weak_this](const boost::system::error_code& ec)
395
+ {
396
+ if (ec)
397
+ return ;
398
+ auto strong_this = weak_this.lock ();
399
+ if (!strong_this)
400
+ return ;
401
+
402
+ std::lock_guard<std::mutex> lock (strong_this->m_lock );
403
+ auto b = strong_this->m_pools .begin ();
404
+ auto e = strong_this->m_pools .end ();
405
+ for (; b != e;)
406
+ {
407
+ if (b->second .expired ())
408
+ b = strong_this->m_pools .erase (b);
409
+ else
410
+ ++b;
411
+ }
412
+ if (!strong_this->m_pools .empty ())
413
+ start_timer (strong_this);
414
+ });
415
+ }
416
+
417
+ boost::asio::deadline_timer m_timer;
371
418
std::mutex m_lock;
372
- std::unordered_map<std::string, std::shared_ptr <asio_connection_pool>> m_pools;
419
+ std::unordered_map<std::string, std::weak_ptr <asio_connection_pool>> m_pools;
373
420
};
374
421
375
422
class asio_client final : public _http_client_communicator
0 commit comments