Skip to content

Commit 1d44505

Browse files
authored
feat: supports polling to select healthy nodes (#170)
1 parent 8009122 commit 1d44505

File tree

3 files changed

+110
-23
lines changed

3 files changed

+110
-23
lines changed

lib/resty/etcd/v3.lua

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ local decode_base64 = ngx.decode_base64
2727
local semaphore = require("ngx.semaphore")
2828
local health_check = require("resty.etcd.health_check")
2929

30+
math.randomseed(now() * 1000 + ngx.worker.pid())
31+
32+
local INIT_COUNT_RESIZE = 2e8
33+
3034
local _M = {}
3135

3236
local mt = { __index = _M }
@@ -44,11 +48,25 @@ local unmodifiable_headers = {
4448
local refresh_jwt_token
4549

4650

47-
local function choose_endpoint(self)
51+
local function ring_balancer(self)
4852
local endpoints = self.endpoints
53+
local endpoints_len = #endpoints
54+
55+
self.init_count = self.init_count + 1
56+
local pos = self.init_count % endpoints_len + 1
57+
if self.init_count >= INIT_COUNT_RESIZE then
58+
self.init_count = 0
59+
end
60+
61+
return endpoints[pos]
62+
end
63+
4964

50-
for _, endpoint in ipairs(endpoints) do
65+
local function choose_endpoint(self)
66+
for _, _ in ipairs(self.endpoints) do
67+
local endpoint = ring_balancer(self)
5168
if health_check.get_target_status(endpoint.http_host) then
69+
utils.log_info("choose endpoint: ", endpoint.http_host)
5270
return endpoint
5371
end
5472
end
@@ -256,6 +274,7 @@ function _M.new(opts)
256274
local extra_headers = opts.extra_headers
257275
local sni = opts.sni
258276
local unix_socket_proxy = opts.unix_socket_proxy
277+
local init_count = opts.init_count
259278

260279
if not typeof.uint(timeout) then
261280
return nil, 'opts.timeout must be unsigned integer'
@@ -289,6 +308,10 @@ function _M.new(opts)
289308
return nil, 'opts.unix_socket_proxy must be string or ignore'
290309
end
291310

311+
if not typeof.number(init_count) then
312+
init_count = random(100)
313+
end
314+
292315
local endpoints = {}
293316
local http_hosts
294317
if type(http_host) == 'string' then -- single node
@@ -357,6 +380,7 @@ function _M.new(opts)
357380
extra_headers = extra_headers,
358381
sni = sni,
359382
unix_socket_proxy = unix_socket_proxy,
383+
init_count = init_count,
360384
},
361385
mt)
362386
end
@@ -888,12 +912,7 @@ local function watch(self, key, attr)
888912
need_cancel = need_cancel,
889913
}
890914

891-
local endpoint, err = choose_endpoint(self)
892-
if not endpoint then
893-
return nil, err
894-
end
895-
896-
local callback_fun, http_cli
915+
local callback_fun, http_cli, err
897916
callback_fun, err, http_cli = request_chunk(self, 'POST', '/watch',
898917
opts, attr.timeout or self.timeout)
899918
if not callback_fun then

t/Procfile-single-enable-mtls

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
etcd0: etcd
33
etcd1: etcd --name infra1 --listen-client-urls https://127.0.0.1:12379 --advertise-client-urls https://127.0.0.1:12379 --listen-peer-urls http://127.0.0.1:12380 --initial-advertise-peer-urls http://127.0.0.1:12380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --cert-file ./t/certs/mtls_server.crt --key-file ./t/certs/mtls_server.key --client-cert-auth --trusted-ca-file ./t/certs/mtls_ca.crt
44
etcd2: etcd --name infra2 --listen-client-urls https://127.0.0.1:22379 --advertise-client-urls https://127.0.0.1:22379 --listen-peer-urls http://127.0.0.1:22380 --initial-advertise-peer-urls http://127.0.0.1:22380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --cert-file ./t/certs/mtls_server.crt --key-file ./t/certs/mtls_server.key --client-cert-auth --trusted-ca-file ./t/certs/mtls_ca.crt
5-
etcd3: etcd --name infra3 --listen-client-urls https://127.0.0.1:32379 --advertise-client-urls https://127.0.0.1:32379 --listen-peer-urls http://127.0.0.1:32380 --initial-advertise-peer-urls http://127.0.0.1:32380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --cert-file ./t/certs/mtls_server.crt --key-file ./t/certs/mtls_server.key --client-cert-auth --trusted-ca-file ./t/certs/mtls_ca.cr
5+
etcd3: etcd --name infra3 --listen-client-urls https://127.0.0.1:32379 --advertise-client-urls https://127.0.0.1:32379 --listen-peer-urls http://127.0.0.1:32380 --initial-advertise-peer-urls http://127.0.0.1:32380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --cert-file ./t/certs/mtls_server.crt --key-file ./t/certs/mtls_server.key --client-cert-auth --trusted-ca-file ./t/certs/mtls_ca.crt
66
# A learner node can be started using Procfile.learner

