Skip to content

Commit e3a1192

Browse files
authored
Merge pull request #3349 from oesteban/enh/robuster-node
MAINT: Simplify interface execution and better error handling of ``Node``
2 parents 79e2fdf + 5f280da commit e3a1192

File tree

11 files changed

+103
-103
lines changed

11 files changed

+103
-103
lines changed

nipype/interfaces/base/core.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,7 @@ def _run_interface(self, runtime, correct_return_codes=(0,)):
723723
runtime.stderr = None
724724
runtime.cmdline = self.cmdline
725725
runtime.environ.update(out_environ)
726+
runtime.success_codes = correct_return_codes
726727

727728
# which $cmd
728729
executable_name = shlex.split(self._cmd_prefix + self.cmd)[0]
@@ -742,9 +743,6 @@ def _run_interface(self, runtime, correct_return_codes=(0,)):
742743
else "<skipped>"
743744
)
744745
runtime = run_command(runtime, output=self.terminal_output)
745-
if runtime.returncode is None or runtime.returncode not in correct_return_codes:
746-
self.raise_exception(runtime)
747-
748746
return runtime
749747

750748
def _format_arg(self, name, trait_spec, value):

nipype/interfaces/base/support.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,13 @@ def __exit__(self, exc_type, exc_value, exc_tb):
110110
if self._ignore_exc:
111111
return True
112112

113+
if hasattr(self._runtime, "cmdline"):
114+
retcode = self._runtime.returncode
115+
if retcode not in self._runtime.success_codes:
116+
self._runtime.traceback = (
117+
f"RuntimeError: subprocess exited with code {retcode}."
118+
)
119+
113120
@property
114121
def runtime(self):
115122
return self._runtime

nipype/interfaces/matlab.py

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,11 @@
1717

1818

1919
def get_matlab_command():
20-
if "NIPYPE_NO_MATLAB" in os.environ:
21-
return None
22-
23-
try:
24-
matlab_cmd = os.environ["MATLABCMD"]
25-
except:
26-
matlab_cmd = "matlab"
27-
28-
try:
29-
res = CommandLine(
30-
command="which",
31-
args=matlab_cmd,
32-
resource_monitor=False,
33-
terminal_output="allatonce",
34-
).run()
35-
matlab_path = res.runtime.stdout.strip()
36-
except Exception:
37-
return None
38-
return matlab_cmd
20+
"""Determine whether Matlab is installed and can be executed."""
21+
if "NIPYPE_NO_MATLAB" not in os.environ:
22+
from nipype.utils.filemanip import which
23+
24+
return which(os.getenv("MATLABCMD", "matlab"))
3925

4026

4127
no_matlab = get_matlab_command() is None

nipype/interfaces/tests/test_matlab.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ def test_run_interface(tmpdir):
103103
# bypasses ubuntu dash issue
104104
mc = mlab.MatlabCommand(script="foo;", paths=[tmpdir.strpath], mfile=True)
105105
assert not os.path.exists(default_script_file), "scriptfile should not exist 4."
106-
with pytest.raises(RuntimeError):
106+
with pytest.raises(OSError):
107107
mc.run()
108108
assert os.path.exists(default_script_file), "scriptfile should exist 4."
109109
if os.path.exists(default_script_file): # cleanup

nipype/interfaces/utility/tests/test_wrappers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def should_fail(tmp):
7373

7474

7575
def test_should_fail(tmpdir):
76-
with pytest.raises(NameError):
76+
with pytest.raises(pe.nodes.NodeExecutionError):
7777
should_fail(tmpdir)
7878

7979

nipype/pipeline/engine/nodes.py

Lines changed: 44 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import os
1111
import os.path as op
12+
from pathlib import Path
1213
import shutil
1314
import socket
1415
from copy import deepcopy
@@ -30,7 +31,6 @@
3031
load_json,
3132
emptydirs,
3233
savepkl,
33-
indirectory,
3434
silentrm,
3535
)
3636

