Skip to content

feat: supports polling to select healthy nodes #170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions lib/resty/etcd/v3.lua
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ local decode_base64 = ngx.decode_base64
local semaphore = require("ngx.semaphore")
local health_check = require("resty.etcd.health_check")

math.randomseed(now() * 1000 + ngx.worker.pid())

local INIT_COUNT_RESIZE = 2e8

local _M = {}

local mt = { __index = _M }
Expand All @@ -44,11 +48,25 @@ local unmodifiable_headers = {
local refresh_jwt_token


local function choose_endpoint(self)
local function ring_balancer(self)
local endpoints = self.endpoints
local endpoints_len = #endpoints

self.init_count = self.init_count + 1
local pos = self.init_count % endpoints_len + 1
if self.init_count >= INIT_COUNT_RESIZE then
self.init_count = 0
end

return endpoints[pos]
end


for _, endpoint in ipairs(endpoints) do
local function choose_endpoint(self)
for _, _ in ipairs(self.endpoints) do
local endpoint = ring_balancer(self)
if health_check.get_target_status(endpoint.http_host) then
utils.log_info("choose endpoint: ", endpoint.http_host)
return endpoint
end
end
Expand Down Expand Up @@ -256,6 +274,7 @@ function _M.new(opts)
local extra_headers = opts.extra_headers
local sni = opts.sni
local unix_socket_proxy = opts.unix_socket_proxy
local init_count = opts.init_count

if not typeof.uint(timeout) then
return nil, 'opts.timeout must be unsigned integer'
Expand Down Expand Up @@ -289,6 +308,10 @@ function _M.new(opts)
return nil, 'opts.unix_socket_proxy must be string or ignore'
end

if not typeof.number(init_count) then
init_count = random(100)
end

local endpoints = {}
local http_hosts
if type(http_host) == 'string' then -- signle node
Expand Down Expand Up @@ -357,6 +380,7 @@ function _M.new(opts)
extra_headers = extra_headers,
sni = sni,
unix_socket_proxy = unix_socket_proxy,
init_count = init_count,
},
mt)
end
Expand Down Expand Up @@ -888,12 +912,7 @@ local function watch(self, key, attr)
need_cancel = need_cancel,
}

local endpoint, err = choose_endpoint(self)
if not endpoint then
return nil, err
end

local callback_fun, http_cli
local callback_fun, http_cli, err
callback_fun, err, http_cli = request_chunk(self, 'POST', '/watch',
opts, attr.timeout or self.timeout)
if not callback_fun then
Expand Down
2 changes: 1 addition & 1 deletion t/Procfile-single-enable-mtls
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
etcd0: etcd
etcd1: etcd --name infra1 --listen-client-urls https://127.0.0.1:12379 --advertise-client-urls https://127.0.0.1:12379 --listen-peer-urls http://127.0.0.1:12380 --initial-advertise-peer-urls http://127.0.0.1:12380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --cert-file ./t/certs/mtls_server.crt --key-file ./t/certs/mtls_server.key --client-cert-auth --trusted-ca-file ./t/certs/mtls_ca.crt
etcd2: etcd --name infra2 --listen-client-urls https://127.0.0.1:22379 --advertise-client-urls https://127.0.0.1:22379 --listen-peer-urls http://127.0.0.1:22380 --initial-advertise-peer-urls http://127.0.0.1:22380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --cert-file ./t/certs/mtls_server.crt --key-file ./t/certs/mtls_server.key --client-cert-auth --trusted-ca-file ./t/certs/mtls_ca.crt
etcd3: etcd --name infra3 --listen-client-urls https://127.0.0.1:32379 --advertise-client-urls https://127.0.0.1:32379 --listen-peer-urls http://127.0.0.1:32380 --initial-advertise-peer-urls http://127.0.0.1:32380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --cert-file ./t/certs/mtls_server.crt --key-file ./t/certs/mtls_server.key --client-cert-auth --trusted-ca-file ./t/certs/mtls_ca.cr
etcd3: etcd --name infra3 --listen-client-urls https://127.0.0.1:32379 --advertise-client-urls https://127.0.0.1:32379 --listen-peer-urls http://127.0.0.1:32380 --initial-advertise-peer-urls http://127.0.0.1:32380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --cert-file ./t/certs/mtls_server.crt --key-file ./t/certs/mtls_server.key --client-cert-auth --trusted-ca-file ./t/certs/mtls_ca.crt
# A learner node can be started using Procfile.learner
96 changes: 82 additions & 14 deletions t/v3/health_check.t
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ failed to get ngx.shared dict: error_shm_name
},
user = 'root',
password = 'abc123',
init_count = -1,
})

local res, err = etcd:set("/trigger_unhealthy", { a='abc'})
Expand Down Expand Up @@ -187,6 +188,7 @@ http://127.0.0.1:42379: connection refused
"http://127.0.0.1:22379",
"http://127.0.0.1:32379",
},
init_count = -1,
})

local body_chunk_fun, err = etcd:watch("/trigger_unhealthy")
Expand Down Expand Up @@ -224,10 +226,16 @@ http://127.0.0.1:42379: connection refused
},
user = 'root',
password = 'abc123',
init_count = -1,
})

etcd:set("/fault_count", { a='abc'})
etcd:set("/fault_count", { a='abc'})
-- make sure to select http://127.0.0.1:42379 twice
for i = 1, 4 do
etcd:set("/fault_count", { a='abc'})
end

-- here have actually been 5 reads and writes to etcd, including one to /auth/authenticate

local fails, err = ngx.shared["etcd_cluster_health_check"]:get("http://127.0.0.1:42379")
if err then
ngx.say(err)
Expand Down Expand Up @@ -264,6 +272,7 @@ GET /t
},
user = 'root',
password = 'abc123',
init_count = -1,
})

