Skip to content

Commit 1360dd3

Browse files
Prevent some status updates from being delayed until the next sync
The writerlease is a work queue, but we were exiting immediately on conflicts. This is not our normal pattern, which is to build a work queue and then resync from the latest cache state. Change how status.go queues up work so that we perform our retry inside the lease function. This should ensure that the correct output is eventually written.
1 parent a6f0394 commit 1360dd3

File tree

8 files changed

+270
-164
lines changed

8 files changed

+270
-164
lines changed

pkg/cmd/infra/router/f5.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
projectinternalclientset "github.com/openshift/origin/pkg/project/generated/internalclientset"
2121
routeapi "github.com/openshift/origin/pkg/route/apis/route"
2222
routeinternalclientset "github.com/openshift/origin/pkg/route/generated/internalclientset"
23+
routelisters "github.com/openshift/origin/pkg/route/generated/listers/route/internalversion"
2324
"github.com/openshift/origin/pkg/router"
2425
"github.com/openshift/origin/pkg/router/controller"
2526
f5plugin "github.com/openshift/origin/pkg/router/f5"
@@ -231,6 +232,8 @@ func (o *F5RouterOptions) Run() error {
231232
return err
232233
}
233234

235+
factory := o.RouterSelection.NewFactory(routeclient, projectclient.Project().Projects(), kc)
236+
234237
var plugin router.Plugin = f5Plugin
235238
var recorder controller.RejectionRecorder = controller.LogRejections
236239
if o.UpdateStatus {
@@ -239,7 +242,8 @@ func (o *F5RouterOptions) Run() error {
239242
tracker := controller.NewSimpleContentionTracker(o.ResyncInterval / 10)
240243
tracker.SetConflictMessage(fmt.Sprintf("The router detected another process is writing conflicting updates to route status with name %q. Please ensure that the configuration of all routers is consistent. Route status will not be updated as long as conflicts are detected.", o.RouterName))
241244
go tracker.Run(wait.NeverStop)
242-
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), o.RouterName, o.RouterCanonicalHostname, lease, tracker)
245+
routeLister := routelisters.NewRouteLister(factory.CreateRoutesSharedInformer().GetIndexer())
246+
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), routeLister, o.RouterName, o.RouterCanonicalHostname, lease, tracker)
243247
recorder = status
244248
plugin = status
245249
}
@@ -249,7 +253,6 @@ func (o *F5RouterOptions) Run() error {
249253
plugin = controller.NewUniqueHost(plugin, o.RouteSelectionFunc(), o.RouterSelection.DisableNamespaceOwnershipCheck, recorder)
250254
plugin = controller.NewHostAdmitter(plugin, o.F5RouteAdmitterFunc(), o.AllowWildcardRoutes, o.RouterSelection.DisableNamespaceOwnershipCheck, recorder)
251255

252-
factory := o.RouterSelection.NewFactory(routeclient, projectclient.Project().Projects(), kc)
253256
watchNodes := (len(o.InternalAddress) != 0 && len(o.VxlanGateway) != 0)
254257
controller := factory.Create(plugin, watchNodes)
255258
controller.Run()

pkg/cmd/infra/router/template.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
cmdversion "github.com/openshift/origin/pkg/cmd/version"
3434
projectinternalclientset "github.com/openshift/origin/pkg/project/generated/internalclientset"
3535
routeinternalclientset "github.com/openshift/origin/pkg/route/generated/internalclientset"
36+
routelisters "github.com/openshift/origin/pkg/route/generated/listers/route/internalversion"
3637
"github.com/openshift/origin/pkg/router"
3738
"github.com/openshift/origin/pkg/router/controller"
3839
"github.com/openshift/origin/pkg/router/metrics"
@@ -412,6 +413,8 @@ func (o *TemplateRouterOptions) Run() error {
412413
return err
413414
}
414415

416+
factory := o.RouterSelection.NewFactory(routeclient, projectclient.Project().Projects(), kc)
417+
415418
var plugin router.Plugin = templatePlugin
416419
var recorder controller.RejectionRecorder = controller.LogRejections
417420
if o.UpdateStatus {
@@ -420,7 +423,8 @@ func (o *TemplateRouterOptions) Run() error {
420423
tracker := controller.NewSimpleContentionTracker(o.ResyncInterval / 10)
421424
tracker.SetConflictMessage(fmt.Sprintf("The router detected another process is writing conflicting updates to route status with name %q. Please ensure that the configuration of all routers is consistent. Route status will not be updated as long as conflicts are detected.", o.RouterName))
422425
go tracker.Run(wait.NeverStop)
423-
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), o.RouterName, o.RouterCanonicalHostname, lease, tracker)
426+
routeLister := routelisters.NewRouteLister(factory.CreateRoutesSharedInformer().GetIndexer())
427+
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), routeLister, o.RouterName, o.RouterCanonicalHostname, lease, tracker)
424428
recorder = status
425429
plugin = status
426430
}
@@ -430,7 +434,6 @@ func (o *TemplateRouterOptions) Run() error {
430434
plugin = controller.NewUniqueHost(plugin, o.RouteSelectionFunc(), o.RouterSelection.DisableNamespaceOwnershipCheck, recorder)
431435
plugin = controller.NewHostAdmitter(plugin, o.RouteAdmissionFunc(), o.AllowWildcardRoutes, o.RouterSelection.DisableNamespaceOwnershipCheck, recorder)
432436

433-
factory := o.RouterSelection.NewFactory(routeclient, projectclient.Project().Projects(), kc)
434437
controller := factory.Create(plugin, false)
435438
controller.Run()
436439

pkg/router/controller/factory/factory.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,13 @@ func (f *RouterControllerFactory) Create(plugin router.Plugin, watchNodes bool)
9696

9797
func (f *RouterControllerFactory) initInformers(rc *routercontroller.RouterController) {
9898
if f.NamespaceLabels != nil {
99-
f.createNamespacesSharedInformer(rc)
99+
f.createNamespacesSharedInformer()
100100
}
101-
f.createEndpointsSharedInformer(rc)
102-
f.createRoutesSharedInformer(rc)
101+
f.createEndpointsSharedInformer()
102+
f.CreateRoutesSharedInformer()
103103

104104
if rc.WatchNodes {
105-
f.createNodesSharedInformer(rc)
105+
f.createNodesSharedInformer()
106106
}
107107

108108
// Start informers
@@ -187,7 +187,7 @@ func (f *RouterControllerFactory) setSelectors(options *v1.ListOptions) {
187187
options.FieldSelector = f.FieldSelector
188188
}
189189

190-
func (f *RouterControllerFactory) createEndpointsSharedInformer(rc *routercontroller.RouterController) {
190+
func (f *RouterControllerFactory) createEndpointsSharedInformer() {
191191
// we do not scope endpoints by labels or fields because the route labels != endpoints labels
192192
lw := &kcache.ListWatch{
193193
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
@@ -204,7 +204,13 @@ func (f *RouterControllerFactory) createEndpointsSharedInformer(rc *routercontro
204204
f.informers[objType] = informer
205205
}
206206

207-
func (f *RouterControllerFactory) createRoutesSharedInformer(rc *routercontroller.RouterController) {
207+
func (f *RouterControllerFactory) CreateRoutesSharedInformer() kcache.SharedIndexInformer {
208+
rt := &routeapi.Route{}
209+
objType := reflect.TypeOf(rt)
210+
if informer, ok := f.informers[objType]; ok {
211+
return informer
212+
}
213+
208214
lw := &kcache.ListWatch{
209215
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
210216
f.setSelectors(&options)
@@ -221,22 +227,21 @@ func (f *RouterControllerFactory) createRoutesSharedInformer(rc *routercontrolle
221227
return f.RClient.Route().Routes(f.Namespace).Watch(options)
222228
},
223229
}
224-
rt := &routeapi.Route{}
225-
objType := reflect.TypeOf(rt)
226230
indexers := kcache.Indexers{kcache.NamespaceIndex: kcache.MetaNamespaceIndexFunc}
227231
informer := kcache.NewSharedIndexInformer(lw, rt, f.ResyncInterval, indexers)
228232
f.informers[objType] = informer
233+
return informer
229234
}
230235

231-
func (f *RouterControllerFactory) createNodesSharedInformer(rc *routercontroller.RouterController) {
236+
func (f *RouterControllerFactory) createNodesSharedInformer() {
232237
// Use stock node informer as we don't need namespace/labels/fields filtering on nodes
233238
ifactory := informerfactory.NewSharedInformerFactory(f.KClient, f.ResyncInterval)
234239
informer := ifactory.Core().InternalVersion().Nodes().Informer()
235240
objType := reflect.TypeOf(&kapi.Node{})
236241
f.informers[objType] = informer
237242
}
238243

239-
func (f *RouterControllerFactory) createNamespacesSharedInformer(rc *routercontroller.RouterController) {
244+
func (f *RouterControllerFactory) createNamespacesSharedInformer() {
240245
lw := &kcache.ListWatch{
241246
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
242247
options.LabelSelector = f.NamespaceLabels.String()

pkg/router/controller/status.go

Lines changed: 66 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
routeapi "github.com/openshift/origin/pkg/route/apis/route"
1818
client "github.com/openshift/origin/pkg/route/generated/internalclientset/typed/route/internalversion"
19+
routelisters "github.com/openshift/origin/pkg/route/generated/listers/route/internalversion"
1920
"github.com/openshift/origin/pkg/router"
2021
"github.com/openshift/origin/pkg/util/writerlease"
2122
)
@@ -36,8 +37,10 @@ func (logRecorder) RecordRouteRejection(route *routeapi.Route, reason, message s
3637

3738
// StatusAdmitter ensures routes added to the plugin have status set.
3839
type StatusAdmitter struct {
39-
plugin router.Plugin
40-
client client.RoutesGetter
40+
plugin router.Plugin
41+
client client.RoutesGetter
42+
lister routelisters.RouteLister
43+
4144
routerName string
4245
routerCanonicalHostname string
4346

@@ -49,10 +52,12 @@ type StatusAdmitter struct {
4952
// route has a status field set that matches this router. The admitter manages
5053
// an LRU of recently seen conflicting updates to handle when two router processes
5154
// with differing configurations are writing updates at the same time.
52-
func NewStatusAdmitter(plugin router.Plugin, client client.RoutesGetter, name, hostName string, lease writerlease.Lease, tracker ContentionTracker) *StatusAdmitter {
55+
func NewStatusAdmitter(plugin router.Plugin, client client.RoutesGetter, lister routelisters.RouteLister, name, hostName string, lease writerlease.Lease, tracker ContentionTracker) *StatusAdmitter {
5356
return &StatusAdmitter{
54-
plugin: plugin,
55-
client: client,
57+
plugin: plugin,
58+
client: client,
59+
lister: lister,
60+
5661
routerName: name,
5762
routerCanonicalHostname: hostName,
5863

@@ -74,7 +79,7 @@ var nowFn = getRfc3339Timestamp
7479
func (a *StatusAdmitter) HandleRoute(eventType watch.EventType, route *routeapi.Route) error {
7580
switch eventType {
7681
case watch.Added, watch.Modified:
77-
performIngressConditionUpdate("admit", a.lease, a.tracker, a.client, route, a.routerName, a.routerCanonicalHostname, routeapi.RouteIngressCondition{
82+
performIngressConditionUpdate("admit", a.lease, a.tracker, a.client, a.lister, route, a.routerName, a.routerCanonicalHostname, routeapi.RouteIngressCondition{
7883
Type: routeapi.RouteAdmitted,
7984
Status: kapi.ConditionTrue,
8085
})
@@ -100,51 +105,79 @@ func (a *StatusAdmitter) Commit() error {
100105

101106
// RecordRouteRejection attempts to update the route status with a reason for a route being rejected.
102107
func (a *StatusAdmitter) RecordRouteRejection(route *routeapi.Route, reason, message string) {
103-
performIngressConditionUpdate("reject", a.lease, a.tracker, a.client, route, a.routerName, a.routerCanonicalHostname, routeapi.RouteIngressCondition{
108+
performIngressConditionUpdate("reject", a.lease, a.tracker, a.client, a.lister, route, a.routerName, a.routerCanonicalHostname, routeapi.RouteIngressCondition{
104109
Type: routeapi.RouteAdmitted,
105110
Status: kapi.ConditionFalse,
106111
Reason: reason,
107112
Message: message,
108113
})
109114
}
110115

111-
// performIngressConditionUpdate updates the route to the the appropriate status for the provided condition.
112-
func performIngressConditionUpdate(action string, lease writerlease.Lease, tracker ContentionTracker, oc client.RoutesGetter, route *routeapi.Route, name, hostName string, condition routeapi.RouteIngressCondition) {
116+
// performIngressConditionUpdate updates the route to the appropriate status for the provided condition.
117+
func performIngressConditionUpdate(action string, lease writerlease.Lease, tracker ContentionTracker, oc client.RoutesGetter, lister routelisters.RouteLister, route *routeapi.Route, routerName, hostName string, condition routeapi.RouteIngressCondition) {
118+
attempts := 3
113119
key := string(route.UID)
114-
newestIngressName := findMostRecentIngress(route)
115-
changed, created, now, latest := recordIngressCondition(route, name, hostName, condition)
116-
if !changed {
117-
glog.V(4).Infof("%s: no changes to route needed: %s/%s", action, route.Namespace, route.Name)
118-
tracker.Clear(key, latest)
119-
// if the most recent change was to our ingress status, consider the current lease extended
120-
if newestIngressName == name {
121-
lease.Extend(key)
120+
routeNamespace, routeName := route.Namespace, route.Name
121+
122+
lease.Try(key, func() (writerlease.WorkResult, bool) {
123+
route, err := lister.Routes(routeNamespace).Get(routeName)
124+
if err != nil {
125+
return writerlease.None, false
126+
}
127+
if string(route.UID) != key {
128+
glog.V(4).Infof("%s: skipped update due to route UID changing (likely delete and recreate): %s/%s", action, route.Namespace, route.Name)
129+
return writerlease.None, false
122130
}
123-
return
124-
}
125-
// If the tracker determines that another process is attempting to update the ingress to an inconsistent
126-
// value, skip updating altogether and rely on the next resync to resolve conflicts. This prevents routers
127-
// with different configurations from endlessly updating the route status.
128-
if !created && tracker.IsContended(key, now, latest) {
129-
glog.V(4).Infof("%s: skipped update due to another process altering the route with a different ingress status value: %s", action, key)
130-
return
131-
}
132131

133-
lease.Try(key, func() (bool, bool) {
134-
updated := updateStatus(oc, route)
135-
if updated {
132+
route = route.DeepCopy()
133+
changed, created, now, latest := recordIngressCondition(route, routerName, hostName, condition)
134+
if !changed {
135+
glog.V(4).Infof("%s: no changes to route needed: %s/%s", action, route.Namespace, route.Name)
136136
tracker.Clear(key, latest)
137-
} else {
138-
glog.V(4).Infof("%s: did not update route status, skipping route until next resync", action)
137+
// if the most recent change was to our ingress status, consider the current lease extended
138+
if findMostRecentIngress(route) == routerName {
139+
lease.Extend(key)
140+
}
141+
return writerlease.None, false
142+
}
143+
144+
// If the tracker determines that another process is attempting to update the ingress to an inconsistent
145+
// value, skip updating altogether and rely on the next resync to resolve conflicts. This prevents routers
146+
// with different configurations from endlessly updating the route status.
147+
if !created && tracker.IsContended(key, now, latest) {
148+
glog.V(4).Infof("%s: skipped update due to another process altering the route with a different ingress status value: %s", action, key)
149+
return writerlease.None, false
150+
}
151+
152+
switch _, err := oc.Routes(route.Namespace).UpdateStatus(route); {
153+
case err == nil:
154+
tracker.Clear(key, latest)
155+
return writerlease.Extend, false
156+
case errors.IsForbidden(err):
157+
// if the router can't write status updates, allow the route to go through
158+
utilruntime.HandleError(fmt.Errorf("Unable to write router status - please ensure you reconcile your system policy or grant this router access to update route status: %v", err))
159+
tracker.Clear(key, latest)
160+
return writerlease.Extend, false
161+
case errors.IsNotFound(err):
162+
// route was deleted
163+
return writerlease.Release, false
164+
case errors.IsConflict(err):
165+
// just follow the normal process, and retry when we receive the update notification due to
166+
// the other entity updating the route.
167+
glog.V(4).Infof("%s: updating status of %s/%s failed due to write conflict", action, route.Namespace, route.Name)
168+
return writerlease.Release, true
169+
default:
170+
utilruntime.HandleError(fmt.Errorf("Unable to write router status for %s/%s: %v", route.Namespace, route.Name, err))
171+
attempts--
172+
return writerlease.Release, attempts > 0
139173
}
140-
return updated, false
141174
})
142175
}
143176

144177
// recordIngressCondition updates the matching ingress on the route (or adds a new one) with the specified
145178
// condition, returning whether the route was updated or created, the time assigned to the condition, and
146179
// a pointer to the current ingress record.
147-
func recordIngressCondition(route *routeapi.Route, name, hostName string, condition routeapi.RouteIngressCondition) (changed, created bool, _ time.Time, latest *routeapi.RouteIngress) {
180+
func recordIngressCondition(route *routeapi.Route, name, hostName string, condition routeapi.RouteIngressCondition) (changed, created bool, at time.Time, latest *routeapi.RouteIngress) {
148181
for i := range route.Status.Ingress {
149182
existing := &route.Status.Ingress[i]
150183
if existing.RouterName != name {
@@ -227,31 +260,6 @@ func findCondition(ingress *routeapi.RouteIngress, t routeapi.RouteIngressCondit
227260
return nil
228261
}
229262

230-
func updateStatus(oc client.RoutesGetter, route *routeapi.Route) bool {
231-
for i := 0; i < 3; i++ {
232-
switch _, err := oc.Routes(route.Namespace).UpdateStatus(route); {
233-
case err == nil:
234-
return true
235-
case errors.IsNotFound(err):
236-
// route was deleted
237-
return false
238-
case errors.IsForbidden(err):
239-
// if the router can't write status updates, allow the route to go through
240-
utilruntime.HandleError(fmt.Errorf("Unable to write router status - please ensure you reconcile your system policy or grant this router access to update route status: %v", err))
241-
return true
242-
case errors.IsConflict(err):
243-
// just follow the normal process, and retry when we receive the update notification due to
244-
// the other entity updating the route.
245-
glog.V(4).Infof("updating status of %s/%s failed due to write conflict", route.Namespace, route.Name)
246-
return false
247-
default:
248-
utilruntime.HandleError(fmt.Errorf("Unable to write router status for %s/%s: %v", route.Namespace, route.Name, err))
249-
continue
250-
}
251-
}
252-
return false
253-
}
254-
255263
// ContentionTracker records modifications to a particular entry to prevent endless
256264
// loops when multiple routers are configured with conflicting info. A given router
257265
process tracks whether the ingress status is changed from a correct value to any

0 commit comments

Comments
 (0)