@@ -64,6 +64,10 @@
6464
logger = logging.getLogger("nipype.workflow")
6565

6666

67+
class NodeExecutionError(RuntimeError):
68+
"""A nipype-specific name for exceptions when executing a Node."""
69+
70+
6771
class Node(EngineBase):
6872
"""
6973
Wraps interface objects for use in pipeline
@@ -98,7 +102,7 @@ def __init__(
98102
run_without_submitting=False,
99103
n_procs=None,
100104
mem_gb=0.20,
101-
**kwargs
105+
**kwargs,
102106
):
103107
"""
104108
Parameters
@@ -439,7 +443,8 @@ def run(self, updatehash=False):
439443
)
440444

441445
# Check hash, check whether run should be enforced
442-
logger.info('[Node] Setting-up "%s" in "%s".', self.fullname, outdir)
446+
if not isinstance(self, MapNode):
447+
logger.info(f'[Node] Setting-up "{self.fullname}" in "{outdir}".')
443448
cached, updated = self.is_cached()
444449

445450
# If the node is cached, check on pklz files and finish
@@ -530,7 +535,6 @@ def run(self, updatehash=False):
530535
# Tear-up after success
531536
shutil.move(hashfile_unfinished, hashfile_unfinished.replace("_unfinished", ""))
532537
write_node_report(self, result=result, is_mapnode=isinstance(self, MapNode))
533-
logger.info('[Node] Finished "%s".', self.fullname)
534538
return result
535539

536540
def _get_hashval(self):
@@ -582,7 +586,7 @@ def _get_inputs(self):
582586
logger.critical("%s", e)
583587