etcd:set("/get_target_status", { a='abc'})
Expand Down Expand Up @@ -301,19 +310,21 @@ false
},
user = 'root',
password = 'abc123',
init_count = -1,
})

local res, err = etcd:set("/fail_timeout", "http://127.0.0.1:42379") -- trigger http://127.0.0.1:42379 to unhealthy
local res, err

res, err = etcd:set("/fail_timeout", "http://127.0.0.1:22379") -- choose http://127.0.0.1:22379 to set value
res, err = etcd:get("/fail_timeout")
assert(res.body.kvs[1].value == "http://127.0.0.1:22379")

ngx.sleep(2)
-- make sure to select http://127.0.0.1:42379 once and trigger it to unhealthy
for i = 1, 3 do
res, err = etcd:set("/fail_timeout", "value")
end

res, err = etcd:set("/fail_timeout", "http://127.0.0.1:42379") -- choose http://127.0.0.1:42379 to set value
res, err = etcd:get("/fail_timeout")
assert(res.body.kvs[1].value == "http://127.0.0.1:22379")
-- ensure that unhealthy http://127.0.0.1:42379 are no longer selected
for i = 1, 3 do
res, err = etcd:get("/fail_timeout")
assert(res.body.kvs[1].value == "value")
end

ngx.say("done")
}
Expand All @@ -323,6 +334,8 @@ GET /t
--- timeout: 5
--- response_body
done
--- error_log
update endpoint: http://127.0.0.1:42379 to unhealthy
--- no_error_log
[error]

Expand Down Expand Up @@ -387,6 +400,7 @@ has no healthy etcd endpoint available
},
user = 'root',
password = 'abc123',
init_count = -1,
})

local etcd2, err = require "resty.etcd" .new({
Expand All @@ -398,6 +412,7 @@ has no healthy etcd endpoint available
},
user = 'root',
password = 'abc123',
init_count = -1,
})

assert(tostring(etcd1) ~= tostring(etcd2))
Expand Down Expand Up @@ -493,12 +508,21 @@ qr/update endpoint: http:\/\/localhost:1984 to unhealthy/
},
user = 'root',
password = 'abc123',
init_count = -1,
})

local res, err = etcd:set("/trigger_unhealthy", "abc")
local res, err
for i = 1, 3 do
res, err = etcd:set("/trigger_unhealthy", "abc")
end
check_res(res, err)
local res, err = etcd:get("/trigger_unhealthy")
check_res(res, err, "abc")

-- There are 5 times read and write operations to etcd have occurred here
-- 3 set, 1 get, 1 auth
-- actual 8 times choose endpoint, retry every time 42379 is selected
-- 42379 marked as unhealthy after 3 seleced
}
}
--- request
Expand Down Expand Up @@ -531,13 +555,16 @@ checked val as expect: abc
},
user = 'root',
password = 'abc123',
init_count = -1,
})

local body_chunk_fun, err = etcd:watch("/trigger_unhealthy", {timeout = 0.5})
check_res(body_chunk_fun, err)

ngx.timer.at(0.1, function ()
etcd:set("/trigger_unhealthy", "abc")
for i = 1, 3 do
etcd:set("/trigger_unhealthy", "abc")
end
end)

local idx = 0
Expand All @@ -563,6 +590,8 @@ qr/update endpoint: http:\/\/127.0.0.1:42379 to unhealthy/
--- response_body_like eval
qr/1:.*"created":true.*
2:.*"value":"abc".*
3:.*"value":"abc".*
4:.*"value":"abc".*
timeout/
--- timeout: 5

Expand Down Expand Up @@ -681,6 +710,7 @@ has no healthy etcd endpoint available
"http://127.0.0.1:22379",
"http://127.0.0.1:32379",
},
init_count = -1,
})

local res, err = etcd:set("/test/etcd/healthy", "hello")
Expand Down Expand Up @@ -749,6 +779,7 @@ qr/update endpoint: http:\/\/127.0.0.1:12379 to unhealthy/
"http://127.0.0.1:22379",
"http://127.0.0.1:32379",
},
init_count = -1,
})

local res
Expand Down Expand Up @@ -827,6 +858,7 @@ healthy check use ngx.shared dict
"http://127.0.0.1:22379",
"http://127.0.0.1:32379",
},
init_count = -1,
})

local res
Expand All @@ -844,7 +876,7 @@ healthy check use ngx.shared dict
GET /t
--- response_body
http://127.0.0.1:42379: connection refused
http://127.0.0.1:42379: connection refused
http://127.0.0.1:22379: OK
--- no_error_log eval
qr/update endpoint: http:\/\/127.0.0.1:42379 to unhealthy/

Expand Down Expand Up @@ -889,3 +921,39 @@ GET /t
--- response_body
passed
passed



=== TEST 22: ring balancer with specific init_count
--- http_config eval: $::HttpConfig
--- config
location /t {
content_by_lua_block {
local health_check = require("resty.etcd.health_check")
health_check.disable()
local etcd, err = require "resty.etcd" .new({
protocol = "v3",
http_host = {
"http://127.0.0.1:12379",
"http://127.0.0.1:22379",
"http://127.0.0.1:32379",
},
init_count = 101,
})

local res
for i = 1, 3 do
res, _ = etcd:set("/ring_balancer", "abc")
end

ngx.say(etcd.init_count)
}
}
--- request
GET /t
--- response_body
104
--- error_log
choose_endpoint(): choose endpoint: http://127.0.0.1:12379
choose_endpoint(): choose endpoint: http://127.0.0.1:22379
choose_endpoint(): choose endpoint: http://127.0.0.1:32379