Skip to content

support monitor podgroup #1688

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/training-operator.v1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func main() {
"Enabling this will ensure there is only one active controller manager.")
flag.Var(&enabledSchemes, "enable-scheme", "Enable scheme(s) as --enable-scheme=tfjob --enable-scheme=pytorchjob, case insensitive."+
" Now supporting TFJob, PyTorchJob, MXNetJob, XGBoostJob. By default, all supported schemes will be enabled.")
flag.Var(&config.Config.WatchedResources, "enable-watch-resources", "The list of resources that need be watched to trigger reconcile, in the form: Kind.version.group (e.g. TFJob.v1.kubeflow.org)")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the example, you can provide pod group resource for easy reference

flag.BoolVar(&enableGangScheduling, "enable-gang-scheduling", false, "Set true to enable gang scheduling")
flag.StringVar(&gangSchedulerName, "gang-scheduler-name", "volcano", "The scheduler to gang-schedule kubeflow jobs, defaults to volcano")
flag.StringVar(&namespace, "namespace", os.Getenv(commonutil.EnvKubeflowNamespace), "The namespace to monitor kubeflow jobs. If unset, it monitors all namespaces cluster-wide."+
Expand Down
30 changes: 30 additions & 0 deletions pkg/common/util/flag_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package util
Copy link
Member

@johnugeorge johnugeorge Nov 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this file used?


import (
"fmt"
"strings"

"k8s.io/apimachinery/pkg/runtime/schema"
)

// GvkListFlag is the custom flag to parse GroupVersionKind list for trial resources.
type GvkListFlag []schema.GroupVersionKind

// Set is the method to convert gvk to string value
func (flag *GvkListFlag) String() string {
gvkStrings := []string{}
for _, x := range []schema.GroupVersionKind(*flag) {
gvkStrings = append(gvkStrings, x.String())
}
return strings.Join(gvkStrings, ",")
}

// Set is the method to set gvk from string flag value
func (flag *GvkListFlag) Set(value string) error {
gvk, _ := schema.ParseKindArg(value)
if gvk == nil {
return fmt.Errorf("Invalid GroupVersionKind: %v", value)
}
*flag = append(*flag, *gvk)
return nil
}
4 changes: 4 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@

package config

import "github.com/kubeflow/training-operator/pkg/common/util"

// Config is the global configuration for the training operator.
var Config struct {
PyTorchInitContainerTemplateFile string
PyTorchInitContainerImage string
MPIKubectlDeliveryImage string

WatchedResources util.GvkListFlag
}

const (
Expand Down
48 changes: 48 additions & 0 deletions pkg/controller.v1/pytorch/pytorchjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
"fmt"
"time"

"github.com/kubeflow/training-operator/pkg/config"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/go-logr/logr"
commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/common"
Expand Down Expand Up @@ -215,6 +218,51 @@ func (r *PyTorchJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
return err
}

// inject watching for job related service
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this duplicate?

if err = c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &kubeflowv1.PyTorchJob{},
}, predicate.Funcs{
CreateFunc: util.OnDependentCreateFunc(r.Expectations),
UpdateFunc: util.OnDependentUpdateFunc(&r.JobController),
DeleteFunc: util.OnDependentDeleteFunc(r.Expectations),
}); err != nil {
return err
}

// inject watching for job related objects,such as PodGroup
if config.Config.WatchedResources != nil {
Copy link
Member

@johnugeorge johnugeorge Nov 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you rebase your code and add it for all frameworks? Also, the volcano static watches in the current code should be removed for this dynamic watcher. #1666

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I wrote this feature, I did not realize that it was already supported and merged. @_@

gvkList := config.Config.WatchedResources

// Watch for changes in custom resources
for _, gvk := range gvkList {
// Check if CRD is installed on the cluster.
_, err := mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
if meta.IsNoMatchError(err) {
logrus.Info("Job watch error. CRD might be missing. Please install CRD and restart pytorchjob-controller",
"CRD Group ", gvk.Group, ",CRD Version ", gvk.Version, ",CRD Kind ", gvk.Kind)
continue
}
return err
}
// Watch for the CRD changes.
unstructuredJob := &unstructured.Unstructured{}
unstructuredJob.SetGroupVersionKind(gvk)
err = c.Watch(
&source.Kind{Type: unstructuredJob},
&handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &kubeflowv1.PyTorchJob{},
})
if err != nil {
return err
}
logrus.Info("Job watch added successfully",
"CRD Group ", gvk.Group, ",CRD Version ", gvk.Version, ",CRD Kind ", gvk.Kind)
}
}

return nil
}

Expand Down