build controller: use a buildconfig queue to track which buildconfigs to kick #16117

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
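
In outline, the PR splits the controller's single rate-limited work queue into two: buildQueue for individual builds and a new buildConfigQueue for build configs whose next builds may have become runnable. The sketch below shows the general two-queue pattern with the client-go workqueue package that the controller uses; the types, handler bodies, and key formats are simplified stand-ins for illustration, not the controller's actual logic.

```go
package main

import (
	"fmt"
	"strings"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

// controller mirrors the two-queue layout: one queue keyed by build
// ("namespace/name"), one keyed by build config.
type controller struct {
	buildQueue       workqueue.RateLimitingInterface
	buildConfigQueue workqueue.RateLimitingInterface
}

func newController() *controller {
	return &controller{
		buildQueue:       workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		buildConfigQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
	}
}

// buildWorker pops build keys; when a build reaches a terminal phase, the
// owning build config is enqueued so its run policy can pick the next build.
func (c *controller) buildWorker() {
	for {
		key, quit := c.buildQueue.Get()
		if quit {
			return
		}
		func() {
			defer c.buildQueue.Done(key)
			ns, name, _ := splitKey(key.(string))
			fmt.Printf("handled build %s/%s\n", ns, name)
			// Stand-in for handleBuildCompletion: kick the owning build config.
			c.buildConfigQueue.Add(ns + "/" + name + "-config")
		}()
	}
}

// buildConfigWorker pops build config keys and would decide which builds to
// start next (policy.GetNextConfigBuild in the real controller).
func (c *controller) buildConfigWorker() {
	for {
		key, quit := c.buildConfigQueue.Get()
		if quit {
			return
		}
		func() {
			defer c.buildConfigQueue.Done(key)
			fmt.Printf("handled build config %s\n", key)
		}()
	}
}

func splitKey(key string) (string, string, error) {
	parts := strings.SplitN(key, "/", 2)
	if len(parts) != 2 {
		return "", "", fmt.Errorf("invalid key: %s", key)
	}
	return parts[0], parts[1], nil
}

func main() {
	c := newController()
	stop := make(chan struct{})
	go wait.Until(c.buildWorker, time.Second, stop)
	go wait.Until(c.buildConfigWorker, time.Second, stop)

	c.buildQueue.Add("myproject/app-build-1")
	time.Sleep(time.Second)
	close(stop)
	c.buildQueue.ShutDown()
	c.buildConfigQueue.ShutDown()
}
```
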
pkg/build/controller/build/build_controller.go (149 changes: 121 additions & 28 deletions)
@@ -126,8 +126,9 @@ type BuildController struct {
podClient kexternalcoreclient.PodsGetter
kubeClient kclientset.Interface

queue workqueue.RateLimitingInterface
buildQueue workqueue.RateLimitingInterface
imageStreamQueue *resourceTriggerQueue
buildConfigQueue workqueue.RateLimitingInterface

buildStore buildlister.BuildLister
secretStore v1lister.SecretLister
@@ -175,7 +176,6 @@ func NewBuildController(params *BuildControllerParams) *BuildController {

buildClient := buildclient.NewOSClientBuildClient(params.OpenshiftClient)
buildLister := params.BuildInformer.Lister()
clientBuildLister := buildclient.NewOSClientBuildLister(params.OpenshiftClient)
buildConfigGetter := params.BuildConfigInformer.Lister()
c := &BuildController{
buildPatcher: buildClient,
@@ -198,11 +198,12 @@ func NewBuildController(params *BuildControllerParams) *BuildController {
buildDefaults: params.BuildDefaults,
buildOverrides: params.BuildOverrides,

queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
buildQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
imageStreamQueue: newResourceTriggerQueue(),
buildConfigQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),

recorder: eventBroadcaster.NewRecorder(kapi.Scheme, clientv1.EventSource{Component: "build-controller"}),
runPolicies: policy.GetAllRunPolicies(clientBuildLister, buildClient),
runPolicies: policy.GetAllRunPolicies(buildLister, buildClient),
}

c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -229,7 +230,8 @@ func NewBuildController(params *BuildControllerParams) *BuildController {
// Run begins watching and syncing.
func (bc *BuildController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer bc.queue.ShutDown()
defer bc.buildQueue.ShutDown()
defer bc.buildConfigQueue.ShutDown()

// Wait for the controller stores to sync before starting any work in this controller.
if !cache.WaitForCacheSync(stopCh, bc.buildStoreSynced, bc.podStoreSynced, bc.secretStoreSynced, bc.imageStreamStoreSynced) {
@@ -240,7 +242,11 @@ func (bc *BuildController) Run(workers int, stopCh <-chan struct{}) {
glog.Infof("Starting build controller")

for i := 0; i < workers; i++ {
go wait.Until(bc.worker, time.Second, stopCh)
go wait.Until(bc.buildWorker, time.Second, stopCh)
}

for i := 0; i < workers; i++ {
go wait.Until(bc.buildConfigWorker, time.Second, stopCh)
}

metrics.IntializeMetricsCollector(bc.buildLister)
@@ -249,38 +255,72 @@ func (bc *BuildController) Run(workers int, stopCh <-chan struct{}) {
glog.Infof("Shutting down build controller")
}

func (bc *BuildController) worker() {
func (bc *BuildController) buildWorker() {
for {
if quit := bc.work(); quit {
if quit := bc.buildWork(); quit {
return
}
}
}

// work gets the next build from the queue and invokes handleBuild on it
func (bc *BuildController) work() bool {
key, quit := bc.queue.Get()
// buildWork gets the next build from the buildQueue and invokes handleBuild on it
func (bc *BuildController) buildWork() bool {
key, quit := bc.buildQueue.Get()
if quit {
return true
}

defer bc.queue.Done(key)
defer bc.buildQueue.Done(key)

build, err := bc.getBuildByKey(key.(string))
if err != nil {
bc.handleError(err, key)
bc.handleBuildError(err, key)
return false
}
if build == nil {
return false
}

err = bc.handleBuild(build)
bc.handleBuildError(err, key)
return false
}

bc.handleError(err, key)
func (bc *BuildController) buildConfigWorker() {
for {
if quit := bc.buildConfigWork(); quit {
return
}
}
}

// buildConfigWork gets the next build config from the buildConfigQueue and invokes handleBuildConfig on it
func (bc *BuildController) buildConfigWork() bool {
key, quit := bc.buildConfigQueue.Get()
if quit {
return true
}
defer bc.buildConfigQueue.Done(key)

namespace, name, err := parseBuildConfigKey(key.(string))
if err != nil {
utilruntime.HandleError(err)
return false
}

err = bc.handleBuildConfig(namespace, name)
bc.handleBuildConfigError(err, key)
return false
}

func parseBuildConfigKey(key string) (string, string, error) {
parts := strings.SplitN(key, "/", 2)
if len(parts) != 2 {
return "", "", fmt.Errorf("invalid build config key: %s", key)
}
return parts[0], parts[1], nil
}
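
Both queues are keyed with the usual "namespace/name" strings. A quick illustrative round trip through the helper above, assuming resourceName (defined elsewhere in this package and not shown in this diff) simply joins namespace and name with a slash:

```go
// Illustrative fragment; resourceName's body is an assumption here.
key := resourceName("myproject", "my-buildconfig") // assumed to yield "myproject/my-buildconfig"

namespace, name, err := parseBuildConfigKey(key)
if err != nil {
	utilruntime.HandleError(err) // malformed keys are logged and skipped, as in buildConfigWork
}
// namespace == "myproject", name == "my-buildconfig"
```
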

// handleBuild retrieves the build's corresponding pod and calls the appropriate
// handle function based on the build's current state. Each handler returns a buildUpdate
// object that includes any updates that need to be made on the build.
@@ -956,12 +996,46 @@ func (bc *BuildController) updateBuild(build *buildapi.Build, update *buildUpdat
bc.recorder.Eventf(patchedBuild, kapi.EventTypeNormal, buildapi.BuildFailedEventReason, fmt.Sprintf(buildapi.BuildFailedEventMessage, patchedBuild.Namespace, patchedBuild.Name))
}
if buildutil.IsTerminalPhase(*update.phase) {
common.HandleBuildCompletion(patchedBuild, bc.buildLister, bc.buildConfigGetter, bc.buildDeleter, bc.runPolicies)
bc.handleBuildCompletion(patchedBuild)
}
}
return nil
}

func (bc *BuildController) handleBuildCompletion(build *buildapi.Build) {
bcName := buildutil.ConfigNameForBuild(build)
bc.enqueueBuildConfig(build.Namespace, bcName)
if err := common.HandleBuildPruning(bcName, build.Namespace, bc.buildLister, bc.buildConfigGetter, bc.buildDeleter); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to prune old builds %s/%s: %v", build.Namespace, build.Name, err))
}
Contributor:
seems like this could also run inside handleBuildConfig. Not sure if there's an advantage to moving it though.

Contributor Author:
leaving as is given that in handleBuildConfig it could potentially be called multiple times which could be wasteful.

}
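
For context on the review thread above, the alternative the reviewer raises would look roughly like the hypothetical sketch below (not code from this PR): pruning moves into the build config handler, where it would run on every requeue of the build config key (each build completion, image stream trigger, or rate-limited retry) rather than once per terminal build, which is the repeated work the author is avoiding.

```go
// Hypothetical alternative, not part of this PR: prune inside the
// build config handler instead of handleBuildCompletion.
func (bc *BuildController) handleBuildConfigWithPruning(ns, name string) error {
	// This runs on every requeue of the build config key, not just once per
	// terminal build, so pruning work would be repeated unnecessarily.
	if err := common.HandleBuildPruning(name, ns, bc.buildLister, bc.buildConfigGetter, bc.buildDeleter); err != nil {
		utilruntime.HandleError(fmt.Errorf("failed to prune old builds for %s/%s: %v", ns, name, err))
	}
	return bc.handleBuildConfig(ns, name)
}
```
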

func (bc *BuildController) enqueueBuildConfig(ns, name string) {
key := resourceName(ns, name)
bc.buildConfigQueue.Add(key)
}

func (bc *BuildController) handleBuildConfig(bcNamespace string, bcName string) error {
glog.V(4).Infof("Handing build config %s/%s", bcNamespace, bcName)
nextBuilds, hasRunningBuilds, err := policy.GetNextConfigBuild(bc.buildLister, bcNamespace, bcName)
if err != nil {
glog.V(2).Infof("Error getting next builds for %s/%s: %v", bcNamespace, bcName, err)
return err
}
glog.V(5).Infof("Build config %s/%s: has %d next builds, is running builds: %v", bcNamespace, bcName, len(nextBuilds), hasRunningBuilds)
if len(nextBuilds) == 0 && hasRunningBuilds {
glog.V(4).Infof("Build config %s/%s has running builds, will retry", bcNamespace, bcName)
return fmt.Errorf("build config %s/%s has running builds and cannot run more builds", bcNamespace, bcName)
}

// Enqueue any builds to build next
for _, build := range nextBuilds {
glog.V(5).Infof("Queueing next build for build config %s/%s: %s", bcNamespace, bcName, build.Name)
bc.enqueueBuild(build)
}
return nil
}

// patchBuild generates a patch for the given build and buildUpdate
// and applies that patch using the REST client
func (bc *BuildController) patchBuild(build *buildapi.Build, update *buildUpdate) (*buildapi.Build, error) {
@@ -1059,16 +1133,16 @@ func (bc *BuildController) buildUpdated(old, cur interface{}) {
bc.enqueueBuild(build)
}

// enqueueBuild adds the given build to the queue.
// enqueueBuild adds the given build to the buildQueue.
func (bc *BuildController) enqueueBuild(build *buildapi.Build) {
key := resourceName(build.Namespace, build.Name)
bc.queue.Add(key)
bc.buildQueue.Add(key)
}

// enqueueBuildForPod adds the build corresponding to the given pod to the controller
// queue. If a build is not found for the pod, then an error is logged.
// buildQueue. If a build is not found for the pod, then an error is logged.
func (bc *BuildController) enqueueBuildForPod(pod *v1.Pod) {
bc.queue.Add(resourceName(pod.Namespace, buildutil.GetBuildName(pod)))
bc.buildQueue.Add(resourceName(pod.Namespace, buildutil.GetBuildName(pod)))
}

// imageStreamAdded queues any builds that have registered themselves for this image stream.
@@ -1077,7 +1151,7 @@ func (bc *BuildController) enqueueBuildForPod(pod *v1.Pod) {
func (bc *BuildController) imageStreamAdded(obj interface{}) {
stream := obj.(*imageapi.ImageStream)
for _, buildKey := range bc.imageStreamQueue.Pop(resourceName(stream.Namespace, stream.Name)) {
bc.queue.Add(buildKey)
bc.buildQueue.Add(buildKey)
}
}

@@ -1086,29 +1160,48 @@ func (bc *BuildController) imageStreamUpdated(old, cur interface{}) {
bc.imageStreamAdded(cur)
}

// handleError is called by the main work loop to check the return of calling handleBuild.
// If an error occurred, then the key is re-added to the queue unless it has been retried too many
// handleBuildError is called by the main work loop to check the return of calling handleBuild.
// If an error occurred, then the key is re-added to the buildQueue unless it has been retried too many
// times.
func (bc *BuildController) handleError(err error, key interface{}) {
func (bc *BuildController) handleBuildError(err error, key interface{}) {
if err == nil {
bc.queue.Forget(key)
bc.buildQueue.Forget(key)
return
}

if strategy.IsFatal(err) {
glog.V(2).Infof("Will not retry fatal error for key %v: %v", key, err)
bc.queue.Forget(key)
bc.buildQueue.Forget(key)
return
}

if bc.buildQueue.NumRequeues(key) < maxRetries {
glog.V(4).Infof("Retrying key %v: %v", key, err)
bc.buildQueue.AddRateLimited(key)
return
}

glog.V(2).Infof("Giving up retrying %v: %v", key, err)
bc.buildQueue.Forget(key)
}

// handleBuildConfigError is called by the buildConfig work loop to check the return of calling handleBuildConfig.
// If an error occurred, then the key is re-added to the buildConfigQueue unless it has been retried too many
// times.
func (bc *BuildController) handleBuildConfigError(err error, key interface{}) {
if err == nil {
bc.buildConfigQueue.Forget(key)
return
}

if bc.queue.NumRequeues(key) < maxRetries {
if bc.buildConfigQueue.NumRequeues(key) < maxRetries {
glog.V(4).Infof("Retrying key %v: %v", key, err)
bc.queue.AddRateLimited(key)
bc.buildConfigQueue.AddRateLimited(key)
return
}

glog.V(2).Infof("Giving up retrying %v: %v", key, err)
bc.queue.Forget(key)
bc.buildConfigQueue.Forget(key)
}

// isBuildPod returns true if the given pod is a build pod
pkg/build/controller/build/build_controller_test.go (45 changes: 16 additions & 29 deletions)
@@ -133,7 +133,6 @@ func TestHandleBuild(t *testing.T) {
expectPodCreated bool
expectPodDeleted bool
expectError bool
expectOnComplete bool
}{
{
name: "cancel running build",
Expand All @@ -145,7 +144,6 @@ func TestHandleBuild(t *testing.T) {
completionTime(now).
startTime(now).update,
expectPodDeleted: true,
expectOnComplete: true,
},
{
name: "cancel build in terminal state",
@@ -195,7 +193,6 @@ func TestHandleBuild(t *testing.T) {
startTime(now).
completionTime(now).
update,
expectOnComplete: true,
},
{
name: "new not runnable by policy",
@@ -231,10 +228,9 @@ func TestHandleBuild(t *testing.T) {
expectError: true,
},
{
name: "pending -> failed",
build: build(buildapi.BuildPhasePending),
pod: pod(v1.PodFailed),
expectOnComplete: true,
name: "pending -> failed",
build: build(buildapi.BuildPhasePending),
pod: pod(v1.PodFailed),
expectUpdate: newUpdate().
phase(buildapi.BuildPhaseFailed).
reason(buildapi.StatusReasonGenericBuildFailed).
@@ -250,10 +246,9 @@ func TestHandleBuild(t *testing.T) {
expectUpdate: nil,
},
{
name: "running -> complete",
build: build(buildapi.BuildPhaseRunning),
pod: pod(v1.PodSucceeded),
expectOnComplete: true,
name: "running -> complete",
build: build(buildapi.BuildPhaseRunning),
pod: pod(v1.PodSucceeded),
expectUpdate: newUpdate().
phase(buildapi.BuildPhaseComplete).
reason("").
@@ -269,9 +264,8 @@ func TestHandleBuild(t *testing.T) {
expectUpdate: nil,
},
{
name: "running with missing pod",
build: build(buildapi.BuildPhaseRunning),
expectOnComplete: true,
name: "running with missing pod",
build: build(buildapi.BuildPhaseRunning),
expectUpdate: newUpdate().
phase(buildapi.BuildPhaseError).
reason(buildapi.StatusReasonBuildPodDeleted).
@@ -281,10 +275,9 @@ func TestHandleBuild(t *testing.T) {
update,
},
{
name: "failed -> failed with no completion timestamp",
build: build(buildapi.BuildPhaseFailed),
pod: pod(v1.PodFailed),
expectOnComplete: true,
name: "failed -> failed with no completion timestamp",
build: build(buildapi.BuildPhaseFailed),
pod: pod(v1.PodFailed),
expectUpdate: newUpdate().
startTime(now).
completionTime(now).
@@ -294,18 +287,15 @@ func TestHandleBuild(t *testing.T) {
name: "failed -> failed with completion timestamp+message and no logsnippet",
build: withCompletionTS(build(buildapi.BuildPhaseFailed)),
pod: withTerminationMessage(pod(v1.PodFailed)),
// no oncomplete call because the completion timestamp is already set.
expectOnComplete: false,
expectUpdate: newUpdate().
startTime(now).
logSnippet("termination message").
update,
},
{
name: "failed -> failed with completion timestamp+message and logsnippet",
build: withLogSnippet(withCompletionTS(build(buildapi.BuildPhaseFailed))),
pod: withTerminationMessage(pod(v1.PodFailed)),
expectOnComplete: false,
name: "failed -> failed with completion timestamp+message and logsnippet",
build: withLogSnippet(withCompletionTS(build(buildapi.BuildPhaseFailed))),
pod: withTerminationMessage(pod(v1.PodFailed)),
},
}

Expand Down Expand Up @@ -377,9 +367,6 @@ func TestHandleBuild(t *testing.T) {
if tc.expectPodCreated != podCreated {
t.Errorf("%s: pod created. expected: %v, actual: %v", tc.name, tc.expectPodCreated, podCreated)
}
if tc.expectOnComplete != runPolicy.onCompleteCalled {
t.Errorf("%s: on complete called. expected: %v, actual: %v", tc.name, tc.expectOnComplete, runPolicy.onCompleteCalled)
}
if tc.expectUpdate != nil {
if patchedBuild == nil {
t.Errorf("%s: did not get an update. Expected: %v", tc.name, tc.expectUpdate)
@@ -423,9 +410,9 @@ func TestWorkWithNewBuild(t *testing.T) {
defer bc.stop()
bc.enqueueBuild(build)

bc.work()
bc.buildWork()

if bc.queue.Len() > 0 {
if bc.buildQueue.Len() > 0 {
t.Errorf("Expected queue to be empty")
}
if patchedBuild == nil {
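
A companion test for the new code path could mirror TestWorkWithNewBuild above. The sketch below is hypothetical: the fake controller constructor and its stop helper are assumed to match the helpers the existing test file already uses, and their real signatures are not visible in this excerpt.

```go
// Hypothetical sketch; newFakeBuildController is an assumed test helper whose
// real signature is not shown in this diff excerpt.
func TestWorkWithNewBuildConfig(t *testing.T) {
	bc := newFakeBuildController(nil, nil, nil, nil) // assumption: fakes for the clients and informers
	defer bc.stop()

	bc.enqueueBuildConfig("namespace", "test-buildconfig")
	bc.buildConfigWork()

	if bc.buildConfigQueue.Len() > 0 {
		t.Errorf("expected build config queue to be empty")
	}
}
```
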