diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go
index 94d0c623..3f776778 100644
--- a/pkg/controller/queuejob/queuejob_controller_ex.go
+++ b/pkg/controller/queuejob/queuejob_controller_ex.go
@@ -364,15 +364,21 @@ func (qjm *XController) PreemptQueueJobs() {
 			klog.Warningf("[PreemptQueueJobs] failed in retrieving a fresh copy of the app wrapper '%s/%s', err=%v. Will try to preempt on the next run.", aw.Namespace, aw.Name, err)
 			continue
 		}
+		//we need to update AW before analyzing it as a candidate for preemption
+		updateErr := qjm.UpdateQueueJobStatus(newjob)
+		if updateErr != nil {
+			klog.Warningf("[PreemptQueueJobs] update of pod count to AW %v failed hence skipping preemption", newjob.Name)
+			return
+		}
 		newjob.Status.CanRun = false
 		newjob.Status.FilterIgnore = true // update QueueJobState only
 		cleanAppWrapper := false
 		// If dispatch deadline is exceeded no matter what the state of AW, kill the job and set status as Failed.
-		if (aw.Status.State == arbv1.AppWrapperStateActive) && (aw.Spec.SchedSpec.DispatchDuration.Limit > 0) {
-			if aw.Spec.SchedSpec.DispatchDuration.Overrun {
-				index := getIndexOfMatchedCondition(aw, arbv1.AppWrapperCondPreemptCandidate, "DispatchDeadlineExceeded")
+		if (newjob.Status.State == arbv1.AppWrapperStateActive) && (newjob.Spec.SchedSpec.DispatchDuration.Limit > 0) {
+			if newjob.Spec.SchedSpec.DispatchDuration.Overrun {
+				index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondPreemptCandidate, "DispatchDeadlineExceeded")
 				if index < 0 {
-					message = fmt.Sprintf("Dispatch deadline exceeded. allowed to run for %v seconds", aw.Spec.SchedSpec.DispatchDuration.Limit)
+					message = fmt.Sprintf("Dispatch deadline exceeded. allowed to run for %v seconds", newjob.Spec.SchedSpec.DispatchDuration.Limit)
 					cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondPreemptCandidate, v1.ConditionTrue, "DispatchDeadlineExceeded", message)
 					newjob.Status.Conditions = append(newjob.Status.Conditions, cond)
 				} else {
@@ -387,7 +393,7 @@ func (qjm *XController) PreemptQueueJobs() {
 				err := qjm.updateStatusInEtcdWithRetry(ctx, updateNewJob, "PreemptQueueJobs - CanRun: false -- DispatchDeadlineExceeded")
 				if err != nil {
-					klog.Warningf("[PreemptQueueJobs] status update CanRun: false -- DispatchDeadlineExceeded for '%s/%s' failed", aw.Namespace, aw.Name)
+					klog.Warningf("[PreemptQueueJobs] status update CanRun: false -- DispatchDeadlineExceeded for '%s/%s' failed", newjob.Namespace, newjob.Name)
 					continue
 				}
 				// cannot use cleanup AW, since it puts AW back in running state
@@ -398,10 +404,10 @@ func (qjm *XController) PreemptQueueJobs() {
 			}
 		}
 
-		if ((aw.Status.Running + aw.Status.Succeeded) < int32(aw.Spec.SchedSpec.MinAvailable)) && aw.Status.State == arbv1.AppWrapperStateActive {
-			index := getIndexOfMatchedCondition(aw, arbv1.AppWrapperCondPreemptCandidate, "MinPodsNotRunning")
+		if ((newjob.Status.Running + newjob.Status.Succeeded) < int32(newjob.Spec.SchedSpec.MinAvailable)) && newjob.Status.State == arbv1.AppWrapperStateActive {
+			index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondPreemptCandidate, "MinPodsNotRunning")
 			if index < 0 {
-				message = fmt.Sprintf("Insufficient number of Running and Completed pods, minimum=%d, running=%d, completed=%d.", aw.Spec.SchedSpec.MinAvailable, aw.Status.Running, aw.Status.Succeeded)
+				message = fmt.Sprintf("Insufficient number of Running and Completed pods, minimum=%d, running=%d, completed=%d.", newjob.Spec.SchedSpec.MinAvailable, newjob.Status.Running, newjob.Status.Succeeded)
 				cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondPreemptCandidate, v1.ConditionTrue, "MinPodsNotRunning", message)
 				newjob.Status.Conditions = append(newjob.Status.Conditions, cond)
 			} else {
@@ -409,22 +415,22 @@ func (qjm *XController) PreemptQueueJobs() {
 				newjob.Status.Conditions[index] = *cond.DeepCopy()
 			}
 
-			if aw.Spec.SchedSpec.Requeuing.InitialTimeInSeconds == 0 {
-				aw.Spec.SchedSpec.Requeuing.InitialTimeInSeconds = aw.Spec.SchedSpec.Requeuing.TimeInSeconds
+			if newjob.Spec.SchedSpec.Requeuing.InitialTimeInSeconds == 0 {
+				newjob.Spec.SchedSpec.Requeuing.InitialTimeInSeconds = newjob.Spec.SchedSpec.Requeuing.TimeInSeconds
 			}
-			if aw.Spec.SchedSpec.Requeuing.GrowthType == "exponential" {
+			if newjob.Spec.SchedSpec.Requeuing.GrowthType == "exponential" {
 				if newjob.Status.RequeueingTimeInSeconds == 0 {
-					newjob.Status.RequeueingTimeInSeconds += aw.Spec.SchedSpec.Requeuing.TimeInSeconds
+					newjob.Status.RequeueingTimeInSeconds += newjob.Spec.SchedSpec.Requeuing.TimeInSeconds
 				} else {
 					newjob.Status.RequeueingTimeInSeconds += newjob.Status.RequeueingTimeInSeconds
 				}
-			} else if aw.Spec.SchedSpec.Requeuing.GrowthType == "linear" {
-				newjob.Status.RequeueingTimeInSeconds += aw.Spec.SchedSpec.Requeuing.InitialTimeInSeconds
+			} else if newjob.Spec.SchedSpec.Requeuing.GrowthType == "linear" {
+				newjob.Status.RequeueingTimeInSeconds += newjob.Spec.SchedSpec.Requeuing.InitialTimeInSeconds
 			}
 
-			if aw.Spec.SchedSpec.Requeuing.MaxTimeInSeconds > 0 {
-				if aw.Spec.SchedSpec.Requeuing.MaxTimeInSeconds <= newjob.Status.RequeueingTimeInSeconds {
-					newjob.Status.RequeueingTimeInSeconds = aw.Spec.SchedSpec.Requeuing.MaxTimeInSeconds
+			if newjob.Spec.SchedSpec.Requeuing.MaxTimeInSeconds > 0 {
+				if newjob.Spec.SchedSpec.Requeuing.MaxTimeInSeconds <= newjob.Status.RequeueingTimeInSeconds {
+					newjob.Status.RequeueingTimeInSeconds = newjob.Spec.SchedSpec.Requeuing.MaxTimeInSeconds
 				}
 			}
@@ -438,7 +444,7 @@ func (qjm *XController) PreemptQueueJobs() {
 				updateNewJob = newjob.DeepCopy()
 			} else {
 				// If pods failed scheduling generate new preempt condition
-				message = fmt.Sprintf("Pods failed scheduling failed=%v, running=%v.", len(aw.Status.PendingPodConditions), aw.Status.Running)
+				message = fmt.Sprintf("Pods failed scheduling failed=%v, running=%v.", len(newjob.Status.PendingPodConditions), newjob.Status.Running)
 				index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondPreemptCandidate, "PodsFailedScheduling")
 				// ignore co-scheduler failed scheduling events. This is a temp
 				// work-around until co-scheduler version 0.22.X perf issues are resolved.
@@ -455,17 +461,17 @@ func (qjm *XController) PreemptQueueJobs() {
 			err = qjm.updateStatusInEtcdWithRetry(ctx, updateNewJob, "PreemptQueueJobs - CanRun: false -- MinPodsNotRunning")
 			if err != nil {
-				klog.Warningf("[PreemptQueueJobs] status update for '%s/%s' failed, skipping app wrapper err =%v", aw.Namespace, aw.Name, err)
+				klog.Warningf("[PreemptQueueJobs] status update for '%s/%s' failed, skipping app wrapper err =%v", newjob.Namespace, newjob.Name, err)
 				continue
 			}
 
 			if cleanAppWrapper {
-				klog.V(4).Infof("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded.", aw.Name, aw.Namespace)
+				klog.V(4).Infof("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded.", newjob.Name, newjob.Namespace)
 				go qjm.Cleanup(ctx, updateNewJob)
 			} else {
 				// Only back-off AWs that are in state running and not in state Failed
 				if updateNewJob.Status.State != arbv1.AppWrapperStateFailed {
-					klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", aw.Name, aw.Namespace)
+					klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", newjob.Name, newjob.Namespace)
 					qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message))
 				}
 			}