diff --git a/docs/requirements.txt b/docs/requirements.txt
index 2ec1751c4..ec8f36735 100644
--- a/docs/requirements.txt
+++ b/docs/requirements.txt
@@ -8,3 +8,5 @@ ipykernel
 nbsphinx
 jupytext
 ipython_genutils
+# https://github.com/jupyter/nbconvert/issues/1736
+jinja2<=3.0.3
diff --git a/scripts/kube_dist_trainer.py b/scripts/kube_dist_trainer.py
index e1967077d..2c05a3beb 100755
--- a/scripts/kube_dist_trainer.py
+++ b/scripts/kube_dist_trainer.py
@@ -32,6 +32,9 @@ def register_gpu_resource() -> None:
         cpu=2,
         gpu=1,
         memMB=8 * GiB,
+        capabilities={
+            "node.kubernetes.io/instance-type": "p3.2xlarge",
+        },
     )
     print(f"Registering resource: {res}")
     named_resources["GPU_X1"] = res
diff --git a/torchx/schedulers/kubernetes_scheduler.py b/torchx/schedulers/kubernetes_scheduler.py
index 52c8ad9fe..06f46f35f 100644
--- a/torchx/schedulers/kubernetes_scheduler.py
+++ b/torchx/schedulers/kubernetes_scheduler.py
@@ -85,6 +85,9 @@
 
 logger: logging.Logger = logging.getLogger(__name__)
 
+RESERVED_MILLICPU = 100
+RESERVED_MEMMB = 1024
+
 RETRY_POLICIES: Mapping[str, Iterable[Mapping[str, str]]] = {
     RetryPolicy.REPLICA: [],
     RetryPolicy.APPLICATION: [
@@ -152,6 +155,8 @@
 
 ANNOTATION_ISTIO_SIDECAR = "sidecar.istio.io/inject"
 
+LABEL_INSTANCE_TYPE = "node.kubernetes.io/instance-type"
+
 
 def sanitize_for_serialization(obj: object) -> object:
     from kubernetes import client
@@ -176,21 +181,35 @@ def role_to_pod(name: str, role: Role, service_account: Optional[str]) -> "V1Pod
         V1EmptyDirVolumeSource,
     )
 
+    # limits puts an upper cap on the resources a pod may consume.
+    # requests is how much the scheduler allocates. We assume that the jobs will
+    # be allocating the whole machine so requests is slightly lower than the
+    # requested resources to account for the Kubernetes node reserved resources.
+    limits = {}
     requests = {}
 
     resource = role.resource
-    if resource.cpu >= 0:
-        requests["cpu"] = f"{int(resource.cpu * 1000)}m"
-    if resource.memMB >= 0:
-        requests["memory"] = f"{int(resource.memMB)}M"
-    if resource.gpu >= 0:
-        requests["nvidia.com/gpu"] = str(resource.gpu)
+    if resource.cpu > 0:
+        mcpu = int(resource.cpu * 1000)
+        limits["cpu"] = f"{mcpu}m"
+        request_mcpu = max(mcpu - RESERVED_MILLICPU, 0)
+        requests["cpu"] = f"{request_mcpu}m"
+    if resource.memMB > 0:
+        limits["memory"] = f"{int(resource.memMB)}M"
+        request_memMB = max(int(resource.memMB) - RESERVED_MEMMB, 0)
+        requests["memory"] = f"{request_memMB}M"
+    if resource.gpu > 0:
+        requests["nvidia.com/gpu"] = limits["nvidia.com/gpu"] = str(resource.gpu)
 
     resources = V1ResourceRequirements(
-        limits=requests,
+        limits=limits,
         requests=requests,
     )
 
+    node_selector: Dict[str, str] = {}
+    if LABEL_INSTANCE_TYPE in resource.capabilities:
+        node_selector[LABEL_INSTANCE_TYPE] = resource.capabilities[LABEL_INSTANCE_TYPE]
+
     # To support PyTorch dataloaders we need to set /dev/shm to larger than the
     # 64M default so we mount an unlimited sized tmpfs directory on it.
SHM_VOL = "dshm" @@ -264,6 +283,7 @@ def role_to_pod(name: str, role: Role, service_account: Optional[str]) -> "V1Pod restart_policy="Never", service_account_name=service_account, volumes=volumes, + node_selector=node_selector, ), metadata=V1ObjectMeta( annotations={ @@ -416,6 +436,29 @@ class KubernetesScheduler(Scheduler, DockerWorkspace): External docs: https://kubernetes.io/docs/concepts/storage/persistent-volumes/ + **Resources / Allocation** + + To select a specific machine type you can add a capability to your resources + with ``node.kubernetes.io/instance-type`` which will constrain the launched + jobs to nodes of that instance type. + + >>> from torchx import specs + >>> specs.Resource( + ... cpu=4, + ... memMB=16000, + ... gpu=2, + ... capabilities={ + ... "node.kubernetes.io/instance-type": "", + ... }, + ... ) + Resource(...) + + Kubernetes may reserve some memory for the host. TorchX assumes you're + scheduling on whole hosts and thus will automatically reduce the resource + request by a small amount to account for the node reserved CPU and memory. + If you run into scheduling issues you may need to reduce the requested CPU + and memory from the host values. + **Compatibility** .. compatibility:: diff --git a/torchx/schedulers/test/.gitignore b/torchx/schedulers/test/.gitignore new file mode 100644 index 000000000..d3e71ede2 --- /dev/null +++ b/torchx/schedulers/test/.gitignore @@ -0,0 +1 @@ +actors.json diff --git a/torchx/schedulers/test/kubernetes_scheduler_test.py b/torchx/schedulers/test/kubernetes_scheduler_test.py index b7f64233a..ee7285b52 100644 --- a/torchx/schedulers/test/kubernetes_scheduler_test.py +++ b/torchx/schedulers/test/kubernetes_scheduler_test.py @@ -24,6 +24,7 @@ cleanup_str, create_scheduler, role_to_pod, + LABEL_INSTANCE_TYPE, ) SKIP_DOCKER: bool = not has_docker() @@ -124,13 +125,18 @@ def test_role_to_pod(self) -> None: app = _test_app() pod = role_to_pod("name", app.roles[0], service_account="srvacc") - requests = { + limits = { "cpu": "2000m", "memory": "3000M", "nvidia.com/gpu": "4", } + requests = { + "cpu": "1900m", + "memory": "1976M", + "nvidia.com/gpu": "4", + } resources = V1ResourceRequirements( - limits=requests, + limits=limits, requests=requests, ) container = V1Container( @@ -179,6 +185,7 @@ def test_role_to_pod(self) -> None: ), ), ], + node_selector={}, ), metadata=V1ObjectMeta( annotations={ @@ -279,8 +286,8 @@ def test_submit_dryrun(self) -> None: memory: 3000M nvidia.com/gpu: '4' requests: - cpu: 2000m - memory: 3000M + cpu: 1900m + memory: 1976M nvidia.com/gpu: '4' volumeMounts: - mountPath: /dev/shm @@ -288,6 +295,7 @@ def test_submit_dryrun(self) -> None: - mountPath: /dst name: mount-0 readOnly: true + nodeSelector: {{}} restartPolicy: Never volumes: - emptyDir: @@ -348,6 +356,29 @@ def test_volume_mounts(self) -> None: ], ) + def test_instance_type(self) -> None: + scheduler = create_scheduler("test") + role = specs.Role( + name="foo", + image="", + mounts=[], + resource=specs.Resource( + cpu=4, + memMB=4000, + gpu=8, + capabilities={ + LABEL_INSTANCE_TYPE: "some_instance", + }, + ), + ) + pod = role_to_pod("foo", role, service_account="") + self.assertEqual( + pod.spec.node_selector, + { + "node.kubernetes.io/instance-type": "some_instance", + }, + ) + def test_rank0_env(self) -> None: from kubernetes.client.models import ( V1EnvVar, diff --git a/torchx/specs/__init__.py b/torchx/specs/__init__.py index 80c608c08..d819fb501 100644 --- a/torchx/specs/__init__.py +++ b/torchx/specs/__init__.py @@ -13,7 +13,7 @@ from typing 
 
-import torchx.specs.named_resources_aws as aws_resources
+import torchx.specs.named_resources_aws as named_resources_aws
 from torchx.util.entrypoints import load_group
 
 from .api import (  # noqa: F401 F403
@@ -58,14 +58,9 @@
 def _load_named_resources() -> Dict[str, Resource]:
     resource_methods = load_group("torchx.named_resources", default={})
     materialized_resources = {}
-    default = {
-        "aws_t3.medium": aws_resources.aws_t3_medium(),
-        "aws_m5.2xlarge": aws_resources.aws_m5_2xlarge(),
-        "aws_p3.2xlarge": aws_resources.aws_p3_2xlarge(),
-        "aws_p3.8xlarge": aws_resources.aws_p3_8xlarge(),
-    }
+    default = named_resources_aws.NAMED_RESOURCES
     for name, resource in default.items():
-        materialized_resources[name] = resource
+        materialized_resources[name] = resource()
     for resource_name, resource_method in resource_methods.items():
         materialized_resources[resource_name] = resource_method()
     materialized_resources["NULL"] = NULL_RESOURCE
diff --git a/torchx/specs/named_resources_aws.py b/torchx/specs/named_resources_aws.py
index 32c7cab1e..bf2525aec 100644
--- a/torchx/specs/named_resources_aws.py
+++ b/torchx/specs/named_resources_aws.py
@@ -29,6 +29,8 @@
 
 """
 
+from typing import Mapping, Callable
+
 from torchx.specs.api import Resource
 
 GiB: int = 1024
@@ -39,7 +41,9 @@ def aws_p3_2xlarge() -> Resource:
         cpu=8,
         gpu=1,
         memMB=61 * GiB,
-        capabilities={},
+        capabilities={
+            "node.kubernetes.io/instance-type": "p3.2xlarge",
+        },
     )
 
 
@@ -48,7 +52,9 @@ def aws_p3_8xlarge() -> Resource:
         cpu=32,
         gpu=4,
         memMB=244 * GiB,
-        capabilities={},
+        capabilities={
+            "node.kubernetes.io/instance-type": "p3.8xlarge",
+        },
     )
 
 
@@ -57,7 +63,9 @@ def aws_t3_medium() -> Resource:
         cpu=2,
         gpu=0,
         memMB=4 * GiB,
-        capabilities={},
+        capabilities={
+            "node.kubernetes.io/instance-type": "t3.medium",
+        },
     )
 
 
@@ -66,5 +74,27 @@ def aws_m5_2xlarge() -> Resource:
         cpu=8,
         gpu=0,
         memMB=32 * GiB,
-        capabilities={},
+        capabilities={
+            "node.kubernetes.io/instance-type": "m5.2xlarge",
+        },
+    )
+
+
+def aws_g4dn_xlarge() -> Resource:
+    return Resource(
+        cpu=4,
+        gpu=1,
+        memMB=16 * GiB,
+        capabilities={
+            "node.kubernetes.io/instance-type": "g4dn.xlarge",
+        },
     )
+
+
+NAMED_RESOURCES: Mapping[str, Callable[[], Resource]] = {
+    "aws_t3.medium": aws_t3_medium,
+    "aws_m5.2xlarge": aws_m5_2xlarge,
+    "aws_p3.2xlarge": aws_p3_2xlarge,
+    "aws_p3.8xlarge": aws_p3_8xlarge,
+    "aws_g4dn.xlarge": aws_g4dn_xlarge,
+}
diff --git a/torchx/specs/test/named_resources_test_aws.py b/torchx/specs/test/named_resources_test_aws.py
index 1e2d8f008..9fb5a0355 100644
--- a/torchx/specs/test/named_resources_test_aws.py
+++ b/torchx/specs/test/named_resources_test_aws.py
@@ -13,6 +13,7 @@
     aws_m5_2xlarge,
     aws_t3_medium,
     GiB,
+    NAMED_RESOURCES,
 )
 
 
@@ -40,3 +41,8 @@ def test_aws_t3_medium(self) -> None:
         self.assertEqual(2, resource.cpu)
         self.assertEqual(0, resource.gpu)
         self.assertEqual(4 * GiB, resource.memMB)
+
+    def test_capabilities(self) -> None:
+        for name, func in NAMED_RESOURCES.items():
+            resource = func()
+            self.assertIn("node.kubernetes.io/instance-type", resource.capabilities)
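
A quick way to sanity-check the new behavior locally. This is only a sketch, not part of the patch: the role name, image, and instance type below are placeholder values, and it mirrors what the new test_instance_type test asserts. It builds a Resource carrying the node.kubernetes.io/instance-type capability, runs it through role_to_pod, and prints the resulting limits, requests, and nodeSelector.

    from torchx import specs
    from torchx.schedulers.kubernetes_scheduler import role_to_pod

    # 4 CPUs / 16 GiB / 1 GPU, pinned to g4dn.xlarge via the capability key
    # introduced in this change.
    resource = specs.Resource(
        cpu=4,
        gpu=1,
        memMB=16 * 1024,
        capabilities={"node.kubernetes.io/instance-type": "g4dn.xlarge"},
    )
    # "trainer" and the image are placeholders for this sketch.
    role = specs.Role(name="trainer", image="my/image:latest", mounts=[], resource=resource)

    pod = role_to_pod("trainer", role, service_account=None)
    container = pod.spec.containers[0]

    # limits keep the full ask; requests are lowered by RESERVED_MILLICPU (100m)
    # and RESERVED_MEMMB (1024M) to leave room for node-reserved resources.
    print(container.resources.limits)    # {'cpu': '4000m', 'memory': '16384M', 'nvidia.com/gpu': '1'}
    print(container.resources.requests)  # {'cpu': '3900m', 'memory': '15360M', 'nvidia.com/gpu': '1'}
    print(pod.spec.node_selector)        # {'node.kubernetes.io/instance-type': 'g4dn.xlarge'}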