
Commit 4ea97be

aivanou authored and facebook-github-bot committed
Add automatic set of CUDA_VISIBLE_DEVICES for local scheduler (#383)
Summary:
Pull Request resolved: #383

The diff adds automatic setting of `CUDA_VISIBLE_DEVICES` based on `num_replicas`; each replica gets the same number of devices. The algorithm applies only when `CUDA_VISIBLE_DEVICES` is not already set. The diff uses `nvidia-smi` to determine the number of GPUs.

#297 #377

Reviewed By: kiukchung

Differential Revision: D34064433

fbshipit-source-id: 7ec24c9707e2133fafd6747b4357960b9dd0e253
1 parent f5278cc commit 4ea97be
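
For context, the assignment rule the commit implements is: each replica of a role gets a contiguous block of `role.resource.gpu` device indices, offset by the devices already handed out to earlier roles and replicas. Below is a minimal standalone sketch of that arithmetic (illustration only, not part of this commit; the dict-based `roles` shape and the `assign_cuda_visible_devices` name are purely hypothetical):

from typing import Dict, List


def assign_cuda_visible_devices(roles: List[Dict[str, int]]) -> List[List[str]]:
    # For each role, compute the CUDA_VISIBLE_DEVICES string of every replica.
    assignments: List[List[str]] = []
    role_gpu_start_idx = 0
    for role in roles:
        per_replica: List[str] = []
        for replica_id in range(role["num_replicas"]):
            start = role_gpu_start_idx + role["gpu"] * replica_id
            end = role_gpu_start_idx + role["gpu"] * (replica_id + 1)
            per_replica.append(",".join(str(d) for d in range(start, end)))
        assignments.append(per_replica)
        # Next role starts after all devices consumed by this role's replicas.
        role_gpu_start_idx += role["gpu"] * role["num_replicas"]
    return assignments


# Mirrors the expectations in the new unit tests below:
# role1 (2 GPUs x 2 replicas) -> ["0,1", "2,3"]
# role2 (3 GPUs x 2 replicas) -> ["4,5,6", "7,8,9"]
print(assign_cuda_visible_devices([{"gpu": 2, "num_replicas": 2}, {"gpu": 3, "num_replicas": 2}]))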

File tree

2 files changed: 187 additions & 6 deletions


torchx/schedulers/local_scheduler.py

Lines changed: 73 additions & 5 deletions
@@ -102,9 +102,9 @@ class ReplicaParam:
     env: Dict[str, str]

     # IO stream files
-    stdout: Optional[str]
-    stderr: Optional[str]
-    combined: Optional[str]
+    stdout: Optional[str] = None
+    stderr: Optional[str] = None
+    combined: Optional[str] = None

     cwd: Optional[str] = None

@@ -592,6 +592,13 @@ def run_opts(self) -> runopts:
             help="if set, prepends CWD to replica's PATH env var"
             " making any binaries in CWD take precedence over those in PATH",
         )
+        opts.add(
+            "auto_set_cuda_visible_devices",
+            type_=bool,
+            default=False,
+            help="sets the `CUDA_AVAILABLE_DEVICES` for roles that request GPU resources."
+            " Each role replica will be assigned one GPU. Does nothing if the device count is less than replicas.",
+        )
         return opts

     def _validate(self, app: AppDef, scheduler: SchedulerBackend) -> None:
@@ -770,6 +777,66 @@ def _submit_dryrun(
             request, lambda p: pprint.pformat(asdict(p), indent=2, width=80)
         )

+    def _get_gpu_device_count(self) -> int:
+        gpu_cmd = "nvidia-smi -L"
+        try:
+            log.debug(f"Running {gpu_cmd}")
+            result = subprocess.run(
+                gpu_cmd.split(), capture_output=True, text=True, check=True
+            )
+            log.debug(f"Cmd {gpu_cmd} returned: {result}")
+            gpus_info = [gpu_info for gpu_info in result.stdout.split("\n") if gpu_info]
+            return len(gpus_info)
+        except subprocess.CalledProcessError as e:
+            log.exception(f"Got exception while listing GPUs {e.stderr}")
+            return 0
+
+    def _set_cuda_visible_devices_for_role_replica(
+        self,
+        replica: ReplicaParam,
+        replica_id: int,
+        requested_gpus: int,
+        role_gpu_start_idx: int,
+    ) -> None:
+        if requested_gpus <= 0:
+            return
+        start_device = role_gpu_start_idx + requested_gpus * replica_id
+        end_device = role_gpu_start_idx + requested_gpus * (replica_id + 1)
+        devices = list(range(start_device, end_device))
+        visible_devices = ",".join([str(device) for device in devices])
+        replica.env["CUDA_VISIBLE_DEVICES"] = visible_devices
+
+    def _update_env_cuda_visible_devices(
+        self,
+        role_params: Dict[str, List[ReplicaParam]],
+        app: AppDef,
+        cfg: Mapping[str, CfgVal],
+    ) -> None:
+        device_count = 0
+        autoset = cfg.get("auto_set_cuda_visible_devices")
+        if not autoset:
+            return
+        requested_gpus_total = sum(
+            [role.resource.gpu * role.num_replicas for role in app.roles]
+        )
+        if requested_gpus_total <= 0:
+            return
+        device_count = self._get_gpu_device_count()
+        if requested_gpus_total > device_count:
+            autoset = False
+            log.warning(
+                "Cannot set `CUDA_VISIBLE_DEVICES` due to "
+                f"Available GPUs {device_count} less than requested {requested_gpus_total}"
+            )
+        role_gpu_start_idx = 0
+        for role in app.roles:
+            role_replicas = role_params[role.name]
+            for replica_id, replica in enumerate(role_replicas):
+                self._set_cuda_visible_devices_for_role_replica(
+                    replica, replica_id, role.resource.gpu, role_gpu_start_idx
+                )
+            role_gpu_start_idx += role.resource.gpu * role.num_replicas
+
     def _to_popen_request(
         self,
         app: AppDef,
@@ -785,6 +852,7 @@ def _to_popen_request(

         role_params: Dict[str, List[ReplicaParam]] = {}
         role_log_dirs: Dict[str, List[str]] = {}
+
         for role in app.roles:
             replica_params = role_params.setdefault(role.name, [])
             replica_log_dirs = role_log_dirs.setdefault(role.name, [])
@@ -798,8 +866,8 @@ def _to_popen_request(
                 replica_id=str(replica_id),
             )
             replica_role = values.apply(role)
-            replica_log_dir = os.path.join(app_log_dir, role.name, str(replica_id))

+            replica_log_dir = os.path.join(app_log_dir, role.name, str(replica_id))
             if "TORCHELASTIC_ERROR_FILE" not in replica_role.env:
                 # this is the top level (agent if using elastic role) error file
                 # a.k.a scheduler reply file
@@ -818,7 +886,7 @@ def _to_popen_request(
                     )
                 )
                 replica_log_dirs.append(replica_log_dir)
-
+        self._update_env_cuda_visible_devices(role_params, app, cfg)
         return PopenRequest(app_id, app_log_dir, role_params, role_log_dirs)

     def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
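
For reference, a standalone sketch of the device-count detection that the new `_get_gpu_device_count` helper performs: it runs `nvidia-smi -L`, which prints one line per GPU, and falls back to 0 on failure (which leaves `CUDA_VISIBLE_DEVICES` untouched). This is an illustration only, not the scheduler's code; the `count_gpus` name is hypothetical:

import subprocess


def count_gpus() -> int:
    try:
        result = subprocess.run(
            ["nvidia-smi", "-L"], capture_output=True, text=True, check=True
        )
    except (OSError, subprocess.CalledProcessError):
        # nvidia-smi missing or failing means no usable GPUs were detected.
        return 0
    # `nvidia-smi -L` prints one line per device, e.g.
    # "GPU 0: Tesla V100-SXM2-16GB (UUID: GPU-...)"
    return len([line for line in result.stdout.splitlines() if line.strip()])


if __name__ == "__main__":
    print(count_gpus())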

torchx/schedulers/test/local_scheduler_test.py

Lines changed: 114 additions & 1 deletion
@@ -11,10 +11,12 @@
 import os
 import shutil
 import signal
+import subprocess
 import tempfile
 import time
 import unittest
 from contextlib import contextmanager
+from dataclasses import dataclass
 from datetime import datetime
 from os.path import join
 from typing import Callable, Generator, Optional
@@ -32,7 +34,7 @@
     join_PATH,
     make_unique,
 )
-from torchx.specs.api import AppDef, AppState, Role, is_terminal, macros
+from torchx.specs.api import AppDef, AppState, Role, is_terminal, macros, Resource

 from .test_util import write_shell_script

@@ -828,6 +830,114 @@ def test_close_twice(self) -> None:
         self.scheduler.close()
         # nothing to validate just make sure no errors are raised

+    def test_get_gpu_device_count(self) -> None:
+        @dataclass
+        class ProcResult:
+            stdout: str
+
+        nvidia_smi_out = (
+            "GPU 0: Tesla V100-SXM2-16GB (UUID: GPU-196a22c5-717b-66db-0acc-58cde6f3df85)\n"
+            "GPU 1: Tesla V100-SXM2-16GB (UUID: GPU-45e9165d-4f7e-d954-7ff5-481bc2c0ec7b)\n"
+            "GPU 2: Tesla V100-SXM2-16GB (UUID: GPU-26e22503-5fd5-8f55-d068-e1714fbb6fd6)\n"
+            "GPU 3: Tesla V100-SXM2-16GB (UUID: GPU-ebfc20c7-5f1a-1bc9-0d98-601cbe21fc2d)\n"
+        )
+
+        stdout = nvidia_smi_out
+        result = ProcResult(stdout)
+        with patch("subprocess.run", return_value=result):
+            gpu_count = self.scheduler._get_gpu_device_count()
+            self.assertEqual(4, gpu_count)
+
+    def test_get_gpu_device_count_error(self) -> None:
+        error = subprocess.CalledProcessError(
+            returncode=2,
+            cmd="",
+            output="",
+            stderr="",
+        )
+        with patch("subprocess.run", side_effect=error):
+            gpu_count = self.scheduler._get_gpu_device_count()
+            self.assertEqual(0, gpu_count)
+
+    def test_set_cuda_visible_devices_for_role_replica(self) -> None:
+        replica_param1 = ReplicaParam(
+            args=["a", "b"],
+            env={},
+            cwd="/home/bob",
+        )
+        replica_param2 = ReplicaParam(
+            args=["a", "b"],
+            env={},
+            cwd="/home/bob",
+        )
+        self.scheduler._set_cuda_visible_devices_for_role_replica(
+            replica_param1, 0, 4, 0
+        )
+        self.assertEqual("0,1,2,3", replica_param1.env["CUDA_VISIBLE_DEVICES"])
+        self.scheduler._set_cuda_visible_devices_for_role_replica(
+            replica_param2, 1, 8, 4
+        )
+        # start gpu is 4(request_gpu_start=4) + 8(replica_id=1)
+        self.assertEqual(
+            "12,13,14,15,16,17,18,19", replica_param2.env["CUDA_VISIBLE_DEVICES"]
+        )
+
+    def test_get_cuda_devices_is_set(self) -> None:
+        with patch.object(self.scheduler, "_get_gpu_device_count", return_value=16):
+            appdef = AppDef(
+                name="role1",
+                roles=[
+                    Role(
+                        name="role1",
+                        image=self.test_dir,
+                        entrypoint="train",
+                        resource=Resource(gpu=2, cpu=0, memMB=0),
+                        num_replicas=2,
+                    ),
+                    Role(
+                        name="role2",
+                        image=self.test_dir,
+                        entrypoint="train",
+                        resource=Resource(gpu=3, cpu=0, memMB=0),
+                        num_replicas=2,
+                    ),
+                ],
+            )
+            popen_req = self.scheduler._to_popen_request(
+                appdef, {"auto_set_cuda_visible_devices": True}
+            )
+            role1_params = popen_req.role_params["role1"]
+            self.assertEqual(2, len(role1_params))
+            self.assertEqual("0,1", role1_params[0].env["CUDA_VISIBLE_DEVICES"])
+            self.assertEqual("2,3", role1_params[1].env["CUDA_VISIBLE_DEVICES"])
+            role2_params = popen_req.role_params["role2"]
+            self.assertEqual(2, len(role2_params))
+            self.assertEqual("4,5,6", role2_params[0].env["CUDA_VISIBLE_DEVICES"])
+            self.assertEqual("7,8,9", role2_params[1].env["CUDA_VISIBLE_DEVICES"])
+
+    def test_get_cuda_devices_not_set(self) -> None:
+        with patch.object(self.scheduler, "_get_gpu_device_count", return_value=8):
+            trainer1 = AppDef(
+                name="trainer1",
+                roles=[
+                    Role(
+                        name="trainer1",
+                        image=self.test_dir,
+                        entrypoint="trainer1.sh",
+                        resource=Resource(gpu=4, cpu=0, memMB=0),
+                        num_replicas=4,
+                    )
+                ],
+            )
+
+            popen_req = self.scheduler._to_popen_request(trainer1, {})
+            role_params = popen_req.role_params["trainer1"]
+            self.assertEqual(4, len(role_params))
+            self.assertFalse("CUDA_VISIBLE_DEVICES" in role_params[0].env)
+            self.assertFalse("CUDA_VISIBLE_DEVICES" in role_params[1].env)
+            self.assertFalse("CUDA_VISIBLE_DEVICES" in role_params[2].env)
+            self.assertFalse("CUDA_VISIBLE_DEVICES" in role_params[3].env)
+
     def test_no_orphan_process_function(self) -> None:
         self._test_orphan_workflow()

@@ -839,6 +949,9 @@ def _test_orphan_workflow(self) -> None:
             target=start_sleep_processes, args=(self.test_dir, mp_queue, child_nproc)
         )
         proc.start()
+        # Before querying the queue we need to wait
+        # Otherwise we will get `FileNotFoundError: [Errno 2] No such file or directory` error
+        time.sleep(10)
         total_processes = child_nproc + 1
         pids = []
         for _ in range(total_processes):
