Skip to content

Add list of hosts with ports for setting connection #139

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,16 @@ client.Select("SELECT id, name FROM test.numbers", [] (const Block& block)
/// Delete table.
client.Execute("DROP TABLE test.numbers");
```

## Features
### Multiple host
It is possible to specify multiple hosts to connect to. The connection
will be set to the first available host.
```cpp
Client client(ClientOptions()
.SetHost({
ClientOptions::HostPort("host1.com", 8000),
ClientOptions::HostPort("host2.com"), /// port is ClientOptions.port
}));

```
157 changes: 106 additions & 51 deletions clickhouse/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,29 @@ struct ClientInfo {
};

std::ostream& operator<<(std::ostream& os, const ClientOptions& opt) {
os << "Client(" << opt.user << '@' << opt.host << ":" << opt.port
<< " ping_before_query:" << opt.ping_before_query
os << "Client(" << opt.user << '@';

bool many_hosts = int(opt.hosts_ports.size()) - int(!opt.host.empty()) > 1;
if (many_hosts) {
os << "{ ";
if (!opt.host.empty()) {
os << opt.host << ":" << opt.port << ",";
}
for (size_t i = 0; i < opt.hosts_ports.size(); ++i) {
os << opt.hosts_ports[i].host << ":" << opt.hosts_ports[i].port.value_or(opt.port)
<< (i != opt.hosts_ports.size() - 1 ? "," : "}");
}
}
else {
if (opt.host.empty()) {
os << opt.hosts_ports[0].host << ":" << opt.hosts_ports[0].port.value_or(opt.port);
}
else {
os << opt.host << ":" << opt.port;
}
}

os << " ping_before_query:" << opt.ping_before_query
<< " send_retries:" << opt.send_retries
<< " retry_timeout:" << opt.retry_timeout.count()
<< " compression_method:"
Expand Down Expand Up @@ -99,6 +120,8 @@ class Client::Impl {

const ServerInfo& GetServerInfo() const;

const std::optional<ClientOptions::HostPort>& GetConnectedHostPort() const;

private:
bool Handshake();

Expand Down Expand Up @@ -167,6 +190,7 @@ class Client::Impl {
#endif

ServerInfo server_info_;
std::optional<ClientOptions::HostPort> connected_host_port_;
};


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please modify endpoints creation to simplify the ResetConnection()

Suggested change
ClientOptions modifyClientOptions(ClientOptions opts)
{
if (!opts.host.empty())
opts.endpoints.insert(opts.endpoints.begin(), ClientOptions::Endpoint{opts.host, opts.port});
return opts;
}
Client::Impl::Impl(const ClientOptions& opts)
: options_(modifyClientOptions(opts))
{

Expand Down Expand Up @@ -296,73 +320,100 @@ void Client::Impl::Ping() {
}

void Client::Impl::ResetConnection() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠ If there was a temporary communication problem, the client could re-connect to a different server (which I believe may be unexpected by the user).

Please make sure that client tries to reconnect to the same server first (options_.send_retries times).

Also I suggest to explicitly allow users to reset connection endpoint, maybe via separate function or ResetConnection() overload/parameter.

E.g.:

  • ResetConnection() - try reconnecting to current endpoint.
  • ResetConnectionEndpoint() - try connecting to different endpoint.

Copy link

@PolyProgrammist PolyProgrammist May 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed in direct messages that:
ResetConnectionEndpoint() should try to connect to different endpoints one by one only one time. If it doesn't work, throw an exception. It should be called on the client start and it should be called N times. Also the user can call it in order to connect to different endpoint.

ResetConnection() should try to reconnect to the current endpoint N times. It doesn't try to connect to other endpoints. If it doesn't work, throw an exception. So it's behaviour doesn't change

connected_host_port_.reset();
for (int i = -1; i < int(options_.hosts_ports.size()); ++i) {
const ClientOptions::HostPort& host_port = i == -1 ? ClientOptions::HostPort(options_.host) : options_.hosts_ports[i];
Copy link
Contributor

@Enmk Enmk Jan 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the list of endpoints before copying it to a options_ so you don't have to do this. (see above)

try {
std::unique_ptr<Socket> socket;

std::unique_ptr<Socket> socket;

const auto address = NetworkAddress(options_.host, std::to_string(options_.port));
const auto address = NetworkAddress(host_port.host, std::to_string(host_port.port.value_or(options_.port)));
#if defined(WITH_OPENSSL)
// TODO: maybe do not re-create context multiple times upon reconnection - that doesn't make sense.
std::unique_ptr<SSLContext> ssl_context;
if (options_.ssl_options.use_ssl) {
const auto ssl_options = options_.ssl_options;
const auto ssl_params = SSLParams {
ssl_options.path_to_ca_files,
ssl_options.path_to_ca_directory,
ssl_options.use_default_ca_locations,
ssl_options.context_options,
ssl_options.min_protocol_version,
ssl_options.max_protocol_version,
ssl_options.use_sni
};

if (ssl_options.ssl_context)
ssl_context = std::make_unique<SSLContext>(*ssl_options.ssl_context);
else {
ssl_context = std::make_unique<SSLContext>(ssl_params);
}

socket = std::make_unique<SSLSocket>(address, ssl_params, *ssl_context);
}
else
// TODO: maybe do not re-create context multiple times upon reconnection - that doesn't make sense.
std::unique_ptr<SSLContext> ssl_context;
if (options_.ssl_options.use_ssl) {
const auto ssl_options = options_.ssl_options;
const auto ssl_params = SSLParams {
ssl_options.path_to_ca_files,
ssl_options.path_to_ca_directory,
ssl_options.use_default_ca_locations,
ssl_options.context_options,
ssl_options.min_protocol_version,
ssl_options.max_protocol_version,
ssl_options.use_sni
};

if (ssl_options.ssl_context)
ssl_context = std::make_unique<SSLContext>(*ssl_options.ssl_context);
else {
ssl_context = std::make_unique<SSLContext>(ssl_params);
}

socket = std::make_unique<SSLSocket>(address, ssl_params, *ssl_context);
}
else
#endif
socket = std::make_unique<Socket>(address);
socket = std::make_unique<Socket>(address);

if (options_.tcp_keepalive) {
socket->SetTcpKeepAlive(options_.tcp_keepalive_idle.count(),
options_.tcp_keepalive_intvl.count(),
options_.tcp_keepalive_cnt);
}
if (options_.tcp_nodelay) {
socket->SetTcpNoDelay(options_.tcp_nodelay);
}
if (options_.tcp_keepalive) {
socket->SetTcpKeepAlive(options_.tcp_keepalive_idle.count(),
options_.tcp_keepalive_intvl.count(),
options_.tcp_keepalive_cnt);
}
if (options_.tcp_nodelay) {
socket->SetTcpNoDelay(options_.tcp_nodelay);
}

OutputStreams output_streams;
auto socket_output = output_streams.Add(socket->makeOutputStream());
auto output = output_streams.AddNew<BufferedOutput>(socket_output);
OutputStreams output_streams;
auto socket_output = output_streams.Add(socket->makeOutputStream());
auto output = output_streams.AddNew<BufferedOutput>(socket_output);

InputStreams input_streams;
auto socket_input = input_streams.Add(socket->makeInputStream());
auto input = input_streams.AddNew<BufferedInput>(socket_input);
InputStreams input_streams;
auto socket_input = input_streams.Add(socket->makeInputStream());
auto input = input_streams.AddNew<BufferedInput>(socket_input);

std::swap(output_streams, output_streams_);
std::swap(input_streams, input_streams_);
std::swap(socket, socket_);
output_ = output;
input_ = input;
std::swap(output_streams, output_streams_);
std::swap(input_streams, input_streams_);
std::swap(socket, socket_);
output_ = output;
input_ = input;

#if defined(WITH_OPENSSL)
std::swap(ssl_context_, ssl_context);
std::swap(ssl_context_, ssl_context);
#endif

if (!Handshake()) {
throw std::runtime_error("fail to connect to " + options_.host);
if (!Handshake()) {
throw std::runtime_error("fail to connect to " + host_port.host);
}
} catch (const std::system_error &e) {
if (i == int(options_.hosts_ports.size()) - 1) {
throw;
}
continue;
} catch (const std::runtime_error &e) {
if (i == int(options_.hosts_ports.size()) - 1) {
throw;
}
continue;
} catch (...) {
if (i == int(options_.hosts_ports.size()) - 1) {
throw;
}
continue;
}

connected_host_port_ = host_port;
return;
}
}

const ServerInfo& Client::Impl::GetServerInfo() const {
return server_info_;
}

const std::optional<ClientOptions::HostPort>& Client::Impl::GetConnectedHostPort() const {
return connected_host_port_;
}

bool Client::Impl::Handshake() {
if (!SendHello()) {
return false;
Expand Down Expand Up @@ -831,4 +882,8 @@ const ServerInfo& Client::GetServerInfo() const {
return impl_->GetServerInfo();
}

const std::optional<ClientOptions::HostPort>& Client::GetConnectedHostPort() const {
return impl_->GetConnectedHostPort();
}

}
12 changes: 12 additions & 0 deletions clickhouse/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ struct ClientOptions {
return *this; \
}


/// List of hostnames with service ports
struct HostPort {
std::string host;
std::optional<unsigned int> port;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix: add #include <optional>
Or enable editing PR by maintainers.


explicit HostPort(std::string host, std::optional<unsigned int> port = std::nullopt) : host(std::move(host)), port(std::move(port)) {
}
};
Comment on lines +56 to +62
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename HostPort to Endpoint (here and everywhere else), it is a bit more clear. Also, you can significantly simplify the struct definition:

Suggested change
struct HostPort {
std::string host;
std::optional<unsigned int> port;
explicit HostPort(std::string host, std::optional<unsigned int> port = std::nullopt) : host(std::move(host)), port(std::move(port)) {
}
};
struct Endpoint {
std::string host;
std::optional<unsigned int> port = std::nullopt;
}
};

DECLARE_FIELD(hosts_ports, std::vector<HostPort>, SetHost,{});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
DECLARE_FIELD(hosts_ports, std::vector<HostPort>, SetHost,{});
/** Set endpoints (host+port), only one is used.
* Client tries to connect to those endpoints one by one, on the round-robin basis:
* first default enpoint (set via SetHost() + SetPort()), then each of endpoints, from begin() to end(),
* the first one to establish connection is used for the rest of the session.
* If port part is not specified, default port (@see SetPort()) is used.
*/
DECLARE_FIELD(endpoints, std::vector<Endpoint>, SetEndpoints, {});

/// Hostname of the server.
DECLARE_FIELD(host, std::string, SetHost, std::string());
/// Service port.
Expand Down Expand Up @@ -196,6 +206,8 @@ class Client {

const ServerInfo& GetServerInfo() const;

const std::optional<ClientOptions::HostPort>& GetConnectedHostPort() const;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please specify in comments when this function may return nullopt.


private:
const ClientOptions options_;

Expand Down
63 changes: 62 additions & 1 deletion tests/simple/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ static void RunTests(Client& client) {
ArrayExample(client);
CancelableExample(client);
DateExample(client);
DateTime64Example(client);
// DateTime64Example(client);
DecimalExample(client);
EnumExample(client);
ExecptionExample(client);
Expand Down Expand Up @@ -510,6 +510,67 @@ int main() {
.SetCompressionMethod(CompressionMethod::LZ4));
RunTests(client);
}

{
Copy link
Contributor

@Enmk Enmk Jan 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add unit tests instead, the sample program is not executed in CI/CD and hence your code is not checked. I suggest you to add a separate test-cases, both positive and negative in client_ut.cpp:

Suggested change
{
TEST(ClientCaseConnect, MultipleEndpoints) {
}

ClientOptions::HostPort correct_host_port = ClientOptions::HostPort("localhost", 9000);
Client client(ClientOptions()
.SetHost({
ClientOptions::HostPort("localhost", 8000), // wrong port
ClientOptions::HostPort("localhost", 7000), // wrong port
ClientOptions::HostPort("1127.91.2.1"), // wrong host
ClientOptions::HostPort("1127.91.2.2"), // wrong host
ClientOptions::HostPort("notlocalwronghost"), // wrong host
ClientOptions::HostPort("another_notlocalwronghost"), // wrong host
correct_host_port,
ClientOptions::HostPort("localhost", 9001), // wrong port
ClientOptions::HostPort("1127.911.2.2"), // wrong host
})
.SetPingBeforeQuery(true));
Comment on lines +515 to +528
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, this allows simplifying client code:

Suggested change
ClientOptions::HostPort correct_host_port = ClientOptions::HostPort("localhost", 9000);
Client client(ClientOptions()
.SetHost({
ClientOptions::HostPort("localhost", 8000), // wrong port
ClientOptions::HostPort("localhost", 7000), // wrong port
ClientOptions::HostPort("1127.91.2.1"), // wrong host
ClientOptions::HostPort("1127.91.2.2"), // wrong host
ClientOptions::HostPort("notlocalwronghost"), // wrong host
ClientOptions::HostPort("another_notlocalwronghost"), // wrong host
correct_host_port,
ClientOptions::HostPort("localhost", 9001), // wrong port
ClientOptions::HostPort("1127.911.2.2"), // wrong host
})
.SetPingBeforeQuery(true));
ClientOptions::Endpoint correct_endpoint{"localhost", 9000};
Client client(ClientOptions()
.SetEndpoints({
{"localhost", 8000}, // wrong port
{"localhost", 7000}, // wrong port
{"1127.91.2.1"}, // wrong host
{"1127.91.2.2"}, // wrong host
{"notlocalwronghost"}, // wrong host
{"another_notlocalwronghost"}, // wrong host
{correct_endpoint},
{"localhost", 9001}, // wrong port
{"1127.911.2.2"}, // wrong host
})
.SetPingBeforeQuery(true));

assert(client.GetConnectedHostPort() == correct_host_port);
RunTests(client);
}
{
try {
Client client(ClientOptions()
.SetHost({
ClientOptions::HostPort("notlocalwronghost") // wrong host
})
.SetSendRetries(0)
.SetPingBeforeQuery(true)
);
assert(false && "exception must be thrown");
} catch (const std::exception &e) {
std::cout << "Caught exception, that have to been thrown: " << e.what() << std::endl;
}
}
{
try {
Client client(ClientOptions()
.SetHost({
ClientOptions::HostPort("localhost", 8000), // wrong port
})
.SetSendRetries(0)
.SetPingBeforeQuery(true)
);
assert(false && "exception must be thrown");
} catch (const std::runtime_error &e) {
std::cout << "Caught exception, that have to been thrown: " << e.what() << std::endl;
}
}
{
try {
Client client(ClientOptions()
.SetHost({
ClientOptions::HostPort("1127.91.2.1"), // wrong host
})
.SetSendRetries(0)
.SetPingBeforeQuery(true)
);
assert(false && "exception must be thrown");
} catch (const std::runtime_error &e) {
std::cout << "Caught exception, that have to been thrown: " << e.what() << std::endl;
}
}
} catch (const std::exception& e) {
std::cerr << "exception : " << e.what() << std::endl;
}
Expand Down