Skip to content

Commit c02131d

Browse files
author
OpenShift Bot
authored
Merge pull request #14052 from sttts/sttts-tri-state-watch-cache-size
Merged by openshift-bot
2 parents 48e5e40 + d0797da commit c02131d

File tree

13 files changed

+230
-72
lines changed

13 files changed

+230
-72
lines changed

pkg/cmd/server/kubernetes/master/master_config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ import (
7070
"github.com/openshift/origin/pkg/version"
7171
)
7272

73+
const DefaultWatchCacheSize = 1000
74+
7375
// request paths that match this regular expression will be treated as long running
7476
// and not subjected to the default server timeout.
7577
const originLongRunningEndpointsRE = "(/|^)(buildconfigs/.*/instantiatebinary|imagestreamimports)$"
@@ -150,6 +152,7 @@ func BuildKubeAPIserverOptions(masterConfig configapi.MasterConfig) (*kapiserver
150152
server.Etcd.StorageConfig.KeyFile = masterConfig.EtcdClientInfo.ClientCert.KeyFile
151153
server.Etcd.StorageConfig.CertFile = masterConfig.EtcdClientInfo.ClientCert.CertFile
152154
server.Etcd.StorageConfig.CAFile = masterConfig.EtcdClientInfo.CA
155+
server.Etcd.DefaultWatchCacheSize = DefaultWatchCacheSize
153156

154157
server.GenericServerRunOptions.MaxRequestsInFlight = masterConfig.ServingInfo.MaxRequestsInFlight
155158
server.GenericServerRunOptions.MinRequestTimeout = masterConfig.ServingInfo.RequestTimeoutSeconds

pkg/cmd/server/kubernetes/master/master_config_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func TestAPIServerDefaults(t *testing.T) {
7878
DeleteCollectionWorkers: 1,
7979
EnableGarbageCollection: true,
8080
EnableWatchCache: true,
81+
DefaultWatchCacheSize: 100,
8182
},
8283
SecureServing: &apiserveroptions.SecureServingOptions{
8384
ServingOptions: apiserveroptions.ServingOptions{

pkg/cmd/server/kubernetes/master/master_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ func TestNewMasterLeasesHasCorrectTTL(t *testing.T) {
2525
}
2626

2727
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
28-
storageInterface, _ := restOptions.Decorator(kapi.Scheme, restOptions.StorageConfig, 0, nil, "masterleases", nil, nil, nil, nil)
28+
watchCacheDisabled := 0
29+
storageInterface, _ := restOptions.Decorator(kapi.Scheme, restOptions.StorageConfig, &watchCacheDisabled, nil, "masterleases", nil, nil, nil, nil)
2930
defer server.Terminate(t)
3031

3132
masterLeases := newMasterLeases(storageInterface)

pkg/util/restoptions/configgetter.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,6 @@ import (
2121
kubernetes "github.com/openshift/origin/pkg/cmd/server/kubernetes/master"
2222
)
2323

24-
// UseConfiguredCacheSize indicates that the configured cache size should be used
25-
const UseConfiguredCacheSize = -1
26-
2724
// configRESTOptionsGetter provides RESTOptions based on a provided config
2825
type configRESTOptionsGetter struct {
2926
masterOptions configapi.MasterConfig
@@ -84,7 +81,7 @@ func NewConfigGetter(masterOptions configapi.MasterConfig, defaultResourceConfig
8481
return &configRESTOptionsGetter{
8582
masterOptions: masterOptions,
8683
cacheEnabled: apiserverOptions.Etcd.EnableWatchCache,
87-
defaultCacheSize: 1000,
84+
defaultCacheSize: apiserverOptions.Etcd.DefaultWatchCacheSize,
8885
cacheSizes: cacheSizes,
8986
restOptionsMap: map[schema.GroupResource]generic.RESTOptions{},
9087
defaultResourceConfig: defaultResourceConfig,
@@ -115,30 +112,32 @@ func (g *configRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource)
115112
if !specified || configuredCacheSize < 0 {
116113
configuredCacheSize = g.defaultCacheSize
117114
}
115+
storageWithCacher := registry.StorageWithCacher(configuredCacheSize)
118116

119117
decorator := func(
120118
copier runtime.ObjectCopier,
121119
storageConfig *storagebackend.Config,
122-
requestedSize int,
120+
requestedSize *int,
123121
objectType runtime.Object,
124122
resourcePrefix string,
125123
keyFunc func(obj runtime.Object) (string, error),
126124
newListFn func() runtime.Object,
127125
getAttrsFunc storage.AttrFunc,
128126
triggerFn storage.TriggerPublisherFunc,
129127
) (storage.Interface, factory.DestroyFunc) {
130-
capacity := requestedSize
131-
if capacity == UseConfiguredCacheSize {
132-
capacity = configuredCacheSize
128+
// use the origin default cache size, not the one in registry.StorageWithCacher
129+
capacity := &configuredCacheSize
130+
if requestedSize != nil {
131+
capacity = requestedSize
133132
}
134133

135-
if capacity == 0 || !g.cacheEnabled {
134+
if *capacity == 0 || !g.cacheEnabled {
136135
glog.V(5).Infof("using uncached watch storage for %s", resource.String())
137136
return generic.UndecoratedStorage(copier, storageConfig, capacity, objectType, resourcePrefix, keyFunc, newListFn, getAttrsFunc, triggerFn)
138137
}
139138

140-
glog.V(5).Infof("using watch cache storage (capacity=%d) for %s %#v", capacity, resource.String(), storageConfig)
141-
return registry.StorageWithCacher(copier, storageConfig, capacity, objectType, resourcePrefix, keyFunc, newListFn, getAttrsFunc, triggerFn)
139+
glog.V(5).Infof("using watch cache storage (capacity=%v) for %s %#v", *capacity, resource.String(), storageConfig)
140+
return storageWithCacher(copier, storageConfig, capacity, objectType, resourcePrefix, keyFunc, newListFn, getAttrsFunc, triggerFn)
142141
}
143142

144143
resourceOptions := generic.RESTOptions{

test/extended/util/test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -293,10 +293,9 @@ var (
293293
`\[Feature:Downgrade\]`,
294294

295295
// upstream flakes
296-
`should provide basic identity`, // Basic StatefulSet functionality
297-
`Scaling down before scale up is finished should wait until current pod will be running and ready before it will be removed`, // Basic StatefulSet functionality
298-
`validates resource limits of pods that are allowed to run`, // SchedulerPredicates
299-
`should idle the service and DeploymentConfig properly`, // idling with a single service and DeploymentConfig [Conformance]
296+
`should provide basic identity`, // Basic StatefulSet functionality
297+
`validates resource limits of pods that are allowed to run`, // SchedulerPredicates
298+
`should idle the service and DeploymentConfig properly`, // idling with a single service and DeploymentConfig [Conformance]
300299
}
301300
excludedTestsFilter = regexp.MustCompile(strings.Join(excludedTests, `|`))
302301

test/integration/watch_cache_test.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package integration
2+
3+
import (
4+
"fmt"
5+
"strconv"
6+
"strings"
7+
"testing"
8+
9+
kerrors "k8s.io/apimachinery/pkg/api/errors"
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
"k8s.io/apimachinery/pkg/types"
12+
"k8s.io/apimachinery/pkg/watch"
13+
apiserveroptions "k8s.io/apiserver/pkg/server/options"
14+
"k8s.io/apiserver/pkg/storage/storagebackend"
15+
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
16+
17+
configapi "github.com/openshift/origin/pkg/cmd/server/api"
18+
serverkube "github.com/openshift/origin/pkg/cmd/server/kubernetes/master"
19+
testutil "github.com/openshift/origin/test/util"
20+
testserver "github.com/openshift/origin/test/util/server"
21+
)
22+
23+
func testWatchCacheWithConfig(t *testing.T, master *configapi.MasterConfig, expectedCacheSize, counterExampleCacheSize int) {
24+
clusterAdminKubeConfig, err := testserver.StartConfiguredMasterAPI(master)
25+
if err != nil {
26+
t.Fatalf("unexpected error: %v", err)
27+
}
28+
29+
client, err := testutil.GetClusterAdminKubeClient(clusterAdminKubeConfig)
30+
if err != nil {
31+
t.Fatalf("unexpected error: %v", err)
32+
}
33+
34+
// create client with high burst value, otherwise we can only do 5 changes per second
35+
config, err := testutil.GetClusterAdminClientConfig(clusterAdminKubeConfig)
36+
if err != nil {
37+
t.Fatalf("unexpected error: %v", err)
38+
}
39+
config.Burst = expectedCacheSize
40+
patchClient, err := coreclient.NewForConfig(config)
41+
if err != nil {
42+
t.Fatalf("unexpected error: %v", err)
43+
}
44+
45+
// modify labels of default namespace expectedCacheSize + 1 times
46+
defaultNS, err := client.Core().Namespaces().Get("default", metav1.GetOptions{})
47+
if err != nil {
48+
t.Fatalf("unexpected error: %v", err)
49+
}
50+
startVersion, err := strconv.Atoi(defaultNS.ResourceVersion)
51+
if err != nil {
52+
t.Fatalf("unexpected error: %v", err)
53+
}
54+
for i := 0; i < expectedCacheSize+1; i++ {
55+
for r := 0; r < 10; r++ {
56+
defaultNS, err = patchClient.Namespaces().Patch("default", types.StrategicMergePatchType,
57+
[]byte(fmt.Sprintf(`{"metadata":{"labels":{"test":"%d"}}}`, i)))
58+
if err != nil && !kerrors.IsConflict(err) {
59+
t.Fatalf("unexpected patch error: %v", err)
60+
}
61+
if err == nil {
62+
break
63+
}
64+
}
65+
if err != nil {
66+
t.Fatalf("too many retries: %v", err)
67+
}
68+
}
69+
70+
// do a versioned GET because it force the cache to sync
71+
_, err = client.Core().Namespaces().Get("default", metav1.GetOptions{ResourceVersion: defaultNS.ResourceVersion})
72+
if err != nil {
73+
t.Fatalf("unexpected error: %v", err)
74+
}
75+
76+
// try watch with very old resource version, not really expectedCacheSize versions back (there
77+
// might be other namespace changes which push the default namespace versions out of the cache.
78+
// Also note that the resource versions are global in etcd. So other resources will also lead
79+
// to resource version jumps.
80+
lastVersion, err := strconv.Atoi(defaultNS.ResourceVersion)
81+
if err != nil {
82+
t.Fatalf("unexpected error converting the resource version: %v", err)
83+
}
84+
w, err := client.Core().Namespaces().Watch(metav1.ListOptions{ResourceVersion: strconv.Itoa(lastVersion - (expectedCacheSize-counterExampleCacheSize)/2)})
85+
if err != nil {
86+
t.Fatalf("unexpected error: %v", err)
87+
}
88+
defer w.Stop()
89+
ev := <-w.ResultChan()
90+
if ev.Type == watch.Error {
91+
t.Fatalf("unexpected event of error type: %v", ev)
92+
}
93+
94+
// try watch with an version that is too old
95+
w, err = client.Core().Namespaces().Watch(metav1.ListOptions{ResourceVersion: strconv.Itoa(startVersion - 1)})
96+
if err != nil {
97+
t.Fatalf("unexpected error: %v", err)
98+
}
99+
defer w.Stop()
100+
101+
// the first event will be of error type
102+
goneErrMsg := "too old resource version"
103+
ev = <-w.ResultChan()
104+
if ev.Type != watch.Error {
105+
t.Fatalf("expected an %q error as first event, got: %v", goneErrMsg, ev)
106+
}
107+
status, ok := ev.Object.(*metav1.Status)
108+
if !ok {
109+
t.Fatalf("expected a metav1.Status object in first event, got: %v", ev.Object)
110+
}
111+
if !strings.Contains(status.Message, goneErrMsg) {
112+
t.Fatalf("expected an %q error, got: %v", goneErrMsg, err)
113+
}
114+
}
115+
116+
func TestDefaultWatchCacheSize(t *testing.T) {
117+
testutil.RequireEtcd(t)
118+
defer testutil.DumpEtcdOnFailure(t)
119+
120+
master, err := testserver.DefaultMasterOptions()
121+
if err != nil {
122+
t.Fatalf("unexpected error: %v", err)
123+
}
124+
125+
// test that the origin default really applies and that we don't fall back to kube's default
126+
etcdOptions := apiserveroptions.NewEtcdOptions(&storagebackend.Config{})
127+
kubeDefaultCacheSize := etcdOptions.DefaultWatchCacheSize
128+
if kubeDefaultCacheSize != 100 {
129+
t.Fatalf("upstream DefaultWatchCacheSize changed from 100 to %q", kubeDefaultCacheSize)
130+
}
131+
testWatchCacheWithConfig(t, master, serverkube.DefaultWatchCacheSize, kubeDefaultCacheSize)
132+
}
133+
134+
func TestWatchCacheSizeWithFlag(t *testing.T) {
135+
testutil.RequireEtcd(t)
136+
defer testutil.DumpEtcdOnFailure(t)
137+
138+
master, err := testserver.DefaultMasterOptions()
139+
if err != nil {
140+
t.Fatalf("unexpected error: %v", err)
141+
}
142+
if master.KubernetesMasterConfig.APIServerArguments == nil {
143+
master.KubernetesMasterConfig.APIServerArguments = configapi.ExtendedArguments{}
144+
}
145+
master.KubernetesMasterConfig.APIServerArguments["watch-cache-sizes"] = []string{"namespaces#2000"}
146+
147+
testWatchCacheWithConfig(t, master, 2000, serverkube.DefaultWatchCacheSize)
148+
}

vendor/k8s.io/kubernetes/pkg/registry/cachesize/cachesize.go

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresource/storage/storage.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go

Lines changed: 42 additions & 34 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)