Skip to content

Commit 423e664

Browse files
Merge pull request #15569 from dmage/maxconnections
Automatic merge from submit-queue Add limit for number of concurrent connections to registry The registry might have excessive resource usage under heavy load. To avoid this, we limit the number of concurrent requests. Requests over the MaxRunning limit are enqueued. Requests are rejected if there are MaxInQueue requests in the queue. Request may stay in the queue no more than MaxWaitInQueue. See also #15448.
2 parents 01e5a1b + 949a64e commit 423e664

File tree

10 files changed

+573
-3
lines changed

10 files changed

+573
-3
lines changed

images/dockerregistry/config.yml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,18 @@ openshift:
4343
# Attention! A weak secret can lead to the leakage of private data.
4444
#
4545
# secret: TopSecretLongToken
46+
requests:
47+
# GET and HEAD requests
48+
read:
49+
# maxrunning is a limit for the number of in-flight requests. A zero value means there is no limit.
50+
maxrunning: 0
51+
# maxinqueue sets the maximum number of requests that can be queued if the limit for the number of in-flight requests is reached.
52+
maxinqueue: 0
53+
# maxwaitinqueue is how long a request can wait in the queue. A zero value means it can wait forever.
54+
maxwaitinqueue: 0
55+
# PUT, PATCH, POST, DELETE requests and internal mirroring requests
56+
write:
57+
# See description of openshift.requests.read.
58+
maxrunning: 0
59+
maxinqueue: 0
60+
maxwaitinqueue: 0

pkg/cmd/dockerregistry/dockerregistry.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import (
4848
"github.com/openshift/origin/pkg/dockerregistry/server/api"
4949
"github.com/openshift/origin/pkg/dockerregistry/server/audit"
5050
registryconfig "github.com/openshift/origin/pkg/dockerregistry/server/configuration"
51+
"github.com/openshift/origin/pkg/dockerregistry/server/maxconnections"
5152
"github.com/openshift/origin/pkg/dockerregistry/server/prune"
5253
"github.com/openshift/origin/pkg/version"
5354
)
@@ -155,6 +156,10 @@ func Execute(configFile io.Reader) {
155156
registryClient := server.NewRegistryClient(clientcmd.NewConfig().BindToFile())
156157
ctx = server.WithRegistryClient(ctx, registryClient)
157158

159+
readLimiter := newLimiter(extraConfig.Requests.Read)
160+
writeLimiter := newLimiter(extraConfig.Requests.Write)
161+
ctx = server.WithWriteLimiter(ctx, writeLimiter)
162+
158163
log.WithFields(versionFields()).Info("start registry")
159164
// inject a logger into the uuid library. warns us if there is a problem
160165
// with uuid generation under low entropy.
@@ -229,7 +234,9 @@ func Execute(configFile io.Reader) {
229234
app.Config.HTTP.Headers.Set("X-Registry-Supports-Signatures", "1")
230235

231236
app.RegisterHealthChecks()
232-
handler := alive("/", app)
237+
handler := http.Handler(app)
238+
handler = limit(readLimiter, writeLimiter, handler)
239+
handler = alive("/", handler)
233240
// TODO: temporarily keep for backwards compatibility; remove in the future
234241
handler = alive("/healthz", handler)
235242
handler = health.Handler(handler)
@@ -368,6 +375,34 @@ func logLevel(level configuration.Loglevel) log.Level {
368375
return l
369376
}
370377

378+
func newLimiter(c registryconfig.RequestsLimits) maxconnections.Limiter {
379+
if c.MaxRunning <= 0 {
380+
return nil
381+
}
382+
return maxconnections.NewLimiter(c.MaxRunning, c.MaxInQueue, c.MaxWaitInQueue)
383+
}
384+
385+
func limit(readLimiter, writeLimiter maxconnections.Limiter, handler http.Handler) http.Handler {
386+
readHandler := handler
387+
if readLimiter != nil {
388+
readHandler = maxconnections.New(readLimiter, readHandler)
389+
}
390+
391+
writeHandler := handler
392+
if writeLimiter != nil {
393+
writeHandler = maxconnections.New(writeLimiter, writeHandler)
394+
}
395+
396+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
397+
switch strings.ToUpper(r.Method) {
398+
case "GET", "HEAD", "OPTIONS":
399+
readHandler.ServeHTTP(w, r)
400+
default:
401+
writeHandler.ServeHTTP(w, r)
402+
}
403+
})
404+
}
405+
371406
// alive simply wraps the handler with a route that always returns an http 200
372407
// response when the path is matched. If the path is not matched, the request
373408
// is passed to the provided handler. There is no guarantee of anything but

