diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 3ea24ea77004ad..523ca3eed812c5 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -95,7 +95,8 @@ Executor Objects return immediately and the resources associated with the executor will be freed when all pending futures are done executing. Regardless of the value of *wait*, the entire Python program will not exit until all - pending futures are done executing. + pending futures are done executing (see the *daemon* parameter of + :class:`ThreadPoolExecutor` for an exception to this rule). If *cancel_futures* is ``True``, this method will cancel all pending futures that the executor has not started running. Any futures that @@ -159,7 +160,7 @@ And:: executor.submit(wait_on_future) -.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=()) +.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=(), *, daemon=False) An :class:`Executor` subclass that uses a pool of at most *max_workers* threads to execute calls asynchronously. @@ -177,6 +178,13 @@ And:: pending jobs will raise a :exc:`~concurrent.futures.thread.BrokenThreadPool`, as well as any attempt to submit more jobs to the pool. + If *daemon* is ``True``, worker threads are created as + :ref:`daemon threads `, allowing the interpreter to exit + without waiting for them to finish. This is useful when tasks may be stuck + indefinitely and would otherwise prevent the process from exiting. + ``shutdown(wait=True)`` still joins the threads explicitly. Daemon threads + are not supported in subinterpreters and will raise :exc:`RuntimeError`. + .. versionchanged:: 3.5 If *max_workers* is ``None`` or not given, it will default to the number of processors on the machine, @@ -206,6 +214,9 @@ And:: Default value of *max_workers* is changed to ``min(32, (os.process_cpu_count() or 1) + 4)``. + .. versionchanged:: 3.15 + Added the *daemon* parameter. + .. _threadpoolexecutor-example: diff --git a/Doc/whatsnew/3.15.rst b/Doc/whatsnew/3.15.rst index 0b5902bb013436..3cf2c8348e7bbf 100644 --- a/Doc/whatsnew/3.15.rst +++ b/Doc/whatsnew/3.15.rst @@ -702,6 +702,11 @@ concurrent.futures terminated process. (Contributed by Jonathan Berg in :gh:`139486`.) +* Added *daemon* parameter to :class:`concurrent.futures.ThreadPoolExecutor`. + When set to ``True``, worker threads are created as daemon threads, allowing + the interpreter to exit without waiting for them to finish. + (Contributed by Hrvoje Nikšić in :gh:`80961`.) + contextlib ---------- diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 909359b648709f..2fdb53fd008639 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -159,7 +159,8 @@ def prepare_context(cls, initializer, initargs): return WorkerContext.prepare(initializer, initargs) def __init__(self, max_workers=None, thread_name_prefix='', - initializer=None, initargs=(), **ctxkwargs): + initializer=None, initargs=(), *, daemon=False, + **ctxkwargs): """Initializes a new ThreadPoolExecutor instance. Args: @@ -168,8 +169,15 @@ def __init__(self, max_workers=None, thread_name_prefix='', thread_name_prefix: An optional name prefix to give our threads. initializer: A callable used to initialize worker threads. initargs: A tuple of arguments to pass to the initializer. + daemon: If True, worker threads are created as daemon threads. ctxkwargs: Additional arguments to cls.prepare_context(). """ + if daemon: + if not threading._daemon_threads_allowed(): + raise RuntimeError( + 'daemon threads are disabled in this (sub)interpreter') + self._daemon = daemon + if max_workers is None: # ThreadPoolExecutor is often used to: # * CPU bound task which releases GIL @@ -233,10 +241,12 @@ def weakref_cb(_, q=self._work_queue): t = threading.Thread(name=thread_name, target=_worker, args=(weakref.ref(self, weakref_cb), self._create_worker_context(), - self._work_queue)) + self._work_queue), + daemon=True if self._daemon else None) t.start() self._threads.add(t) - _threads_queues[t] = self._work_queue + if not self._daemon: + _threads_queues[t] = self._work_queue def _initializer_failed(self): with self._shutdown_lock: diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index c576df1068ed32..1c0a9bb082b4ce 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -261,6 +261,64 @@ def test_cancel_futures_wait_false(self): self.assertIn(out.strip(), [b"apple", b""]) +class ThreadPoolDaemonTest(BaseTestCase): + def test_daemon_worker_threads(self): + executor = futures.ThreadPoolExecutor(max_workers=2, daemon=True) + executor.submit(time.sleep, 0) + executor.shutdown(wait=True) + for t in executor._threads: + self.assertTrue(t.daemon) + + def test_default_non_daemon_workers(self): + executor = futures.ThreadPoolExecutor(max_workers=2) + executor.submit(time.sleep, 0) + executor.shutdown(wait=True) + for t in executor._threads: + self.assertFalse(t.daemon) + + def test_daemon_workers_untracked(self): + from concurrent.futures.thread import _threads_queues + executor = futures.ThreadPoolExecutor(max_workers=2, daemon=True) + executor.submit(time.sleep, 0) + executor.shutdown(wait=True) + for t in executor._threads: + self.assertNotIn(t, _threads_queues) + + def test_daemon_explicit_shutdown_wait(self): + # shutdown(wait=True) should still wait for task completion + results = [] + def append_after_sleep(): + time.sleep(0.1) + results.append(42) + executor = futures.ThreadPoolExecutor(max_workers=2, daemon=True) + executor.submit(append_after_sleep) + executor.shutdown(wait=True) + self.assertEqual(results, [42]) + + def test_daemon_exit_no_block(self): + # Interpreter should exit promptly with daemon=True and + # shutdown(wait=False), even if a task is still running. + rc, out, err = assert_python_ok('-c', """if True: + from concurrent.futures import ThreadPoolExecutor + import time + t = ThreadPoolExecutor(max_workers=1, daemon=True) + t.submit(time.sleep, 60) + t.shutdown(wait=False) + """) + self.assertFalse(err) + + def test_daemon_context_manager_waits(self): + # Context manager calls shutdown(wait=True), so it should + # still wait for tasks to finish. + results = [] + def append_after_sleep(): + time.sleep(0.1) + results.append(42) + with futures.ThreadPoolExecutor(max_workers=2, daemon=True) as e: + e.submit(append_after_sleep) + self.assertEqual(results, [42]) + + class ProcessPoolShutdownTest(ExecutorShutdownTest): @warnings_helper.ignore_fork_in_thread_deprecation_warnings() def test_processes_terminate(self): diff --git a/Misc/NEWS.d/next/Library/2026-03-06-20-41-41.gh-issue-80961.daemon_tpe.rst b/Misc/NEWS.d/next/Library/2026-03-06-20-41-41.gh-issue-80961.daemon_tpe.rst new file mode 100644 index 00000000000000..0b78e55345549c --- /dev/null +++ b/Misc/NEWS.d/next/Library/2026-03-06-20-41-41.gh-issue-80961.daemon_tpe.rst @@ -0,0 +1,3 @@ +Added *daemon* parameter to :class:`concurrent.futures.ThreadPoolExecutor`. +When set to ``True``, worker threads are created as daemon threads, allowing +the interpreter to exit without waiting for them to finish.