Skip to content

Commit a54b46d

Browse files
d4l3kfacebook-github-bot
authored andcommitted
torchx/runner,cli: add patching support via WorkspaceScheduler (#360)
Summary: This adds patching support to the cli and runner. To keep this somewhat isolated from the main APIs there's now a couple of new concepts: * WorkspaceScheduler extends Scheduler and adds a build_workspace_image * WorkspaceRunner extends Runner and adds support for building a workspace image as part of dryrun This makes the cli run command use the WorkspaceRunner instead of the normal Runner for experimentation purposes. Patching is enabled if the cwd contains a .torchxconfig file. Pull Request resolved: #360 Test Plan: $ touch .torchxconfig $ echo "echo foo" > foo.sh $ torchx run --scheduler local_docker utils.sh sh foo.sh torchx 2021-12-10 16:24:19 INFO loaded configs from /home/tristanr/Developer/torchx-proj/.torchxconfig torchx 2021-12-10 16:24:19 INFO building patch images for workspace: file:///home/tristanr/Developer/torchx-proj... torchx 2021-12-10 16:24:21 INFO Pulling container image: sha256:0da419b5accdba18412014ffeafcf782acd53d6522a198ee7cbfb48556f355be (this may take a while) torchx 2021-12-10 16:24:23 WARNING failed to pull image sha256:0da419b5accdba18412014ffeafcf782acd53d6522a198ee7cbfb48556f355be, falling back to local: 404 Client Error for htt p+docker://localhost/v1.41/images/create?tag=0da419b5accdba18412014ffeafcf782acd53d6522a198ee7cbfb48556f355be&fromImage=sha256: Not Found ("pull access denied for sha256, reposi tory does not exist or may require 'docker login': denied: requested access to the resource is denied") local_docker://torchx/sh-g7frzl4q92g2bd torchx 2021-12-10 16:24:24 INFO Waiting for the app to finish... torchx 2021-12-10 16:24:24 INFO Job finished: SUCCEEDED sh/0 foo Reviewed By: kiukchung Differential Revision: D33036468 Pulled By: d4l3k fbshipit-source-id: 790ee5fc784adc0c5a3826b63aed5eb6f4e0817e
1 parent a40aca3 commit a54b46d

File tree

6 files changed

+247
-22
lines changed

6 files changed

+247
-22
lines changed

torchx/cli/cmd_run.py

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
from pyre_extensions import none_throws
1818
from torchx.cli.cmd_base import SubCommand
1919
from torchx.cli.cmd_log import get_logs
20-
from torchx.runner import Runner, config, get_runner
20+
from torchx.runner import Runner, config
21+
from torchx.runner.workspaces import get_workspace_runner, WorkspaceRunner
2122
from torchx.schedulers import get_default_scheduler_name, get_scheduler_factories
2223
from torchx.specs import CfgVal
2324
from torchx.specs.finder import (
@@ -138,10 +139,14 @@ def _run(self, runner: Runner, args: argparse.Namespace) -> None:
138139
" (e.g. `local_cwd`)"
139140
)
140141

141-
run_opts = get_runner().run_opts()
142+
run_opts = runner.run_opts()
142143
scheduler_opts = run_opts[args.scheduler]
143144
cfg = _parse_run_config(args.scheduler_args, scheduler_opts)
144145
config.apply(scheduler=args.scheduler, cfg=cfg)
146+
config_files = config.find_configs()
147+
workspace = (
148+
"file://" + os.path.dirname(config_files[0]) if config_files else None
149+
)
145150

146151
if len(args.conf_args) < 1:
147152
none_throws(self._subparser).error(
@@ -154,22 +159,40 @@ def _run(self, runner: Runner, args: argparse.Namespace) -> None:
154159
conf_file, conf_args = args.conf_args[0], args.conf_args[1:]
155160
try:
156161
if args.dryrun:
157-
dryrun_info = runner.dryrun_component(
158-
conf_file, conf_args, args.scheduler, cfg
159-
)
162+
if isinstance(runner, WorkspaceRunner):
163+
dryrun_info = runner.dryrun_component(
164+
conf_file,
165+
conf_args,
166+
args.scheduler,
167+
workspace=workspace,
168+
cfg=cfg,
169+
)
170+
else:
171+
dryrun_info = runner.dryrun_component(
172+
conf_file, conf_args, args.scheduler, cfg=cfg
173+
)
160174
logger.info(
161175
"\n=== APPLICATION ===\n"
162176
f"{pformat(asdict(dryrun_info._app), indent=2, width=80)}"
163177
)
164178

165179
logger.info("\n=== SCHEDULER REQUEST ===\n" f"{dryrun_info}")
166180
else:
167-
app_handle = runner.run_component(
168-
conf_file,
169-
conf_args,
170-
args.scheduler,
171-
cfg,
172-
)
181+
if isinstance(runner, WorkspaceRunner):
182+
app_handle = runner.run_component(
183+
conf_file,
184+
conf_args,
185+
args.scheduler,
186+
workspace=workspace,
187+
cfg=cfg,
188+
)
189+
else:
190+
app_handle = runner.run_component(
191+
conf_file,
192+
conf_args,
193+
args.scheduler,
194+
cfg=cfg,
195+
)
173196
# DO NOT delete this line. It is used by slurm tests to retrieve the app id
174197
print(app_handle)
175198

@@ -200,7 +223,7 @@ def _run(self, runner: Runner, args: argparse.Namespace) -> None:
200223

201224
def run(self, args: argparse.Namespace) -> None:
202225
os.environ["TORCHX_CONTEXT_NAME"] = os.getenv("TORCHX_CONTEXT_NAME", "cli_run")
203-
with get_runner() as runner:
226+
with get_workspace_runner() as runner:
204227
self._run(runner, args)
205228

206229
def _wait_and_exit(self, runner: Runner, app_handle: str, log: bool) -> None:

torchx/runner/config.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,13 @@ def my_component(a: int) -> specs.AppDef:
9696
import configparser as configparser
9797
import logging
9898
from pathlib import Path
99-
from typing import Dict, List, Optional, TextIO
99+
from typing import Dict, List, Optional, TextIO, Iterable
100100

101101
from torchx.schedulers import Scheduler, get_schedulers
102102
from torchx.specs import CfgVal, get_type_name
103103
from torchx.specs.api import runopt
104104

105+
CONFIG_FILE = ".torchxconfig"
105106

106107
_NONE = "None"
107108

@@ -235,15 +236,28 @@ def apply(
235236
Then after the method call, ``cfg={"foo":"bar","hello":"world"}``.
236237
"""
237238

239+
for configfile in find_configs(dirs):
240+
with open(configfile, "r") as f:
241+
load(scheduler, f, cfg)
242+
log.info(f"loaded configs from {configfile}")
243+
244+
245+
def find_configs(dirs: Optional[Iterable[str]] = None) -> List[str]:
246+
"""
247+
find_configs returns all the .torchxconfig files it finds in the specified
248+
directories. If directories is empty it checks the local directory.
249+
"""
238250
if not dirs:
239251
dirs = [str(Path.cwd())]
240252

253+
config_files = []
254+
241255
for d in dirs:
242-
configfile = Path(d) / ".torchxconfig"
256+
configfile = Path(d) / CONFIG_FILE
243257
if configfile.exists():
244-
with open(str(configfile), "r") as f:
245-
load(scheduler, f, cfg)
246-
log.info(f"loaded configs from {configfile}")
258+
config_files.append(str(configfile))
259+
260+
return config_files
247261

248262

249263
def load(scheduler: str, f: TextIO, cfg: Dict[str, CfgVal]) -> None:

torchx/runner/test/workspaces_test.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Copyright (c) Facebook, Inc. and its affiliates.
2+
# All rights reserved.
3+
#
4+
# This source code is licensed under the BSD-style license found in the
5+
# LICENSE file in the root directory of this source tree.
6+
7+
import unittest
8+
from typing import Mapping
9+
from unittest.mock import MagicMock, call
10+
11+
from torchx.runner.workspaces import (
12+
get_workspace_runner,
13+
WorkspaceRunner,
14+
)
15+
from torchx.schedulers.api import (
16+
WorkspaceScheduler,
17+
)
18+
from torchx.specs.api import AppDryRunInfo, AppDef, CfgVal
19+
20+
21+
class WorkspaceRunnerTest(unittest.TestCase):
22+
def test_get_workspace_runner(self) -> None:
23+
self.assertIsInstance(get_workspace_runner(), WorkspaceRunner)
24+
25+
def test_workspace_runner(self) -> None:
26+
scheduler = MagicMock(spec=WorkspaceScheduler)
27+
28+
def submit_dryrun(app: AppDef, cfg: Mapping[str, CfgVal]) -> AppDryRunInfo[str]:
29+
self.assertEqual(app.roles[0].image, "$img")
30+
31+
dryrun_info: AppDryRunInfo[str] = AppDryRunInfo(str("req"), str)
32+
dryrun_info._app = app
33+
return dryrun_info
34+
35+
scheduler.submit_dryrun = submit_dryrun
36+
scheduler.build_workspace_image.return_value = "$img"
37+
runner = WorkspaceRunner(
38+
"workspaces_test",
39+
schedulers={
40+
"mock": scheduler,
41+
},
42+
)
43+
app_args = ["--image", "dummy_image", "--script", "test.py"]
44+
workspace = "memory:///foo"
45+
ret = runner.run_component("dist.ddp", app_args, "mock", workspace)
46+
47+
self.assertEqual(
48+
scheduler.build_workspace_image.mock_calls,
49+
[
50+
call("dummy_image", workspace),
51+
],
52+
)

torchx/runner/workspaces.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
#!/usr/bin/env python3
2+
# Copyright (c) Facebook, Inc. and its affiliates.
3+
# All rights reserved.
4+
#
5+
# This source code is licensed under the BSD-style license found in the
6+
# LICENSE file in the root directory of this source tree.
7+
8+
"""
9+
This contains an experimental patching runner that can overlay a workspace on
10+
top of the provided image. This allows for fast iterations without having to
11+
rebuild a new image with your application code.
12+
13+
The workspace is a fsspec filesystem that gets walked and overlaid on the image.
14+
This allows having multiple different interfaces such as from Jupyter Notebooks
15+
as well as local file systems.
16+
"""
17+
18+
import logging
19+
import warnings
20+
from typing import List, Mapping, Optional
21+
22+
from torchx.runner.api import Runner
23+
from torchx.schedulers import get_schedulers
24+
from torchx.schedulers.api import WorkspaceScheduler
25+
from torchx.specs import (
26+
from_function,
27+
AppDef,
28+
SchedulerBackend,
29+
AppHandle,
30+
CfgVal,
31+
AppDryRunInfo,
32+
)
33+
from torchx.specs.finder import get_component
34+
35+
log: logging.Logger = logging.getLogger(__name__)
36+
37+
38+
class WorkspaceRunner(Runner):
39+
"""
40+
WorkspaceRunner is a special runner that takes an optional workspace
41+
argument for the run and dryrun_component methods. If a workspace is
42+
specified a new image will be built with the workspace overlaid on top.
43+
44+
WARNING: This is in prototype stage and may have backwards incompatible
45+
changes made without notice.
46+
"""
47+
48+
def run_component(
49+
self,
50+
component: str,
51+
component_args: List[str],
52+
scheduler: SchedulerBackend,
53+
workspace: Optional[str],
54+
cfg: Optional[Mapping[str, CfgVal]] = None,
55+
) -> AppHandle:
56+
dryrun_info = self.dryrun_component(
57+
component,
58+
component_args,
59+
scheduler,
60+
workspace,
61+
cfg,
62+
)
63+
return self.schedule(dryrun_info)
64+
65+
def dryrun_component(
66+
self,
67+
component: str,
68+
component_args: List[str],
69+
scheduler: SchedulerBackend,
70+
workspace: Optional[str],
71+
cfg: Optional[Mapping[str, CfgVal]] = None,
72+
) -> AppDryRunInfo:
73+
component_def = get_component(component)
74+
app = from_function(component_def.fn, component_args)
75+
return self.dryrun(app, scheduler, workspace, cfg)
76+
77+
def dryrun(
78+
self,
79+
app: AppDef,
80+
scheduler: SchedulerBackend,
81+
workspace: Optional[str],
82+
cfg: Optional[Mapping[str, CfgVal]] = None,
83+
) -> AppDryRunInfo:
84+
if workspace:
85+
self._patch_app(app, scheduler, workspace)
86+
87+
return super().dryrun(app, scheduler, cfg)
88+
89+
def _patch_app(self, app: AppDef, scheduler: str, workspace: str) -> None:
90+
sched = self._scheduler(scheduler)
91+
if not isinstance(sched, WorkspaceScheduler):
92+
warnings.warn(
93+
f"can't apply workspace to image since {sched} is not a "
94+
"WorkspaceScheduler"
95+
)
96+
return
97+
98+
log.info(f"building patch images for workspace: {workspace}...")
99+
100+
images = {}
101+
for role in app.roles:
102+
img = images.get(role.image)
103+
if not img:
104+
img = sched.build_workspace_image(role.image, workspace)
105+
images[role.image] = img
106+
role.image = img
107+
108+
def __enter__(self) -> "WorkspaceRunner":
109+
return self
110+
111+
112+
def get_workspace_runner(
113+
name: Optional[str] = None, **scheduler_params: object
114+
) -> WorkspaceRunner:
115+
"""
116+
Returns a WorkspaceRunner. See torchx.runner.get_runner for more info.
117+
"""
118+
if not name:
119+
name = "torchx"
120+
121+
schedulers = get_schedulers(session_name=name, **scheduler_params)
122+
return WorkspaceRunner(name, schedulers)

torchx/schedulers/api.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,22 @@ def _validate(self, app: AppDef, scheduler: SchedulerBackend) -> None:
280280
)
281281

282282

283+
class WorkspaceScheduler(Scheduler):
284+
"""
285+
WorkspaceScheduler is a Scheduler that has workspace support.
286+
287+
Experimental: this interface may change without notice.
288+
"""
289+
290+
@abc.abstractmethod
291+
def build_workspace_image(self, img: str, workspace: str) -> str:
292+
"""
293+
build_workspace_image builds a new image with the workspace filesystem
294+
overlaid on it and returns the new image name.
295+
"""
296+
...
297+
298+
283299
def filter_regex(regex: str, data: Iterable[str]) -> Iterable[str]:
284300
"""
285301
filter_regex takes a string iterator and returns an iterator that only has

torchx/schedulers/docker_scheduler.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from torchx.schedulers.api import (
3232
AppDryRunInfo,
3333
DescribeAppResponse,
34-
Scheduler,
34+
WorkspaceScheduler,
3535
filter_regex,
3636
Stream,
3737
)
@@ -102,7 +102,7 @@ def has_docker() -> bool:
102102
return False
103103

104104

105-
class DockerScheduler(Scheduler):
105+
class DockerScheduler(WorkspaceScheduler):
106106
"""
107107
DockerScheduler is a TorchX scheduling interface to Docker.
108108
@@ -415,7 +415,6 @@ def build_workspace_image(self, img: str, workspace: str) -> str:
415415
416416
Returns:
417417
The new Docker image ID.
418-
419418
"""
420419
return _build_container_from_workspace(self._client(), img, workspace)
421420

@@ -444,7 +443,6 @@ def _copy_to_tarfile(workspace: str, tf: tarfile.TarFile) -> None:
444443
assert isinstance(dir, str), "path must be str"
445444
relpath = posixpath.relpath(dir, path)
446445
for file, info in files.items():
447-
print(relpath, dir, file, info)
448446
with fs.open(info["name"], "rb") as f:
449447
tinfo = tarfile.TarInfo(posixpath.join(relpath, file))
450448
tinfo.size = info["size"]
@@ -486,5 +484,5 @@ def _build_container_from_workspace(
486484
)
487485
finally:
488486
context.close()
489-
print(image)
487+
490488
return image.id

0 commit comments

Comments
 (0)