Skip to content

Commit 4668a91

Browse files
Use the route informer to detect conflicts
Because the initial sync of the router may take a long time, and we need to detect conflicts caused early in that sync, move contention detection into its own goroutine. That goroutine is fed by the route informer and looks for repeated changes to a given route's status. The status plugin resets the state to candidate when it makes a write.
1 parent a4e322b commit 4668a91

File tree

6 files changed

+415
-261
lines changed

6 files changed

+415
-261
lines changed

pkg/cmd/infra/router/f5.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,10 +239,11 @@ func (o *F5RouterOptions) Run() error {
239239
if o.UpdateStatus {
240240
lease := writerlease.New(time.Minute, 3*time.Second)
241241
go lease.Run(wait.NeverStop)
242-
tracker := controller.NewSimpleContentionTracker(o.ResyncInterval / 10)
242+
informer := factory.CreateRoutesSharedInformer()
243+
tracker := controller.NewSimpleContentionTracker(informer, o.RouterName, o.ResyncInterval/10)
243244
tracker.SetConflictMessage(fmt.Sprintf("The router detected another process is writing conflicting updates to route status with name %q. Please ensure that the configuration of all routers is consistent. Route status will not be updated as long as conflicts are detected.", o.RouterName))
244245
go tracker.Run(wait.NeverStop)
245-
routeLister := routelisters.NewRouteLister(factory.CreateRoutesSharedInformer().GetIndexer())
246+
routeLister := routelisters.NewRouteLister(informer.GetIndexer())
246247
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), routeLister, o.RouterName, o.RouterCanonicalHostname, lease, tracker)
247248
recorder = status
248249
plugin = status

pkg/cmd/infra/router/template.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -420,10 +420,11 @@ func (o *TemplateRouterOptions) Run() error {
420420
if o.UpdateStatus {
421421
lease := writerlease.New(time.Minute, 3*time.Second)
422422
go lease.Run(wait.NeverStop)
423-
tracker := controller.NewSimpleContentionTracker(o.ResyncInterval / 10)
423+
informer := factory.CreateRoutesSharedInformer()
424+
tracker := controller.NewSimpleContentionTracker(informer, o.RouterName, o.ResyncInterval/10)
424425
tracker.SetConflictMessage(fmt.Sprintf("The router detected another process is writing conflicting updates to route status with name %q. Please ensure that the configuration of all routers is consistent. Route status will not be updated as long as conflicts are detected.", o.RouterName))
425426
go tracker.Run(wait.NeverStop)
426-
routeLister := routelisters.NewRouteLister(factory.CreateRoutesSharedInformer().GetIndexer())
427+
routeLister := routelisters.NewRouteLister(informer.GetIndexer())
427428
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), routeLister, o.RouterName, o.RouterCanonicalHostname, lease, tracker)
428429
recorder = status
429430
plugin = status

pkg/router/controller/contention.go

Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
package controller
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
"github.com/golang/glog"
8+
9+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
"k8s.io/client-go/tools/cache"
11+
12+
routeapi "github.com/openshift/origin/pkg/route/apis/route"
13+
)
14+
15+
// ContentionTracker records modifications to a particular entry to prevent endless
// loops when multiple routers are configured with conflicting info. A given router
// process tracks whether the ingress status is changed from a correct value to any
// other value (by invoking IsChangeContended when the state has diverged).
type ContentionTracker interface {
	// IsChangeContended should be invoked when the state of the object in storage differs
	// from the desired state. It will return true if the provided id was recently
	// reset from the correct state to an incorrect state. The current ingress is the
	// expected state of the object at this time and may be used by the tracker to
	// determine if the most recent update was a contention. This method does not
	// update the state of the tracker.
	IsChangeContended(id string, now time.Time, current *routeapi.RouteIngress) bool
	// Clear informs the tracker that the provided ingress state was confirmed to
	// match the current state of this process. If a subsequent call to IsChangeContended
	// is made within the expiration window, the object will be considered as contended.
	//
	// NOTE(review): SimpleContentionTracker.Clear resets the entry to the candidate
	// state, so that implementation only reports contention again after a further
	// change has been observed - confirm the wording above still matches the intent.
	Clear(id string, current *routeapi.RouteIngress)
}
32+
33+
// elementState is the contention state of a tracked identifier.
type elementState int