t/v3/health_check.t

Lines changed: 82 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ failed to get ngx.shared dict: error_shm_name
154154
},
155155
user = 'root',
156156
password = 'abc123',
157+
init_count = -1,
157158
})
158159
159160
local res, err = etcd:set("/trigger_unhealthy", { a='abc'})
@@ -187,6 +188,7 @@ http://127.0.0.1:42379: connection refused
187188
"http://127.0.0.1:22379",
188189
"http://127.0.0.1:32379",
189190
},
191+
init_count = -1,
190192
})
191193
192194
local body_chunk_fun, err = etcd:watch("/trigger_unhealthy")
@@ -224,10 +226,16 @@ http://127.0.0.1:42379: connection refused
224226
},
225227
user = 'root',
226228
password = 'abc123',
229+
init_count = -1,
227230
})
228231
229-
etcd:set("/fault_count", { a='abc'})
230-
etcd:set("/fault_count", { a='abc'})
232+
-- make sure to select http://127.0.0.1:42379 twice
233+
for i = 1, 4 do
234+
etcd:set("/fault_count", { a='abc'})
235+
end
236+
237+
-- there have actually been 5 reads and writes to etcd here, including one to /auth/authenticate
238+
231239
local fails, err = ngx.shared["etcd_cluster_health_check"]:get("http://127.0.0.1:42379")
232240
if err then
233241
ngx.say(err)
@@ -264,6 +272,7 @@ GET /t
264272
},
265273
user = 'root',
266274
password = 'abc123',
275+
init_count = -1,
267276
})
268277
269278
etcd:set("/get_target_status", { a='abc'})
@@ -301,19 +310,21 @@ false
301310
},
302311
user = 'root',
303312
password = 'abc123',
313+
init_count = -1,
304314
})
305315
306-
local res, err = etcd:set("/fail_timeout", "http://127.0.0.1:42379") -- trigger http://127.0.0.1:42379 to unhealthy
316+
local res, err
307317
308-
res, err = etcd:set("/fail_timeout", "http://127.0.0.1:22379") -- choose http://127.0.0.1:22379 to set value
309-
res, err = etcd:get("/fail_timeout")
310-
assert(res.body.kvs[1].value == "http://127.0.0.1:22379")
311-
312-
ngx.sleep(2)
318+
-- make sure to select http://127.0.0.1:42379 once and trigger it to unhealthy
319+
for i = 1, 3 do
320+
res, err = etcd:set("/fail_timeout", "value")
321+
end
313322
314-
res, err = etcd:set("/fail_timeout", "http://127.0.0.1:42379") -- choose http://127.0.0.1:42379 to set value
315-
res, err = etcd:get("/fail_timeout")
316-
assert(res.body.kvs[1].value == "http://127.0.0.1:22379")
323+
-- ensure that the unhealthy http://127.0.0.1:42379 is no longer selected
324+
for i = 1, 3 do
325+
res, err = etcd:get("/fail_timeout")
326+
assert(res.body.kvs[1].value == "value")
327+
end
317328
318329
ngx.say("done")
319330
}
@@ -323,6 +334,8 @@ GET /t
323334
--- timeout: 5
324335
--- response_body
325336
done
337+
--- error_log
338+
update endpoint: http://127.0.0.1:42379 to unhealthy
326339
--- no_error_log
327340
[error]
328341
@@ -387,6 +400,7 @@ has no healthy etcd endpoint available
387400
},
388401
user = 'root',
389402
password = 'abc123',
403+
init_count = -1,
390404
})
391405
392406
local etcd2, err = require "resty.etcd" .new({
@@ -398,6 +412,7 @@ has no healthy etcd endpoint available
398412
},
399413
user = 'root',
400414
password = 'abc123',
415+
init_count = -1,
401416
})
402417
403418
assert(tostring(etcd1) ~= tostring(etcd2))
@@ -493,12 +508,21 @@ qr/update endpoint: http:\/\/localhost:1984 to unhealthy/
493508
},
494509
user = 'root',
495510
password = 'abc123',
511+
init_count = -1,
496512
})
497513
498-
local res, err = etcd:set("/trigger_unhealthy", "abc")
514+
local res, err
515+
for i = 1, 3 do
516+
res, err = etcd:set("/trigger_unhealthy", "abc")
517+
end
499518
check_res(res, err)
500519
local res, err = etcd:get("/trigger_unhealthy")
501520
check_res(res, err, "abc")
521+
522+
-- 5 read and write operations to etcd have occurred here:
523+
-- 3 set, 1 get, 1 auth
524+
-- an endpoint is actually chosen 8 times, because there is a retry every time 42379 is selected
525+
-- 42379 is marked as unhealthy after being selected 3 times
502526
}
503527
}
504528
--- request
@@ -531,13 +555,16 @@ checked val as expect: abc
531555
},
532556
user = 'root',
533557
password = 'abc123',
558+
init_count = -1,
534559
})
535560
536561
local body_chunk_fun, err = etcd:watch("/trigger_unhealthy", {timeout = 0.5})
537562
check_res(body_chunk_fun, err)
538563
539564
ngx.timer.at(0.1, function ()
540-
etcd:set("/trigger_unhealthy", "abc")
565+
for i = 1, 3 do
566+
etcd:set("/trigger_unhealthy", "abc")
567+
end
541568
end)
542569
543570
local idx = 0
@@ -563,6 +590,8 @@ qr/update endpoint: http:\/\/127.0.0.1:42379 to unhealthy/
563590
--- response_body_like eval
564591
qr/1:.*"created":true.*
565592
2:.*"value":"abc".*
593+
3:.*"value":"abc".*
594+
4:.*"value":"abc".*
566595
timeout/
567596
--- timeout: 5
568597
@@ -681,6 +710,7 @@ has no healthy etcd endpoint available
681710
"http://127.0.0.1:22379",
682711
"http://127.0.0.1:32379",
683712
},
713+
init_count = -1,
684714
})
685715
686716
local res, err = etcd:set("/test/etcd/healthy", "hello")
@@ -749,6 +779,7 @@ qr/update endpoint: http:\/\/127.0.0.1:12379 to unhealthy/
749779
"http://127.0.0.1:22379",
750780
"http://127.0.0.1:32379",
751781
},
782+
init_count = -1,
752783
})
753784
754785
local res
@@ -827,6 +858,7 @@ healthy check use ngx.shared dict
827858
"http://127.0.0.1:22379",
828859
"http://127.0.0.1:32379",
829860
},
861+
init_count = -1,
830862
})
831863
832864
local res
@@ -844,7 +876,7 @@ healthy check use ngx.shared dict
844876
GET /t
845877
--- response_body
846878
http://127.0.0.1:42379: connection refused
847-
http://127.0.0.1:42379: connection refused
879+
http://127.0.0.1:22379: OK
848880
--- no_error_log eval
849881
qr/update endpoint: http:\/\/127.0.0.1:42379 to unhealthy/
850882
@@ -889,3 +921,39 @@ GET /t
889921
--- response_body
890922
passed
891923
passed
924+
925+
926+
927+
=== TEST 22: ring balancer with specific init_count
928+
--- http_config eval: $::HttpConfig
929+
--- config
930+
location /t {
931+
content_by_lua_block {
932+
local health_check = require("resty.etcd.health_check")
933+
health_check.disable()
934+
local etcd, err = require "resty.etcd" .new({
935+
protocol = "v3",
936+
http_host = {
937+
"http://127.0.0.1:12379",
938+
"http://127.0.0.1:22379",
939+
"http://127.0.0.1:32379",
940+
},
941+
init_count = 101,
942+
})
943+
944+
local res
945+
for i = 1, 3 do
946+
res, _ = etcd:set("/ring_balancer", "abc")
947+
end
948+
949+
ngx.say(etcd.init_count)
950+
}
951+
}
952+
--- request
953+
GET /t
954+
--- response_body
955+
104
956+
--- error_log
957+
choose_endpoint(): choose endpoint: http://127.0.0.1:12379
958+
choose_endpoint(): choose endpoint: http://127.0.0.1:22379
959+
choose_endpoint(): choose endpoint: http://127.0.0.1:32379

0 commit comments

Comments
 (0)