diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 73f1f5f1aa1..cc7f745afaa 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -106,9 +106,7 @@ env: test_weakref test_yield_from ENV_POLLUTING_TESTS_COMMON: >- - test_threading ENV_POLLUTING_TESTS_LINUX: >- - test.test_concurrent_futures.test_thread_pool test.test_multiprocessing_fork.test_processes test.test_multiprocessing_fork.test_threads test.test_multiprocessing_forkserver.test_processes @@ -116,7 +114,6 @@ env: test.test_multiprocessing_spawn.test_processes test.test_multiprocessing_spawn.test_threads ENV_POLLUTING_TESTS_MACOS: >- - test.test_concurrent_futures.test_thread_pool test.test_multiprocessing_forkserver.test_processes test.test_multiprocessing_forkserver.test_threads test.test_multiprocessing_spawn.test_processes @@ -297,21 +294,21 @@ jobs: env: RUSTPYTHON_SKIP_ENV_POLLUTERS: true run: target/release/rustpython -m test -j 1 -u all --slowest --fail-env-changed --timeout 600 -v ${{ env.PLATFORM_INDEPENDENT_TESTS }} - timeout-minutes: 35 + timeout-minutes: 45 - if: runner.os == 'Linux' name: run cpython platform-dependent tests (Linux) env: RUSTPYTHON_SKIP_ENV_POLLUTERS: true run: target/release/rustpython -m test -j 1 -u all --slowest --fail-env-changed --timeout 600 -v -x ${{ env.PLATFORM_INDEPENDENT_TESTS }} - timeout-minutes: 35 + timeout-minutes: 45 - if: runner.os == 'macOS' name: run cpython platform-dependent tests (MacOS) env: RUSTPYTHON_SKIP_ENV_POLLUTERS: true run: target/release/rustpython -m test -j 1 --slowest --fail-env-changed --timeout 600 -v -x ${{ env.PLATFORM_INDEPENDENT_TESTS }} - timeout-minutes: 35 + timeout-minutes: 45 - if: runner.os == 'Windows' name: run cpython platform-dependent tests (windows partial - fixme) diff --git a/Lib/_dummy_thread.py b/Lib/_dummy_thread.py index 424b0b3be5e..0630d4e59fa 100644 --- a/Lib/_dummy_thread.py +++ b/Lib/_dummy_thread.py @@ -11,15 +11,35 @@ import _dummy_thread as _thread """ + # Exports only things specified by thread documentation; # skipping obsolete synonyms allocate(), start_new(), exit_thread(). -__all__ = ['error', 'start_new_thread', 'exit', 'get_ident', 'allocate_lock', - 'interrupt_main', 'LockType', 'RLock', - '_count'] +__all__ = [ + "error", + "start_new_thread", + "exit", + "get_ident", + "allocate_lock", + "interrupt_main", + "LockType", + "RLock", + "_count", + "start_joinable_thread", + "daemon_threads_allowed", + "_shutdown", + "_make_thread_handle", + "_ThreadHandle", + "_get_main_thread_ident", + "_is_main_interpreter", + "_local", +] # A dummy value TIMEOUT_MAX = 2**31 +# Main thread ident for dummy implementation +_MAIN_THREAD_IDENT = -1 + # NOTE: this module can be imported early in the extension building process, # and so top level imports of other modules should be avoided. Instead, all # imports are done when needed on a function-by-function basis. Since threads @@ -27,6 +47,7 @@ error = RuntimeError + def start_new_thread(function, args, kwargs={}): """Dummy implementation of _thread.start_new_thread(). @@ -52,6 +73,7 @@ def start_new_thread(function, args, kwargs={}): pass except: import traceback + traceback.print_exc() _main = True global _interrupt @@ -59,10 +81,58 @@ def start_new_thread(function, args, kwargs={}): _interrupt = False raise KeyboardInterrupt + +def start_joinable_thread(function, handle=None, daemon=True): + """Dummy implementation of _thread.start_joinable_thread(). + + In dummy thread, we just run the function synchronously. + """ + if handle is None: + handle = _ThreadHandle() + try: + function() + except SystemExit: + pass + except: + import traceback + + traceback.print_exc() + handle._set_done() + return handle + + +def daemon_threads_allowed(): + """Dummy implementation of _thread.daemon_threads_allowed().""" + return True + + +def _shutdown(): + """Dummy implementation of _thread._shutdown().""" + pass + + +def _make_thread_handle(ident): + """Dummy implementation of _thread._make_thread_handle().""" + handle = _ThreadHandle() + handle._ident = ident + return handle + + +def _get_main_thread_ident(): + """Dummy implementation of _thread._get_main_thread_ident().""" + return _MAIN_THREAD_IDENT + + +def _is_main_interpreter(): + """Dummy implementation of _thread._is_main_interpreter().""" + return True + + def exit(): """Dummy implementation of _thread.exit().""" raise SystemExit + def get_ident(): """Dummy implementation of _thread.get_ident(). @@ -70,26 +140,31 @@ def get_ident(): available, it is safe to assume that the current process is the only thread. Thus a constant can be safely returned. """ - return -1 + return _MAIN_THREAD_IDENT + def allocate_lock(): """Dummy implementation of _thread.allocate_lock().""" return LockType() + def stack_size(size=None): """Dummy implementation of _thread.stack_size().""" if size is not None: raise error("setting thread stack size not supported") return 0 + def _set_sentinel(): """Dummy implementation of _thread._set_sentinel().""" return LockType() + def _count(): """Dummy implementation of _thread._count().""" return 0 + class LockType(object): """Class implementing dummy implementation of _thread.LockType. @@ -125,6 +200,7 @@ def acquire(self, waitflag=None, timeout=-1): else: if timeout > 0: import time + time.sleep(timeout) return False @@ -153,14 +229,41 @@ def __repr__(self): "locked" if self.locked_status else "unlocked", self.__class__.__module__, self.__class__.__qualname__, - hex(id(self)) + hex(id(self)), ) + +class _ThreadHandle: + """Dummy implementation of _thread._ThreadHandle.""" + + def __init__(self): + self._ident = _MAIN_THREAD_IDENT + self._done = False + + @property + def ident(self): + return self._ident + + def _set_done(self): + self._done = True + + def is_done(self): + return self._done + + def join(self, timeout=None): + # In dummy thread, thread is always done + return + + def __repr__(self): + return f"<_ThreadHandle ident={self._ident}>" + + # Used to signal that interrupt_main was called in a "thread" _interrupt = False # True when not executing in a "thread" _main = True + def interrupt_main(): """Set _interrupt flag to True to have start_new_thread raise KeyboardInterrupt upon exiting.""" @@ -170,6 +273,7 @@ def interrupt_main(): global _interrupt _interrupt = True + class RLock: def __init__(self): self.locked_count = 0 @@ -190,7 +294,7 @@ def release(self): return True def locked(self): - return self.locked_status != 0 + return self.locked_count != 0 def __repr__(self): return "<%s %s.%s object owner=%s count=%s at %s>" % ( @@ -199,5 +303,36 @@ def __repr__(self): self.__class__.__qualname__, get_ident() if self.locked_count else 0, self.locked_count, - hex(id(self)) + hex(id(self)), ) + + +class _local: + """Dummy implementation of _thread._local (thread-local storage).""" + + def __init__(self): + object.__setattr__(self, "_local__impl", {}) + + def __getattribute__(self, name): + if name.startswith("_local__"): + return object.__getattribute__(self, name) + impl = object.__getattribute__(self, "_local__impl") + try: + return impl[name] + except KeyError: + raise AttributeError(name) + + def __setattr__(self, name, value): + if name.startswith("_local__"): + return object.__setattr__(self, name, value) + impl = object.__getattribute__(self, "_local__impl") + impl[name] = value + + def __delattr__(self, name): + if name.startswith("_local__"): + return object.__delattr__(self, name) + impl = object.__getattribute__(self, "_local__impl") + try: + del impl[name] + except KeyError: + raise AttributeError(name) diff --git a/Lib/dummy_threading.py b/Lib/dummy_threading.py index 1bb7eee338a..662f3b89a9a 100644 --- a/Lib/dummy_threading.py +++ b/Lib/dummy_threading.py @@ -6,6 +6,7 @@ regardless of whether ``_thread`` was available which is not desired. """ + from sys import modules as sys_modules import _dummy_thread @@ -19,35 +20,38 @@ # Could have checked if ``_thread`` was not in sys.modules and gone # a different route, but decided to mirror technique used with # ``threading`` below. - if '_thread' in sys_modules: - held_thread = sys_modules['_thread'] + if "_thread" in sys_modules: + held_thread = sys_modules["_thread"] holding_thread = True # Must have some module named ``_thread`` that implements its API # in order to initially import ``threading``. - sys_modules['_thread'] = sys_modules['_dummy_thread'] + sys_modules["_thread"] = sys_modules["_dummy_thread"] - if 'threading' in sys_modules: + if "threading" in sys_modules: # If ``threading`` is already imported, might as well prevent # trying to import it more than needed by saving it if it is # already imported before deleting it. - held_threading = sys_modules['threading'] + held_threading = sys_modules["threading"] holding_threading = True - del sys_modules['threading'] + del sys_modules["threading"] - if '_threading_local' in sys_modules: + if "_threading_local" in sys_modules: # If ``_threading_local`` is already imported, might as well prevent # trying to import it more than needed by saving it if it is # already imported before deleting it. - held__threading_local = sys_modules['_threading_local'] + held__threading_local = sys_modules["_threading_local"] holding__threading_local = True - del sys_modules['_threading_local'] + del sys_modules["_threading_local"] import threading + # Need a copy of the code kept somewhere... - sys_modules['_dummy_threading'] = sys_modules['threading'] - del sys_modules['threading'] - sys_modules['_dummy__threading_local'] = sys_modules['_threading_local'] - del sys_modules['_threading_local'] + sys_modules["_dummy_threading"] = sys_modules["threading"] + del sys_modules["threading"] + # _threading_local may not be imported if _thread._local is available + if "_threading_local" in sys_modules: + sys_modules["_dummy__threading_local"] = sys_modules["_threading_local"] + del sys_modules["_threading_local"] from _dummy_threading import * from _dummy_threading import __all__ @@ -55,23 +59,23 @@ # Put back ``threading`` if we overwrote earlier if holding_threading: - sys_modules['threading'] = held_threading + sys_modules["threading"] = held_threading del held_threading del holding_threading # Put back ``_threading_local`` if we overwrote earlier if holding__threading_local: - sys_modules['_threading_local'] = held__threading_local + sys_modules["_threading_local"] = held__threading_local del held__threading_local del holding__threading_local # Put back ``thread`` if we overwrote, else del the entry we made if holding_thread: - sys_modules['_thread'] = held_thread + sys_modules["_thread"] = held_thread del held_thread else: - del sys_modules['_thread'] + del sys_modules["_thread"] del holding_thread del _dummy_thread diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py index 09b91147801..4031bfaeb64 100644 --- a/Lib/test/lock_tests.py +++ b/Lib/test/lock_tests.py @@ -2,6 +2,7 @@ Various tests for synchronization primitives. """ +import gc import sys import time from _thread import start_new_thread, TIMEOUT_MAX @@ -13,54 +14,79 @@ from test.support import threading_helper -def _wait(): - # A crude wait/yield function not relying on synchronization primitives. - time.sleep(0.01) +requires_fork = unittest.skipUnless(support.has_fork_support, + "platform doesn't support fork " + "(no _at_fork_reinit method)") + + +def wait_threads_blocked(nthread): + # Arbitrary sleep to wait until N threads are blocked, + # like waiting for a lock. + time.sleep(0.010 * nthread) + class Bunch(object): """ A bunch of threads. """ - def __init__(self, f, n, wait_before_exit=False): + def __init__(self, func, nthread, wait_before_exit=False): """ - Construct a bunch of `n` threads running the same function `f`. + Construct a bunch of `nthread` threads running the same function `func`. If `wait_before_exit` is True, the threads won't terminate until do_finish() is called. """ - self.f = f - self.n = n + self.func = func + self.nthread = nthread self.started = [] self.finished = [] + self.exceptions = [] self._can_exit = not wait_before_exit - self.wait_thread = threading_helper.wait_threads_exit() - self.wait_thread.__enter__() + self._wait_thread = None - def task(): - tid = threading.get_ident() - self.started.append(tid) - try: - f() - finally: - self.finished.append(tid) - while not self._can_exit: - _wait() + def task(self): + tid = threading.get_ident() + self.started.append(tid) + try: + self.func() + except BaseException as exc: + self.exceptions.append(exc) + finally: + self.finished.append(tid) + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if self._can_exit: + break + + def __enter__(self): + self._wait_thread = threading_helper.wait_threads_exit(support.SHORT_TIMEOUT) + self._wait_thread.__enter__() try: - for i in range(n): - start_new_thread(task, ()) + for _ in range(self.nthread): + start_new_thread(self.task, ()) except: self._can_exit = True raise - def wait_for_started(self): - while len(self.started) < self.n: - _wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(self.started) >= self.nthread: + break + + return self + + def __exit__(self, exc_type, exc_value, traceback): + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(self.finished) >= self.nthread: + break + + # Wait until threads completely exit according to _thread._count() + self._wait_thread.__exit__(None, None, None) - def wait_for_finished(self): - while len(self.finished) < self.n: - _wait() - # Wait for threads exit - self.wait_thread.__exit__(None, None, None) + # Break reference cycle + exceptions = self.exceptions + self.exceptions = None + if exceptions: + raise ExceptionGroup(f"{self.func} threads raised exceptions", + exceptions) def do_finish(self): self._can_exit = True @@ -88,6 +114,12 @@ class BaseLockTests(BaseTestCase): Tests for both recursive and non-recursive locks. """ + def wait_phase(self, phase, expected): + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(phase) >= expected: + break + self.assertEqual(len(phase), expected) + def test_constructor(self): lock = self.locktype() del lock @@ -125,44 +157,60 @@ def test_try_acquire_contended(self): result = [] def f(): result.append(lock.acquire(False)) - Bunch(f, 1).wait_for_finished() + with Bunch(f, 1): + pass self.assertFalse(result[0]) lock.release() - @unittest.skip("TODO: RUSTPYTHON, sometimes hangs") + @unittest.skip('TODO: RUSTPYTHON; sometimes hangs') def test_acquire_contended(self): lock = self.locktype() lock.acquire() - N = 5 def f(): lock.acquire() lock.release() - b = Bunch(f, N) - b.wait_for_started() - _wait() - self.assertEqual(len(b.finished), 0) - lock.release() - b.wait_for_finished() - self.assertEqual(len(b.finished), N) + N = 5 + with Bunch(f, N) as bunch: + # Threads block on lock.acquire() + wait_threads_blocked(N) + self.assertEqual(len(bunch.finished), 0) + + # Threads unblocked + lock.release() + + self.assertEqual(len(bunch.finished), N) def test_with(self): lock = self.locktype() def f(): lock.acquire() lock.release() - def _with(err=None): + + def with_lock(err=None): with lock: if err is not None: raise err - _with() - # Check the lock is unacquired - Bunch(f, 1).wait_for_finished() - self.assertRaises(TypeError, _with, TypeError) - # Check the lock is unacquired - Bunch(f, 1).wait_for_finished() - @unittest.skip("TODO: RUSTPYTHON, sometimes hangs") + # Acquire the lock, do nothing, with releases the lock + with lock: + pass + + # Check that the lock is unacquired + with Bunch(f, 1): + pass + + # Acquire the lock, raise an exception, with releases the lock + with self.assertRaises(TypeError): + with lock: + raise TypeError + + # Check that the lock is unacquired even if after an exception + # was raised in the previous "with lock:" block + with Bunch(f, 1): + pass + + @unittest.skip('TODO: RUSTPYTHON; sometimes hangs') def test_thread_leak(self): # The lock shouldn't leak a Thread instance when used from a foreign # (non-threading) thread. @@ -170,22 +218,16 @@ def test_thread_leak(self): def f(): lock.acquire() lock.release() - n = len(threading.enumerate()) + # We run many threads in the hope that existing threads ids won't # be recycled. - Bunch(f, 15).wait_for_finished() - if len(threading.enumerate()) != n: - # There is a small window during which a Thread instance's - # target function has finished running, but the Thread is still - # alive and registered. Avoid spurious failures by waiting a - # bit more (seen on a buildbot). - time.sleep(0.4) - self.assertEqual(n, len(threading.enumerate())) + with Bunch(f, 15): + pass def test_timeout(self): lock = self.locktype() # Can't set timeout if not blocking - self.assertRaises(ValueError, lock.acquire, 0, 1) + self.assertRaises(ValueError, lock.acquire, False, 1) # Invalid timeout values self.assertRaises(ValueError, lock.acquire, timeout=-100) self.assertRaises(OverflowError, lock.acquire, timeout=1e100) @@ -204,7 +246,8 @@ def f(): results.append(lock.acquire(timeout=0.5)) t2 = time.monotonic() results.append(t2 - t1) - Bunch(f, 1).wait_for_finished() + with Bunch(f, 1): + pass self.assertFalse(results[0]) self.assertTimeout(results[1], 0.5) @@ -217,6 +260,7 @@ def test_weakref_deleted(self): lock = self.locktype() ref = weakref.ref(lock) del lock + gc.collect() # For PyPy or other GCs. self.assertIsNone(ref()) @@ -237,15 +281,13 @@ def f(): phase.append(None) with threading_helper.wait_threads_exit(): + # Thread blocked on lock.acquire() start_new_thread(f, ()) - while len(phase) == 0: - _wait() - _wait() - self.assertEqual(len(phase), 1) + self.wait_phase(phase, 1) + + # Thread unblocked lock.release() - while len(phase) == 1: - _wait() - self.assertEqual(len(phase), 2) + self.wait_phase(phase, 2) def test_different_thread(self): # Lock can be released from a different thread. @@ -253,8 +295,8 @@ def test_different_thread(self): lock.acquire() def f(): lock.release() - b = Bunch(f, 1) - b.wait_for_finished() + with Bunch(f, 1): + pass lock.acquire() lock.release() @@ -268,6 +310,25 @@ def test_state_after_timeout(self): self.assertFalse(lock.locked()) self.assertTrue(lock.acquire(blocking=False)) + @requires_fork + def test_at_fork_reinit(self): + def use_lock(lock): + # make sure that the lock still works normally + # after _at_fork_reinit() + lock.acquire() + lock.release() + + # unlocked + lock = self.locktype() + lock._at_fork_reinit() + use_lock(lock) + + # locked: _at_fork_reinit() resets the lock to the unlocked state + lock2 = self.locktype() + lock2.acquire() + lock2._at_fork_reinit() + use_lock(lock2) + class RLockTests(BaseLockTests): """ @@ -306,17 +367,52 @@ def test_release_save_unacquired(self): lock.release() self.assertRaises(RuntimeError, lock._release_save) + def test_recursion_count(self): + lock = self.locktype() + self.assertEqual(0, lock._recursion_count()) + lock.acquire() + self.assertEqual(1, lock._recursion_count()) + lock.acquire() + lock.acquire() + self.assertEqual(3, lock._recursion_count()) + lock.release() + self.assertEqual(2, lock._recursion_count()) + lock.release() + lock.release() + self.assertEqual(0, lock._recursion_count()) + + phase = [] + + def f(): + lock.acquire() + phase.append(None) + + self.wait_phase(phase, 2) + lock.release() + phase.append(None) + + with threading_helper.wait_threads_exit(): + # Thread blocked on lock.acquire() + start_new_thread(f, ()) + self.wait_phase(phase, 1) + self.assertEqual(0, lock._recursion_count()) + + # Thread unblocked + phase.append(None) + self.wait_phase(phase, 3) + self.assertEqual(0, lock._recursion_count()) + def test_different_thread(self): # Cannot release from a different thread lock = self.locktype() def f(): lock.acquire() - b = Bunch(f, 1, True) - try: - self.assertRaises(RuntimeError, lock.release) - finally: - b.do_finish() - b.wait_for_finished() + + with Bunch(f, 1, True) as bunch: + try: + self.assertRaises(RuntimeError, lock.release) + finally: + bunch.do_finish() def test__is_owned(self): lock = self.locktype() @@ -328,7 +424,8 @@ def test__is_owned(self): result = [] def f(): result.append(lock._is_owned()) - Bunch(f, 1).wait_for_finished() + with Bunch(f, 1): + pass self.assertFalse(result[0]) lock.release() self.assertTrue(lock._is_owned()) @@ -361,12 +458,15 @@ def _check_notify(self, evt): def f(): results1.append(evt.wait()) results2.append(evt.wait()) - b = Bunch(f, N) - b.wait_for_started() - _wait() - self.assertEqual(len(results1), 0) - evt.set() - b.wait_for_finished() + + with Bunch(f, N): + # Threads blocked on first evt.wait() + wait_threads_blocked(N) + self.assertEqual(len(results1), 0) + + # Threads unblocked + evt.set() + self.assertEqual(results1, [True] * N) self.assertEqual(results2, [True] * N) @@ -389,46 +489,61 @@ def f(): r = evt.wait(0.5) t2 = time.monotonic() results2.append((r, t2 - t1)) - Bunch(f, N).wait_for_finished() + + with Bunch(f, N): + pass + self.assertEqual(results1, [False] * N) for r, dt in results2: self.assertFalse(r) self.assertTimeout(dt, 0.5) + # The event is set results1 = [] results2 = [] evt.set() - Bunch(f, N).wait_for_finished() + with Bunch(f, N): + pass + self.assertEqual(results1, [True] * N) for r, dt in results2: self.assertTrue(r) def test_set_and_clear(self): - # Issue #13502: check that wait() returns true even when the event is + # gh-57711: check that wait() returns true even when the event is # cleared before the waiting thread is woken up. - evt = self.eventtype() + event = self.eventtype() results = [] - timeout = 0.250 - N = 5 def f(): - results.append(evt.wait(timeout * 4)) - b = Bunch(f, N) - b.wait_for_started() - time.sleep(timeout) - evt.set() - evt.clear() - b.wait_for_finished() + results.append(event.wait(support.LONG_TIMEOUT)) + + N = 5 + with Bunch(f, N): + # Threads blocked on event.wait() + wait_threads_blocked(N) + + # Threads unblocked + event.set() + event.clear() + self.assertEqual(results, [True] * N) - def test_reset_internal_locks(self): + @requires_fork + def test_at_fork_reinit(self): # ensure that condition is still using a Lock after reset evt = self.eventtype() with evt._cond: self.assertFalse(evt._cond.acquire(False)) - evt._reset_internal_locks() + evt._at_fork_reinit() with evt._cond: self.assertFalse(evt._cond.acquire(False)) + def test_repr(self): + evt = self.eventtype() + self.assertRegex(repr(evt), r"<\w+\.Event at .*: unset>") + evt.set() + self.assertRegex(repr(evt), r"<\w+\.Event at .*: set>") + class ConditionTests(BaseTestCase): """ @@ -466,15 +581,14 @@ def _check_notify(self, cond): # Note that this test is sensitive to timing. If the worker threads # don't execute in a timely fashion, the main thread may think they # are further along then they are. The main thread therefore issues - # _wait() statements to try to make sure that it doesn't race ahead - # of the workers. + # wait_threads_blocked() statements to try to make sure that it doesn't + # race ahead of the workers. # Secondly, this test assumes that condition variables are not subject # to spurious wakeups. The absence of spurious wakeups is an implementation # detail of Condition Variables in current CPython, but in general, not # a guaranteed property of condition variables as a programming # construct. In particular, it is possible that this can no longer # be conveniently guaranteed should their implementation ever change. - N = 5 ready = [] results1 = [] results2 = [] @@ -483,58 +597,83 @@ def f(): cond.acquire() ready.append(phase_num) result = cond.wait() + cond.release() results1.append((result, phase_num)) + cond.acquire() ready.append(phase_num) + result = cond.wait() cond.release() results2.append((result, phase_num)) - b = Bunch(f, N) - b.wait_for_started() - # first wait, to ensure all workers settle into cond.wait() before - # we continue. See issues #8799 and #30727. - while len(ready) < 5: - _wait() - ready.clear() - self.assertEqual(results1, []) - # Notify 3 threads at first - cond.acquire() - cond.notify(3) - _wait() - phase_num = 1 - cond.release() - while len(results1) < 3: - _wait() - self.assertEqual(results1, [(True, 1)] * 3) - self.assertEqual(results2, []) - # make sure all awaken workers settle into cond.wait() - while len(ready) < 3: - _wait() - # Notify 5 threads: they might be in their first or second wait - cond.acquire() - cond.notify(5) - _wait() - phase_num = 2 - cond.release() - while len(results1) + len(results2) < 8: - _wait() - self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2) - self.assertEqual(results2, [(True, 2)] * 3) - # make sure all workers settle into cond.wait() - while len(ready) < 5: - _wait() - # Notify all threads: they are all in their second wait - cond.acquire() - cond.notify_all() - _wait() - phase_num = 3 - cond.release() - while len(results2) < 5: - _wait() - self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2) - self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2) - b.wait_for_finished() + + N = 5 + with Bunch(f, N): + # first wait, to ensure all workers settle into cond.wait() before + # we continue. See issues #8799 and #30727. + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(ready) >= N: + break + + ready.clear() + self.assertEqual(results1, []) + + # Notify 3 threads at first + count1 = 3 + cond.acquire() + cond.notify(count1) + wait_threads_blocked(count1) + + # Phase 1 + phase_num = 1 + cond.release() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results1) >= count1: + break + + self.assertEqual(results1, [(True, 1)] * count1) + self.assertEqual(results2, []) + + # Wait until awaken workers are blocked on cond.wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(ready) >= count1 : + break + + # Notify 5 threads: they might be in their first or second wait + cond.acquire() + cond.notify(5) + wait_threads_blocked(N) + + # Phase 2 + phase_num = 2 + cond.release() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results1) + len(results2) >= (N + count1): + break + + count2 = N - count1 + self.assertEqual(results1, [(True, 1)] * count1 + [(True, 2)] * count2) + self.assertEqual(results2, [(True, 2)] * count1) + + # Make sure all workers settle into cond.wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(ready) >= N: + break + + # Notify all threads: they are all in their second wait + cond.acquire() + cond.notify_all() + wait_threads_blocked(N) + + # Phase 3 + phase_num = 3 + cond.release() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results2) >= N: + break + self.assertEqual(results1, [(True, 1)] * count1 + [(True, 2)] * count2) + self.assertEqual(results2, [(True, 2)] * count1 + [(True, 3)] * count2) def test_notify(self): cond = self.condtype() @@ -544,19 +683,23 @@ def test_notify(self): def test_timeout(self): cond = self.condtype() + timeout = 0.5 results = [] - N = 5 def f(): cond.acquire() t1 = time.monotonic() - result = cond.wait(0.5) + result = cond.wait(timeout) t2 = time.monotonic() cond.release() results.append((t2 - t1, result)) - Bunch(f, N).wait_for_finished() + + N = 5 + with Bunch(f, N): + pass self.assertEqual(len(results), N) + for dt, result in results: - self.assertTimeout(dt, 0.5) + self.assertTimeout(dt, timeout) # Note that conceptually (that"s the condition variable protocol) # a wait() may succeed even if no one notifies us and before any # timeout occurs. Spurious wakeups can occur. @@ -569,17 +712,16 @@ def test_waitfor(self): state = 0 def f(): with cond: - result = cond.wait_for(lambda : state==4) + result = cond.wait_for(lambda: state == 4) self.assertTrue(result) self.assertEqual(state, 4) - b = Bunch(f, 1) - b.wait_for_started() - for i in range(4): - time.sleep(0.01) - with cond: - state += 1 - cond.notify() - b.wait_for_finished() + + with Bunch(f, 1): + for i in range(4): + time.sleep(0.010) + with cond: + state += 1 + cond.notify() def test_waitfor_timeout(self): cond = self.condtype() @@ -593,15 +735,15 @@ def f(): self.assertFalse(result) self.assertTimeout(dt, 0.1) success.append(None) - b = Bunch(f, 1) - b.wait_for_started() - # Only increment 3 times, so state == 4 is never reached. - for i in range(3): - time.sleep(0.01) - with cond: - state += 1 - cond.notify() - b.wait_for_finished() + + with Bunch(f, 1): + # Only increment 3 times, so state == 4 is never reached. + for i in range(3): + time.sleep(0.010) + with cond: + state += 1 + cond.notify() + self.assertEqual(len(success), 1) @@ -630,41 +772,107 @@ def test_acquire_destroy(self): del sem def test_acquire_contended(self): - sem = self.semtype(7) + sem_value = 7 + sem = self.semtype(sem_value) sem.acquire() - N = 10 + sem_results = [] results1 = [] results2 = [] phase_num = 0 - def f(): + + def func(): sem_results.append(sem.acquire()) results1.append(phase_num) + sem_results.append(sem.acquire()) results2.append(phase_num) - b = Bunch(f, 10) - b.wait_for_started() - while len(results1) + len(results2) < 6: - _wait() - self.assertEqual(results1 + results2, [0] * 6) - phase_num = 1 - for i in range(7): + + def wait_count(count): + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results1) + len(results2) >= count: + break + + N = 10 + with Bunch(func, N): + # Phase 0 + count1 = sem_value - 1 + wait_count(count1) + self.assertEqual(results1 + results2, [0] * count1) + + # Phase 1 + phase_num = 1 + for i in range(sem_value): + sem.release() + count2 = sem_value + wait_count(count1 + count2) + self.assertEqual(sorted(results1 + results2), + [0] * count1 + [1] * count2) + + # Phase 2 + phase_num = 2 + count3 = (sem_value - 1) + for i in range(count3): + sem.release() + wait_count(count1 + count2 + count3) + self.assertEqual(sorted(results1 + results2), + [0] * count1 + [1] * count2 + [2] * count3) + # The semaphore is still locked + self.assertFalse(sem.acquire(False)) + + # Final release, to let the last thread finish + count4 = 1 sem.release() - while len(results1) + len(results2) < 13: - _wait() - self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7) - phase_num = 2 - for i in range(6): + + self.assertEqual(sem_results, + [True] * (count1 + count2 + count3 + count4)) + + def test_multirelease(self): + sem_value = 7 + sem = self.semtype(sem_value) + sem.acquire() + + results1 = [] + results2 = [] + phase_num = 0 + def func(): + sem.acquire() + results1.append(phase_num) + + sem.acquire() + results2.append(phase_num) + + def wait_count(count): + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results1) + len(results2) >= count: + break + + with Bunch(func, 10): + # Phase 0 + count1 = sem_value - 1 + wait_count(count1) + self.assertEqual(results1 + results2, [0] * count1) + + # Phase 1 + phase_num = 1 + count2 = sem_value + sem.release(count2) + wait_count(count1 + count2) + self.assertEqual(sorted(results1 + results2), + [0] * count1 + [1] * count2) + + # Phase 2 + phase_num = 2 + count3 = sem_value - 1 + sem.release(count3) + wait_count(count1 + count2 + count3) + self.assertEqual(sorted(results1 + results2), + [0] * count1 + [1] * count2 + [2] * count3) + # The semaphore is still locked + self.assertFalse(sem.acquire(False)) + + # Final release, to let the last thread finish sem.release() - while len(results1) + len(results2) < 19: - _wait() - self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6) - # The semaphore is still locked - self.assertFalse(sem.acquire(False)) - # Final release, to let the last thread finish - sem.release() - b.wait_for_finished() - self.assertEqual(sem_results, [True] * (6 + 7 + 6 + 1)) def test_try_acquire(self): sem = self.semtype(2) @@ -681,7 +889,8 @@ def test_try_acquire_contended(self): def f(): results.append(sem.acquire(False)) results.append(sem.acquire(False)) - Bunch(f, 5).wait_for_finished() + with Bunch(f, 5): + pass # There can be a thread switch between acquiring the semaphore and # appending the result, therefore results will not necessarily be # ordered. @@ -707,12 +916,14 @@ def test_default_value(self): def f(): sem.acquire() sem.release() - b = Bunch(f, 1) - b.wait_for_started() - _wait() - self.assertFalse(b.finished) - sem.release() - b.wait_for_finished() + + with Bunch(f, 1) as bunch: + # Thread blocked on sem.acquire() + wait_threads_blocked(1) + self.assertFalse(bunch.finished) + + # Thread unblocked + sem.release() def test_with(self): sem = self.semtype(2) @@ -744,6 +955,15 @@ def test_release_unacquired(self): sem.acquire() sem.release() + def test_repr(self): + sem = self.semtype(3) + self.assertRegex(repr(sem), r"<\w+\.Semaphore at .*: value=3>") + sem.acquire() + self.assertRegex(repr(sem), r"<\w+\.Semaphore at .*: value=2>") + sem.release() + sem.release() + self.assertRegex(repr(sem), r"<\w+\.Semaphore at .*: value=4>") + class BoundedSemaphoreTests(BaseSemaphoreTests): """ @@ -758,6 +978,12 @@ def test_release_unacquired(self): sem.release() self.assertRaises(ValueError, sem.release) + def test_repr(self): + sem = self.semtype(3) + self.assertRegex(repr(sem), r"<\w+\.BoundedSemaphore at .*: value=3/3>") + sem.acquire() + self.assertRegex(repr(sem), r"<\w+\.BoundedSemaphore at .*: value=2/3>") + class BarrierTests(BaseTestCase): """ @@ -768,13 +994,13 @@ class BarrierTests(BaseTestCase): def setUp(self): self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout) + def tearDown(self): self.barrier.abort() def run_threads(self, f): - b = Bunch(f, self.N-1) - f() - b.wait_for_finished() + with Bunch(f, self.N): + pass def multipass(self, results, n): m = self.barrier.parties @@ -789,6 +1015,10 @@ def multipass(self, results, n): self.assertEqual(self.barrier.n_waiting, 0) self.assertFalse(self.barrier.broken) + def test_constructor(self): + self.assertRaises(ValueError, self.barriertype, parties=0) + self.assertRaises(ValueError, self.barriertype, parties=-1) + def test_barrier(self, passes=1): """ Test that a barrier is passed in lockstep @@ -865,8 +1095,9 @@ def f(): i = self.barrier.wait() if i == self.N//2: # Wait until the other threads are all in the barrier. - while self.barrier.n_waiting < self.N-1: - time.sleep(0.001) + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if self.barrier.n_waiting >= (self.N - 1): + break self.barrier.reset() else: try: @@ -926,27 +1157,56 @@ def f(): i = self.barrier.wait() if i == self.N // 2: # One thread is late! - time.sleep(1.0) + time.sleep(self.defaultTimeout / 2) # Default timeout is 2.0, so this is shorter. self.assertRaises(threading.BrokenBarrierError, - self.barrier.wait, 0.5) + self.barrier.wait, self.defaultTimeout / 4) self.run_threads(f) def test_default_timeout(self): """ Test the barrier's default timeout """ - # create a barrier with a low default timeout - barrier = self.barriertype(self.N, timeout=0.3) + timeout = 0.100 + barrier = self.barriertype(2, timeout=timeout) def f(): - i = barrier.wait() - if i == self.N // 2: - # One thread is later than the default timeout of 0.3s. - time.sleep(1.0) - self.assertRaises(threading.BrokenBarrierError, barrier.wait) - self.run_threads(f) + self.assertRaises(threading.BrokenBarrierError, + barrier.wait) + + start_time = time.monotonic() + with Bunch(f, 1): + pass + dt = time.monotonic() - start_time + self.assertGreaterEqual(dt, timeout) def test_single_thread(self): b = self.barriertype(1) b.wait() b.wait() + + def test_repr(self): + barrier = self.barriertype(3) + timeout = support.LONG_TIMEOUT + self.assertRegex(repr(barrier), r"<\w+\.Barrier at .*: waiters=0/3>") + def f(): + barrier.wait(timeout) + + N = 2 + with Bunch(f, N): + # Threads blocked on barrier.wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if barrier.n_waiting >= N: + break + self.assertRegex(repr(barrier), + r"<\w+\.Barrier at .*: waiters=2/3>") + + # Threads unblocked + barrier.wait(timeout) + + self.assertRegex(repr(barrier), + r"<\w+\.Barrier at .*: waiters=0/3>") + + # Abort the barrier + barrier.abort() + self.assertRegex(repr(barrier), + r"<\w+\.Barrier at .*: broken>") diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 99b71676ae7..9d3c771eaeb 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -190,7 +190,6 @@ def test_max_tasks_early_shutdown(self): for i, future in enumerate(futures): self.assertEqual(future.result(), mul(i, i)) - @unittest.expectedFailure # TODO: RUSTPYTHON AttributeError: module 'threading' has no attribute '_start_joinable_thread'. Did you mean: '_start_new_thread'? def test_python_finalization_error(self): # gh-109047: Catch RuntimeError on thread creation # during Python finalization. diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index c879b6539b1..820ea6cf253 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -108,7 +108,6 @@ def test_cancel_futures(self): # one finished. self.assertGreater(len(others), 0) - @unittest.expectedFailure # TODO: RUSTPYTHON AssertionError: b'' != b'apple' def test_hang_gh83386(self): """shutdown(wait=False) doesn't hang at exit with running futures. @@ -256,6 +255,11 @@ def test_cancel_futures_wait_false(self): class ProcessPoolShutdownTest(ExecutorShutdownTest): + # TODO: RUSTPYTHON - flaky, dict changed size during iteration race condition + @unittest.skip("TODO: RUSTPYTHON - flaky race condition on macOS") + def test_cancel_futures(self): + return super().test_cancel_futures() + def test_processes_terminate(self): def acquire_lock(lock): lock.acquire() diff --git a/Lib/test/test_concurrent_futures/test_thread_pool.py b/Lib/test/test_concurrent_futures/test_thread_pool.py index 52df01b376c..4324241b374 100644 --- a/Lib/test/test_concurrent_futures/test_thread_pool.py +++ b/Lib/test/test_concurrent_futures/test_thread_pool.py @@ -66,14 +66,8 @@ def submit(pool): with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers: workers.submit(tuple) - import sys # TODO: RUSTPYTHON - @unittest.skipIf( - 'RUSTPYTHON_SKIP_ENV_POLLUTERS' in os.environ, - 'TODO: RUSTPYTHON environment pollution when running rustpython -m test --fail-env-changed due to unknown reason' - ) @support.requires_fork() @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork') - @unittest.expectedFailure # TODO: RUSTPYTHON AssertionError: DeprecationWarning not triggered def test_process_fork_from_a_threadpool(self): # bpo-43944: clear concurrent.futures.thread._threads_queues after fork, # otherwise child process will try to join parent thread diff --git a/Lib/test/test_importlib/test_locks.py b/Lib/test/test_importlib/test_locks.py index 17cce741cce..edf0329c753 100644 --- a/Lib/test/test_importlib/test_locks.py +++ b/Lib/test/test_importlib/test_locks.py @@ -29,6 +29,8 @@ class ModuleLockAsRLockTests: test_timeout = None # _release_save() unsupported test_release_save_unacquired = None + # _recursion_count() unsupported + test_recursion_count = None # lock status in repr unsupported test_repr = None test_locked_repr = None @@ -92,7 +94,8 @@ def f(): b.release() if ra: a.release() - lock_tests.Bunch(f, NTHREADS).wait_for_finished() + with lock_tests.Bunch(f, NTHREADS): + pass self.assertEqual(len(results), NTHREADS) return results diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py index 969c2866aac..e5f88b325e0 100644 --- a/Lib/test/test_sys.py +++ b/Lib/test/test_sys.py @@ -920,7 +920,7 @@ def test_sys_getwindowsversion_no_instantiation(self): def test_clear_type_cache(self): sys._clear_type_cache() - @unittest.expectedFailure # TODO: RUSTPYTHON + @unittest.skip("TODO: RUSTPYTHON; cp424 encoding not supported, causes panic") @force_not_colorized @support.requires_subprocess() def test_ioencoding(self): diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index eabc872583f..910797afa8f 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -3,10 +3,11 @@ """ import test.support -from test.support import threading_helper, requires_subprocess +from test.support import threading_helper, requires_subprocess, requires_gil_enabled from test.support import verbose, cpython_only, os_helper from test.support.import_helper import import_module from test.support.script_helper import assert_python_ok, assert_python_failure +from test.support import force_not_colorized import random import sys @@ -20,11 +21,17 @@ import signal import textwrap import traceback +import warnings from unittest import mock from test import lock_tests from test import support +try: + from test.support import interpreters +except ImportError: + interpreters = None + threading_helper.requires_working_threading(module=True) # Between fork() and exec(), only async-safe functions are allowed (issues @@ -34,6 +41,24 @@ platforms_to_skip = ('netbsd5', 'hp-ux11') +def skip_unless_reliable_fork(test): + if not support.has_fork_support: + return unittest.skip("requires working os.fork()")(test) + if sys.platform in platforms_to_skip: + return unittest.skip("due to known OS bug related to thread+fork")(test) + if support.HAVE_ASAN_FORK_BUG: + return unittest.skip("libasan has a pthread_create() dead lock related to thread+fork")(test) + if support.check_sanitizer(thread=True): + return unittest.skip("TSAN doesn't support threads after fork")(test) + return test + + +def requires_subinterpreters(meth): + """Decorator to skip a test if subinterpreters are not supported.""" + return unittest.skipIf(interpreters is None, + 'subinterpreters required')(meth) + + def restore_default_excepthook(testcase): testcase.addCleanup(setattr, threading, 'excepthook', threading.excepthook) threading.excepthook = threading.__excepthook__ @@ -93,6 +118,7 @@ def tearDown(self): class ThreadTests(BaseTestCase): + maxDiff = 9999 @cpython_only def test_name(self): @@ -148,11 +174,21 @@ def test_args_argument(self): t.start() t.join() - @cpython_only - def test_disallow_instantiation(self): - # Ensure that the type disallows instantiation (bpo-43916) - lock = threading.Lock() - test.support.check_disallow_instantiation(self, type(lock)) + def test_lock_no_args(self): + threading.Lock() # works + self.assertRaises(TypeError, threading.Lock, 1) + self.assertRaises(TypeError, threading.Lock, a=1) + self.assertRaises(TypeError, threading.Lock, 1, 2, a=1, b=2) + + def test_lock_no_subclass(self): + # Intentionally disallow subclasses of threading.Lock because they have + # never been allowed, so why start now just because the type is public? + with self.assertRaises(TypeError): + class MyLock(threading.Lock): pass + + def test_lock_or_none(self): + import types + self.assertIsInstance(threading.Lock | None, types.UnionType) # Create a bunch of threads, let each do some work, wait until all are # done. @@ -204,8 +240,6 @@ def f(): tid = _thread.start_new_thread(f, ()) done.wait() self.assertEqual(ident[0], tid) - # Kill the "immortal" _DummyThread - del threading._active[ident[0]] # run with a small(ish) thread stack size (256 KiB) def test_various_ops_small_stack(self): @@ -233,11 +267,29 @@ def test_various_ops_large_stack(self): def test_foreign_thread(self): # Check that a "foreign" thread can use the threading module. + dummy_thread = None + error = None def f(mutex): - # Calling current_thread() forces an entry for the foreign - # thread to get made in the threading._active map. - threading.current_thread() - mutex.release() + try: + nonlocal dummy_thread + nonlocal error + # Calling current_thread() forces an entry for the foreign + # thread to get made in the threading._active map. + dummy_thread = threading.current_thread() + tid = dummy_thread.ident + self.assertIn(tid, threading._active) + self.assertIsInstance(dummy_thread, threading._DummyThread) + self.assertIs(threading._active.get(tid), dummy_thread) + # gh-29376 + self.assertTrue( + dummy_thread.is_alive(), + 'Expected _DummyThread to be considered alive.' + ) + self.assertIn('_DummyThread', repr(dummy_thread)) + except BaseException as e: + error = e + finally: + mutex.release() mutex = threading.Lock() mutex.acquire() @@ -245,16 +297,29 @@ def f(mutex): tid = _thread.start_new_thread(f, (mutex,)) # Wait for the thread to finish. mutex.acquire() - self.assertIn(tid, threading._active) - self.assertIsInstance(threading._active[tid], threading._DummyThread) - #Issue 29376 - self.assertTrue(threading._active[tid].is_alive()) - self.assertRegex(repr(threading._active[tid]), '_DummyThread') - del threading._active[tid] + if error is not None: + raise error + self.assertEqual(tid, dummy_thread.ident) + # Issue gh-106236: + with self.assertRaises(RuntimeError): + dummy_thread.join() + dummy_thread._started.clear() + with self.assertRaises(RuntimeError): + dummy_thread.is_alive() + # Busy wait for the following condition: after the thread dies, the + # related dummy thread must be removed from threading._active. + timeout = 5 + timeout_at = time.monotonic() + timeout + while time.monotonic() < timeout_at: + if threading._active.get(dummy_thread.ident) is not dummy_thread: + break + time.sleep(.1) + else: + self.fail('It was expected that the created threading._DummyThread was removed from threading._active.') # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently) # exposed at the Python level. This test relies on ctypes to get at it. - @unittest.skip("TODO: RUSTPYTHON; expects @cpython_only") + @unittest.skip('TODO: RUSTPYTHON; expects @cpython_only') def test_PyThreadState_SetAsyncExc(self): ctypes = import_module("ctypes") @@ -343,12 +408,13 @@ def run(self): t.join() # else the thread is still running, and we have no way to kill it + @unittest.skip('TODO: RUSTPYTHON; threading._start_new_thread not exposed') def test_limbo_cleanup(self): # Issue 7481: Failure to start thread should cleanup the limbo map. - def fail_new_thread(*args): + def fail_new_thread(*args, **kwargs): raise threading.ThreadError() - _start_new_thread = threading._start_new_thread - threading._start_new_thread = fail_new_thread + _start_joinable_thread = threading._start_joinable_thread + threading._start_joinable_thread = fail_new_thread try: t = threading.Thread(target=lambda: None) self.assertRaises(threading.ThreadError, t.start) @@ -356,13 +422,17 @@ def fail_new_thread(*args): t in threading._limbo, "Failed to cleanup _limbo map on failure of Thread.start().") finally: - threading._start_new_thread = _start_new_thread + threading._start_joinable_thread = _start_joinable_thread - @unittest.skip("TODO: RUSTPYTHON; ctypes.pythonapi is not supported") + @unittest.skip('TODO: RUSTPYTHON; ctypes.pythonapi is not supported') def test_finalize_running_thread(self): # Issue 1402: the PyGILState_Ensure / _Release functions may be called # very late on python exit: on deallocation of a running thread for # example. + if support.check_sanitizer(thread=True): + # the thread running `time.sleep(100)` below will still be alive + # at process exit + self.skipTest("TSAN would report thread leak") import_module("ctypes") rc, out, err = assert_python_failure("-c", """if 1: @@ -395,6 +465,11 @@ def waitingThread(): def test_finalize_with_trace(self): # Issue1733757 # Avoid a deadlock when sys.settrace steps into threading._shutdown + if support.check_sanitizer(thread=True): + # the thread running `time.sleep(2)` below will still be alive + # at process exit + self.skipTest("TSAN would report thread leak") + assert_python_ok("-c", """if 1: import sys, threading @@ -417,8 +492,7 @@ def func(frame, event, arg): sys.settrace(func) """) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_join_nondaemon_on_shutdown(self): # Issue 1722344 # Raising SystemExit skipped threading._shutdown @@ -446,7 +520,7 @@ def test_enumerate_after_join(self): old_interval = sys.getswitchinterval() try: for i in range(1, 100): - sys.setswitchinterval(i * 0.0002) + support.setswitchinterval(i * 0.0002) t = threading.Thread(target=lambda: None) t.start() t.join() @@ -456,6 +530,48 @@ def test_enumerate_after_join(self): finally: sys.setswitchinterval(old_interval) + @support.bigmemtest(size=20, memuse=72*2**20, dry_run=False) + def test_join_from_multiple_threads(self, size): + # Thread.join() should be thread-safe + errors = [] + + def worker(): + time.sleep(0.005) + + def joiner(thread): + try: + thread.join() + except Exception as e: + errors.append(e) + + for N in range(2, 20): + threads = [threading.Thread(target=worker)] + for i in range(N): + threads.append(threading.Thread(target=joiner, + args=(threads[0],))) + for t in threads: + t.start() + time.sleep(0.01) + for t in threads: + t.join() + if errors: + raise errors[0] + + def test_join_with_timeout(self): + lock = _thread.allocate_lock() + lock.acquire() + + def worker(): + lock.acquire() + + thread = threading.Thread(target=worker) + thread.start() + thread.join(timeout=0.01) + assert thread.is_alive() + lock.release() + thread.join() + assert not thread.is_alive() + def test_no_refcycle_through_target(self): class RunSelfFunction(object): def __init__(self, should_raise): @@ -534,40 +650,12 @@ def test_daemon_param(self): t = threading.Thread(daemon=True) self.assertTrue(t.daemon) - @support.requires_fork() - def test_fork_at_exit(self): - # bpo-42350: Calling os.fork() after threading._shutdown() must - # not log an error. - code = textwrap.dedent(""" - import atexit - import os - import sys - from test.support import wait_process - - # Import the threading module to register its "at fork" callback - import threading - - def exit_handler(): - pid = os.fork() - if not pid: - print("child process ok", file=sys.stderr, flush=True) - # child process - else: - wait_process(pid, exitcode=0) - - # exit_handler() will be called after threading._shutdown() - atexit.register(exit_handler) - """) - _, out, err = assert_python_ok("-c", code) - self.assertEqual(out, b'') - self.assertEqual(err.rstrip(), b'child process ok') - - @support.requires_fork() + @skip_unless_reliable_fork def test_dummy_thread_after_fork(self): # Issue #14308: a dummy thread in the active list doesn't mess up # the after-fork mechanism. code = """if 1: - import _thread, threading, os, time + import _thread, threading, os, time, warnings def background_thread(evt): # Creates and registers the _DummyThread instance @@ -579,18 +667,23 @@ def background_thread(evt): _thread.start_new_thread(background_thread, (evt,)) evt.wait() assert threading.active_count() == 2, threading.active_count() - if os.fork() == 0: - assert threading.active_count() == 1, threading.active_count() - os._exit(0) - else: - os.wait() + with warnings.catch_warnings(record=True) as ws: + warnings.filterwarnings( + "always", category=DeprecationWarning) + if os.fork() == 0: + assert threading.active_count() == 1, threading.active_count() + os._exit(0) + else: + assert ws[0].category == DeprecationWarning, ws[0] + assert 'fork' in str(ws[0].message), ws[0] + os.wait() """ _, out, err = assert_python_ok("-c", code) self.assertEqual(out, b'') self.assertEqual(err, b'') - @unittest.skip("TODO: RUSTPYTHON; flaky") - @support.requires_fork() + @unittest.skip('TODO: RUSTPYTHON; flaky') + @skip_unless_reliable_fork def test_is_alive_after_fork(self): # Try hard to trigger #18418: is_alive() could sometimes be True on # threads that vanished after a fork. @@ -603,13 +696,15 @@ def test_is_alive_after_fork(self): for i in range(20): t = threading.Thread(target=lambda: None) t.start() - pid = os.fork() - if pid == 0: - os._exit(11 if t.is_alive() else 10) - else: - t.join() + # Ignore the warning about fork with threads. + with warnings.catch_warnings(category=DeprecationWarning, + action="ignore"): + if (pid := os.fork()) == 0: + os._exit(11 if t.is_alive() else 10) + else: + t.join() - support.wait_process(pid, exitcode=10) + support.wait_process(pid, exitcode=10) def test_main_thread(self): main = threading.main_thread() @@ -624,48 +719,59 @@ def f(): th.start() th.join() - @support.requires_fork() + @skip_unless_reliable_fork @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") def test_main_thread_after_fork(self): code = """if 1: import os, threading from test import support + ident = threading.get_ident() pid = os.fork() if pid == 0: + print("current ident", threading.get_ident() == ident) main = threading.main_thread() - print(main.name) - print(main.ident == threading.current_thread().ident) - print(main.ident == threading.get_ident()) + print("main", main.name) + print("main ident", main.ident == ident) + print("current is main", threading.current_thread() is main) else: support.wait_process(pid, exitcode=0) """ _, out, err = assert_python_ok("-c", code) data = out.decode().replace('\r', '') self.assertEqual(err, b"") - self.assertEqual(data, "MainThread\nTrue\nTrue\n") + self.assertEqual(data, + "current ident True\n" + "main MainThread\n" + "main ident True\n" + "current is main True\n") - @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") - @support.requires_fork() + @skip_unless_reliable_fork @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") - @unittest.skipIf(os.name != 'posix', "test needs POSIX semantics") def test_main_thread_after_fork_from_nonmain_thread(self): code = """if 1: - import os, threading, sys + import os, threading, sys, warnings from test import support def func(): - pid = os.fork() - if pid == 0: - main = threading.main_thread() - print(main.name) - print(main.ident == threading.current_thread().ident) - print(main.ident == threading.get_ident()) - # stdout is fully buffered because not a tty, - # we have to flush before exit. - sys.stdout.flush() - else: - support.wait_process(pid, exitcode=0) + ident = threading.get_ident() + with warnings.catch_warnings(record=True) as ws: + warnings.filterwarnings( + "always", category=DeprecationWarning) + pid = os.fork() + if pid == 0: + print("current ident", threading.get_ident() == ident) + main = threading.main_thread() + print("main", main.name, type(main).__name__) + print("main ident", main.ident == ident) + print("current is main", threading.current_thread() is main) + # stdout is fully buffered because not a tty, + # we have to flush before exit. + sys.stdout.flush() + else: + assert ws[0].category == DeprecationWarning, ws[0] + assert 'fork' in str(ws[0].message), ws[0] + support.wait_process(pid, exitcode=0) th = threading.Thread(target=func) th.start() @@ -673,11 +779,82 @@ def func(): """ _, out, err = assert_python_ok("-c", code) data = out.decode().replace('\r', '') - self.assertEqual(err, b"") - self.assertEqual(data, "Thread-1 (func)\nTrue\nTrue\n") + self.assertEqual(err.decode('utf-8'), "") + self.assertEqual(data, + "current ident True\n" + "main Thread-1 (func) Thread\n" + "main ident True\n" + "current is main True\n" + ) + + @skip_unless_reliable_fork + @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") + def test_main_thread_after_fork_from_foreign_thread(self, create_dummy=False): + code = """if 1: + import os, threading, sys, traceback, _thread + from test import support - # TODO: RUSTPYTHON - @unittest.expectedFailure + def func(lock): + ident = threading.get_ident() + if %s: + # call current_thread() before fork to allocate DummyThread + current = threading.current_thread() + print("current", current.name, type(current).__name__) + print("ident in _active", ident in threading._active) + # flush before fork, so child won't flush it again + sys.stdout.flush() + pid = os.fork() + if pid == 0: + print("current ident", threading.get_ident() == ident) + main = threading.main_thread() + print("main", main.name, type(main).__name__) + print("main ident", main.ident == ident) + print("current is main", threading.current_thread() is main) + print("_dangling", [t.name for t in list(threading._dangling)]) + # stdout is fully buffered because not a tty, + # we have to flush before exit. + sys.stdout.flush() + try: + threading._shutdown() + os._exit(0) + except: + traceback.print_exc() + sys.stderr.flush() + os._exit(1) + else: + try: + support.wait_process(pid, exitcode=0) + except Exception: + # avoid 'could not acquire lock for + # <_io.BufferedWriter name=''> at interpreter shutdown,' + traceback.print_exc() + sys.stderr.flush() + finally: + lock.release() + + join_lock = _thread.allocate_lock() + join_lock.acquire() + th = _thread.start_new_thread(func, (join_lock,)) + join_lock.acquire() + """ % create_dummy + # "DeprecationWarning: This process is multi-threaded, use of fork() + # may lead to deadlocks in the child" + _, out, err = assert_python_ok("-W", "ignore::DeprecationWarning", "-c", code) + data = out.decode().replace('\r', '') + self.assertEqual(err.decode(), "") + self.assertEqual(data, + ("current Dummy-1 _DummyThread\n" if create_dummy else "") + + f"ident in _active {create_dummy!s}\n" + + "current ident True\n" + "main MainThread _MainThread\n" + "main ident True\n" + "current is main True\n" + "_dangling ['MainThread']\n") + + def test_main_thread_after_fork_from_dummy_thread(self, create_dummy=False): + self.test_main_thread_after_fork_from_foreign_thread(create_dummy=True) + + @unittest.expectedFailure # TODO: RUSTPYTHON def test_main_thread_during_shutdown(self): # bpo-31516: current_thread() should still point to the main thread # at shutdown @@ -742,41 +919,6 @@ def f(): rc, out, err = assert_python_ok("-c", code) self.assertEqual(err, b"") - def test_tstate_lock(self): - # Test an implementation detail of Thread objects. - started = _thread.allocate_lock() - finish = _thread.allocate_lock() - started.acquire() - finish.acquire() - def f(): - started.release() - finish.acquire() - time.sleep(0.01) - # The tstate lock is None until the thread is started - t = threading.Thread(target=f) - self.assertIs(t._tstate_lock, None) - t.start() - started.acquire() - self.assertTrue(t.is_alive()) - # The tstate lock can't be acquired when the thread is running - # (or suspended). - tstate_lock = t._tstate_lock - self.assertFalse(tstate_lock.acquire(timeout=0), False) - finish.release() - # When the thread ends, the state_lock can be successfully - # acquired. - self.assertTrue(tstate_lock.acquire(timeout=support.SHORT_TIMEOUT), False) - # But is_alive() is still True: we hold _tstate_lock now, which - # prevents is_alive() from knowing the thread's end-of-life C code - # is done. - self.assertTrue(t.is_alive()) - # Let is_alive() find out the C code is done. - tstate_lock.release() - self.assertFalse(t.is_alive()) - # And verify the thread disposed of _tstate_lock. - self.assertIsNone(t._tstate_lock) - t.join() - def test_repr_stopped(self): # Verify that "stopped" shows up in repr(Thread) appropriately. started = _thread.allocate_lock() @@ -824,6 +966,7 @@ def test_BoundedSemaphore_limit(self): @cpython_only def test_frame_tstate_tracing(self): + _testcapi = import_module("_testcapi") # Issue #14432: Crash when a generator is created in a C thread that is # destroyed while the generator is still used. The issue was that a # generator contains a frame, and the frame kept a reference to the @@ -851,7 +994,6 @@ def callback(): threading.settrace(noop_trace) # Create a generator in a C thread which exits after the call - import _testcapi _testcapi.call_in_temporary_c_thread(callback) # Call the generator in a different Python thread, check that the @@ -875,12 +1017,7 @@ def noop_trace(frame, event, arg): finally: threading.settrace(old_trace) - @unittest.skipIf( - 'RUSTPYTHON_SKIP_ENV_POLLUTERS' in os.environ, - "TODO: RUSTPYTHON environment pollution when running rustpython -m test --fail-env-changed due to unknown reason" - ) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_gettrace_all_threads(self): def fn(*args): pass old_trace = threading.gettrace() @@ -919,12 +1056,7 @@ def fn(*args): pass finally: threading.setprofile(old_profile) - @unittest.skipIf( - 'RUSTPYTHON_SKIP_ENV_POLLUTERS' in os.environ, - "TODO: RUSTPYTHON environment pollution when running rustpython -m test --fail-env-changed due to unknown reason" - ) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_getprofile_all_threads(self): def fn(*args): pass old_profile = threading.getprofile() @@ -954,32 +1086,7 @@ def checker(): self.assertEqual(threading.getprofile(), old_profile) self.assertEqual(sys.getprofile(), old_profile) - @cpython_only - def test_shutdown_locks(self): - for daemon in (False, True): - with self.subTest(daemon=daemon): - event = threading.Event() - thread = threading.Thread(target=event.wait, daemon=daemon) - - # Thread.start() must add lock to _shutdown_locks, - # but only for non-daemon thread - thread.start() - tstate_lock = thread._tstate_lock - if not daemon: - self.assertIn(tstate_lock, threading._shutdown_locks) - else: - self.assertNotIn(tstate_lock, threading._shutdown_locks) - - # unblock the thread and join it - event.set() - thread.join() - - # Thread._stop() must remove tstate_lock from _shutdown_locks. - # Daemon threads must never add it to _shutdown_locks. - self.assertNotIn(tstate_lock, threading._shutdown_locks) - - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_locals_at_exit(self): # bpo-19466: thread locals must not be deleted before destructors # are called @@ -1057,6 +1164,164 @@ def import_threading(): self.assertEqual(out, b'') self.assertEqual(err, b'') + # TODO: RUSTPYTHON - __del__ not called during interpreter finalization (no cyclic GC) + @unittest.expectedFailure + def test_start_new_thread_at_finalization(self): + code = """if 1: + import _thread + + def f(): + print("shouldn't be printed") + + class AtFinalization: + def __del__(self): + print("OK") + _thread.start_new_thread(f, ()) + at_finalization = AtFinalization() + """ + _, out, err = assert_python_ok("-c", code) + self.assertEqual(out.strip(), b"OK") + self.assertIn(b"can't create new thread at interpreter shutdown", err) + + def test_start_new_thread_failed(self): + # gh-109746: if Python fails to start newly created thread + # due to failure of underlying PyThread_start_new_thread() call, + # its state should be removed from interpreter' thread states list + # to avoid its double cleanup + try: + from resource import setrlimit, RLIMIT_NPROC + except ImportError as err: + self.skipTest(err) # RLIMIT_NPROC is specific to Linux and BSD + code = """if 1: + import resource + import _thread + + def f(): + print("shouldn't be printed") + + limits = resource.getrlimit(resource.RLIMIT_NPROC) + [_, hard] = limits + resource.setrlimit(resource.RLIMIT_NPROC, (0, hard)) + + try: + handle = _thread.start_joinable_thread(f) + except RuntimeError: + print('ok') + else: + print('!skip!') + handle.join() + """ + _, out, err = assert_python_ok("-u", "-c", code) + out = out.strip() + if b'!skip!' in out: + self.skipTest('RLIMIT_NPROC had no effect; probably superuser') + self.assertEqual(out, b'ok') + self.assertEqual(err, b'') + + + @skip_unless_reliable_fork + @unittest.skipUnless(hasattr(threading, 'get_native_id'), "test needs threading.get_native_id()") + def test_native_id_after_fork(self): + script = """if True: + import threading + import os + from test import support + + parent_thread_native_id = threading.current_thread().native_id + print(parent_thread_native_id, flush=True) + assert parent_thread_native_id == threading.get_native_id() + childpid = os.fork() + if childpid == 0: + print(threading.current_thread().native_id, flush=True) + assert threading.current_thread().native_id == threading.get_native_id() + else: + try: + assert parent_thread_native_id == threading.current_thread().native_id + assert parent_thread_native_id == threading.get_native_id() + finally: + support.wait_process(childpid, exitcode=0) + """ + rc, out, err = assert_python_ok('-c', script) + self.assertEqual(rc, 0) + self.assertEqual(err, b"") + native_ids = out.strip().splitlines() + self.assertEqual(len(native_ids), 2) + self.assertNotEqual(native_ids[0], native_ids[1]) + + @cpython_only + def test_finalize_daemon_thread_hang(self): + if support.check_sanitizer(thread=True, memory=True): + # the thread running `time.sleep(100)` below will still be alive + # at process exit + self.skipTest( + "https://github.com/python/cpython/issues/124878 - Known" + " race condition that TSAN identifies.") + # gh-87135: tests that daemon threads hang during finalization + script = textwrap.dedent(''' + import os + import sys + import threading + import time + import _testcapi + + lock = threading.Lock() + lock.acquire() + thread_started_event = threading.Event() + def thread_func(): + try: + thread_started_event.set() + _testcapi.finalize_thread_hang(lock.acquire) + finally: + # Control must not reach here. + os._exit(2) + + t = threading.Thread(target=thread_func) + t.daemon = True + t.start() + thread_started_event.wait() + # Sleep to ensure daemon thread is blocked on `lock.acquire` + # + # Note: This test is designed so that in the unlikely case that + # `0.1` seconds is not sufficient time for the thread to become + # blocked on `lock.acquire`, the test will still pass, it just + # won't be properly testing the thread behavior during + # finalization. + time.sleep(0.1) + + def run_during_finalization(): + # Wake up daemon thread + lock.release() + # Sleep to give the daemon thread time to crash if it is going + # to. + # + # Note: If due to an exceptionally slow execution this delay is + # insufficient, the test will still pass but will simply be + # ineffective as a test. + time.sleep(0.1) + # If control reaches here, the test succeeded. + os._exit(0) + + # Replace sys.stderr.flush as a way to run code during finalization + orig_flush = sys.stderr.flush + def do_flush(*args, **kwargs): + orig_flush(*args, **kwargs) + if not sys.is_finalizing: + return + sys.stderr.flush = orig_flush + run_during_finalization() + + sys.stderr.flush = do_flush + + # If the follow exit code is retained, `run_during_finalization` + # did not run. + sys.exit(1) + ''') + assert_python_ok("-c", script) + + @unittest.skip('TODO: RUSTPYTHON; Thread._tstate_lock not implemented') + def test_tstate_lock(self): + return super().test_tstate_lock() + class ThreadJoinOnShutdown(BaseTestCase): @@ -1077,8 +1342,6 @@ def joiningfunc(mainthread): data = out.decode().replace('\r', '') self.assertEqual(data, "end of main\nend of thread\n") - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_1_join_on_shutdown(self): # The usual case: on exit, wait for a non-daemon thread script = """if 1: @@ -1091,10 +1354,7 @@ def test_1_join_on_shutdown(self): """ self._run_and_join(script) - @support.requires_fork() - @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") - # TODO: RUSTPYTHON need to fix test_1_join_on_shutdown then this might work - @unittest.expectedFailure + @skip_unless_reliable_fork def test_2_join_in_forked_process(self): # Like the test above, but from a forked interpreter script = """if 1: @@ -1114,9 +1374,8 @@ def test_2_join_in_forked_process(self): """ self._run_and_join(script) - @support.requires_fork() - @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") - @unittest.skip("TODO: RUSTPYTHON, flaky test") + @unittest.skip('TODO: RUSTPYTHON; flaky test') + @skip_unless_reliable_fork def test_3_join_in_forked_from_thread(self): # Like the test above, but fork() was called from a worker thread # In the forked process, the main Thread object must be marked as stopped. @@ -1145,10 +1404,16 @@ def worker(): self._run_and_join(script) @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") - def test_4_daemon_threads(self): + @support.bigmemtest(size=40, memuse=70*2**20, dry_run=False) + def test_4_daemon_threads(self, size): # Check that a daemon thread cannot crash the interpreter on shutdown # by manipulating internal structures that are being disposed of in # the main thread. + if support.check_sanitizer(thread=True): + # some of the threads running `random_io` below will still be alive + # at process exit + self.skipTest("TSAN would report thread leak") + script = """if True: import os import random @@ -1186,9 +1451,33 @@ def main(): rc, out, err = assert_python_ok('-c', script) self.assertFalse(err) - @unittest.skip('TODO: RUSTPYTHON, flaky') - @support.requires_fork() - @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") + def test_thread_from_thread(self): + script = """if True: + import threading + import time + + def thread2(): + time.sleep(0.05) + print("OK") + + def thread1(): + time.sleep(0.05) + t2 = threading.Thread(target=thread2) + t2.start() + + t = threading.Thread(target=thread1) + t.start() + # do not join() -- the interpreter waits for non-daemon threads to + # finish. + """ + rc, out, err = assert_python_ok('-c', script) + self.assertEqual(err, b"") + self.assertEqual(out.strip(), b"OK") + self.assertEqual(rc, 0) + + # TODO: RUSTPYTHON - parking_lot mutex not fork-safe, child may SIGSEGV + @unittest.skip("TODO: RUSTPYTHON - flaky, parking_lot mutex not fork-safe") + @skip_unless_reliable_fork def test_reinit_tls_after_fork(self): # Issue #13817: fork() would deadlock in a multithreaded program with # the ad-hoc TLS implementation. @@ -1201,20 +1490,20 @@ def do_fork_and_wait(): else: os._exit(50) - # start a bunch of threads that will fork() child processes - threads = [] - for i in range(16): - t = threading.Thread(target=do_fork_and_wait) - threads.append(t) - t.start() + # Ignore the warning about fork with threads. + with warnings.catch_warnings(category=DeprecationWarning, + action="ignore"): + # start a bunch of threads that will fork() child processes + threads = [] + for i in range(16): + t = threading.Thread(target=do_fork_and_wait) + threads.append(t) + t.start() - for t in threads: - t.join() + for t in threads: + t.join() - @unittest.skip('TODO: RUSTPYTHON FAILURE, WORKER BUG') - @support.requires_fork() - # TODO: RUSTPYTHON - @unittest.expectedFailure + @skip_unless_reliable_fork def test_clear_threads_states_after_fork(self): # Issue #17094: check that threads states are cleared after fork() @@ -1225,18 +1514,22 @@ def test_clear_threads_states_after_fork(self): threads.append(t) t.start() - pid = os.fork() - if pid == 0: - # check that threads states have been cleared - if len(sys._current_frames()) == 1: - os._exit(51) - else: - os._exit(52) - else: - support.wait_process(pid, exitcode=51) - - for t in threads: - t.join() + try: + # Ignore the warning about fork with threads. + with warnings.catch_warnings(category=DeprecationWarning, + action="ignore"): + pid = os.fork() + if pid == 0: + # check that threads states have been cleared + if len(sys._current_frames()) == 1: + os._exit(51) + else: + os._exit(52) + else: + support.wait_process(pid, exitcode=51) + finally: + for t in threads: + t.join() class SubinterpThreadingTests(BaseTestCase): @@ -1248,8 +1541,7 @@ def pipe(self): os.set_blocking(r, False) return (r, w) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_threads_join(self): # Non-daemon threads should be joined at subinterpreter shutdown # (issue #18808) @@ -1278,8 +1570,7 @@ def f(): # The thread was joined properly. self.assertEqual(os.read(r, 1), b"x") - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_threads_join_2(self): # Same as above, but a delay gets introduced after the thread's # Python code returned but before the thread state is deleted. @@ -1317,8 +1608,47 @@ def f(): # The thread was joined properly. self.assertEqual(os.read(r, 1), b"x") + @requires_subinterpreters + def test_threads_join_with_no_main(self): + r_interp, w_interp = self.pipe() + + INTERP = b'I' + FINI = b'F' + DONE = b'D' + + interp = interpreters.create() + interp.exec(f"""if True: + import os + import threading + import time + + done = False + + def notify_fini(): + global done + done = True + os.write({w_interp}, {FINI!r}) + t.join() + threading._register_atexit(notify_fini) + + def task(): + while not done: + time.sleep(0.1) + os.write({w_interp}, {DONE!r}) + t = threading.Thread(target=task) + t.start() + + os.write({w_interp}, {INTERP!r}) + """) + interp.close() + + self.assertEqual(os.read(r_interp, 1), INTERP) + self.assertEqual(os.read(r_interp, 1), FINI) + self.assertEqual(os.read(r_interp, 1), DONE) + @cpython_only def test_daemon_threads_fatal_error(self): + import_module("_testcapi") subinterp_code = f"""if 1: import os import threading @@ -1340,6 +1670,67 @@ def f(): self.assertIn("Fatal Python error: Py_EndInterpreter: " "not the last thread", err.decode()) + def _check_allowed(self, before_start='', *, + allowed=True, + daemon_allowed=True, + daemon=False, + ): + import_module("_testinternalcapi") + subinterp_code = textwrap.dedent(f""" + import test.support + import threading + def func(): + print('this should not have run!') + t = threading.Thread(target=func, daemon={daemon}) + {before_start} + t.start() + """) + check_multi_interp_extensions = bool(support.Py_GIL_DISABLED) + script = textwrap.dedent(f""" + import test.support + test.support.run_in_subinterp_with_config( + {subinterp_code!r}, + use_main_obmalloc=True, + allow_fork=True, + allow_exec=True, + allow_threads={allowed}, + allow_daemon_threads={daemon_allowed}, + check_multi_interp_extensions={check_multi_interp_extensions}, + own_gil=False, + ) + """) + with test.support.SuppressCrashReport(): + _, _, err = assert_python_ok("-c", script) + return err.decode() + + @cpython_only + def test_threads_not_allowed(self): + err = self._check_allowed( + allowed=False, + daemon_allowed=False, + daemon=False, + ) + self.assertIn('RuntimeError', err) + + @cpython_only + def test_daemon_threads_not_allowed(self): + with self.subTest('via Thread()'): + err = self._check_allowed( + allowed=True, + daemon_allowed=False, + daemon=True, + ) + self.assertIn('RuntimeError', err) + + with self.subTest('via Thread.daemon setter'): + err = self._check_allowed( + 't.daemon = True', + allowed=True, + daemon_allowed=False, + daemon=False, + ) + self.assertIn('RuntimeError', err) + class ThreadingExceptionTests(BaseTestCase): # A RuntimeError should be raised if Thread.start() is called @@ -1368,7 +1759,7 @@ def test_releasing_unacquired_lock(self): lock = threading.Lock() self.assertRaises(RuntimeError, lock.release) - @unittest.skip("TODO: RUSTPYTHON, flaky test") + @unittest.skip('TODO: RUSTPYTHON; flaky test') @requires_subprocess() def test_recursion_limit(self): # Issue 9670 @@ -1378,13 +1769,6 @@ def test_recursion_limit(self): # for threads script = """if True: import threading - # TODO: RUSTPYTHON - # Following lines set the recursion limit to previous default of 512 - # for the execution of this process. Without this, the test runners - # on Github fail. Ideally, at a future point this should be removed. - import os, sys - if os.getenv("CI"): - sys.setrecursionlimit(512) def recurse(): return recurse() @@ -1489,6 +1873,37 @@ def run(): self.assertEqual(out, b'') self.assertNotIn("Unhandled exception", err.decode()) + def test_print_exception_gh_102056(self): + # This used to crash. See gh-102056. + script = r"""if True: + import time + import threading + import _thread + + def f(): + try: + f() + except RecursionError: + f() + + def g(): + try: + raise ValueError() + except* ValueError: + f() + + def h(): + time.sleep(1) + _thread.interrupt_main() + + t = threading.Thread(target=h) + t.start() + g() + t.join() + """ + + assert_python_failure("-c", script) + def test_bare_raise_in_brand_new_thread(self): def bare_raise(): raise @@ -1526,6 +1941,23 @@ def modify_file(): t.start() t.join() + def test_dummy_thread_on_interpreter_shutdown(self): + # GH-130522: When `threading` held a reference to itself and then a + # _DummyThread() object was created, destruction of the dummy thread + # would emit an unraisable exception at shutdown, due to a lock being + # destroyed. + code = """if True: + import sys + import threading + + threading.x = sys.modules[__name__] + x = threading._DummyThread() + """ + rc, out, err = assert_python_ok("-c", code) + self.assertEqual(rc, 0) + self.assertEqual(out, b"") + self.assertEqual(err, b"") + class ThreadRunFail(threading.Thread): def run(self): @@ -1537,7 +1969,7 @@ def setUp(self): restore_default_excepthook(self) super().setUp() - @unittest.skip('TODO: RUSTPYTHON, flaky') + @force_not_colorized def test_excepthook(self): with support.captured_output("stderr") as stderr: thread = ThreadRunFail(name="excepthook thread") @@ -1551,6 +1983,7 @@ def test_excepthook(self): self.assertIn('ValueError: run failed', stderr) @support.cpython_only + @force_not_colorized def test_excepthook_thread_None(self): # threading.excepthook called with thread=None: log the thread # identifier in this case. @@ -1686,32 +2119,51 @@ class PyRLockTests(lock_tests.RLockTests): class CRLockTests(lock_tests.RLockTests): locktype = staticmethod(threading._CRLock) - # TODO: RUSTPYTHON - @unittest.skip("TODO: RUSTPYTHON, flaky test") - def test_different_thread(self): - super().test_different_thread() + def test_signature(self): # gh-102029 + with warnings.catch_warnings(record=True) as warnings_log: + threading.RLock() + self.assertEqual(warnings_log, []) - # TODO: RUSTPYTHON - @unittest.expectedFailure + arg_types = [ + ((1,), {}), + ((), {'a': 1}), + ((1, 2), {'a': 1}), + ] + for args, kwargs in arg_types: + with self.subTest(args=args, kwargs=kwargs): + with self.assertWarns(DeprecationWarning): + threading.RLock(*args, **kwargs) + + # Subtypes with custom `__init__` are allowed (but, not recommended): + class CustomRLock(self.locktype): + def __init__(self, a, *, b) -> None: + super().__init__() + + with warnings.catch_warnings(record=True) as warnings_log: + CustomRLock(1, b=2) + self.assertEqual(warnings_log, []) + + @unittest.expectedFailure # TODO: RUSTPYTHON def test_release_save_unacquired(self): - super().test_release_save_unacquired() + return super().test_release_save_unacquired() + + @unittest.skip('TODO: RUSTPYTHON; flaky test') + def test_different_thread(self): + return super().test_different_thread() class EventTests(lock_tests.EventTests): eventtype = staticmethod(threading.Event) - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_reset_internal_locks(): # TODO: RUSTPYTHON; remove this when done - super().test_reset_internal_locks() - class ConditionAsRLockTests(lock_tests.RLockTests): # Condition uses an RLock by default and exports its API. locktype = staticmethod(threading.Condition) - # TODO: RUSTPYTHON - @unittest.skip("TODO: RUSTPYTHON, flaky test") + def test_recursion_count(self): + self.skipTest("Condition does not expose _recursion_count()") + + @unittest.skip('TODO: RUSTPYTHON; flaky test') def test_different_thread(self): - super().test_different_thread() + return super().test_different_thread() class ConditionTests(lock_tests.ConditionTests): condtype = staticmethod(threading.Condition) @@ -1727,8 +2179,6 @@ class BarrierTests(lock_tests.BarrierTests): class MiscTestCase(unittest.TestCase): - # TODO: RUSTPYTHON - @unittest.expectedFailure def test__all__(self): restore_default_excepthook(self) @@ -1762,7 +2212,8 @@ def check_interrupt_main_noerror(self, signum): # Restore original handler signal.signal(signum, handler) - @unittest.skip("TODO: RUSTPYTHON; flaky") + @unittest.skip('TODO: RUSTPYTHON; flaky') + @requires_gil_enabled("gh-118433: Flaky due to a longstanding bug") def test_interrupt_main_subthread(self): # Calling start_new_thread with a function that executes interrupt_main # should raise KeyboardInterrupt upon completion. @@ -1821,8 +2272,6 @@ def worker(started, cont, interrupted): class AtexitTests(unittest.TestCase): - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_atexit_output(self): rc, out, err = assert_python_ok("-c", """if True: import threading @@ -1851,8 +2300,6 @@ def test_atexit_called_once(self): self.assertFalse(err) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_atexit_after_shutdown(self): # The only way to do this is by registering an atexit within # an atexit, which is intended to raise an exception. diff --git a/Lib/threading.py b/Lib/threading.py index 811ea94a126..15bf786a6a9 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -3,11 +3,11 @@ import os as _os import sys as _sys import _thread -import functools +import warnings from time import monotonic as _time from _weakrefset import WeakSet -from itertools import islice as _islice, count as _count +from itertools import count as _count try: from _collections import deque as _deque except ImportError: @@ -28,13 +28,20 @@ 'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError', 'setprofile', 'settrace', 'local', 'stack_size', - 'excepthook', 'ExceptHookArgs', 'gettrace', 'getprofile'] + 'excepthook', 'ExceptHookArgs', 'gettrace', 'getprofile', + 'setprofile_all_threads','settrace_all_threads'] # Rename some stuff so "from threading import *" is safe -_start_new_thread = _thread.start_new_thread +_start_joinable_thread = _thread.start_joinable_thread +_daemon_threads_allowed = _thread.daemon_threads_allowed _allocate_lock = _thread.allocate_lock -_set_sentinel = _thread._set_sentinel +_LockType = _thread.LockType +_thread_shutdown = _thread._shutdown +_make_thread_handle = _thread._make_thread_handle +_ThreadHandle = _thread._ThreadHandle get_ident = _thread.get_ident +_get_main_thread_ident = _thread._get_main_thread_ident +_is_main_interpreter = _thread._is_main_interpreter try: get_native_id = _thread.get_native_id _HAVE_THREAD_NATIVE_ID = True @@ -49,6 +56,13 @@ TIMEOUT_MAX = _thread.TIMEOUT_MAX del _thread +# get thread-local implementation, either from the thread +# module, or from the python fallback + +try: + from _thread import _local as local +except ImportError: + from _threading_local import local # Support for profile and trace hooks @@ -60,11 +74,20 @@ def setprofile(func): The func will be passed to sys.setprofile() for each thread, before its run() method is called. - """ global _profile_hook _profile_hook = func +def setprofile_all_threads(func): + """Set a profile function for all threads started from the threading module + and all Python threads that are currently executing. + + The func will be passed to sys.setprofile() for each thread, before its + run() method is called. + """ + setprofile(func) + _sys._setprofileallthreads(func) + def getprofile(): """Get the profiler function as set by threading.setprofile().""" return _profile_hook @@ -74,12 +97,10 @@ def settrace(func): The func will be passed to sys.settrace() for each thread, before its run() method is called. - """ global _trace_hook _trace_hook = func -# TODO: RUSTPYTHON def settrace_all_threads(func): """Set a trace function for all threads started from the threading module and all Python threads that are currently executing. @@ -96,7 +117,7 @@ def gettrace(): # Synchronization classes -Lock = _allocate_lock +Lock = _LockType def RLock(*args, **kwargs): """Factory function that returns a new reentrant lock. @@ -107,6 +128,12 @@ def RLock(*args, **kwargs): acquired it. """ + if args or kwargs: + warnings.warn( + 'Passing arguments to RLock is deprecated and will be removed in 3.15', + DeprecationWarning, + stacklevel=2, + ) if _CRLock is None: return _PyRLock(*args, **kwargs) return _CRLock(*args, **kwargs) @@ -229,6 +256,13 @@ def _release_save(self): def _is_owned(self): return self._owner == get_ident() + # Internal method used for reentrancy checks + + def _recursion_count(self): + if self._owner != get_ident(): + return 0 + return self._count + _PyRLock = _RLock @@ -254,18 +288,12 @@ def __init__(self, lock=None): # If the lock defines _release_save() and/or _acquire_restore(), # these override the default implementations (which just call # release() and acquire() on the lock). Ditto for _is_owned(). - try: + if hasattr(lock, '_release_save'): self._release_save = lock._release_save - except AttributeError: - pass - try: + if hasattr(lock, '_acquire_restore'): self._acquire_restore = lock._acquire_restore - except AttributeError: - pass - try: + if hasattr(lock, '_is_owned'): self._is_owned = lock._is_owned - except AttributeError: - pass self._waiters = _deque() def _at_fork_reinit(self): @@ -308,7 +336,7 @@ def wait(self, timeout=None): awakened or timed out, it re-acquires the lock and returns. When the timeout argument is present and not None, it should be a - floating point number specifying a timeout for the operation in seconds + floating-point number specifying a timeout for the operation in seconds (or fractions thereof). When the underlying lock is an RLock, it is not released using its @@ -436,6 +464,11 @@ def __init__(self, value=1): self._cond = Condition(Lock()) self._value = value + def __repr__(self): + cls = self.__class__ + return (f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}:" + f" value={self._value}>") + def acquire(self, blocking=True, timeout=None): """Acquire a semaphore, decrementing the internal counter by one. @@ -494,8 +527,7 @@ def release(self, n=1): raise ValueError('n must be one or more') with self._cond: self._value += n - for i in range(n): - self._cond.notify() + self._cond.notify(n) def __exit__(self, t, v, tb): self.release() @@ -519,9 +551,14 @@ class BoundedSemaphore(Semaphore): """ def __init__(self, value=1): - Semaphore.__init__(self, value) + super().__init__(value) self._initial_value = value + def __repr__(self): + cls = self.__class__ + return (f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}:" + f" value={self._value}/{self._initial_value}>") + def release(self, n=1): """Release a semaphore, incrementing the internal counter by one or more. @@ -538,8 +575,7 @@ def release(self, n=1): if self._value + n > self._initial_value: raise ValueError("Semaphore released too many times") self._value += n - for i in range(n): - self._cond.notify() + self._cond.notify(n) class Event: @@ -557,8 +593,13 @@ def __init__(self): self._cond = Condition(Lock()) self._flag = False + def __repr__(self): + cls = self.__class__ + status = 'set' if self._flag else 'unset' + return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: {status}>" + def _at_fork_reinit(self): - # Private method called by Thread._reset_internal_locks() + # Private method called by Thread._after_fork() self._cond._at_fork_reinit() def is_set(self): @@ -568,7 +609,7 @@ def is_set(self): def isSet(self): """Return true if and only if the internal flag is true. - This method is deprecated, use notify_all() instead. + This method is deprecated, use is_set() instead. """ import warnings @@ -605,11 +646,12 @@ def wait(self, timeout=None): the optional timeout occurs. When the timeout argument is present and not None, it should be a - floating point number specifying a timeout for the operation in seconds + floating-point number specifying a timeout for the operation in seconds (or fractions thereof). This method returns the internal flag on exit, so it will always return - True except if a timeout is given and the operation times out. + ``True`` except if a timeout is given and the operation times out, when + it will return ``False``. """ with self._cond: @@ -648,6 +690,8 @@ def __init__(self, parties, action=None, timeout=None): default for all subsequent 'wait()' calls. """ + if parties < 1: + raise ValueError("parties must be >= 1") self._cond = Condition(Lock()) self._action = action self._timeout = timeout @@ -655,6 +699,13 @@ def __init__(self, parties, action=None, timeout=None): self._state = 0 # 0 filling, 1 draining, -1 resetting, -2 broken self._count = 0 + def __repr__(self): + cls = self.__class__ + if self.broken: + return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: broken>" + return (f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}:" + f" waiters={self.n_waiting}/{self.parties}>") + def wait(self, timeout=None): """Wait for the barrier. @@ -802,25 +853,6 @@ def _newname(name_template): _limbo = {} _dangling = WeakSet() -# Set of Thread._tstate_lock locks of non-daemon threads used by _shutdown() -# to wait until all Python thread states get deleted: -# see Thread._set_tstate_lock(). -_shutdown_locks_lock = _allocate_lock() -_shutdown_locks = set() - -def _maintain_shutdown_locks(): - """ - Drop any shutdown locks that don't correspond to running threads anymore. - - Calling this from time to time avoids an ever-growing _shutdown_locks - set when Thread objects are not joined explicitly. See bpo-37788. - - This must be called with _shutdown_locks_lock acquired. - """ - # If a lock was released, the corresponding thread has exited - to_remove = [lock for lock in _shutdown_locks if not lock.locked()] - _shutdown_locks.difference_update(to_remove) - # Main class for threads @@ -848,7 +880,7 @@ class is implemented. *name* is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number. - *args* is the argument tuple for the target invocation. Defaults to (). + *args* is a list or tuple of arguments for the target invocation. Defaults to (). *kwargs* is a dictionary of keyword arguments for the target invocation. Defaults to {}. @@ -877,15 +909,16 @@ class is implemented. self._args = args self._kwargs = kwargs if daemon is not None: + if daemon and not _daemon_threads_allowed(): + raise RuntimeError('daemon threads are disabled in this (sub)interpreter') self._daemonic = daemon else: self._daemonic = current_thread().daemon self._ident = None if _HAVE_THREAD_NATIVE_ID: self._native_id = None - self._tstate_lock = None + self._handle = _ThreadHandle() self._started = Event() - self._is_stopped = False self._initialized = True # Copy of sys.stderr used by self._invoke_excepthook() self._stderr = _sys.stderr @@ -893,30 +926,26 @@ class is implemented. # For debugging and _after_fork() _dangling.add(self) - def _reset_internal_locks(self, is_alive): - # private! Called by _after_fork() to reset our internal locks as - # they may be in an invalid state leading to a deadlock or crash. + def _after_fork(self, new_ident=None): + # Private! Called by threading._after_fork(). self._started._at_fork_reinit() - if is_alive: - # bpo-42350: If the fork happens when the thread is already stopped - # (ex: after threading._shutdown() has been called), _tstate_lock - # is None. Do nothing in this case. - if self._tstate_lock is not None: - self._tstate_lock._at_fork_reinit() - self._tstate_lock.acquire() + if new_ident is not None: + # This thread is alive. + self._ident = new_ident + assert self._handle.ident == new_ident + if _HAVE_THREAD_NATIVE_ID: + self._set_native_id() else: - # The thread isn't alive after fork: it doesn't have a tstate - # anymore. - self._is_stopped = True - self._tstate_lock = None + # Otherwise, the thread is dead, Jim. _PyThread_AfterFork() + # already marked our handle done. + pass def __repr__(self): assert self._initialized, "Thread.__init__() was not called" status = "initial" if self._started.is_set(): status = "started" - self.is_alive() # easy way to get ._is_stopped set when appropriate - if self._is_stopped: + if self._handle.is_done(): status = "stopped" if self._daemonic: status += " daemon" @@ -943,12 +972,14 @@ def start(self): with _active_limbo_lock: _limbo[self] = self try: - _start_new_thread(self._bootstrap, ()) + # Start joinable thread + _start_joinable_thread(self._bootstrap, handle=self._handle, + daemon=self.daemon) except Exception: with _active_limbo_lock: del _limbo[self] raise - self._started.wait() + self._started.wait() # Will set ident and native_id def run(self): """Method representing the thread's activity. @@ -994,23 +1025,9 @@ def _set_ident(self): def _set_native_id(self): self._native_id = get_native_id() - def _set_tstate_lock(self): - """ - Set a lock object which will be released by the interpreter when - the underlying thread state (see pystate.h) gets deleted. - """ - self._tstate_lock = _set_sentinel() - self._tstate_lock.acquire() - - if not self.daemon: - with _shutdown_locks_lock: - _maintain_shutdown_locks() - _shutdown_locks.add(self._tstate_lock) - def _bootstrap_inner(self): try: self._set_ident() - self._set_tstate_lock() if _HAVE_THREAD_NATIVE_ID: self._set_native_id() self._started.set() @@ -1028,40 +1045,7 @@ def _bootstrap_inner(self): except: self._invoke_excepthook(self) finally: - with _active_limbo_lock: - try: - # We don't call self._delete() because it also - # grabs _active_limbo_lock. - del _active[get_ident()] - except: - pass - - def _stop(self): - # After calling ._stop(), .is_alive() returns False and .join() returns - # immediately. ._tstate_lock must be released before calling ._stop(). - # - # Normal case: C code at the end of the thread's life - # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and - # that's detected by our ._wait_for_tstate_lock(), called by .join() - # and .is_alive(). Any number of threads _may_ call ._stop() - # simultaneously (for example, if multiple threads are blocked in - # .join() calls), and they're not serialized. That's harmless - - # they'll just make redundant rebindings of ._is_stopped and - # ._tstate_lock. Obscure: we rebind ._tstate_lock last so that the - # "assert self._is_stopped" in ._wait_for_tstate_lock() always works - # (the assert is executed only if ._tstate_lock is None). - # - # Special case: _main_thread releases ._tstate_lock via this - # module's _shutdown() function. - lock = self._tstate_lock - if lock is not None: - assert not lock.locked() - self._is_stopped = True - self._tstate_lock = None - if not self.daemon: - with _shutdown_locks_lock: - # Remove our lock and other released locks from _shutdown_locks - _maintain_shutdown_locks() + self._delete() def _delete(self): "Remove current thread from the dict of currently running threads." @@ -1080,7 +1064,7 @@ def join(self, timeout=None): or until the optional timeout occurs. When the timeout argument is present and not None, it should be a - floating point number specifying a timeout for the operation in seconds + floating-point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened -- if the thread is still alive, the join() call timed out. @@ -1103,39 +1087,12 @@ def join(self, timeout=None): if self is current_thread(): raise RuntimeError("cannot join current thread") - if timeout is None: - self._wait_for_tstate_lock() - else: - # the behavior of a negative timeout isn't documented, but - # historically .join(timeout=x) for x<0 has acted as if timeout=0 - self._wait_for_tstate_lock(timeout=max(timeout, 0)) - - def _wait_for_tstate_lock(self, block=True, timeout=-1): - # Issue #18808: wait for the thread state to be gone. - # At the end of the thread's life, after all knowledge of the thread - # is removed from C data structures, C code releases our _tstate_lock. - # This method passes its arguments to _tstate_lock.acquire(). - # If the lock is acquired, the C code is done, and self._stop() is - # called. That sets ._is_stopped to True, and ._tstate_lock to None. - lock = self._tstate_lock - if lock is None: - # already determined that the C code is done - assert self._is_stopped - return + # the behavior of a negative timeout isn't documented, but + # historically .join(timeout=x) for x<0 has acted as if timeout=0 + if timeout is not None: + timeout = max(timeout, 0) - try: - if lock.acquire(block, timeout): - lock.release() - self._stop() - except: - if lock.locked(): - # bpo-45274: lock.acquire() acquired the lock, but the function - # was interrupted with an exception before reaching the - # lock.release(). It can happen if a signal handler raises an - # exception, like CTRL+C which raises KeyboardInterrupt. - lock.release() - self._stop() - raise + self._handle.join(timeout) @property def name(self): @@ -1186,10 +1143,7 @@ def is_alive(self): """ assert self._initialized, "Thread.__init__() not called" - if self._is_stopped or not self._started.is_set(): - return False - self._wait_for_tstate_lock(False) - return not self._is_stopped + return self._started.is_set() and not self._handle.is_done() @property def daemon(self): @@ -1210,6 +1164,8 @@ def daemon(self): def daemon(self, daemonic): if not self._initialized: raise RuntimeError("Thread.__init__() not called") + if daemonic and not _daemon_threads_allowed(): + raise RuntimeError('daemon threads are disabled in this interpreter') if self._started.is_set(): raise RuntimeError("cannot set daemon status of active thread") self._daemonic = daemonic @@ -1396,19 +1352,45 @@ class _MainThread(Thread): def __init__(self): Thread.__init__(self, name="MainThread", daemon=False) - self._set_tstate_lock() self._started.set() - self._set_ident() + self._ident = _get_main_thread_ident() + self._handle = _make_thread_handle(self._ident) if _HAVE_THREAD_NATIVE_ID: self._set_native_id() with _active_limbo_lock: _active[self._ident] = self +# Helper thread-local instance to detect when a _DummyThread +# is collected. Not a part of the public API. +_thread_local_info = local() + + +class _DeleteDummyThreadOnDel: + ''' + Helper class to remove a dummy thread from threading._active on __del__. + ''' + + def __init__(self, dummy_thread): + self._dummy_thread = dummy_thread + self._tident = dummy_thread.ident + # Put the thread on a thread local variable so that when + # the related thread finishes this instance is collected. + # + # Note: no other references to this instance may be created. + # If any client code creates a reference to this instance, + # the related _DummyThread will be kept forever! + _thread_local_info._track_dummy_thread_ref = self + + def __del__(self, _active_limbo_lock=_active_limbo_lock, _active=_active): + with _active_limbo_lock: + if _active.get(self._tident) is self._dummy_thread: + _active.pop(self._tident, None) + + # Dummy thread class to represent threads not started here. -# These aren't garbage collected when they die, nor can they be waited for. -# If they invoke anything in threading.py that calls current_thread(), they -# leave an entry in the _active dict forever after. +# These should be added to `_active` and removed automatically +# when they die, although they can't be waited for. # Their purpose is to return *something* from current_thread(). # They are marked as daemon threads so we won't wait for them # when we exit (conform previous semantics). @@ -1416,24 +1398,31 @@ def __init__(self): class _DummyThread(Thread): def __init__(self): - Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True) - + Thread.__init__(self, name=_newname("Dummy-%d"), + daemon=_daemon_threads_allowed()) self._started.set() self._set_ident() + self._handle = _make_thread_handle(self._ident) if _HAVE_THREAD_NATIVE_ID: self._set_native_id() with _active_limbo_lock: _active[self._ident] = self - - def _stop(self): - pass + _DeleteDummyThreadOnDel(self) def is_alive(self): - assert not self._is_stopped and self._started.is_set() - return True + if not self._handle.is_done() and self._started.is_set(): + return True + raise RuntimeError("thread is not alive") def join(self, timeout=None): - assert False, "cannot join a dummy thread" + raise RuntimeError("cannot join a dummy thread") + + def _after_fork(self, new_ident=None): + if new_ident is not None: + self.__class__ = _MainThread + self._name = 'MainThread' + self._daemonic = False + Thread._after_fork(self, new_ident=new_ident) # Global API functions @@ -1468,6 +1457,8 @@ def active_count(): enumerate(). """ + # NOTE: if the logic in here ever changes, update Modules/posixmodule.c + # warn_about_fork_with_threads() to match. with _active_limbo_lock: return len(_active) + len(_limbo) @@ -1514,8 +1505,7 @@ def _register_atexit(func, *arg, **kwargs): if _SHUTTING_DOWN: raise RuntimeError("can't register atexit after shutdown") - call = functools.partial(func, *arg, **kwargs) - _threading_atexits.append(call) + _threading_atexits.append(lambda: func(*arg, **kwargs)) from _thread import stack_size @@ -1530,12 +1520,11 @@ def _shutdown(): """ Wait until the Python thread state of all non-daemon threads get deleted. """ - # Obscure: other threads may be waiting to join _main_thread. That's - # dubious, but some code does it. We can't wait for C code to release - # the main thread's tstate_lock - that won't happen until the interpreter - # is nearly dead. So we release it here. Note that just calling _stop() - # isn't enough: other threads may already be waiting on _tstate_lock. - if _main_thread._is_stopped: + # Obscure: other threads may be waiting to join _main_thread. That's + # dubious, but some code does it. We can't wait for it to be marked as done + # normally - that won't happen until the interpreter is nearly dead. So + # mark it done here. + if _main_thread._handle.is_done() and _is_main_interpreter(): # _shutdown() was already called return @@ -1547,39 +1536,11 @@ def _shutdown(): for atexit_call in reversed(_threading_atexits): atexit_call() - # Main thread - if _main_thread.ident == get_ident(): - tlock = _main_thread._tstate_lock - # The main thread isn't finished yet, so its thread state lock can't - # have been released. - assert tlock is not None - assert tlock.locked() - tlock.release() - _main_thread._stop() - else: - # bpo-1596321: _shutdown() must be called in the main thread. - # If the threading module was not imported by the main thread, - # _main_thread is the thread which imported the threading module. - # In this case, ignore _main_thread, similar behavior than for threads - # spawned by C libraries or using _thread.start_new_thread(). - pass - - # Join all non-deamon threads - while True: - with _shutdown_locks_lock: - locks = list(_shutdown_locks) - _shutdown_locks.clear() - - if not locks: - break - - for lock in locks: - # mimic Thread.join() - lock.acquire() - lock.release() - - # new threads can be spawned while we were waiting for the other - # threads to complete + if _is_main_interpreter(): + _main_thread._handle._set_done() + + # Wait for all non-daemon threads to exit. + _thread_shutdown() def main_thread(): @@ -1588,16 +1549,9 @@ def main_thread(): In normal conditions, the main thread is the thread from which the Python interpreter was started. """ + # XXX Figure this out for subinterpreters. (See gh-75698.) return _main_thread -# get thread-local implementation, either from the thread -# module, or from the python fallback - -try: - from _thread import _local as local -except ImportError: - from _threading_local import local - def _after_fork(): """ @@ -1606,7 +1560,6 @@ def _after_fork(): # Reset _active_limbo_lock, in case we forked while the lock was held # by another (non-forked) thread. http://bugs.python.org/issue874900 global _active_limbo_lock, _main_thread - global _shutdown_locks_lock, _shutdown_locks _active_limbo_lock = RLock() # fork() only copied the current thread; clear references to others. @@ -1622,10 +1575,6 @@ def _after_fork(): _main_thread = current - # reset _shutdown() locks: threads re-register their _tstate_lock below - _shutdown_locks_lock = _allocate_lock() - _shutdown_locks = set() - with _active_limbo_lock: # Dangling thread instances must still have their locks reset, # because someone may join() them. @@ -1635,16 +1584,13 @@ def _after_fork(): # Any lock/condition variable may be currently locked or in an # invalid state, so we reinitialize them. if thread is current: - # There is only one active thread. We reset the ident to - # its new value since it can have changed. - thread._reset_internal_locks(True) + # This is the one and only active thread. ident = get_ident() - thread._ident = ident + thread._after_fork(new_ident=ident) new_active[ident] = thread else: # All the others are already stopped. - thread._reset_internal_locks(False) - thread._stop() + thread._after_fork() _limbo.clear() _active.clear() diff --git a/crates/vm/src/builtins/asyncgenerator.rs b/crates/vm/src/builtins/asyncgenerator.rs index e16cbc6d18d..891083f3e6b 100644 --- a/crates/vm/src/builtins/asyncgenerator.rs +++ b/crates/vm/src/builtins/asyncgenerator.rs @@ -57,14 +57,14 @@ impl PyAsyncGen { zelf.ag_hooks_inited.store(true); - // Get and store finalizer from thread-local storage - let finalizer = crate::vm::thread::ASYNC_GEN_FINALIZER.with_borrow(|f| f.as_ref().cloned()); + // Get and store finalizer from VM + let finalizer = vm.async_gen_finalizer.borrow().clone(); if let Some(finalizer) = finalizer { *zelf.ag_finalizer.lock() = Some(finalizer); } // Call firstiter hook - let firstiter = crate::vm::thread::ASYNC_GEN_FIRSTITER.with_borrow(|f| f.as_ref().cloned()); + let firstiter = vm.async_gen_firstiter.borrow().clone(); if let Some(firstiter) = firstiter { let obj: PyObjectRef = zelf.to_owned().into(); firstiter.call((obj,), vm)?; diff --git a/crates/vm/src/stdlib/posix.rs b/crates/vm/src/stdlib/posix.rs index 05f435649ad..7ac5129d805 100644 --- a/crates/vm/src/stdlib/posix.rs +++ b/crates/vm/src/stdlib/posix.rs @@ -661,6 +661,11 @@ pub mod module { } fn py_os_after_fork_child(vm: &VirtualMachine) { + // Mark all other threads as done before running Python callbacks + // See _PyThread_AfterFork behavior + #[cfg(feature = "threading")] + crate::stdlib::thread::after_fork_child(vm); + let after_forkers_child: Vec = vm.state.after_forkers_child.lock().clone(); run_at_forkers(after_forkers_child, false, vm); } @@ -670,8 +675,64 @@ pub mod module { run_at_forkers(after_forkers_parent, false, vm); } + /// Warn if forking from a multi-threaded process + fn warn_if_multi_threaded(name: &str, vm: &VirtualMachine) { + // Only check threading if it was already imported + // Avoid vm.import() which can execute arbitrary Python code in the fork path + let threading = match vm + .sys_module + .get_attr("modules", vm) + .and_then(|m| m.get_item("threading", vm)) + { + Ok(m) => m, + Err(_) => return, + }; + let active = threading.get_attr("_active", vm).ok(); + let limbo = threading.get_attr("_limbo", vm).ok(); + + let count_dict = |obj: Option| -> usize { + obj.and_then(|o| o.length_opt(vm)) + .and_then(|r| r.ok()) + .unwrap_or(0) + }; + + let num_threads = count_dict(active) + count_dict(limbo); + if num_threads > 1 { + // Use Python warnings module to ensure filters are applied correctly + let Ok(warnings) = vm.import("warnings", 0) else { + return; + }; + let Ok(warn_fn) = warnings.get_attr("warn", vm) else { + return; + }; + + let pid = unsafe { libc::getpid() }; + let msg = format!( + "This process (pid={}) is multi-threaded, use of {}() may lead to deadlocks in the child.", + pid, name + ); + + // Call warnings.warn(message, DeprecationWarning, stacklevel=2) + // stacklevel=2 to point to the caller of fork() + let args = crate::function::FuncArgs::new( + vec![ + vm.ctx.new_str(msg).into(), + vm.ctx.exceptions.deprecation_warning.as_object().to_owned(), + ], + crate::function::KwArgs::new( + [("stacklevel".to_owned(), vm.ctx.new_int(2).into())] + .into_iter() + .collect(), + ), + ); + let _ = warn_fn.call(args, vm); + } + } + #[pyfunction] fn fork(vm: &VirtualMachine) -> i32 { + warn_if_multi_threaded("fork", vm); + let pid: i32; py_os_before_fork(vm); unsafe { diff --git a/crates/vm/src/stdlib/sys.rs b/crates/vm/src/stdlib/sys.rs index 8295da9486d..8a65a926cb2 100644 --- a/crates/vm/src/stdlib/sys.rs +++ b/crates/vm/src/stdlib/sys.rs @@ -815,6 +815,32 @@ mod sys { } } + /// Return a dictionary mapping each thread's identifier to the topmost stack frame + /// currently active in that thread at the time the function is called. + #[cfg(feature = "threading")] + #[pyfunction] + fn _current_frames(vm: &VirtualMachine) -> PyResult { + use crate::AsObject; + use crate::stdlib::thread::get_all_current_frames; + + let frames = get_all_current_frames(vm); + let dict = vm.ctx.new_dict(); + + for (thread_id, frame) in frames { + let key = vm.ctx.new_int(thread_id); + dict.set_item(key.as_object(), frame.into(), vm)?; + } + + Ok(dict) + } + + /// Stub for non-threading builds - returns empty dict + #[cfg(not(feature = "threading"))] + #[pyfunction] + fn _current_frames(vm: &VirtualMachine) -> PyResult { + Ok(vm.ctx.new_dict()) + } + #[pyfunction] fn gettrace(vm: &VirtualMachine) -> PyObjectRef { vm.trace_func.borrow().clone() @@ -1107,6 +1133,22 @@ mod sys { update_use_tracing(vm); } + #[pyfunction] + fn _settraceallthreads(tracefunc: PyObjectRef, vm: &VirtualMachine) { + let func = (!vm.is_none(&tracefunc)).then(|| tracefunc.clone()); + *vm.state.global_trace_func.lock() = func; + vm.trace_func.replace(tracefunc); + update_use_tracing(vm); + } + + #[pyfunction] + fn _setprofileallthreads(profilefunc: PyObjectRef, vm: &VirtualMachine) { + let func = (!vm.is_none(&profilefunc)).then(|| profilefunc.clone()); + *vm.state.global_profile_func.lock() = func; + vm.profile_func.replace(profilefunc); + update_use_tracing(vm); + } + #[cfg(feature = "threading")] #[pyattr] fn thread_info(vm: &VirtualMachine) -> PyTupleRef { @@ -1187,10 +1229,10 @@ mod sys { } if let Some(finalizer) = args.finalizer.into_option() { - crate::vm::thread::ASYNC_GEN_FINALIZER.set(finalizer); + *vm.async_gen_finalizer.borrow_mut() = finalizer; } if let Some(firstiter) = args.firstiter.into_option() { - crate::vm::thread::ASYNC_GEN_FIRSTITER.set(firstiter); + *vm.async_gen_firstiter.borrow_mut() = firstiter; } Ok(()) @@ -1212,12 +1254,8 @@ mod sys { #[pyfunction] fn get_asyncgen_hooks(vm: &VirtualMachine) -> AsyncgenHooksData { AsyncgenHooksData { - firstiter: crate::vm::thread::ASYNC_GEN_FIRSTITER - .with_borrow(Clone::clone) - .to_pyobject(vm), - finalizer: crate::vm::thread::ASYNC_GEN_FINALIZER - .with_borrow(Clone::clone) - .to_pyobject(vm), + firstiter: vm.async_gen_firstiter.borrow().clone().to_pyobject(vm), + finalizer: vm.async_gen_finalizer.borrow().clone().to_pyobject(vm), } } diff --git a/crates/vm/src/stdlib/thread.rs b/crates/vm/src/stdlib/thread.rs index b7f00537c26..0e14fe0e4d1 100644 --- a/crates/vm/src/stdlib/thread.rs +++ b/crates/vm/src/stdlib/thread.rs @@ -1,13 +1,16 @@ //! Implementation of the _thread module #[cfg_attr(target_arch = "wasm32", allow(unused_imports))] -pub(crate) use _thread::{RawRMutex, make_module}; +pub(crate) use _thread::{ + CurrentFrameSlot, HandleEntry, RawRMutex, ShutdownEntry, after_fork_child, + get_all_current_frames, get_ident, init_main_thread_ident, make_module, +}; #[pymodule] pub(crate) mod _thread { use crate::{ AsObject, Py, PyPayload, PyRef, PyResult, VirtualMachine, builtins::{PyDictRef, PyStr, PyTupleRef, PyType, PyTypeRef}, - convert::ToPyException, + frame::FrameRef, function::{ArgCallable, Either, FuncArgs, KwArgs, OptionalArg, PySetterValue}, types::{Constructor, GetAttr, Representable, SetAttr}, }; @@ -19,7 +22,6 @@ pub(crate) mod _thread { lock_api::{RawMutex as RawMutexT, RawMutexTimed, RawReentrantMutex}, }; use std::thread; - use thread_local::ThreadLocal; // PYTHREAD_NAME: show current thread name pub const PYTHREAD_NAME: Option<&str> = { @@ -110,7 +112,8 @@ pub(crate) mod _thread { } #[pyattr(name = "LockType")] - #[pyclass(module = "thread", name = "lock")] + #[pyattr(name = "lock")] + #[pyclass(module = "_thread", name = "lock")] #[derive(PyPayload)] struct Lock { mu: RawMutex, @@ -172,10 +175,10 @@ pub(crate) mod _thread { } impl Constructor for Lock { - type Args = FuncArgs; + type Args = (); - fn py_new(_cls: &Py, _args: Self::Args, vm: &VirtualMachine) -> PyResult { - Err(vm.new_type_error("cannot create '_thread.lock' instances")) + fn py_new(_cls: &Py, _args: Self::Args, _vm: &VirtualMachine) -> PyResult { + Ok(Self { mu: RawMutex::INIT }) } } @@ -188,7 +191,7 @@ pub(crate) mod _thread { pub type RawRMutex = RawReentrantMutex; #[pyattr] - #[pyclass(module = "thread", name = "RLock")] + #[pyclass(module = "_thread", name = "RLock")] #[derive(PyPayload)] struct RLock { mu: RawRMutex, @@ -201,7 +204,7 @@ pub(crate) mod _thread { } } - #[pyclass(with(Representable))] + #[pyclass(with(Representable), flags(BASETYPE))] impl RLock { #[pyslot] fn slot_new(cls: PyTypeRef, _args: FuncArgs, vm: &VirtualMachine) -> PyResult { @@ -249,9 +252,10 @@ pub(crate) mod _thread { } self.count.store(0, core::sync::atomic::Ordering::Relaxed); let new_mut = RawRMutex::INIT; - - let old_mutex: AtomicCell<&RawRMutex> = AtomicCell::new(&self.mu); - old_mutex.swap(&new_mut); + unsafe { + let old_mutex: &AtomicCell = core::mem::transmute(&self.mu); + old_mutex.swap(new_mut); + } Ok(()) } @@ -283,12 +287,28 @@ pub(crate) mod _thread { } } + /// Get thread identity - uses pthread_self() on Unix for fork compatibility #[pyfunction] - fn get_ident() -> u64 { - thread_to_id(&thread::current()) + pub fn get_ident() -> u64 { + current_thread_id() + } + + /// Get OS-level thread ID (pthread_self on Unix) + /// This is important for fork compatibility - the ID must remain stable after fork + #[cfg(unix)] + fn current_thread_id() -> u64 { + // pthread_self() like CPython for fork compatibility + unsafe { libc::pthread_self() as u64 } } - fn thread_to_id(t: &thread::Thread) -> u64 { + #[cfg(not(unix))] + fn current_thread_id() -> u64 { + thread_to_rust_id(&thread::current()) + } + + /// Convert Rust thread to ID (used for non-unix platforms) + #[cfg(not(unix))] + fn thread_to_rust_id(t: &thread::Thread) -> u64 { use core::hash::{Hash, Hasher}; struct U64Hash { v: Option, @@ -304,13 +324,25 @@ pub(crate) mod _thread { self.v.expect("should have written a u64") } } - // TODO: use id.as_u64() once it's stable, until then, ThreadId is just a wrapper - // around NonZeroU64, so this should work (?) let mut h = U64Hash { v: None }; t.id().hash(&mut h); h.finish() } + /// Get thread ID for a given thread handle (used by start_new_thread) + fn thread_to_id(handle: &thread::JoinHandle<()>) -> u64 { + #[cfg(unix)] + { + // On Unix, use pthread ID from the handle + use std::os::unix::thread::JoinHandleExt; + handle.as_pthread_t() as u64 + } + #[cfg(not(unix))] + { + thread_to_rust_id(handle.thread()) + } + } + #[pyfunction] const fn allocate_lock() -> Lock { Lock { mu: RawMutex::INIT } @@ -343,9 +375,9 @@ pub(crate) mod _thread { ) .map(|handle| { vm.state.thread_count.fetch_add(1); - thread_to_id(handle.thread()) + thread_to_id(&handle) }) - .map_err(|err| err.to_pyexception(vm)) + .map_err(|err| vm.new_runtime_error(format!("can't start new thread: {err}"))) } fn run_thread(func: ArgCallable, args: FuncArgs, vm: &VirtualMachine) { @@ -365,9 +397,24 @@ pub(crate) mod _thread { unsafe { lock.mu.unlock() }; } } + // Clean up thread-local storage while VM context is still active + // This ensures __del__ methods are called properly + cleanup_thread_local_data(); + // Clean up frame tracking + crate::vm::thread::cleanup_current_thread_frames(vm); vm.state.thread_count.fetch_sub(1); } + /// Clean up thread-local data for the current thread. + /// This triggers __del__ on objects stored in thread-local variables. + fn cleanup_thread_local_data() { + // Take all guards - this will trigger LocalGuard::drop for each, + // which removes the thread's dict from each Local instance + LOCAL_GUARDS.with(|guards| { + guards.borrow_mut().clear(); + }); + } + #[cfg(not(target_arch = "wasm32"))] #[pyfunction] fn interrupt_main(signum: OptionalArg, vm: &VirtualMachine) -> PyResult<()> { @@ -400,6 +447,107 @@ pub(crate) mod _thread { vm.state.thread_count.load() } + #[pyfunction] + fn daemon_threads_allowed() -> bool { + // RustPython always allows daemon threads + true + } + + // Registry for non-daemon threads that need to be joined at shutdown + pub type ShutdownEntry = ( + std::sync::Weak>, + std::sync::Weak<(parking_lot::Mutex, parking_lot::Condvar)>, + ); + + #[pyfunction] + fn _shutdown(vm: &VirtualMachine) { + // Wait for all non-daemon threads to finish + let current_ident = get_ident(); + + loop { + // Find a thread that's not finished and not the current thread + let handle_to_join = { + let mut handles = vm.state.shutdown_handles.lock(); + // Clean up finished entries + handles.retain(|(inner_weak, _): &ShutdownEntry| { + inner_weak.upgrade().map_or(false, |inner| { + let guard = inner.lock(); + guard.state != ThreadHandleState::Done && guard.ident != current_ident + }) + }); + + // Find first unfinished handle + handles + .iter() + .find_map(|(inner_weak, done_event_weak): &ShutdownEntry| { + let inner = inner_weak.upgrade()?; + let done_event = done_event_weak.upgrade()?; + let guard = inner.lock(); + if guard.state != ThreadHandleState::Done && guard.ident != current_ident { + Some((inner.clone(), done_event.clone())) + } else { + None + } + }) + }; + + match handle_to_join { + Some((_, done_event)) => { + // Wait for this thread to finish (infinite timeout) + // Only check done flag to avoid lock ordering issues + // (done_event lock vs inner lock) + let (lock, cvar) = &*done_event; + let mut done = lock.lock(); + while !*done { + cvar.wait(&mut done); + } + } + None => break, // No more threads to wait on + } + } + } + + /// Add a non-daemon thread handle to the shutdown registry + fn add_to_shutdown_handles( + vm: &VirtualMachine, + inner: &std::sync::Arc>, + done_event: &std::sync::Arc<(parking_lot::Mutex, parking_lot::Condvar)>, + ) { + let mut handles = vm.state.shutdown_handles.lock(); + handles.push(( + std::sync::Arc::downgrade(inner), + std::sync::Arc::downgrade(done_event), + )); + } + + #[pyfunction] + fn _make_thread_handle(ident: u64, vm: &VirtualMachine) -> PyRef { + let handle = ThreadHandle::new(vm); + { + let mut inner = handle.inner.lock(); + inner.ident = ident; + inner.state = ThreadHandleState::Running; + } + handle.into_ref(&vm.ctx) + } + + #[pyfunction] + fn _get_main_thread_ident(vm: &VirtualMachine) -> u64 { + vm.state.main_thread_ident.load() + } + + #[pyfunction] + fn _is_main_interpreter() -> bool { + // RustPython only has one interpreter + true + } + + /// Initialize the main thread ident. Should be called once at interpreter startup. + pub fn init_main_thread_ident(vm: &VirtualMachine) { + let ident = get_ident(); + vm.state.main_thread_ident.store(ident); + } + /// ExceptHookArgs - simple class to hold exception hook arguments /// This allows threading.py to import _excepthook and _ExceptHookArgs from _thread #[pyattr] @@ -532,23 +680,92 @@ pub(crate) mod _thread { Ok(()) } + // Thread-local storage for cleanup guards + // When a thread terminates, the guard is dropped, which triggers cleanup + thread_local! { + static LOCAL_GUARDS: std::cell::RefCell> = const { std::cell::RefCell::new(Vec::new()) }; + } + + // Guard that removes thread-local data when dropped + struct LocalGuard { + local: std::sync::Weak, + thread_id: std::thread::ThreadId, + } + + impl Drop for LocalGuard { + fn drop(&mut self) { + if let Some(local_data) = self.local.upgrade() { + // Remove from map while holding the lock, but drop the value + // outside the lock to prevent deadlock if __del__ accesses _local + let removed = local_data.data.lock().remove(&self.thread_id); + drop(removed); + } + } + } + + // Shared data structure for Local + struct LocalData { + data: parking_lot::Mutex>, + } + + impl std::fmt::Debug for LocalData { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LocalData").finish_non_exhaustive() + } + } + #[pyattr] - #[pyclass(module = "thread", name = "_local")] + #[pyclass(module = "_thread", name = "_local")] #[derive(Debug, PyPayload)] struct Local { - data: ThreadLocal, + inner: std::sync::Arc, } #[pyclass(with(GetAttr, SetAttr), flags(BASETYPE))] impl Local { fn l_dict(&self, vm: &VirtualMachine) -> PyDictRef { - self.data.get_or(|| vm.ctx.new_dict()).clone() + let thread_id = std::thread::current().id(); + + // Fast path: check if dict exists under lock + if let Some(dict) = self.inner.data.lock().get(&thread_id).cloned() { + return dict; + } + + // Slow path: allocate dict outside lock to reduce lock hold time + let new_dict = vm.ctx.new_dict(); + + // Insert with double-check to handle races + let mut data = self.inner.data.lock(); + use std::collections::hash_map::Entry; + let (dict, need_guard) = match data.entry(thread_id) { + Entry::Occupied(e) => (e.get().clone(), false), + Entry::Vacant(e) => { + e.insert(new_dict.clone()); + (new_dict, true) + } + }; + drop(data); // Release lock before TLS access + + // Register cleanup guard only if we inserted a new entry + if need_guard { + let guard = LocalGuard { + local: std::sync::Arc::downgrade(&self.inner), + thread_id, + }; + LOCAL_GUARDS.with(|guards| { + guards.borrow_mut().push(guard); + }); + } + + dict } #[pyslot] fn slot_new(cls: PyTypeRef, _args: FuncArgs, vm: &VirtualMachine) -> PyResult { Self { - data: ThreadLocal::new(), + inner: std::sync::Arc::new(LocalData { + data: parking_lot::Mutex::new(std::collections::HashMap::new()), + }), } .into_ref_with_type(vm, cls) .map(Into::into) @@ -597,4 +814,354 @@ pub(crate) mod _thread { } } } + + // Registry of all ThreadHandles for fork cleanup + // Stores weak references so handles can be garbage collected normally + pub type HandleEntry = ( + std::sync::Weak>, + std::sync::Weak<(parking_lot::Mutex, parking_lot::Condvar)>, + ); + + // Re-export type from vm::thread for PyGlobalState + pub use crate::vm::thread::CurrentFrameSlot; + + /// Get all threads' current frames. Used by sys._current_frames(). + pub fn get_all_current_frames(vm: &VirtualMachine) -> Vec<(u64, FrameRef)> { + let registry = vm.state.thread_frames.lock(); + registry + .iter() + .filter_map(|(id, slot)| slot.lock().clone().map(|f| (*id, f))) + .collect() + } + + /// Called after fork() in child process to mark all other threads as done. + /// This prevents join() from hanging on threads that don't exist in the child. + pub fn after_fork_child(vm: &VirtualMachine) { + let current_ident = get_ident(); + + // Update main thread ident - after fork, the current thread becomes the main thread + vm.state.main_thread_ident.store(current_ident); + + // Reinitialize frame slot for current thread + crate::vm::thread::reinit_frame_slot_after_fork(vm); + + // Clean up thread handles if we can acquire the lock. + // Use try_lock because the mutex might have been held during fork. + // If we can't acquire it, just skip - the child process will work + // correctly with new handles it creates. + if let Some(mut handles) = vm.state.thread_handles.try_lock() { + // Clean up dead weak refs and mark non-current threads as done + handles.retain(|(inner_weak, done_event_weak): &HandleEntry| { + let Some(inner) = inner_weak.upgrade() else { + return false; // Remove dead entries + }; + let Some(done_event) = done_event_weak.upgrade() else { + return false; + }; + + // Try to lock the inner state - skip if we can't + let Some(mut inner_guard) = inner.try_lock() else { + return false; + }; + + // Skip current thread and not-started threads + if inner_guard.ident == current_ident { + return true; + } + if inner_guard.state == ThreadHandleState::NotStarted { + return true; + } + + // Mark as done and notify waiters + inner_guard.state = ThreadHandleState::Done; + inner_guard.join_handle = None; // Can't join OS thread from child + drop(inner_guard); + + // Try to notify waiters - skip if we can't acquire the lock + let (lock, cvar) = &*done_event; + if let Some(mut done) = lock.try_lock() { + *done = true; + cvar.notify_all(); + } + + true + }); + } + } + + // Thread handle state enum + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + pub enum ThreadHandleState { + NotStarted, + Starting, + Running, + Done, + } + + // Internal shared state for thread handle + pub struct ThreadHandleInner { + pub state: ThreadHandleState, + pub ident: u64, + pub join_handle: Option>, + pub joining: bool, // True if a thread is currently joining + pub joined: bool, // Track if join has completed + } + + impl fmt::Debug for ThreadHandleInner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ThreadHandleInner") + .field("state", &self.state) + .field("ident", &self.ident) + .field("join_handle", &self.join_handle.is_some()) + .field("joining", &self.joining) + .field("joined", &self.joined) + .finish() + } + } + + /// _ThreadHandle - handle for joinable threads + #[pyattr] + #[pyclass(module = "_thread", name = "_ThreadHandle")] + #[derive(Debug, PyPayload)] + struct ThreadHandle { + inner: std::sync::Arc>, + // Event to signal thread completion (for timed join support) + done_event: std::sync::Arc<(parking_lot::Mutex, parking_lot::Condvar)>, + } + + #[pyclass] + impl ThreadHandle { + fn new(vm: &VirtualMachine) -> Self { + let inner = std::sync::Arc::new(parking_lot::Mutex::new(ThreadHandleInner { + state: ThreadHandleState::NotStarted, + ident: 0, + join_handle: None, + joining: false, + joined: false, + })); + let done_event = + std::sync::Arc::new((parking_lot::Mutex::new(false), parking_lot::Condvar::new())); + + // Register in global registry for fork cleanup + vm.state.thread_handles.lock().push(( + std::sync::Arc::downgrade(&inner), + std::sync::Arc::downgrade(&done_event), + )); + + Self { inner, done_event } + } + + #[pygetset] + fn ident(&self) -> u64 { + self.inner.lock().ident + } + + #[pymethod] + fn is_done(&self) -> bool { + self.inner.lock().state == ThreadHandleState::Done + } + + #[pymethod] + fn _set_done(&self) { + self.inner.lock().state = ThreadHandleState::Done; + // Signal waiting threads that this thread is done + let (lock, cvar) = &*self.done_event; + *lock.lock() = true; + cvar.notify_all(); + } + + #[pymethod] + fn join( + &self, + timeout: OptionalArg>>, + vm: &VirtualMachine, + ) -> PyResult<()> { + // Convert timeout to Duration (None or negative = infinite wait) + let timeout_duration = match timeout.flatten() { + Some(Either::A(t)) if t >= 0.0 => Some(Duration::from_secs_f64(t)), + Some(Either::B(t)) if t >= 0 => Some(Duration::from_secs(t as u64)), + _ => None, + }; + + // Check for self-join first + { + let inner = self.inner.lock(); + let current_ident = get_ident(); + if inner.ident == current_ident && inner.state == ThreadHandleState::Running { + return Err(vm.new_runtime_error("cannot join current thread".to_owned())); + } + } + + // Wait for thread completion using Condvar (supports timeout) + // Loop to handle spurious wakeups + let (lock, cvar) = &*self.done_event; + let mut done = lock.lock(); + + while !*done { + if let Some(timeout) = timeout_duration { + let result = cvar.wait_for(&mut done, timeout); + if result.timed_out() && !*done { + // Timeout occurred and done is still false + return Ok(()); + } + } else { + // Infinite wait + cvar.wait(&mut done); + } + } + drop(done); + + // Thread is done, now perform cleanup + let join_handle = { + let mut inner = self.inner.lock(); + + // If already joined, return immediately (idempotent) + if inner.joined { + return Ok(()); + } + + // If another thread is already joining, wait for them to finish + if inner.joining { + drop(inner); + // Wait on done_event + let (lock, cvar) = &*self.done_event; + let mut done = lock.lock(); + while !*done { + cvar.wait(&mut done); + } + return Ok(()); + } + + // Mark that we're joining + inner.joining = true; + + // Take the join handle if available + inner.join_handle.take() + }; + + // Perform the actual join outside the lock + if let Some(handle) = join_handle { + // Ignore the result - panics in spawned threads are already handled + let _ = handle.join(); + } + + // Mark as joined and clear joining flag + { + let mut inner = self.inner.lock(); + inner.joined = true; + inner.joining = false; + } + + Ok(()) + } + + #[pyslot] + fn slot_new(cls: PyTypeRef, _args: FuncArgs, vm: &VirtualMachine) -> PyResult { + ThreadHandle::new(vm) + .into_ref_with_type(vm, cls) + .map(Into::into) + } + } + + #[derive(FromArgs)] + struct StartJoinableThreadArgs { + #[pyarg(positional)] + function: ArgCallable, + #[pyarg(any, optional)] + handle: OptionalArg>, + #[pyarg(any, default = true)] + daemon: bool, + } + + #[pyfunction] + fn start_joinable_thread( + args: StartJoinableThreadArgs, + vm: &VirtualMachine, + ) -> PyResult> { + let handle = match args.handle { + OptionalArg::Present(h) => h, + OptionalArg::Missing => ThreadHandle::new(vm).into_ref(&vm.ctx), + }; + + // Mark as starting + handle.inner.lock().state = ThreadHandleState::Starting; + + // Add non-daemon threads to shutdown registry so _shutdown() will wait for them + if !args.daemon { + add_to_shutdown_handles(vm, &handle.inner, &handle.done_event); + } + + let func = args.function; + let handle_clone = handle.clone(); + let inner_clone = handle.inner.clone(); + let done_event_clone = handle.done_event.clone(); + + let mut thread_builder = thread::Builder::new(); + let stacksize = vm.state.stacksize.load(); + if stacksize != 0 { + thread_builder = thread_builder.stack_size(stacksize); + } + + let join_handle = thread_builder + .spawn(vm.new_thread().make_spawn_func(move |vm| { + // Set ident and mark as running + { + let mut inner = inner_clone.lock(); + inner.ident = get_ident(); + inner.state = ThreadHandleState::Running; + } + + // Ensure cleanup happens even if the function panics + let inner_for_cleanup = inner_clone.clone(); + let done_event_for_cleanup = done_event_clone.clone(); + let vm_state = vm.state.clone(); + scopeguard::defer! { + // Mark as done + inner_for_cleanup.lock().state = ThreadHandleState::Done; + + // Signal waiting threads that this thread is done + { + let (lock, cvar) = &*done_event_for_cleanup; + *lock.lock() = true; + cvar.notify_all(); + } + + // Handle sentinels + for lock in SENTINELS.take() { + if lock.mu.is_locked() { + unsafe { lock.mu.unlock() }; + } + } + + // Clean up thread-local data while VM context is still active + cleanup_thread_local_data(); + + // Clean up frame tracking + crate::vm::thread::cleanup_current_thread_frames(vm); + + vm_state.thread_count.fetch_sub(1); + } + + // Run the function + match func.invoke((), vm) { + Ok(_) => {} + Err(e) if e.fast_isinstance(vm.ctx.exceptions.system_exit) => {} + Err(exc) => { + vm.run_unraisable( + exc, + Some("Exception ignored in thread started by".to_owned()), + func.into(), + ); + } + } + })) + .map_err(|err| vm.new_runtime_error(format!("can't start new thread: {err}")))?; + + vm.state.thread_count.fetch_add(1); + + // Store the join handle + handle.inner.lock().join_handle = Some(join_handle); + + Ok(handle_clone) + } } diff --git a/crates/vm/src/vm/interpreter.rs b/crates/vm/src/vm/interpreter.rs index 8d37ad6c840..6faef040a0e 100644 --- a/crates/vm/src/vm/interpreter.rs +++ b/crates/vm/src/vm/interpreter.rs @@ -110,9 +110,10 @@ impl Interpreter { /// Finalize vm and turns an exception to exit code. /// - /// Finalization steps including 4 steps: + /// Finalization steps including 5 steps: /// 1. Flush stdout and stderr. /// 1. Handle exit exception and turn it to exit code. + /// 1. Wait for non-daemon threads (threading._shutdown). /// 1. Run atexit exit functions. /// 1. Mark vm as finalized. /// @@ -128,6 +129,9 @@ impl Interpreter { 0 }; + // Wait for non-daemon threads (wait_for_thread_shutdown) + wait_for_thread_shutdown(vm); + atexit::_run_exitfuncs(vm); vm.state.finalizing.store(true, Ordering::Release); @@ -139,6 +143,35 @@ impl Interpreter { } } +/// Wait until threading._shutdown completes, provided +/// the threading module was imported in the first place. +/// The shutdown routine will wait until all non-daemon +/// "threading" threads have completed. +fn wait_for_thread_shutdown(vm: &VirtualMachine) { + // Try to get the threading module if it was already imported + // Use sys.modules.get("threading") like PyImport_GetModule + let threading = match (|| -> PyResult<_> { + let sys_modules = vm.sys_module.get_attr("modules", vm)?; + let threading = sys_modules.get_item("threading", vm)?; + Ok(threading) + })() { + Ok(module) => module, + Err(_) => { + // threading not imported, nothing to do + return; + } + }; + + // Call threading._shutdown() + if let Err(e) = vm.call_method(&threading, "_shutdown", ()) { + vm.run_unraisable( + e, + Some("Exception ignored on threading shutdown".to_owned()), + threading, + ); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/vm/src/vm/mod.rs b/crates/vm/src/vm/mod.rs index d2172a43a00..7708b5a9176 100644 --- a/crates/vm/src/vm/mod.rs +++ b/crates/vm/src/vm/mod.rs @@ -82,6 +82,10 @@ pub struct VirtualMachine { pub state: PyRc, pub initialized: bool, recursion_depth: Cell, + /// Async generator firstiter hook (per-thread, set via sys.set_asyncgen_hooks) + pub async_gen_firstiter: RefCell>, + /// Async generator finalizer hook (per-thread, set via sys.set_asyncgen_hooks) + pub async_gen_finalizer: RefCell>, } #[derive(Debug, Default)] @@ -107,6 +111,22 @@ pub struct PyGlobalState { pub after_forkers_parent: PyMutex>, pub int_max_str_digits: AtomicCell, pub switch_interval: AtomicCell, + /// Global trace function for all threads (set by sys._settraceallthreads) + pub global_trace_func: PyMutex>, + /// Global profile function for all threads (set by sys._setprofileallthreads) + pub global_profile_func: PyMutex>, + /// Main thread identifier (pthread_self on Unix) + #[cfg(feature = "threading")] + pub main_thread_ident: AtomicCell, + /// Registry of all threads' current frames for sys._current_frames() + #[cfg(feature = "threading")] + pub thread_frames: parking_lot::Mutex>, + /// Registry of all ThreadHandles for fork cleanup + #[cfg(feature = "threading")] + pub thread_handles: parking_lot::Mutex>, + /// Registry for non-daemon threads that need to be joined at shutdown + #[cfg(feature = "threading")] + pub shutdown_handles: parking_lot::Mutex>, } pub fn process_hash_secret_seed() -> u32 { @@ -191,9 +211,21 @@ impl VirtualMachine { after_forkers_parent: PyMutex::default(), int_max_str_digits, switch_interval: AtomicCell::new(0.005), + global_trace_func: PyMutex::default(), + global_profile_func: PyMutex::default(), + #[cfg(feature = "threading")] + main_thread_ident: AtomicCell::new(0), + #[cfg(feature = "threading")] + thread_frames: parking_lot::Mutex::new(HashMap::new()), + #[cfg(feature = "threading")] + thread_handles: parking_lot::Mutex::new(Vec::new()), + #[cfg(feature = "threading")] + shutdown_handles: parking_lot::Mutex::new(Vec::new()), }), initialized: false, recursion_depth: Cell::new(0), + async_gen_firstiter: RefCell::new(None), + async_gen_finalizer: RefCell::new(None), }; if vm.state.hash_secret.hash_str("") @@ -299,6 +331,10 @@ impl VirtualMachine { panic!("Double Initialize Error"); } + // Initialize main thread ident before any threading operations + #[cfg(feature = "threading")] + stdlib::thread::init_main_thread_ident(self); + stdlib::builtins::init_module(self, &self.builtins); stdlib::sys::init_module(self, &self.sys_module, &self.builtins); @@ -604,6 +640,9 @@ impl VirtualMachine { ) -> PyResult { self.with_recursion("", || { self.frames.borrow_mut().push(frame.clone()); + // Update the current frame slot for sys._current_frames() + #[cfg(feature = "threading")] + crate::vm::thread::update_current_frame(Some(frame.clone())); // Push a new exception context for frame isolation // Each frame starts with no active exception (None) // This prevents exceptions from leaking between function calls @@ -613,6 +652,9 @@ impl VirtualMachine { self.pop_exception(); // defer dec frame let _popped = self.frames.borrow_mut().pop(); + // Update the frame slot to the new top frame (or None if empty) + #[cfg(feature = "threading")] + crate::vm::thread::update_current_frame(self.frames.borrow().last().cloned()); result }) } diff --git a/crates/vm/src/vm/thread.rs b/crates/vm/src/vm/thread.rs index 7e8f0f87e56..7188aa6d270 100644 --- a/crates/vm/src/vm/thread.rs +++ b/crates/vm/src/vm/thread.rs @@ -1,17 +1,27 @@ -use crate::{AsObject, PyObject, PyObjectRef, VirtualMachine}; +#[cfg(feature = "threading")] +use crate::frame::FrameRef; +use crate::{AsObject, PyObject, VirtualMachine}; use core::{ cell::{Cell, RefCell}, ptr::NonNull, }; use itertools::Itertools; +#[cfg(feature = "threading")] +use std::sync::Arc; use std::thread_local; +/// Type for current frame slot - shared between threads for sys._current_frames() +#[cfg(feature = "threading")] +pub type CurrentFrameSlot = Arc>>; + thread_local! { pub(super) static VM_STACK: RefCell>> = Vec::with_capacity(1).into(); pub(crate) static COROUTINE_ORIGIN_TRACKING_DEPTH: Cell = const { Cell::new(0) }; - pub(crate) static ASYNC_GEN_FINALIZER: RefCell> = const { RefCell::new(None) }; - pub(crate) static ASYNC_GEN_FIRSTITER: RefCell> = const { RefCell::new(None) }; + + /// Current thread's frame slot for sys._current_frames() + #[cfg(feature = "threading")] + static CURRENT_FRAME_SLOT: RefCell> = const { RefCell::new(None) }; } scoped_tls::scoped_thread_local!(static VM_CURRENT: VirtualMachine); @@ -26,11 +36,74 @@ pub fn with_current_vm(f: impl FnOnce(&VirtualMachine) -> R) -> R { pub fn enter_vm(vm: &VirtualMachine, f: impl FnOnce() -> R) -> R { VM_STACK.with(|vms| { vms.borrow_mut().push(vm.into()); + + // Initialize frame slot for this thread if not already done + #[cfg(feature = "threading")] + init_frame_slot_if_needed(vm); + scopeguard::defer! { vms.borrow_mut().pop(); } VM_CURRENT.set(vm, f) }) } +/// Initialize frame slot for current thread if not already initialized. +/// Called automatically by enter_vm(). +#[cfg(feature = "threading")] +fn init_frame_slot_if_needed(vm: &VirtualMachine) { + CURRENT_FRAME_SLOT.with(|slot| { + if slot.borrow().is_none() { + let thread_id = crate::stdlib::thread::get_ident(); + let new_slot = Arc::new(parking_lot::Mutex::new(None)); + vm.state + .thread_frames + .lock() + .insert(thread_id, new_slot.clone()); + *slot.borrow_mut() = Some(new_slot); + } + }); +} + +/// Update the current thread's frame. Called when frames are pushed/popped. +/// This is a hot path - uses only thread-local storage, no locks. +#[cfg(feature = "threading")] +pub fn update_current_frame(frame: Option) { + CURRENT_FRAME_SLOT.with(|slot| { + if let Some(s) = slot.borrow().as_ref() { + *s.lock() = frame; + } + }); +} + +/// Cleanup frame tracking for the current thread. Called at thread exit. +#[cfg(feature = "threading")] +pub fn cleanup_current_thread_frames(vm: &VirtualMachine) { + let thread_id = crate::stdlib::thread::get_ident(); + vm.state.thread_frames.lock().remove(&thread_id); + CURRENT_FRAME_SLOT.with(|s| { + *s.borrow_mut() = None; + }); +} + +/// Reinitialize frame slot after fork. Called in child process. +/// Creates a fresh slot and registers it for the current thread. +#[cfg(feature = "threading")] +pub fn reinit_frame_slot_after_fork(vm: &VirtualMachine) { + let current_ident = crate::stdlib::thread::get_ident(); + let new_slot = Arc::new(parking_lot::Mutex::new(None)); + + // Try to update the global registry. If we can't get the lock + // (parent thread might have been holding it during fork), skip. + if let Some(mut registry) = vm.state.thread_frames.try_lock() { + registry.clear(); + registry.insert(current_ident, new_slot.clone()); + } + + // Always update thread-local to point to the new slot + CURRENT_FRAME_SLOT.with(|s| { + *s.borrow_mut() = Some(new_slot); + }); +} + pub fn with_vm(obj: &PyObject, f: F) -> Option where F: Fn(&VirtualMachine) -> R, @@ -139,6 +212,10 @@ impl VirtualMachine { /// specific guaranteed behavior. #[cfg(feature = "threading")] pub fn new_thread(&self) -> ThreadedVirtualMachine { + let global_trace = self.state.global_trace_func.lock().clone(); + let global_profile = self.state.global_profile_func.lock().clone(); + let use_tracing = global_trace.is_some() || global_profile.is_some(); + let vm = Self { builtins: self.builtins.clone(), sys_module: self.sys_module.clone(), @@ -147,9 +224,9 @@ impl VirtualMachine { wasm_id: self.wasm_id.clone(), exceptions: RefCell::default(), import_func: self.import_func.clone(), - profile_func: RefCell::new(self.ctx.none()), - trace_func: RefCell::new(self.ctx.none()), - use_tracing: Cell::new(false), + profile_func: RefCell::new(global_profile.unwrap_or_else(|| self.ctx.none())), + trace_func: RefCell::new(global_trace.unwrap_or_else(|| self.ctx.none())), + use_tracing: Cell::new(use_tracing), recursion_limit: self.recursion_limit.clone(), signal_handlers: None, signal_rx: None, @@ -157,6 +234,8 @@ impl VirtualMachine { state: self.state.clone(), initialized: self.initialized, recursion_depth: Cell::new(0), + async_gen_firstiter: RefCell::new(None), + async_gen_finalizer: RefCell::new(None), }; ThreadedVirtualMachine { vm } }