diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index ee94a527..c00225bc 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -27,6 +27,10 @@ local decode_base64 = ngx.decode_base64 local semaphore = require("ngx.semaphore") local health_check = require("resty.etcd.health_check") +math.randomseed(now() * 1000 + ngx.worker.pid()) + +local INIT_COUNT_RESIZE = 2e8 + local _M = {} local mt = { __index = _M } @@ -44,11 +48,25 @@ local unmodifiable_headers = { local refresh_jwt_token -local function choose_endpoint(self) +local function ring_balancer(self) local endpoints = self.endpoints + local endpoints_len = #endpoints + + self.init_count = self.init_count + 1 + local pos = self.init_count % endpoints_len + 1 + if self.init_count >= INIT_COUNT_RESIZE then + self.init_count = 0 + end + + return endpoints[pos] +end + - for _, endpoint in ipairs(endpoints) do +local function choose_endpoint(self) + for _, _ in ipairs(self.endpoints) do + local endpoint = ring_balancer(self) if health_check.get_target_status(endpoint.http_host) then + utils.log_info("choose endpoint: ", endpoint.http_host) return endpoint end end @@ -256,6 +274,7 @@ function _M.new(opts) local extra_headers = opts.extra_headers local sni = opts.sni local unix_socket_proxy = opts.unix_socket_proxy + local init_count = opts.init_count if not typeof.uint(timeout) then return nil, 'opts.timeout must be unsigned integer' @@ -289,6 +308,10 @@ function _M.new(opts) return nil, 'opts.unix_socket_proxy must be string or ignore' end + if not typeof.number(init_count) then + init_count = random(100) + end + local endpoints = {} local http_hosts if type(http_host) == 'string' then -- signle node @@ -357,6 +380,7 @@ function _M.new(opts) extra_headers = extra_headers, sni = sni, unix_socket_proxy = unix_socket_proxy, + init_count = init_count, }, mt) end @@ -888,12 +912,7 @@ local function watch(self, key, attr) need_cancel = need_cancel, } - local endpoint, err = choose_endpoint(self) - if not endpoint then - return nil, err - end - - local callback_fun, http_cli + local callback_fun, http_cli, err callback_fun, err, http_cli = request_chunk(self, 'POST', '/watch', opts, attr.timeout or self.timeout) if not callback_fun then diff --git a/t/Procfile-single-enable-mtls b/t/Procfile-single-enable-mtls index 0263a811..1afcf140 100644 --- a/t/Procfile-single-enable-mtls +++ b/t/Procfile-single-enable-mtls @@ -2,5 +2,5 @@ etcd0: etcd etcd1: etcd --name infra1 --listen-client-urls https://127.0.0.1:12379 --advertise-client-urls https://127.0.0.1:12379 --listen-peer-urls http://127.0.0.1:12380 --initial-advertise-peer-urls http://127.0.0.1:12380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --cert-file ./t/certs/mtls_server.crt --key-file ./t/certs/mtls_server.key --client-cert-auth --trusted-ca-file ./t/certs/mtls_ca.crt etcd2: etcd --name infra2 --listen-client-urls https://127.0.0.1:22379 --advertise-client-urls https://127.0.0.1:22379 --listen-peer-urls http://127.0.0.1:22380 --initial-advertise-peer-urls http://127.0.0.1:22380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --cert-file ./t/certs/mtls_server.crt --key-file ./t/certs/mtls_server.key --client-cert-auth --trusted-ca-file ./t/certs/mtls_ca.crt -etcd3: etcd --name infra3 --listen-client-urls https://127.0.0.1:32379 --advertise-client-urls https://127.0.0.1:32379 --listen-peer-urls http://127.0.0.1:32380 --initial-advertise-peer-urls http://127.0.0.1:32380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --cert-file ./t/certs/mtls_server.crt --key-file ./t/certs/mtls_server.key --client-cert-auth --trusted-ca-file ./t/certs/mtls_ca.cr +etcd3: etcd --name infra3 --listen-client-urls https://127.0.0.1:32379 --advertise-client-urls https://127.0.0.1:32379 --listen-peer-urls http://127.0.0.1:32380 --initial-advertise-peer-urls http://127.0.0.1:32380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --cert-file ./t/certs/mtls_server.crt --key-file ./t/certs/mtls_server.key --client-cert-auth --trusted-ca-file ./t/certs/mtls_ca.crt # A learner node can be started using Procfile.learner diff --git a/t/v3/health_check.t b/t/v3/health_check.t index b845ff3c..b16ca684 100644 --- a/t/v3/health_check.t +++ b/t/v3/health_check.t @@ -154,6 +154,7 @@ failed to get ngx.shared dict: error_shm_name }, user = 'root', password = 'abc123', + init_count = -1, }) local res, err = etcd:set("/trigger_unhealthy", { a='abc'}) @@ -187,6 +188,7 @@ http://127.0.0.1:42379: connection refused "http://127.0.0.1:22379", "http://127.0.0.1:32379", }, + init_count = -1, }) local body_chunk_fun, err = etcd:watch("/trigger_unhealthy") @@ -224,10 +226,16 @@ http://127.0.0.1:42379: connection refused }, user = 'root', password = 'abc123', + init_count = -1, }) - etcd:set("/fault_count", { a='abc'}) - etcd:set("/fault_count", { a='abc'}) + -- make sure to select http://127.0.0.1:42379 twice + for i = 1, 4 do + etcd:set("/fault_count", { a='abc'}) + end + + -- here have actually been 5 reads and writes to etcd, including one to /auth/authenticate + local fails, err = ngx.shared["etcd_cluster_health_check"]:get("http://127.0.0.1:42379") if err then ngx.say(err) @@ -264,6 +272,7 @@ GET /t }, user = 'root', password = 'abc123', + init_count = -1, }) etcd:set("/get_target_status", { a='abc'}) @@ -301,19 +310,21 @@ false }, user = 'root', password = 'abc123', + init_count = -1, }) - local res, err = etcd:set("/fail_timeout", "http://127.0.0.1:42379") -- trigger http://127.0.0.1:42379 to unhealthy + local res, err - res, err = etcd:set("/fail_timeout", "http://127.0.0.1:22379") -- choose http://127.0.0.1:22379 to set value - res, err = etcd:get("/fail_timeout") - assert(res.body.kvs[1].value == "http://127.0.0.1:22379") - - ngx.sleep(2) + -- make sure to select http://127.0.0.1:42379 once and trigger it to unhealthy + for i = 1, 3 do + res, err = etcd:set("/fail_timeout", "value") + end - res, err = etcd:set("/fail_timeout", "http://127.0.0.1:42379") -- choose http://127.0.0.1:42379 to set value - res, err = etcd:get("/fail_timeout") - assert(res.body.kvs[1].value == "http://127.0.0.1:22379") + -- ensure that unhealthy http://127.0.0.1:42379 are no longer selected + for i = 1, 3 do + res, err = etcd:get("/fail_timeout") + assert(res.body.kvs[1].value == "value") + end ngx.say("done") } @@ -323,6 +334,8 @@ GET /t --- timeout: 5 --- response_body done +--- error_log +update endpoint: http://127.0.0.1:42379 to unhealthy --- no_error_log [error] @@ -387,6 +400,7 @@ has no healthy etcd endpoint available }, user = 'root', password = 'abc123', + init_count = -1, }) local etcd2, err = require "resty.etcd" .new({ @@ -398,6 +412,7 @@ has no healthy etcd endpoint available }, user = 'root', password = 'abc123', + init_count = -1, }) assert(tostring(etcd1) ~= tostring(etcd2)) @@ -493,12 +508,21 @@ qr/update endpoint: http:\/\/localhost:1984 to unhealthy/ }, user = 'root', password = 'abc123', + init_count = -1, }) - local res, err = etcd:set("/trigger_unhealthy", "abc") + local res, err + for i = 1, 3 do + res, err = etcd:set("/trigger_unhealthy", "abc") + end check_res(res, err) local res, err = etcd:get("/trigger_unhealthy") check_res(res, err, "abc") + + -- There are 5 times read and write operations to etcd have occurred here + -- 3 set, 1 get, 1 auth + -- actual 8 times choose endpoint, retry every time 42379 is selected + -- 42379 marked as unhealthy after 3 seleced } } --- request @@ -531,13 +555,16 @@ checked val as expect: abc }, user = 'root', password = 'abc123', + init_count = -1, }) local body_chunk_fun, err = etcd:watch("/trigger_unhealthy", {timeout = 0.5}) check_res(body_chunk_fun, err) ngx.timer.at(0.1, function () - etcd:set("/trigger_unhealthy", "abc") + for i = 1, 3 do + etcd:set("/trigger_unhealthy", "abc") + end end) local idx = 0 @@ -563,6 +590,8 @@ qr/update endpoint: http:\/\/127.0.0.1:42379 to unhealthy/ --- response_body_like eval qr/1:.*"created":true.* 2:.*"value":"abc".* +3:.*"value":"abc".* +4:.*"value":"abc".* timeout/ --- timeout: 5 @@ -681,6 +710,7 @@ has no healthy etcd endpoint available "http://127.0.0.1:22379", "http://127.0.0.1:32379", }, + init_count = -1, }) local res, err = etcd:set("/test/etcd/healthy", "hello") @@ -749,6 +779,7 @@ qr/update endpoint: http:\/\/127.0.0.1:12379 to unhealthy/ "http://127.0.0.1:22379", "http://127.0.0.1:32379", }, + init_count = -1, }) local res @@ -827,6 +858,7 @@ healthy check use ngx.shared dict "http://127.0.0.1:22379", "http://127.0.0.1:32379", }, + init_count = -1, }) local res @@ -844,7 +876,7 @@ healthy check use ngx.shared dict GET /t --- response_body http://127.0.0.1:42379: connection refused -http://127.0.0.1:42379: connection refused +http://127.0.0.1:22379: OK --- no_error_log eval qr/update endpoint: http:\/\/127.0.0.1:42379 to unhealthy/ @@ -889,3 +921,39 @@ GET /t --- response_body passed passed + + + +=== TEST 22: ring balancer with specific init_count +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local health_check = require("resty.etcd.health_check") + health_check.disable() + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:12379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + init_count = 101, + }) + + local res + for i = 1, 3 do + res, _ = etcd:set("/ring_balancer", "abc") + end + + ngx.say(etcd.init_count) + } + } +--- request +GET /t +--- response_body +104 +--- error_log +choose_endpoint(): choose endpoint: http://127.0.0.1:12379 +choose_endpoint(): choose endpoint: http://127.0.0.1:22379 +choose_endpoint(): choose endpoint: http://127.0.0.1:32379