Skip to content

Commit dae03bc

Browse files
authored
feat(grpc): support watching dir (#195)
1 parent fb67fad commit dae03bc

File tree

2 files changed

+128
-14
lines changed

2 files changed

+128
-14
lines changed

lib/resty/etcd/v3.lua

Lines changed: 81 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -934,17 +934,15 @@ local function get_range_end(key)
934934
end
935935

936936

937-
local function watch(self, key, attr)
937+
local function create_watch_request(key, attr)
938938
-- verify key
939939
if #key == 0 then
940940
key = str_char(0)
941941
end
942942

943-
key = encode_base64(key)
944-
945943
local range_end
946944
if attr.range_end then
947-
range_end = encode_base64(attr.range_end)
945+
range_end = attr.range_end
948946
end
949947

950948
local prev_kv
@@ -977,23 +975,91 @@ local function watch(self, key, attr)
977975
filters = attr.filters and attr.filters or 0
978976
end
979977

978+
local create_request = {
979+
key = key,
980+
range_end = range_end,
981+
prev_kv = prev_kv,
982+
start_revision = start_revision,
983+
watch_id = watch_id,
984+
progress_notify = progress_notify,
985+
fragment = fragment,
986+
filters = filters,
987+
}
988+
989+
return create_request
990+
end
991+
992+
993+
function _grpc_M.create_grpc_watch_stream(self, key, attr, opts)
994+
key = utils.get_real_key(self.key_prefix, key)
995+
attr.range_end = get_range_end(key)
996+
997+
local req = {
998+
create_request = create_watch_request(key, attr),
999+
}
1000+
1001+
local conn = self.conn
1002+
if opts then
1003+
self.call_opts.timeout = opts.timeout and opts.timeout * 1000
1004+
end
1005+
if not self.call_opts.timeout then
1006+
self.call_opts.timeout = self.timeout * 1000
1007+
end
1008+
1009+
local st, err = conn:new_server_stream("etcdserverpb.Watch", "Watch", req, self.call_opts)
1010+
if not st then
1011+
return nil, err
1012+
end
1013+
1014+
local res, err = st:recv()
1015+
if not res then
1016+
return nil, err
1017+
end
1018+
1019+
return st
1020+
end
1021+
1022+
1023+
function _grpc_M.read_grpc_watch_stream(self, watching_stream)
1024+
local res, err = watching_stream:recv()
1025+
if not res then
1026+
return nil, err
1027+
end
1028+
1029+
if res.events then
1030+
for _, event in ipairs(res.events) do
1031+
if event.kv.value then -- DELETE not have value
1032+
event.kv.value = self.serializer.deserialize(event.kv.value)
1033+
end
1034+
if event.prev_kv then
1035+
event.prev_kv.value = self.serializer.deserialize(event.prev_kv.value)
1036+
end
1037+
end
1038+
end
1039+
1040+
local wrapped_res = {
1041+
result = res,
1042+
}
1043+
return wrapped_res
1044+
end
1045+
1046+
1047+
local function watch(self, key, attr)
9801048
local need_cancel
9811049
if attr.need_cancel then
9821050
need_cancel = attr.need_cancel and true or false
9831051
end
9841052

1053+
local create_request = create_watch_request(key, attr)
1054+
create_request.key = encode_base64(key)
1055+
1056+
if attr.range_end then
1057+
create_request.range_end = encode_base64(attr.range_end)
1058+
end
1059+
9851060
local opts = {
9861061
body = {
987-
create_request = {
988-
key = key,
989-
range_end = range_end,
990-
prev_kv = prev_kv,
991-
start_revision = start_revision,
992-
watch_id = watch_id,
993-
progress_notify = progress_notify,
994-
fragment = fragment,
995-
filters = filters,
996-
}
1062+
create_request = create_request,
9971063
},
9981064
need_cancel = need_cancel,
9991065
}
@@ -1010,6 +1076,7 @@ local function watch(self, key, attr)
10101076
return callback_fun
10111077
end
10121078

1079+
10131080
function _grpc_M.convert_grpc_to_http_res(self, res)
10141081
if res == nil then
10151082
return nil

t/v3/grpc/key.t

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,3 +152,50 @@ ok
152152
}
153153
--- response_body
154154
checked val as expect: abc
155+
156+
157+
158+
=== TEST 4: watch dir
159+
--- http_config eval: $::HttpConfig
160+
--- config
161+
location /t {
162+
content_by_lua_block {
163+
local etcd, err = require("resty.etcd").new({protocol = "v3", use_grpc = true})
164+
check_res(etcd, err)
165+
166+
local res, err = etcd:set("/test/1", "abc")
167+
check_res(res, err)
168+
169+
local opts = {}
170+
opts.timeout = 0.5
171+
172+
local st, err = etcd:create_grpc_watch_stream("/test/", {}, opts)
173+
if not st then
174+
ngx.log(ngx.ERR, "create watch stream failed: ", err)
175+
return
176+
end
177+
178+
ngx.timer.at(0.1, function ()
179+
etcd:set("/test/1", "bcd3")
180+
end)
181+
182+
local idx = 0
183+
while true do
184+
local chunk, err = etcd:read_grpc_watch_stream(st)
185+
186+
if not chunk then
187+
if err then
188+
ngx.say(err)
189+
end
190+
break
191+
end
192+
193+
idx = idx + 1
194+
ngx.say(idx, ": ", require("cjson").encode(chunk.result))
195+
end
196+
}
197+
}
198+
--- response_body_like eval
199+
qr/1:.*"value":"bcd3".*
200+
.*context deadline exceeded/
201+
--- timeout: 5

0 commit comments

Comments
 (0)