
schedulers/kubernetes_scheduler: add support for resource instance-type node selectors #433


Closed · wants to merge 1 commit
2 changes: 2 additions & 0 deletions docs/requirements.txt
@@ -8,3 +8,5 @@ ipykernel
nbsphinx
jupytext
ipython_genutils
# https://github.com/jupyter/nbconvert/issues/1736
jinja2<=3.0.3
3 changes: 3 additions & 0 deletions scripts/kube_dist_trainer.py
@@ -32,6 +32,9 @@ def register_gpu_resource() -> None:
cpu=2,
gpu=1,
memMB=8 * GiB,
capabilities={
"node.kubernetes.io/instance-type": "p3.2xlarge",
},
)
print(f"Registering resource: {res}")
named_resources["GPU_X1"] = res
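For context, a resource registered this way can then be referenced when defining a role. A minimal sketch, assuming ``named_resources`` is the registry dict the script above mutates and that ``register_gpu_resource()`` has already run; the role name and image are illustrative, not part of this PR:

from torchx import specs
from torchx.specs import named_resources

# Resource registered by register_gpu_resource() above; it now carries the
# p3.2xlarge instance-type capability.
gpu_resource = named_resources["GPU_X1"]

# Hypothetical role pinned to p3.2xlarge nodes via the resource capability.
role = specs.Role(
    name="trainer",
    image="my/training-image:latest",
    resource=gpu_resource,
)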
57 changes: 50 additions & 7 deletions torchx/schedulers/kubernetes_scheduler.py
@@ -85,6 +85,9 @@

logger: logging.Logger = logging.getLogger(__name__)

RESERVED_MILLICPU = 100
RESERVED_MEMMB = 1024

RETRY_POLICIES: Mapping[str, Iterable[Mapping[str, str]]] = {
RetryPolicy.REPLICA: [],
RetryPolicy.APPLICATION: [
@@ -152,6 +155,8 @@

ANNOTATION_ISTIO_SIDECAR = "sidecar.istio.io/inject"

LABEL_INSTANCE_TYPE = "node.kubernetes.io/instance-type"


def sanitize_for_serialization(obj: object) -> object:
from kubernetes import client
@@ -176,21 +181,35 @@ def role_to_pod(name: str, role: Role, service_account: Optional[str]) -> "V1Pod
V1EmptyDirVolumeSource,
)

# limits puts an upper cap on the resources a pod may consume.
# requests is how much the scheduler allocates. We assume that jobs will
# be allocated the whole machine, so requests are set slightly lower than
# the limits to account for the Kubernetes node reserved resources.
limits = {}
requests = {}

resource = role.resource
if resource.cpu >= 0:
requests["cpu"] = f"{int(resource.cpu * 1000)}m"
if resource.memMB >= 0:
requests["memory"] = f"{int(resource.memMB)}M"
if resource.gpu >= 0:
requests["nvidia.com/gpu"] = str(resource.gpu)
if resource.cpu > 0:
mcpu = int(resource.cpu * 1000)
limits["cpu"] = f"{mcpu}m"
request_mcpu = max(mcpu - RESERVED_MILLICPU, 0)
requests["cpu"] = f"{request_mcpu}m"
if resource.memMB > 0:
limits["memory"] = f"{int(resource.memMB)}M"
request_memMB = max(int(resource.memMB) - RESERVED_MEMMB, 0)
requests["memory"] = f"{request_memMB}M"
if resource.gpu > 0:
requests["nvidia.com/gpu"] = limits["nvidia.com/gpu"] = str(resource.gpu)

resources = V1ResourceRequirements(
limits=requests,
limits=limits,
requests=requests,
)

node_selector: Dict[str, str] = {}
if LABEL_INSTANCE_TYPE in resource.capabilities:
node_selector[LABEL_INSTANCE_TYPE] = resource.capabilities[LABEL_INSTANCE_TYPE]

# To support PyTorch dataloaders we need /dev/shm to be larger than the
# 64M default, so we mount an unlimited-size tmpfs directory on it.
SHM_VOL = "dshm"
@@ -264,6 +283,7 @@ def role_to_pod(name: str, role: Role, service_account: Optional[str]) -> "V1Pod
restart_policy="Never",
service_account_name=service_account,
volumes=volumes,
node_selector=node_selector,
),
metadata=V1ObjectMeta(
annotations={
@@ -416,6 +436,29 @@ class KubernetesScheduler(Scheduler, DockerWorkspace):

External docs: https://kubernetes.io/docs/concepts/storage/persistent-volumes/

**Resources / Allocation**

To select a specific machine type, add a capability with the key
``node.kubernetes.io/instance-type`` to your resource; this constrains the
launched jobs to nodes of that instance type.

>>> from torchx import specs
>>> specs.Resource(
... cpu=4,
... memMB=16000,
... gpu=2,
... capabilities={
... "node.kubernetes.io/instance-type": "<cloud instance type>",
... },
... )
Resource(...)

Kubernetes may reserve some CPU and memory on each node for the host. TorchX
assumes you're scheduling whole hosts and therefore automatically reduces
the resource requests by a small amount to account for the node-reserved
CPU and memory. If you run into scheduling issues, you may need to lower
the requested CPU and memory below the full host values.

**Compatibility**

.. compatibility::
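To make the new requests/limits behaviour concrete, here is a minimal sketch of the reservation arithmetic ``role_to_pod`` now applies. The ``RESERVED_MILLICPU`` and ``RESERVED_MEMMB`` constants come from the diff above; the standalone helper is illustrative only:

RESERVED_MILLICPU = 100  # millicores held back for the node
RESERVED_MEMMB = 1024    # MB held back for the node

def compute_requests(cpu: float, memMB: int) -> dict:
    # Mirrors role_to_pod: limits get the full resource, while requests are
    # reduced by the node-reserved amounts (floored at zero).
    mcpu = int(cpu * 1000)
    return {
        "cpu": f"{max(mcpu - RESERVED_MILLICPU, 0)}m",
        "memory": f"{max(int(memMB) - RESERVED_MEMMB, 0)}M",
    }

# For the resource used in test_role_to_pod below (cpu=2, memMB=3000):
print(compute_requests(2, 3000))  # {'cpu': '1900m', 'memory': '1976M'}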
1 change: 1 addition & 0 deletions torchx/schedulers/test/.gitignore
@@ -0,0 +1 @@
actors.json
39 changes: 35 additions & 4 deletions torchx/schedulers/test/kubernetes_scheduler_test.py
@@ -24,6 +24,7 @@
cleanup_str,
create_scheduler,
role_to_pod,
LABEL_INSTANCE_TYPE,
)

SKIP_DOCKER: bool = not has_docker()
@@ -124,13 +125,18 @@ def test_role_to_pod(self) -> None:
app = _test_app()
pod = role_to_pod("name", app.roles[0], service_account="srvacc")

requests = {
limits = {
"cpu": "2000m",
"memory": "3000M",
"nvidia.com/gpu": "4",
}
requests = {
"cpu": "1900m",
"memory": "1976M",
"nvidia.com/gpu": "4",
}
resources = V1ResourceRequirements(
limits=requests,
limits=limits,
requests=requests,
)
container = V1Container(
@@ -179,6 +185,7 @@ def test_role_to_pod(self) -> None:
),
),
],
node_selector={},
),
metadata=V1ObjectMeta(
annotations={
@@ -279,15 +286,16 @@ def test_submit_dryrun(self) -> None:
memory: 3000M
nvidia.com/gpu: '4'
requests:
cpu: 2000m
memory: 3000M
cpu: 1900m
memory: 1976M
nvidia.com/gpu: '4'
volumeMounts:
- mountPath: /dev/shm
name: dshm
- mountPath: /dst
name: mount-0
readOnly: true
nodeSelector: {{}}
restartPolicy: Never
volumes:
- emptyDir:
@@ -348,6 +356,29 @@ def test_volume_mounts(self) -> None:
],
)

def test_instance_type(self) -> None:
scheduler = create_scheduler("test")
role = specs.Role(
name="foo",
image="",
mounts=[],
resource=specs.Resource(
cpu=4,
memMB=4000,
gpu=8,
capabilities={
LABEL_INSTANCE_TYPE: "some_instance",
},
),
)
pod = role_to_pod("foo", role, service_account="")
self.assertEqual(
pod.spec.node_selector,
{
"node.kubernetes.io/instance-type": "some_instance",
},
)

def test_rank0_env(self) -> None:
from kubernetes.client.models import (
V1EnvVar,
11 changes: 3 additions & 8 deletions torchx/specs/__init__.py
@@ -13,7 +13,7 @@

from typing import Dict, Optional

import torchx.specs.named_resources_aws as aws_resources
import torchx.specs.named_resources_aws as named_resources_aws
from torchx.util.entrypoints import load_group

from .api import ( # noqa: F401 F403
@@ -58,14 +58,9 @@
def _load_named_resources() -> Dict[str, Resource]:
resource_methods = load_group("torchx.named_resources", default={})
materialized_resources = {}
default = {
"aws_t3.medium": aws_resources.aws_t3_medium(),
"aws_m5.2xlarge": aws_resources.aws_m5_2xlarge(),
"aws_p3.2xlarge": aws_resources.aws_p3_2xlarge(),
"aws_p3.8xlarge": aws_resources.aws_p3_8xlarge(),
}
default = named_resources_aws.NAMED_RESOURCES
for name, resource in default.items():
materialized_resources[name] = resource
materialized_resources[name] = resource()
for resource_name, resource_method in resource_methods.items():
materialized_resources[resource_name] = resource_method()
materialized_resources["NULL"] = NULL_RESOURCE
38 changes: 34 additions & 4 deletions torchx/specs/named_resources_aws.py
@@ -29,6 +29,8 @@

"""

from typing import Mapping, Callable

from torchx.specs.api import Resource

GiB: int = 1024
@@ -39,7 +41,9 @@ def aws_p3_2xlarge() -> Resource:
cpu=8,
gpu=1,
memMB=61 * GiB,
capabilities={},
capabilities={
"node.kubernetes.io/instance-type": "p3.2xlarge",
},
)


@@ -48,7 +52,9 @@ def aws_p3_8xlarge() -> Resource:
cpu=32,
gpu=4,
memMB=244 * GiB,
capabilities={},
capabilities={
"node.kubernetes.io/instance-type": "p3.8xlarge",
},
)


@@ -57,7 +63,9 @@ def aws_t3_medium() -> Resource:
cpu=2,
gpu=0,
memMB=4 * GiB,
capabilities={},
capabilities={
"node.kubernetes.io/instance-type": "t3.medium",
},
)


@@ -66,5 +74,27 @@ def aws_m5_2xlarge() -> Resource:
cpu=8,
gpu=0,
memMB=32 * GiB,
capabilities={},
capabilities={
"node.kubernetes.io/instance-type": "m5.2xlarge",
},
)


def aws_g4dn_xlarge() -> Resource:
return Resource(
cpu=4,
gpu=1,
memMB=16 * GiB,
capabilities={
"node.kubernetes.io/instance-type": "g4dn.xlarge",
},
)


NAMED_RESOURCES: Mapping[str, Callable[[], Resource]] = {
"aws_t3.medium": aws_t3_medium,
"aws_m5.2xlarge": aws_m5_2xlarge,
"aws_p3.2xlarge": aws_p3_2xlarge,
"aws_p3.8xlarge": aws_p3_8xlarge,
"aws_g4dn.xlarge": aws_g4dn_xlarge,
}
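As a consumption sketch, ``NAMED_RESOURCES`` maps resource names to zero-argument factories, and ``_load_named_resources`` in ``torchx/specs/__init__.py`` now calls each factory when materializing the defaults. The lookup below mirrors the new test and is illustrative, not part of this PR:

from torchx.specs.named_resources_aws import NAMED_RESOURCES

# Each value is a factory returning a Resource; the materialized resource
# now carries the instance-type capability used for the node selector.
res = NAMED_RESOURCES["aws_g4dn.xlarge"]()
assert res.capabilities["node.kubernetes.io/instance-type"] == "g4dn.xlarge"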
6 changes: 6 additions & 0 deletions torchx/specs/test/named_resources_test_aws.py
@@ -13,6 +13,7 @@
aws_m5_2xlarge,
aws_t3_medium,
GiB,
NAMED_RESOURCES,
)


@@ -40,3 +41,8 @@ def test_aws_t3_medium(self) -> None:
self.assertEqual(2, resource.cpu)
self.assertEqual(0, resource.gpu)
self.assertEqual(4 * GiB, resource.memMB)

def test_capabilities(self) -> None:
for name, func in NAMED_RESOURCES.items():
resource = func()
self.assertIn("node.kubernetes.io/instance-type", resource.capabilities)