Skip to content

Commit dbc8fb2

Browse files
committed
schedulers,cli: persist newline breaks in log_iter
1 parent 4cd94d8 commit dbc8fb2

12 files changed

+70
-31
lines changed

torchx/cli/cmd_log.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,17 @@ def validate(job_identifier: str) -> None:
3535
sys.exit(1)
3636

3737

38+
def _prefix_line(prefix: str, line: str) -> str:
39+
"""
40+
_prefix_line ensure the prefix is still present even when dealing with return characters
41+
"""
42+
if "\r" in line:
43+
line = line.replace("\r", f"\r{prefix}")
44+
if not line.startswith("\r"):
45+
line = f"{prefix}{line}"
46+
return line
47+
48+
3849
def print_log_lines(
3950
file: TextIO,
4051
runner: Runner,
@@ -55,7 +66,8 @@ def print_log_lines(
5566
should_tail=should_tail,
5667
streams=streams,
5768
):
58-
print(f"{GREEN}{role_name}/{replica_id}{ENDC} {line}", file=file)
69+
prefix = f"{GREEN}{role_name}/{replica_id}{ENDC} "
70+
print(_prefix_line(prefix, line), file=file, end="")
5971
except Exception as e:
6072
exceptions.put(e)
6173
raise

torchx/cli/test/cmd_log_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def log_lines(
6565
if regex is None:
6666
regex = ".*"
6767

68-
log_lines = ["INFO foo", "ERROR bar", "WARN baz"]
68+
log_lines = ["INFO foo\n", "ERROR bar\n", "WARN baz\n"]
6969
return iter([line for line in log_lines if re.match(regex, line)])
7070

7171

torchx/schedulers/api.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,3 +302,19 @@ def filter_regex(regex: str, data: Iterable[str]) -> Iterable[str]:
302302

303303
r = re.compile(regex)
304304
return filter(lambda datum: r.search(datum), data)
305+
306+
307+
def split_lines(text: str) -> List[str]:
308+
"""
309+
split_lines splits the string by new lines and keeps the new line characters.
310+
"""
311+
lines = []
312+
while len(text) > 0:
313+
idx = text.find("\n")
314+
if idx >= 0:
315+
lines.append(text[: idx + 1])
316+
text = text[idx + 1 :]
317+
else:
318+
lines.append(text)
319+
break
320+
return lines

torchx/schedulers/docker_scheduler.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
Scheduler,
2121
Stream,
2222
filter_regex,
23+
split_lines,
2324
)
2425
from torchx.schedulers.ids import make_unique
2526
from torchx.specs.api import (
@@ -394,7 +395,7 @@ def log_iter(
394395
if len(logs) == 0:
395396
logs = []
396397
else:
397-
logs = logs.split("\n")
398+
logs = split_lines(logs)
398399

399400
logs = map(_to_str, logs)
400401

@@ -407,8 +408,6 @@ def log_iter(
407408
def _to_str(a: Union[str, bytes]) -> str:
408409
if isinstance(a, bytes):
409410
a = a.decode("utf-8")
410-
if a.endswith("\n"):
411-
a = a[:-1]
412411
return a
413412

414413

torchx/schedulers/kubernetes_scheduler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
Scheduler,
5050
Stream,
5151
filter_regex,
52+
split_lines,
5253
)
5354
from torchx.schedulers.ids import make_unique
5455
from torchx.specs.api import (
@@ -599,7 +600,7 @@ def log_iter(
599600
iterator = w.stream(core_api.read_namespaced_pod_log, **args)
600601
else:
601602
resp = core_api.read_namespaced_pod_log(**args)
602-
iterator = resp.strip().split("\n")
603+
iterator = split_lines(resp)
603604

604605
if regex:
605606
return filter_regex(regex, iterator)

torchx/schedulers/local_scheduler.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1020,7 +1020,7 @@ def __iter__(self) -> "LogIterator":
10201020
self._check_finished() # check to see if app has finished running
10211021

10221022
if os.path.isfile(self._log_file):
1023-
self._log_fp = open(self._log_file, "r") # noqa: P201
1023+
self._log_fp = open(self._log_file, "rt", newline="\n") # noqa: P201
10241024
break
10251025

10261026
if self._app_finished:
@@ -1048,7 +1048,6 @@ def __next__(self) -> str:
10481048
time.sleep(0.1)
10491049
self._check_finished()
10501050
else:
1051-
line = line.rstrip("\n") # strip the trailing newline
10521051
if re.match(self._regex, line):
10531052
return line
10541053

torchx/schedulers/ray_scheduler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
DescribeAppResponse,
2222
Scheduler,
2323
Stream,
24+
split_lines,
2425
)
2526
from torchx.schedulers.ids import make_unique
2627
from torchx.schedulers.ray.ray_common import RayActor
@@ -350,7 +351,7 @@ def log_iter(
350351
addr, app_id = app_id.split("-")
351352
client: JobSubmissionClient = JobSubmissionClient(f"http://{addr}")
352353
logs: str = client.get_job_logs(app_id)
353-
return logs.split("\n")
354+
return split_lines(logs)
354355

355356
def create_scheduler(session_name: str, **kwargs: Any) -> RayScheduler:
356357
if not has_ray(): # pragma: no cover

torchx/schedulers/test/api_test.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,12 @@
1111
from typing import Iterable, Mapping, Optional, Union
1212
from unittest.mock import MagicMock, patch
1313

14-
from torchx.schedulers.api import DescribeAppResponse, Scheduler, Stream
14+
from torchx.schedulers.api import (
15+
DescribeAppResponse,
16+
Scheduler,
17+
Stream,
18+
split_lines,
19+
)
1520
from torchx.specs.api import (
1621
NULL_RESOURCE,
1722
AppDef,
@@ -152,3 +157,9 @@ def test_close_twice(self) -> None:
152157
scheduler_mock.close()
153158
scheduler_mock.close()
154159
# nothing to validate explicitly, just that no errors are raised
160+
161+
def test_split_lines(self) -> None:
162+
self.assertEqual(split_lines(""), [])
163+
self.assertEqual(split_lines("\n"), ["\n"])
164+
self.assertEqual(split_lines("foo\nbar"), ["foo\n", "bar"])
165+
self.assertEqual(split_lines("foo\nbar\n"), ["foo\n", "bar\n"])

torchx/schedulers/test/docker_scheduler_test.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,8 @@ def test_docker_logs(self) -> None:
201201
self.assertEqual(
202202
logs,
203203
[
204-
"foo",
205-
"bar",
204+
"foo\n",
205+
"bar\n",
206206
],
207207
)
208208
logs = list(
@@ -216,7 +216,7 @@ def test_docker_logs(self) -> None:
216216
self.assertEqual(
217217
logs,
218218
[
219-
"bar",
219+
"bar\n",
220220
],
221221
)
222222

@@ -249,8 +249,8 @@ def test_docker_logs(self) -> None:
249249
self.assertEqual(
250250
logs,
251251
[
252-
"foo",
253-
"bar",
252+
"foo\n",
253+
"bar\n",
254254
],
255255
)
256256

@@ -268,8 +268,8 @@ def test_docker_logs_streams(self) -> None:
268268
self.assertEqual(
269269
logs,
270270
{
271-
"stdout",
272-
"stderr",
271+
"stdout\n",
272+
"stderr\n",
273273
},
274274
)
275275

@@ -281,8 +281,8 @@ def test_docker_logs_streams(self) -> None:
281281
self.assertEqual(
282282
logs,
283283
{
284-
"stdout",
285-
"stderr",
284+
"stdout\n",
285+
"stderr\n",
286286
},
287287
)
288288

@@ -294,7 +294,7 @@ def test_docker_logs_streams(self) -> None:
294294
self.assertEqual(
295295
logs,
296296
[
297-
"stderr",
297+
"stderr\n",
298298
],
299299
)
300300

@@ -306,7 +306,7 @@ def test_docker_logs_streams(self) -> None:
306306
self.assertEqual(
307307
logs,
308308
[
309-
"stdout",
309+
"stdout\n",
310310
],
311311
)
312312

torchx/schedulers/test/kubernetes_scheduler_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -511,8 +511,8 @@ def test_log_iter(self, read_namespaced_pod_log: MagicMock) -> None:
511511
self.assertEqual(
512512
list(lines),
513513
[
514-
"foo reg",
515-
"bar reg",
514+
"foo reg\n",
515+
"bar reg\n",
516516
],
517517
)
518518
call = read_namespaced_pod_log.call_args

torchx/schedulers/test/local_scheduler_test.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ def test_submit_inherit_parent_envs(self) -> None:
325325
app = AppDef(name="check_foo_env_var", roles=[role])
326326
app_id = self.scheduler.submit(app, {"log_dir": self.test_dir})
327327
for line in self.scheduler.log_iter(app_id, "echo_foo"):
328-
self.assertEqual("bar", line)
328+
self.assertEqual("bar\n", line)
329329

330330
desc = self.wait(app_id, self.scheduler)
331331
assert desc is not None
@@ -429,7 +429,7 @@ def test_submit_override_parent_env(self) -> None:
429429
app = AppDef(name="check_foo_env_var", roles=[role])
430430
app_id = self.scheduler.submit(app, {"log_dir": self.test_dir})
431431
for line in self.scheduler.log_iter(app_id, "echo_foo"):
432-
self.assertEqual("new_bar", line)
432+
self.assertEqual("new_bar\n", line)
433433

434434
desc = self.wait(app_id, self.scheduler)
435435
assert desc is not None
@@ -598,20 +598,20 @@ def test_log_iterator(self) -> None:
598598
app_id = self.scheduler.submit(app, cfg)
599599

600600
for i, line in enumerate(self.scheduler.log_iter(app_id, "role1", k=0)):
601-
self.assertEqual(str(i), line)
601+
self.assertEqual(str(i), line.strip())
602602

603603
# since and until ignored
604604
for i, line in enumerate(
605605
self.scheduler.log_iter(
606606
app_id, "role1", k=0, since=datetime.now(), until=datetime.now()
607607
)
608608
):
609-
self.assertEqual(str(i), line)
609+
self.assertEqual(str(i), line.strip())
610610

611611
for i, line in enumerate(
612612
self.scheduler.log_iter(app_id, "role1", k=0, regex=r"[02468]")
613613
):
614-
self.assertEqual(str(i * 2), line)
614+
self.assertEqual(str(i * 2), line.strip())
615615

616616
def test_log_iterator_no_log_dir(self) -> None:
617617
role = Role(

torchx/schedulers/test/slurm_scheduler_test.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ def test_log_iter(self, run: MagicMock) -> None:
373373
since=datetime.datetime.now(),
374374
)
375375
)
376-
self.assertEqual(logs, ["hello", "world"])
376+
self.assertEqual(logs, ["hello\n", "world\n"])
377377

378378
with open(os.path.join(job_dir, "slurm-54-echo-1.err"), "wt") as f:
379379
f.write("foo\nbar\n")
@@ -387,7 +387,7 @@ def test_log_iter(self, run: MagicMock) -> None:
387387
)
388388
)
389389

390-
self.assertEqual(logs, ["foo", "bar"])
390+
self.assertEqual(logs, ["foo\n", "bar\n"])
391391

392392
# no stream specified should default to STDERR
393393
logs = list(
@@ -397,7 +397,7 @@ def test_log_iter(self, run: MagicMock) -> None:
397397
1,
398398
)
399399
)
400-
self.assertEqual(logs, ["foo", "bar"])
400+
self.assertEqual(logs, ["foo\n", "bar\n"])
401401

402402
with self.assertRaises(ValueError):
403403
scheduler.log_iter("54", "echo", 1, streams=Stream.COMBINED)

0 commit comments

Comments
 (0)