Skip to content

Commit ff6c2ad

Browse files
mbobrovskyi and mimowo authored
Automated cherry pick of #4824: Fix getAncestorWorkload if integration disabled. (#4924)
* Fix getAncestorWorkload if integration disabled.

  Co-authored-by: Michał Woźniak <[email protected]>

* With manageJobsWithoutQueueName logic.
* Rename GetAncestorJobManagedByKueue to FindAncestorJobManagedByKueue
* Add test cases when child jobs have queue-name.
* Rename getAncestorWorkload to getWorkloadForObject.
* Minor fix.

---------

Co-authored-by: Michał Woźniak <[email protected]>
1 parent 3fab3ec commit ff6c2ad

File tree

8 files changed

+554
-108
lines changed

8 files changed

+554
-108
lines changed

pkg/controller/jobframework/interface.go

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -164,6 +164,14 @@ type JobWithManagedBy interface {
164164
SetManagedBy(*string)
165165
}
166166

167+
// TopLevelJob interface is an optional interface used to indicate
168+
// that the Job owns/manages the Workload object, regardless of the Job
169+
// owner references.
170+
type TopLevelJob interface {
171+
// IsTopLevel returns true if the Job owns/manages the Workload.
172+
IsTopLevel() bool
173+
}
174+
167175
func QueueName(job GenericJob) string {
168176
return QueueNameForObject(job.Object())
169177
}

pkg/controller/jobframework/reconciler.go

Lines changed: 82 additions & 53 deletions
Original file line number | Diff line number | Diff line change
@@ -67,7 +67,7 @@ const (
6767
)
6868

6969
var (
70-
ErrUnknownWorkloadOwner = errors.New("workload owner is unknown")
70+
ErrCyclicOwnership = errors.New("cyclic ownership")
7171
ErrWorkloadOwnerNotFound = errors.New("workload owner not found")
7272
ErrNoMatchingWorkloads = errors.New("no matching workloads")
7373
ErrExtraWorkloads = errors.New("extra workloads")
@@ -292,27 +292,31 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
292292
return ctrl.Result{}, client.IgnoreNotFound(err)
293293
}
294294

