Skip to content

Commit 288ef7d

Browse files
committed
embed: fix gRPC server panic on GracefulStop
Cherry-pick #8987. Signed-off-by: Gyuho Lee <[email protected]>
1 parent 7b7722e commit 288ef7d

File tree

2 files changed

+128
-65
lines changed

2 files changed

+128
-65
lines changed

embed/etcd.go

Lines changed: 111 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,15 @@ import (
2929
"github.com/coreos/etcd/etcdserver"
3030
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
3131
"github.com/coreos/etcd/etcdserver/api/v2http"
32+
"github.com/coreos/etcd/etcdserver/api/v3rpc"
3233
"github.com/coreos/etcd/pkg/cors"
3334
"github.com/coreos/etcd/pkg/debugutil"
3435
runtimeutil "github.com/coreos/etcd/pkg/runtime"
3536
"github.com/coreos/etcd/pkg/transport"
3637
"github.com/coreos/etcd/pkg/types"
3738
"github.com/coreos/etcd/rafthttp"
39+
40+
"github.com/cockroachdb/cmux"
3841
"github.com/coreos/pkg/capnslog"
3942
"google.golang.org/grpc"
4043
"google.golang.org/grpc/keepalive"
@@ -60,12 +63,14 @@ const (
6063
type Etcd struct {
6164
Peers []*peerListener
6265
Clients []net.Listener
63-
Server *etcdserver.EtcdServer
66+
// a map of contexts for the servers that serve client requests.
67+
sctxs map[string]*serveCtx
68+
69+
Server *etcdserver.EtcdServer
6470

6571
cfg Config
6672
stopc chan struct{}
6773
errc chan error
68-
sctxs map[string]*serveCtx
6974

7075
closeOnce sync.Once
7176
}
@@ -91,20 +96,20 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
9196
return
9297
}
9398
if !serving {
94-
// errored before starting gRPC server for serveCtx.grpcServerC
99+
// errored before starting gRPC server for serveCtx.serversC
95100
for _, sctx := range e.sctxs {
96-
close(sctx.grpcServerC)
101+
close(sctx.serversC)
97102
}
98103
}
99104
e.Close()
100105
e = nil
101106
}()
102107

103108
if e.Peers, err = startPeerListeners(cfg); err != nil {
104-
return
109+
return e, err
105110
}
106111
if e.sctxs, err = startClientListeners(cfg); err != nil {
107-
return
112+
return e, err
108113
}
109114
for _, sctx := range e.sctxs {
110115
e.Clients = append(e.Clients, sctx.l)
@@ -150,76 +155,53 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
150155
}
151156

152157
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
153-
return
154-
}
155-
156-
// configure peer handlers after rafthttp.Transport started
157-
ph := etcdhttp.NewPeerHandler(e.Server)
158-
for _, p := range e.Peers {
159-
srv := &http.Server{
160-
Handler: ph,
161-
ReadTimeout: 5 * time.Minute,
162-
ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error
163-
}
164-
165-
l := p.Listener
166-
p.serve = func() error { return srv.Serve(l) }
167-
p.close = func(ctx context.Context) error {
168-
// gracefully shutdown http.Server
169-
// close open listeners, idle connections
170-
// until context cancel or time-out
171-
return srv.Shutdown(ctx)
172-
}
158+
return e, err
173159
}
174160

175161
// buffer channel so goroutines on closed connections won't wait forever
176162
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
177163

178164
e.Server.Start()
179-
if err = e.serve(); err != nil {
180-
return
165+
166+
if err = e.servePeers(); err != nil {
167+
return e, err
181168
}
169+
if err = e.serveClients(); err != nil {
170+
return e, err
171+
}
172+
182173
serving = true
183-
return
174+
return e, nil
184175
}
185176

186177
// Config returns the current configuration.
187178
func (e *Etcd) Config() Config {
188179
return e.cfg
189180
}
190181

