Skip to content

feat(sdk/backend): Add support for placeholders in resource limits #11501

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 100 additions & 31 deletions backend/src/v2/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,35 @@ func Container(ctx context.Context, opts Options, mlmd *metadata.Client, cacheCl
return execution, nil
}

// getPodResource will accept the new field that accepts placeholders (e.g. resourceMemoryLimit) and the old float64
// field (e.g. memoryLimit) and return the resolved value as a Quantity. If the returned Quantity is nil, it was not set
// by the user. If the new field is set, the old field is ignored.
func getPodResource(
new string, old float64, executorInput *pipelinespec.ExecutorInput, oldFmtStr string,
) (*k8sres.Quantity, error) {
var resolved string

if new != "" {
var err error

resolved, err = resolvePodSpecInputRuntimeParameter(new, executorInput)
if err != nil {
return nil, fmt.Errorf("failed to resolve executor input when retrieving pod resource: %w", err)
}
} else if old != 0 {
resolved = fmt.Sprintf(oldFmtStr, old)
} else {
return nil, nil
}

q, err := k8sres.ParseQuantity(resolved)
if err != nil {
return nil, err
}

return &q, nil
}

// initPodSpecPatch generates a strategic merge patch for pod spec, it is merged
// to container base template generated in compiler/container.go. Therefore, only
// dynamic values are patched here. The volume mounts / configmap mounts are
Expand Down Expand Up @@ -414,46 +443,86 @@ func initPodSpecPatch(
Limits: map[k8score.ResourceName]k8sres.Quantity{},
Requests: map[k8score.ResourceName]k8sres.Quantity{},
}
memoryLimit := container.GetResources().GetMemoryLimit()
if memoryLimit != 0 {
q, err := k8sres.ParseQuantity(fmt.Sprintf("%vG", memoryLimit))
if err != nil {
return nil, fmt.Errorf("failed to init podSpecPatch: %w", err)
}
res.Limits[k8score.ResourceMemory] = q

memoryLimit, err := getPodResource(
container.GetResources().GetResourceMemoryLimit(),
container.GetResources().GetMemoryLimit(),
executorInput,
"%vG",
)
if err != nil {
return nil, fmt.Errorf("failed to init podSpecPatch: %w", err)
}
memoryRequest := container.GetResources().GetMemoryRequest()
if memoryRequest != 0 {
q, err := k8sres.ParseQuantity(fmt.Sprintf("%vG", memoryRequest))
if err != nil {
return nil, err
}
res.Requests[k8score.ResourceMemory] = q
if memoryLimit != nil {
res.Limits[k8score.ResourceMemory] = *memoryLimit
}
cpuLimit := container.GetResources().GetCpuLimit()
if cpuLimit != 0 {
q, err := k8sres.ParseQuantity(fmt.Sprintf("%v", cpuLimit))
if err != nil {
return nil, fmt.Errorf("failed to init podSpecPatch: %w", err)
}
res.Limits[k8score.ResourceCPU] = q

memoryRequest, err := getPodResource(
container.GetResources().GetResourceMemoryRequest(),
container.GetResources().GetMemoryRequest(),
executorInput,
"%vG",
)
if err != nil {
return nil, fmt.Errorf("failed to init podSpecPatch: %w", err)
}
cpuRequest := container.GetResources().GetCpuRequest()
if cpuRequest != 0 {
q, err := k8sres.ParseQuantity(fmt.Sprintf("%v", cpuRequest))
if err != nil {
return nil, err
}
res.Requests[k8score.ResourceCPU] = q
if memoryRequest != nil {
res.Requests[k8score.ResourceMemory] = *memoryRequest
}

cpuLimit, err := getPodResource(
container.GetResources().GetResourceCpuLimit(),
container.GetResources().GetCpuLimit(),
executorInput,
"%v",
)
if err != nil {
return nil, fmt.Errorf("failed to init podSpecPatch: %w", err)
}
if cpuLimit != nil {
res.Limits[k8score.ResourceCPU] = *cpuLimit
}

cpuRequest, err := getPodResource(
container.GetResources().GetResourceCpuRequest(),
container.GetResources().GetCpuRequest(),
executorInput,
"%v",
)
if err != nil {
return nil, fmt.Errorf("failed to init podSpecPatch: %w", err)
}
if cpuRequest != nil {
res.Requests[k8score.ResourceCPU] = *cpuRequest
}

accelerator := container.GetResources().GetAccelerator()
if accelerator != nil {
if accelerator.GetType() != "" && accelerator.GetCount() > 0 {
acceleratorType, err := resolvePodSpecInputRuntimeParameter(accelerator.GetType(), executorInput)
var acceleratorType string
if accelerator.GetResourceType() != "" {
acceleratorType, err = resolvePodSpecInputRuntimeParameter(accelerator.GetResourceType(), executorInput)
if err != nil {
return nil, fmt.Errorf("failed to init podSpecPatch: %w", err)
}
q, err := k8sres.ParseQuantity(fmt.Sprintf("%v", accelerator.GetCount()))
} else if accelerator.GetType() != "" {
acceleratorType = accelerator.GetType()
}

var acceleratorCount string

if accelerator.GetResourceCount() != "" {
var err error

acceleratorCount, err = resolvePodSpecInputRuntimeParameter(accelerator.GetResourceCount(), executorInput)
if err != nil {
return nil, fmt.Errorf("failed to init podSpecPatch: %w", err)
}
} else if accelerator.Count > 0 {
acceleratorCount = fmt.Sprintf("%v", accelerator.GetCount())
}

if acceleratorType != "" && acceleratorCount != "" {
q, err := k8sres.ParseQuantity(acceleratorCount)
if err != nil {
return nil, fmt.Errorf("failed to init podSpecPatch: %w", err)
}
Expand Down
132 changes: 132 additions & 0 deletions backend/src/v2/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"encoding/json"
"testing"

"google.golang.org/protobuf/types/known/structpb"
k8sres "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -259,6 +260,137 @@ func Test_initPodSpecPatch_acceleratorConfig(t *testing.T) {
}
}

func Test_initPodSpecPatch_resource_placeholders(t *testing.T) {
containerSpec := &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec{
Image: "python:3.9",
Args: []string{"--function_to_execute", "add"},
Command: []string{"sh", "-ec", "python3 -m kfp.components.executor_main"},
Resources: &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec_ResourceSpec{
ResourceCpuRequest: "{{$.inputs.parameters['pipelinechannel--cpu_request']}}",
ResourceCpuLimit: "{{$.inputs.parameters['pipelinechannel--cpu_limit']}}",
ResourceMemoryRequest: "{{$.inputs.parameters['pipelinechannel--memory_request']}}",
ResourceMemoryLimit: "{{$.inputs.parameters['pipelinechannel--memory_limit']}}",
Accelerator: &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec_ResourceSpec_AcceleratorConfig{
ResourceType: "{{$.inputs.parameters['pipelinechannel--accelerator_type']}}",
ResourceCount: "{{$.inputs.parameters['pipelinechannel--accelerator_count']}}",
},
},
}
componentSpec := &pipelinespec.ComponentSpec{}
executorInput := &pipelinespec.ExecutorInput{
Inputs: &pipelinespec.ExecutorInput_Inputs{
ParameterValues: map[string]*structpb.Value{
"cpu_request": {
Kind: &structpb.Value_StringValue{
StringValue: "{{$.inputs.parameters['pipelinechannel--cpu_request']}}",
},
},
"pipelinechannel--cpu_request": {
Kind: &structpb.Value_StringValue{
StringValue: "200m",
},
},
"cpu_limit": {
Kind: &structpb.Value_StringValue{
StringValue: "{{$.inputs.parameters['pipelinechannel--cpu_limit']}}",
},
},
"pipelinechannel--cpu_limit": {
Kind: &structpb.Value_StringValue{
StringValue: "400m",
},
},
"memory_request": {
Kind: &structpb.Value_StringValue{
StringValue: "{{$.inputs.parameters['pipelinechannel--memory_request']}}",
},
},
"pipelinechannel--memory_request": {
Kind: &structpb.Value_StringValue{
StringValue: "100Mi",
},
},
"memory_limit": {
Kind: &structpb.Value_StringValue{
StringValue: "{{$.inputs.parameters['pipelinechannel--memory_limit']}}",
},
},
"pipelinechannel--memory_limit": {
Kind: &structpb.Value_StringValue{
StringValue: "500Mi",
},
},
"accelerator_type": {
Kind: &structpb.Value_StringValue{
StringValue: "{{$.inputs.parameters['pipelinechannel--accelerator_type']}}",
},
},
"pipelinechannel--accelerator_type": {
Kind: &structpb.Value_StringValue{
StringValue: "nvidia.com/gpu",
},
},
"accelerator_count": {
Kind: &structpb.Value_StringValue{
StringValue: "{{$.inputs.parameters['pipelinechannel--accelerator_count']}}",
},
},
"pipelinechannel--accelerator_count": {
Kind: &structpb.Value_StringValue{
StringValue: "1",
},
},
},
},
}

podSpec, err := initPodSpecPatch(
containerSpec, componentSpec, executorInput, 27, "test", "0254beba-0be4-4065-8d97-7dc5e3adf300",
)
assert.Nil(t, err)
assert.Len(t, podSpec.Containers, 1)

res := podSpec.Containers[0].Resources
assert.Equal(t, k8sres.MustParse("200m"), res.Requests[k8score.ResourceCPU])
assert.Equal(t, k8sres.MustParse("400m"), res.Limits[k8score.ResourceCPU])
assert.Equal(t, k8sres.MustParse("100Mi"), res.Requests[k8score.ResourceMemory])
assert.Equal(t, k8sres.MustParse("500Mi"), res.Limits[k8score.ResourceMemory])
assert.Equal(t, k8sres.MustParse("1"), res.Limits[k8score.ResourceName("nvidia.com/gpu")])
}

func Test_initPodSpecPatch_legacy_resources(t *testing.T) {
containerSpec := &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec{
Image: "python:3.9",
Args: []string{"--function_to_execute", "add"},
Command: []string{"sh", "-ec", "python3 -m kfp.components.executor_main"},
Resources: &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec_ResourceSpec{
CpuRequest: 200,
CpuLimit: 400,
ResourceMemoryRequest: "100Mi",
ResourceMemoryLimit: "500Mi",
Accelerator: &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec_ResourceSpec_AcceleratorConfig{
Type: "nvidia.com/gpu",
Count: 1,
},
},
}
componentSpec := &pipelinespec.ComponentSpec{}
executorInput := &pipelinespec.ExecutorInput{}

podSpec, err := initPodSpecPatch(
containerSpec, componentSpec, executorInput, 27, "test", "0254beba-0be4-4065-8d97-7dc5e3adf300",
)
assert.Nil(t, err)
assert.Len(t, podSpec.Containers, 1)

res := podSpec.Containers[0].Resources
assert.Equal(t, k8sres.MustParse("200"), res.Requests[k8score.ResourceCPU])
assert.Equal(t, k8sres.MustParse("400"), res.Limits[k8score.ResourceCPU])
assert.Equal(t, k8sres.MustParse("100Mi"), res.Requests[k8score.ResourceMemory])
assert.Equal(t, k8sres.MustParse("500Mi"), res.Limits[k8score.ResourceMemory])
assert.Equal(t, k8sres.MustParse("1"), res.Limits[k8score.ResourceName("nvidia.com/gpu")])
}

func Test_makeVolumeMountPatch(t *testing.T) {
type args struct {
pvcMount []*kubernetesplatform.PvcMount
Expand Down
2 changes: 1 addition & 1 deletion backend/third_party_licenses/apiserver.csv
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ github.com/klauspost/pgzip,https://github.com/klauspost/pgzip/blob/v1.2.6/LICENS
github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop,https://github.com/kubeflow/kfp-tekton/blob/0b894195443c/tekton-catalog/pipeline-loops/LICENSE,Apache-2.0
github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-exithandler/pkg/apis/exithandler,https://github.com/kubeflow/kfp-tekton/blob/0b894195443c/tekton-catalog/tekton-exithandler/LICENSE,Apache-2.0
github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask/pkg/apis/kfptask,https://github.com/kubeflow/kfp-tekton/blob/0b894195443c/tekton-catalog/tekton-kfptask/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/da804407ad31/api/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/873e9dedd766/api/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/backend,https://github.com/kubeflow/pipelines/blob/HEAD/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform,https://github.com/kubeflow/pipelines/blob/d911c8b73b49/kubernetes_platform/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata,https://github.com/kubeflow/pipelines/blob/da804407ad31/third_party/ml-metadata/LICENSE,Apache-2.0
Expand Down
2 changes: 1 addition & 1 deletion backend/third_party_licenses/driver.csv
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ github.com/josharian/intern,https://github.com/josharian/intern/blob/v1.0.0/lice
github.com/json-iterator/go,https://github.com/json-iterator/go/blob/v1.1.12/LICENSE,MIT
github.com/klauspost/compress/flate,https://github.com/klauspost/compress/blob/v1.16.7/LICENSE,Apache-2.0
github.com/klauspost/pgzip,https://github.com/klauspost/pgzip/blob/v1.2.6/LICENSE,MIT
github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/da804407ad31/api/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/873e9dedd766/api/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/backend,https://github.com/kubeflow/pipelines/blob/HEAD/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform,https://github.com/kubeflow/pipelines/blob/d911c8b73b49/kubernetes_platform/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata,https://github.com/kubeflow/pipelines/blob/da804407ad31/third_party/ml-metadata/LICENSE,Apache-2.0
Expand Down
2 changes: 1 addition & 1 deletion backend/third_party_licenses/launcher.csv
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ github.com/josharian/intern,https://github.com/josharian/intern/blob/v1.0.0/lice
github.com/json-iterator/go,https://github.com/json-iterator/go/blob/v1.1.12/LICENSE,MIT
github.com/klauspost/compress/flate,https://github.com/klauspost/compress/blob/v1.16.7/LICENSE,Apache-2.0
github.com/klauspost/pgzip,https://github.com/klauspost/pgzip/blob/v1.2.6/LICENSE,MIT
github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/da804407ad31/api/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/873e9dedd766/api/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/backend,https://github.com/kubeflow/pipelines/blob/HEAD/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata,https://github.com/kubeflow/pipelines/blob/da804407ad31/third_party/ml-metadata/LICENSE,Apache-2.0
github.com/lestrrat-go/strftime,https://github.com/lestrrat-go/strftime/blob/v1.0.4/LICENSE,MIT
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops v0.0.0-20240417221339-0b894195443c
github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-exithandler v0.0.0-20240417221339-0b894195443c
github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask v0.0.0-20240417221339-0b894195443c
github.com/kubeflow/pipelines/api v0.0.0-20240416215826-da804407ad31
github.com/kubeflow/pipelines/api v0.0.0-20250102152816-873e9dedd766
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240725205754-d911c8b73b49
github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20240416215826-da804407ad31
github.com/lestrrat-go/strftime v1.0.4
Expand Down
5 changes: 2 additions & 3 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading