From 5f349f9d706b9f3ec6dbf63a6df25a150b3c1ba2 Mon Sep 17 00:00:00 2001 From: "Jeong, YunWon" Date: Wed, 4 Feb 2026 09:23:05 +0900 Subject: [PATCH] Align winapi with CPython behavior --- crates/vm/src/signal.rs | 16 +++ crates/vm/src/stdlib/signal.rs | 8 ++ crates/vm/src/stdlib/winapi.rs | 243 ++++++++++++++++++++++++--------- 3 files changed, 202 insertions(+), 65 deletions(-) diff --git a/crates/vm/src/signal.rs b/crates/vm/src/signal.rs index d0e2997cb72..4846906d611 100644 --- a/crates/vm/src/signal.rs +++ b/crates/vm/src/signal.rs @@ -1,6 +1,8 @@ #![cfg_attr(target_os = "wasi", allow(dead_code))] use crate::{PyResult, VirtualMachine}; use alloc::fmt; +#[cfg(windows)] +use core::sync::atomic::AtomicIsize; use core::sync::atomic::{AtomicBool, Ordering}; use std::cell::Cell; use std::sync::mpsc; @@ -12,6 +14,9 @@ static ANY_TRIGGERED: AtomicBool = AtomicBool::new(false); const ATOMIC_FALSE: AtomicBool = AtomicBool::new(false); pub(crate) static TRIGGERS: [AtomicBool; NSIG] = [ATOMIC_FALSE; NSIG]; +#[cfg(windows)] +static SIGINT_EVENT: AtomicIsize = AtomicIsize::new(0); + thread_local! { /// Prevent recursive signal handler invocation. When a Python signal /// handler is running, new signals are deferred until it completes. @@ -150,3 +155,14 @@ pub fn user_signal_channel() -> (UserSignalSender, UserSignalReceiver) { let (tx, rx) = mpsc::channel(); (UserSignalSender { tx }, UserSignalReceiver { rx }) } + +#[cfg(windows)] +pub fn set_sigint_event(handle: isize) { + SIGINT_EVENT.store(handle, Ordering::Release); +} + +#[cfg(windows)] +pub fn get_sigint_event() -> Option { + let handle = SIGINT_EVENT.load(Ordering::Acquire); + if handle == 0 { None } else { Some(handle) } +} diff --git a/crates/vm/src/stdlib/signal.rs b/crates/vm/src/stdlib/signal.rs index 8b747e04786..33dfc038ef3 100644 --- a/crates/vm/src/stdlib/signal.rs +++ b/crates/vm/src/stdlib/signal.rs @@ -682,6 +682,14 @@ pub(crate) mod _signal { pub extern "C" fn run_signal(signum: i32) { signal::TRIGGERS[signum as usize].store(true, Ordering::Relaxed); signal::set_triggered(); + #[cfg(windows)] + if signum == libc::SIGINT + && let Some(handle) = signal::get_sigint_event() + { + unsafe { + windows_sys::Win32::System::Threading::SetEvent(handle as _); + } + } let wakeup_fd = WAKEUP.load(Ordering::Relaxed); if wakeup_fd != INVALID_WAKEUP { let sigbyte = signum as u8; diff --git a/crates/vm/src/stdlib/winapi.rs b/crates/vm/src/stdlib/winapi.rs index c58a55476a7..16766053058 100644 --- a/crates/vm/src/stdlib/winapi.rs +++ b/crates/vm/src/stdlib/winapi.rs @@ -835,6 +835,7 @@ mod _winapi { pending: bool, completed: bool, read_buffer: Option>, + write_buffer: Option>, } impl std::fmt::Debug for OverlappedInner { @@ -867,23 +868,21 @@ mod _winapi { pending: false, completed: false, read_buffer: None, + write_buffer: None, }), } } #[pymethod] - fn GetOverlappedResult(&self, wait: bool, vm: &VirtualMachine) -> PyResult { - use windows_sys::Win32::Foundation::{ERROR_IO_PENDING, GetLastError}; + fn GetOverlappedResult(&self, wait: bool, vm: &VirtualMachine) -> PyResult<(u32, u32)> { + use windows_sys::Win32::Foundation::{ + ERROR_IO_INCOMPLETE, ERROR_MORE_DATA, ERROR_OPERATION_ABORTED, ERROR_SUCCESS, + GetLastError, + }; use windows_sys::Win32::System::IO::GetOverlappedResult; let mut inner = self.inner.lock(); - // If operation was already completed synchronously (e.g., ERROR_PIPE_CONNECTED), - // return immediately without calling GetOverlappedResult - if inner.completed && !inner.pending { - return Ok(0); - } - let mut transferred: u32 = 0; let ret = unsafe { @@ -895,24 +894,42 @@ mod _winapi { ) }; - if ret == 0 { - let err = unsafe { GetLastError() }; - if err == ERROR_IO_PENDING { + let err = if ret == 0 { + unsafe { GetLastError() } + } else { + ERROR_SUCCESS + }; + + match err { + ERROR_SUCCESS | ERROR_MORE_DATA | ERROR_OPERATION_ABORTED => { + inner.completed = true; + inner.pending = false; + } + ERROR_IO_INCOMPLETE => {} + _ => { + inner.pending = false; return Err(std::io::Error::from_raw_os_error(err as i32).to_pyexception(vm)); } - return Err(std::io::Error::from_raw_os_error(err as i32).to_pyexception(vm)); } - inner.completed = true; - inner.pending = false; - Ok(transferred) + if inner.completed + && let Some(read_buffer) = &mut inner.read_buffer + && transferred != read_buffer.len() as u32 + { + read_buffer.truncate(transferred as usize); + } + + Ok((transferred, err)) } #[pymethod] fn getbuffer(&self, vm: &VirtualMachine) -> PyResult> { let inner = self.inner.lock(); if !inner.completed { - return Err(vm.new_value_error("operation not completed".to_owned())); + return Err(vm.new_value_error( + "can't get read buffer before GetOverlappedResult() signals the operation completed" + .to_owned(), + )); } Ok(inner .read_buffer @@ -924,19 +941,19 @@ mod _winapi { fn cancel(&self, vm: &VirtualMachine) -> PyResult<()> { use windows_sys::Win32::System::IO::CancelIoEx; - let inner = self.inner.lock(); - if !inner.pending { - return Ok(()); - } - - let ret = unsafe { CancelIoEx(inner.handle, &inner.overlapped) }; + let mut inner = self.inner.lock(); + let ret = if inner.pending { + unsafe { CancelIoEx(inner.handle, &inner.overlapped) } + } else { + 1 + }; if ret == 0 { let err = unsafe { windows_sys::Win32::Foundation::GetLastError() }; - // ERROR_NOT_FOUND means operation already completed if err != windows_sys::Win32::Foundation::ERROR_NOT_FOUND { return Err(std::io::Error::from_raw_os_error(err as i32).to_pyexception(vm)); } } + inner.pending = false; Ok(()) } @@ -990,7 +1007,7 @@ mod _winapi { // Overlapped (async) mode let ov = Overlapped::new_with_handle(handle.0); - let ret = { + let _ret = { let mut inner = ov.inner.lock(); unsafe { windows_sys::Win32::System::Pipes::ConnectNamedPipe( @@ -1000,28 +1017,21 @@ mod _winapi { } }; - if ret != 0 { - // Connected immediately - let mut inner = ov.inner.lock(); - inner.completed = true; - } else { - let err = unsafe { GetLastError() }; - match err { - ERROR_IO_PENDING => { - let mut inner = ov.inner.lock(); - inner.pending = true; - } - ERROR_PIPE_CONNECTED => { - // Client was already connected - let mut inner = ov.inner.lock(); - inner.completed = true; - } - _ => { - return Err( - std::io::Error::from_raw_os_error(err as i32).to_pyexception(vm) - ); + let err = unsafe { GetLastError() }; + match err { + ERROR_IO_PENDING => { + let mut inner = ov.inner.lock(); + inner.pending = true; + } + ERROR_PIPE_CONNECTED => { + let inner = ov.inner.lock(); + unsafe { + windows_sys::Win32::System::Threading::SetEvent(inner.overlapped.hEvent); } } + _ => { + return Err(std::io::Error::from_raw_os_error(err as i32).to_pyexception(vm)); + } } Ok(ov.into_pyobject(vm)) @@ -1180,7 +1190,7 @@ mod _winapi { initial_state: bool, name: Option, vm: &VirtualMachine, - ) -> PyResult { + ) -> PyResult> { use windows_sys::Win32::System::Threading::CreateEventW as WinCreateEventW; let _ = security_attributes; // Ignored, always NULL @@ -1197,11 +1207,15 @@ mod _winapi { ) }; - if handle.is_null() { + if handle == INVALID_HANDLE_VALUE { return Err(vm.new_last_os_error()); } - Ok(WinHandle(handle)) + if handle.is_null() { + return Ok(None); + } + + Ok(Some(WinHandle(handle))) } /// SetEvent - Set the specified event object to the signaled state. @@ -1225,21 +1239,54 @@ mod _winapi { buffer: crate::function::ArgBytesLike, use_overlapped: OptionalArg, vm: &VirtualMachine, - ) -> PyResult<(u32, u32)> { + ) -> PyResult { use windows_sys::Win32::Storage::FileSystem::WriteFile as WinWriteFile; let use_overlapped = use_overlapped.unwrap_or(false); + let buf = buffer.borrow_buf(); + let len = core::cmp::min(buf.len(), u32::MAX as usize) as u32; if use_overlapped { - return Err(vm.new_not_implemented_error( - "overlapped WriteFile is not yet implemented in _winapi".to_string(), - )); + use windows_sys::Win32::Foundation::ERROR_IO_PENDING; + + let ov = Overlapped::new_with_handle(handle.0); + let err = { + let mut inner = ov.inner.lock(); + inner.write_buffer = Some(buf.to_vec()); + let write_buf = inner.write_buffer.as_ref().unwrap(); + let mut written: u32 = 0; + let ret = unsafe { + WinWriteFile( + handle.0, + write_buf.as_ptr() as *const _, + len, + &mut written, + &mut inner.overlapped, + ) + }; + + let err = if ret == 0 { + unsafe { windows_sys::Win32::Foundation::GetLastError() } + } else { + 0 + }; + + if ret == 0 && err != ERROR_IO_PENDING { + return Err(vm.new_last_os_error()); + } + if ret == 0 && err == ERROR_IO_PENDING { + inner.pending = true; + } + + err + }; + let result = vm + .ctx + .new_tuple(vec![ov.into_pyobject(vm), vm.ctx.new_int(err).into()]); + return Ok(result.into()); } - let buf = buffer.borrow_buf(); - let len = core::cmp::min(buf.len(), u32::MAX as usize) as u32; let mut written: u32 = 0; - let ret = unsafe { WinWriteFile( handle.0, @@ -1249,18 +1296,21 @@ mod _winapi { null_mut(), ) }; - let err = if ret == 0 { unsafe { windows_sys::Win32::Foundation::GetLastError() } } else { 0 }; - if ret == 0 { return Err(vm.new_last_os_error()); } - - Ok((written, err)) + Ok(vm + .ctx + .new_tuple(vec![ + vm.ctx.new_int(written).into(), + vm.ctx.new_int(err).into(), + ]) + .into()) } const MAXIMUM_WAIT_OBJECTS: usize = 64; @@ -1316,8 +1366,28 @@ mod _winapi { i = end; } + #[cfg(feature = "threading")] + let sigint_event = { + let is_main = crate::stdlib::thread::get_ident() == vm.state.main_thread_ident.load(); + if is_main { + let handle = crate::signal::get_sigint_event().unwrap_or_else(|| { + let handle = unsafe { WinCreateEventW(null(), 1, 0, null()) }; + if !handle.is_null() { + crate::signal::set_sigint_event(handle as isize); + } + handle as isize + }); + if handle == 0 { None } else { Some(handle) } + } else { + None + } + }; + #[cfg(not(feature = "threading"))] + let sigint_event: Option = None; + if wait_all { // For wait_all, we wait sequentially for each batch + let mut err: Option = None; let deadline = if milliseconds != WIN_INFINITE { Some(unsafe { GetTickCount64() } + milliseconds as u64) } else { @@ -1328,9 +1398,8 @@ mod _winapi { let timeout = if let Some(deadline) = deadline { let now = unsafe { GetTickCount64() }; if now >= deadline { - return Err( - vm.new_exception_empty(vm.ctx.exceptions.timeout_error.to_owned()) - ); + err = Some(windows_sys::Win32::Foundation::WAIT_TIMEOUT); + break; } (deadline - now) as u32 } else { @@ -1348,11 +1417,42 @@ mod _winapi { }; if result == WAIT_FAILED { - return Err(vm.new_last_os_error()); + err = Some(unsafe { windows_sys::Win32::Foundation::GetLastError() }); + break; } if result == windows_sys::Win32::Foundation::WAIT_TIMEOUT { + err = Some(windows_sys::Win32::Foundation::WAIT_TIMEOUT); + break; + } + + if let Some(sigint_event) = sigint_event { + let sig_result = unsafe { + windows_sys::Win32::System::Threading::WaitForSingleObject( + sigint_event as _, + 0, + ) + }; + if sig_result == WAIT_OBJECT_0 { + err = Some(windows_sys::Win32::Foundation::ERROR_CONTROL_C_EXIT); + break; + } + if sig_result == WAIT_FAILED { + err = Some(unsafe { windows_sys::Win32::Foundation::GetLastError() }); + break; + } + } + } + + if let Some(err) = err { + if err == windows_sys::Win32::Foundation::WAIT_TIMEOUT { return Err(vm.new_exception_empty(vm.ctx.exceptions.timeout_error.to_owned())); } + if err == windows_sys::Win32::Foundation::ERROR_CONTROL_C_EXIT { + return Err(vm + .new_errno_error(libc::EINTR, "Interrupted system call") + .upcast()); + } + return Err(vm.new_os_error(err as i32)); } Ok(vm.ctx.none()) @@ -1453,7 +1553,10 @@ mod _winapi { } // Wait for any thread to complete - let thread_handles_raw: Vec<_> = thread_handles.iter().map(|&h| h as _).collect(); + let mut thread_handles_raw: Vec<_> = thread_handles.iter().map(|&h| h as _).collect(); + if let Some(sigint_event) = sigint_event { + thread_handles_raw.push(sigint_event as _); + } let result = unsafe { WaitForMultipleObjects( thread_handles_raw.len() as u32, @@ -1467,6 +1570,10 @@ mod _winapi { Some(unsafe { windows_sys::Win32::Foundation::GetLastError() }) } else if result == windows_sys::Win32::Foundation::WAIT_TIMEOUT { Some(windows_sys::Win32::Foundation::WAIT_TIMEOUT) + } else if sigint_event.is_some() + && result == WAIT_OBJECT_0 + thread_handles_raw.len() as u32 + { + Some(windows_sys::Win32::Foundation::ERROR_CONTROL_C_EXIT) } else { None }; @@ -1475,10 +1582,11 @@ mod _winapi { unsafe { WinSetEvent(cancel_event) }; // Wait for all threads to finish + let thread_handles_only: Vec<_> = thread_handles.iter().map(|&h| h as _).collect(); unsafe { WaitForMultipleObjects( - thread_handles_raw.len() as u32, - thread_handles_raw.as_ptr(), + thread_handles_only.len() as u32, + thread_handles_only.as_ptr(), 1, // wait_all WIN_INFINITE, ) @@ -1508,6 +1616,11 @@ mod _winapi { if e == windows_sys::Win32::Foundation::WAIT_TIMEOUT { return Err(vm.new_exception_empty(vm.ctx.exceptions.timeout_error.to_owned())); } + if e == windows_sys::Win32::Foundation::ERROR_CONTROL_C_EXIT { + return Err(vm + .new_errno_error(libc::EINTR, "Interrupted system call") + .upcast()); + } return Err(vm.new_os_error(e as i32)); }