Skip to content

schedulers,cli: persist newline breaks in log_iter #425

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion torchx/cli/cmd_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ def validate(job_identifier: str) -> None:
sys.exit(1)


def _prefix_line(prefix: str, line: str) -> str:
"""
_prefix_line ensure the prefix is still present even when dealing with return characters
"""
if "\r" in line:
line = line.replace("\r", f"\r{prefix}")
if not line.startswith("\r"):
line = f"{prefix}{line}"
return line


def print_log_lines(
file: TextIO,
runner: Runner,
Expand All @@ -55,7 +66,8 @@ def print_log_lines(
should_tail=should_tail,
streams=streams,
):
print(f"{GREEN}{role_name}/{replica_id}{ENDC} {line}", file=file)
prefix = f"{GREEN}{role_name}/{replica_id}{ENDC} "
print(_prefix_line(prefix, line), file=file, end="")
except Exception as e:
exceptions.put(e)
raise
Expand Down
2 changes: 1 addition & 1 deletion torchx/cli/test/cmd_log_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def log_lines(
if regex is None:
regex = ".*"

log_lines = ["INFO foo", "ERROR bar", "WARN baz"]
log_lines = ["INFO foo\n", "ERROR bar\n", "WARN baz\n"]
return iter([line for line in log_lines if re.match(regex, line)])


Expand Down
36 changes: 22 additions & 14 deletions torchx/runner/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,28 +469,36 @@ def log_lines(
if the scheduler has already totally or partially purged log records
for the application.

Returned lines will include whitespace characters such as ``\\n`` or
``\\r``. When outputting the lines, make sure not to add extra newline
characters.

Usage:

::
.. code:: python

app_handle = session.run(app, scheduler="local", cfg=Dict[str, ConfigValue]())
app_handle = session.run(app, scheduler="local", cfg=Dict[str, ConfigValue]())

print("== trainer node 0 logs ==")
for line in session.log_lines(app_handle, "trainer", k=0):
print(line)
print("== trainer node 0 logs ==")
for line in session.log_lines(app_handle, "trainer", k=0):
# when printing, newlines are already present in the line
print(line, end="")

# when writing to a file nothing extra is necessary
f.write(line)

Discouraged anti-pattern:

::
.. code:: python

# DO NOT DO THIS!
# parses accuracy metric from log and reports it for this experiment run
accuracy = -1
for line in session.log_lines(app_handle, "trainer", k=0):
if matches_regex(line, "final model_accuracy:[0-9]*"):
accuracy = parse_accuracy(line)
break
report(experiment_name, accuracy)
# DO NOT DO THIS!
# parses accuracy metric from log and reports it for this experiment run
accuracy = -1
for line in session.log_lines(app_handle, "trainer", k=0):
if matches_regex(line, "final model_accuracy:[0-9]*"):
accuracy = parse_accuracy(line)
break
report(experiment_name, accuracy)

Args:
app_handle: application handle
Expand Down
21 changes: 21 additions & 0 deletions torchx/schedulers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,11 @@ def log_iter(
7. Some schedulers may support line cursors by supporting ``__getitem__``
(e.g. ``iter[50]`` seeks to the 50th log line).

8. Whitespace is preserved; each complete line should end with ``\\n``. To
support interactive progress bars, a returned line may omit the trailing
``\\n``, but it should then be printed without an added newline so that
``\\r`` carriage returns are handled correctly.

Args:
streams: The IO output streams to select.
One of: combined, stdout, stderr.
Expand Down Expand Up @@ -302,3 +307,19 @@ def filter_regex(regex: str, data: Iterable[str]) -> Iterable[str]:

r = re.compile(regex)
return filter(lambda datum: r.search(datum), data)


def split_lines(text: str) -> List[str]:
    """
    split_lines splits the string on ``\\n`` and keeps the newline characters.

    Only ``\\n`` is treated as a line boundary; other characters such as
    ``\\r`` are left inside the returned lines. ``str.splitlines`` is not used
    because it would also split on ``\\r``.

    Args:
        text: the text to split.

    Returns:
        The pieces of ``text`` in order. Every piece except possibly the last
        ends with ``\\n``; the last piece has no newline when ``text`` does not
        end with one. An empty ``text`` yields an empty list.
    """
    lines: List[str] = []
    start = 0
    size = len(text)
    # Scan with an offset rather than re-slicing the remaining text on every
    # iteration, which would copy the tail each time (quadratic on large logs).
    while start < size:
        end = text.find("\n", start)
        if end < 0:
            # No more newlines: the remainder is a final, unterminated line.
            lines.append(text[start:])
            break
        lines.append(text[start : end + 1])
        start = end + 1
    return lines
2 changes: 1 addition & 1 deletion torchx/schedulers/aws_batch_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ def _stream_events(
next_token = response["nextForwardToken"]

for event in response["events"]:
yield event["message"]
yield event["message"] + "\n"


def create_scheduler(session_name: str, **kwargs: object) -> AWSBatchScheduler:
Expand Down
5 changes: 2 additions & 3 deletions torchx/schedulers/docker_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
Scheduler,
Stream,
filter_regex,
split_lines,
)
from torchx.schedulers.ids import make_unique
from torchx.specs.api import (
Expand Down Expand Up @@ -425,7 +426,7 @@ def log_iter(
if len(logs) == 0:
logs = []
else:
logs = logs.split("\n")
logs = split_lines(logs)

logs = map(_to_str, logs)

Expand All @@ -438,8 +439,6 @@ def log_iter(
def _to_str(a: Union[str, bytes]) -> str:
if isinstance(a, bytes):
a = a.decode("utf-8")
if a.endswith("\n"):
a = a[:-1]
return a


Expand Down
3 changes: 2 additions & 1 deletion torchx/schedulers/kubernetes_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
Scheduler,
Stream,
filter_regex,
split_lines,
)
from torchx.schedulers.ids import make_unique
from torchx.specs.api import (
Expand Down Expand Up @@ -640,7 +641,7 @@ def log_iter(
iterator = w.stream(core_api.read_namespaced_pod_log, **args)
else:
resp = core_api.read_namespaced_pod_log(**args)
iterator = resp.strip().split("\n")
iterator = split_lines(resp)

if regex:
return filter_regex(regex, iterator)
Expand Down
3 changes: 1 addition & 2 deletions torchx/schedulers/local_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1021,7 +1021,7 @@ def __iter__(self) -> "LogIterator":
self._check_finished() # check to see if app has finished running

if os.path.isfile(self._log_file):
self._log_fp = open(self._log_file, "r") # noqa: P201
self._log_fp = open(self._log_file, "rt", newline="\n") # noqa: P201
break

if self._app_finished:
Expand Down Expand Up @@ -1049,7 +1049,6 @@ def __next__(self) -> str:
time.sleep(0.1)
self._check_finished()
else:
line = line.rstrip("\n") # strip the trailing newline
if re.match(self._regex, line):
return line

Expand Down
3 changes: 2 additions & 1 deletion torchx/schedulers/ray_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
DescribeAppResponse,
Scheduler,
Stream,
split_lines,
)
from torchx.schedulers.ids import make_unique
from torchx.schedulers.ray.ray_common import RayActor
Expand Down Expand Up @@ -350,7 +351,7 @@ def log_iter(
addr, app_id = app_id.split("-")
client: JobSubmissionClient = JobSubmissionClient(f"http://{addr}")
logs: str = client.get_job_logs(app_id)
return logs.split("\n")
return split_lines(logs)

def create_scheduler(session_name: str, **kwargs: Any) -> RayScheduler:
if not has_ray(): # pragma: no cover
Expand Down
13 changes: 12 additions & 1 deletion torchx/schedulers/test/api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@
from typing import Iterable, Mapping, Optional, Union
from unittest.mock import MagicMock, patch

from torchx.schedulers.api import DescribeAppResponse, Scheduler, Stream
from torchx.schedulers.api import (
DescribeAppResponse,
Scheduler,
Stream,
split_lines,
)
from torchx.specs.api import (
NULL_RESOURCE,
AppDef,
Expand Down Expand Up @@ -152,3 +157,9 @@ def test_close_twice(self) -> None:
scheduler_mock.close()
scheduler_mock.close()
# nothing to validate explicitly, just that no errors are raised

def test_split_lines(self) -> None:
    """Check split_lines against a table of inputs and their expected pieces."""
    # (input text, expected pieces) pairs, checked in order.
    cases = [
        ("", []),
        ("\n", ["\n"]),
        ("foo\nbar", ["foo\n", "bar"]),
        ("foo\nbar\n", ["foo\n", "bar\n"]),
    ]
    for text, expected in cases:
        self.assertEqual(split_lines(text), expected)
4 changes: 2 additions & 2 deletions torchx/schedulers/test/aws_batch_scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,8 @@ def test_log_iter(self) -> None:
self.assertEqual(
list(logs),
[
"foo",
"foobar",
"foo\n",
"foobar\n",
],
)

Expand Down
22 changes: 11 additions & 11 deletions torchx/schedulers/test/docker_scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ def test_docker_logs(self) -> None:
self.assertEqual(
logs,
[
"foo",
"bar",
"foo\n",
"bar\n",
],
)
logs = list(
Expand All @@ -234,7 +234,7 @@ def test_docker_logs(self) -> None:
self.assertEqual(
logs,
[
"bar",
"bar\n",
],
)

Expand Down Expand Up @@ -267,8 +267,8 @@ def test_docker_logs(self) -> None:
self.assertEqual(
logs,
[
"foo",
"bar",
"foo\n",
"bar\n",
],
)

Expand All @@ -286,8 +286,8 @@ def test_docker_logs_streams(self) -> None:
self.assertEqual(
logs,
{
"stdout",
"stderr",
"stdout\n",
"stderr\n",
},
)

Expand All @@ -299,8 +299,8 @@ def test_docker_logs_streams(self) -> None:
self.assertEqual(
logs,
{
"stdout",
"stderr",
"stdout\n",
"stderr\n",
},
)

Expand All @@ -312,7 +312,7 @@ def test_docker_logs_streams(self) -> None:
self.assertEqual(
logs,
[
"stderr",
"stderr\n",
],
)

Expand All @@ -324,7 +324,7 @@ def test_docker_logs_streams(self) -> None:
self.assertEqual(
logs,
[
"stdout",
"stdout\n",
],
)

Expand Down
4 changes: 2 additions & 2 deletions torchx/schedulers/test/kubernetes_scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,8 @@ def test_log_iter(self, read_namespaced_pod_log: MagicMock) -> None:
self.assertEqual(
list(lines),
[
"foo reg",
"bar reg",
"foo reg\n",
"bar reg\n",
],
)
call = read_namespaced_pod_log.call_args
Expand Down
10 changes: 5 additions & 5 deletions torchx/schedulers/test/local_scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def test_submit_inherit_parent_envs(self) -> None:
app = AppDef(name="check_foo_env_var", roles=[role])
app_id = self.scheduler.submit(app, {"log_dir": self.test_dir})
for line in self.scheduler.log_iter(app_id, "echo_foo"):
self.assertEqual("bar", line)
self.assertEqual("bar\n", line)

desc = self.wait(app_id, self.scheduler)
assert desc is not None
Expand Down Expand Up @@ -431,7 +431,7 @@ def test_submit_override_parent_env(self) -> None:
app = AppDef(name="check_foo_env_var", roles=[role])
app_id = self.scheduler.submit(app, {"log_dir": self.test_dir})
for line in self.scheduler.log_iter(app_id, "echo_foo"):
self.assertEqual("new_bar", line)
self.assertEqual("new_bar\n", line)

desc = self.wait(app_id, self.scheduler)
assert desc is not None
Expand Down Expand Up @@ -600,20 +600,20 @@ def test_log_iterator(self) -> None:
app_id = self.scheduler.submit(app, cfg)

for i, line in enumerate(self.scheduler.log_iter(app_id, "role1", k=0)):
self.assertEqual(str(i), line)
self.assertEqual(str(i), line.strip())

# since and until ignored
for i, line in enumerate(
self.scheduler.log_iter(
app_id, "role1", k=0, since=datetime.now(), until=datetime.now()
)
):
self.assertEqual(str(i), line)
self.assertEqual(str(i), line.strip())

for i, line in enumerate(
self.scheduler.log_iter(app_id, "role1", k=0, regex=r"[02468]")
):
self.assertEqual(str(i * 2), line)
self.assertEqual(str(i * 2), line.strip())

def test_log_iterator_no_log_dir(self) -> None:
role = Role(
Expand Down
6 changes: 3 additions & 3 deletions torchx/schedulers/test/slurm_scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ def test_log_iter(self, run: MagicMock) -> None:
since=datetime.datetime.now(),
)
)
self.assertEqual(logs, ["hello", "world"])
self.assertEqual(logs, ["hello\n", "world\n"])

with open(os.path.join(job_dir, "slurm-54-echo-1.err"), "wt") as f:
f.write("foo\nbar\n")
Expand All @@ -387,7 +387,7 @@ def test_log_iter(self, run: MagicMock) -> None:
)
)

self.assertEqual(logs, ["foo", "bar"])
self.assertEqual(logs, ["foo\n", "bar\n"])

# no stream specified should default to STDERR
logs = list(
Expand All @@ -397,7 +397,7 @@ def test_log_iter(self, run: MagicMock) -> None:
1,
)
)
self.assertEqual(logs, ["foo", "bar"])
self.assertEqual(logs, ["foo\n", "bar\n"])

with self.assertRaises(ValueError):
scheduler.log_iter("54", "echo", 1, streams=Stream.COMBINED)
Expand Down