Skip to content

Commit 943c4d8

Browse files
committed
add pool timer to clean connection pool
1 parent 02b8718 commit 943c4d8

File tree

5 files changed

+105
-26
lines changed

5 files changed

+105
-26
lines changed

Release/include/cpprest/details/http_server_asio.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,10 @@ class connection
8787
if (is_https)
8888
{
8989
m_ssl_context = utility::details::make_unique<boost::asio::ssl::context>(boost::asio::ssl::context::sslv23);
90-
ssl_context_callback(*m_ssl_context);
90+
if (ssl_context_callback)
91+
{
92+
ssl_context_callback(*m_ssl_context);
93+
}
9194
m_ssl_stream = utility::details::make_unique<boost::asio::ssl::stream<boost::asio::ip::tcp::socket&>>(*m_socket, *m_ssl_context);
9295

9396
m_ssl_stream->async_handshake(boost::asio::ssl::stream_base::server, [this](const boost::system::error_code&) { this->start_request_response(); });

Release/include/cpprest/http_client.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ class http_client_config
100100
#endif
101101
, m_set_user_nativehandle_options([](native_handle)->void{})
102102
#if !defined(_WIN32) && !defined(__cplusplus_winrt)
103-
, m_ssl_context_callback([](boost::asio::ssl::context&)->void{})
104103
, m_tlsext_sni_enabled(true)
105104
#endif
106105
#if defined(_WIN32) && !defined(__cplusplus_winrt)

Release/include/pplx/threadpool.h

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,19 +101,31 @@ class threadpool
101101
return m_service;
102102
}
103103

104-
std::shared_ptr<web::http::client::details::asio_connection_pool> obtain_connection_pool(const std::string &base_uri, std::function<std::shared_ptr<web::http::client::details::asio_connection_pool>()> connection_pool_generator)
104+
template<typename PoolGenerator>
105+
std::shared_ptr<web::http::client::details::asio_connection_pool> obtain_connection_pool(const std::string &key, PoolGenerator pool_generator)
105106
{
106107
std::lock_guard<std::mutex> lg(m_connection_pool_map_mutex);
107108

108-
auto &pool = m_connection_pool_map[base_uri];
109+
auto &pool = m_connection_pool_map[key];
109110
if (!pool)
110111
{
111-
pool = connection_pool_generator();
112+
pool = pool_generator();
112113
}
113114

114115
return pool;
115116
}
116117

118+
template<typename PoolReleaseHandler>
119+
void release_connection_pool(const std::string &key, PoolReleaseHandler handler)
120+
{
121+
std::lock_guard<std::mutex> lg(m_connection_pool_map_mutex);
122+
123+
auto pool = m_connection_pool_map[key];
124+
handler(pool);
125+
}
126+
127+
void free_connection_pool(const boost::system::error_code &ec, const std::string &key);
128+
117129
private:
118130
struct _cancel_thread { };
119131

Release/src/http/client/http_client_asio.cpp

Lines changed: 77 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ class asio_connection
8080
asio_connection(boost::asio::io_service& io_service, bool start_with_ssl, const std::function<void(boost::asio::ssl::context&)>& ssl_context_callback) :
8181
m_socket(io_service),
8282
m_ssl_context_callback(ssl_context_callback),
83-
m_pool_timer(io_service),
83+
m_connection_timer(io_service),
8484
m_is_reused(false),
8585
m_keep_alive(true)
8686
{
@@ -103,7 +103,10 @@ class asio_connection
103103
boost::asio::ssl::context ssl_context(boost::asio::ssl::context::sslv23);
104104
ssl_context.set_default_verify_paths();
105105
ssl_context.set_options(boost::asio::ssl::context::default_workarounds);
106-
m_ssl_context_callback(ssl_context);
106+
if (m_ssl_context_callback)
107+
{
108+
m_ssl_context_callback(ssl_context);
109+
}
107110
m_ssl_stream = utility::details::make_unique<boost::asio::ssl::stream<boost::asio::ip::tcp::socket &>>(m_socket, ssl_context);
108111
}
109112

@@ -127,9 +130,9 @@ class asio_connection
127130
return error;
128131
}
129132

