Skip to content

Commit 74560bb

Browse files
Kiuk Chungfacebook-github-bot
authored andcommitted
(torchx/scheduler) make PenvImageProvider add fbpkg download dir to PATH and move PATH manipulation to _to_popen_request so that dryrun captures PATH set by the scheduler (#431)
Summary: Pull Request resolved: #431 Resolves: T115068826 (enables DPP to be run as a separate reader role). Does two things: 1) (OSS + FB) moves PATH manipulation logic from `local_scheduler._popen()` into `local_scheduler._to_popen_request()` so that `dryrun` can better capture `PATH` env var that is manipulated by the local scheduler. 2) (FB-only) Makes PenvImageProvider add the fbpkg root directory to `PATH` of the role. This makes it possible to use entrypoints other than `python` (penv python) with PenvImageProvider, which is the case with DPP worker and master roles. Reviewed By: aivanou Differential Revision: D35066879 fbshipit-source-id: 259457a9fe2af7ca579099b91bbe75f5e2b51ce7
1 parent 453483e commit 74560bb

File tree

2 files changed

+77
-74
lines changed

2 files changed

+77
-74
lines changed

torchx/schedulers/local_scheduler.py

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -656,7 +656,6 @@ def _popen(
656656
role_name: RoleName,
657657
replica_id: int,
658658
replica_params: ReplicaParam,
659-
prepend_cwd: bool,
660659
) -> _LocalReplica:
661660
"""
662661
Same as ``subprocess.Popen(**popen_kwargs)`` but is able to take ``stdout`` and ``stderr``
@@ -678,20 +677,8 @@ def _popen(
678677
# just make sure we override the parent's env vars with the user_defined ones
679678
env = os.environ.copy()
680679
env.update(replica_params.env)
681-
682-
# prepend extra_paths to PATH
683-
env["PATH"] = _join_PATH(*self._extra_paths, env.get("PATH"))
684-
685-
cwd = replica_params.cwd
686-
if cwd:
687-
# if prepend_cwd is set, then prepend cwd to PATH
688-
# making binaries in cwd take precedence to those in PATH
689-
# otherwise append cwd to PATH so that the binaries in PATH
690-
# precede over those in cwd
691-
if prepend_cwd:
692-
env["PATH"] = _join_PATH(cwd, env.get("PATH"))
693-
else:
694-
env["PATH"] = _join_PATH(env.get("PATH"), cwd)
680+
# PATH is a special one, instead of overriding, append
681+
env["PATH"] = _join_PATH(replica_params.env.get("PATH"), os.getenv("PATH"))
695682

696683
# default to unbuffered python for faster responsiveness locally
697684
env.setdefault("PYTHONUNBUFFERED", "x")
@@ -763,8 +750,6 @@ def schedule(self, dryrun_info: AppDryRunInfo[PopenRequest]) -> str:
763750
role_name,
764751
replica_id,
765752
replica_params,
766-
# pyre-ignore[6] cfg type checked by runopt.resolve()
767-
dryrun_info._cfg.get("prepend_cwd"),
768753
)
769754
local_app.add_replica(role_name, replica)
770755
self._apps[app_id] = local_app
@@ -789,7 +774,7 @@ def _get_gpu_device_count(self) -> int:
789774
gpus_info = [gpu_info for gpu_info in result.stdout.split("\n") if gpu_info]
790775
return len(gpus_info)
791776
except subprocess.CalledProcessError as e:
792-
log.exception(f"Got exception while listing GPUs {e.stderr}")
777+
log.exception(f"Got exception while listing GPUs: {e.stderr}")
793778
return 0
794779

795780
def _set_cuda_visible_devices_for_role_replica(
@@ -813,18 +798,18 @@ def _update_env_cuda_visible_devices(
813798
app: AppDef,
814799
cfg: Mapping[str, CfgVal],
815800
) -> None:
816-
device_count = 0
817801
autoset = cfg.get("auto_set_cuda_visible_devices")
818802
if not autoset:
819803
return
804+
820805
requested_gpus_total = sum(
821806
[role.resource.gpu * role.num_replicas for role in app.roles]
822807
)
823808
if requested_gpus_total <= 0:
824809
return
810+
825811
device_count = self._get_gpu_device_count()
826812
if requested_gpus_total > device_count:
827-
autoset = False
828813
log.warning(
829814
"Cannot set `CUDA_VISIBLE_DEVICES` due to "
830815
f"Available GPUs {device_count} less than requested {requested_gpus_total}"
@@ -860,6 +845,22 @@ def _to_popen_request(
860845

861846
img_root = image_provider.fetch_role(role)
862847

848+
# prepend extra_paths to PATH
849+
role.env["PATH"] = _join_PATH(*self._extra_paths, role.env.get("PATH"))
850+
cwd = image_provider.get_cwd(role.image)
851+
852+
if cwd:
853+
# if prepend_cwd is set, then prepend cwd to PATH
854+
# making binaries in cwd take precedence to those in PATH
855+
# otherwise append cwd to PATH so that the binaries in PATH
856+
# precede over those in cwd
857+
prepend_cwd = cfg.get("prepend_cwd")
858+
859+
if prepend_cwd:
860+
role.env["PATH"] = _join_PATH(cwd, role.env.get("PATH"))
861+
else:
862+
role.env["PATH"] = _join_PATH(role.env.get("PATH"), cwd)
863+
863864
for replica_id in range(role.num_replicas):
864865
values = macros.Values(
865866
img_root=img_root,

torchx/schedulers/test/local_scheduler_test.py

Lines changed: 56 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@
3030
LocalDirectoryImageProvider,
3131
LocalScheduler,
3232
ReplicaParam,
33-
create_cwd_scheduler,
3433
_join_PATH,
34+
create_cwd_scheduler,
3535
make_unique,
3636
)
37-
from torchx.specs.api import AppDef, AppState, Role, is_terminal, macros, Resource
37+
from torchx.specs.api import AppDef, AppState, Resource, Role, is_terminal, macros
3838

3939
from .test_util import write_shell_script
4040

@@ -336,24 +336,29 @@ def test_submit_inherit_parent_envs(self) -> None:
336336
os.environ, {"PATH": "/bin:/usr/bin", "TORCHELASTIC_ERROR_FILE": "ignored"}
337337
)
338338
def test_child_process_env_append_cwd(self, popen_mock: MagicMock) -> None:
339-
ignored = 0
340-
self.scheduler._popen(
341-
role_name="ignored",
342-
replica_id=ignored,
343-
replica_params=ReplicaParam(
344-
args=["a", "b"],
345-
env={},
346-
cwd="/home/bob",
347-
stdout=None,
348-
stderr=None,
349-
combined=None,
339+
popen_request = self.scheduler._to_popen_request(
340+
app=AppDef(
341+
name="_",
342+
roles=[
343+
Role(name="foo", image=self.test_dir, env={"PATH": "/home/bob"}),
344+
],
350345
),
351-
prepend_cwd=False,
346+
cfg={"prepend_cwd": False},
347+
)
348+
349+
# the test scheduler is hooked up with LocalDirectoryImageProvider
350+
# which treats the role's image as CWD
351+
replica_params = popen_request.role_params["foo"][0]
352+
self.assertEqual(f"/home/bob:{self.test_dir}", replica_params.env["PATH"])
353+
354+
# _popen actually adds in the os.environ[PATH]
355+
self.scheduler._popen(
356+
role_name="foo", replica_id=0, replica_params=replica_params
352357
)
353358

354359
self.assertEqual(
355360
# for python 3.7 BC get call_args.kwargs by index
356-
"/bin:/usr/bin:/home/bob",
361+
f"/home/bob:{self.test_dir}:/bin:/usr/bin",
357362
popen_mock.call_args[1]["env"]["PATH"],
358363
)
359364

@@ -362,61 +367,58 @@ def test_child_process_env_append_cwd(self, popen_mock: MagicMock) -> None:
362367
os.environ, {"PATH": "/bin:/usr/bin", "TORCHELASTIC_ERROR_FILE": "ignored"}
363368
)
364369
def test_child_process_env_prepend_cwd(self, popen_mock: MagicMock) -> None:
365-
ignored = 0
366-
self.scheduler._popen(
367-
role_name="ignored",
368-
replica_id=ignored,
369-
replica_params=ReplicaParam(
370-
args=["a", "b"],
371-
env={},
372-
cwd="/home/bob",
373-
stdout=None,
374-
stderr=None,
375-
combined=None,
370+
popen_request = self.scheduler._to_popen_request(
371+
app=AppDef(
372+
name="_",
373+
roles=[
374+
Role(name="foo", image=self.test_dir, env={"PATH": "/home/bob"}),
375+
],
376376
),
377-
prepend_cwd=True,
377+
cfg={"prepend_cwd": True},
378+
)
379+
380+
# the test scheduler is hooked up with LocalDirectoryImageProvider
381+
# which treats the role's image as CWD
382+
replica_params = popen_request.role_params["foo"][0]
383+
self.assertEqual(f"{self.test_dir}:/home/bob", replica_params.env["PATH"])
384+
385+
# _popen actually adds in the os.environ[PATH]
386+
self.scheduler._popen(
387+
role_name="foo", replica_id=0, replica_params=replica_params
378388
)
379389

380390
self.assertEqual(
381391
# for python 3.7 BC get call_args.kwargs by index
382-
"/home/bob:/bin:/usr/bin",
392+
f"{self.test_dir}:/home/bob:/bin:/usr/bin",
383393
popen_mock.call_args[1]["env"]["PATH"],
384394
)
385395

386396
@mock.patch("subprocess.Popen")
387397
@mock.patch.dict(os.environ, {"TORCHELASTIC_ERROR_FILE": "ignored"}, clear=True)
388398
def test_child_process_env_none(self, popen_mock: MagicMock) -> None:
389-
ignored = 0
390-
self.scheduler._popen(
391-
role_name="ignored",
392-
replica_id=ignored,
393-
replica_params=ReplicaParam(
394-
args=["a", "b"],
395-
env={},
396-
cwd="/home/bob",
397-
stdout=None,
398-
stderr=None,
399-
combined=None,
399+
popen_request = self.scheduler._to_popen_request(
400+
app=AppDef(
401+
name="_",
402+
roles=[Role(name="foo", image=self.test_dir)],
400403
),
401-
prepend_cwd=True,
404+
cfg={},
402405
)
403-
self.assertEqual("/home/bob", popen_mock.call_args[1]["env"]["PATH"])
404406

407+
# the test scheduler is hooked up with LocalDirectoryImageProvider
408+
# which treats the role's image as CWD
409+
replica_params = popen_request.role_params["foo"][0]
410+
self.assertEqual(f"{self.test_dir}", replica_params.env["PATH"])
411+
412+
# _popen actually adds in the os.environ[PATH]
405413
self.scheduler._popen(
406-
role_name="ignored",
407-
replica_id=ignored,
408-
replica_params=ReplicaParam(
409-
args=["a", "b"],
410-
env={},
411-
cwd=None,
412-
stdout=None,
413-
stderr=None,
414-
combined=None,
415-
),
416-
prepend_cwd=True,
414+
role_name="foo", replica_id=0, replica_params=replica_params
415+
)
416+
417+
self.assertEqual(
418+
# for python 3.7 BC get call_args.kwargs by index
419+
f"{self.test_dir}",
420+
popen_mock.call_args[1]["env"]["PATH"],
417421
)
418-
# for python 3.7 BC get call_args.kwargs by index
419-
self.assertFalse(popen_mock.call_args[1]["env"]["PATH"])
420422

421423
@mock.patch.dict(os.environ, {"FOO": "bar"})
422424
def test_submit_override_parent_env(self) -> None:

0 commit comments

Comments
 (0)