-
Notifications
You must be signed in to change notification settings - Fork 175
Add list of hosts with ports for setting connection #139
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0d87218
0b74858
d713ce7
0f2da33
25b7fdc
662a27d
53affd2
fabac4d
edf5421
563b7b6
13294d3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,8 +57,29 @@ struct ClientInfo { | |
}; | ||
|
||
std::ostream& operator<<(std::ostream& os, const ClientOptions& opt) { | ||
os << "Client(" << opt.user << '@' << opt.host << ":" << opt.port | ||
<< " ping_before_query:" << opt.ping_before_query | ||
os << "Client(" << opt.user << '@'; | ||
|
||
bool many_hosts = int(opt.hosts_ports.size()) - int(!opt.host.empty()) > 1; | ||
if (many_hosts) { | ||
os << "{ "; | ||
if (!opt.host.empty()) { | ||
os << opt.host << ":" << opt.port << ","; | ||
} | ||
for (size_t i = 0; i < opt.hosts_ports.size(); ++i) { | ||
os << opt.hosts_ports[i].host << ":" << opt.hosts_ports[i].port.value_or(opt.port) | ||
<< (i != opt.hosts_ports.size() - 1 ? "," : "}"); | ||
} | ||
} | ||
else { | ||
if (opt.host.empty()) { | ||
os << opt.hosts_ports[0].host << ":" << opt.hosts_ports[0].port.value_or(opt.port); | ||
} | ||
else { | ||
os << opt.host << ":" << opt.port; | ||
} | ||
} | ||
|
||
os << " ping_before_query:" << opt.ping_before_query | ||
<< " send_retries:" << opt.send_retries | ||
<< " retry_timeout:" << opt.retry_timeout.count() | ||
<< " compression_method:" | ||
|
@@ -99,6 +120,8 @@ class Client::Impl { | |
|
||
const ServerInfo& GetServerInfo() const; | ||
|
||
const std::optional<ClientOptions::HostPort>& GetConnectedHostPort() const; | ||
|
||
private: | ||
bool Handshake(); | ||
|
||
|
@@ -167,6 +190,7 @@ class Client::Impl { | |
#endif | ||
|
||
ServerInfo server_info_; | ||
std::optional<ClientOptions::HostPort> connected_host_port_; | ||
}; | ||
|
||
|
||
|
@@ -296,73 +320,100 @@ void Client::Impl::Ping() { | |
} | ||
|
||
void Client::Impl::ResetConnection() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ⚠ If there was a temporary communication problem, the client could re-connect to a different server (which I believe may be unexpected by the user). Please make sure that client tries to reconnect to the same server first ( Also I suggest to explicitly allow users to reset connection endpoint, maybe via separate function or E.g.:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Discussed in direct messages that:
|
||
connected_host_port_.reset(); | ||
for (int i = -1; i < int(options_.hosts_ports.size()); ++i) { | ||
const ClientOptions::HostPort& host_port = i == -1 ? ClientOptions::HostPort(options_.host) : options_.hosts_ports[i]; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please update the list of endpoints before copying it to a |
||
try { | ||
std::unique_ptr<Socket> socket; | ||
|
||
std::unique_ptr<Socket> socket; | ||
|
||
const auto address = NetworkAddress(options_.host, std::to_string(options_.port)); | ||
const auto address = NetworkAddress(host_port.host, std::to_string(host_port.port.value_or(options_.port))); | ||
#if defined(WITH_OPENSSL) | ||
// TODO: maybe do not re-create context multiple times upon reconnection - that doesn't make sense. | ||
std::unique_ptr<SSLContext> ssl_context; | ||
if (options_.ssl_options.use_ssl) { | ||
const auto ssl_options = options_.ssl_options; | ||
const auto ssl_params = SSLParams { | ||
ssl_options.path_to_ca_files, | ||
ssl_options.path_to_ca_directory, | ||
ssl_options.use_default_ca_locations, | ||
ssl_options.context_options, | ||
ssl_options.min_protocol_version, | ||
ssl_options.max_protocol_version, | ||
ssl_options.use_sni | ||
}; | ||
|
||
if (ssl_options.ssl_context) | ||
ssl_context = std::make_unique<SSLContext>(*ssl_options.ssl_context); | ||
else { | ||
ssl_context = std::make_unique<SSLContext>(ssl_params); | ||
} | ||
|
||
socket = std::make_unique<SSLSocket>(address, ssl_params, *ssl_context); | ||
} | ||
else | ||
// TODO: maybe do not re-create context multiple times upon reconnection - that doesn't make sense. | ||
std::unique_ptr<SSLContext> ssl_context; | ||
if (options_.ssl_options.use_ssl) { | ||
const auto ssl_options = options_.ssl_options; | ||
const auto ssl_params = SSLParams { | ||
ssl_options.path_to_ca_files, | ||
ssl_options.path_to_ca_directory, | ||
ssl_options.use_default_ca_locations, | ||
ssl_options.context_options, | ||
ssl_options.min_protocol_version, | ||
ssl_options.max_protocol_version, | ||
ssl_options.use_sni | ||
}; | ||
|
||
if (ssl_options.ssl_context) | ||
ssl_context = std::make_unique<SSLContext>(*ssl_options.ssl_context); | ||
else { | ||
ssl_context = std::make_unique<SSLContext>(ssl_params); | ||
} | ||
|
||
socket = std::make_unique<SSLSocket>(address, ssl_params, *ssl_context); | ||
} | ||
else | ||
#endif | ||
socket = std::make_unique<Socket>(address); | ||
socket = std::make_unique<Socket>(address); | ||
|
||
if (options_.tcp_keepalive) { | ||
socket->SetTcpKeepAlive(options_.tcp_keepalive_idle.count(), | ||
options_.tcp_keepalive_intvl.count(), | ||
options_.tcp_keepalive_cnt); | ||
} | ||
if (options_.tcp_nodelay) { | ||
socket->SetTcpNoDelay(options_.tcp_nodelay); | ||
} | ||
if (options_.tcp_keepalive) { | ||
socket->SetTcpKeepAlive(options_.tcp_keepalive_idle.count(), | ||
options_.tcp_keepalive_intvl.count(), | ||
options_.tcp_keepalive_cnt); | ||
} | ||
if (options_.tcp_nodelay) { | ||
socket->SetTcpNoDelay(options_.tcp_nodelay); | ||
} | ||
|
||
OutputStreams output_streams; | ||
auto socket_output = output_streams.Add(socket->makeOutputStream()); | ||
auto output = output_streams.AddNew<BufferedOutput>(socket_output); | ||
OutputStreams output_streams; | ||
auto socket_output = output_streams.Add(socket->makeOutputStream()); | ||
auto output = output_streams.AddNew<BufferedOutput>(socket_output); | ||
|
||
InputStreams input_streams; | ||
auto socket_input = input_streams.Add(socket->makeInputStream()); | ||
auto input = input_streams.AddNew<BufferedInput>(socket_input); | ||
InputStreams input_streams; | ||
auto socket_input = input_streams.Add(socket->makeInputStream()); | ||
auto input = input_streams.AddNew<BufferedInput>(socket_input); | ||
|
||
std::swap(output_streams, output_streams_); | ||
std::swap(input_streams, input_streams_); | ||
std::swap(socket, socket_); | ||
output_ = output; | ||
input_ = input; | ||
std::swap(output_streams, output_streams_); | ||
std::swap(input_streams, input_streams_); | ||
std::swap(socket, socket_); | ||
output_ = output; | ||
input_ = input; | ||
|
||
#if defined(WITH_OPENSSL) | ||
std::swap(ssl_context_, ssl_context); | ||
std::swap(ssl_context_, ssl_context); | ||
#endif | ||
|
||
if (!Handshake()) { | ||
throw std::runtime_error("fail to connect to " + options_.host); | ||
if (!Handshake()) { | ||
throw std::runtime_error("fail to connect to " + host_port.host); | ||
} | ||
} catch (const std::system_error &e) { | ||
if (i == int(options_.hosts_ports.size()) - 1) { | ||
throw; | ||
} | ||
continue; | ||
} catch (const std::runtime_error &e) { | ||
if (i == int(options_.hosts_ports.size()) - 1) { | ||
throw; | ||
} | ||
continue; | ||
} catch (...) { | ||
if (i == int(options_.hosts_ports.size()) - 1) { | ||
throw; | ||
} | ||
continue; | ||
} | ||
|
||
connected_host_port_ = host_port; | ||
return; | ||
} | ||
} | ||
|
||
const ServerInfo& Client::Impl::GetServerInfo() const { | ||
return server_info_; | ||
} | ||
|
||
const std::optional<ClientOptions::HostPort>& Client::Impl::GetConnectedHostPort() const { | ||
return connected_host_port_; | ||
} | ||
|
||
bool Client::Impl::Handshake() { | ||
if (!SendHello()) { | ||
return false; | ||
|
@@ -831,4 +882,8 @@ const ServerInfo& Client::GetServerInfo() const { | |
return impl_->GetServerInfo(); | ||
} | ||
|
||
const std::optional<ClientOptions::HostPort>& Client::GetConnectedHostPort() const { | ||
return impl_->GetConnectedHostPort(); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -51,6 +51,16 @@ struct ClientOptions { | |||||||||||||||||||||||||
return *this; \ | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
/// List of hostnames with service ports | ||||||||||||||||||||||||||
struct HostPort { | ||||||||||||||||||||||||||
std::string host; | ||||||||||||||||||||||||||
std::optional<unsigned int> port; | ||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please fix: add |
||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
explicit HostPort(std::string host, std::optional<unsigned int> port = std::nullopt) : host(std::move(host)), port(std::move(port)) { | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||
Comment on lines
+56
to
+62
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please rename
Suggested change
|
||||||||||||||||||||||||||
DECLARE_FIELD(hosts_ports, std::vector<HostPort>, SetHost,{}); | ||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||
/// Hostname of the server. | ||||||||||||||||||||||||||
DECLARE_FIELD(host, std::string, SetHost, std::string()); | ||||||||||||||||||||||||||
/// Service port. | ||||||||||||||||||||||||||
|
@@ -196,6 +206,8 @@ class Client { | |||||||||||||||||||||||||
|
||||||||||||||||||||||||||
const ServerInfo& GetServerInfo() const; | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
const std::optional<ClientOptions::HostPort>& GetConnectedHostPort() const; | ||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please specify in comments when this function may return |
||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
private: | ||||||||||||||||||||||||||
const ClientOptions options_; | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -481,7 +481,7 @@ static void RunTests(Client& client) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ArrayExample(client); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
CancelableExample(client); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
DateExample(client); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
DateTime64Example(client); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// DateTime64Example(client); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
DecimalExample(client); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
EnumExample(client); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ExecptionExample(client); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -510,6 +510,67 @@ int main() { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.SetCompressionMethod(CompressionMethod::LZ4)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
RunTests(client); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add unit tests instead, the sample program is not executed in CI/CD and hence your code is not checked. I suggest you to add a separate test-cases, both positive and negative in
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ClientOptions::HostPort correct_host_port = ClientOptions::HostPort("localhost", 9000); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Client client(ClientOptions() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.SetHost({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ClientOptions::HostPort("localhost", 8000), // wrong port | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ClientOptions::HostPort("localhost", 7000), // wrong port | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ClientOptions::HostPort("1127.91.2.1"), // wrong host | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ClientOptions::HostPort("1127.91.2.2"), // wrong host | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ClientOptions::HostPort("notlocalwronghost"), // wrong host | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ClientOptions::HostPort("another_notlocalwronghost"), // wrong host | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
correct_host_port, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ClientOptions::HostPort("localhost", 9001), // wrong port | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ClientOptions::HostPort("1127.911.2.2"), // wrong host | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.SetPingBeforeQuery(true)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+515
to
+528
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BTW, this allows simplifying client code:
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assert(client.GetConnectedHostPort() == correct_host_port); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
RunTests(client); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Client client(ClientOptions() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.SetHost({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ClientOptions::HostPort("notlocalwronghost") // wrong host | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.SetSendRetries(0) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.SetPingBeforeQuery(true) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assert(false && "exception must be thrown"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (const std::exception &e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
std::cout << "Caught exception, that have to been thrown: " << e.what() << std::endl; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Client client(ClientOptions() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.SetHost({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ClientOptions::HostPort("localhost", 8000), // wrong port | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.SetSendRetries(0) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.SetPingBeforeQuery(true) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assert(false && "exception must be thrown"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (const std::runtime_error &e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
std::cout << "Caught exception, that have to been thrown: " << e.what() << std::endl; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Client client(ClientOptions() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.SetHost({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ClientOptions::HostPort("1127.91.2.1"), // wrong host | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.SetSendRetries(0) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.SetPingBeforeQuery(true) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assert(false && "exception must be thrown"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (const std::runtime_error &e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
std::cout << "Caught exception, that have to been thrown: " << e.what() << std::endl; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (const std::exception& e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
std::cerr << "exception : " << e.what() << std::endl; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please modify endpoints creation to simplify the
ResetConnection()