Skip to content

Commit 3c41b4e

Browse files
committed
torchx/runner,cli: add patching support via WorkspaceScheduler
1 parent c56cad6 commit 3c41b4e

File tree

6 files changed

+217
-15
lines changed

6 files changed

+217
-15
lines changed

torchx/cli/cmd_run.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
from pyre_extensions import none_throws
1818
from torchx.cli.cmd_base import SubCommand
1919
from torchx.cli.cmd_log import get_logs
20-
from torchx.runner import Runner, config, get_runner
20+
from torchx.runner import Runner, config
21+
from torchx.runner.workspaces import get_workspace_runner, WorkspaceRunner
2122
from torchx.schedulers import get_default_scheduler_name, get_scheduler_factories
2223
from torchx.specs import CfgVal
2324
from torchx.specs.finder import (
@@ -129,7 +130,7 @@ def add_arguments(self, subparser: argparse.ArgumentParser) -> None:
129130
nargs=argparse.REMAINDER,
130131
)
131132

132-
def _run(self, runner: Runner, args: argparse.Namespace) -> None:
133+
def _run(self, runner: WorkspaceRunner, args: argparse.Namespace) -> None:
133134
if args.scheduler == "local":
134135
logger.warning(
135136
"`local` scheduler is deprecated and will be"
@@ -138,10 +139,14 @@ def _run(self, runner: Runner, args: argparse.Namespace) -> None:
138139
" (e.g. `local_cwd`)"
139140
)
140141

141-
run_opts = get_runner().run_opts()
142+
run_opts = runner.run_opts()
142143
scheduler_opts = run_opts[args.scheduler]
143144
cfg = _parse_run_config(args.scheduler_args, scheduler_opts)
144145
config.apply(scheduler=args.scheduler, cfg=cfg)
146+
config_files = config.find_configs()
147+
workspace = (
148+
"file://" + os.path.dirname(config_files[0]) if config_files else None
149+
)
145150

