Skip to content

Commit 939337f

Browse files
committed
*: add max requests bytes, keepalive to server, blackhole methods to integration
Signed-off-by: Gyu-Ho Lee <[email protected]>
1 parent 2a6d504 commit 939337f

File tree

14 files changed

+275
-81
lines changed

14 files changed

+275
-81
lines changed

embed/config.go

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"net/http"
2222
"net/url"
2323
"strings"
24+
"time"
2425

2526
"github.com/coreos/etcd/etcdserver"
2627
"github.com/coreos/etcd/pkg/cors"
@@ -37,9 +38,13 @@ const (
3738
ClusterStateFlagNew = "new"
3839
ClusterStateFlagExisting = "existing"
3940

40-
DefaultName = "default"
41-
DefaultMaxSnapshots = 5
42-
DefaultMaxWALs = 5
41+
DefaultName = "default"
42+
DefaultMaxSnapshots = 5
43+
DefaultMaxWALs = 5
44+
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
45+
DefaultGRPCKeepAliveMinTime = 5 * time.Second
46+
DefaultGRPCKeepAliveInterval = 2 * time.Hour
47+
DefaultGRPCKeepAliveTimeout = 20 * time.Second
4348

4449
DefaultListenPeerURLs = "http://localhost:2380"
4550
DefaultListenClientURLs = "http://localhost:2379"
@@ -85,6 +90,24 @@ type Config struct {
8590
TickMs uint `json:"heartbeat-interval"`
8691
ElectionMs uint `json:"election-timeout"`
8792
QuotaBackendBytes int64 `json:"quota-backend-bytes"`
93+
MaxRequestBytes uint `json:"max-request-bytes"`
94+
95+
// gRPC server options
96+
97+
// GRPCKeepAliveMinTime is the minimum interval that a client should
98+
// wait before pinging server. When client pings "too fast", server
99+
// sends goaway and closes the connection (errors: too_many_pings,
100+
// http2.ErrCodeEnhanceYourCalm). When too slow, nothing happens.
101+
// Server expects client pings only when there is any active streams
102+
// (PermitWithoutStream is set false).
103+
GRPCKeepAliveMinTime time.Duration `json:"grpc-keepalive-min-time"`
104+
// GRPCKeepAliveInterval is the frequency of server-to-client ping
105+
// to check if a connection is alive. Close a non-responsive connection
106+
// after an additional duration of Timeout. 0 to disable.
107+
GRPCKeepAliveInterval time.Duration `json:"grpc-keepalive-interval"`
108+
// GRPCKeepAliveTimeout is the additional duration of wait
109+
// before closing a non-responsive connection. 0 to disable.
110+
GRPCKeepAliveTimeout time.Duration `json:"grpc-keepalive-timeout"`
88111

89112
// clustering
90113

@@ -167,23 +190,27 @@ func NewConfig() *Config {
167190
lcurl, _ := url.Parse(DefaultListenClientURLs)
168191
acurl, _ := url.Parse(DefaultAdvertiseClientURLs)
169192
cfg := &Config{
170-
CorsInfo: &cors.CORSInfo{},
171-
MaxSnapFiles: DefaultMaxSnapshots,
172-
MaxWalFiles: DefaultMaxWALs,
173-
Name: DefaultName,
174-
SnapCount: etcdserver.DefaultSnapCount,
175-
TickMs: 100,
176-
ElectionMs: 1000,
177-
LPUrls: []url.URL{*lpurl},
178-
LCUrls: []url.URL{*lcurl},
179-
APUrls: []url.URL{*apurl},
180-
ACUrls: []url.URL{*acurl},
181-
ClusterState: ClusterStateFlagNew,
182-
InitialClusterToken: "etcd-cluster",
183-
StrictReconfigCheck: true,
184-
Metrics: "basic",
185-
EnableV2: true,
186-
AuthToken: "simple",
193+
CorsInfo: &cors.CORSInfo{},
194+
MaxSnapFiles: DefaultMaxSnapshots,
195+
MaxWalFiles: DefaultMaxWALs,
196+
Name: DefaultName,
197+
SnapCount: etcdserver.DefaultSnapCount,
198+
MaxRequestBytes: DefaultMaxRequestBytes,
199+
GRPCKeepAliveMinTime: DefaultGRPCKeepAliveMinTime,
200+
GRPCKeepAliveInterval: DefaultGRPCKeepAliveInterval,
201+
GRPCKeepAliveTimeout: DefaultGRPCKeepAliveTimeout,
202+
TickMs: 100,
203+
ElectionMs: 1000,
204+
LPUrls: []url.URL{*lpurl},
205+
LCUrls: []url.URL{*lcurl},
206+
APUrls: []url.URL{*apurl},
207+
ACUrls: []url.URL{*acurl},
208+
ClusterState: ClusterStateFlagNew,
209+
InitialClusterToken: "etcd-cluster",
210+
StrictReconfigCheck: true,
211+
Metrics: "basic",
212+
EnableV2: true,
213+
AuthToken: "simple",
187214
}
188215
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
189216
return cfg

embed/etcd.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ import (
3636
"github.com/coreos/etcd/pkg/types"
3737
"github.com/coreos/etcd/rafthttp"
3838
"github.com/coreos/pkg/capnslog"
39+
"google.golang.org/grpc"
40+
"google.golang.org/grpc/keepalive"
3941
)
4042

4143
var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "embed")
@@ -140,6 +142,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
140142
ElectionTicks: cfg.ElectionTicks(),
141143
AutoCompactionRetention: cfg.AutoCompactionRetention,
142144
QuotaBackendBytes: cfg.QuotaBackendBytes,
145+
MaxRequestBytes: cfg.MaxRequestBytes,
143146
StrictReconfigCheck: cfg.StrictReconfigCheck,
144147
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
145148
AuthToken: cfg.AuthToken,
@@ -415,9 +418,23 @@ func (e *Etcd) serve() (err error) {
415418
}
416419
h = http.Handler(&cors.CORSHandler{Handler: h, Info: e.cfg.CorsInfo})
417420

421+
gopts := []grpc.ServerOption{}
422+
if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) {
423+
gopts = append(gopts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
424+
MinTime: e.cfg.GRPCKeepAliveMinTime,
425+
PermitWithoutStream: false,
426+
}))
427+
}
428+
if e.cfg.GRPCKeepAliveInterval > time.Duration(0) &&
429+
e.cfg.GRPCKeepAliveTimeout > time.Duration(0) {
430+
gopts = append(gopts, grpc.KeepaliveParams(keepalive.ServerParameters{
431+
Time: e.cfg.GRPCKeepAliveInterval,
432+
Timeout: e.cfg.GRPCKeepAliveTimeout,
433+
}))
434+
}
418435
for _, sctx := range e.sctxs {
419436
go func(s *serveCtx) {
420-
e.errHandler(s.serve(e.Server, ctlscfg, h, e.errHandler))
437+
e.errHandler(s.serve(e.Server, ctlscfg, h, e.errHandler, gopts...))
421438
}(sctx)
422439
}
423440
return nil