584588
if outputs is None:
585-
raise RuntimeError(
589+
raise NodeExecutionError(
586590
"""\
587591
Error populating the inputs of node "%s": the results file of the source node \
588592
(%s) does not contain any outputs."""
@@ -697,79 +701,56 @@ def _run_command(self, execute, copyfiles=True):
697701
)
698702
return result
699703

700-
outdir = self.output_dir()
701-
# Run command: either execute is true or load_results failed.
702-
result = InterfaceResult(
703-
interface=self._interface.__class__,
704-
runtime=Bunch(
705-
cwd=outdir,
706-
returncode=1,
707-
environ=dict(os.environ),
708-
hostname=socket.gethostname(),
709-
),
710-
inputs=self._interface.inputs.get_traitsfree(),
711-
)
712-
704+
outdir = Path(self.output_dir())
713705
if copyfiles:
714706
self._originputs = deepcopy(self._interface.inputs)
715707
self._copyfiles_to_wd(execute=execute)
716708

717-
message = '[Node] Running "{}" ("{}.{}")'.format(
718-
self.name, self._interface.__module__, self._interface.__class__.__name__
709+
# Run command: either execute is true or load_results failed.
710+
logger.info(
711+
f'[Node] Executing "{self.name}" <{self._interface.__module__}'
712+
f".{self._interface.__class__.__name__}>"
713+
)
714+
# Invoke core run method of the interface ignoring exceptions
715+
result = self._interface.run(cwd=outdir, ignore_exception=True)
716+
logger.info(
717+
f'[Node] Finished "{self.name}", elapsed time {result.runtime.duration}s.'
719718
)
719+
720720
if issubclass(self._interface.__class__, CommandLine):
721-
try:
722-
with indirectory(outdir):
723-
cmd = self._interface.cmdline
724-
except Exception as msg:
725-
result.runtime.stderr = "{}\n\n{}".format(
726-
getattr(result.runtime, "stderr", ""), msg
727-
)
728-
_save_resultfile(
729-
result,
730-
outdir,
731-
self.name,
732-
rebase=str2bool(self.config["execution"]["use_relative_paths"]),
733-
)
734-
raise
735-
cmdfile = op.join(outdir, "command.txt")
736-
with open(cmdfile, "wt") as fd:
737-
print(cmd + "\n", file=fd)
738-
message += ", a CommandLine Interface with command:\n{}".format(cmd)
739-
logger.info(message)
740-
try:
741-
result = self._interface.run(cwd=outdir)
742-
except Exception as msg:
743-
result.runtime.stderr = "%s\n\n%s".format(
744-
getattr(result.runtime, "stderr", ""), msg
745-
)
746-
_save_resultfile(
747-
result,
721+
# Write out command line as it happened
722+
Path.write_text(outdir / "command.txt", f"{result.runtime.cmdline}\n")
723+
724+
exc_tb = getattr(result.runtime, "traceback", None)
725+
726+
if not exc_tb:
727+
# Clean working directory if no errors
728+
dirs2keep = None
729+
if isinstance(self, MapNode):
730+
dirs2keep = [op.join(outdir, "mapflow")]
731+
732+
result.outputs = clean_working_directory(
733+
result.outputs,
748734
outdir,
749-
self.name,
750-
rebase=str2bool(self.config["execution"]["use_relative_paths"]),
735+
self._interface.inputs,
736+
self.needed_outputs,
737+
self.config,
738+
dirs2keep=dirs2keep,
751739
)
752-
raise
753-
754-
dirs2keep = None
755-
if isinstance(self, MapNode):
756-
dirs2keep = [op.join(outdir, "mapflow")]
757740

758-
result.outputs = clean_working_directory(
759-
result.outputs,
760-
outdir,
761-
self._interface.inputs,
762-
self.needed_outputs,
763-
self.config,
764-
dirs2keep=dirs2keep,
765-
)
741+
# Store results file under all circumstances
766742
_save_resultfile(
767743
result,
768744
outdir,
769745
self.name,
770746
rebase=str2bool(self.config["execution"]["use_relative_paths"]),
771747
)
772748

749+
if exc_tb:
750+
raise NodeExecutionError(
751+
f"Exception raised while executing Node {self.name}.\n\n{result.runtime.traceback}"
752+
)
753+
773754
return result
774755

775756
def _copyfiles_to_wd(self, execute=True, linksonly=False):
@@ -1290,7 +1271,7 @@ def _collate_results(self, nodes):
12901271
if code is not None:
12911272
msg += ["Subnode %d failed" % i]
12921273
msg += ["Error: %s" % str(code)]
1293-
raise Exception(
1274+
raise NodeExecutionError(
12941275
"Subnodes of node: %s failed:\n%s" % (self.name, "\n".join(msg))
12951276
)
12961277

nipype/pipeline/engine/tests/test_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ def test_mapnode_crash(tmpdir):
183183
node.config = deepcopy(config._sections)
184184
node.config["execution"]["stop_on_first_crash"] = True
185185
node.base_dir = tmpdir.strpath
186-
with pytest.raises(TypeError):
186+
with pytest.raises(pe.nodes.NodeExecutionError):
187187
node.run()
188188
os.chdir(cwd)
189189

nipype/pipeline/plugins/base.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ def run(self, graph, config, updatehash=False):
123123
self.mapnodesubids = {}
124124
# setup polling - TODO: change to threaded model
125125
notrun = []
126+
errors = []
126127

127128
old_progress_stats = None
128129
old_presub_stats = None
@@ -146,7 +147,7 @@ def run(self, graph, config, updatehash=False):
146147
"Progress: %d jobs, %d/%d/%d "
147148
"(done/running/ready), %d/%d "
148149
"(pending_tasks/waiting).",
149-
*progress_stats
150+
*progress_stats,
150151
)
151152
old_progress_stats = progress_stats
152153
toappend = []
@@ -155,14 +156,16 @@ def run(self, graph, config, updatehash=False):
155156
taskid, jobid = self.pending_tasks.pop()
156157
try:
157158
result = self._get_result(taskid)
158-
except Exception:
159+
except Exception as exc:
159160
notrun.append(self._clean_queue(jobid, graph))
161+
errors.append(exc)
160162
else:
161163
if result:
162164
if result["traceback"]:
163165
notrun.append(
164166
self._clean_queue(jobid, graph, result=result)
165167
)
168+
errors.append("".join(result["traceback"]))
166169
else:
167170
self._task_finished_cb(jobid)
168171
self._remove_node_dirs()
@@ -194,6 +197,20 @@ def run(self, graph, config, updatehash=False):
194197
# close any open resources
195198
self._postrun_check()
196199

200+
if errors:
201+
# If one or more nodes failed, re-raise the first of them
202+
error, cause = errors[0], None
203+
if isinstance(error, str):
204+
error = RuntimeError(error)
205+
206+
if len(errors) > 1:
207+
error, cause = (
208+
RuntimeError(f"{len(errors)} raised. Re-raising first."),
209+
error,
210+
)
211+
212+
raise error from cause
213+
197214
def _get_result(self, taskid):
198215
raise NotImplementedError
199216

nipype/pipeline/plugins/linear.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ def run(self, graph, config, updatehash=False):
3434
old_wd = os.getcwd()
3535
notrun = []
3636
donotrun = []
37+
stop_on_first_crash = str2bool(config["execution"]["stop_on_first_crash"])
38+
errors = []
3739
nodes, _ = topological_sort(graph)
3840
for node in nodes:
3941
endstatus = "end"
@@ -43,27 +45,38 @@ def run(self, graph, config, updatehash=False):
4345
if self._status_callback:
4446
self._status_callback(node, "start")
4547
node.run(updatehash=updatehash)
46-
except:
48+
except Exception as exc:
4749
endstatus = "exception"
4850
# broad except, because I really don't know where a
5051
# node might fail
5052
crashfile = report_crash(node)
51-
if str2bool(config["execution"]["stop_on_first_crash"]):
52-
raise
5353
# remove dependencies from queue
5454
subnodes = [s for s in dfs_preorder(graph, node)]
5555
notrun.append(
5656
{"node": node, "dependents": subnodes, "crashfile": crashfile}
5757
)
5858
donotrun.extend(subnodes)
5959
# Delay raising the crash until we cleaned the house
60-
if str2bool(config["execution"]["stop_on_first_crash"]):
61-
os.chdir(old_wd) # Return wherever we were before
62-
report_nodes_not_run(notrun) # report before raising
63-
raise
60+
errors.append(exc)
61+
62+
if stop_on_first_crash:
63+
break
6464
finally:
6565
if self._status_callback:
6666
self._status_callback(node, endstatus)
6767

6868
os.chdir(old_wd) # Return wherever we were before
6969
report_nodes_not_run(notrun)
70+
if errors:
71+
# If one or more nodes failed, re-raise the first of them
72+
error, cause = errors[0], None
73+
if isinstance(error, str):
74+
error = RuntimeError(error)
75+
76+
if len(errors) > 1:
77+
error, cause = (
78+
RuntimeError(f"{len(errors)} raised. Re-raising first."),
79+
error,
80+
)
81+
82+
raise error from cause

nipype/pipeline/plugins/tests/test_sgelike.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ def test_crashfile_creation(tmp_path):
2929
sgelike_plugin = SGELikeBatchManagerBase("")
3030
with pytest.raises(RuntimeError) as e:
3131
assert pipe.run(plugin=sgelike_plugin)
32-
assert str(e.value) == "Workflow did not execute cleanly. Check log for details"
3332

34-
crashfiles = tmp_path.glob("crash*crasher*.pklz")
35-
assert len(list(crashfiles)) == 1
33+
crashfiles = list(tmp_path.glob("crash*crasher*.pklz")) + list(
34+
tmp_path.glob("crash*crasher*.txt")
35+
)
36+
assert len(crashfiles) == 1

0 commit comments

Comments
 (0)