const (
	// stateCandidate marks an entry whose ingress has been recorded (first seen by
	// Changed, or reset by Clear) but which is not currently considered contended.
	stateCandidate elementState = iota
	// stateContended marks an entry whose ingress was seen to change again after it
	// had been recorded as a candidate.
	stateContended
)
39+
40+
// trackerElement is the per-identifier record kept by SimpleContentionTracker.
type trackerElement struct {
	// at is the time the entry was last recorded by Changed; flush and
	// IsChangeContended compare it against the expiration window.
	at time.Time
	// state is the current contention state of the entry.
	state elementState
	// last is the ingress most recently recorded for the entry by Changed or Clear.
	last *routeapi.RouteIngress
}
45+
46+
// SimpleContentionTracker tracks whether a given identifier is changed from a correct
// state (set by Clear) to an incorrect state (detected by Changed and reported by
// IsChangeContended).
type SimpleContentionTracker struct {
	// informer is the source of route update events; Run registers an update handler on it.
	informer cache.SharedInformer
	// routerName selects which ingress entry of a route is watched for changes.
	routerName string

	// expires is the window after which a tracked entry is treated as expired.
	expires time.Duration
	// maxContentions is the number of contentions that may be detected before the entire
	// tracker reports every change as contended (and stops recording new changes) until
	// the count is reset by the next flush.
	maxContentions int
	// message, when non-empty, is logged as a warning by flush if contentions were recorded.
	message string

	// lock guards message, contentions, and ids.
	lock sync.Mutex
	// contentions is the running count of detected contentions; flush resets it to the
	// number of entries that are still contended.
	contentions int
	// ids maps an object identifier to its tracked state.
	ids map[string]trackerElement
}
61+
62+
// NewSimpleContentionTracker creates a ContentionTracker that will prevent writing
63+
// to the same route more often than once per interval. A background process will
64+
// periodically flush old entries (at twice interval) in order to prevent the list
65+
// growing unbounded if routes are created and deleted frequently. The informer
66+
// detects changes to ingress records for routerName and will advance the tracker
67+
// state from candidate to contended if the host, wildcardPolicy, or canonical host
68+
// name fields are repeatedly updated.
69+
func NewSimpleContentionTracker(informer cache.SharedInformer, routerName string, interval time.Duration) *SimpleContentionTracker {
70+
return &SimpleContentionTracker{
71+
informer: informer,
72+
routerName: routerName,
73+
expires: interval,
74+
maxContentions: 5,
75+
76+
ids: make(map[string]trackerElement),
77+
}
78+
}
79+
80+
// SetConflictMessage will print message whenever contention with another writer
81+
// is detected.
82+
func (t *SimpleContentionTracker) SetConflictMessage(message string) {
83+
t.lock.Lock()
84+
defer t.lock.Unlock()
85+
t.message = message
86+
}
87+
88+
// Run starts the background cleanup process for expired items.
89+
func (t *SimpleContentionTracker) Run(stopCh <-chan struct{}) {
90+
// Watch the informer for changes to the route ingress we care about (identified
91+
// by router name) and if we see it change remember it. This loop can process
92+
// changes to routes faster than the router, which means it has more up-to-date
93+
// contention info and can detect contention while the main controller is still
94+
// syncing.
95+
t.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
96+
UpdateFunc: func(oldObj, obj interface{}) {
97+
oldRoute, ok := oldObj.(*routeapi.Route)
98+
if !ok {
99+
return
100+
}
101+
route, ok := obj.(*routeapi.Route)
102+
if !ok {
103+
return
104+
}
105+
if ingress := ingressChanged(oldRoute, route, t.routerName); ingress != nil {
106+
t.Changed(string(route.UID), ingress)
107+
}
108+
},
109+
})
110+
111+
// periodically clean up expired changes
112+
ticker := time.NewTicker(t.expires * 2)
113+
defer ticker.Stop()
114+
for {
115+
select {
116+
case <-stopCh:
117+
return
118+
case <-ticker.C:
119+
t.flush()
120+
}
121+
}
122+
}
123+
124+
func (t *SimpleContentionTracker) flush() {
125+
t.lock.Lock()
126+
defer t.lock.Unlock()
127+
128+
// reset conflicts every expiration interval, but remove tracking info less often
129+
contentionExpiration := nowFn().Add(-t.expires)
130+
trackerExpiration := contentionExpiration.Add(-2 * t.expires)
131+
132+
removed := 0
133+
contentions := 0
134+
for id, last := range t.ids {
135+
switch last.state {
136+
case stateContended:
137+
if last.at.Before(contentionExpiration) {
138+
delete(t.ids, id)
139+
removed++
140+
continue
141+
}
142+
contentions++
143+
default:
144+
if last.at.Before(trackerExpiration) {
145+
delete(t.ids, id)
146+
removed++
147+
continue
148+
}
149+
}
150+
}
151+
if t.contentions > 0 && len(t.message) > 0 {
152+
glog.Warning(t.message)
153+
}
154+
glog.V(5).Infof("Flushed contention tracker (%s): %d out of %d removed, %d total contentions", t.expires*2, removed, removed+len(t.ids), t.contentions)
155+
t.contentions = contentions
156+
}
157+
158+
// Changed records that a change to an ingress value was detected. This is called from
159+
// a separate goroutine and may have seen newer events than the current route controller
160+
// plugins, so we don't do direct time comparisons. Instead we count edge transitions on
161+
// a given id.
162+
func (t *SimpleContentionTracker) Changed(id string, current *routeapi.RouteIngress) {
163+
t.lock.Lock()
164+
defer t.lock.Unlock()
165+
166+
// we have detected a sufficient number of conflicts to skip all updates for this interval
167+
if t.contentions > t.maxContentions {
168+
glog.V(4).Infof("Reached max contentions, stop tracking changes")
169+
return
170+
}
171+
172+
// if we have never recorded this object
173+
last, ok := t.ids[id]
174+
if !ok {
175+
t.ids[id] = trackerElement{
176+
at: nowFn().Time,
177+
state: stateCandidate,
178+
last: current,
179+
}
180+
glog.V(4).Infof("Object %s is a candidate for contention", id)
181+
return
182+
}
183+
184+
// the previous state matches the current state, nothing to do
185+
if ingressEqual(last.last, current) {
186+
glog.V(4).Infof("Object %s is unchanged", id)
187+
return
188+
}
189+
190+
if last.state == stateContended {
191+
t.contentions++
192+
glog.V(4).Infof("Object %s is contended and has been modified by another writer", id)
193+
return
194+
}
195+
196+
// if it appears that the state is being changed by another party, mark it as contended
197+
if last.state == stateCandidate {
198+
t.ids[id] = trackerElement{
199+
at: nowFn().Time,
200+
state: stateContended,
201+
last: current,
202+
}
203+
t.contentions++
204+
glog.V(4).Infof("Object %s has been modified by another writer", id)
205+
return
206+
}
207+
}
208+
209+
func (t *SimpleContentionTracker) IsChangeContended(id string, now time.Time, current *routeapi.RouteIngress) bool {
210+
t.lock.Lock()
211+
defer t.lock.Unlock()
212+
213+
// we have detected a sufficient number of conflicts to skip all updates for this interval
214+
if t.contentions > t.maxContentions {
215+
glog.V(4).Infof("Reached max contentions, rejecting all update attempts until the next interval")
216+
return true
217+
}
218+
219+
// if we have expired or never recorded this object
220+
last, ok := t.ids[id]
221+
if !ok || last.at.Add(t.expires).Before(now) {
222+
return false
223+
}
224+
225+
// if the object is contended, exit early
226+
if last.state == stateContended {
227+
glog.V(4).Infof("Object %s is being contended by another writer", id)
228+
return true
229+
}
230+
231+
return false
232+
}
233+
234+
func (t *SimpleContentionTracker) Clear(id string, current *routeapi.RouteIngress) {
235+
t.lock.Lock()
236+
defer t.lock.Unlock()
237+
238+
last, ok := t.ids[id]
239+
if !ok {
240+
return
241+
}
242+
last.last = current
243+
last.state = stateCandidate
244+
t.ids[id] = last
245+
}
246+
247+
func ingressEqual(a, b *routeapi.RouteIngress) bool {
248+
return a.Host == b.Host && a.RouterCanonicalHostname == b.RouterCanonicalHostname && a.WildcardPolicy == b.WildcardPolicy && a.RouterName == b.RouterName
249+
}
250+
251+
func ingressConditionTouched(ingress *routeapi.RouteIngress) *metav1.Time {
252+
var lastTouch *metav1.Time
253+
for _, condition := range ingress.Conditions {
254+
if t := condition.LastTransitionTime; t != nil {
255+
switch {
256+
case lastTouch == nil, t.After(lastTouch.Time):
257+
lastTouch = t
258+
}
259+
}
260+
}
261+
return lastTouch
262+
}
263+
264+
func ingressChanged(oldRoute, route *routeapi.Route, routerName string) *routeapi.RouteIngress {
265+
var ingress *routeapi.RouteIngress
266+
for i := range route.Status.Ingress {
267+
if route.Status.Ingress[i].RouterName == routerName {
268+
ingress = &route.Status.Ingress[i]
269+
for _, old := range oldRoute.Status.Ingress {
270+
if old.RouterName == routerName {
271+
if !ingressEqual(ingress, &old) {
272+
return ingress
273+
}
274+
return nil
275+
}
276+
}
277+
return nil
278+
}
279+
}
280+
return nil
281+
}

0 commit comments

Comments
 (0)