130-
void cancel_pool_timer()
133+
void cancel_connection_timer()
131134
{
132-
m_pool_timer.cancel();
135+
m_connection_timer.cancel();
133136
}
134137

135138
bool is_reused() const { return m_is_reused; }
@@ -218,15 +221,15 @@ class asio_connection
218221

219222
private:
220223
template <typename TimeoutHandler>
221-
void start_pool_timer(int timeout_secs, const TimeoutHandler &handler)
224+
void start_connection_timer(int timeout_secs, const TimeoutHandler &handler)
222225
{
223-
m_pool_timer.expires_from_now(boost::posix_time::milliseconds(timeout_secs * 1000));
224-
m_pool_timer.async_wait(handler);
226+
m_connection_timer.expires_from_now(boost::posix_time::milliseconds(timeout_secs * 1000));
227+
m_connection_timer.async_wait(handler);
225228
}
226229

227230
void start_reuse()
228231
{
229-
cancel_pool_timer();
232+
cancel_connection_timer();
230233
m_is_reused = true;
231234
}
232235

@@ -239,7 +242,7 @@ class asio_connection
239242

240243
std::function<void(boost::asio::ssl::context&)> m_ssl_context_callback;
241244

242-
boost::asio::deadline_timer m_pool_timer;
245+
boost::asio::deadline_timer m_connection_timer;
243246
bool m_is_reused;
244247
bool m_keep_alive;
245248
};
@@ -252,7 +255,10 @@ class asio_connection_pool
252255
m_io_service(io_service),
253256
m_timeout_secs(static_cast<int>(idle_timeout.count())),
254257
m_start_with_ssl(start_with_ssl),
255-
m_ssl_context_callback(ssl_context_callback)
258+
m_ssl_context_callback(ssl_context_callback),
259+
m_pool_timeout_secs(60), // Clean this connection pool 60 secs after the last asio_client release it.
260+
m_pool_timer(io_service),
261+
m_use_count(0)
256262
{}
257263

258264
~asio_connection_pool()
@@ -261,10 +267,24 @@ class asio_connection_pool
261267
// Cancel the pool timer for all connections.
262268
for (auto& connection : m_connections)
263269
{
264-
connection->cancel_pool_timer();
270+
connection->cancel_connection_timer();
265271
}
266272
}
267273

274+
template <typename TimeoutHandler>
275+
void start_pool_timer(const TimeoutHandler &handler)
276+
{
277+
//std::lock_guard<std::mutex> lg(m_pool_timer_mutex);
278+
m_pool_timer.expires_from_now(boost::posix_time::milliseconds(m_pool_timeout_secs * 1000));
279+
m_pool_timer.async_wait(handler);
280+
}
281+
282+
void cancel_pool_timer()
283+
{
284+
//std::lock_guard<std::mutex> lg(m_pool_timer_mutex);
285+
m_pool_timer.cancel();
286+
}
287+
268288
void release(const std::shared_ptr<asio_connection> &connection)
269289
{
270290
if (connection->keep_alive() && (m_timeout_secs > 0))
@@ -274,7 +294,7 @@ class asio_connection_pool
274294
std::lock_guard<std::mutex> lock(m_connections_mutex);
275295
// This will destroy and remove the connection from pool after the set timeout.
276296
// We use 'this' because async calls to timer handler only occur while the pool exists.
277-
connection->start_pool_timer(m_timeout_secs, boost::bind(&asio_connection_pool::handle_pool_timer, this, boost::asio::placeholders::error, connection));
297+
connection->start_connection_timer(m_timeout_secs, boost::bind(&asio_connection_pool::handle_connection_timer, this, boost::asio::placeholders::error, connection));
278298
m_connections.push_back(connection);
279299
}
280300
// Otherwise connection is not put to the pool and it will go out of scope.
@@ -302,10 +322,15 @@ class asio_connection_pool
302322
}
303323
}
304324

325+
int &use_count()
326+
{
327+
return m_use_count;
328+
}
329+
305330
private:
306331

