Skip to content

Commit a63e28e

Browse files
authored
Merge pull request #205 from horoshenkiy/ikhoroshenkiy/add_socket_timeout
2 parents 3dd998e + feff79c commit a63e28e

File tree

6 files changed

+69
-17
lines changed

6 files changed

+69
-17
lines changed

clickhouse/base/socket.cpp

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,16 @@ void SetNonBlock(SOCKET fd, bool value) {
112112
#endif
113113
}
114114

115+
void SetTimeout(SOCKET fd, const SocketTimeoutParams& timeout_params) {
116+
#if defined(_unix_)
117+
timeval recv_timeout { .tv_sec = timeout_params.recv_timeout.count(), .tv_usec = 0 };
118+
setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &recv_timeout, sizeof(recv_timeout));
119+
120+
timeval send_timeout { .tv_sec = timeout_params.send_timeout.count(), .tv_usec = 0 };
121+
setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &send_timeout, sizeof(send_timeout));
122+
#endif
123+
};
124+
115125
ssize_t Poll(struct pollfd* fds, int nfds, int timeout) noexcept {
116126
#if defined(_win_)
117127
return WSAPoll(fds, nfds, timeout);
@@ -120,7 +130,7 @@ ssize_t Poll(struct pollfd* fds, int nfds, int timeout) noexcept {
120130
#endif
121131
}
122132

123-
SOCKET SocketConnect(const NetworkAddress& addr) {
133+
SOCKET SocketConnect(const NetworkAddress& addr, const SocketTimeoutParams& timeout_params) {
124134
int last_err = 0;
125135
for (auto res = addr.Info(); res != nullptr; res = res->ai_next) {
126136
SOCKET s(socket(res->ai_family, res->ai_socktype, res->ai_protocol));
@@ -130,6 +140,7 @@ SOCKET SocketConnect(const NetworkAddress& addr) {
130140
}
131141

132142
SetNonBlock(s, true);
143+
SetTimeout(s, timeout_params);
133144

134145
if (connect(s, res->ai_addr, (int)res->ai_addrlen) != 0) {
135146
int err = getSocketErrorCode();
@@ -213,22 +224,24 @@ NetworkAddress::~NetworkAddress() {
213224
const struct addrinfo* NetworkAddress::Info() const {
214225
return info_;
215226
}
227+
216228
const std::string & NetworkAddress::Host() const {
217229
return host_;
218230
}
219231

220232

221233
SocketBase::~SocketBase() = default;
222234

235+
223236
SocketFactory::~SocketFactory() = default;
224237

225238
void SocketFactory::sleepFor(const std::chrono::milliseconds& duration) {
226239
std::this_thread::sleep_for(duration);
227240
}
228241

229242

230-
Socket::Socket(const NetworkAddress& addr)
231-
: handle_(SocketConnect(addr))
243+
Socket::Socket(const NetworkAddress& addr, const SocketTimeoutParams& timeout_params)
244+
: handle_(SocketConnect(addr, timeout_params))
232245
{}
233246

234247
Socket::Socket(Socket&& other) noexcept
@@ -300,19 +313,21 @@ std::unique_ptr<OutputStream> Socket::makeOutputStream() const {
300313
return std::make_unique<SocketOutput>(handle_);
301314
}
302315

316+
303317
NonSecureSocketFactory::~NonSecureSocketFactory() {}
304318

305319
std::unique_ptr<SocketBase> NonSecureSocketFactory::connect(const ClientOptions &opts) {
306320
const auto address = NetworkAddress(opts.host, std::to_string(opts.port));
307321

308-
auto socket = doConnect(address);
322+
auto socket = doConnect(address, opts);
309323
setSocketOptions(*socket, opts);
310324

311325
return socket;
312326
}
313327

314-
std::unique_ptr<Socket> NonSecureSocketFactory::doConnect(const NetworkAddress& address) {
315-
return std::make_unique<Socket>(address);
328+
std::unique_ptr<Socket> NonSecureSocketFactory::doConnect(const NetworkAddress& address, const ClientOptions& opts) {
329+
SocketTimeoutParams timeout_params { opts.connection_recv_timeout, opts.connection_send_timeout };
330+
return std::make_unique<Socket>(address, timeout_params);
316331
}
317332

318333
void NonSecureSocketFactory::setSocketOptions(Socket &socket, const ClientOptions &opts) {
@@ -327,6 +342,7 @@ void NonSecureSocketFactory::setSocketOptions(Socket &socket, const ClientOption
327342
}
328343
}
329344

345+
330346
SocketInput::SocketInput(SOCKET s)
331347
: s_(s)
332348
{

clickhouse/base/socket.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,14 @@ class SocketFactory {
8282
};
8383

8484

85+
struct SocketTimeoutParams {
86+
const std::chrono::seconds recv_timeout {0};
87+
const std::chrono::seconds send_timeout {0};
88+
};
89+
8590
class Socket : public SocketBase {
8691
public:
87-
Socket(const NetworkAddress& addr);
92+
Socket(const NetworkAddress& addr, const SocketTimeoutParams& timeout_params);
8893
Socket(Socket&& other) noexcept;
8994
Socket& operator=(Socket&& other) noexcept;
9095

@@ -119,7 +124,7 @@ class NonSecureSocketFactory : public SocketFactory {
119124
std::unique_ptr<SocketBase> connect(const ClientOptions& opts) override;
120125

121126
protected:
122-
virtual std::unique_ptr<Socket> doConnect(const NetworkAddress& address);
127+
virtual std::unique_ptr<Socket> doConnect(const NetworkAddress& address, const ClientOptions& opts);
123128

124129
void setSocketOptions(Socket& socket, const ClientOptions& opts);
125130
};

clickhouse/base/sslsocket.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -198,9 +198,9 @@ SSL_CTX * SSLContext::getContext() {
198198
<< "\n\t handshake state: " << SSL_get_state(ssl_) \
199199
<< std::endl
200200
*/
201-
SSLSocket::SSLSocket(const NetworkAddress& addr, const SSLParams & ssl_params,
202-
SSLContext& context)
203-
: Socket(addr)
201+
SSLSocket::SSLSocket(const NetworkAddress& addr, const SocketTimeoutParams& timeout_params,
202+
const SSLParams & ssl_params, SSLContext& context)
203+
: Socket(addr, timeout_params)
204204
, ssl_(SSL_new(context.getContext()), &SSL_free)
205205
{
206206
auto ssl = ssl_.get();
@@ -267,8 +267,9 @@ SSLSocketFactory::SSLSocketFactory(const ClientOptions& opts)
267267

268268
SSLSocketFactory::~SSLSocketFactory() = default;
269269

270-
std::unique_ptr<Socket> SSLSocketFactory::doConnect(const NetworkAddress& address) {
271-
return std::make_unique<SSLSocket>(address, ssl_params_, *ssl_context_);
270+
std::unique_ptr<Socket> SSLSocketFactory::doConnect(const NetworkAddress& address, const ClientOptions& opts) {
271+
SocketTimeoutParams timeout_params { opts.connection_recv_timeout, opts.connection_send_timeout };
272+
return std::make_unique<SSLSocket>(address, timeout_params, ssl_params_, *ssl_context_);
272273
}
273274

274275
std::unique_ptr<InputStream> SSLSocket::makeInputStream() const {

clickhouse/base/sslsocket.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ class SSLContext
4848

4949
class SSLSocket : public Socket {
5050
public:
51-
explicit SSLSocket(const NetworkAddress& addr, const SSLParams & ssl_params, SSLContext& context);
51+
explicit SSLSocket(const NetworkAddress& addr, const SocketTimeoutParams& timeout_params,
52+
const SSLParams& ssl_params, SSLContext& context);
53+
5254
SSLSocket(SSLSocket &&) = default;
5355
~SSLSocket() override = default;
5456

@@ -69,7 +71,7 @@ class SSLSocketFactory : public NonSecureSocketFactory {
6971
~SSLSocketFactory() override;
7072

7173
protected:
72-
std::unique_ptr<Socket> doConnect(const NetworkAddress& address) override;
74+
std::unique_ptr<Socket> doConnect(const NetworkAddress& address, const ClientOptions& opts) override;
7375

7476
private:
7577
const SSLParams ssl_params_;

clickhouse/client.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ struct ClientOptions {
8686
// TCP options
8787
DECLARE_FIELD(tcp_nodelay, bool, TcpNoDelay, true);
8888

89+
/// Connection socket timeout. If the timeout is set to zero then the operation will never timeout.
90+
DECLARE_FIELD(connection_recv_timeout, std::chrono::seconds, SetConnectionRecvTimeout, std::chrono::seconds(0));
91+
DECLARE_FIELD(connection_send_timeout, std::chrono::seconds, SetConnectionSendTimeout, std::chrono::seconds(0));
92+
8993
// TODO deprecate setting
9094
/** It helps to ease migration of the old codebases, which can't afford to switch
9195
* to using ColumnLowCardinalityT or ColumnLowCardinality directly,

ut/socket_ut.cpp

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,45 @@ TEST(Socketcase, connecterror) {
1818

1919
std::this_thread::sleep_for(std::chrono::seconds(1));
2020
try {
21-
Socket socket(addr);
21+
Socket socket(addr, SocketTimeoutParams {});
2222
} catch (const std::system_error& e) {
2323
FAIL();
2424
}
2525

2626
std::this_thread::sleep_for(std::chrono::seconds(1));
2727
server.stop();
2828
try {
29-
Socket socket(addr);
29+
Socket socket(addr, SocketTimeoutParams {});
3030
FAIL();
3131
} catch (const std::system_error& e) {
3232
ASSERT_NE(EINPROGRESS,e.code().value());
3333
}
3434
}
3535

36+
TEST(Socketcase, timeoutrecv) {
37+
using Seconds = std::chrono::seconds;
38+
39+
int port = 19979;
40+
NetworkAddress addr("localhost", std::to_string(port));
41+
LocalTcpServer server(port);
42+
server.start();
43+
44+
std::this_thread::sleep_for(std::chrono::seconds(1));
45+
try {
46+
Socket socket(addr, SocketTimeoutParams { .recv_timeout = Seconds(5), .send_timeout = Seconds(5) });
47+
48+
std::unique_ptr<InputStream> ptr_input_stream = socket.makeInputStream();
49+
char buf[1024];
50+
ptr_input_stream->Read(buf, sizeof(buf));
51+
52+
} catch (const std::system_error& e) {
53+
ASSERT_EQ(EAGAIN, e.code().value());
54+
}
55+
56+
std::this_thread::sleep_for(std::chrono::seconds(1));
57+
server.stop();
58+
}
59+
3660
// Test to verify that reading from empty socket doesn't hangs.
3761
//TEST(Socketcase, ReadFromEmptySocket) {
3862
// const int port = 12345;

0 commit comments

Comments
 (0)