
Commit 0d0ef62

Add expectations
Need to go back and look at a simpler pattern - have the index report trigger the reaction?
1 parent 1225e7e commit 0d0ef62

3 files changed: +151, -153 lines


pkg/image/controller/trigger/customtrigger.go

Lines changed: 1 addition & 1 deletion
@@ -67,7 +67,7 @@ type BuildConfigReaction struct {
 	Instantiator BuildConfigInstantiator
 }
 
-func (r *BuildConfigReaction) ImageChanged(obj interface{}, is *imageapi.ImageStream, operations []TriggerOperation) error {
+func (r *BuildConfigReaction) ImageChanged(obj interface{}, operations []TriggerOperation) error {
 	bc := obj.(*buildapi.BuildConfig)
 	var (
 		changed bool
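For orientation, here is a minimal, hypothetical reaction written against the new signature (not part of this commit): ImageChanged, like the ReactionFunc type updated in trigger.go below, no longer receives the *imageapi.ImageStream, so a reaction relies entirely on the TriggerOperation values it is handed. The sketch assumes the package's existing glog import.

// logOnlyReaction is a hypothetical ReactionFunc used only for illustration.
func logOnlyReaction(obj interface{}, operations []TriggerOperation) error {
	for _, op := range operations {
		// Each operation carries the resolved tag data a reaction needs:
		// the image stream tag name, the resolved reference, and the image ID.
		glog.V(4).Infof("%T triggered by %s/%s (tag %q, ref %s, id %s)",
			obj, op.Namespace, op.Name, op.Tag, op.Ref, op.ID)
	}
	return nil
}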
Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
+package trigger
+
+import "sync"
+
+// Expectations track the tags that have been requested for resync on image streams.
+// Using expectations allows fan out of trigger checks to be minimized when only a
+// few tags have changed
+type Expectations interface {
+	// Expect indicates that a change to the image stream is expected. If tags is empty,
+	// a full image stream resync is expected. If tags are specified, a resync may not
+	// be necessary.
+	Expect(imageStreamKey string, tags ...ExpectedTags)
+	// Satisfy returns the list of tags that are expected to change. If no expectations
+	// have been registered, all tags should be checked.
+	Satisfy(imageStreamKey string) []ExpectedTags
+}
+
+type ExpectedTags struct {
+	Tag        string
+	TriggerKey string
+}
+
+func newExpectations() Expectations {
+	return &expectedImageStreams{
+		streams: make(map[string][]ExpectedTags),
+		limit:   10,
+	}
+}
+
+type expectedImageStreams struct {
+	lock    sync.Mutex
+	streams map[string][]ExpectedTags
+	limit   int
+}
+
+func (e *expectedImageStreams) Expect(key string, expected ...ExpectedTags) {
+	e.lock.Lock()
+	defer e.lock.Unlock()
+
+	if len(expected) == 0 {
+		e.streams[key] = nil
+		return
+	}
+	tags, ok := e.streams[key]
+	if !ok {
+		e.streams[key] = expected
+		return
+	}
+	if len(tags) == 0 {
+		return
+	}
+	if (len(tags) + len(expected)) > e.limit {
+		e.streams[key] = nil
+		return
+	}
+	for _, tag := range expected {
+		tags = append(tags, tag)
+	}
+	e.streams[key] = tags
+}
+
+func (e *expectedImageStreams) Satisfy(key string) []ExpectedTags {
+	e.lock.Lock()
+	defer e.lock.Unlock()
+	tags, ok := e.streams[key]
+	if ok {
+		delete(e.streams, key)
+	}
+	return tags
+}
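A brief usage sketch of the Expect/Satisfy contract added above (not part of the commit); the image stream key and trigger key values are invented for illustration.

func exampleExpectations() {
	e := newExpectations()

	// A caller that knows which tags changed can register narrow expectations.
	e.Expect("myns/sample", ExpectedTags{Tag: "latest", TriggerKey: "buildconfigs/myns/app"})

	// Satisfy consumes the entry, so the sync loop rechecks only these tags.
	tags := e.Satisfy("myns/sample") // -> [{latest buildconfigs/myns/app}]
	_ = tags

	// Expect with no tags, or once more than `limit` tags accumulate for a key,
	// collapses the entry to nil, which callers treat as "recheck every tag".
	e.Expect("myns/sample")
	all := e.Satisfy("myns/sample") // -> nil, i.e. a full resync
	_ = all
}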

pkg/image/controller/trigger/trigger.go

Lines changed: 80 additions & 152 deletions
@@ -29,7 +29,7 @@ const (
 	maxRetries = 5
 )
 
-type ReactionFunc func(obj interface{}, is *imageapi.ImageStream, operations []TriggerOperation) error
+type ReactionFunc func(obj interface{}, operations []TriggerOperation) error
 
 type TriggerController struct {
 	client kcoreclient.CoreInterface
@@ -46,8 +46,10 @@ type TriggerController struct {
 	// lister can list/get image streams from the shared informer's store
 	lister ImageStreamLister
 
-	// Deployments that need to be synced
+	// queue is the list of image stream keys that must be synced.
 	queue workqueue.RateLimitingInterface
+	// expectations indicate the set of tags that may be changing.
+	expectations Expectations
 
 	// syncs are the items that must return true before the queue can be processed
 	syncs []cache.InformerSynced
@@ -95,6 +97,7 @@ func NewTriggerController(client kcoreclient.CoreInterface, is ImageStreamInform
 		client:        client,
 		eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "image-trigger-controller"}),
 		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "image-trigger"),
+		expectations:  newExpectations(),
 	}
 
 	c.syncHandler = c.syncImageStream
@@ -146,6 +149,7 @@ func (c *TriggerController) enqueue(is *imageapi.ImageStream) {
 		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", is, err))
 		return
 	}
+	c.expectations.Expect(key)
 	c.queue.Add(key)
 }
 
@@ -214,6 +218,28 @@ type TriggerOperation struct {
 	Trigger *trigger.ObjectFieldTrigger
 }
 
+type operationMap map[string][]TriggerOperation
+
+func applyTriggersToOperations(operations operationMap, entry *TriggerCacheEntry, name, tag, ref, id, namespace string) operationMap {
+	for i, trigger := range entry.Triggers {
+		if trigger.From.Name != name || trigger.From.Kind != "ImageStreamTag" || trigger.From.APIVersion != "" || trigger.Paused {
+			continue
+		}
+		if operations == nil {
+			operations = make(map[string][]TriggerOperation)
+		}
+		operations[entry.Key] = append(operations[entry.Key], TriggerOperation{
+			Name:      name,
+			Tag:       tag,
+			Ref:       ref,
+			ID:        id,
+			Namespace: namespace,
+			Trigger:   &entry.Triggers[i],
+		})
+	}
+	return operations
+}
+
 // syncImageStream will sync the image stream with the given key.
 // This function is not meant to be invoked concurrently with the same key.
 func (c *TriggerController) syncImageStream(key string) error {
@@ -223,19 +249,12 @@ func (c *TriggerController) syncImageStream(key string) error {
 		glog.V(4).Infof("Finished syncing image stream %q (%v)", key, time.Now().Sub(startTime))
 	}()
 
-	namespace, name, err := cache.SplitMetaNamespaceKey(key)
-	if err != nil {
-		return err
-	}
+	tags := c.expectations.Satisfy(key)
 
-	triggered, err := c.triggerCache.ByIndex("images", key)
+	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
 		return err
 	}
-	if len(triggered) == 0 {
-		return nil
-	}
-
 	is, err := c.lister.ImageStreams(namespace).Get(name)
 	if errors.IsNotFound(err) {
 		glog.V(4).Infof("Image stream %v has been deleted", key)
@@ -246,38 +265,61 @@ func (c *TriggerController) syncImageStream(key string) error {
 		return err
 	}
 
-	var operations map[string][]TriggerOperation
-	for tag, v := range is.Status.Tags {
-		if len(v.Items) == 0 {
-			continue
+	// the operations we're expected to perform
+	var operations operationMap
+
+	if len(tags) > 0 {
+		// check only the provided tags against the latest value of the trigger cache
+		for _, expected := range tags {
+			tag := expected.Tag
+			v := is.Status.Tags[tag]
+			if len(v.Items) == 0 {
+				continue
+			}
+
+			// TODO: is this possibly a race?
+			item, ok := c.triggerCache.Get(expected.TriggerKey)
+			if !ok {
+				continue
+			}
+
+			ref, ok := imageapi.ResolveTagReference(is, tag, &v.Items[0])
+			if !ok {
+				continue
+			}
+			name := imageapi.JoinImageStreamTag(name, tag)
+
+			entry := item.(*TriggerCacheEntry)
+			operations = applyTriggersToOperations(operations, entry, name, tag, ref, v.Items[0].Image, namespace)
 		}
-		ref, ok := imageapi.ResolveTagReference(is, tag, &v.Items[0])
-		if !ok {
-			continue
+
+	} else {
+		// check everything against all triggers
+		triggered, err := c.triggerCache.ByIndex("images", key)
+		if err != nil {
+			return err
 		}
-		name := imageapi.JoinImageStreamTag(name, tag)
-		for _, t := range triggered {
-			entry := t.(*TriggerCacheEntry)
-			for i, trigger := range entry.Triggers {
-				if trigger.From.Name != name || trigger.From.Kind != "ImageStreamTag" || trigger.From.APIVersion != "" || trigger.Paused {
-					continue
-				}
-				if operations == nil {
-					operations = make(map[string][]TriggerOperation)
-				}
-				operations[entry.Key] = append(operations[entry.Key], TriggerOperation{
-					Name:      name,
-					Tag:       tag,
-					Ref:       ref,
-					ID:        v.Items[0].Image,
-					Namespace: namespace,
-					Trigger:   &entry.Triggers[i],
-				})
+		if len(triggered) == 0 {
+			return nil
+		}
+
+		for tag, v := range is.Status.Tags {
+			if len(v.Items) == 0 {
+				continue
+			}
+			ref, ok := imageapi.ResolveTagReference(is, tag, &v.Items[0])
+			if !ok {
+				continue
+			}
+			name := imageapi.JoinImageStreamTag(name, tag)
+			for _, t := range triggered {
+				entry := t.(*TriggerCacheEntry)
+				operations = applyTriggersToOperations(operations, entry, name, tag, ref, v.Items[0].Image, namespace)
 			}
 		}
 	}
 
-	// TODO: separate worker pool
+	// TODO: separate worker pool?
 	var failures bool
 	for key, ops := range operations {
 		parts := strings.SplitN(key, "/", 2)
@@ -292,7 +334,7 @@ func (c *TriggerController) syncImageStream(key string) error {
 			// TODO: record expectation?
 			continue
 		}
-		if err := source.ReactionFn(obj, is, ops); err != nil {
+		if err := source.ReactionFn(obj, ops); err != nil {
 			// TODO: enqueue again
 			utilruntime.HandleError(fmt.Errorf("unable to react to new image for %s %s from store: %v", parts[0], parts[1], err))
 			failures = true
@@ -305,118 +347,4 @@ func (c *TriggerController) syncImageStream(key string) error {
 	}
 
 	return nil
-
-	// // Deep-copy otherwise we are mutating our cache.
-	// // TODO: Deep-copy only when needed.
-	// d, err := util.DeploymentDeepCopy(deployment)
-	// if err != nil {
-	// return err
-	// }
-
-	// everything := metav1.LabelSelector{}
-	// if reflect.DeepEqual(d.Spec.Selector, &everything) {
-	// c.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
-	// if d.Status.ObservedGeneration < d.Generation {
-	// d.Status.ObservedGeneration = d.Generation
-	// c.client.Extensions().Deployments(d.Namespace).UpdateStatus(d)
-	// }
-	// return nil
-	// }
-
-	// deployments, err := c.dLister.Deployments(d.Namespace).List(labels.Everything())
-	// if err != nil {
-	// return fmt.Errorf("error listing deployments in namespace %s: %v", d.Namespace, err)
-	// }
-
-	// // Handle overlapping deployments by deterministically avoid syncing deployments that fight over ReplicaSets.
-	// overlaps, err := c.handleOverlap(d, deployments)
-	// if err != nil {
-	// if overlaps {
-	// // Emit an event and return a nil error for overlapping deployments so we won't resync them again.
-	// c.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectorOverlap", err.Error())
-	// return nil
-	// }
-	// // For any other failure, we should retry the deployment.
-	// return err
-	// }
-
-	// if d.DeletionTimestamp != nil {
-	// return c.syncStatusOnly(d)
-	// }
-
-	// // Why run the cleanup policy only when there is no rollback request?
-	// // The thing with the cleanup policy currently is that it is far from smart because it takes into account
-	// // the latest replica sets while it should instead retain the latest *working* replica sets. This means that
-	// // you can have a cleanup policy of 1 but your last known working replica set may be 2 or 3 versions back
-	// // in the history.
-	// // Eventually we will want to find a way to recognize replica sets that have worked at some point in time
-	// // (and chances are higher that they will work again as opposed to others that didn't) for candidates to
-	// // automatically roll back to (#23211) and the cleanup policy should help.
-	// if d.Spec.RollbackTo == nil {
-	// _, oldRSs, err := c.getAllReplicaSetsAndSyncRevision(d, false)
-	// if err != nil {
-	// return err
-	// }
-	// // So far the cleanup policy was executed once a deployment was paused, scaled up/down, or it
-	// // succesfully completed deploying a replica set. Decouple it from the strategies and have it
-	// // run almost unconditionally - cleanupDeployment is safe by default.
-	// c.cleanupDeployment(oldRSs, d)
-	// }
-
-	// err = c.claimReplicaSets(d)
-	// if err != nil {
-	// return err
-	// }
-
-	// // Update deployment conditions with an Unknown condition when pausing/resuming
-	// // a deployment. In this way, we can be sure that we won't timeout when a user
-	// // resumes a Deployment with a set progressDeadlineSeconds.
-	// if err = c.checkPausedConditions(d); err != nil {
-	// return err
-	// }
-
-	// _, err = c.hasFailed(d)
-	// if err != nil {
-	// return err
-	// }
-	// // TODO: Automatically rollback here if we failed above. Locate the last complete
-	// // revision and populate the rollback spec with it.
-	// // See https://github.com/kubernetes/kubernetes/issues/23211.
-
-	// if d.Spec.Paused {
-	// return c.sync(d)
-	// }
-
-	// if d.Spec.RollbackTo != nil {
-	// revision := d.Spec.RollbackTo.Revision
-	// if d, err = c.rollback(d, &revision); err != nil {
-	// return err
-	// }
-	// }
-
-	// scalingEvent, err := c.isScalingEvent(d)
-	// if err != nil {
-	// return err
-	// }
-	// if scalingEvent {
-	// return c.sync(d)
-	// }
-
-	// switch d.Spec.Strategy.Type {
-	// case extensions.RecreateDeploymentStrategyType:
-	// return c.rolloutRecreate(d)
-	// case extensions.RollingUpdateDeploymentStrategyType:
-	// return c.rolloutRolling(d)
-	// }
-	// return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
 }
-
-// func startReplicationController(ctx ControllerContext) (bool, error) {
-// go replicationcontroller.NewReplicationManager(
-// ctx.InformerFactory.Core().V1().Pods(),
-// ctx.InformerFactory.Core().V1().ReplicationControllers(),
-// ctx.ClientBuilder.ClientOrDie("replication-controller"),
-// replicationcontroller.BurstReplicas,
-// ).Run(int(ctx.Options.ConcurrentRCSyncs), ctx.Stop)
-// return true, nil
-// }
