diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 909359b648709f..bd8eb33d753598 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -264,6 +264,7 @@ def shutdown(self, wait=True, *, cancel_futures=False): break if work_item is not None: work_item.future.cancel() + work_item.future.set_running_or_notify_cancel() # Send a wake-up to prevent threads calling # _work_queue.get(block=True) from permanently blocking. diff --git a/Lib/test/test_concurrent_futures/test_thread_pool.py b/Lib/test/test_concurrent_futures/test_thread_pool.py index 4324241b374967..ad3488f0407432 100644 --- a/Lib/test/test_concurrent_futures/test_thread_pool.py +++ b/Lib/test/test_concurrent_futures/test_thread_pool.py @@ -112,6 +112,56 @@ def log_n_wait(ident): # ident='third' is cancelled because it remained in the collection of futures self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"]) + def test_shutdown_cancels_pending_futures(self): + def waiter(barrier): + barrier.wait(3) + def noop(): + pass + barrier = threading.Barrier(2) + called_back_1 = threading.Event() + called_back_2 = threading.Event() + with self.executor_type(max_workers=1) as pool: + + # Submit two futures, the first of which will block and prevent the + # second from running + f1 = pool.submit(waiter, barrier) + f2 = pool.submit(noop) + f1.add_done_callback(lambda f: called_back_1.set()) + f2.add_done_callback(lambda f: called_back_2.set()) + fs = {f1, f2} + + completed_iter = futures.as_completed(fs, timeout=0) + self.assertRaises(TimeoutError, next, completed_iter) + + # Shutdown the pool, cancelling unstarted task + pool.shutdown(wait=False, cancel_futures=True) + self.assertTrue(f1.running()) + self.assertTrue(f2.cancelled()) + self.assertFalse(called_back_1.is_set()) + self.assertTrue(called_back_2.is_set()) + + completed_iter = futures.as_completed(fs, timeout=0) + f = next(completed_iter) + self.assertIs(f, f2) + self.assertRaises(TimeoutError, next, completed_iter) + + result = futures.wait(fs, timeout=0) + self.assertEqual(result.not_done, {f1}) + self.assertEqual(result.done, {f2}) + + # Unblock and wait for the first future to complete + barrier.wait(3) + called_back_1.wait(3) + self.assertTrue(f1.done()) + self.assertTrue(called_back_1.is_set()) + + completed = set(futures.as_completed(fs, timeout=0)) + self.assertEqual(set(fs), completed) + + result = futures.wait(fs, timeout=0) + self.assertEqual(result.not_done, set()) + self.assertEqual(result.done, set(fs)) + def setUpModule(): setup_module() diff --git a/Misc/NEWS.d/next/Library/2025-05-24-15-15-43.gh-issue-109934.WXOdC8.rst b/Misc/NEWS.d/next/Library/2025-05-24-15-15-43.gh-issue-109934.WXOdC8.rst new file mode 100644 index 00000000000000..85de1993909453 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-05-24-15-15-43.gh-issue-109934.WXOdC8.rst @@ -0,0 +1,2 @@ +Ensure :class:`concurrent.futures.ThreadPoolExecutor` notifies any futures +it cancels on shutdown.