diff --git a/README.md b/README.md index d7cb62c05..3ba351c0c 100644 --- a/README.md +++ b/README.md @@ -156,6 +156,10 @@ if (rc != 0) { } ``` +Note that some methods can return `rc < 0` without any error. For example, `wait` +can return `-1` when timeout is exceeded. In order to differentiate between such +situations, one can use method `Connection::hasError()`. + To reset connection after errors (clean up error message and connection status), one can use `Connection::reset()`. @@ -182,6 +186,51 @@ request is ready, `wait()` terminates. It also provides negative return code in case of system related fails (e.g. broken or time outed connection). If `wait()` returns 0, then response is received and expected to be parsed. +### Waiting for Responses + +The connector provides several wait methods. All methods accept an integer `timeout` +argument with the following semantics: +* If `timeout > 0`, the connector blocks for `timeout` **milliseconds** or until + all required responses are received. Time is measured against the monotonic clock. +* If `timeout == 0`, the connector decodes all available responses and returns + immediately. +* If `timeout == -1`, the connector blocks until required responses are received + (basically, no timeout). + +All the waiting functions (except for `waitAny`, its description will be later) +return `0` on success and `-1` in the case of any internal error (for example, +when the underlying connection is closed) or when timeout is exceeded. +See [this section](#error-handling) for error handling details. + +Method `wait` waits for one request: +```c++ +int rc = client.wait(conn, ping, WAIT_TIMEOUT); +``` +An optional argument allows to obtain response right away in the case of success: +```c++ +Response response; +int rc = client.wait(conn, ping, WAIT_TIMEOUT, &response); +``` + +Method `waitAll` waits for completion of all the given requests of a connection: +```c++ +std::vector futures{ping1, ping2, call, replace}; +int rc = client.waitAll(conn, futures, WAIT_TIMEOUT); +``` + +Method `waitCount` waits until the given connection will complete any `future_count` requests: +```c++ +int rc = client.waitCount(conn, future_count, WAIT_TIMEOUT); +``` + +Method `waitAny` is different - it allows to poll all the connections simultaneously. +In the case of success, the function returns a connection that received a response. +In the case of internal error or when timeout is exceeded, returns `std::nullopt`. +See [this section](#error-handling) for error handling details. +```c++ +std::optional> conn_ready = client.waitAny(WAIT_TIMEOUT); +``` + ### Receiving Responses To get the response when it is ready, we can use `Connection::getResponse()`. diff --git a/src/Client/Connector.hpp b/src/Client/Connector.hpp index 87dbd7f49..b0fd96c8b 100644 --- a/src/Client/Connector.hpp +++ b/src/Client/Connector.hpp @@ -75,13 +75,13 @@ class Connector const std::string& addr, unsigned port); int wait(Connection &conn, rid_t future, - int timeout = 0, Response *result = nullptr); + int timeout = -1, Response *result = nullptr); int waitAll(Connection &conn, - const std::vector &futures, int timeout = 0); + const std::vector &futures, int timeout = -1); int waitCount(Connection &conn, - size_t feature_count, int timeout = 0); + size_t feature_count, int timeout = -1); + std::optional> waitAny(int timeout = -1); ////////////////////////////Service interfaces////////////////////////// - std::optional> waitAny(int timeout = 0); void readyToDecode(const Connection &conn); void readyToSend(const Connection &conn); void finishSend(const Connection &conn); @@ -185,9 +185,8 @@ Connector::wait(Connection &conn, timer.start(); if (connectionDecodeResponses(conn, result) != 0) return -1; - while (!conn.hasError() && !conn.futureIsReady(future) && - !timer.isExpired()) { - if (m_NetProvider.wait(timeout - timer.elapsed()) != 0) { + while (!conn.hasError() && !conn.futureIsReady(future)) { + if (m_NetProvider.wait(timer.timeLeft()) != 0) { conn.setError(std::string("Failed to poll: ") + strerror(errno), errno); return -1; @@ -203,13 +202,15 @@ Connector::wait(Connection &conn, if (!hasDataToDecode(conn)) m_ReadyToDecode.erase(conn); } + if (timer.isExpired()) + break; } if (conn.hasError()) { LOG_ERROR("Connection got an error: ", conn.getError().msg); return -1; } if (! conn.futureIsReady(future)) { - LOG_ERROR("Connection has been timed out: future ", future, + LOG_DEBUG("Connection has been timed out: future ", future, " is not ready"); return -1; } @@ -226,8 +227,8 @@ Connector::waitAll(Connection &conn, Timer timer{timeout}; timer.start(); size_t last_not_ready = 0; - while (!conn.hasError() && !timer.isExpired()) { - if (m_NetProvider.wait(timeout - timer.elapsed()) != 0) { + while (!conn.hasError()) { + if (m_NetProvider.wait(timer.timeLeft()) != 0) { conn.setError(std::string("Failed to poll: ") + strerror(errno), errno); return -1; @@ -249,12 +250,14 @@ Connector::waitAll(Connection &conn, } if (finish) return 0; + if (timer.isExpired()) + break; } if (conn.hasError()) { LOG_ERROR("Connection got an error: ", conn.getError().msg); return -1; } - LOG_ERROR("Connection has been timed out: not all futures are ready"); + LOG_DEBUG("Connection has been timed out: not all futures are ready"); return -1; } @@ -264,10 +267,13 @@ Connector::waitAny(int timeout) { Timer timer{timeout}; timer.start(); - while (m_ReadyToDecode.empty() && !timer.isExpired()) - m_NetProvider.wait(timeout - timer.elapsed()); + while (m_ReadyToDecode.empty()) { + m_NetProvider.wait(timer.timeLeft()); + if (timer.isExpired()) + break; + } if (m_ReadyToDecode.empty()) { - LOG_ERROR("wait() has been timed out! No responses are received"); + LOG_DEBUG("wait() has been timed out! No responses are received"); return std::nullopt; } Connection conn = *m_ReadyToDecode.begin(); @@ -287,8 +293,8 @@ Connector::waitCount(Connection &conn, Timer timer{timeout}; timer.start(); size_t ready_futures = conn.getFutureCount(); - while (!conn.hasError() && !timer.isExpired()) { - if (m_NetProvider.wait(timeout - timer.elapsed()) != 0) { + while (!conn.hasError()) { + if (m_NetProvider.wait(timer.timeLeft()) != 0) { conn.setError(std::string("Failed to poll: ") + strerror(errno), errno); return -1; @@ -302,13 +308,15 @@ Connector::waitCount(Connection &conn, } if ((conn.getFutureCount() - ready_futures) >= future_count) return 0; + if (timer.isExpired()) + break; } if (conn.hasError()) { LOG_ERROR("Connection got an error: ", conn.getError().msg); return -1; } - LOG_ERROR("Connection has been timed out: only ", - conn.getFutureCount() - ready_futures, " are ready"); + LOG_DEBUG("Connection has been timed out: only ", + conn.getFutureCount() - ready_futures, " are ready"); return -1; } diff --git a/src/Client/EpollNetProvider.hpp b/src/Client/EpollNetProvider.hpp index a363096b5..ccd6aa826 100644 --- a/src/Client/EpollNetProvider.hpp +++ b/src/Client/EpollNetProvider.hpp @@ -258,8 +258,8 @@ template int EpollNetProvider::wait(int timeout) { - assert(timeout >= 0); - if (timeout == 0) + assert(timeout >= -1); + if (timeout == -1) timeout = TIMEOUT_INFINITY; LOG_DEBUG("Network engine wait for ", timeout, " milliseconds"); /* Send pending requests. */ diff --git a/src/Client/LibevNetProvider.hpp b/src/Client/LibevNetProvider.hpp index fa4a81280..431d1c126 100644 --- a/src/Client/LibevNetProvider.hpp +++ b/src/Client/LibevNetProvider.hpp @@ -354,7 +354,7 @@ void LibevNetProvider::timeout_cb(EV_P_ ev_timer *w, int /* revents */) { (void) w; - LOG_ERROR("Libev timed out!"); + LOG_DEBUG("Libev timed out!"); /* Stop external loop */ ev_break(EV_A_ EVBREAK_ONE); } @@ -363,7 +363,7 @@ template int LibevNetProvider::wait(int timeout) { - assert(timeout >= 0); + assert(timeout >= -1); if (timeout > 0) { ev_timer_init(&m_TimeoutWatcher, &timeout_cb, timeout / MILLISECONDS, 0 /* repeat */); ev_timer_start(m_Loop, &m_TimeoutWatcher); @@ -381,6 +381,8 @@ LibevNetProvider::wait(int timeout) } } - ev_run(m_Loop, EVRUN_ONCE); + /* Work in non-blocking mode when the timeout is zero. */ + int flags = timeout == 0 ? EVRUN_NOWAIT : EVRUN_ONCE; + ev_run(m_Loop, flags); return 0; } diff --git a/src/Utils/Timer.hpp b/src/Utils/Timer.hpp index 6396b65c5..53e0de85b 100644 --- a/src/Utils/Timer.hpp +++ b/src/Utils/Timer.hpp @@ -29,6 +29,7 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include #include class Timer { @@ -40,7 +41,7 @@ class Timer { } bool isExpired() const { - if (m_Timeout == std::chrono::milliseconds{0}) + if (m_Timeout == std::chrono::milliseconds{-1}) return false; std::chrono::time_point end = std::chrono::steady_clock::now(); @@ -48,13 +49,22 @@ class Timer { std::chrono::duration_cast(end - m_Start); return elapsed >= m_Timeout; } - int elapsed() const + /** + * The function to obtain amount of time left. Returns: + * 1. `-1` if the initial timeout was `-1`. + * 2. `0` if the timer has expired. + * 3. Otherwise, amount of milliseconds left is returned. + * NB: the function should not be used for expiration check - use `isExpired` instead. + */ + int timeLeft() const { - if (m_Timeout == std::chrono::milliseconds{0}) - return 0; + if (m_Timeout == std::chrono::milliseconds{-1}) + return -1; std::chrono::time_point end = std::chrono::steady_clock::now(); - return std::chrono::duration_cast(end - m_Start).count(); + int timeLeft = m_Timeout.count() - + std::chrono::duration_cast(end - m_Start).count(); + return std::max(0, timeLeft); } private: std::chrono::milliseconds m_Timeout; diff --git a/test/ClientTest.cpp b/test/ClientTest.cpp index 16adbafa0..a3fde6599 100644 --- a/test/ClientTest.cpp +++ b/test/ClientTest.cpp @@ -264,9 +264,11 @@ auto_close(Connector &client) /** Several connection, separate/sequence pings, no errors */ template void -many_conn_ping(Connector &client) +many_conn_ping(void) { TEST_INIT(0); + /* FIXME(gh-123,gh-124): use own client not to leave hanging connection. */ + Connector client; Connection conn1(client); Connection conn2(client); Connection conn3(client); @@ -1033,15 +1035,16 @@ test_auth(Connector &client) } /** Single connection, write to closed connection. */ -template void -test_sigpipe(Connector &client) +test_sigpipe(void) { TEST_INIT(0); int rc = ::launchDummyServer(localhost, dummy_server_port); fail_unless(rc == 0); + /* FIXME(gh-122): use own client not to leave hanging dead connection. */ + Connector client; Connection conn(client); rc = ::test_connect(client, conn, localhost, dummy_server_port); fail_unless(rc == 0); @@ -1063,15 +1066,16 @@ test_sigpipe(Connector &client) } /** Single connection, wait response from closed connection. */ -template void -test_dead_connection_wait(Connector &client) +test_dead_connection_wait(void) { TEST_INIT(0); int rc = ::launchDummyServer(localhost, dummy_server_port); fail_unless(rc == 0); + /* FIXME(gh-122): use own client not to leave hanging dead connection. */ + Connector client; Connection conn(client); rc = ::test_connect(client, conn, localhost, dummy_server_port); fail_unless(rc == 0); @@ -1140,6 +1144,104 @@ response_decoding(Connector &client) client.close(conn); } +/** Checks all available `wait` methods of connector. */ +template +void +test_wait(Connector &client) +{ + TEST_INIT(0); + static constexpr double SLEEP_TIME = 0.1; + + Connection conn(client); + int rc = test_connect(client, conn, localhost, port); + fail_unless(rc == 0); + + TEST_CASE("wait(0) and wait(-1)"); + rid_t f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME)); + fail_unless(!conn.futureIsReady(f)); + client.wait(conn, f, 0); + fail_unless(!conn.futureIsReady(f)); + client.wait(conn, f, -1); + fail_unless(conn.futureIsReady(f)); + std::optional> response = conn.getResponse(f); + fail_unless(response.has_value()); + + TEST_CASE("wait(0) polls connections"); + f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME)); + fail_unless(!conn.futureIsReady(f)); + while (!conn.futureIsReady(f)) { + client.wait(conn, f, 0); + usleep(10 * 1000); /* 10ms */ + } + fail_unless(conn.futureIsReady(f)); + response = conn.getResponse(f); + fail_unless(response.has_value()); + + TEST_CASE("waitAny(0) and waitAny(-1)"); + f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME)); + fail_unless(!client.waitAny(0).has_value()); + fail_unless(client.waitAny(-1).has_value()); + response = conn.getResponse(f); + fail_unless(response.has_value()); + + TEST_CASE("waitAny(0) polls connections"); + f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME)); + fail_unless(!conn.futureIsReady(f)); + while (!conn.futureIsReady(f)) { + client.waitAny(0); + usleep(10 * 1000); /* 10ms */ + } + fail_unless(conn.futureIsReady(f)); + response = conn.getResponse(f); + fail_unless(response.has_value()); + + TEST_CASE("waitAll(0) and waitAll(-1)"); + std::vector fs; + fs.push_back(conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME))); + fs.push_back(conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME))); + fail_unless(client.waitAll(conn, fs, 0) == -1); + fail_unless(client.waitAll(conn, fs, -1) == 0); + response = conn.getResponse(fs[0]); + fail_unless(response.has_value()); + response = conn.getResponse(fs[1]); + fail_unless(response.has_value()); + + TEST_CASE("waitAll(0) polls connections"); + f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME)); + fail_unless(!conn.futureIsReady(f)); + while (!conn.futureIsReady(f)) { + client.waitAll(conn, std::vector{f}, 0); + usleep(10 * 1000); /* 10ms */ + } + fail_unless(conn.futureIsReady(f)); + response = conn.getResponse(f); + fail_unless(response.has_value()); + + TEST_CASE("waitCount(0) and waitCount(-1)"); + fs.clear(); + fs.push_back(conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME))); + fs.push_back(conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME))); + fail_unless(client.waitCount(conn, 2, 0) == -1); + fail_unless(client.waitCount(conn, 2, -1) == 0); + response = conn.getResponse(fs[0]); + fail_unless(response.has_value()); + response = conn.getResponse(fs[1]); + fail_unless(response.has_value()); + + TEST_CASE("waitCount(0) polls connections"); + f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME)); + fail_unless(!conn.futureIsReady(f)); + while (!conn.futureIsReady(f)) { + client.waitCount(conn, 1, 0); + usleep(10 * 1000); /* 10ms */ + } + fail_unless(conn.futureIsReady(f)); + response = conn.getResponse(f); + fail_unless(response.has_value()); + + client.close(conn); +} + int main() { #ifdef TNTCXX_ENABLE_SSL @@ -1167,7 +1269,7 @@ int main() trivial(client); single_conn_ping(client); auto_close(client); - many_conn_ping(client); + many_conn_ping(); single_conn_error(client); single_conn_replace(client); single_conn_insert(client); @@ -1185,9 +1287,10 @@ int main() * an a lot more complex state machine. */ #ifndef TNTCXX_ENABLE_SSL - ::test_sigpipe(client); + ::test_sigpipe(); #endif - ::test_dead_connection_wait(client); + ::test_dead_connection_wait(); response_decoding(client); + test_wait(client); return 0; } diff --git a/test/cfg.lua b/test/cfg.lua index 10a8a8800..08813a9a2 100644 --- a/test/cfg.lua +++ b/test/cfg.lua @@ -34,6 +34,12 @@ function remote_echo(...) return {...} end +function remote_sleep(timeout) + local fiber = require('fiber') + fiber.sleep(timeout) + return nil +end + function get_rps() return box.stat.net().REQUESTS.rps end diff --git a/test/cfg_ssl.lua b/test/cfg_ssl.lua index 0718cee72..1bf94169c 100644 --- a/test/cfg_ssl.lua +++ b/test/cfg_ssl.lua @@ -32,6 +32,12 @@ function remote_echo(...) return {...} end +function remote_sleep(timeout) + local fiber = require('fiber') + fiber.sleep(timeout) + return nil +end + function get_rps() return box.stat.net().REQUESTS.rps end