Automatically clean thread and loop on fork #1790
Conversation
The diff hunk under discussion:

```python
global lock
loop[0] = None
iothread[0] = None
lock = None
```
Are you sure this works? Why is only `lock` declared as `global`? Furthermore, there is no `lock` in this file, only `_lock` and `get_lock`; this got changed in ffe57d6. Also, this function is basically identical to `reset_lock`, so why not use that function?
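The duplication the reviewer points at is easier to see with the lazily-created-lock pattern spelled out. Below is a minimal self-contained sketch; the names mirror `fsspec.asyn`'s `get_lock`/`reset_lock` as discussed in this thread, but this is an illustration, not fsspec's actual code:

```python
import os
import threading

_lock = None  # module-level singleton, created lazily


def get_lock():
    """Return the global lock, creating it on first use (mirrors the get_lock pattern)."""
    global _lock
    if _lock is None:
        _lock = threading.Lock()
    return _lock


def reset_lock():
    """Rebind the module-level lock to None so a forked child creates a fresh one."""
    global _lock
    _lock = None


# Register the reset so a forked child never inherits a lock in an unknown state.
if hasattr(os, "register_at_fork"):  # POSIX only
    os.register_at_fork(after_in_child=reset_lock)
```

With this pattern, a separate cleanup function does not need to touch the lock at all; calling `reset_lock()` (directly or via the fork hook) is sufficient.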
This is a step in the right direction. I need something like this because fusepy forks to daemonize itself. Currently, I am trying this:

```python
import threading

try:
    import fsspec.asyn

    def tryCloseFSSpecIOBeforeFork() -> None:
        try:
            # We only cover a single use case: only the main thread and a single fsspecIO
            # thread are running. Then, we can close it presumably safely. Anything else
            # would possibly not be thread-safe. See the comments below about resetting
            # the lock.
            if (
                len(threading.enumerate()) != 2
                or not all(thread.name in ["MainThread", "fsspecIO"] for thread in threading.enumerate())
                or fsspec.asyn.iothread[0] is None
            ):
                return

            # The lock was changed 3 years ago to a _lock and get_lock singleton:
            # https://github.com/fsspec/filesystem_spec/commit/ffe57d6eabe517b4c39c27487fc45b804d314b58
            # But acquiring the lock does not help us fix thread-safety anyway in case other
            # threads hold a reference to the event loop after a call to get_loop.
            # Therefore, do the threading checks above and do not bother with locking.

            # iothread and loop are single-element lists whose element starts out as None.
            ioThread = fsspec.asyn.iothread[0]
            eventLoop = fsspec.asyn.loop[0]
            if eventLoop is not None:  # Should always be true, else iothread[0] would also be None.
                # Calling eventLoop.stop() directly does not work; it presumably needs to be
                # called on the thread executing the event loop.
                eventLoop.call_soon_threadsafe(eventLoop.stop)
            if ioThread is not None and ioThread.is_alive():
                ioThread.join()

            # This should be safe as long as no other thread is using the event loop.
            # It should not be done if other threads might be using the event loop, because
            # get_loop only accounts for race conditions during event loop creation; after
            # that, it simply returns the event loop without any lock, i.e., changing or
            # deleting the event loop is not thread-safe!
            # https://github.com/fsspec/filesystem_spec/blob/26f1ea75351e39a80b29b27bea792351f3e8da9f/fsspec/asyn.py#L141
            # The next call to fsspec.asyn.get_loop should simply recreate a new thread and
            # event loop. For my use case, the next call would only happen after a fork anyway.

            # Normally, there is no reason to reset the lock on this thread. However, this
            # function is to be used before forking, and we even check against other running
            # threads above, so it should be safe to reset the lock.
            # See also https://github.com/fsspec/filesystem_spec/pull/1790
            reset_after_fork = getattr(fsspec.asyn, 'reset_after_fork', None)
            reset_lock = getattr(fsspec.asyn, 'reset_lock', None)
            if reset_lock:
                reset_lock()
            elif reset_after_fork:
                # Theoretically, this call is redundant because fsspec registers it to be
                # called on fork if it exists.
                reset_after_fork()
            else:
                fsspec.asyn.iothread[0] = None
                fsspec.asyn.loop[0] = None
                fsspec.asyn.lock = None
        except Exception:
            pass
except ImportError:
    pass
```

Also, consider the comment here:

```python
def reset_lock():
    """Reset the global lock.

    This should be called only on the init of a forked process to reset the lock to
    None, enabling the new forked process to get a new lock.
    """
```

I think you could also test for correct usage by checking that there are no threads started for the current process, e.g., the way I do it in ratarmount to check before forking:

```python
# Note that this will not detect threads started in shared libraries, only those started
# via "threading".
if not foreground and len(threading.enumerate()) > 1:
    threadNames = [thread.name for thread in threading.enumerate() if thread.name != "MainThread"]
    # Fix FUSE hangs with: https://unix.stackexchange.com/a/713621/111050
    raise ValueError(
        "Daemonizing FUSE into the background may result in errors or unkillable hangs because "
        f"there are threads still open: {', '.join(threadNames)}!\nCall ratarmount with -f or --foreground."
    )
```
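The enumerate-based guard above can be exercised in isolation. The sketch below uses a hypothetical helper `check_safe_to_daemonize` and compares thread identity against `threading.main_thread()` instead of matching on the `"MainThread"` name (names can be changed, identity cannot):

```python
import threading


def check_safe_to_daemonize() -> None:
    # Same idea as the ratarmount check quoted above: refuse to proceed while any
    # thread other than the main thread is alive.
    extra = [t.name for t in threading.enumerate() if t is not threading.main_thread()]
    if extra:
        raise ValueError(f"Refusing to daemonize, threads still open: {', '.join(extra)}")


# With only the main thread running, the check passes silently.
check_safe_to_daemonize()

# Once a worker thread is started, the check raises until the worker has finished.
stop = threading.Event()
worker = threading.Thread(name="worker", target=stop.wait)
worker.start()
try:
    check_safe_to_daemonize()
    check_fired = False
except ValueError:
    check_fired = True
finally:
    stop.set()
    worker.join()
```

As the comment in the quoted snippet notes, this only sees threads started via `threading`; threads spawned inside shared libraries remain invisible to `threading.enumerate()`.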
Unfortunately, the above does not work because it triggers the check.
Please submit your ideas as a new PR so that we can discuss it in isolation.
Not everyone will want this; sometimes the original process wants to keep using those resources.
If we are sure the child process can work by setting up its new resources, then we no longer need the check.
The other objects are mutated in place.
This may well be a typo.
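The distinction can be shown in a few lines: rebinding a module-level name from inside a function silently creates a local variable unless the name is declared `global`, whereas assigning to an index of a shared single-element list mutates the object in place. A minimal sketch with stand-in names (not fsspec's actual module globals):

```python
loop = [object()]   # like fsspec.asyn.loop: a single-element list holding the event loop
lock = object()     # like a plain module-level lock object


def reset():
    # In-place mutation: this changes the list object itself, so every holder of a
    # reference to `loop` observes the change. No `global` declaration is needed.
    loop[0] = None
    # Plain assignment only rebinds a *local* name. Without `global lock`, the
    # module-level `lock` is left untouched; this is the silent no-op being discussed.
    lock = None  # noqa: F841


reset()
```

This is why `loop[0] = None` and `iothread[0] = None` take effect without any declaration, while `lock = None` does nothing at all unless `lock` is declared `global` in the function.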