@@ -29,12 +29,15 @@ import (
29
29
"github.com/coreos/etcd/etcdserver"
30
30
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
31
31
"github.com/coreos/etcd/etcdserver/api/v2http"
32
+ "github.com/coreos/etcd/etcdserver/api/v3rpc"
32
33
"github.com/coreos/etcd/pkg/cors"
33
34
"github.com/coreos/etcd/pkg/debugutil"
34
35
runtimeutil "github.com/coreos/etcd/pkg/runtime"
35
36
"github.com/coreos/etcd/pkg/transport"
36
37
"github.com/coreos/etcd/pkg/types"
37
38
"github.com/coreos/etcd/rafthttp"
39
+
40
+ "github.com/cockroachdb/cmux"
38
41
"github.com/coreos/pkg/capnslog"
39
42
"google.golang.org/grpc"
40
43
"google.golang.org/grpc/keepalive"
@@ -60,12 +63,14 @@ const (
60
63
type Etcd struct {
61
64
Peers []* peerListener
62
65
Clients []net.Listener
63
- Server * etcdserver.EtcdServer
66
+ // a map of contexts for the servers that serves client requests.
67
+ sctxs map [string ]* serveCtx
68
+
69
+ Server * etcdserver.EtcdServer
64
70
65
71
cfg Config
66
72
stopc chan struct {}
67
73
errc chan error
68
- sctxs map [string ]* serveCtx
69
74
70
75
closeOnce sync.Once
71
76
}
@@ -91,20 +96,20 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
91
96
return
92
97
}
93
98
if ! serving {
94
- // errored before starting gRPC server for serveCtx.grpcServerC
99
+ // errored before starting gRPC server for serveCtx.serversC
95
100
for _ , sctx := range e .sctxs {
96
- close (sctx .grpcServerC )
101
+ close (sctx .serversC )
97
102
}
98
103
}
99
104
e .Close ()
100
105
e = nil
101
106
}()
102
107
103
108
if e .Peers , err = startPeerListeners (cfg ); err != nil {
104
- return
109
+ return e , err
105
110
}
106
111
if e .sctxs , err = startClientListeners (cfg ); err != nil {
107
- return
112
+ return e , err
108
113
}
109
114
for _ , sctx := range e .sctxs {
110
115
e .Clients = append (e .Clients , sctx .l )
@@ -150,76 +155,53 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
150
155
}
151
156
152
157
if e .Server , err = etcdserver .NewServer (srvcfg ); err != nil {
153
- return
154
- }
155
-
156
- // configure peer handlers after rafthttp.Transport started
157
- ph := etcdhttp .NewPeerHandler (e .Server )
158
- for _ , p := range e .Peers {
159
- srv := & http.Server {
160
- Handler : ph ,
161
- ReadTimeout : 5 * time .Minute ,
162
- ErrorLog : defaultLog .New (ioutil .Discard , "" , 0 ), // do not log user error
163
- }
164
-
165
- l := p .Listener
166
- p .serve = func () error { return srv .Serve (l ) }
167
- p .close = func (ctx context.Context ) error {
168
- // gracefully shutdown http.Server
169
- // close open listeners, idle connections
170
- // until context cancel or time-out
171
- return srv .Shutdown (ctx )
172
- }
158
+ return e , err
173
159
}
174
160
175
161
// buffer channel so goroutines on closed connections won't wait forever
176
162
e .errc = make (chan error , len (e .Peers )+ len (e .Clients )+ 2 * len (e .sctxs ))
177
163
178
164
e .Server .Start ()
179
- if err = e .serve (); err != nil {
180
- return
165
+
166
+ if err = e .servePeers (); err != nil {
167
+ return e , err
181
168
}
169
+ if err = e .serveClients (); err != nil {
170
+ return e , err
171
+ }
172
+
182
173
serving = true
183
- return
174
+ return e , nil
184
175
}
185
176
186
177
// Config returns the current configuration.
187
178
func (e * Etcd ) Config () Config {
188
179
return e .cfg
189
180
}
190
181
182
+ // Close gracefully shuts down all servers/listeners.
183
+ // Client requests will be terminated with request timeout.
184
+ // After timeout, enforce remaning requests be closed immediately.
191
185
func (e * Etcd ) Close () {
192
186
e .closeOnce .Do (func () { close (e .stopc ) })
193
187
188
+ // close client requests with request timeout
194
189
timeout := 2 * time .Second
195
190
if e .Server != nil {
196
191
timeout = e .Server .Cfg .ReqTimeout ()
197
192
}
198
193
for _ , sctx := range e .sctxs {
199
- for gs := range sctx .grpcServerC {
200
- ch := make (chan struct {})
201
- go func () {
202
- defer close (ch )
203
- // close listeners to stop accepting new connections,
204
- // will block on any existing transports
205
- gs .GracefulStop ()
206
- }()
207
- // wait until all pending RPCs are finished
208
- select {
209
- case <- ch :
210
- case <- time .After (timeout ):
211
- // took too long, manually close open transports
212
- // e.g. watch streams
213
- gs .Stop ()
214
- // concurrent GracefulStop should be interrupted
215
- <- ch
216
- }
194
+ for ss := range sctx .serversC {
195
+ ctx , cancel := context .WithTimeout (context .Background (), timeout )
196
+ stopServers (ctx , ss )
197
+ cancel ()
217
198
}
218
199
}
219
200
220
201
for _ , sctx := range e .sctxs {
221
202
sctx .cancel ()
222
203
}
204
+
223
205
for i := range e .Clients {
224
206
if e .Clients [i ] != nil {
225
207
e .Clients [i ].Close ()
@@ -241,6 +223,43 @@ func (e *Etcd) Close() {
241
223
}
242
224
}
243
225
226
+ func stopServers (ctx context.Context , ss * servers ) {
227
+ shutdownNow := func () {
228
+ // first, close the http.Server
229
+ ss .http .Shutdown (ctx )
230
+ // then close grpc.Server; cancels all active RPCs
231
+ ss .grpc .Stop ()
232
+ }
233
+
234
+ // do not grpc.Server.GracefulStop with TLS enabled etcd server
235
+ // See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
236
+ // and https://github.com/coreos/etcd/issues/8916
237
+ if ss .secure {
238
+ shutdownNow ()
239
+ return
240
+ }
241
+
242
+ ch := make (chan struct {})
243
+ go func () {
244
+ defer close (ch )
245
+ // close listeners to stop accepting new connections,
246
+ // will block on any existing transports
247
+ ss .grpc .GracefulStop ()
248
+ }()
249
+
250
+ // wait until all pending RPCs are finished
251
+ select {
252
+ case <- ch :
253
+ case <- ctx .Done ():
254
+ // took too long, manually close open transports
255
+ // e.g. watch streams
256
+ shutdownNow ()
257
+
258
+ // concurrent GracefulStop should be interrupted
259
+ <- ch
260
+ }
261
+ }
262
+
244
263
func (e * Etcd ) Err () <- chan error { return e .errc }
245
264
246
265
func startPeerListeners (cfg * Config ) (peers []* peerListener , err error ) {
@@ -269,7 +288,9 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
269
288
for i := range peers {
270
289
if peers [i ] != nil && peers [i ].close != nil {
271
290
plog .Info ("stopping listening for peers on " , cfg .LPUrls [i ].String ())
272
- peers [i ].close (context .Background ())
291
+ ctx , cancel := context .WithTimeout (context .Background (), time .Second )
292
+ peers [i ].close (ctx )
293
+ cancel ()
273
294
}
274
295
}
275
296
}()
@@ -297,6 +318,45 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
297
318
return peers , nil
298
319
}
299
320
321
+ // configure peer handlers after rafthttp.Transport started
322
+ func (e * Etcd ) servePeers () (err error ) {
323
+ ph := etcdhttp .NewPeerHandler (e .Server )
324
+ var peerTLScfg * tls.Config
325
+ if ! e .cfg .PeerTLSInfo .Empty () {
326
+ if peerTLScfg , err = e .cfg .PeerTLSInfo .ServerConfig (); err != nil {
327
+ return err
328
+ }
329
+ }
330
+
331
+ for _ , p := range e .Peers {
332
+ gs := v3rpc .Server (e .Server , peerTLScfg )
333
+ m := cmux .New (p .Listener )
334
+ go gs .Serve (m .Match (cmux .HTTP2 ()))
335
+ srv := & http.Server {
336
+ Handler : grpcHandlerFunc (gs , ph ),
337
+ ReadTimeout : 5 * time .Minute ,
338
+ ErrorLog : defaultLog .New (ioutil .Discard , "" , 0 ), // do not log user error
339
+ }
340
+ go srv .Serve (m .Match (cmux .Any ()))
341
+ p .serve = func () error { return m .Serve () }
342
+ p .close = func (ctx context.Context ) error {
343
+ // gracefully shutdown http.Server
344
+ // close open listeners, idle connections
345
+ // until context cancel or time-out
346
+ stopServers (ctx , & servers {secure : peerTLScfg != nil , grpc : gs , http : srv })
347
+ return nil
348
+ }
349
+ }
350
+
351
+ // start peer servers in a goroutine
352
+ for _ , pl := range e .Peers {
353
+ go func (l * peerListener ) {
354
+ e .errHandler (l .serve ())
355
+ }(pl )
356
+ }
357
+ return nil
358
+ }
359
+
300
360
func startClientListeners (cfg * Config ) (sctxs map [string ]* serveCtx , err error ) {
301
361
if cfg .ClientAutoTLS && cfg .ClientTLSInfo .Empty () {
302
362
chosts := make ([]string , len (cfg .LCUrls ))
@@ -388,7 +448,7 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
388
448
return sctxs , nil
389
449
}
390
450
391
- func (e * Etcd ) serve () (err error ) {
451
+ func (e * Etcd ) serveClients () (err error ) {
392
452
var ctlscfg * tls.Config
393
453
if ! e .cfg .ClientTLSInfo .Empty () {
394
454
plog .Infof ("ClientTLS: %s" , e .cfg .ClientTLSInfo )
@@ -401,13 +461,6 @@ func (e *Etcd) serve() (err error) {
401
461
plog .Infof ("cors = %s" , e .cfg .CorsInfo )
402
462
}
403
463
404
- // Start the peer server in a goroutine
405
- for _ , pl := range e .Peers {
406
- go func (l * peerListener ) {
407
- e .errHandler (l .serve ())
408
- }(pl )
409
- }
410
-
411
464
// Start a client server goroutine for each listen address
412
465
var h http.Handler
413
466
if e .Config ().EnableV2 {
@@ -433,6 +486,8 @@ func (e *Etcd) serve() (err error) {
433
486
Timeout : e .cfg .GRPCKeepAliveTimeout ,
434
487
}))
435
488
}
489
+
490
+ // start client servers in a goroutine
436
491
for _ , sctx := range e .sctxs {
437
492
go func (s * serveCtx ) {
438
493
e .errHandler (s .serve (e .Server , ctlscfg , h , e .errHandler , gopts ... ))
0 commit comments