146151
if len(args.conf_args) < 1:
147152
none_throws(self._subparser).error(
@@ -155,7 +160,7 @@ def _run(self, runner: Runner, args: argparse.Namespace) -> None:
155160
try:
156161
if args.dryrun:
157162
dryrun_info = runner.dryrun_component(
158-
conf_file, conf_args, args.scheduler, cfg
163+
conf_file, conf_args, args.scheduler, workspace, cfg
159164
)
160165
logger.info(
161166
"\n=== APPLICATION ===\n"
@@ -168,6 +173,7 @@ def _run(self, runner: Runner, args: argparse.Namespace) -> None:
168173
conf_file,
169174
conf_args,
170175
args.scheduler,
176+
workspace,
171177
cfg,
172178
)
173179
# DO NOT delete this line. It is used by slurm tests to retrieve the app id
@@ -200,7 +206,7 @@ def _run(self, runner: Runner, args: argparse.Namespace) -> None:
200206

201207
def run(self, args: argparse.Namespace) -> None:
202208
os.environ["TORCHX_CONTEXT_NAME"] = os.getenv("TORCHX_CONTEXT_NAME", "cli_run")
203-
with get_runner() as runner:
209+
with get_workspace_runner() as runner:
204210
self._run(runner, args)
205211

206212
def _wait_and_exit(self, runner: Runner, app_handle: str, log: bool) -> None:

torchx/runner/config.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,13 @@ def my_component(a: int) -> specs.AppDef:
9696
import configparser as configparser
9797
import logging
9898
from pathlib import Path
99-
from typing import Dict, List, Optional, TextIO
99+
from typing import Dict, List, Optional, TextIO, Iterable
100100

101101
from torchx.schedulers import Scheduler, get_schedulers
102102
from torchx.specs import CfgVal, get_type_name
103103
from torchx.specs.api import runopt
104104

105+
CONFIG_FILE = ".torchxconfig"
105106

106107
_NONE = "None"
107108

@@ -235,15 +236,28 @@ def apply(
235236
Then after the method call, ``cfg={"foo":"bar","hello":"world"}``.
236237
"""
237238

239+
for configfile in find_configs(dirs):
240+
with open(configfile, "r") as f:
241+
load(scheduler, f, cfg)
242+
log.info(f"loaded configs from {configfile}")
243+
244+
245+
def find_configs(dirs: Optional[Iterable[str]] = None) -> List[str]:
246+
"""
247+
find_configs returns all the .torchxconfig files it finds in the specified
248+
directories. If directories is empty it checks the local directory.
249+
"""
238250
if not dirs:
239251
dirs = [str(Path.cwd())]
240252

253+
config_files = []
254+
241255
for d in dirs:
242-
configfile = Path(d) / ".torchxconfig"
256+
configfile = Path(d) / CONFIG_FILE
243257
if configfile.exists():
244-
with open(str(configfile), "r") as f:
245-
load(scheduler, f, cfg)
246-
log.info(f"loaded configs from {configfile}")
258+
config_files.append(str(configfile))
259+
260+
return config_files
247261

248262

249263
def load(scheduler: str, f: TextIO, cfg: Dict[str, CfgVal]) -> None:

torchx/runner/test/workspaces_test.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import unittest
2+
from unittest.mock import (
3+
MagicMock,
4+
call
5+
)
6+
7+
from torchx.specs.api import (
8+
AppDryRunInfo
9+
)
10+
from torchx.runner.workspaces import (
11+
get_workspace_runner,
12+
WorkspaceRunner,
13+
)
14+
from torchx.schedulers.api import (
15+
WorkspaceScheduler,
16+
)
17+
18+
class WorkspaceRunnerTest(unittest.TestCase):
19+
def test_get_workspace_runner(self) -> None:
20+
self.assertIsInstance(get_workspace_runner(), WorkspaceRunner)
21+
22+
def test_workspace_runner(self) -> None:
23+
scheduler = MagicMock(spec=WorkspaceScheduler)
24+
def submit_dryrun(app, cfg) -> AppDryRunInfo[str]:
25+
self.assertEqual(app.roles[0].image, "$img")
26+
27+
dryrun_info = AppDryRunInfo("req", str)
28+
dryrun_info._app = app
29+
return dryrun_info
30+
scheduler.submit_dryrun = submit_dryrun
31+
scheduler.build_workspace_image.return_value = "$img"
32+
runner = WorkspaceRunner(
33+
"workspaces_test",
34+
schedulers={
35+
"mock": scheduler,
36+
}
37+
)
38+
app_args = ["--image", "dummy_image", "--script", "test.py"]
39+
workspace = "memory:///foo"
40+
ret = runner.run_component("dist.ddp", app_args, "mock", workspace)
41+
42+
self.assertEqual(scheduler.build_workspace_image.mock_calls, [
43+
call("dummy_image", workspace),
44+
])
45+

torchx/runner/workspaces.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
#!/usr/bin/env python3
2+
# Copyright (c) Facebook, Inc. and its affiliates.
3+
# All rights reserved.
4+
#
5+
# This source code is licensed under the BSD-style license found in the
6+
# LICENSE file in the root directory of this source tree.
7+
8+
"""
9+
This contains an experimental patching runner that can overlay a workspace on
10+
top of the provided image. This allows for fast iterations without having to
11+
rebuild a new image with your application code.
12+
13+
The workspace is a fsspec filesystem that gets walked and overlaid on the image.
14+
This allows having multiple different interfaces such as from Jupyter Notebooks
15+
as well as local file systems.
16+
"""
17+
18+
import logging
19+
import warnings
20+
from typing import List, Mapping, Optional
21+
22+
from torchx.runner.api import Runner
23+
from torchx.schedulers import get_schedulers
24+
from torchx.schedulers.api import WorkspaceScheduler
25+
from torchx.specs import (
26+
from_function,
27+
AppDef,
28+
SchedulerBackend,
29+
AppHandle,
30+
CfgVal,
31+
AppDryRunInfo,
32+
)
33+
from torchx.specs.finder import get_component
34+
35+
log: logging.Logger = logging.getLogger(__name__)
36+
37+
38+
class WorkspaceRunner(Runner):
39+
"""
40+
WorkspaceRunner is a special runner that takes an optional workspace
41+
argument for the run and dryrun_component methods. If a workspace is
42+
specified a new image will be built with the workspace overlaid on top.
43+
"""
44+
45+
def run_component(
46+
self,
47+
component: str,
48+
component_args: List[str],
49+
scheduler: SchedulerBackend,
50+
workspace: Optional[str],
51+
cfg: Optional[Mapping[str, CfgVal]] = None,
52+
) -> AppHandle:
53+
dryrun_info = self.dryrun_component(
54+
component,
55+
component_args,
56+
scheduler,
57+
workspace,
58+
cfg,
59+
)
60+
return self.schedule(dryrun_info)
61+
62+
def dryrun_component(
63+
self,
64+
component: str,
65+
component_args: List[str],
66+
scheduler: SchedulerBackend,
67+
workspace: Optional[str],
68+
cfg: Optional[Mapping[str, CfgVal]] = None,
69+
) -> AppDryRunInfo:
70+
component_def = get_component(component)
71+
app = from_function(component_def.fn, component_args)
72+
return self.dryrun(app, scheduler, workspace, cfg)
73+
74+
def dryrun(
75+
self,
76+
app: AppDef,
77+
scheduler: SchedulerBackend,
78+
workspace: Optional[str],
79+
cfg: Optional[Mapping[str, CfgVal]] = None,
80+
) -> AppDryRunInfo:
81+
if workspace:
82+
self._patch_app(app, scheduler, workspace)
83+
84+
return super().dryrun(app, scheduler, cfg)
85+
86+
def _patch_app(self, app: AppDef, scheduler: str, workspace: str) -> None:
87+
sched = self._scheduler(scheduler)
88+
if not isinstance(sched, WorkspaceScheduler):
89+
warnings.warn(
90+
f"can't apply workspace to image since {sched} is not a "
91+
"WorkspaceScheduler"
92+
)
93+
return
94+
95+
log.info(f"building patch images for workspace: {workspace}...")
96+
97+
images = {}
98+
for role in app.roles:
99+
img = images.get(role.image)
100+
if not img:
101+
img = sched.build_workspace_image(role.image, workspace)
102+
images[role.image] = img
103+
role.image = img
104+
105+
def __enter__(self) -> "WorkspaceRunner":
106+
return self
107+
108+
109+
def get_workspace_runner(
110+
name: Optional[str] = None, **scheduler_params: object
111+
) -> WorkspaceRunner:
112+
"""
113+
Returns a WorkspaceRunner. See torchx.runner.get_runner for more info.
114+
"""
115+
if not name:
116+
name = "torchx"
117+
118+
schedulers = get_schedulers(session_name=name, **scheduler_params)
119+
return WorkspaceRunner(name, schedulers)

torchx/schedulers/api.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,26 @@ def _validate(self, app: AppDef, scheduler: SchedulerBackend) -> None:
280280
)
281281

282282

283+
class WorkspaceScheduler(Scheduler):
284+
"""
285+
WorkspaceScheduler is a Scheduler that has workspace support.
286+
287+
Experimental: this interface may change without notice.
288+
"""
289+
290+
@abc.abstractmethod
291+
def build_workspace_image(self, img: str, workspace: str) -> str:
292+
"""
293+
build_workspace_image builds a new image with the workspace filesystem
294+
overlaid on it and returns the new image name.
295+
296+
This may be a lazy operation that doesn't actually build the new image
297+
until launch time so the workspace filesystem must persist until the job
298+
launches.
299+
"""
300+
...
301+
302+
283303
def filter_regex(regex: str, data: Iterable[str]) -> Iterable[str]:
284304
"""
285305
filter_regex takes a string iterator and returns an iterator that only has

torchx/schedulers/docker_scheduler.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from torchx.schedulers.api import (
3232
AppDryRunInfo,
3333
DescribeAppResponse,
34-
Scheduler,
34+
WorkspaceScheduler,
3535
filter_regex,
3636
Stream,
3737
)
@@ -102,7 +102,7 @@ def has_docker() -> bool:
102102
return False
103103

104104

105-
class DockerScheduler(Scheduler):
105+
class DockerScheduler(WorkspaceScheduler):
106106
"""
107107
DockerScheduler is a TorchX scheduling interface to Docker.
108108
@@ -415,7 +415,6 @@ def build_workspace_image(self, img: str, workspace: str) -> str:
415415
416416
Returns:
417417
The new Docker image ID.
418-
419418
"""
420419
return _build_container_from_workspace(self._client(), img, workspace)
421420

@@ -444,7 +443,6 @@ def _copy_to_tarfile(workspace: str, tf: tarfile.TarFile) -> None:
444443
assert isinstance(dir, str), "path must be str"
445444
relpath = posixpath.relpath(dir, path)
446445
for file, info in files.items():
447-
print(relpath, dir, file, info)
448446
with fs.open(info["name"], "rb") as f:
449447
tinfo = tarfile.TarInfo(posixpath.join(relpath, file))
450448
tinfo.size = info["size"]
@@ -486,5 +484,5 @@ def _build_container_from_workspace(
486484
)
487485
finally:
488486
context.close()
489-
print(image)
487+
490488
return image.id

0 commit comments

Comments
 (0)