182+
// Close gracefully shuts down all servers/listeners.
183+
// Client requests will be terminated with request timeout.
184+
// After timeout, enforce remaining requests be closed immediately.
191185
func (e *Etcd) Close() {
192186
e.closeOnce.Do(func() { close(e.stopc) })
193187

188+
// close client requests with request timeout
194189
timeout := 2 * time.Second
195190
if e.Server != nil {
196191
timeout = e.Server.Cfg.ReqTimeout()
197192
}
198193
for _, sctx := range e.sctxs {
199-
for gs := range sctx.grpcServerC {
200-
ch := make(chan struct{})
201-
go func() {
202-
defer close(ch)
203-
// close listeners to stop accepting new connections,
204-
// will block on any existing transports
205-
gs.GracefulStop()
206-
}()
207-
// wait until all pending RPCs are finished
208-
select {
209-
case <-ch:
210-
case <-time.After(timeout):
211-
// took too long, manually close open transports
212-
// e.g. watch streams
213-
gs.Stop()
214-
// concurrent GracefulStop should be interrupted
215-
<-ch
216-
}
194+
for ss := range sctx.serversC {
195+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
196+
stopServers(ctx, ss)
197+
cancel()
217198
}
218199
}
219200

220201
for _, sctx := range e.sctxs {
221202
sctx.cancel()
222203
}
204+
223205
for i := range e.Clients {
224206
if e.Clients[i] != nil {
225207
e.Clients[i].Close()
@@ -241,6 +223,43 @@ func (e *Etcd) Close() {
241223
}
242224
}
243225

226+
func stopServers(ctx context.Context, ss *servers) {
227+
shutdownNow := func() {
228+
// first, close the http.Server
229+
ss.http.Shutdown(ctx)
230+
// then close grpc.Server; cancels all active RPCs
231+
ss.grpc.Stop()
232+
}
233+
234+
// do not grpc.Server.GracefulStop with TLS enabled etcd server
235+
// See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
236+
// and https://github.com/coreos/etcd/issues/8916
237+
if ss.secure {
238+
shutdownNow()
239+
return
240+
}
241+
242+
ch := make(chan struct{})
243+
go func() {
244+
defer close(ch)
245+
// close listeners to stop accepting new connections,
246+
// will block on any existing transports
247+
ss.grpc.GracefulStop()
248+
}()
249+
250+
// wait until all pending RPCs are finished
251+
select {
252+
case <-ch:
253+
case <-ctx.Done():
254+
// took too long, manually close open transports
255+
// e.g. watch streams
256+
shutdownNow()
257+
258+
// concurrent GracefulStop should be interrupted
259+
<-ch
260+
}
261+
}
262+
244263
func (e *Etcd) Err() <-chan error { return e.errc }
245264

246265
func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
@@ -269,7 +288,9 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
269288
for i := range peers {
270289
if peers[i] != nil && peers[i].close != nil {
271290
plog.Info("stopping listening for peers on ", cfg.LPUrls[i].String())
272-
peers[i].close(context.Background())
291+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
292+
peers[i].close(ctx)
293+
cancel()
273294
}
274295
}
275296
}()
@@ -297,6 +318,45 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
297318
return peers, nil
298319
}
299320

321+
// configure peer handlers after rafthttp.Transport started
322+
func (e *Etcd) servePeers() (err error) {
323+
ph := etcdhttp.NewPeerHandler(e.Server)
324+
var peerTLScfg *tls.Config
325+
if !e.cfg.PeerTLSInfo.Empty() {
326+
if peerTLScfg, err = e.cfg.PeerTLSInfo.ServerConfig(); err != nil {
327+
return err
328+
}
329+
}
330+
331+
for _, p := range e.Peers {
332+
gs := v3rpc.Server(e.Server, peerTLScfg)
333+
m := cmux.New(p.Listener)
334+
go gs.Serve(m.Match(cmux.HTTP2()))
335+
srv := &http.Server{
336+
Handler: grpcHandlerFunc(gs, ph),
337+
ReadTimeout: 5 * time.Minute,
338+
ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error
339+
}
340+
go srv.Serve(m.Match(cmux.Any()))
341+
p.serve = func() error { return m.Serve() }
342+
p.close = func(ctx context.Context) error {
343+
// gracefully shutdown http.Server
344+
// close open listeners, idle connections
345+
// until context cancel or time-out
346+
stopServers(ctx, &servers{secure: peerTLScfg != nil, grpc: gs, http: srv})
347+
return nil
348+
}
349+
}
350+
351+
// start peer servers in a goroutine
352+
for _, pl := range e.Peers {
353+
go func(l *peerListener) {
354+
e.errHandler(l.serve())
355+
}(pl)
356+
}
357+
return nil
358+
}
359+
300360
func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
301361
if cfg.ClientAutoTLS && cfg.ClientTLSInfo.Empty() {
302362
chosts := make([]string, len(cfg.LCUrls))
@@ -388,7 +448,7 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
388448
return sctxs, nil
389449
}
390450

391-
func (e *Etcd) serve() (err error) {
451+
func (e *Etcd) serveClients() (err error) {
392452
var ctlscfg *tls.Config
393453
if !e.cfg.ClientTLSInfo.Empty() {
394454
plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo)
@@ -401,13 +461,6 @@ func (e *Etcd) serve() (err error) {
401461
plog.Infof("cors = %s", e.cfg.CorsInfo)
402462
}
403463

404-
// Start the peer server in a goroutine
405-
for _, pl := range e.Peers {
406-
go func(l *peerListener) {
407-
e.errHandler(l.serve())
408-
}(pl)
409-
}
410-
411464
// Start a client server goroutine for each listen address
412465
var h http.Handler
413466
if e.Config().EnableV2 {
@@ -433,6 +486,8 @@ func (e *Etcd) serve() (err error) {
433486
Timeout: e.cfg.GRPCKeepAliveTimeout,
434487
}))
435488
}
489+
490+
// start client servers in a goroutine
436491
for _, sctx := range e.sctxs {
437492
go func(s *serveCtx) {
438493
e.errHandler(s.serve(e.Server, ctlscfg, h, e.errHandler, gopts...))

embed/serve.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,22 @@ type serveCtx struct {
5353

5454
userHandlers map[string]http.Handler
5555
serviceRegister func(*grpc.Server)
56-
grpcServerC chan *grpc.Server
56+
serversC chan *servers
57+
}
58+
59+
type servers struct {
60+
secure bool
61+
grpc *grpc.Server
62+
http *http.Server
5763
}
5864

5965
func newServeCtx() *serveCtx {
6066
ctx, cancel := context.WithCancel(context.Background())
61-
return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler),
62-
grpcServerC: make(chan *grpc.Server, 2), // in case sctx.insecure,sctx.secure true
67+
return &serveCtx{
68+
ctx: ctx,
69+
cancel: cancel,
70+
userHandlers: make(map[string]http.Handler),
71+
serversC: make(chan *servers, 2), // in case sctx.insecure,sctx.secure true
6372
}
6473
}
6574

@@ -83,7 +92,6 @@ func (sctx *serveCtx) serve(
8392

8493
if sctx.insecure {
8594
gs := v3rpc.Server(s, nil, gopts...)
86-
sctx.grpcServerC <- gs
8795
v3electionpb.RegisterElectionServer(gs, servElection)
8896
v3lockpb.RegisterLockServer(gs, servLock)
8997
if sctx.serviceRegister != nil {
@@ -92,9 +100,7 @@ func (sctx *serveCtx) serve(
92100
grpcl := m.Match(cmux.HTTP2())
93101
go func() { errHandler(gs.Serve(grpcl)) }()
94102

95-
opts := []grpc.DialOption{
96-
grpc.WithInsecure(),
97-
}
103+
opts := []grpc.DialOption{grpc.WithInsecure()}
98104
gwmux, err := sctx.registerGateway(opts)
99105
if err != nil {
100106
return err
@@ -108,12 +114,13 @@ func (sctx *serveCtx) serve(
108114
}
109115
httpl := m.Match(cmux.HTTP1())
110116
go func() { errHandler(srvhttp.Serve(httpl)) }()
117+
118+
sctx.serversC <- &servers{grpc: gs, http: srvhttp}
111119
plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.l.Addr().String())
112120
}
113121

114122
if sctx.secure {
115123
gs := v3rpc.Server(s, tlscfg, gopts...)
116-
sctx.grpcServerC <- gs
117124
v3electionpb.RegisterElectionServer(gs, servElection)
118125
v3lockpb.RegisterLockServer(gs, servLock)
119126
if sctx.serviceRegister != nil {
@@ -142,10 +149,11 @@ func (sctx *serveCtx) serve(
142149
}
143150
go func() { errHandler(srv.Serve(tlsl)) }()
144151

152+
sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
145153
plog.Infof("serving client requests on %s", sctx.l.Addr().String())
146154
}
147155

148-
close(sctx.grpcServerC)
156+
close(sctx.serversC)
149157
return m.Serve()
150158
}
151159

0 commit comments

Comments
 (0)