@@ -13,6 +13,7 @@ local now = ngx.now
13
13
local sub_str = string.sub
14
14
local str_byte = string.byte
15
15
local str_char = string.char
16
+ local str_find = string.find
16
17
local ipairs = ipairs
17
18
local pairs = pairs
18
19
local pcall = pcall
@@ -281,6 +282,45 @@ local function serialize_and_encode_base64(serialize_fn, data)
281
282
end
282
283
283
284
285
+ local function choose_grpc_endpoint (self )
286
+ local connect_opts = {
287
+ max_recv_msg_size = 2147483647 ,
288
+ }
289
+
290
+ local endpoint , err = choose_endpoint (self )
291
+ if not endpoint then
292
+ return nil , err , nil
293
+ end
294
+
295
+ if endpoint .scheme == " https" then
296
+ connect_opts .insecure = false
297
+ end
298
+
299
+ connect_opts .tls_verify = self .ssl_verify
300
+ connect_opts .client_cert = self .ssl_cert_path
301
+ connect_opts .client_key = self .ssl_key_path
302
+ connect_opts .trusted_ca = self .trusted_ca
303
+
304
+ local target
305
+ if self .unix_socket_proxy then
306
+ target = self .unix_socket_proxy
307
+ else
308
+ target = endpoint .address .. " :" .. endpoint .port
309
+ end
310
+
311
+ utils .log_info (" etcd grpc connect to " , target )
312
+ local conn , err = self .grpc .connect (target , connect_opts )
313
+ if not conn then
314
+ return nil , err , endpoint .http_host
315
+ end
316
+
317
+ -- we disable health check when proxying via unix socket,
318
+ -- so the http_host will always point to a real address when the failure is reported
319
+ conn .http_host = endpoint .http_host
320
+ return conn
321
+ end
322
+
323
+
284
324
function _M .new (opts )
285
325
local timeout = opts .timeout
286
326
local ttl = opts .ttl
@@ -396,6 +436,7 @@ function _M.new(opts)
396
436
397
437
ssl_cert_path = opts .ssl_cert_path ,
398
438
ssl_key_path = opts .ssl_key_path ,
439
+ trusted_ca = opts .trusted_ca ,
399
440
extra_headers = extra_headers ,
400
441
sni = sni ,
401
442
unix_socket_proxy = unix_socket_proxy ,
@@ -418,48 +459,13 @@ function _M.new(opts)
418
459
cli .grpc = grpc
419
460
cli .call_opts = {}
420
461
421
- local connect_opts = {
422
- max_recv_msg_size = 2147483647 ,
423
- }
424
-
425
- local endpoint , err = choose_endpoint (cli )
426
- if not endpoint then
427
- return nil , err
428
- end
429
-
430
- if endpoint .scheme == " https" then
431
- connect_opts .insecure = false
432
- end
433
-
434
- connect_opts .tls_verify = cli .ssl_verify
435
- connect_opts .client_cert = cli .ssl_cert_path
436
- connect_opts .client_key = cli .ssl_key_path
437
- connect_opts .trusted_ca = opts .trusted_ca
438
-
439
- local conn , err
440
- if unix_socket_proxy then
441
- conn , err = grpc .connect (unix_socket_proxy , connect_opts )
442
- else
443
- conn , err = grpc .connect (endpoint .address .. " :" .. endpoint .port , connect_opts )
444
- end
462
+ local conn , err = choose_grpc_endpoint (cli )
445
463
if not conn then
446
464
return nil , err
447
465
end
448
466
cli .conn = conn
449
467
450
- cli = setmetatable (cli , grpc_mt )
451
-
452
- if cli .user then
453
- local auth_req = {name = cli .user , password = cli .password }
454
- local res , err = cli :grpc_call (" etcdserverpb.Auth" , " Authenticate" , auth_req )
455
- if not res then
456
- return nil , err
457
- end
458
-
459
- cli .grpc_token = res .body .token
460
- end
461
-
462
- return cli
468
+ return setmetatable (cli , grpc_mt )
463
469
end
464
470
465
471
local sema , err = semaphore .new ()
@@ -1017,6 +1023,17 @@ do
1017
1023
{" token" , " " }
1018
1024
}
1019
1025
function get_grpc_metadata (self )
1026
+ if not self .grpc_token and self .user then
1027
+ local auth_req = {name = self .user , password = self .password }
1028
+ local res , err = self :grpc_call (" etcdserverpb.Auth" ,
1029
+ " Authenticate" , auth_req )
1030
+ if not res then
1031
+ return nil , err
1032
+ end
1033
+
1034
+ self .grpc_token = res .body .token
1035
+ end
1036
+
1020
1037
if self .grpc_token then
1021
1038
metadata [1 ][2 ] = self .grpc_token
1022
1039
return metadata
@@ -1043,15 +1060,22 @@ function _grpc_M.create_grpc_watch_stream(self, key, attr, opts)
1043
1060
self .call_opts .timeout = self .timeout * 1000
1044
1061
end
1045
1062
1046
- self .call_opts .metadata = get_grpc_metadata (self )
1063
+ local data , err = get_grpc_metadata (self )
1064
+ if err then
1065
+ return nil , err
1066
+ end
1067
+ self .call_opts .metadata = data
1047
1068
1048
1069
local st , err = conn :new_server_stream (" etcdserverpb.Watch" , " Watch" , req , self .call_opts )
1049
1070
if not st then
1071
+ -- report but don't retry by itself - APISIX will retry syncing after failed
1072
+ health_check .report_failure (conn .http_host )
1050
1073
return nil , err
1051
1074
end
1052
1075
1053
1076
local res , err = st :recv ()
1054
1077
if not res then
1078
+ health_check .report_failure (conn .http_host )
1055
1079
return nil , err
1056
1080
end
1057
1081
@@ -1062,6 +1086,7 @@ end
1062
1086
function _grpc_M .read_grpc_watch_stream (self , watching_stream )
1063
1087
local res , err = watching_stream :recv ()
1064
1088
if not res then
1089
+ health_check .report_failure (self .conn .http_host )
1065
1090
return nil , err
1066
1091
end
1067
1092
@@ -1151,8 +1176,16 @@ function _grpc_M.convert_grpc_to_http_res(self, res)
1151
1176
end
1152
1177
1153
1178
1179
+ local function filter_out_no_retry_err (err )
1180
+ if str_find (err , " key is not provided" , 1 , true ) then
1181
+ return err
1182
+ end
1183
+
1184
+ return nil
1185
+ end
1186
+
1187
+
1154
1188
function _grpc_M .grpc_call (self , serv , meth , attr , key , val , opts )
1155
- local conn = self .conn
1156
1189
attr .key = key
1157
1190
if val then
1158
1191
attr .value = serialize_grpc_value (self .serializer .serialize , val )
@@ -1165,9 +1198,67 @@ function _grpc_M.grpc_call(self, serv, meth, attr, key, val, opts)
1165
1198
self .call_opts .timeout = self .timeout * 1000
1166
1199
end
1167
1200
self .call_opts .int64_encoding = self .grpc .INT64_AS_STRING
1168
- self .call_opts .metadata = get_grpc_metadata (self )
1169
1201
1170
- local res , err = conn :call (serv , meth , attr , self .call_opts )
1202
+ if meth ~= " Authenticate" then
1203
+ local data , err = get_grpc_metadata (self )
1204
+ if err then
1205
+ return nil , err
1206
+ end
1207
+ self .call_opts .metadata = data
1208
+ end
1209
+
1210
+ local conn = self .conn
1211
+ local http_host
1212
+ local res , err
1213
+ if health_check .conf .retry then
1214
+ local max_retry = # self .endpoints * health_check .conf .max_fails + 1
1215
+ for i = 1 , max_retry do
1216
+ if conn then
1217
+ http_host = conn .http_host
1218
+ res , err = conn :call (serv , meth , attr , self .call_opts )
1219
+ if res then
1220
+ self .conn = conn
1221
+ break
1222
+ end
1223
+
1224
+ if filter_out_no_retry_err (err ) then
1225
+ return nil , err
1226
+ end
1227
+ end
1228
+
1229
+ health_check .report_failure (http_host )
1230
+
1231
+ if i < max_retry then
1232
+ utils .log_warn (" Tried " , http_host , " failed: " ,
1233
+ err , " . Retrying" )
1234
+ end
1235
+
1236
+ conn , err , http_host = choose_grpc_endpoint (self )
1237
+ if not conn and not http_host then
1238
+ -- no endpoint can be retries
1239
+ return nil , err
1240
+ end
1241
+ end
1242
+ else
1243
+ res , err = self .conn :call (serv , meth , attr , self .call_opts )
1244
+ if not res then
1245
+ if filter_out_no_retry_err (err ) then
1246
+ return nil , err
1247
+ end
1248
+
1249
+ health_check .report_failure (self .conn .http_host )
1250
+
1251
+ local conn , new_err = choose_grpc_endpoint (self )
1252
+ if not conn then
1253
+ utils .log_info (" failed to use next connection: " , new_err )
1254
+ return nil , err
1255
+ end
1256
+
1257
+ self .conn = conn
1258
+ return nil , err
1259
+ end
1260
+ end
1261
+
1171
1262
return self :convert_grpc_to_http_res (res ), err
1172
1263
end
1173
1264
0 commit comments