diff --git a/nipype/__init__.py b/nipype/__init__.py
index 18449c5f81..43e9011175 100644
--- a/nipype/__init__.py
+++ b/nipype/__init__.py
@@ -14,11 +14,7 @@
 import os
 from distutils.version import LooseVersion

-from .info import (
-    URL as __url__,
-    STATUS as __status__,
-    __version__,
-)
+from .info import URL as __url__, STATUS as __status__, __version__
 from .utils.config import NipypeConfig
 from .utils.logger import Logging
 from .refs import due
@@ -105,6 +101,8 @@ def check_latest_version(raise_exception=False):
                         packname="nipype", version=__version__, latest=latest["version"]
                     )
                 )
+            else:
+                logger.info("No new version available.")
             if latest["bad_versions"] and any(
                 [
                     LooseVersion(__version__) == LooseVersion(ver)
@@ -126,7 +124,7 @@
 if config.getboolean("execution", "check_version"):
     import __main__

-    if not hasattr(__main__, "__file__"):
+    if not hasattr(__main__, "__file__") and "NIPYPE_NO_ET" not in os.environ:
         from .interfaces.base import BaseInterface

         if BaseInterface._etelemetry_version_data is None:
diff --git a/nipype/interfaces/base/core.py b/nipype/interfaces/base/core.py
index 6c11084032..82da393a84 100644
--- a/nipype/interfaces/base/core.py
+++ b/nipype/interfaces/base/core.py
@@ -168,7 +168,10 @@ class BaseInterface(Interface):
     def __init__(
         self, from_file=None, resource_monitor=None, ignore_exception=False, **inputs
     ):
-        if config.getboolean("execution", "check_version"):
+        if (
+            config.getboolean("execution", "check_version")
+            and "NIPYPE_NO_ET" not in os.environ
+        ):
             from ... import check_latest_version

             if BaseInterface._etelemetry_version_data is None:
diff --git a/nipype/pipeline/plugins/legacymultiproc.py b/nipype/pipeline/plugins/legacymultiproc.py
index 620aadb422..528184472d 100644
--- a/nipype/pipeline/plugins/legacymultiproc.py
+++ b/nipype/pipeline/plugins/legacymultiproc.py
@@ -153,6 +153,12 @@ class NonDaemonPool(pool.Pool):
     Process = NonDaemonProcess


+def process_initializer(cwd):
+    """Initializes the environment of the child process"""
+    os.chdir(cwd)
+    os.environ["NIPYPE_NO_ET"] = "1"
+
+
 class LegacyMultiProcPlugin(DistributedPluginBase):
     """
     Execute workflow with multiprocessing, not sending more jobs at once
@@ -223,7 +229,7 @@ def __init__(self, plugin_args=None):
             self.pool = NipypePool(
                 processes=self.processors,
                 maxtasksperchild=maxtasks,
-                initializer=os.chdir,
+                initializer=process_initializer,
                 initargs=(self._cwd,),
             )
         except TypeError:
diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py
index dc950385b1..eac662533c 100644
--- a/nipype/pipeline/plugins/multiproc.py
+++ b/nipype/pipeline/plugins/multiproc.py
@@ -10,7 +10,7 @@
 # Import packages
 import os
 import multiprocessing as mp
-from concurrent.futures import ProcessPoolExecutor
+from concurrent.futures import ProcessPoolExecutor, wait
 from traceback import format_exception
 import sys
 from logging import INFO
@@ -73,6 +73,12 @@ def run_node(node, updatehash, taskid):
     return result


+def process_initializer(cwd):
+    """Initializes the environment of the child process"""
+    os.chdir(cwd)
+    os.environ["NIPYPE_NO_ET"] = "1"
+
+
 class MultiProcPlugin(DistributedPluginBase):
     """
     Execute workflow with multiprocessing, not sending more jobs at once
@@ -134,16 +140,18 @@ def __init__(self, plugin_args=None):
         )

         try:
-            mp_context = mp.context.get_context(self.plugin_args.get("mp_context"))
+            mp_context = mp.get_context(self.plugin_args.get("mp_context"))
             self.pool = ProcessPoolExecutor(
                 max_workers=self.processors,
-                initializer=os.chdir,
+                initializer=process_initializer,
                 initargs=(self._cwd,),
                 mp_context=mp_context,
             )
         except (AttributeError, TypeError):
             # Python < 3.7 does not support initialization or contexts
             self.pool = ProcessPoolExecutor(max_workers=self.processors)
+            result_future = self.pool.submit(process_initializer, self._cwd)
+            wait([result_future], timeout=5)

         self._stats = None
diff --git a/nipype/pipeline/plugins/tools.py b/nipype/pipeline/plugins/tools.py
index 2bb31de564..ef213be36d 100644
--- a/nipype/pipeline/plugins/tools.py
+++ b/nipype/pipeline/plugins/tools.py
@@ -125,7 +125,13 @@ def create_pyscript(node, updatehash=False, store_exception=True):
     can_import_matplotlib = False
     pass

+import os
+value = os.environ.get('NIPYPE_NO_ET', None)
+if value is None:
+    # disable ET for any submitted job
+    os.environ['NIPYPE_NO_ET'] = "1"
 from nipype import config, logging
+
 from nipype.utils.filemanip import loadpkl, savepkl
 from socket import gethostname
 from traceback import format_exception
diff --git a/nipype/tests/test_nipype.py b/nipype/tests/test_nipype.py
index ab3499c8db..60fa92d141 100644
--- a/nipype/tests/test_nipype.py
+++ b/nipype/tests/test_nipype.py
@@ -19,3 +19,90 @@ def test_nipype_info():
 def test_git_hash():
     # removing the first "g" from gitversion
     get_nipype_gitversion()[1:] == get_info()["commit_hash"]
+
+
+def _check_no_et():
+    import os
+    from unittest.mock import patch
+
+    et = os.getenv("NIPYPE_NO_ET") is None
+
+    with patch.dict("os.environ", {"NIPYPE_NO_ET": "1"}):
+        from nipype.interfaces.base import BaseInterface
+
+        ver_data = BaseInterface._etelemetry_version_data
+
+    if et and ver_data is None:
+        raise ValueError(
+            "etelemetry enabled and version data missing - double hits likely"
+        )
+
+    return et
+
+
+def test_no_et(tmp_path):
+    from unittest.mock import patch
+    from nipype.pipeline import engine as pe
+    from nipype.interfaces import utility as niu
+    from nipype.interfaces.base import BaseInterface
+
+    # Pytest doesn't trigger this, so let's pretend it's there
+    with patch.object(BaseInterface, "_etelemetry_version_data", {}):
+
+        # Direct function call - environment not set
+        f = niu.Function(function=_check_no_et)
+        res = f.run()
+        assert res.outputs.out is True
+
+        # Basic node - environment not set
+        n = pe.Node(
+            niu.Function(function=_check_no_et), name="n", base_dir=str(tmp_path)
+        )
+        res = n.run()
+        assert res.outputs.out is True
+
+        # Linear run - environment not set
+        wf1 = pe.Workflow(name="wf1", base_dir=str(tmp_path))
+        wf1.add_nodes([pe.Node(niu.Function(function=_check_no_et), name="n")])
+        res = wf1.run()
+        assert next(iter(res.nodes)).result.outputs.out is True
+
+        # MultiProc run - environment initialized with NIPYPE_NO_ET
+        wf2 = pe.Workflow(name="wf2", base_dir=str(tmp_path))
+        wf2.add_nodes([pe.Node(niu.Function(function=_check_no_et), name="n")])
+        res = wf2.run(plugin="MultiProc", plugin_args={"n_procs": 1})
+        assert next(iter(res.nodes)).result.outputs.out is False
+
+        # LegacyMultiProc run - environment initialized with NIPYPE_NO_ET
+        wf3 = pe.Workflow(name="wf3", base_dir=str(tmp_path))
+        wf3.add_nodes([pe.Node(niu.Function(function=_check_no_et), name="n")])
+        res = wf3.run(plugin="LegacyMultiProc", plugin_args={"n_procs": 1})
+        assert next(iter(res.nodes)).result.outputs.out is False
+
+        # run_without_submitting - environment not set
+        wf4 = pe.Workflow(name="wf4", base_dir=str(tmp_path))
+        wf4.add_nodes(
+            [
+                pe.Node(
+                    niu.Function(function=_check_no_et),
+                    run_without_submitting=True,
+                    name="n",
+                )
+            ]
+        )
+        res = wf4.run(plugin="MultiProc", plugin_args={"n_procs": 1})
+        assert next(iter(res.nodes)).result.outputs.out is True
+
+        # run_without_submitting (LegacyMultiProc) - runs in the main
+        # process, so the environment is not set
+        wf5 = pe.Workflow(name="wf5", base_dir=str(tmp_path))
+        wf5.add_nodes(
+            [
+                pe.Node(
+                    niu.Function(function=_check_no_et),
+                    run_without_submitting=True,
+                    name="n",
+                )
+            ]
+        )
+        res = wf5.run(plugin="LegacyMultiProc", plugin_args={"n_procs": 1})
+        assert next(iter(res.nodes)).result.outputs.out is True
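
For context, a minimal, self-contained sketch of the opt-out pattern the patch relies on (illustration only, not part of the diff). ProcessPoolExecutor gained initializer/initargs in Python 3.7, which is also why the patch falls back to submitting process_initializer as an ordinary first task and waiting up to five seconds on older interpreters. The would_check_version helper below is a hypothetical stand-in for the NIPYPE_NO_ET-guarded etelemetry call, not a nipype API.

# sketch: how a pool initializer propagates the NIPYPE_NO_ET kill switch
import os
from concurrent.futures import ProcessPoolExecutor


def process_initializer(cwd):
    # Same pattern as the patch: runs once in each worker before any task.
    os.chdir(cwd)
    os.environ["NIPYPE_NO_ET"] = "1"


def would_check_version():
    # Hypothetical stand-in for the guarded version check.
    return "NIPYPE_NO_ET" not in os.environ


if __name__ == "__main__":
    # True in the parent (unless NIPYPE_NO_ET is already set in your shell),
    # so the main process may still phone home exactly once.
    print(would_check_version())
    with ProcessPoolExecutor(
        max_workers=1,
        initializer=process_initializer,
        initargs=(os.getcwd(),),
    ) as pool:
        # False in the worker: the initializer already set NIPYPE_NO_ET there.
        print(pool.submit(would_check_version).result())

This is why the tests above expect out is False only for nodes that actually run inside MultiProc/LegacyMultiProc workers, while run_without_submitting nodes execute in the parent and still see an unset environment.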