307332
// Using weak_ptr here ensures bind() to this handler will not prevent the connection object from going out of scope.
308-
void handle_pool_timer(const boost::system::error_code& ec, const std::weak_ptr<asio_connection> &connection)
333+
void handle_connection_timer(const boost::system::error_code& ec, const std::weak_ptr<asio_connection> &connection)
309334
{
310335
if (!ec)
311336
{
@@ -328,6 +353,12 @@ class asio_connection_pool
328353
std::function<void(boost::asio::ssl::context&)> m_ssl_context_callback;
329354
std::vector<std::shared_ptr<asio_connection> > m_connections;
330355
std::mutex m_connections_mutex;
356+
357+
const int m_pool_timeout_secs;
358+
std::mutex m_pool_timer_mutex;
359+
boost::asio::deadline_timer m_pool_timer;
360+
//std::atomic<int> m_use_count;
361+
int m_use_count;
331362
};
332363

333364

@@ -348,32 +379,56 @@ class asio_client : public _http_client_communicator
348379
}
349380
else
350381
{
351-
std::string host = base_uri().to_string();
382+
m_pool_key = base_uri().to_string();
352383

353384
auto &credentials = _http_client_communicator::client_config().credentials();
354385
if (credentials.is_set())
355386
{
356-
host.append(credentials.username());
387+
m_pool_key.append(credentials.username());
357388
}
358389

359390
auto &proxy = _http_client_communicator::client_config().proxy();
360391
if (proxy.is_specified())
361392
{
362-
host.append(proxy.address().to_string());
393+
m_pool_key.append(proxy.address().to_string());
363394
if (proxy.credentials().is_set())
364395
{
365-
host.append(proxy.credentials().username());
396+
m_pool_key.append(proxy.credentials().username());
366397
}
367398
}
368399

369-
m_pool = crossplat::threadpool::shared_instance().obtain_connection_pool(host, [this]()
400+
m_pool = crossplat::threadpool::shared_instance().obtain_connection_pool(m_pool_key, [this]()
370401
{
371402
return std::make_shared<asio_connection_pool>(crossplat::threadpool::shared_instance().service(),
372403
base_uri().scheme() == "https" && !_http_client_communicator::client_config().proxy().is_specified(),
373404
std::chrono::seconds(30), // Unused sockets are kept in pool for 30 seconds.
374-
this->client_config().get_ssl_context_callback());
405+
nullptr);
406+
});
407+
408+
if (m_pool->use_count() == 0)
409+
{
410+
m_pool->cancel_pool_timer();
411+
}
412+
++m_pool->use_count();
413+
}
414+
}
415+
416+
~asio_client()
417+
{
418+
if (!m_pool_key.empty())
419+
{
420+
crossplat::threadpool::shared_instance().release_connection_pool(m_pool_key, [this](std::shared_ptr<web::http::client::details::asio_connection_pool> pool)
421+
{
422+
if (pool)
423+
{
424+
--pool->use_count();
425+
if (pool->use_count() == 0)
426+
{
427+
pool->start_pool_timer(boost::bind(&crossplat::threadpool::free_connection_pool, &crossplat::threadpool::shared_instance(), boost::asio::placeholders::error, m_pool_key));
428+
}
429+
}
375430
});
376-
}
431+
}
377432
}
378433

379434
void send_request(const std::shared_ptr<request_context> &request_ctx) override;
@@ -384,6 +439,7 @@ class asio_client : public _http_client_communicator
384439

385440
std::shared_ptr<asio_connection_pool> m_pool;
386441
tcp::resolver m_resolver;
442+
std::string m_pool_key;
387443
};
388444

389445
class asio_context : public request_context, public std::enable_shared_from_this<asio_context>
@@ -509,7 +565,7 @@ class asio_context : public request_context, public std::enable_shared_from_this
509565
{
510566
m_context->report_error("Failed to send connect request to proxy.", err, httpclient_errorcode_context::writebody);
511567
}
512-
}
568+
}
513569

514570
void handle_status_line(const boost::system::error_code& ec)
515571
{

Release/src/pplx/threadpool.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,15 @@ threadpool& threadpool::shared_instance()
6767
return s_shared;
6868
}
6969

70+
void threadpool::free_connection_pool(const boost::system::error_code &ec, const std::string &key)
71+
{
72+
if (!ec)
73+
{
74+
std::lock_guard<std::mutex> lg(m_connection_pool_map_mutex);
75+
m_connection_pool_map.erase(key);
76+
}
77+
}
78+
7079
#endif
7180

7281
}

0 commit comments

Comments
 (0)