1
1
package restoptions
2
2
3
3
import (
4
- "fmt"
5
- "strconv"
6
- "strings"
7
4
"sync"
8
5
9
- "k8s.io/apimachinery/pkg/runtime"
6
+ "github.com/golang/glog"
7
+
10
8
"k8s.io/apimachinery/pkg/runtime/schema"
11
- kerrors "k8s.io/apimachinery/pkg/util/errors"
12
9
"k8s.io/apiserver/pkg/registry/generic"
13
10
"k8s.io/apiserver/pkg/registry/generic/registry"
11
+ "k8s.io/apiserver/pkg/server/options"
14
12
serverstorage "k8s.io/apiserver/pkg/server/storage"
15
- "k8s.io/apiserver/pkg/storage"
16
- "k8s.io/apiserver/pkg/storage/storagebackend"
17
- "k8s.io/apiserver/pkg/storage/storagebackend/factory"
18
13
19
- "github.com/golang/glog"
20
14
configapi "github.com/openshift/origin/pkg/cmd/server/api"
21
15
kubernetes "github.com/openshift/origin/pkg/cmd/server/kubernetes/master"
22
16
)
@@ -41,7 +35,6 @@ type configRESTOptionsGetter struct {
41
35
}
42
36
43
37
// NewConfigGetter returns a restoptions.Getter implemented using information from the provided master config.
44
- // By default, the etcd watch cache is enabled with a size of 1000 per resource type.
45
38
// TODO: this class should either not need to know about configapi.MasterConfig, or not be in pkg/util
46
39
func NewConfigGetter (masterOptions configapi.MasterConfig , defaultResourceConfig * serverstorage.ResourceConfig , resourcePrefixOverrides map [schema.GroupResource ]string , enforcedStorageVersions map [schema.GroupResource ]schema.GroupVersion , quorumResources map [schema.GroupResource ]struct {}) (Getter , error ) {
47
40
apiserverOptions , err := kubernetes .BuildKubeAPIserverOptions (masterOptions )
@@ -55,27 +48,24 @@ func NewConfigGetter(masterOptions configapi.MasterConfig, defaultResourceConfig
55
48
storageFactory .DefaultResourcePrefixes = resourcePrefixOverrides
56
49
storageFactory .StorageConfig .Prefix = masterOptions .EtcdStorageConfig .OpenShiftStoragePrefix
57
50
58
- // TODO: refactor vendor/k8s.io/kubernetes/pkg/registry/cachesize to remove our custom cache size code
59
- errs := [] error {}
60
- cacheSizes := map [schema. GroupResource ] int {}
61
- for _ , c := range apiserverOptions .GenericServerRunOptions .WatchCacheSizes {
62
- tokens := strings . Split ( c , "#" )
63
- if len ( tokens ) != 2 {
64
- errs = append ( errs , fmt . Errorf ( "invalid watch cache size value '%s', expecting <resource>#<size> format (e.g. builds#100)" , c ))
65
- continue
51
+ // perform watch cache heuristic like upstream
52
+ if apiserverOptions . Etcd . EnableWatchCache {
53
+ glog . V ( 2 ). Infof ( "Initializing cache sizes based on %dMB limit" , apiserverOptions . GenericServerRunOptions . TargetRAMMB )
54
+ sizes := newHeuristicWatchCacheSizes ( apiserverOptions .GenericServerRunOptions .TargetRAMMB )
55
+ if userSpecified , err := options . ParseWatchCacheSizes ( apiserverOptions . Etcd . WatchCacheSizes ); err == nil {
56
+ for resource , size := range userSpecified {
57
+ sizes [ resource ] = size
58
+ }
66
59
}
67
-
68
- resource := schema .ParseGroupResource (tokens [0 ])
69
-
70
- size , err := strconv .Atoi (tokens [1 ])
60
+ apiserverOptions .Etcd .WatchCacheSizes , err = options .WriteWatchCacheSizes (sizes )
71
61
if err != nil {
72
- errs = append (errs , fmt .Errorf ("invalid watch cache size value '%s': %v" , c , err ))
73
- continue
62
+ return nil , err
74
63
}
75
- cacheSizes [resource ] = size
76
64
}
77
- if len (errs ) > 0 {
78
- return nil , kerrors .NewAggregate (errs )
65
+
66
+ cacheSizes , err := options .ParseWatchCacheSizes (apiserverOptions .Etcd .WatchCacheSizes )
67
+ if err != nil {
68
+ return nil , err
79
69
}
80
70
81
71
return & configRESTOptionsGetter {
@@ -108,41 +98,14 @@ func (g *configRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource)
108
98
config .Quorum = true
109
99
}
110
100
111
- configuredCacheSize , specified := g .cacheSizes [resource ]
112
- if ! specified || configuredCacheSize < 0 {
113
- configuredCacheSize = g .defaultCacheSize
114
- }
115
- storageWithCacher := registry .StorageWithCacher (configuredCacheSize )
116
-
117
- decorator := func (
118
- copier runtime.ObjectCopier ,
119
- storageConfig * storagebackend.Config ,
120
- requestedSize * int ,
121
- objectType runtime.Object ,
122
- resourcePrefix string ,
123
- keyFunc func (obj runtime.Object ) (string , error ),
124
- newListFn func () runtime.Object ,
125
- getAttrsFunc storage.AttrFunc ,
126
- triggerFn storage.TriggerPublisherFunc ,
127
- ) (storage.Interface , factory.DestroyFunc ) {
128
- // use the origin default cache size, not the one in registry.StorageWithCacher
129
- capacity := & configuredCacheSize
130
- if requestedSize != nil {
131
- capacity = requestedSize
132
- }
133
-
134
- if * capacity == 0 || ! g .cacheEnabled {
135
- glog .V (5 ).Infof ("using uncached watch storage for %s (quorum=%t)" , resource .String (), storageConfig .Quorum )
136
- return generic .UndecoratedStorage (copier , storageConfig , capacity , objectType , resourcePrefix , keyFunc , newListFn , getAttrsFunc , triggerFn )
137
- }
138
-
139
- glog .V (5 ).Infof ("using watch cache storage (capacity=%v, quorum=%t) for %s %#v" , * capacity , storageConfig .Quorum , resource .String (), storageConfig )
140
- return storageWithCacher (copier , storageConfig , capacity , objectType , resourcePrefix , keyFunc , newListFn , getAttrsFunc , triggerFn )
101
+ cacheSize , ok := g .cacheSizes [resource ]
102
+ if ! ok {
103
+ cacheSize = g .defaultCacheSize
141
104
}
142
105
143
106
resourceOptions := generic.RESTOptions {
144
107
StorageConfig : config ,
145
- Decorator : decorator ,
108
+ Decorator : registry . StorageWithCacher ( cacheSize ) ,
146
109
DeleteCollectionWorkers : g .deleteCollectionWorkers ,
147
110
EnableGarbageCollection : g .enableGarbageCollection ,
148
111
ResourcePrefix : g .storageFactory .ResourcePrefix (resource ),
@@ -151,3 +114,25 @@ func (g *configRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource)
151
114
152
115
return resourceOptions , nil
153
116
}
117
+
118
+ // newHeuristicWatchCacheSizes returns a map of suggested watch cache sizes based on total
119
+ // memory. It reuses the upstream heuristic and adds OpenShift specific resources.
120
+ func newHeuristicWatchCacheSizes (expectedRAMCapacityMB int ) map [schema.GroupResource ]int {
121
+ // TODO: Revisit this heuristic, copied from upstream
122
+ clusterSize := expectedRAMCapacityMB / 60
123
+
124
+ // default enable watch caches for resources that will have a high number of clients accessing it
125
+ // and where the write rate may be significant
126
+ watchCacheSizes := make (map [schema.GroupResource ]int )
127
+ watchCacheSizes [schema.GroupResource {Group : "network.openshift.io" , Resource : "hostsubnets" }] = maxInt (5 * clusterSize , 100 )
128
+ watchCacheSizes [schema.GroupResource {Group : "network.openshift.io" , Resource : "netnamespaces" }] = maxInt (5 * clusterSize , 100 )
129
+ watchCacheSizes [schema.GroupResource {Group : "network.openshift.io" , Resource : "egressnetworkpolicies" }] = maxInt (10 * clusterSize , 100 )
130
+ return watchCacheSizes
131
+ }
132
+
133
// maxInt returns the larger of a and b.
func maxInt(a, b int) int {
	if b > a {
		return b
	}
	return a
}
0 commit comments