Commit ee78030 (2 parents: 413d392 + bff1d6f)

Merge pull request #16117 from csrwng/build_controller_policy_cache

Automatic merge from submit-queue (batch tested with PRs 16142, 16100, 16109, 16113, 16117).

build controller: use a buildconfig queue to track which buildconfigs to kick

Reverts to using the cache lister to determine which builds to run next. Adds a new queue to the build controller to keep track of which build configs to poke when a build has completed. Removes the OnComplete handler from the run policies, since every implementation ended up calling the same code. Also removes the code that updated an annotation on a build to add it to the queue, and instead adds the build(s) directly to the queue.

Fixes #16081
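
The change follows the standard controller pattern of one rate-limited work queue per resource type, each drained by its own worker pool. Below is a minimal, self-contained sketch of that two-queue layout using client-go's workqueue package (the same package the controller uses); the controller type and the handler bodies are illustrative stand-ins, not code from this commit:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

// controller mirrors the shape of the patched BuildController: one
// rate-limited queue for builds and a second one for build configs.
type controller struct {
	buildQueue       workqueue.RateLimitingInterface
	buildConfigQueue workqueue.RateLimitingInterface
}

func newController() *controller {
	return &controller{
		buildQueue:       workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		buildConfigQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
	}
}

// run starts an independent worker pool for each queue, as the patched
// Run method does with buildWorker and buildConfigWorker.
func (c *controller) run(workers int, stopCh <-chan struct{}) {
	defer c.buildQueue.ShutDown()
	defer c.buildConfigQueue.ShutDown()
	for i := 0; i < workers; i++ {
		go wait.Until(c.buildWorker, time.Second, stopCh)
		go wait.Until(c.buildConfigWorker, time.Second, stopCh)
	}
	<-stopCh
}

func (c *controller) buildWorker() {
	for {
		key, quit := c.buildQueue.Get()
		if quit {
			return
		}
		// A real handler would sync the build here and, on completion,
		// enqueue its build config onto buildConfigQueue.
		fmt.Println("handling build", key)
		c.buildQueue.Done(key)
	}
}

func (c *controller) buildConfigWorker() {
	for {
		key, quit := c.buildConfigQueue.Get()
		if quit {
			return
		}
		// A real handler would ask the run policy for the next build(s)
		// to start and push them back onto buildQueue.
		fmt.Println("handling build config", key)
		c.buildConfigQueue.Done(key)
	}
}

func main() {
	stopCh := make(chan struct{})
	c := newController()
	go c.run(2, stopCh)

	c.buildQueue.Add("myproject/build-1")
	c.buildConfigQueue.Add("myproject/ruby-sample-build")

	time.Sleep(100 * time.Millisecond)
	close(stopCh)
}
```

Splitting the queues keeps build-config policy evaluation retryable on its own rate limit: as the diff below shows, if a config still has running builds, handleBuildConfig returns an error and the key is simply requeued.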

9 files changed: +172 −171 lines

pkg/build/controller/build/build_controller.go (121 additions, 28 deletions)

```diff
@@ -126,8 +126,9 @@ type BuildController struct {
 	podClient  kexternalcoreclient.PodsGetter
 	kubeClient kclientset.Interface
 
-	queue            workqueue.RateLimitingInterface
+	buildQueue       workqueue.RateLimitingInterface
 	imageStreamQueue *resourceTriggerQueue
+	buildConfigQueue workqueue.RateLimitingInterface
 
 	buildStore  buildlister.BuildLister
 	secretStore v1lister.SecretLister
@@ -175,7 +176,6 @@ func NewBuildController(params *BuildControllerParams) *BuildController {
 
 	buildClient := buildclient.NewOSClientBuildClient(params.OpenshiftClient)
 	buildLister := params.BuildInformer.Lister()
-	clientBuildLister := buildclient.NewOSClientBuildLister(params.OpenshiftClient)
 	buildConfigGetter := params.BuildConfigInformer.Lister()
 	c := &BuildController{
 		buildPatcher: buildClient,
@@ -198,11 +198,12 @@ func NewBuildController(params *BuildControllerParams) *BuildController {
 		buildDefaults:  params.BuildDefaults,
 		buildOverrides: params.BuildOverrides,
 
-		queue:            workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+		buildQueue:       workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 		imageStreamQueue: newResourceTriggerQueue(),
+		buildConfigQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 
 		recorder:    eventBroadcaster.NewRecorder(kapi.Scheme, clientv1.EventSource{Component: "build-controller"}),
-		runPolicies: policy.GetAllRunPolicies(clientBuildLister, buildClient),
+		runPolicies: policy.GetAllRunPolicies(buildLister, buildClient),
 	}
 
 	c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -229,7 +230,8 @@ func NewBuildController(params *BuildControllerParams) *BuildController {
 // Run begins watching and syncing.
 func (bc *BuildController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
-	defer bc.queue.ShutDown()
+	defer bc.buildQueue.ShutDown()
+	defer bc.buildConfigQueue.ShutDown()
 
 	// Wait for the controller stores to sync before starting any work in this controller.
 	if !cache.WaitForCacheSync(stopCh, bc.buildStoreSynced, bc.podStoreSynced, bc.secretStoreSynced, bc.imageStreamStoreSynced) {
@@ -240,7 +242,11 @@ func (bc *BuildController) Run(workers int, stopCh <-chan struct{}) {
 	glog.Infof("Starting build controller")
 
 	for i := 0; i < workers; i++ {
-		go wait.Until(bc.worker, time.Second, stopCh)
+		go wait.Until(bc.buildWorker, time.Second, stopCh)
+	}
+
+	for i := 0; i < workers; i++ {
+		go wait.Until(bc.buildConfigWorker, time.Second, stopCh)
 	}
 
 	metrics.IntializeMetricsCollector(bc.buildLister)
@@ -249,38 +255,72 @@ func (bc *BuildController) Run(workers int, stopCh <-chan struct{}) {
 	glog.Infof("Shutting down build controller")
 }
 
-func (bc *BuildController) worker() {
+func (bc *BuildController) buildWorker() {
 	for {
-		if quit := bc.work(); quit {
+		if quit := bc.buildWork(); quit {
 			return
 		}
 	}
 }
 
-// work gets the next build from the queue and invokes handleBuild on it
-func (bc *BuildController) work() bool {
-	key, quit := bc.queue.Get()
+// buildWork gets the next build from the buildQueue and invokes handleBuild on it
+func (bc *BuildController) buildWork() bool {
+	key, quit := bc.buildQueue.Get()
 	if quit {
 		return true
 	}
 
-	defer bc.queue.Done(key)
+	defer bc.buildQueue.Done(key)
 
 	build, err := bc.getBuildByKey(key.(string))
 	if err != nil {
-		bc.handleError(err, key)
+		bc.handleBuildError(err, key)
 		return false
 	}
 	if build == nil {
 		return false
 	}
 
 	err = bc.handleBuild(build)
+	bc.handleBuildError(err, key)
+	return false
+}
 
-	bc.handleError(err, key)
+func (bc *BuildController) buildConfigWorker() {
+	for {
+		if quit := bc.buildConfigWork(); quit {
+			return
+		}
+	}
+}
+
+// buildConfigWork gets the next build config from the buildConfigQueue and invokes handleBuildConfig on it
+func (bc *BuildController) buildConfigWork() bool {
+	key, quit := bc.buildConfigQueue.Get()
+	if quit {
+		return true
+	}
+	defer bc.buildConfigQueue.Done(key)
+
+	namespace, name, err := parseBuildConfigKey(key.(string))
+	if err != nil {
+		utilruntime.HandleError(err)
+		return false
+	}
+
+	err = bc.handleBuildConfig(namespace, name)
+	bc.handleBuildConfigError(err, key)
 	return false
 }
 
+func parseBuildConfigKey(key string) (string, string, error) {
+	parts := strings.SplitN(key, "/", 2)
+	if len(parts) != 2 {
+		return "", "", fmt.Errorf("invalid build config key: %s", key)
+	}
+	return parts[0], parts[1], nil
+}
+
 // handleBuild retrieves the build's corresponding pod and calls the appropriate
 // handle function based on the build's current state. Each handler returns a buildUpdate
 // object that includes any updates that need to be made on the build.
@@ -956,12 +996,46 @@ func (bc *BuildController) updateBuild(build *buildapi.Build, update *buildUpdate)
 			bc.recorder.Eventf(patchedBuild, kapi.EventTypeNormal, buildapi.BuildFailedEventReason, fmt.Sprintf(buildapi.BuildFailedEventMessage, patchedBuild.Namespace, patchedBuild.Name))
 		}
 		if buildutil.IsTerminalPhase(*update.phase) {
-			common.HandleBuildCompletion(patchedBuild, bc.buildLister, bc.buildConfigGetter, bc.buildDeleter, bc.runPolicies)
+			bc.handleBuildCompletion(patchedBuild)
 		}
 	}
 	return nil
 }
 
+func (bc *BuildController) handleBuildCompletion(build *buildapi.Build) {
+	bcName := buildutil.ConfigNameForBuild(build)
+	bc.enqueueBuildConfig(build.Namespace, bcName)
+	if err := common.HandleBuildPruning(bcName, build.Namespace, bc.buildLister, bc.buildConfigGetter, bc.buildDeleter); err != nil {
+		utilruntime.HandleError(fmt.Errorf("failed to prune old builds %s/%s: %v", build.Namespace, build.Name, err))
+	}
+}
+
+func (bc *BuildController) enqueueBuildConfig(ns, name string) {
+	key := resourceName(ns, name)
+	bc.buildConfigQueue.Add(key)
+}
+
+func (bc *BuildController) handleBuildConfig(bcNamespace string, bcName string) error {
+	glog.V(4).Infof("Handing build config %s/%s", bcNamespace, bcName)
+	nextBuilds, hasRunningBuilds, err := policy.GetNextConfigBuild(bc.buildLister, bcNamespace, bcName)
+	if err != nil {
+		glog.V(2).Infof("Error getting next builds for %s/%s: %v", bcNamespace, bcName, err)
+		return err
+	}
+	glog.V(5).Infof("Build config %s/%s: has %d next builds, is running builds: %v", bcNamespace, bcName, len(nextBuilds), hasRunningBuilds)
+	if len(nextBuilds) == 0 && hasRunningBuilds {
+		glog.V(4).Infof("Build config %s/%s has running builds, will retry", bcNamespace, bcName)
+		return fmt.Errorf("build config %s/%s has running builds and cannot run more builds", bcNamespace, bcName)
+	}
+
+	// Enqueue any builds to build next
+	for _, build := range nextBuilds {
+		glog.V(5).Infof("Queueing next build for build config %s/%s: %s", bcNamespace, bcName, build.Name)
+		bc.enqueueBuild(build)
+	}
+	return nil
+}
+
 // patchBuild generates a patch for the given build and buildUpdate
 // and applies that patch using the REST client
 func (bc *BuildController) patchBuild(build *buildapi.Build, update *buildUpdate) (*buildapi.Build, error) {
@@ -1059,16 +1133,16 @@ func (bc *BuildController) buildUpdated(old, cur interface{}) {
 	bc.enqueueBuild(build)
 }
 
-// enqueueBuild adds the given build to the queue.
+// enqueueBuild adds the given build to the buildQueue.
 func (bc *BuildController) enqueueBuild(build *buildapi.Build) {
 	key := resourceName(build.Namespace, build.Name)
-	bc.queue.Add(key)
+	bc.buildQueue.Add(key)
 }
 
 // enqueueBuildForPod adds the build corresponding to the given pod to the controller
-// queue. If a build is not found for the pod, then an error is logged.
+// buildQueue. If a build is not found for the pod, then an error is logged.
 func (bc *BuildController) enqueueBuildForPod(pod *v1.Pod) {
-	bc.queue.Add(resourceName(pod.Namespace, buildutil.GetBuildName(pod)))
+	bc.buildQueue.Add(resourceName(pod.Namespace, buildutil.GetBuildName(pod)))
 }
 
 // imageStreamAdded queues any builds that have registered themselves for this image stream.
@@ -1077,7 +1151,7 @@ func (bc *BuildController) enqueueBuildForPod(pod *v1.Pod) {
 func (bc *BuildController) imageStreamAdded(obj interface{}) {
 	stream := obj.(*imageapi.ImageStream)
 	for _, buildKey := range bc.imageStreamQueue.Pop(resourceName(stream.Namespace, stream.Name)) {
-		bc.queue.Add(buildKey)
+		bc.buildQueue.Add(buildKey)
 	}
 }
 
@@ -1086,29 +1160,48 @@ func (bc *BuildController) imageStreamUpdated(old, cur interface{}) {
 	bc.imageStreamAdded(cur)
 }
 
-// handleError is called by the main work loop to check the return of calling handleBuild.
-// If an error occurred, then the key is re-added to the queue unless it has been retried too many
+// handleBuildError is called by the main work loop to check the return of calling handleBuild.
+// If an error occurred, then the key is re-added to the buildQueue unless it has been retried too many
 // times.
-func (bc *BuildController) handleError(err error, key interface{}) {
+func (bc *BuildController) handleBuildError(err error, key interface{}) {
 	if err == nil {
-		bc.queue.Forget(key)
+		bc.buildQueue.Forget(key)
 		return
 	}
 
 	if strategy.IsFatal(err) {
 		glog.V(2).Infof("Will not retry fatal error for key %v: %v", key, err)
-		bc.queue.Forget(key)
+		bc.buildQueue.Forget(key)
+		return
+	}
+
+	if bc.buildQueue.NumRequeues(key) < maxRetries {
+		glog.V(4).Infof("Retrying key %v: %v", key, err)
+		bc.buildQueue.AddRateLimited(key)
+		return
+	}
+
+	glog.V(2).Infof("Giving up retrying %v: %v", key, err)
+	bc.buildQueue.Forget(key)
+}
+
+// handleBuildConfigError is called by the buildConfig work loop to check the return of calling handleBuildConfig.
+// If an error occurred, then the key is re-added to the buildConfigQueue unless it has been retried too many
+// times.
+func (bc *BuildController) handleBuildConfigError(err error, key interface{}) {
+	if err == nil {
+		bc.buildConfigQueue.Forget(key)
 		return
 	}
 
-	if bc.queue.NumRequeues(key) < maxRetries {
+	if bc.buildConfigQueue.NumRequeues(key) < maxRetries {
 		glog.V(4).Infof("Retrying key %v: %v", key, err)
-		bc.queue.AddRateLimited(key)
+		bc.buildConfigQueue.AddRateLimited(key)
 		return
 	}
 
 	glog.V(2).Infof("Giving up retrying %v: %v", key, err)
-	bc.queue.Forget(key)
+	bc.buildConfigQueue.Forget(key)
 }
 
 // isBuildPod returns true if the given pod is a build pod
```
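
The new buildConfigQueue carries plain namespace/name strings. parseBuildConfigKey, shown in the diff above, splits them back apart. A standalone sketch of that round trip follows; parseBuildConfigKey is copied verbatim from the diff, while this one-line resourceName join is an assumption about the real helper's behavior, inferred from how the keys are parsed:

```go
package main

import (
	"fmt"
	"strings"
)

// resourceName joins a namespace and name into a queue key. The real
// helper lives elsewhere in the controller package; this one-line join
// is an assumption inferred from how parseBuildConfigKey splits keys.
func resourceName(namespace, name string) string {
	return namespace + "/" + name
}

// parseBuildConfigKey matches the function added in the diff above: it
// splits a "namespace/name" key and rejects anything without a separator.
func parseBuildConfigKey(key string) (string, string, error) {
	parts := strings.SplitN(key, "/", 2)
	if len(parts) != 2 {
		return "", "", fmt.Errorf("invalid build config key: %s", key)
	}
	return parts[0], parts[1], nil
}

func main() {
	key := resourceName("myproject", "ruby-sample-build")
	ns, name, err := parseBuildConfigKey(key)
	fmt.Println(ns, name, err) // myproject ruby-sample-build <nil>

	if _, _, err := parseBuildConfigKey("no-separator"); err != nil {
		fmt.Println(err) // invalid build config key: no-separator
	}
}
```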

pkg/build/controller/build/build_controller_test.go (16 additions, 29 deletions)

```diff
@@ -133,7 +133,6 @@ func TestHandleBuild(t *testing.T) {
 		expectPodCreated bool
 		expectPodDeleted bool
 		expectError      bool
-		expectOnComplete bool
 	}{
 		{
 			name: "cancel running build",
@@ -145,7 +144,6 @@
 				completionTime(now).
 				startTime(now).update,
 			expectPodDeleted: true,
-			expectOnComplete: true,
 		},
 		{
 			name: "cancel build in terminal state",
@@ -195,7 +193,6 @@
 				startTime(now).
 				completionTime(now).
 				update,
-			expectOnComplete: true,
 		},
 		{
 			name: "new not runnable by policy",
@@ -231,10 +228,9 @@
 			expectError: true,
 		},
 		{
-			name:             "pending -> failed",
-			build:            build(buildapi.BuildPhasePending),
-			pod:              pod(v1.PodFailed),
-			expectOnComplete: true,
+			name:  "pending -> failed",
+			build: build(buildapi.BuildPhasePending),
+			pod:   pod(v1.PodFailed),
 			expectUpdate: newUpdate().
 				phase(buildapi.BuildPhaseFailed).
 				reason(buildapi.StatusReasonGenericBuildFailed).
@@ -250,10 +246,9 @@
 			expectUpdate: nil,
 		},
 		{
-			name:             "running -> complete",
-			build:            build(buildapi.BuildPhaseRunning),
-			pod:              pod(v1.PodSucceeded),
-			expectOnComplete: true,
+			name:  "running -> complete",
+			build: build(buildapi.BuildPhaseRunning),
+			pod:   pod(v1.PodSucceeded),
 			expectUpdate: newUpdate().
 				phase(buildapi.BuildPhaseComplete).
 				reason("").
@@ -269,9 +264,8 @@
 			expectUpdate: nil,
 		},
 		{
-			name:             "running with missing pod",
-			build:            build(buildapi.BuildPhaseRunning),
-			expectOnComplete: true,
+			name:  "running with missing pod",
+			build: build(buildapi.BuildPhaseRunning),
 			expectUpdate: newUpdate().
 				phase(buildapi.BuildPhaseError).
 				reason(buildapi.StatusReasonBuildPodDeleted).
@@ -281,10 +275,9 @@
 				update,
 		},
 		{
-			name:             "failed -> failed with no completion timestamp",
-			build:            build(buildapi.BuildPhaseFailed),
-			pod:              pod(v1.PodFailed),
-			expectOnComplete: true,
+			name:  "failed -> failed with no completion timestamp",
+			build: build(buildapi.BuildPhaseFailed),
+			pod:   pod(v1.PodFailed),
 			expectUpdate: newUpdate().
 				startTime(now).
 				completionTime(now).
@@ -294,18 +287,15 @@
 			name:  "failed -> failed with completion timestamp+message and no logsnippet",
 			build: withCompletionTS(build(buildapi.BuildPhaseFailed)),
 			pod:   withTerminationMessage(pod(v1.PodFailed)),
-			// no oncomplete call because the completion timestamp is already set.
-			expectOnComplete: false,
 			expectUpdate: newUpdate().
 				startTime(now).
 				logSnippet("termination message").
 				update,
 		},
 		{
-			name:             "failed -> failed with completion timestamp+message and logsnippet",
-			build:            withLogSnippet(withCompletionTS(build(buildapi.BuildPhaseFailed))),
-			pod:              withTerminationMessage(pod(v1.PodFailed)),
-			expectOnComplete: false,
+			name:  "failed -> failed with completion timestamp+message and logsnippet",
+			build: withLogSnippet(withCompletionTS(build(buildapi.BuildPhaseFailed))),
+			pod:   withTerminationMessage(pod(v1.PodFailed)),
 		},
 	}
 
@@ -377,9 +367,6 @@
 		if tc.expectPodCreated != podCreated {
 			t.Errorf("%s: pod created. expected: %v, actual: %v", tc.name, tc.expectPodCreated, podCreated)
 		}
-		if tc.expectOnComplete != runPolicy.onCompleteCalled {
-			t.Errorf("%s: on complete called. expected: %v, actual: %v", tc.name, tc.expectOnComplete, runPolicy.onCompleteCalled)
-		}
 		if tc.expectUpdate != nil {
 			if patchedBuild == nil {
 				t.Errorf("%s: did not get an update. Expected: %v", tc.name, tc.expectUpdate)
@@ -423,9 +410,9 @@ func TestWorkWithNewBuild(t *testing.T) {
 	defer bc.stop()
 	bc.enqueueBuild(build)
 
-	bc.work()
+	bc.buildWork()
 
-	if bc.queue.Len() > 0 {
+	if bc.buildQueue.Len() > 0 {
 		t.Errorf("Expected queue to be empty")
 	}
 	if patchedBuild == nil {
```
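
The updated TestWorkWithNewBuild drives a single iteration of buildWork by hand and then asserts that buildQueue drained. The same pop-one-item shape can be exercised against a bare client-go workqueue; in the sketch below, everything except the workqueue API itself is illustrative:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// workOne pops a single key, marks it done, and reports whether the queue
// has been shut down: the same shape as the controller's buildWork loop
// body, minus the actual build handling.
func workOne(q workqueue.RateLimitingInterface) bool {
	key, quit := q.Get()
	if quit {
		return true
	}
	defer q.Done(key)
	fmt.Println("processed", key)
	return false
}

func main() {
	q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	q.Add("myproject/build-1")

	workOne(q)

	// Mirrors the test's assertion that one pass drains the queue.
	if q.Len() > 0 {
		fmt.Println("expected queue to be empty")
	} else {
		fmt.Println("queue empty")
	}
	q.ShutDown()
}
```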
