
Commit c813a70

slurm_scheduler: add support for per replica log files and API access
1 parent 92e6897 commit c813a70

4 files changed, +136 -42 lines changed
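The commit gives every replica its own Slurm output file, named slurm-&lt;jobid&gt;-&lt;role&gt;-&lt;replica_id&gt;.out and written to the directory the job was launched from, and exposes those files through the scheduler's log API. A minimal sketch of reading them programmatically, based on the log_iter signature added below (the session name "demo", job id "1853", and role "echo" are hypothetical):

    # Sketch only: assumes a slurm job "1853" with an "echo" role that was
    # submitted from the current working directory, so the per-replica file
    # slurm-1853-echo-0.out is resolvable relative to cwd.
    from torchx.schedulers.slurm_scheduler import create_scheduler

    scheduler = create_scheduler("demo")
    for line in scheduler.log_iter("1853", "echo", k=0):
        print(line)

The same output is reachable from the CLI via torchx log, which is what the updated integration test below relies on.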

scripts/slurmtest.sh

Lines changed: 4 additions & 4 deletions
@@ -18,12 +18,12 @@ source "$VENV"/bin/activate
 python --version
 pip install "$REMOTE_WHEEL"
 
-APP_ID="$(torchx run --wait --scheduler slurm --scheduler_args partition=compute,time=10 utils.echo --image /tmp --num_replicas 3)"
+APP_ID="$(torchx run --wait --scheduler slurm --scheduler_args partition=compute,time=10 utils.echo --num_replicas 3)"
 torchx status "$APP_ID"
 torchx describe "$APP_ID"
-LOG_FILE="slurm-$(basename "$APP_ID").out"
-cat "$LOG_FILE"
-LINES="$(wc -l "$LOG_FILE" | cut -d' ' -f1)"
+sacct -j "$(basename "$APP_ID")"
+torchx log "$APP_ID"
+LINES="$(torchx log "$APP_ID" | wc -l)"
 
 if [ "$LINES" -ne 3 ]
 then

torchx/schedulers/local_scheduler.py

Lines changed: 8 additions & 3 deletions
@@ -909,14 +909,19 @@ def __del__(self) -> None:
 
 class LogIterator:
     def __init__(
-        self, app_id: str, regex: str, log_file: str, scheduler: LocalScheduler
+        self,
+        app_id: str,
+        regex: str,
+        log_file: str,
+        scheduler: Scheduler,
+        should_tail: bool = True,
     ) -> None:
         self._app_id: str = app_id
         self._regex: Pattern[str] = re.compile(regex)
         self._log_file: str = log_file
         self._log_fp: Optional[TextIO] = None
-        self._scheduler: LocalScheduler = scheduler
-        self._app_finished: bool = False
+        self._scheduler: Scheduler = scheduler
+        self._app_finished: bool = not should_tail
 
     def _check_finished(self) -> None:
         # either the app (already finished) was evicted from the LRU cache
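The widened LogIterator signature is what lets the Slurm scheduler reuse it: the scheduler parameter is relaxed from LocalScheduler to the Scheduler base class, and the new should_tail flag decides whether the iterator keeps polling for more output or treats the app as already finished. A rough sketch of the flag's effect, with made-up names ("demo", "app-1", "replica.log"):

    # Sketch only: "app-1" and "replica.log" are illustrative.
    # should_tail=False marks the app as finished up front, so the iterator
    # reads whatever the file currently contains and then stops, instead of
    # waiting for the scheduler to report completion.
    from torchx.schedulers.local_scheduler import LogIterator
    from torchx.schedulers.slurm_scheduler import create_scheduler

    scheduler = create_scheduler("demo")
    it = LogIterator("app-1", ".*", "replica.log", scheduler, should_tail=False)
    for line in it:
        print(line)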

torchx/schedulers/slurm_scheduler.py

Lines changed: 50 additions & 13 deletions
@@ -15,10 +15,13 @@
 import shlex
 import subprocess
 import tempfile
+import warnings
 from dataclasses import dataclass
-from typing import Any, Dict, List, Mapping, Optional, Tuple
+from datetime import datetime
+from typing import Any, Dict, List, Mapping, Optional, Tuple, Iterable
 
-from torchx.schedulers.api import AppDryRunInfo, DescribeAppResponse, Scheduler
+from torchx.schedulers.api import AppDryRunInfo, DescribeAppResponse, Scheduler, Stream
+from torchx.schedulers.local_scheduler import LogIterator
 from torchx.specs import (
     NONE,
     AppDef,
@@ -100,12 +103,16 @@ def from_role(
         if resource.gpu > 0:
             sbatch_opts.setdefault("gpus-per-task", str(resource.gpu))
 
+        srun_opts = {
+            "output": f"slurm-{macros.app_id}-{name}.out",
+        }
+
         return cls(
             name=name,
             entrypoint=role.entrypoint,
             args=list(role.args),
             sbatch_opts=sbatch_opts,
-            srun_opts={},
+            srun_opts=srun_opts,
             env=dict(role.env),
         )
 
@@ -176,7 +183,11 @@ class SlurmScheduler(Scheduler):
     resource allocations and args and then sbatch is used to launch all of them
     together.
 
-    Logs are written to the default slurm log file.
+    Logs are available in combined form via ``torchx log``, the programmatic API
+    as well as in the job launch directory as
+    ``slurm-<jobid>-<role>-<replica_id>.out``. If TorchX is running in a
+    different directory than where the job was created the logs won't be able to
+    be found.
 
     Some of the config options passed to it are added as SBATCH arguments to each
     replica. See https://slurm.schedmd.com/sbatch.html#SECTION_OPTIONS for info
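The docstring addition pins down the per-replica file naming, which mirrors the log_file string built in log_iter further down. A small illustration with hypothetical values:

    # Hypothetical values: slurm job 1853, role "echo", replica index 0.
    app_id, role_name, k = "1853", "echo", 0
    log_file = f"slurm-{app_id}-{role_name}-{k}.out"
    assert log_file == "slurm-1853-echo-0.out"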
@@ -203,9 +214,7 @@ class SlurmScheduler(Scheduler):
         type: scheduler
         features:
             cancel: true
-            logs: |
-                Logs are accessible via the default slurm log file but not the
-                programmatic API.
+            logs: true
             distributed: true
             describe: |
                 Partial support. SlurmScheduler will return job and replica
@@ -262,7 +271,7 @@ def _submit_dryrun(
                     app_id=macros.app_id,
                     replica_id=str(replica_id),
                 )
-                name = f"{app.name}-{role.name}-{replica_id}"
+                name = f"{role.name}-{replica_id}"
                 replica_role = values.apply(role)
                 replicas[name] = SlurmReplicaRequest.from_role(name, replica_role, cfg)
         req = SlurmBatchRequest(
@@ -308,19 +317,19 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
             ), f"failed to translate slurm state {state} to torchx state"
             app_state = state_enum
 
-            name_parts = row["JobName"].split("-")
-            if len(name_parts) < 3:
+            role, _, replica_id = row["JobName"].rpartition("-")
+            if not replica_id:
                 # name should always have at least 3 parts but sometimes sacct
                 # is slow to update
                 continue
-            role = name_parts[-2]
-            replica_id = int(name_parts[-1])
             if role not in roles:
                 roles[role] = Role(name=role, num_replicas=0, image="")
                 roles_statuses[role] = RoleStatus(role, [])
             roles[role].num_replicas += 1
             roles_statuses[role].replicas.append(
-                ReplicaStatus(id=replica_id, role=role, state=app_state, hostname=""),
+                ReplicaStatus(
+                    id=int(replica_id), role=role, state=app_state, hostname=""
+                ),
             )
 
         return DescribeAppResponse(
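The describe() parsing now recovers the role and replica id from the last dash in the sacct JobName instead of from fixed split positions, since the job name no longer carries the app-name prefix. A quick sketch of how rpartition behaves on such names (the JobName values are hypothetical):

    # Hypothetical JobName values under the new <role>-<replica_id> naming.
    role, _, replica_id = "echo-0".rpartition("-")
    assert (role, replica_id) == ("echo", "0")

    # A role name that itself contains dashes still splits on the last one.
    role, _, replica_id = "trainer-worker-12".rpartition("-")
    assert (role, replica_id) == ("trainer-worker", "12")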
@@ -331,6 +340,34 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
             msg=msg,
         )
 
+    def log_iter(
+        self,
+        app_id: str,
+        role_name: str,
+        k: int = 0,
+        regex: Optional[str] = None,
+        since: Optional[datetime] = None,
+        until: Optional[datetime] = None,
+        should_tail: bool = False,
+        streams: Optional[Stream] = None,
+    ) -> Iterable[str]:
+        if since or until:
+            warnings.warn(
+                "since and/or until times specified for SlurmScheduler.log_iter."
+                " These will be ignored and all log lines will be returned"
+            )
+        if streams is not None and streams != Stream.COMBINED:
+            warnings.warn(
+                "streams specified for SlurmScheduler.log_iter."
+                " These will be ignored and all log lines will be returned"
+            )
+
+        log_file = f"slurm-{app_id}-{role_name}-{k}.out"
+
+        return LogIterator(
+            app_id, regex or ".*", log_file, self, should_tail=should_tail
+        )
+
 
 def create_scheduler(session_name: str, **kwargs: Any) -> SlurmScheduler:
     return SlurmScheduler(

torchx/schedulers/test/slurm_scheduler_test.py

Lines changed: 74 additions & 22 deletions
@@ -4,12 +4,17 @@
 # This source code is licensed under the BSD-style license found in the
 # LICENSE file in the root directory of this source tree.
 
+import datetime
+import os
 import subprocess
+import tempfile
 import unittest
+from contextlib import contextmanager
+from typing import Generator
 from unittest.mock import MagicMock, call, patch
 
 from torchx import specs
-from torchx.schedulers.api import DescribeAppResponse
+from torchx.schedulers.api import DescribeAppResponse, Stream
 from torchx.schedulers.slurm_scheduler import (
     SlurmBatchRequest,
     SlurmReplicaRequest,
@@ -18,6 +23,18 @@
 )
 
 
+@contextmanager
+def tmp_cwd() -> Generator[None, None, None]:
+    path = tempfile.TemporaryDirectory()
+    cwd = os.getcwd()
+    os.chdir(path.name)
+    try:
+        yield
+    finally:
+        os.chdir(cwd)
+        path.cleanup()
+
+
 class SlurmSchedulerTest(unittest.TestCase):
     def test_create_scheduler(self) -> None:
         scheduler = create_scheduler("foo")
@@ -40,12 +57,12 @@ def test_replica_request(self) -> None:
             ),
         )
         sbatch, srun = SlurmReplicaRequest.from_role(
-            "role-name", role, cfg={}
+            "role-0", role, cfg={}
         ).materialize()
         self.assertEqual(
             sbatch,
             [
-                "--job-name=role-name",
+                "--job-name=role-0",
                 "--ntasks-per-node=1",
                 "--cpus-per-task=2",
                 "--mem=10",
@@ -54,7 +71,13 @@
         )
         self.assertEqual(
             srun,
-            ["--export=FOO=bar", "echo", "'hello slurm'", "test"],
+            [
+                '--output=slurm-"$SLURM_JOB_ID"-role-0.out',
+                "--export=FOO=bar",
+                "echo",
+                "'hello slurm'",
+                "test",
+            ],
         )
 
         # test nomem option
@@ -133,25 +156,25 @@ def test_dryrun_multi_role(self) -> None:
         self.assertEqual(req.cmd, ["sbatch", "--parsable"])
         self.assertEqual(
             set(req.replicas.keys()),
-            {"foo-a-0", "foo-a-1", "foo-b-0"},
+            {"a-0", "a-1", "b-0"},
         )
 
         script = req.materialize()
         self.assertEqual(
             script,
             """#!/bin/bash
-#SBATCH --job-name=foo-a-0 --ntasks-per-node=1
+#SBATCH --job-name=a-0 --ntasks-per-node=1
 #SBATCH hetjob
-#SBATCH --job-name=foo-a-1 --ntasks-per-node=1
+#SBATCH --job-name=a-1 --ntasks-per-node=1
 #SBATCH hetjob
-#SBATCH --job-name=foo-b-0 --ntasks-per-node=1
+#SBATCH --job-name=b-0 --ntasks-per-node=1
 
 # exit on error
 set -e
 
-srun echo 0 'hello '"$SLURM_JOB_ID"'' :\\
-    echo 1 'hello '"$SLURM_JOB_ID"'' :\\
-    echo
+srun --output=slurm-"$SLURM_JOB_ID"-a-0.out echo 0 'hello '"$SLURM_JOB_ID"'' :\\
+    --output=slurm-"$SLURM_JOB_ID"-a-1.out echo 1 'hello '"$SLURM_JOB_ID"'' :\\
+    --output=slurm-"$SLURM_JOB_ID"-b-0.out echo
 """,
         )
 
@@ -205,33 +228,43 @@ def test_cancel(self, run: MagicMock, describe: MagicMock) -> None:
     @patch("subprocess.run")
     def test_describe_completed(self, run: MagicMock) -> None:
         run.return_value.stdout = b"""
-JobID|JobName|Partition|Account|AllocCPUS|State|ExitCode|
-176+0|echo-echo-0|compute||1|COMPLETED|0:0|
-176+0.batch|batch|||1|COMPLETED|0:0|
-176+0.0|echo|||1|COMPLETED|0:0|
-176+1|echo-echo-1|compute||1|COMPLETED|0:0|
-176+1.0|echo|||1|COMPLETED|0:0|
-176+2|echo-echo-2|compute||1|COMPLETED|0:0|
-176+2.0|echo|||1|COMPLETED|0:0|
+JobID|JobName|Partition|Account|AllocCPUS|State|ExitCode
+1853+0|echo-0|compute||1|COMPLETED|0:0
+1853+0.batch|batch|||1|COMPLETED|0:0
+1853+0.0|echo|||1|COMPLETED|0:0
+1853+1|echo-1|compute||1|COMPLETED|0:0
+1853+1.0|echo|||1|COMPLETED|0:0
+1853+2|echo-2|compute||1|COMPLETED|0:0
+1853+2.0|echo|||1|COMPLETED|0:0
 """.strip()
 
         scheduler = create_scheduler("foo")
-        out = scheduler.describe(app_id="176")
+        out = scheduler.describe(app_id="1853")
 
         self.assertEqual(run.call_count, 1)
         self.assertEqual(
             run.call_args,
             call(
-                ["sacct", "--parsable2", "-j", "176"],
+                ["sacct", "--parsable2", "-j", "1853"],
                 stdout=subprocess.PIPE,
                 check=True,
             ),
         )
 
         self.assertIsNotNone(out)
-        self.assertEqual(out.app_id, "176")
+        self.assertEqual(out.app_id, "1853")
         self.assertEqual(out.msg, "COMPLETED")
         self.assertEqual(out.state, specs.AppState.SUCCEEDED)
+        self.assertEqual(
+            out.roles,
+            [
+                specs.Role(
+                    name="echo",
+                    image="",
+                    num_replicas=3,
+                )
+            ],
+        )
 
     @patch("subprocess.run")
     def test_describe_running(self, run: MagicMock) -> None:
@@ -253,3 +286,22 @@ def test_describe_running(self, run: MagicMock) -> None:
         self.assertEqual(out.app_id, "54")
         self.assertEqual(out.msg, "RUNNING")
         self.assertEqual(out.state, specs.AppState.RUNNING)
+
+    @patch("subprocess.run")
+    def test_log_iter(self, run: MagicMock) -> None:
+        scheduler = create_scheduler("foo")
+
+        with tmp_cwd():
+            with open("slurm-54-echo-1.out", "wt") as f:
+                f.write("hello\nworld\n")
+
+            logs = list(
+                scheduler.log_iter(
+                    "54",
+                    "echo",
+                    1,
+                    streams=Stream.STDERR,
+                    since=datetime.datetime.now(),
+                )
+            )
+            self.assertEqual(logs, ["hello", "world"])