pkg/dockerregistry/server/configuration/configuration.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"os"
99
"reflect"
1010
"strings"
11+
"time"
1112

1213
"gopkg.in/yaml.v2"
1314

@@ -26,15 +27,27 @@ type openshiftConfig struct {
2627
}
2728

2829
type Configuration struct {
29-
Version configuration.Version `yaml:"version"`
30-
Metrics Metrics `yaml:"metrics"`
30+
Version configuration.Version `yaml:"version"`
31+
Metrics Metrics `yaml:"metrics"`
32+
Requests Requests `yaml:"requests"`
3133
}
3234

3335
type Metrics struct {
3436
Enabled bool `yaml:"enabled"`
3537
Secret string `yaml:"secret"`
3638
}
3739

40+
type Requests struct {
41+
Read RequestsLimits
42+
Write RequestsLimits
43+
}
44+
45+
type RequestsLimits struct {
46+
MaxRunning int
47+
MaxInQueue int
48+
MaxWaitInQueue time.Duration
49+
}
50+
3851
type versionInfo struct {
3952
Openshift struct {
4053
Version *configuration.Version

pkg/dockerregistry/server/context.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/openshift/origin/pkg/client"
77
"github.com/openshift/origin/pkg/dockerregistry/server/configuration"
8+
"github.com/openshift/origin/pkg/dockerregistry/server/maxconnections"
89
)
910

1011
type contextKey string
@@ -20,6 +21,9 @@ const (
2021
// registryClientKey is the key for RegistryClient values in Contexts.
2122
registryClientKey contextKey = "registryClient"
2223

24+
// writeLimiterKey is the key for write limiters in Contexts.
25+
writeLimiterKey contextKey = "writeLimiter"
26+
2327
// userClientKey is the key for a origin's client with the current user's
2428
// credentials in Contexts.
2529
userClientKey contextKey = "userClient"
@@ -71,6 +75,17 @@ func RegistryClientFrom(ctx context.Context) RegistryClient {
7175
return ctx.Value(registryClientKey).(RegistryClient)
7276
}
7377

78+
// WithWriteLimiter returns a new Context with a write limiter.
79+
func WithWriteLimiter(ctx context.Context, writeLimiter maxconnections.Limiter) context.Context {
80+
return context.WithValue(ctx, writeLimiterKey, writeLimiter)
81+
}
82+
83+
// WriteLimiterFrom returns the write limiter if one is stored in ctx, or nil otherwise.
84+
func WriteLimiterFrom(ctx context.Context) maxconnections.Limiter {
85+
writeLimiter, _ := ctx.Value(writeLimiterKey).(maxconnections.Limiter)
86+
return writeLimiter
87+
}
88+
7489
// withUserClient returns a new Context with the origin's client.
7590
// This client should have the current user's credentials
7691
func withUserClient(parent context.Context, userClient client.Interface) context.Context {
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package maxconnections
2+
3+
import (
4+
"reflect"
5+
"sync"
6+
"testing"
7+
)
8+
9+
type countM map[interface{}]int
10+
11+
type counter struct {
12+
mu sync.Mutex
13+
m countM
14+
}
15+
16+
func newCounter() *counter {
17+
return &counter{
18+
m: make(countM),
19+
}
20+
}
21+
22+
func (c *counter) Add(key interface{}, delta int) {
23+
c.mu.Lock()
24+
defer c.mu.Unlock()
25+
c.m[key] += delta
26+
}
27+
28+
func (c *counter) Values() countM {
29+
c.mu.Lock()
30+
defer c.mu.Unlock()
31+
m := make(map[interface{}]int)
32+
for k, v := range c.m {
33+
m[k] = v
34+
}
35+
return m
36+
}
37+
38+
func (c *counter) Equal(m countM) bool {
39+
c.mu.Lock()
40+
defer c.mu.Unlock()
41+
for k, v := range m {
42+
if c.m[k] != v {
43+
return false
44+
}
45+
}
46+
for k, v := range c.m {
47+
if _, ok := m[k]; !ok && v != 0 {
48+
return false
49+
}
50+
}
51+
return true
52+
}
53+
54+
func TestCounter(t *testing.T) {
55+
c := newCounter()
56+
c.Add(100, 1)
57+
c.Add(200, 2)
58+
c.Add(300, 3)
59+
if expected := (countM{100: 1, 200: 2, 300: 3}); !reflect.DeepEqual(c.m, expected) {
60+
t.Fatalf("c.m = %v, want %v", c.m, expected)
61+
}
62+
if expected := (countM{100: 1, 200: 2, 300: 3}); !c.Equal(expected) {
63+
t.Fatalf("counter(%v).Equal(%v) is false, want true", c.m, expected)
64+
}
65+
66+
c.Add(200, -2)
67+
if expected := (countM{100: 1, 200: 0, 300: 3}); !c.Equal(expected) {
68+
t.Fatalf("counter(%v).Equal(%v) is false, want true", c.m, expected)
69+
}
70+
if expected := (countM{100: 1, 300: 3}); !c.Equal(expected) {
71+
t.Fatalf("counter(%v).Equal(%v) is false, want true", c.m, expected)
72+
}
73+
if expected := (countM{100: 1, 300: 3, 400: 0}); !c.Equal(expected) {
74+
t.Fatalf("counter(%v).Equal(%v) is false, want true", c.m, expected)
75+
}
76+
77+
if expected := (countM{100: 1}); c.Equal(expected) {
78+
t.Fatalf("counter(%v).Equal(%v) is true, want false", c.m, expected)
79+
}
80+
if expected := (countM{100: 1, 300: 3, 400: 4}); c.Equal(expected) {
81+
t.Fatalf("counter(%v).Equal(%v) is true, want false", c.m, expected)
82+
}
83+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package maxconnections
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
// A Limiter controls starting of jobs.
9+
type Limiter interface {
10+
// Start decides whether a new job can be started. The decision may be
11+
// returned after a delay if the limiter wants to throttle jobs.
12+
Start(context.Context) bool
13+
14+
// Done must be called when a job is finished.
15+
Done()
16+
}
17+
18+
// limiter ensures that there are no more than maxRunning jobs at the same
19+
// time. It can enqueue up to maxInQueue jobs awaiting to be run, for other
20+
// jobs Start will return false immediately.
21+
type limiter struct {
22+
// running is a buffered channel. Before starting a job, an empty struct is
23+
// sent to the channel. When the job is finished, one element is received
24+
// back from the channel. If the channel's buffer is full, the job is
25+
// enqueued.
26+
running chan struct{}
27+
28+
// queue is a buffered channel. An empty struct is placed into the channel
29+
// while a job is waiting for a spot in the running channel's buffer.
30+
// If the queue channel's buffer is full, the job is declined.
31+
queue chan struct{}
32+
33+
// maxWaitInQueue is a maximum wait time in the queue, zero means forever.
34+
maxWaitInQueue time.Duration
35+
36+
// newTimer allows to override the function time.NewTimer for tests.
37+
newTimer func(d time.Duration) *time.Timer
38+
}
39+
40+
// NewLimiter return a limiter that allows no more than maxRunning jobs at the
41+
// same time. It can enqueue up to maxInQueue jobs awaiting to be run, and a
42+
// job may wait in the queue no more than maxWaitInQueue.
43+
func NewLimiter(maxRunning, maxInQueue int, maxWaitInQueue time.Duration) Limiter {
44+
return &limiter{
45+
running: make(chan struct{}, maxRunning),
46+
queue: make(chan struct{}, maxInQueue),
47+
maxWaitInQueue: maxWaitInQueue,
48+
newTimer: time.NewTimer,
49+
}
50+
}
51+
52+
func (l *limiter) Start(ctx context.Context) bool {
53+
select {
54+
case l.running <- struct{}{}:
55+
return true
56+
default:
57+
}
58+
59+
// Slow-path.
60+
select {
61+
case l.queue <- struct{}{}:
62+
defer func() {
63+
<-l.queue
64+
}()
65+
default:
66+
return false
67+
}
68+
69+
var timeout <-chan time.Time
70+
// if l.maxWaitInQueue is 0, timeout will stay nil which practically means wait forever.
71+
if l.maxWaitInQueue > 0 {
72+
timer := l.newTimer(l.maxWaitInQueue)
73+
defer timer.Stop()
74+
timeout = timer.C
75+
}
76+
77+
select {
78+
case l.running <- struct{}{}:
79+
return true
80+
case <-timeout:
81+
case <-ctx.Done():
82+
}
83+
return false
84+
}
85+
86+
func (l *limiter) Done() {
87+
<-l.running
88+
}

0 commit comments

Comments
 (0)