Skip to content

Commit 9f81b85

Browse files
d4l3kfacebook-github-bot
authored and committed
schedulers,cli: persist newline breaks in log_iter (#425)
Summary: This resolves #424 This makes it so the torchx scheduler `log_iter` method keeps the line breaks so downstream log streams can handle them gracefully. The current solution strips all `\n` characters and always adds them so it makes it impossible to do streaming visualizations of progress bars which use `\r` without a new line break. WARNING: This is a change in the log_iter interface and all schedulers/downstream consumers will need to be updated. If someone is logging from multiple workers this gets dangerous since the progress bar `\r` lines can clobber each other. Pull Request resolved: #425 Test Plan: (torchx-3.10.2) tristanr@tristanr-arch2 ~/D/torchx-proj> torchx run --scheduler local_docker --wait --log utils.python --script test_tqdm.py torchx 2022-03-15 14:26:42 INFO loaded configs from /home/tristanr/Developer/torchx-proj/.torchxconfig torchx 2022-03-15 14:26:42 INFO Building workspace: file:///home/tristanr/Developer/torchx-proj for role[0]: python, image: ghcr.io/pytorch/torchx:0.1.2 dev0 torchx 2022-03-15 14:26:43 INFO Done building workspace torchx 2022-03-15 14:26:43 INFO New image: sha256:9cfaf70f7143b4caef383b46c23635eaf001cbd3d9ff55335aa1ff8c5e236388 built from workspace local_docker://torchx/torchx_utils_python-bprr9rb4k764nd torchx 2022-03-15 14:26:44 INFO Waiting for the app to finish... python/0 100%|██████████| 100/100 [00:03<00:00, 32.95it/s] torchx 2022-03-15 14:26:48 INFO Job finished: SUCCEEDED (torchx-3.10.2) tristanr@tristanr-arch2 ~/D/torchx-proj> torchx run --scheduler local_cwd --wait --log utils.python --script test_tqdm.py torchx 2022-03-15 14:26:52 INFO loaded configs from /home/tristanr/Developer/torchx-proj/.torchxconfig torchx 2022-03-15 14:26:52 INFO Log files located in: /tmp/torchx_0nqvqm1d/torchx/torchx_utils_python-x217jjqhbkkrgd/python/0 local_cwd://torchx/torchx_utils_python-x217jjqhbkkrgd torchx 2022-03-15 14:26:52 INFO Waiting for the app to finish... 
python/0 100%|██████████| 100/100 [00:03<00:00, 32.95it/s] torchx 2022-03-15 14:26:56 INFO Job finished: SUCCEEDED Differential Revision: D34907682 Pulled By: d4l3k fbshipit-source-id: 2c2619b05366074870434444acf1e0b02787fb77
1 parent 98fb9cc commit 9f81b85

13 files changed

+84
-32
lines changed

torchx/cli/cmd_log.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,17 @@ def validate(job_identifier: str) -> None:
3535
sys.exit(1)
3636

3737

38+
def _prefix_line(prefix: str, line: str) -> str:
39+
"""
40+
_prefix_line ensure the prefix is still present even when dealing with return characters
41+
"""
42+
if "\r" in line:
43+
line = line.replace("\r", f"\r{prefix}")
44+
if not line.startswith("\r"):
45+
line = f"{prefix}{line}"
46+
return line
47+
48+
3849
def print_log_lines(
3950
file: TextIO,
4051
runner: Runner,
@@ -55,7 +66,8 @@ def print_log_lines(
5566
should_tail=should_tail,
5667
streams=streams,
5768
):
58-
print(f"{GREEN}{role_name}/{replica_id}{ENDC} {line}", file=file)
69+
prefix = f"{GREEN}{role_name}/{replica_id}{ENDC} "
70+
print(_prefix_line(prefix, line), file=file, end="")
5971
except Exception as e:
6072
exceptions.put(e)
6173
raise

torchx/cli/test/cmd_log_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def log_lines(
6565
if regex is None:
6666
regex = ".*"
6767

68-
log_lines = ["INFO foo", "ERROR bar", "WARN baz"]
68+
log_lines = ["INFO foo\n", "ERROR bar\n", "WARN baz\n"]
6969
return iter([line for line in log_lines if re.match(regex, line)])
7070

7171

torchx/runner/api.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,10 @@ def log_lines(
469469
if the scheduler has already totally or partially purged log records
470470
for the application.
471471
472+
Returned lines will include whitespace characters such as ``\n`` or
473+
``\r``. When outputting the lines you should make sure to avoid adding
474+
extra newline characters.
475+
472476
Usage:
473477
474478
::
@@ -477,7 +481,11 @@ def log_lines(
477481
478482
print("== trainer node 0 logs ==")
479483
for line in session.log_lines(app_handle, "trainer", k=0):
480-
print(line)
484+
# for prints newlines will already be present in the line
485+
print(line, end="")
486+
487+
# when writing to a file nothing extra is necessary
488+
f.write(line)
481489
482490
Discouraged anti-pattern:
483491

torchx/schedulers/api.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,11 @@ def log_iter(
263263
7. Some schedulers may support line cursors by supporting ``__getitem__``
264264
(e.g. ``iter[50]`` seeks to the 50th log line).
265265
266+
8. Whitespace is preserved; each new line should include ``\n``. To
267+
support interactive progress bars the returned lines don't need to
268+
include ``\n`` but should then be printed without a newline to
269+
correctly handle ``\r`` carriage returns.
270+
266271
Args:
267272
streams: The IO output streams to select.
268273
One of: combined, stdout, stderr.
@@ -302,3 +307,19 @@ def filter_regex(regex: str, data: Iterable[str]) -> Iterable[str]:
302307

303308
r = re.compile(regex)
304309
return filter(lambda datum: r.search(datum), data)
310+
311+
312+
def split_lines(text: str) -> List[str]:
    """
    split_lines splits the string by new lines and keeps the new line characters.

    Only the line feed character ends a line; carriage returns and any other
    whitespace stay inside the line they belong to.

    Args:
        text: the text to split.

    Returns:
        The pieces of ``text`` in order. Each piece ends with a line feed,
        except possibly the last one when ``text`` does not end with one.
        An empty ``text`` gives an empty list.
    """
    lines = []
    start = 0
    end = len(text)
    # Walk the string with an offset instead of re-slicing the remainder on
    # every iteration, so the scan stays linear in the size of the input.
    while start < end:
        idx = text.find("\n", start)
        if idx < 0:
            # No further line feed: the rest is a final, unterminated line.
            lines.append(text[start:])
            break
        lines.append(text[start : idx + 1])
        start = idx + 1
    return lines

torchx/schedulers/docker_scheduler.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
Scheduler,
2121
Stream,
2222
filter_regex,
23+
split_lines,
2324
)
2425
from torchx.schedulers.ids import make_unique
2526
from torchx.specs.api import (
@@ -425,7 +426,7 @@ def log_iter(
425426
if len(logs) == 0:
426427
logs = []
427428
else:
428-
logs = logs.split("\n")
429+
logs = split_lines(logs)
429430

430431
logs = map(_to_str, logs)
431432

@@ -438,8 +439,6 @@ def log_iter(
438439
def _to_str(a: Union[str, bytes]) -> str:
439440
if isinstance(a, bytes):
440441
a = a.decode("utf-8")
441-
if a.endswith("\n"):
442-
a = a[:-1]
443442
return a
444443

445444

torchx/schedulers/kubernetes_scheduler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
Scheduler,
5050
Stream,
5151
filter_regex,
52+
split_lines,
5253
)
5354
from torchx.schedulers.ids import make_unique
5455
from torchx.specs.api import (
@@ -640,7 +641,7 @@ def log_iter(
640641
iterator = w.stream(core_api.read_namespaced_pod_log, **args)
641642
else:
642643
resp = core_api.read_namespaced_pod_log(**args)
643-
iterator = resp.strip().split("\n")
644+
iterator = split_lines(resp)
644645

645646
if regex:
646647
return filter_regex(regex, iterator)

torchx/schedulers/local_scheduler.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1020,7 +1020,7 @@ def __iter__(self) -> "LogIterator":
10201020
self._check_finished() # check to see if app has finished running
10211021

10221022
if os.path.isfile(self._log_file):
1023-
self._log_fp = open(self._log_file, "r") # noqa: P201
1023+
self._log_fp = open(self._log_file, "rt", newline="\n") # noqa: P201
10241024
break
10251025

10261026
if self._app_finished:
@@ -1048,7 +1048,6 @@ def __next__(self) -> str:
10481048
time.sleep(0.1)
10491049
self._check_finished()
10501050
else:
1051-
line = line.rstrip("\n") # strip the trailing newline
10521051
if re.match(self._regex, line):
10531052
return line
10541053

torchx/schedulers/ray_scheduler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
DescribeAppResponse,
2222
Scheduler,
2323
Stream,
24+
split_lines,
2425
)
2526
from torchx.schedulers.ids import make_unique
2627
from torchx.schedulers.ray.ray_common import RayActor
@@ -350,7 +351,7 @@ def log_iter(
350351
addr, app_id = app_id.split("-")
351352
client: JobSubmissionClient = JobSubmissionClient(f"http://{addr}")
352353
logs: str = client.get_job_logs(app_id)
353-
return logs.split("\n")
354+
return split_lines(logs)
354355

355356
def create_scheduler(session_name: str, **kwargs: Any) -> RayScheduler:
356357
if not has_ray(): # pragma: no cover

torchx/schedulers/test/api_test.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,12 @@
1111
from typing import Iterable, Mapping, Optional, Union
1212
from unittest.mock import MagicMock, patch
1313

14-
from torchx.schedulers.api import DescribeAppResponse, Scheduler, Stream
14+
from torchx.schedulers.api import (
15+
DescribeAppResponse,
16+
Scheduler,
17+
Stream,
18+
split_lines,
19+
)
1520
from torchx.specs.api import (
1621
NULL_RESOURCE,
1722
AppDef,
@@ -152,3 +157,9 @@ def test_close_twice(self) -> None:
152157
scheduler_mock.close()
153158
scheduler_mock.close()
154159
# nothing to validate explicitly, just that no errors are raised
160+
161+
def test_split_lines(self) -> None:
162+
self.assertEqual(split_lines(""), [])
163+
self.assertEqual(split_lines("\n"), ["\n"])
164+
self.assertEqual(split_lines("foo\nbar"), ["foo\n", "bar"])
165+
self.assertEqual(split_lines("foo\nbar\n"), ["foo\n", "bar\n"])

torchx/schedulers/test/docker_scheduler_test.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,8 @@ def test_docker_logs(self) -> None:
219219
self.assertEqual(
220220
logs,
221221
[
222-
"foo",
223-
"bar",
222+
"foo\n",
223+
"bar\n",
224224
],
225225
)
226226
logs = list(
@@ -234,7 +234,7 @@ def test_docker_logs(self) -> None:
234234
self.assertEqual(
235235
logs,
236236
[
237-
"bar",
237+
"bar\n",
238238
],
239239
)
240240

@@ -267,8 +267,8 @@ def test_docker_logs(self) -> None:
267267
self.assertEqual(
268268
logs,
269269
[
270-
"foo",
271-
"bar",
270+
"foo\n",
271+
"bar\n",
272272
],
273273
)
274274

@@ -286,8 +286,8 @@ def test_docker_logs_streams(self) -> None:
286286
self.assertEqual(
287287
logs,
288288
{
289-
"stdout",
290-
"stderr",
289+
"stdout\n",
290+
"stderr\n",
291291
},
292292
)
293293

@@ -299,8 +299,8 @@ def test_docker_logs_streams(self) -> None:
299299
self.assertEqual(
300300
logs,
301301
{
302-
"stdout",
303-
"stderr",
302+
"stdout\n",
303+
"stderr\n",
304304
},
305305
)
306306

@@ -312,7 +312,7 @@ def test_docker_logs_streams(self) -> None:
312312
self.assertEqual(
313313
logs,
314314
[
315-
"stderr",
315+
"stderr\n",
316316
],
317317
)
318318

@@ -324,7 +324,7 @@ def test_docker_logs_streams(self) -> None:
324324
self.assertEqual(
325325
logs,
326326
[
327-
"stdout",
327+
"stdout\n",
328328
],
329329
)
330330

torchx/schedulers/test/kubernetes_scheduler_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -576,8 +576,8 @@ def test_log_iter(self, read_namespaced_pod_log: MagicMock) -> None:
576576
self.assertEqual(
577577
list(lines),
578578
[
579-
"foo reg",
580-
"bar reg",
579+
"foo reg\n",
580+
"bar reg\n",
581581
],
582582
)
583583
call = read_namespaced_pod_log.call_args

torchx/schedulers/test/local_scheduler_test.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ def test_submit_inherit_parent_envs(self) -> None:
325325
app = AppDef(name="check_foo_env_var", roles=[role])
326326
app_id = self.scheduler.submit(app, {"log_dir": self.test_dir})
327327
for line in self.scheduler.log_iter(app_id, "echo_foo"):
328-
self.assertEqual("bar", line)
328+
self.assertEqual("bar\n", line)
329329

330330
desc = self.wait(app_id, self.scheduler)
331331
assert desc is not None
@@ -429,7 +429,7 @@ def test_submit_override_parent_env(self) -> None:
429429
app = AppDef(name="check_foo_env_var", roles=[role])
430430
app_id = self.scheduler.submit(app, {"log_dir": self.test_dir})
431431
for line in self.scheduler.log_iter(app_id, "echo_foo"):
432-
self.assertEqual("new_bar", line)
432+
self.assertEqual("new_bar\n", line)
433433

434434
desc = self.wait(app_id, self.scheduler)
435435
assert desc is not None
@@ -598,20 +598,20 @@ def test_log_iterator(self) -> None:
598598
app_id = self.scheduler.submit(app, cfg)
599599

600600
for i, line in enumerate(self.scheduler.log_iter(app_id, "role1", k=0)):
601-
self.assertEqual(str(i), line)
601+
self.assertEqual(str(i), line.strip())
602602

603603
# since and until ignored
604604
for i, line in enumerate(
605605
self.scheduler.log_iter(
606606
app_id, "role1", k=0, since=datetime.now(), until=datetime.now()
607607
)
608608
):
609-
self.assertEqual(str(i), line)
609+
self.assertEqual(str(i), line.strip())
610610

611611
for i, line in enumerate(
612612
self.scheduler.log_iter(app_id, "role1", k=0, regex=r"[02468]")
613613
):
614-
self.assertEqual(str(i * 2), line)
614+
self.assertEqual(str(i * 2), line.strip())
615615

616616
def test_log_iterator_no_log_dir(self) -> None:
617617
role = Role(

torchx/schedulers/test/slurm_scheduler_test.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ def test_log_iter(self, run: MagicMock) -> None:
373373
since=datetime.datetime.now(),
374374
)
375375
)
376-
self.assertEqual(logs, ["hello", "world"])
376+
self.assertEqual(logs, ["hello\n", "world\n"])
377377

378378
with open(os.path.join(job_dir, "slurm-54-echo-1.err"), "wt") as f:
379379
f.write("foo\nbar\n")
@@ -387,7 +387,7 @@ def test_log_iter(self, run: MagicMock) -> None:
387387
)
388388
)
389389

390-
self.assertEqual(logs, ["foo", "bar"])
390+
self.assertEqual(logs, ["foo\n", "bar\n"])
391391

392392
# no stream specified should default to STDERR
393393
logs = list(
@@ -397,7 +397,7 @@ def test_log_iter(self, run: MagicMock) -> None:
397397
1,
398398
)
399399
)
400-
self.assertEqual(logs, ["foo", "bar"])
400+
self.assertEqual(logs, ["foo\n", "bar\n"])
401401

402402
with self.assertRaises(ValueError):
403403
scheduler.log_iter("54", "echo", 1, streams=Stream.COMBINED)

0 commit comments

Comments
 (0)