Skip to content

Commit 5398c1a

Browse files
committed
deploy: tweak enqueueing in the trigger controller
1 parent 0787d9f commit 5398c1a

File tree

4 files changed

+88
-22
lines changed

4 files changed

+88
-22
lines changed

pkg/cmd/server/origin/run_components.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,10 +354,11 @@ func (c *MasterConfig) RunDeploymentConfigController() {
354354
// RunDeploymentTriggerController starts the deployment trigger controller process.
355355
func (c *MasterConfig) RunDeploymentTriggerController() {
356356
dcInfomer := c.Informers.DeploymentConfigs().Informer()
357+
rcInformer := c.Informers.ReplicationControllers().Informer()
357358
streamInformer := c.Informers.ImageStreams().Informer()
358359
osclient := c.DeploymentTriggerControllerClient()
359360

360-
controller := triggercontroller.NewDeploymentTriggerController(dcInfomer, streamInformer, osclient, c.ExternalVersionCodec)
361+
controller := triggercontroller.NewDeploymentTriggerController(dcInfomer, rcInformer, streamInformer, osclient, c.ExternalVersionCodec)
361362
go controller.Run(5, utilwait.NeverStop)
362363
}
363364

pkg/deploy/controller/generictrigger/controller.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package generictrigger
22

33
import (
4+
"k8s.io/kubernetes/pkg/client/cache"
45
"k8s.io/kubernetes/pkg/runtime"
56
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
67
"k8s.io/kubernetes/pkg/util/workqueue"
@@ -20,10 +21,14 @@ type DeploymentTriggerController struct {
2021
// queue contains deployment configs that need to be synced.
2122
queue workqueue.RateLimitingInterface
2223

23-
// dcStore provides a local cache for deployment configs.
24-
dcStore oscache.StoreToDeploymentConfigLister
25-
// dcStoreSynced makes sure the dc store is synced before reconcling any deployment config.
26-
dcStoreSynced func() bool
24+
// dcLister provides a local cache for deployment configs.
25+
dcLister oscache.StoreToDeploymentConfigLister
26+
// dcListerSynced makes sure the dc store is synced before reconcling any deployment config.
27+
dcListerSynced func() bool
28+
// rcLister provides a local cache for replication controllers.
29+
rcLister cache.StoreToReplicationControllerLister
30+
// rcListerSynced makes sure the dc store is synced before reconcling any replication controller.
31+
rcListerSynced func() bool
2732

2833
// codec is used for decoding a config out of a deployment.
2934
codec runtime.Codec

pkg/deploy/controller/generictrigger/controller_test.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,19 @@ var (
3434
2*time.Minute,
3535
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
3636
)
37+
rcInformer = framework.NewSharedIndexInformer(
38+
&cache.ListWatch{
39+
ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
40+
return (&ktestclient.Fake{}).ReplicationControllers(kapi.NamespaceAll).List(options)
41+
},
42+
WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
43+
return (&ktestclient.Fake{}).ReplicationControllers(kapi.NamespaceAll).Watch(options)
44+
},
45+
},
46+
&kapi.ReplicationController{},
47+
2*time.Minute,
48+
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
49+
)
3750
streamInformer = framework.NewSharedIndexInformer(
3851
&cache.ListWatch{
3952
ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
@@ -54,7 +67,7 @@ var (
5467
func TestHandle_noTriggers(t *testing.T) {
5568
fake := &testclient.Fake{}
5669

57-
controller := NewDeploymentTriggerController(dcInformer, streamInformer, fake, codec)
70+
controller := NewDeploymentTriggerController(dcInformer, rcInformer, streamInformer, fake, codec)
5871

5972
config := testapi.OkDeploymentConfig(1)
6073
config.Namespace = kapi.NamespaceDefault
@@ -71,7 +84,7 @@ func TestHandle_noTriggers(t *testing.T) {
7184
func TestHandle_pausedConfig(t *testing.T) {
7285
fake := &testclient.Fake{}
7386

74-
controller := NewDeploymentTriggerController(dcInformer, streamInformer, fake, codec)
87+
controller := NewDeploymentTriggerController(dcInformer, rcInformer, streamInformer, fake, codec)
7588

7689
config := testapi.OkDeploymentConfig(1)
7790
config.Namespace = kapi.NamespaceDefault
@@ -95,7 +108,7 @@ func TestHandle_configChangeTrigger(t *testing.T) {
95108
return true, nil, nil
96109
})
97110

98-
controller := NewDeploymentTriggerController(dcInformer, streamInformer, fake, codec)
111+
controller := NewDeploymentTriggerController(dcInformer, rcInformer, streamInformer, fake, codec)
99112

100113
config := testapi.OkDeploymentConfig(0)
101114
config.Namespace = kapi.NamespaceDefault
@@ -119,7 +132,7 @@ func TestHandle_imageChangeTrigger(t *testing.T) {
119132
return true, nil, nil
120133
})
121134

122-
controller := NewDeploymentTriggerController(dcInformer, streamInformer, fake, codec)
135+
controller := NewDeploymentTriggerController(dcInformer, rcInformer, streamInformer, fake, codec)
123136

124137
config := testapi.OkDeploymentConfig(0)
125138
config.Namespace = kapi.NamespaceDefault

pkg/deploy/controller/generictrigger/factory.go

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package generictrigger
22

33
import (
4+
"reflect"
45
"time"
56

67
"github.com/golang/glog"
@@ -14,6 +15,7 @@ import (
1415

1516
osclient "github.com/openshift/origin/pkg/client"
1617
deployapi "github.com/openshift/origin/pkg/deploy/api"
18+
deployutil "github.com/openshift/origin/pkg/deploy/util"
1719
imageapi "github.com/openshift/origin/pkg/image/api"
1820
)
1921

@@ -28,7 +30,7 @@ const (
2830
)
2931

3032
// NewDeploymentTriggerController returns a new DeploymentTriggerController.
31-
func NewDeploymentTriggerController(dcInformer, streamInformer framework.SharedIndexInformer, oc osclient.Interface, codec runtime.Codec) *DeploymentTriggerController {
33+
func NewDeploymentTriggerController(dcInformer, rcInformer, streamInformer framework.SharedIndexInformer, oc osclient.Interface, codec runtime.Codec) *DeploymentTriggerController {
3234
c := &DeploymentTriggerController{
3335
dn: oc,
3436

@@ -37,18 +39,19 @@ func NewDeploymentTriggerController(dcInformer, streamInformer framework.SharedI
3739
codec: codec,
3840
}
3941

40-
c.dcStore.Indexer = dcInformer.GetIndexer()
4142
dcInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
4243
AddFunc: c.addDeploymentConfig,
4344
UpdateFunc: c.updateDeploymentConfig,
4445
})
45-
c.dcStoreSynced = dcInformer.HasSynced
46-
4746
streamInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
4847
AddFunc: c.addImageStream,
4948
UpdateFunc: c.updateImageStream,
5049
})
5150

51+
c.dcLister.Indexer = dcInformer.GetIndexer()
52+
c.rcLister.Indexer = rcInformer.GetIndexer()
53+
c.dcListerSynced = dcInformer.HasSynced
54+
c.rcListerSynced = rcInformer.HasSynced
5255
return c
5356
}
5457

@@ -76,8 +79,8 @@ func (c *DeploymentTriggerController) Run(workers int, stopCh <-chan struct{}) {
7679
func (c *DeploymentTriggerController) waitForSyncedStore(ready chan<- struct{}, stopCh <-chan struct{}) {
7780
defer utilruntime.HandleCrash()
7881

79-
for !c.dcStoreSynced() {
80-
glog.V(4).Infof("Waiting for the deployment config cache to sync before starting the trigger controller workers")
82+
for !c.dcListerSynced() || !c.rcListerSynced() {
83+
glog.V(4).Infof("Waiting for the dc and rc caches to sync before starting the trigger controller workers")
8184
select {
8285
case <-time.After(storeSyncedPollPeriod):
8386
case <-stopCh:
@@ -89,24 +92,64 @@ func (c *DeploymentTriggerController) waitForSyncedStore(ready chan<- struct{},
8992

9093
func (c *DeploymentTriggerController) addDeploymentConfig(obj interface{}) {
9194
dc := obj.(*deployapi.DeploymentConfig)
95+
96+
// No need to enqueue deployment configs that have no triggers or are paused.
9297
if len(dc.Spec.Triggers) == 0 || dc.Spec.Paused {
9398
return
9499
}
100+
// We don't want to compete with the main deployment config controller. Let's process this
101+
// config once it's synced.
102+
if !deployutil.HasSynced(dc, dc.Generation) {
103+
return
104+
}
105+
95106
c.enqueueDeploymentConfig(dc)
96107
}
97108

98109
func (c *DeploymentTriggerController) updateDeploymentConfig(old, cur interface{}) {
99-
// If there is no generation update for this deployment config,
100-
// we have a good indication to not enqueue it.
101110
newDc := cur.(*deployapi.DeploymentConfig)
102111
oldDc := old.(*deployapi.DeploymentConfig)
103-
if newDc.Generation == oldDc.Generation {
112+
113+
// A periodic relist will send update events for all known deployment configs.
114+
if newDc.ResourceVersion == oldDc.ResourceVersion {
104115
return
105116
}
106-
117+
// No need to enqueue deployment configs that have no triggers or are paused.
107118
if len(newDc.Spec.Triggers) == 0 || newDc.Spec.Paused {
108119
return
109120
}
121+
// We don't want to compete with the main deployment config controller. Let's process this
122+
// config once it's synced. Note that this does not eliminate conflicts between the two
123+
// controllers because the main controller is constantly updating deployment configs as
124+
// owning replication controllers and pods are updated.
125+
if !deployutil.HasSynced(newDc, newDc.Generation) {
126+
return
127+
}
128+
// Enqueue the deployment config if it hasn't been deployed yet.
129+
if newDc.Status.LatestVersion == 0 {
130+
c.enqueueDeploymentConfig(newDc)
131+
return
132+
}
133+
// Compare deployment config templates before enqueueing. This reduces the amount of times
134+
// we will try to instantiate a deployment config at the expense of duplicating some of the
135+
// work that the instantiate endpoint is already doing but I think this is fine.
136+
shouldInstantiate := true
137+
latestRc, err := c.rcLister.ReplicationControllers(newDc.Namespace).Get(deployutil.LatestDeploymentNameForConfig(newDc))
138+
if err != nil {
139+
// If we get an error here it may be due to the rc cache lagging behind. In such a case
140+
// just defer to the api server (instantiate REST) where we will retry this.
141+
glog.V(2).Infof("Cannot get latest rc for dc %s:%d (%v) - will defer to instantiate", deployutil.LabelForDeploymentConfig(newDc), newDc.Status.LatestVersion, err)
142+
} else {
143+
initial, err := deployutil.DecodeDeploymentConfig(latestRc, c.codec)
144+
if err != nil {
145+
glog.V(2).Infof("Cannot decode dc from replication controller %s: %v", deployutil.LabelForDeployment(latestRc), err)
146+
return
147+
}
148+
shouldInstantiate = !reflect.DeepEqual(newDc.Spec.Template, initial.Spec.Template)
149+
}
150+
if !shouldInstantiate {
151+
return
152+
}
110153

111154
c.enqueueDeploymentConfig(newDc)
112155
}
@@ -115,10 +158,12 @@ func (c *DeploymentTriggerController) updateDeploymentConfig(old, cur interface{
115158
func (c *DeploymentTriggerController) addImageStream(obj interface{}) {
116159
stream := obj.(*imageapi.ImageStream)
117160
glog.V(4).Infof("Image stream %q added.", stream.Name)
118-
dcList, err := c.dcStore.GetConfigsForImageStream(stream)
161+
dcList, err := c.dcLister.GetConfigsForImageStream(stream)
119162
if err != nil {
120163
return
121164
}
165+
// TODO: We could check image stream tags here and enqueue only deployment configs
166+
// with stale lastTriggeredImages.
122167
for _, dc := range dcList {
123168
c.enqueueDeploymentConfig(dc)
124169
}
@@ -134,10 +179,12 @@ func (c *DeploymentTriggerController) updateImageStream(old, cur interface{}) {
134179
}
135180

136181
glog.V(4).Infof("Image stream %q updated.", newStream.Name)
137-
dcList, err := c.dcStore.GetConfigsForImageStream(newStream)
182+
dcList, err := c.dcLister.GetConfigsForImageStream(newStream)
138183
if err != nil {
139184
return
140185
}
186+
// TODO: We could check image stream tags here and enqueue only deployment configs
187+
// with stale lastTriggeredImages.
141188
for _, dc := range dcList {
142189
c.enqueueDeploymentConfig(dc)
143190
}
@@ -183,7 +230,7 @@ func (c *DeploymentTriggerController) work() bool {
183230
}
184231

185232
func (c *DeploymentTriggerController) getByKey(key string) (*deployapi.DeploymentConfig, error) {
186-
obj, exists, err := c.dcStore.Indexer.GetByKey(key)
233+
obj, exists, err := c.dcLister.Indexer.GetByKey(key)
187234
if err != nil {
188235
glog.Infof("Unable to retrieve deployment config %q from store: %v", key, err)
189236
c.queue.Add(key)

0 commit comments

Comments
 (0)