295-
isTopLevelJob := true
296-
objectOwner := metav1.GetControllerOf(object)
297-
if objectOwner != nil && IsOwnerManagedByKueue(objectOwner) {
298-
isTopLevelJob = false
295+
var (
296+
ancestorJob client.Object
297+
isTopLevelJob bool
298+
)
299+
300+
if topLevelJob, ok := job.(TopLevelJob); ok && topLevelJob.IsTopLevel() {
301+
// Skipping traversal to top-level ancestor job because this is already a top-level job.
302+
isTopLevelJob = true
303+
} else {
304+
ancestorJob, err = FindAncestorJobManagedByKueue(ctx, r.client, r.record, object, r.manageJobsWithoutQueueName)
305+
if err != nil {
306+
return ctrl.Result{}, err
307+
}
308+
isTopLevelJob = ancestorJob == nil
299309
}
300310

301311
// when manageJobsWithoutQueueName is disabled we only reconcile jobs that either
302312
// have a queue-name label or have a kueue-managed ancestor that has a queue-name label.
303313
if !r.manageJobsWithoutQueueName && QueueName(job) == "" {
304314
if isTopLevelJob {
305-
log.V(3).Info("queue-name label is not set, ignoring the job", "queueName", QueueName(job))
315+
log.V(3).Info("queue-name label is not set, ignoring the job")
306316
return ctrl.Result{}, nil
307317
}
308-
isAncestorJobManaged, err := r.IsAncestorJobManaged(ctx, job.Object())
309-
if err != nil {
310-
log.Error(err, "couldn't check whether an ancestor job is managed by kueue")
311-
return ctrl.Result{}, err
312-
}
313-
if !isAncestorJobManaged {
314-
log.V(3).Info("No kueue-managed ancestors have a queue-name label, ignoring the job",
315-
"parentJob", objectOwner.Name)
318+
if QueueNameForObject(ancestorJob) == "" {
319+
log.V(3).Info("No kueue-managed ancestors have a queue-name label, ignoring the job")
316320
return ctrl.Result{}, nil
317321
}
318322
}
@@ -321,7 +325,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
321325
if !isTopLevelJob {
322326
_, _, finished := job.Finished()
323327
if !finished && !job.IsSuspended() {
324-
if ancestorWorkload, err := r.getAncestorWorkload(ctx, object); err != nil {
328+
if ancestorWorkload, err := r.getWorkloadForObject(ctx, ancestorJob); err != nil {
325329
log.Error(err, "couldn't get an ancestor job workload")
326330
return ctrl.Result{}, err
327331
} else if ancestorWorkload == nil || !workload.IsAdmitted(ancestorWorkload) {
@@ -572,66 +576,91 @@ func (r *JobReconciler) recordAdmissionCheckUpdate(wl *kueue.Workload, job Gener
572576
}
573577
}
574578

575-
// IsAncestorJobManaged checks whether an ancestor job is managed by kueue.
576-
func (r *JobReconciler) IsAncestorJobManaged(ctx context.Context, jobObj client.Object) (bool, error) {
577-
ancestor, err := r.getAncestorJobManagedByKueue(ctx, jobObj)
578-
if err != nil {
579-
return false, err
580-
}
581-
return ancestor != nil, nil
582-
}
583-
584-
// getAncestorWorkload returns the Workload object of the Kueue-managed ancestor job.
585-
func (r *JobReconciler) getAncestorWorkload(ctx context.Context, jobObj client.Object) (*kueue.Workload, error) {
586-
ancestor, err := r.getAncestorJobManagedByKueue(ctx, jobObj)
587-
if err != nil || ancestor == nil {
588-
return nil, err
589-
}
579+
// getWorkloadForObject returns the Workload associated with the given job.
580+
func (r *JobReconciler) getWorkloadForObject(ctx context.Context, jobObj client.Object) (*kueue.Workload, error) {
590581
wlList := kueue.WorkloadList{}
591-
if err := r.client.List(ctx, &wlList, client.InNamespace(ancestor.GetNamespace()), client.MatchingFields{indexer.OwnerReferenceUID: string(ancestor.GetUID())}); client.IgnoreNotFound(err) != nil {
582+
if err := r.client.List(ctx, &wlList, client.InNamespace(jobObj.GetNamespace()), client.MatchingFields{indexer.OwnerReferenceUID: string(jobObj.GetUID())}); client.IgnoreNotFound(err) != nil {
592583
return nil, err
593584
}
594585
if len(wlList.Items) > 0 {
595586
// In theory the job can own multiple Workloads, we cannot do too much about it, maybe log it.
587+
ctrl.LoggerFrom(ctx).V(2).Info(
588+
"WARNING: The job has multiple associated Workloads.",
589+
"job", jobObj.GetName(),
590+
"workloads", klog.KObjSlice(wlList.Items),
591+
)
596592
return &wlList.Items[0], nil
597593
}
598594
return nil, nil
599595
}
600596

601-
// getAncestorJobManagedByKueue traverses controllerRefs to find an ancestor job that is manged by Kueue (ie, it has a queue-name label).
602-
func (r *JobReconciler) getAncestorJobManagedByKueue(ctx context.Context, jobObj client.Object) (client.Object, error) {
597+
// FindAncestorJobManagedByKueue traverses controllerRefs to find the top-level ancestor Job managed by Kueue.
598+
// If manageJobsWithoutQueueName is set to false, it returns only Jobs with a queue-name.
599+
// If manageJobsWithoutQueueName is true, it may return a Job even if it doesn't have a queue-name.
600+
//
601+
// Examples:
602+
//
603+
// With manageJobsWithoutQueueName=false:
604+
// Job -> JobSet -> AppWrapper => nil
605+
// Job (queue-name) -> JobSet (queue-name) -> AppWrapper => JobSet
606+
// Job (queue-name) -> JobSet -> AppWrapper (queue-name) => AppWrapper
607+
// Job (queue-name) -> JobSet (queue-name) -> AppWrapper (queue-name) => AppWrapper
608+
// Job -> JobSet (disabled) -> AppWrapper (queue-name) => AppWrapper
609+
//
610+
// With manageJobsWithoutQueueName=true:
611+
// Job -> JobSet -> AppWrapper => AppWrapper
612+
// Job (queue-name) -> JobSet (queue-name) -> AppWrapper => AppWrapper
613+
// Job (queue-name) -> JobSet -> AppWrapper (queue-name) => AppWrapper
614+
// Job (queue-name) -> JobSet (queue-name) -> AppWrapper (queue-name) => AppWrapper
615+
// Job -> JobSet (disabled) -> AppWrapper => AppWrapper
616+
func FindAncestorJobManagedByKueue(ctx context.Context, c client.Client, record record.EventRecorder, jobObj client.Object, manageJobsWithoutQueueName bool) (client.Object, error) {
617+
log := ctrl.LoggerFrom(ctx)
603618
seen := sets.New[types.UID]()
604-
currentJob := jobObj
619+
currentObj := jobObj
620+
621+
var topLevelJob client.Object
605622
for {
606-
if seen.Has(currentJob.GetUID()) {
607-
return nil, nil
623+
if seen.Has(currentObj.GetUID()) {
624+
log.Error(ErrCyclicOwnership,
625+
"Terminated search for Kueue-managed Job because of cyclic ownership",
626+
"owner", currentObj,
627+
)
628+
return nil, ErrCyclicOwnership
608629
}
609-
seen.Insert(currentJob.GetUID())
630+
seen.Insert(currentObj.GetUID())
610631

611-
owner := metav1.GetControllerOf(currentJob)
612-
if owner == nil || !IsOwnerManagedByKueue(owner) {
613-
return nil, nil
632+
owner := metav1.GetControllerOf(currentObj)
633+
if owner == nil {
634+
return topLevelJob, nil
614635
}
615-
parentJob := GetEmptyOwnerObject(owner)
616-
if parentJob == nil {
617-
return nil, fmt.Errorf("workload owner %v: %w", owner, ErrUnknownWorkloadOwner)
636+
637+
parentObj := GetEmptyOwnerObject(owner)
638+
managed := parentObj != nil
639+
if parentObj == nil {
640+
parentObj = &metav1.PartialObjectMetadata{
641+
TypeMeta: metav1.TypeMeta{
642+
APIVersion: owner.APIVersion,
643+
Kind: owner.Kind,
644+
},
645+
}
618646
}
619-
if err := r.client.Get(ctx, client.ObjectKey{Name: owner.Name, Namespace: jobObj.GetNamespace()}, parentJob); err != nil {
647+
if err := c.Get(ctx, client.ObjectKey{Name: owner.Name, Namespace: jobObj.GetNamespace()}, parentObj); err != nil {
620648
return nil, errors.Join(ErrWorkloadOwnerNotFound, err)
621649
}
622-
if QueueNameForObject(parentJob) != "" {
623-
return parentJob, nil
650+
if managed && (manageJobsWithoutQueueName || QueueNameForObject(parentObj) != "") {
651+
topLevelJob = parentObj
624652
}
625-
currentJob = parentJob
653+
currentObj = parentObj
626654
if len(seen) > managedOwnersChainLimit {
627-
r.record.Eventf(jobObj, corev1.EventTypeWarning, ReasonJobNestingTooDeep,
628-
"Terminated search for Kueue-managed Job because ancestor depth exceeded limit of %d", managedOwnersChainLimit)
629-
ctrl.LoggerFrom(ctx).V(2).Info(
630-
"Terminated search for Kueue-managed Job because ancestor depth exceeded managedOwnersChainlimit",
655+
record.Eventf(jobObj, corev1.EventTypeWarning, ReasonJobNestingTooDeep,
656+
"Terminated search for Kueue-managed Job because ancestor depth exceeded limit of %d", managedOwnersChainLimit,
657+
)
658+
log.V(2).Info(
659+
"WARNING: Terminated search for Kueue-managed Job because ancestor depth exceeded managedOwnersChainLimit",
631660
"limit ", managedOwnersChainLimit,
632-
"lastParentReached", parentJob,
661+
"lastParentReached", parentObj,
633662
)
634-
return nil, nil
663+
return topLevelJob, nil
635664
}
636665
}
637666
}

0 commit comments

Comments
 (0)