embed/serve.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,12 @@ func newServeCtx() *serveCtx {
6666
// serve accepts incoming connections on the listener l,
6767
// creating a new service goroutine for each. The service goroutines
6868
// read requests and then call handler to reply to them.
69-
func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler, errHandler func(error)) error {
69+
func (sctx *serveCtx) serve(
70+
s *etcdserver.EtcdServer,
71+
tlscfg *tls.Config,
72+
handler http.Handler,
73+
errHandler func(error),
74+
gopts ...grpc.ServerOption) error {
7075
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
7176
<-s.ReadyNotify()
7277
plog.Info("ready to serve client requests")
@@ -77,7 +82,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
7782
servLock := v3lock.NewLockServer(v3c)
7883

7984
if sctx.insecure {
80-
gs := v3rpc.Server(s, nil)
85+
gs := v3rpc.Server(s, nil, gopts...)
8186
sctx.grpcServerC <- gs
8287
v3electionpb.RegisterElectionServer(gs, servElection)
8388
v3lockpb.RegisterLockServer(gs, servLock)
@@ -107,7 +112,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
107112
}
108113

109114
if sctx.secure {
110-
gs := v3rpc.Server(s, tlscfg)
115+
gs := v3rpc.Server(s, tlscfg, gopts...)
111116
sctx.grpcServerC <- gs
112117
v3electionpb.RegisterElectionServer(gs, servElection)
113118
v3lockpb.RegisterLockServer(gs, servLock)

etcdmain/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,10 @@ func newConfig() *config {
138138
fs.UintVar(&cfg.TickMs, "heartbeat-interval", cfg.TickMs, "Time (in milliseconds) of a heartbeat interval.")
139139
fs.UintVar(&cfg.ElectionMs, "election-timeout", cfg.ElectionMs, "Time (in milliseconds) for an election to timeout.")
140140
fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
141+
fs.UintVar(&cfg.MaxRequestBytes, "max-request-bytes", cfg.MaxRequestBytes, "Maximum client request size in bytes the server will accept.")
142+
fs.DurationVar(&cfg.GRPCKeepAliveMinTime, "grpc-keepalive-min-time", cfg.Config.GRPCKeepAliveMinTime, "Minimum interval duration that a client should wait before pinging server.")
143+
fs.DurationVar(&cfg.GRPCKeepAliveInterval, "grpc-keepalive-interval", cfg.Config.GRPCKeepAliveInterval, "Frequency duration of server-to-client ping to check if a connection is alive (0 to disable).")
144+
fs.DurationVar(&cfg.GRPCKeepAliveTimeout, "grpc-keepalive-timeout", cfg.Config.GRPCKeepAliveTimeout, "Additional duration of wait before closing a non-responsive connection (0 to disable).")
141145

142146
// clustering
143147
fs.Var(flags.NewURLsValue(embed.DefaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.")

etcdmain/help.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,14 @@ member flags:
6666
comma-separated whitelist of origins for CORS (cross-origin resource sharing).
6767
--quota-backend-bytes '0'
6868
raise alarms when backend size exceeds the given quota (0 defaults to low space quota).
69+
--max-request-bytes '1572864'
70+
maximum client request size in bytes the server will accept.
71+
--grpc-keepalive-min-time '5s'
72+
minimum duration interval that a client should wait before pinging server.
73+
--grpc-keepalive-interval '2h'
74+
frequency duration of server-to-client ping to check if a connection is alive (0 to disable).
75+
--grpc-keepalive-timeout '20s'
76+
additional duration of wait before closing a non-responsive connection (0 to disable).
6977
7078
clustering flags:
7179

etcdserver/api/v3rpc/grpc.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,22 +25,28 @@ import (
2525
"google.golang.org/grpc/grpclog"
2626
)
2727

28-
const maxStreams = math.MaxUint32
28+
const (
29+
grpcOverheadBytes = 512 * 1024
30+
maxStreams = math.MaxUint32
31+
maxSendBytes = math.MaxInt32
32+
)
2933

3034
func init() {
3135
grpclog.SetLogger(plog)
3236
}
3337

34-
func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
38+
func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
3539
var opts []grpc.ServerOption
3640
opts = append(opts, grpc.CustomCodec(&codec{}))
3741
if tls != nil {
3842
opts = append(opts, grpc.Creds(credentials.NewTLS(tls)))
3943
}
4044
opts = append(opts, grpc.UnaryInterceptor(newUnaryInterceptor(s)))
4145
opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s)))
46+
opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
47+
opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
4248
opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
43-
grpcServer := grpc.NewServer(opts...)
49+
grpcServer := grpc.NewServer(append(opts, gopts...)...)
4450

4551
pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
4652
pb.RegisterWatchServer(grpcServer, NewWatchServer(s))

etcdserver/api/v3rpc/rpctypes/error.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package rpctypes
1717
import (
1818
"google.golang.org/grpc"
1919
"google.golang.org/grpc/codes"
20+
"google.golang.org/grpc/status"
2021
)
2122

2223
var (
@@ -188,3 +189,10 @@ func Error(err error) error {
188189
}
189190
return EtcdError{code: grpc.Code(verr), desc: grpc.ErrorDesc(verr)}
190191
}
192+
193+
func ErrorDesc(err error) string {
194+
if s, ok := status.FromError(err); ok {
195+
return s.Message()
196+
}
197+
return err.Error()
198+
}

etcdserver/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ type ServerConfig struct {
5555
AutoCompactionRetention int
5656
QuotaBackendBytes int64
5757

58+
// MaxRequestBytes is the maximum request size to send over raft.
59+
MaxRequestBytes uint
60+
5861
StrictReconfigCheck bool
5962

6063
// ClientCertAuthEnabled is true when cert has been signed by the client CA.

etcdserver/server.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ const (
8282
releaseDelayAfterSnapshot = 30 * time.Second
8383

8484
// maxPendingRevokes is the maximum number of outstanding expired lease revocations.
85-
maxPendingRevokes = 16
85+
maxPendingRevokes = 16
86+
recommendedMaxRequestBytes = 10 * 1024 * 1024
8687
)
8788

8889
var (
@@ -259,6 +260,10 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
259260
cl *membership.RaftCluster
260261
)
261262

263+
if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
264+
plog.Warningf("MaxRequestBytes %v exceeds maximum recommended size %v", cfg.MaxRequestBytes, recommendedMaxRequestBytes)
265+
}
266+
262267
if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
263268
return nil, fmt.Errorf("cannot access data directory: %v", terr)
264269
}

etcdserver/v3_server.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,6 @@ import (
3333
)
3434

3535
const (
36-
// the max request size that raft accepts.
37-
// TODO: make this a flag? But we probably do not want to
38-
// accept large request which might block raft stream. User
39-
// specify a large value might end up with shooting in the foot.
40-
maxRequestBytes = 1.5 * 1024 * 1024
41-
4236
// In the health case, there might be a small gap (10s of entries) between
4337
// the applied index and committed index.
4438
// However, if the committed entries are very heavy to apply, the gap might grow.
@@ -556,7 +550,7 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In
556550
return nil, err
557551
}
558552

559-
if len(data) > maxRequestBytes {
553+
if len(data) > int(s.Cfg.MaxRequestBytes) {
560554
return nil, ErrRequestTooLarge
561555
}
562556

0 commit comments

Comments
 (0)