From ad20942d18fea290a65f1c2f5b1a1b89199691dc Mon Sep 17 00:00:00 2001 From: "Jeong, YunWon" Date: Wed, 4 Mar 2026 02:06:06 +0900 Subject: [PATCH 1/3] Reinit IO buffer locks after fork to prevent deadlocks BufferedReader/Writer/TextIOWrapper use PyThreadMutex internally. If a parent thread held one of these locks during fork(), the child would deadlock on any IO operation. Add reinit_after_fork() to RawThreadMutex and call it on sys.stdin/ stdout/stderr in the child process fork handler, analogous to CPython's _PyIO_Reinit(). --- crates/common/src/lock.rs | 15 +++++++++ crates/common/src/lock/thread_mutex.rs | 22 +++++++++++++ crates/vm/src/stdlib/io.rs | 45 ++++++++++++++++++++++++++ crates/vm/src/stdlib/posix.rs | 6 ++++ 4 files changed, 88 insertions(+) diff --git a/crates/common/src/lock.rs b/crates/common/src/lock.rs index 74679bd9d8f..f1d00627e90 100644 --- a/crates/common/src/lock.rs +++ b/crates/common/src/lock.rs @@ -96,3 +96,18 @@ pub unsafe fn reinit_rwlock_after_fork(rwlock: &PyRwLock) { core::ptr::write_bytes(raw, 0, core::mem::size_of::()); } } + +/// Reset a `PyThreadMutex` to its initial (unlocked, unowned) state after `fork()`. +/// +/// `PyThreadMutex` is used by buffered IO objects (`BufferedReader`, +/// `BufferedWriter`, `TextIOWrapper`). If a dead parent thread held one of +/// these locks during `fork()`, the child would deadlock on any IO operation. +/// +/// # Safety +/// +/// Must only be called from the single-threaded child process immediately +/// after `fork()`, before any other thread is created. +#[cfg(unix)] +pub unsafe fn reinit_thread_mutex_after_fork(mutex: &PyThreadMutex) { + unsafe { mutex.raw().reinit_after_fork() } +} diff --git a/crates/common/src/lock/thread_mutex.rs b/crates/common/src/lock/thread_mutex.rs index 2cabf7ea4cd..5b5b89f4eb1 100644 --- a/crates/common/src/lock/thread_mutex.rs +++ b/crates/common/src/lock/thread_mutex.rs @@ -72,6 +72,23 @@ impl RawThreadMutex { } } +impl RawThreadMutex { + /// Reset this mutex to its initial (unlocked, unowned) state after `fork()`. + /// + /// # Safety + /// + /// Must only be called from the single-threaded child process immediately + /// after `fork()`, before any other thread is created. + #[cfg(unix)] + pub unsafe fn reinit_after_fork(&self) { + self.owner.store(0, Ordering::Relaxed); + unsafe { + let mutex_ptr = &self.mutex as *const R as *mut u8; + core::ptr::write_bytes(mutex_ptr, 0, core::mem::size_of::()); + } + } +} + unsafe impl Send for RawThreadMutex {} unsafe impl Sync for RawThreadMutex {} @@ -103,6 +120,11 @@ impl From for ThreadMutex { } } impl ThreadMutex { + /// Access the underlying raw thread mutex. + pub fn raw(&self) -> &RawThreadMutex { + &self.raw + } + pub fn lock(&self) -> Option> { if self.raw.lock() { Some(ThreadMutexGuard { diff --git a/crates/vm/src/stdlib/io.rs b/crates/vm/src/stdlib/io.rs index f75a5dd4014..70a4523181c 100644 --- a/crates/vm/src/stdlib/io.rs +++ b/crates/vm/src/stdlib/io.rs @@ -2,6 +2,8 @@ * I/O core tools. */ pub(crate) use _io::module_def; +#[cfg(all(unix, feature = "threading"))] +pub(crate) use _io::reinit_std_streams_after_fork; cfg_if::cfg_if! { if #[cfg(any(not(target_arch = "wasm32"), target_os = "wasi"))] { @@ -4985,6 +4987,49 @@ mod _io { } } + /// Reinit per-object IO buffer locks on std streams after `fork()`. + #[cfg(all(unix, feature = "threading"))] + pub fn reinit_std_streams_after_fork(vm: &VirtualMachine) { + for name in ["stdin", "stdout", "stderr"] { + let Ok(stream) = vm.sys_module.get_attr(name, vm) else { + continue; + }; + reinit_io_locks(&stream); + } + } + + #[cfg(all(unix, feature = "threading"))] + fn reinit_io_locks(obj: &PyObject) { + use crate::common::lock::reinit_thread_mutex_after_fork; + + if let Some(tio) = obj.downcast_ref::() { + unsafe { reinit_thread_mutex_after_fork(&tio.data) }; + if let Some(guard) = tio.data.lock() { + if let Some(ref data) = *guard { + reinit_io_locks(&data.buffer); + } + } + return; + } + if let Some(br) = obj.downcast_ref::() { + unsafe { reinit_thread_mutex_after_fork(&br.data) }; + return; + } + if let Some(bw) = obj.downcast_ref::() { + unsafe { reinit_thread_mutex_after_fork(&bw.data) }; + return; + } + if let Some(brw) = obj.downcast_ref::() { + unsafe { reinit_thread_mutex_after_fork(&brw.data) }; + return; + } + if let Some(brw) = obj.downcast_ref::() { + unsafe { reinit_thread_mutex_after_fork(&brw.read.data) }; + unsafe { reinit_thread_mutex_after_fork(&brw.write.data) }; + return; + } + } + pub fn io_open( file: PyObjectRef, mode: Option<&str>, diff --git a/crates/vm/src/stdlib/posix.rs b/crates/vm/src/stdlib/posix.rs index febb3af9b0d..c1b6b51ccee 100644 --- a/crates/vm/src/stdlib/posix.rs +++ b/crates/vm/src/stdlib/posix.rs @@ -719,6 +719,12 @@ pub mod module { #[cfg(feature = "threading")] reinit_locks_after_fork(vm); + // Reinit per-object IO buffer locks on std streams. + // BufferedReader/Writer/TextIOWrapper use PyThreadMutex which can be + // held by dead parent threads, causing deadlocks on any IO in the child. + #[cfg(feature = "threading")] + crate::stdlib::io::reinit_std_streams_after_fork(vm); + // Phase 2: Reset low-level atomic state (no locks needed). crate::signal::clear_after_fork(); crate::stdlib::signal::_signal::clear_wakeup_fd_after_fork(); From 874971c3853baf5c0ebe7f86eff1ba41cb42a0b5 Mon Sep 17 00:00:00 2001 From: "Jeong, YunWon" Date: Wed, 4 Mar 2026 17:41:42 +0900 Subject: [PATCH 2/3] Address review: unsafe fn + decoder lock reinit - Mark reinit_std_streams_after_fork as unsafe fn to encode fork-only precondition, update call site in posix.rs - Reinit IncrementalNewlineDecoder's PyThreadMutex via TextIOWrapper's decoder field to prevent child deadlocks --- crates/vm/src/stdlib/io.rs | 14 +++++++++++++- crates/vm/src/stdlib/posix.rs | 2 +- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/crates/vm/src/stdlib/io.rs b/crates/vm/src/stdlib/io.rs index 70a4523181c..945042bc9e4 100644 --- a/crates/vm/src/stdlib/io.rs +++ b/crates/vm/src/stdlib/io.rs @@ -4988,8 +4988,13 @@ mod _io { } /// Reinit per-object IO buffer locks on std streams after `fork()`. + /// + /// # Safety + /// + /// Must only be called from the single-threaded child process immediately + /// after `fork()`, before any other thread is created. #[cfg(all(unix, feature = "threading"))] - pub fn reinit_std_streams_after_fork(vm: &VirtualMachine) { + pub unsafe fn reinit_std_streams_after_fork(vm: &VirtualMachine) { for name in ["stdin", "stdout", "stderr"] { let Ok(stream) = vm.sys_module.get_attr(name, vm) else { continue; @@ -5006,11 +5011,18 @@ mod _io { unsafe { reinit_thread_mutex_after_fork(&tio.data) }; if let Some(guard) = tio.data.lock() { if let Some(ref data) = *guard { + if let Some(ref decoder) = data.decoder { + reinit_io_locks(decoder); + } reinit_io_locks(&data.buffer); } } return; } + if let Some(nl) = obj.downcast_ref::() { + unsafe { reinit_thread_mutex_after_fork(&nl.data) }; + return; + } if let Some(br) = obj.downcast_ref::() { unsafe { reinit_thread_mutex_after_fork(&br.data) }; return; diff --git a/crates/vm/src/stdlib/posix.rs b/crates/vm/src/stdlib/posix.rs index c1b6b51ccee..09d3cf032f8 100644 --- a/crates/vm/src/stdlib/posix.rs +++ b/crates/vm/src/stdlib/posix.rs @@ -723,7 +723,7 @@ pub mod module { // BufferedReader/Writer/TextIOWrapper use PyThreadMutex which can be // held by dead parent threads, causing deadlocks on any IO in the child. #[cfg(feature = "threading")] - crate::stdlib::io::reinit_std_streams_after_fork(vm); + unsafe { crate::stdlib::io::reinit_std_streams_after_fork(vm) }; // Phase 2: Reset low-level atomic state (no locks needed). crate::signal::clear_after_fork(); From 481b31de9978e6c3adb0b4fb1e935d6e0309085a Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 4 Mar 2026 08:51:15 +0000 Subject: [PATCH 3/3] Auto-format: cargo fmt --all --- crates/vm/src/stdlib/posix.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/vm/src/stdlib/posix.rs b/crates/vm/src/stdlib/posix.rs index 09d3cf032f8..1adfb8cfe5a 100644 --- a/crates/vm/src/stdlib/posix.rs +++ b/crates/vm/src/stdlib/posix.rs @@ -723,7 +723,9 @@ pub mod module { // BufferedReader/Writer/TextIOWrapper use PyThreadMutex which can be // held by dead parent threads, causing deadlocks on any IO in the child. #[cfg(feature = "threading")] - unsafe { crate::stdlib::io::reinit_std_streams_after_fork(vm) }; + unsafe { + crate::stdlib::io::reinit_std_streams_after_fork(vm) + }; // Phase 2: Reset low-level atomic state (no locks needed). crate::signal::clear_after_fork();