Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions Doc/library/concurrent.futures.rst
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ Executor Objects
return immediately and the resources associated with the executor will be
freed when all pending futures are done executing. Regardless of the
value of *wait*, the entire Python program will not exit until all
pending futures are done executing.
pending futures are done executing (see the *daemon* parameter of
:class:`ThreadPoolExecutor` for an exception to this rule).

If *cancel_futures* is ``True``, this method will cancel all pending
futures that the executor has not started running. Any futures that
Expand Down Expand Up @@ -159,7 +160,7 @@ And::
executor.submit(wait_on_future)


.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=(), *, daemon=False)

An :class:`Executor` subclass that uses a pool of at most *max_workers*
threads to execute calls asynchronously.
Expand All @@ -177,6 +178,13 @@ And::
pending jobs will raise a :exc:`~concurrent.futures.thread.BrokenThreadPool`,
as well as any attempt to submit more jobs to the pool.

If *daemon* is ``True``, worker threads are created as
:ref:`daemon threads <thread-objects>`, allowing the interpreter to exit
without waiting for them to finish. This is useful when tasks may be stuck
indefinitely and would otherwise prevent the process from exiting.
``shutdown(wait=True)`` still joins the threads explicitly. Daemon threads
are not supported in subinterpreters and will raise :exc:`RuntimeError`.

.. versionchanged:: 3.5
If *max_workers* is ``None`` or
not given, it will default to the number of processors on the machine,
Expand Down Expand Up @@ -206,6 +214,9 @@ And::
Default value of *max_workers* is changed to
``min(32, (os.process_cpu_count() or 1) + 4)``.

.. versionchanged:: 3.15
Added the *daemon* parameter.


.. _threadpoolexecutor-example:

Expand Down
5 changes: 5 additions & 0 deletions Doc/whatsnew/3.15.rst
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,11 @@ concurrent.futures
terminated process.
(Contributed by Jonathan Berg in :gh:`139486`.)

* Added *daemon* parameter to :class:`concurrent.futures.ThreadPoolExecutor`.
When set to ``True``, worker threads are created as daemon threads, allowing
the interpreter to exit without waiting for them to finish.
(Contributed by Hrvoje Nikšić in :gh:`80961`.)


contextlib
----------
Expand Down
16 changes: 13 additions & 3 deletions Lib/concurrent/futures/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ def prepare_context(cls, initializer, initargs):
return WorkerContext.prepare(initializer, initargs)

def __init__(self, max_workers=None, thread_name_prefix='',
initializer=None, initargs=(), **ctxkwargs):
initializer=None, initargs=(), *, daemon=False,
**ctxkwargs):
"""Initializes a new ThreadPoolExecutor instance.

Args:
Expand All @@ -168,8 +169,15 @@ def __init__(self, max_workers=None, thread_name_prefix='',
thread_name_prefix: An optional name prefix to give our threads.
initializer: A callable used to initialize worker threads.
initargs: A tuple of arguments to pass to the initializer.
daemon: If True, worker threads are created as daemon threads.
ctxkwargs: Additional arguments to cls.prepare_context().
"""
if daemon:
if not threading._daemon_threads_allowed():
raise RuntimeError(
'daemon threads are disabled in this (sub)interpreter')
self._daemon = daemon

if max_workers is None:
# ThreadPoolExecutor is often used to:
# * CPU bound task which releases GIL
Expand Down Expand Up @@ -233,10 +241,12 @@ def weakref_cb(_, q=self._work_queue):
t = threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
self._create_worker_context(),
self._work_queue))
self._work_queue),
daemon=True if self._daemon else None)
t.start()
self._threads.add(t)
_threads_queues[t] = self._work_queue
if not self._daemon:
_threads_queues[t] = self._work_queue

def _initializer_failed(self):
with self._shutdown_lock:
Expand Down
58 changes: 58 additions & 0 deletions Lib/test/test_concurrent_futures/test_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,64 @@ def test_cancel_futures_wait_false(self):
self.assertIn(out.strip(), [b"apple", b""])


class ThreadPoolDaemonTest(BaseTestCase):
def test_daemon_worker_threads(self):
executor = futures.ThreadPoolExecutor(max_workers=2, daemon=True)
executor.submit(time.sleep, 0)
executor.shutdown(wait=True)
for t in executor._threads:
self.assertTrue(t.daemon)

def test_default_non_daemon_workers(self):
executor = futures.ThreadPoolExecutor(max_workers=2)
executor.submit(time.sleep, 0)
executor.shutdown(wait=True)
for t in executor._threads:
self.assertFalse(t.daemon)

def test_daemon_workers_untracked(self):
from concurrent.futures.thread import _threads_queues
executor = futures.ThreadPoolExecutor(max_workers=2, daemon=True)
executor.submit(time.sleep, 0)
executor.shutdown(wait=True)
for t in executor._threads:
self.assertNotIn(t, _threads_queues)

def test_daemon_explicit_shutdown_wait(self):
# shutdown(wait=True) should still wait for task completion
results = []
def append_after_sleep():
time.sleep(0.1)
results.append(42)
executor = futures.ThreadPoolExecutor(max_workers=2, daemon=True)
executor.submit(append_after_sleep)
executor.shutdown(wait=True)
self.assertEqual(results, [42])

def test_daemon_exit_no_block(self):
# Interpreter should exit promptly with daemon=True and
# shutdown(wait=False), even if a task is still running.
rc, out, err = assert_python_ok('-c', """if True:
from concurrent.futures import ThreadPoolExecutor
import time
t = ThreadPoolExecutor(max_workers=1, daemon=True)
t.submit(time.sleep, 60)
t.shutdown(wait=False)
""")
self.assertFalse(err)

def test_daemon_context_manager_waits(self):
# Context manager calls shutdown(wait=True), so it should
# still wait for tasks to finish.
results = []
def append_after_sleep():
time.sleep(0.1)
results.append(42)
with futures.ThreadPoolExecutor(max_workers=2, daemon=True) as e:
e.submit(append_after_sleep)
self.assertEqual(results, [42])


class ProcessPoolShutdownTest(ExecutorShutdownTest):
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_processes_terminate(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added *daemon* parameter to :class:`concurrent.futures.ThreadPoolExecutor`.
When set to ``True``, worker threads are created as daemon threads, allowing
the interpreter to exit without waiting for them to finish.
Loading