From f6fef3b5ba6acd5e63d9d127975c33308300f8f7 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Wed, 13 Sep 2023 11:10:18 -0400 Subject: [PATCH 1/5] only update etcd when preempted --- .../queuejob/queuejob_controller_ex.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index a25250a8..4b0cfe95 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -345,6 +345,7 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) { newjob.Status.CanRun = false newjob.Status.FilterIgnore = true // update QueueJobState only cleanAppWrapper := false + generatedCondition := false // If dispatch deadline is exceeded no matter what the state of AW, kill the job and set status as Failed. if (newjob.Status.State == arbv1.AppWrapperStateActive) && (newjob.Spec.SchedSpec.DispatchDuration.Limit > 0) { if newjob.Spec.SchedSpec.DispatchDuration.Overrun { @@ -372,6 +373,7 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) { qjm.qjqueue.AddUnschedulableIfNotPresent(updateNewJob) } + generatedCondition = true } if ((newjob.Status.Running + newjob.Status.Succeeded) < int32(newjob.Spec.SchedSpec.MinAvailable)) && newjob.Status.State == arbv1.AppWrapperStateActive { @@ -412,7 +414,8 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) { } updateNewJob = newjob.DeepCopy() - } else { + generatedCondition = true + } else if newjob.Status.Running == 0 && newjob.Status.Succeeded == 0 && newjob.Status.State == arbv1.AppWrapperStateActive { // If pods failed scheduling generate new preempt condition message = fmt.Sprintf("Pods failed scheduling failed=%v, running=%v.", len(newjob.Status.PendingPodConditions), newjob.Status.Running) index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondPreemptCandidate, "PodsFailedScheduling") @@ -427,13 +430,15 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) { } updateNewJob = newjob.DeepCopy() + generatedCondition = true } - err = qjm.updateStatusInEtcdWithRetry(ctx, updateNewJob, "PreemptQueueJobs - CanRun: false -- MinPodsNotRunning") - if err != nil { - klog.Warningf("[PreemptQueueJobs] status update for '%s/%s' failed, skipping app wrapper err =%v", newjob.Namespace, newjob.Name, err) - return - } + if generatedCondition { + err = qjm.updateStatusInEtcdWithRetry(ctx, updateNewJob, "PreemptQueueJobs - CanRun: false -- MinPodsNotRunning") + if err != nil { + klog.Warningf("[PreemptQueueJobs] status update for '%s/%s' failed, skipping app wrapper err =%v", newjob.Namespace, newjob.Name, err) + return + } if cleanAppWrapper { klog.V(4).Infof("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded.", newjob.Namespace, newjob.Name) From 7a8e3dc4a2ad6adfa60db0b42933d6a84553a404 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Thu, 14 Sep 2023 14:50:54 -0400 Subject: [PATCH 2/5] address review 1 --- pkg/controller/queuejob/queuejob_controller_ex.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 4b0cfe95..ba5a7b11 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -371,9 +371,9 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) { } // cannot use cleanup AW, since it puts AW back in running state qjm.qjqueue.AddUnschedulableIfNotPresent(updateNewJob) + generatedCondition = true } - generatedCondition = true } if ((newjob.Status.Running + newjob.Status.Succeeded) < int32(newjob.Spec.SchedSpec.MinAvailable)) && newjob.Status.State == arbv1.AppWrapperStateActive { From d2d7a3b1ccdf108a26ff86a2c0047dae124d9902 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Thu, 28 Sep 2023 20:35:41 -0400 Subject: [PATCH 3/5] cleanup routines and logger --- .../queuejob/queuejob_controller_ex.go | 43 ++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index ba5a7b11..ae70b837 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -254,7 +254,7 @@ func NewJobController(restConfig *rest.Config, mcadConfig *config.MCADConfigurat FilterFunc: func(obj interface{}) bool { switch t := obj.(type) { case *arbv1.AppWrapper: - klog.V(10).Infof("[Informer] Filter Name=%s Namespace=%s Version=%s Local=%t FilterIgnore=%t Sender=%s &qj=%p qj=%+v", t.Name, t.Namespace, t.ResourceVersion, t.Status.Local, t.Status.FilterIgnore, t.Status.Sender, t, t) + klog.V(10).Infof("[Informer] Filter Name=%s Namespace=%s Version=%s Local=%t FilterIgnore=%t Sender=%s ", t.Name, t.Namespace, t.ResourceVersion, t.Status.Local, t.Status.FilterIgnore, t.Status.Sender) // todo: This is a current workaround for duplicate message bug. // if t.Status.Local == true { // ignore duplicate message from cache // return false @@ -440,14 +440,15 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) { return } - if cleanAppWrapper { - klog.V(4).Infof("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded.", newjob.Namespace, newjob.Name) - go qjm.Cleanup(ctx, updateNewJob) - } else { - // Only back-off AWs that are in state running and not in state Failed - if updateNewJob.Status.State != arbv1.AppWrapperStateFailed { - klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", newjob.Namespace, newjob.Name) - qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message)) + if cleanAppWrapper { + klog.V(4).Infof("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded.", newjob.Namespace, newjob.Name) + go qjm.Cleanup(ctx, updateNewJob) + } else { + // Only back-off AWs that are in state running and not in state Failed + if updateNewJob.Status.State != arbv1.AppWrapperStateFailed { + klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", newjob.Namespace, newjob.Name) + qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message)) + } } } } @@ -1367,7 +1368,7 @@ func (cc *XController) updateStatusInEtcd(ctx context.Context, currentAppwrapper func (cc *XController) updateStatusInEtcdWithRetry(ctx context.Context, source *arbv1.AppWrapper, caller string) error { klog.V(4).Infof("[updateStatusInEtcdWithMergeFunction] trying to update '%s/%s' version '%s' called by '%s'", source.Namespace, source.Name, source.ResourceVersion, caller) source.Status.Sender = "before " + caller // set Sender string to indicate code location - updateStatusRetrierRetrier := retrier.New(retrier.ExponentialBackoff(10, 100*time.Millisecond), &EtcdErrorClassifier{}) + updateStatusRetrierRetrier := retrier.New(retrier.ExponentialBackoff(1, 100*time.Millisecond), &EtcdErrorClassifier{}) updateStatusRetrierRetrier.SetJitter(0.05) updatedAW := source.DeepCopy() err := updateStatusRetrierRetrier.RunCtx(ctx, func(localContext context.Context) error { @@ -1564,10 +1565,10 @@ func (cc *XController) addQueueJob(obj interface{}) { firstTime := metav1.NowMicro() qj, ok := obj.(*arbv1.AppWrapper) if !ok { - klog.Errorf("[Informer-addQJ] object is not AppWrapper. object=%+v", obj) + klog.Errorf("[Informer-addQJ] object is not AppWrapper.") return } - klog.V(6).Infof("[Informer-addQJ] %s/%s &qj=%p qj=%+v", qj.Namespace, qj.Name, qj, qj) + klog.V(6).Infof("[Informer-addQJ] %s/%s", qj.Namespace, qj.Name) if qj.Status.QueueJobState == "" { qj.Status.ControllerFirstTimestamp = firstTime qj.Status.SystemPriority = float64(qj.Spec.Priority) @@ -1602,7 +1603,7 @@ func (cc *XController) addQueueJob(obj interface{}) { // updatequeuejobs now runs as a part of informer machinery. optimization here is to not use etcd to pullout submitted AWs and operate // on stale AWs. This has potential to improve performance at scale. if hasCompletionStatus { - requeueInterval := 5 * time.Second + requeueIntervalForCompletionStatus := 5 * time.Second key, err := cache.MetaNamespaceKeyFunc(qj) if err != nil { klog.Warningf("[Informer-addQJ] Error getting AW %s/%s from cache cannot determine completion status", qj.Namespace, qj.Name) @@ -1610,10 +1611,11 @@ func (cc *XController) addQueueJob(obj interface{}) { } go func() { for { - time.Sleep(requeueInterval) + time.Sleep(requeueIntervalForCompletionStatus) latestObj, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) - if err != nil && !exists { - klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache", qj.Namespace, qj.Name) + if err != nil || !exists { + klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache,stopping check for completion status", qj.Namespace, qj.Name) + break } else { var latestAw *arbv1.AppWrapper if latestObj != nil { @@ -1648,8 +1650,9 @@ func (cc *XController) addQueueJob(obj interface{}) { for { time.Sleep(requeueInterval) latestObj, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) - if err != nil && !exists { - klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache", qj.Namespace, qj.Name) + if err != nil || !exists { + klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache, stopping check for minScheduling", qj.Namespace, qj.Name) + break } else { var latestAw *arbv1.AppWrapper if latestObj != nil { @@ -1757,9 +1760,9 @@ func (cc *XController) enqueue(obj interface{}) error { err := cc.eventQueue.Add(qj) // add to FIFO queue if not in, update object & keep position if already in FIFO queue if err != nil { - klog.Errorf("[enqueue] Fail to enqueue %s/%s to eventQueue, ignore. *Delay=%.6f seconds &qj=%p Version=%s Status=%+v err=%#v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj, qj.ResourceVersion, qj.Status, err) + klog.Errorf("[enqueue] Fail to enqueue %s/%s to eventQueue, ignore. *Delay=%.6f seconds Version=%s Status=%+v err=%#v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj.ResourceVersion, qj.Status, err) } else { - klog.V(10).Infof("[enqueue] %s/%s *Delay=%.6f seconds eventQueue.Add_byEnqueue &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj, qj.ResourceVersion, qj.Status) + klog.V(10).Infof("[enqueue] %s/%s *Delay=%.6f seconds eventQueue.Add_byEnqueue Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj.ResourceVersion, qj.Status) } return err } From 50a95b1e57853c312dbce5cfb83c2fa36df8547e Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Mon, 9 Oct 2023 16:21:57 -0400 Subject: [PATCH 4/5] resolve merge --- test/perf-test/job-1000-22:21:17.log | 4 ++++ test/perf-test/job-22:21:04.log | 1 + 2 files changed, 5 insertions(+) create mode 100644 test/perf-test/job-1000-22:21:17.log create mode 100644 test/perf-test/job-22:21:04.log diff --git a/test/perf-test/job-1000-22:21:17.log b/test/perf-test/job-1000-22:21:17.log new file mode 100644 index 00000000..e0be906c --- /dev/null +++ b/test/perf-test/job-1000-22:21:17.log @@ -0,0 +1,4 @@ +Jobs started at: 22:21:17 +All 1000 jobs finished: 23:25:49 +Total amount of time for 1000 appwrappers is: 3272 seconds (debug mode) +2900-3000 seconds diff --git a/test/perf-test/job-22:21:04.log b/test/perf-test/job-22:21:04.log new file mode 100644 index 00000000..5f7464c1 --- /dev/null +++ b/test/perf-test/job-22:21:04.log @@ -0,0 +1 @@ +Jobs started at: 22:21:04 From b704c047d5a26b73001796fc6dc8f4ecf436eb76 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Mon, 9 Oct 2023 16:55:58 -0400 Subject: [PATCH 5/5] rem log files --- test/perf-test/job-1000-22:21:17.log | 4 ---- test/perf-test/job-22:21:04.log | 1 - 2 files changed, 5 deletions(-) delete mode 100644 test/perf-test/job-1000-22:21:17.log delete mode 100644 test/perf-test/job-22:21:04.log diff --git a/test/perf-test/job-1000-22:21:17.log b/test/perf-test/job-1000-22:21:17.log deleted file mode 100644 index e0be906c..00000000 --- a/test/perf-test/job-1000-22:21:17.log +++ /dev/null @@ -1,4 +0,0 @@ -Jobs started at: 22:21:17 -All 1000 jobs finished: 23:25:49 -Total amount of time for 1000 appwrappers is: 3272 seconds (debug mode) -2900-3000 seconds diff --git a/test/perf-test/job-22:21:04.log b/test/perf-test/job-22:21:04.log deleted file mode 100644 index 5f7464c1..00000000 --- a/test/perf-test/job-22:21:04.log +++ /dev/null @@ -1 +0,0 @@ -Jobs started at: 22:21:04