Skip to content

Commit 00f4d52

Browse files
authored
fix volcano podgroup update issue (#2079)
* fix volcano podgroup update issue Signed-off-by: Weiyu Yen <[email protected]> * queue value shouldn't be reset once it has been set Signed-off-by: Weiyu Yen <[email protected]> * make queue immutable Signed-off-by: Weiyu Yen <[email protected]> * add unit test Signed-off-by: Weiyu Yen <[email protected]> * add retry for update operation Signed-off-by: Weiyu Yen <[email protected]> --------- Signed-off-by: Weiyu Yen <[email protected]>
1 parent 7b9c73e commit 00f4d52

File tree

9 files changed

+53
-1
lines changed

9 files changed

+53
-1
lines changed

manifests/base/crds/kubeflow.org_mpijobs.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7330,6 +7330,9 @@ spec:
73307330
type: string
73317331
queue:
73327332
type: string
7333+
x-kubernetes-validations:
7334+
- message: spec.runPolicy.schedulingPolicy.queue is immutable
7335+
rule: self == oldSelf
73337336
scheduleTimeoutSeconds:
73347337
format: int32
73357338
type: integer

manifests/base/crds/kubeflow.org_mxjobs.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7333,6 +7333,9 @@ spec:
73337333
type: string
73347334
queue:
73357335
type: string
7336+
x-kubernetes-validations:
7337+
- message: spec.runPolicy.schedulingPolicy.queue is immutable
7338+
rule: self == oldSelf
73367339
scheduleTimeoutSeconds:
73377340
format: int32
73387341
type: integer

manifests/base/crds/kubeflow.org_paddlejobs.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7812,6 +7812,9 @@ spec:
78127812
type: string
78137813
queue:
78147814
type: string
7815+
x-kubernetes-validations:
7816+
- message: spec.runPolicy.schedulingPolicy.queue is immutable
7817+
rule: self == oldSelf
78157818
scheduleTimeoutSeconds:
78167819
format: int32
78177820
type: integer

manifests/base/crds/kubeflow.org_pytorchjobs.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7849,6 +7849,9 @@ spec:
78497849
type: string
78507850
queue:
78517851
type: string
7852+
x-kubernetes-validations:
7853+
- message: spec.runPolicy.schedulingPolicy.queue is immutable
7854+
rule: self == oldSelf
78527855
scheduleTimeoutSeconds:
78537856
format: int32
78547857
type: integer

manifests/base/crds/kubeflow.org_tfjobs.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ spec:
9090
type: string
9191
queue:
9292
type: string
93+
x-kubernetes-validations:
94+
- message: spec.runPolicy.schedulingPolicy.queue is immutable
95+
rule: self == oldSelf
9396
scheduleTimeoutSeconds:
9497
format: int32
9598
type: integer

manifests/base/crds/kubeflow.org_xgboostjobs.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ spec:
8686
type: string
8787
queue:
8888
type: string
89+
x-kubernetes-validations:
90+
- message: spec.runPolicy.schedulingPolicy.queue is immutable
91+
rule: self == oldSelf
8992
scheduleTimeoutSeconds:
9093
format: int32
9194
type: integer

pkg/apis/kubeflow.org/v1/common_types.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,8 @@ type RunPolicy struct {
226226
// SchedulingPolicy encapsulates various scheduling policies of the distributed training
227227
// job, for example `minAvailable` for gang-scheduling.
228228
type SchedulingPolicy struct {
229-
MinAvailable *int32 `json:"minAvailable,omitempty"`
229+
MinAvailable *int32 `json:"minAvailable,omitempty"`
230+
// +kubebuilder:validation:XValidation:rule="self == oldSelf", message="spec.runPolicy.schedulingPolicy.queue is immutable"
230231
Queue string `json:"queue,omitempty"`
231232
MinResources *map[v1.ResourceName]resource.Quantity `json:"minResources,omitempty"`
232233
PriorityClass string `json:"priorityClass,omitempty"`

pkg/controller.v1/common/job.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,11 @@ func (jc *JobController) ReconcileJobs(
284284
if !match {
285285
return fmt.Errorf("unable to recognize PodGroup: %v", klog.KObj(pg))
286286
}
287+
288+
if q := volcanoPodGroup.Spec.Queue; len(q) > 0 {
289+
queue = q
290+
}
291+
287292
volcanoPodGroup.Spec = volcanov1beta1.PodGroupSpec{
288293
MinMember: minMember,
289294
Queue: queue,

pkg/controller.v1/pytorch/pytorchjob_controller_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,34 @@ var _ = Describe("PyTorchJob controller", func() {
195195
cond := getCondition(created.Status, kubeflowv1.JobSucceeded)
196196
Expect(cond.Status).To(Equal(corev1.ConditionTrue))
197197
})
198+
It("Shouldn't be updated resources if spec.runPolicy.schedulingPolicy.queue is changed after the job is created", func() {
199+
By("Creating a PyTorchJob with a specific queue")
200+
job.Spec.RunPolicy.SchedulingPolicy = &kubeflowv1.SchedulingPolicy{}
201+
job.Spec.RunPolicy.SchedulingPolicy.Queue = "initial-queue"
202+
Expect(testK8sClient.Create(ctx, job)).Should(Succeed())
203+
204+
By("Attempting to update the PyTorchJob with a different queue value")
205+
updatedJob := &kubeflowv1.PyTorchJob{}
206+
Eventually(func() bool {
207+
err := testK8sClient.Get(ctx, jobKey, updatedJob)
208+
return err == nil
209+
}, testutil.Timeout, testutil.Interval).Should(BeTrue(), "Failed to get PyTorchJob")
210+
211+
Eventually(func() bool {
212+
updatedJob.Spec.RunPolicy.SchedulingPolicy.Queue = "test"
213+
err := testK8sClient.Update(ctx, updatedJob)
214+
By("Checking that the queue update fails")
215+
Expect(err).To(HaveOccurred(), "Expected an error when updating the queue, but update succeeded")
216+
Expect(err).To(MatchError(ContainSubstring("spec.runPolicy.schedulingPolicy.queue is immutable"), "The error message did not contain the expected message"))
217+
return err != nil
218+
}, testutil.Timeout, testutil.Interval).Should(BeTrue())
219+
220+
By("Validating the queue was not updated")
221+
freshJob := &kubeflowv1.PyTorchJob{}
222+
Expect(testK8sClient.Get(ctx, client.ObjectKeyFromObject(job), freshJob)).Should(Succeed(), "Failed to get PyTorchJob after update attempt")
223+
Expect(freshJob.Spec.RunPolicy.SchedulingPolicy.Queue).To(Equal("initial-queue"), "The queue should remain as the initial value since it should be immutable")
224+
225+
})
198226

199227
It("Shouldn't create resources if PyTorchJob is suspended", func() {
200228
By("By creating a new PyTorchJob with suspend=true")

0 commit comments

Comments
 (0)