remove pod informer #601

Merged · 2 commits · Aug 23, 2023
87 changes: 63 additions & 24 deletions pkg/controller/queuejob/queuejob_controller_ex.go
@@ -54,9 +54,7 @@ import (

v1 "k8s.io/api/core/v1"

"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobresources"
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobresources/genericresource"
respod "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobresources/pod"
"k8s.io/apimachinery/pkg/labels"

arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
@@ -79,9 +77,9 @@ type XController struct {

appwrapperInformer arbinformers.AppWrapperInformer
// resources registered for the AppWrapper
qjobRegisteredResources queuejobresources.RegisteredResources
//qjobRegisteredResources queuejobresources.RegisteredResources
// controllers for these resources
qjobResControls map[arbv1.ResourceType]queuejobresources.Interface
//qjobResControls map[arbv1.ResourceType]queuejobresources.Interface

// Captures all available resources in the cluster
genericresources *genericresource.GenericResources
@@ -140,9 +138,9 @@ type JobAndClusterAgent struct {
}

// RegisterAllQueueJobResourceTypes - registers all resources
func RegisterAllQueueJobResourceTypes(regs *queuejobresources.RegisteredResources) {
respod.Register(regs)
}
// func RegisterAllQueueJobResourceTypes(regs *queuejobresources.RegisteredResources) {
// respod.Register(regs)
// }

func GetQueueJobKey(obj interface{}) (string, error) {
qj, ok := obj.(*arbv1.AppWrapper)
@@ -153,6 +151,47 @@ func GetQueueJobKey(obj interface{}) (string, error) {
return fmt.Sprintf("%s/%s", qj.Namespace, qj.Name), nil
}

// UpdateQueueJobStatus was part of the pod informer and is now a method of the queuejob controller.
// This change is part of an effort to simplify the controller and enable the move to controller-runtime.
func (qjm *XController) UpdateQueueJobStatus(queuejob *arbv1.AppWrapper) error {

labelSelector := fmt.Sprintf("%s=%s", "appwrapper.mcad.ibm.com", queuejob.Name)
pods, errt := qjm.clients.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector})
if errt != nil {
return errt
}

running := int32(FilterPods(pods.Items, v1.PodRunning))
podPhases := []v1.PodPhase{v1.PodRunning, v1.PodSucceeded}
totalResourcesConsumedForPodPhases := clusterstateapi.EmptyResource()
for _, phase := range podPhases {
totalResourcesConsumedForPodPhases.Add(GetPodResourcesByPhase(phase, pods.Items))
}
pending := int32(FilterPods(pods.Items, v1.PodPending))
succeeded := int32(FilterPods(pods.Items, v1.PodSucceeded))
failed := int32(FilterPods(pods.Items, v1.PodFailed))
podsConditionMap := PendingPodsFailedSchd(pods.Items)
klog.V(10).Infof("[UpdateQueueJobStatus] There are %d pods of AppWrapper %s: pending %d, running %d, succeeded %d, failed %d, pendingpodsfailedschd %d, total resource consumed %v",
len(pods.Items), queuejob.Name, pending, running, succeeded, failed, len(podsConditionMap), totalResourcesConsumedForPodPhases)

queuejob.Status.Pending = pending
queuejob.Status.Running = running
queuejob.Status.Succeeded = succeeded
queuejob.Status.Failed = failed
// Total resources by all running pods
queuejob.Status.TotalGPU = int32(totalResourcesConsumedForPodPhases.GPU)
queuejob.Status.TotalCPU = int32(totalResourcesConsumedForPodPhases.MilliCPU)
queuejob.Status.TotalMemory = int32(totalResourcesConsumedForPodPhases.Memory)

queuejob.Status.PendingPodConditions = nil
for podName, cond := range podsConditionMap {
podCond := GeneratePodFailedCondition(podName, cond)
queuejob.Status.PendingPodConditions = append(queuejob.Status.PendingPodConditions, podCond)
}

return nil
}

// allocatableCapacity calculates the capacity available on each node by subtracting resources
// consumed by existing pods.
// For a large cluster with thousands of nodes and hundreds of thousands of pods this
@@ -217,20 +256,20 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *

cc.genericresources = genericresource.NewAppWrapperGenericResource(config)

cc.qjobResControls = map[arbv1.ResourceType]queuejobresources.Interface{}
RegisterAllQueueJobResourceTypes(&cc.qjobRegisteredResources)
//cc.qjobResControls = map[arbv1.ResourceType]queuejobresources.Interface{}
//RegisterAllQueueJobResourceTypes(&cc.qjobRegisteredResources)

// initialize pod sub-resource control
resControlPod, found, err := cc.qjobRegisteredResources.InitQueueJobResource(arbv1.ResourceTypePod, config)
if err != nil {
klog.Errorf("fail to create queuejob resource control")
return nil
}
if !found {
klog.Errorf("queuejob resource type Pod not found")
return nil
}
cc.qjobResControls[arbv1.ResourceTypePod] = resControlPod
// resControlPod, found, err := cc.qjobRegisteredResources.InitQueueJobResource(arbv1.ResourceTypePod, config)
// if err != nil {
// klog.Errorf("fail to create queuejob resource control")
// return nil
// }
// if !found {
// klog.Errorf("queuejob resource type Pod not found")
// return nil
// }
// cc.qjobResControls[arbv1.ResourceTypePod] = resControlPod

appWrapperClient, err := clientset.NewForConfig(cc.config)
if err != nil {
@@ -816,7 +855,7 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust

}

err := qjm.qjobResControls[arbv1.ResourceTypePod].UpdateQueueJobStatus(value)
err := qjm.UpdateQueueJobStatus(value)
if err != nil {
klog.Warningf("[getAggAvaiResPri] Error updating pod status counts for AppWrapper job: %s, err=%+v", value.Name, err)
}
@@ -843,7 +882,7 @@
klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in genericItem=%T for job %s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Name, value.Status.CanRun)
}

err := qjm.qjobResControls[arbv1.ResourceTypePod].UpdateQueueJobStatus(value)
err := qjm.UpdateQueueJobStatus(value)
if err != nil {
klog.Warningf("[getAggAvaiResPri] Error updating pod status counts for AppWrapper job: %s, err=%+v", value.Name, err)
}
@@ -1458,7 +1497,7 @@ func (qjm *XController) backoff(ctx context.Context, q *arbv1.AppWrapper, reason
func (cc *XController) Run(stopCh <-chan struct{}) {
go cc.appwrapperInformer.Informer().Run(stopCh)

go cc.qjobResControls[arbv1.ResourceTypePod].Run(stopCh)
//go cc.qjobResControls[arbv1.ResourceTypePod].Run(stopCh)

cache.WaitForCacheSync(stopCh, cc.appWrapperSynced)

@@ -1508,7 +1547,7 @@ func (qjm *XController) UpdateQueueJobs() {
}
}
if (newjob.Status.State == arbv1.AppWrapperStateActive || newjob.Status.State == arbv1.AppWrapperStateRunningHoldCompletion) && containsCompletionStatus {
err := qjm.qjobResControls[arbv1.ResourceTypePod].UpdateQueueJobStatus(newjob)
err := qjm.UpdateQueueJobStatus(newjob)
if err != nil {
klog.Errorf("[UpdateQueueJobs] Error updating pod status counts for AppWrapper job: %s, err=%+v", newjob.Name, err)
continue
@@ -1911,7 +1950,7 @@ func (cc *XController) syncQueueJob(ctx context.Context, qj *arbv1.AppWrapper) e
awNew := qj.DeepCopy()
// we call sync to update pods running, pending,...
if qj.Status.State == arbv1.AppWrapperStateActive {
err := cc.qjobResControls[arbv1.ResourceTypePod].UpdateQueueJobStatus(awNew)
err := cc.UpdateQueueJobStatus(awNew)
if err != nil {
klog.Errorf("[syncQueueJob] Error updating pod status counts for AppWrapper job: %s, err=%+v", qj.Name, err)
return err
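The core of this change is visible in UpdateQueueJobStatus above: pod status is now computed from an on-demand, label-filtered List against the API server rather than from a dedicated pod informer's cache. Below is a minimal, self-contained sketch of that pattern. The appwrapper.mcad.ibm.com label key comes from the diff; the AppWrapper name and the out-of-cluster kubeconfig setup are assumptions for illustration (MCAD itself builds its client from the controller's rest.Config).

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from the local kubeconfig (assumption: running outside the cluster).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clients := kubernetes.NewForConfigOrDie(config)

	// List the AppWrapper's pods straight from the API server by label,
	// instead of keeping a pod informer cache warm.
	labelSelector := fmt.Sprintf("%s=%s", "appwrapper.mcad.ibm.com", "my-appwrapper") // hypothetical AppWrapper name
	pods, err := clients.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector})
	if err != nil {
		panic(err)
	}

	// Count pods per phase, as UpdateQueueJobStatus does via FilterPods.
	counts := map[v1.PodPhase]int{}
	for i := range pods.Items {
		counts[pods.Items[i].Status.Phase]++
	}
	fmt.Printf("pending=%d running=%d succeeded=%d failed=%d\n",
		counts[v1.PodPending], counts[v1.PodRunning], counts[v1.PodSucceeded], counts[v1.PodFailed])
}

The trade-off is one extra API round-trip per status refresh in exchange for dropping the informer's memory footprint and cache-sync overhead.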
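The allocatableCapacity doc comment above describes subtracting pod consumption from node capacity, but the function body is collapsed in this diff. The following is therefore only a hedged sketch of the standard pattern, not MCAD's actual implementation; the function name and the CPU-only scope are assumptions, and the imports match the previous sketch.

// allocatableCPUMilli is a hypothetical helper: per-node allocatable CPU (in
// millicores) minus the CPU requests of all non-terminal pods on that node.
func allocatableCPUMilli(ctx context.Context, clients kubernetes.Interface) (map[string]int64, error) {
	nodes, err := clients.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	capacity := map[string]int64{}
	for i := range nodes.Items {
		cpu := nodes.Items[i].Status.Allocatable[v1.ResourceCPU]
		capacity[nodes.Items[i].Name] = cpu.MilliValue()
	}
	// One cluster-wide pod list; filtering terminal pods server-side keeps the
	// payload manageable even with hundreds of thousands of pods.
	pods, err := clients.CoreV1().Pods("").List(ctx, metav1.ListOptions{
		FieldSelector: "status.phase!=Succeeded,status.phase!=Failed",
	})
	if err != nil {
		return nil, err
	}
	for i := range pods.Items {
		node := pods.Items[i].Spec.NodeName
		if node == "" {
			continue // unscheduled pods consume no node capacity
		}
		for _, c := range pods.Items[i].Spec.Containers {
			req := c.Resources.Requests[v1.ResourceCPU]
			capacity[node] -= req.MilliValue()
		}
	}
	return capacity, nil
}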
63 changes: 63 additions & 0 deletions pkg/controller/queuejob/utils.go
@@ -17,10 +17,14 @@ limitations under the License.
package queuejob

import (
"strings"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
clusterstateapi "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/clusterstate/api"
)

func GetXQJFullName(qj *arbv1.AppWrapper) string {
@@ -77,3 +81,62 @@ func getIndexOfMatchedCondition(aw *arbv1.AppWrapper, condType arbv1.AppWrapperC
}
return index
}

// PendingPodsFailedSchd checks whether pending pods have failed scheduling and returns
// their failed-scheduling conditions keyed by pod name.
func PendingPodsFailedSchd(pods []v1.Pod) map[string][]v1.PodCondition {
var podCondition = make(map[string][]v1.PodCondition)
for i := range pods {
if pods[i].Status.Phase == v1.PodPending {
for _, cond := range pods[i].Status.Conditions {
// Hack: ignore pending pods due to the co-scheduler FailedScheduling event;
// this exists until the co-scheduler performance issue is resolved.
if cond.Type == v1.PodScheduled && cond.Status == v1.ConditionFalse && cond.Reason == v1.PodReasonUnschedulable {
if strings.Contains(cond.Message, "pgName") && strings.Contains(cond.Message, "last") && strings.Contains(cond.Message, "failed") && strings.Contains(cond.Message, "deny") {
// ignore co-scheduled pending pods for coscheduler version:0.22.6
continue
} else if strings.Contains(cond.Message, "optimistic") && strings.Contains(cond.Message, "rejection") && strings.Contains(cond.Message, "PostFilter") ||
strings.Contains(cond.Message, "cannot") && strings.Contains(cond.Message, "find") && strings.Contains(cond.Message, "enough") && strings.Contains(cond.Message, "sibling") {
// ignore co-scheduled pending pods for coscheduler version:0.23.10
continue
} else {
podName := pods[i].Name
podCondition[podName] = append(podCondition[podName], *cond.DeepCopy())
}
}
}
}
}
return podCondition
}

// FilterPods returns the number of pods in the given phase.
func FilterPods(pods []v1.Pod, phase v1.PodPhase) int {
result := 0
for i := range pods {
if phase == pods[i].Status.Phase {
result++
}
}
return result
}

// GetPodResourcesByPhase returns the total resources requested by pods in the given phase.
func GetPodResourcesByPhase(phase v1.PodPhase, pods []v1.Pod) *clusterstateapi.Resource {
req := clusterstateapi.EmptyResource()
for i := range pods {
if pods[i].Status.Phase == phase {
for _, c := range pods[i].Spec.Containers {
req.Add(clusterstateapi.NewResource(c.Resources.Requests))
}
}
}
return req
}

// GeneratePodFailedCondition wraps a pod's conditions in a PendingPodSpec for the AppWrapper status.
func GeneratePodFailedCondition(podName string, podCondition []v1.PodCondition) arbv1.PendingPodSpec {
return arbv1.PendingPodSpec{
PodName: podName,
Conditions: podCondition,
}
}
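The helpers above are pure functions over []v1.Pod, so they are easy to exercise without a cluster. A test-style sketch, assuming the MCAD module is on the import path; the pod fixtures are synthetic:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"

	"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejob"
)

func main() {
	pods := []v1.Pod{
		// A running pod requesting 500m CPU.
		{
			Status: v1.PodStatus{Phase: v1.PodRunning},
			Spec: v1.PodSpec{
				Containers: []v1.Container{{
					Resources: v1.ResourceRequirements{
						Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m")},
					},
				}},
			},
		},
		// A pending pod with no requests.
		{Status: v1.PodStatus{Phase: v1.PodPending}},
	}

	fmt.Println(queuejob.FilterPods(pods, v1.PodRunning)) // 1
	fmt.Println(queuejob.FilterPods(pods, v1.PodPending)) // 1

	// Aggregate requests of running pods (MilliCPU: 500).
	fmt.Println(queuejob.GetPodResourcesByPhase(v1.PodRunning, pods))
}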
29 changes: 0 additions & 29 deletions pkg/controller/queuejobresources/interfaces.go

This file was deleted.
