Skip to content

Commit 1f41059

Browse files
committed
etcdserver: fix incorrect metrics generated when clients cancel watches
Before this patch, a client which cancels the context for a watch results in the server generating a `rpctypes.ErrGRPCNoLeader` error that leads to the recording of a gRPC `Unavailable` metric in association with the client watch cancellation. The metric looks like this: `grpc_server_handled_total{grpc_code="Unavailable",grpc_method="Watch",grpc_service="etcdserverpb.Watch",grpc_type="bidi_stream"}` So, the watch server has misidentified the error as a server error and then propagates the mistake to metrics, leading to a false indicator that the leader has been lost. This false signal then leads to false alerting. The commit 9c103dd introduced an interceptor which wraps watch streams requiring a leader, causing those streams to be actively canceled when leader loss is detected. However, the error handling code assumes all stream context cancellations are from the interceptor. This assumption is broken when the context was canceled because of a client stream cancellation. The core challenge is the lack of information conveyed via `context.Context`, which is shared by both the send and receive sides of the stream handling and is subject to cancellation by all paths (including the gRPC library itself). If any piece of the system cancels the shared context, there's no way for a context consumer to understand who canceled the context or why. To solve the ambiguity of the stream interceptor code specifically, this patch introduces a custom context struct which the interceptor uses to expose a custom error through the context when the interceptor decides to actively cancel a stream. Now the consuming side can more safely assume a generic context cancellation can be propagated as a cancellation, and the server-generated leader error is preserved and propagated normally without any special inference.
When a client cancels the stream, there remains a race in the error handling code between the send and receive goroutines, whereby the underlying gRPC error is lost in the case where the send path returns and is handled first, but this issue can be taken separately because, no matter which path wins, we can detect a generic cancellation. This is a replacement of #11375. Fixes #10289, #9725, #9576, #9166
1 parent b47cd2f commit 1f41059

File tree

4 files changed

+56
-10
lines changed

4 files changed

+56
-10
lines changed

etcdserver/api/v3rpc/interceptor.go

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ import (
2525
"go.etcd.io/etcd/v3/pkg/types"
2626
"go.etcd.io/etcd/v3/raft"
2727

28-
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
2928
"go.uber.org/zap"
3029
"google.golang.org/grpc"
3130
"google.golang.org/grpc/metadata"
3231
"google.golang.org/grpc/peer"
32+
33+
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
3334
)
3435

3536
const (
@@ -231,8 +232,13 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
231232
return rpctypes.ErrGRPCNoLeader
232233
}
233234

234-
cctx, cancel := context.WithCancel(ss.Context())
235-
ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss}
235+
cancelCtx, cancelFn := context.WithCancel(ss.Context())
236+
monitorCtx := &leaderMonitoringContext{
237+
Context: cancelCtx,
238+
cancel: cancelFn,
239+
}
240+
cancelForLeaderLoss := context.CancelFunc(monitorCtx.CancelForLeaderLoss)
241+
ss = serverStreamWithCtx{ctx: monitorCtx, cancel: &cancelForLeaderLoss, ServerStream: ss}
236242

237243
smap.mu.Lock()
238244
smap.streams[ss] = struct{}{}
@@ -242,7 +248,7 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
242248
smap.mu.Lock()
243249
delete(smap.streams, ss)
244250
smap.mu.Unlock()
245-
cancel()
251+
monitorCtx.Cancel()
246252
}()
247253
}
248254
}
@@ -251,6 +257,40 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
251257
}
252258
}
253259

260+
// leaderMonitoringContext wraps a context and provides a custom error when
261+
// the CancelForLeaderLoss() method is used to cancel the context. This is
262+
// so downstream context users can disambiguate the reason for the cancellation
263+
// which could be from the client (for example) or from this interceptor code.
264+
type leaderMonitoringContext struct {
265+
context.Context
266+
267+
lock sync.Mutex
268+
cancel context.CancelFunc
269+
cancelReason error
270+
}
271+
272+
func (c *leaderMonitoringContext) Cancel() {
273+
c.lock.Lock()
274+
defer c.lock.Unlock()
275+
c.cancel()
276+
}
277+
278+
func (c *leaderMonitoringContext) CancelForLeaderLoss() {
279+
c.lock.Lock()
280+
defer c.lock.Unlock()
281+
c.cancelReason = rpctypes.ErrGRPCNoLeader
282+
c.cancel()
283+
}
284+
285+
func (c *leaderMonitoringContext) Err() error {
286+
c.lock.Lock()
287+
defer c.lock.Unlock()
288+
if c.cancelReason != nil {
289+
return c.cancelReason
290+
}
291+
return c.Context.Err()
292+
}
293+
254294
type serverStreamWithCtx struct {
255295
grpc.ServerStream
256296
ctx context.Context

etcdserver/api/v3rpc/rpctypes/error.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ var (
3535
ErrGRPCLeaseExist = status.New(codes.FailedPrecondition, "etcdserver: lease already exists").Err()
3636
ErrGRPCLeaseTTLTooLarge = status.New(codes.OutOfRange, "etcdserver: too large lease TTL").Err()
3737

38+
ErrGRPCWatchCanceled = status.New(codes.Canceled, "etcdserver: watch canceled").Err()
39+
3840
ErrGRPCMemberExist = status.New(codes.FailedPrecondition, "etcdserver: member ID already exist").Err()
3941
ErrGRPCPeerURLExist = status.New(codes.FailedPrecondition, "etcdserver: Peer URLs already exists").Err()
4042
ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err()

etcdserver/api/v3rpc/watch.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@ import (
2121
"sync"
2222
"time"
2323

24+
"go.uber.org/zap"
25+
2426
"go.etcd.io/etcd/v3/auth"
2527
"go.etcd.io/etcd/v3/etcdserver"
2628
"go.etcd.io/etcd/v3/etcdserver/api/v3rpc/rpctypes"
2729
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
2830
"go.etcd.io/etcd/v3/mvcc"
2931
"go.etcd.io/etcd/v3/mvcc/mvccpb"
30-
31-
"go.uber.org/zap"
3232
)
3333

3434
const minWatchProgressInterval = 100 * time.Millisecond
@@ -199,13 +199,14 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
199199

200200
select {
201201
case err = <-errc:
202+
if err == context.Canceled {
203+
err = rpctypes.ErrGRPCWatchCanceled
204+
}
202205
close(sws.ctrlStream)
203-
204206
case <-stream.Context().Done():
205207
err = stream.Context().Err()
206-
// the only server-side cancellation is noleader for now.
207208
if err == context.Canceled {
208-
err = rpctypes.ErrGRPCNoLeader
209+
err = rpctypes.ErrGRPCWatchCanceled
209210
}
210211
}
211212

tests/e2e/metrics_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func metricsTest(cx ctlCtx) {
4949
{"/metrics", fmt.Sprintf("etcd_mvcc_delete_total 3")},
5050
{"/metrics", fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version)},
5151
{"/metrics", fmt.Sprintf(`etcd_cluster_version{cluster_version="%s"} 1`, version.Cluster(version.Version))},
52+
{"/metrics", fmt.Sprintf(`grpc_server_handled_total{grpc_code="Canceled",grpc_method="Watch",grpc_service="etcdserverpb.Watch",grpc_type="bidi_stream"} 6`)},
5253
{"/health", `{"health":"true","reason":""}`},
5354
} {
5455
i++
@@ -58,7 +59,9 @@ func metricsTest(cx ctlCtx) {
5859
if err := ctlV3Del(cx, []string{fmt.Sprintf("%d", i)}, 1); err != nil {
5960
cx.t.Fatal(err)
6061
}
61-
62+
if err := ctlV3Watch(cx, []string{"k", "--rev", "1"}, []kvExec{{key: "k", val: "v"}}...); err != nil {
63+
cx.t.Fatal(err)
64+
}
6265
if err := cURLGet(cx.epc, cURLReq{endpoint: test.endpoint, expected: test.expected, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
6366
cx.t.Fatalf("failed get with curl (%v)", err)
6467
}

0 commit comments

Comments
 (0)