X Tutup
Skip to content
Merged
1 change: 0 additions & 1 deletion Lib/test/test_asyncio/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2876,7 +2876,6 @@ def test_get_event_loop_after_set_none(self):
policy.set_event_loop(None)
self.assertRaises(RuntimeError, policy.get_event_loop)

@unittest.expectedFailure # TODO: RUSTPYTHON; - mock.patch doesn't work correctly with threading.current_thread
@mock.patch('asyncio.events.threading.current_thread')
def test_get_event_loop_thread(self, m_current_thread):

Expand Down
2 changes: 0 additions & 2 deletions Lib/test/test_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -1162,8 +1162,6 @@ def import_threading():
self.assertEqual(out, b'')
self.assertEqual(err, b'')

# TODO: RUSTPYTHON - __del__ not called during interpreter finalization (no cyclic GC)
@unittest.expectedFailure
def test_start_new_thread_at_finalization(self):
code = """if 1:
import _thread
Expand Down
29 changes: 29 additions & 0 deletions crates/common/src/lock/thread_mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ impl<R: RawMutex, G: GetThreadId> RawThreadMutex<R, G> {
.is_some()
}

/// Like `lock()` but wraps the blocking wait in `wrap_fn`.
/// The caller can use this to detach thread state while waiting.
pub fn lock_wrapped<F: FnOnce(&dyn Fn())>(&self, wrap_fn: F) -> bool {
let id = self.get_thread_id.nonzero_thread_id().get();
if self.owner.load(Ordering::Relaxed) == id {
return false;
}
wrap_fn(&|| self.mutex.lock());
self.owner.store(id, Ordering::Relaxed);
true
}

/// Returns `Some(true)` if able to successfully lock without blocking, `Some(false)`
/// otherwise, and `None` when the mutex is already locked on the current thread.
pub fn try_lock(&self) -> Option<bool> {
Expand Down Expand Up @@ -135,6 +147,23 @@ impl<R: RawMutex, G: GetThreadId, T: ?Sized> ThreadMutex<R, G, T> {
None
}
}

/// Like `lock()` but wraps the blocking wait in `wrap_fn`.
/// The caller can use this to detach thread state while waiting.
pub fn lock_wrapped<F: FnOnce(&dyn Fn())>(
&self,
wrap_fn: F,
) -> Option<ThreadMutexGuard<'_, R, G, T>> {
if self.raw.lock_wrapped(wrap_fn) {
Some(ThreadMutexGuard {
mu: self,
marker: PhantomData,
})
} else {
None
}
}

pub fn try_lock(&self) -> Result<ThreadMutexGuard<'_, R, G, T>, TryLockThreadError> {
match self.raw.try_lock() {
Some(true) => Ok(ThreadMutexGuard {
Expand Down
17 changes: 9 additions & 8 deletions crates/stdlib/src/multiprocessing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,15 +484,15 @@ mod _multiprocessing {
tv_sec: (delay / 1_000_000) as _,
tv_usec: (delay % 1_000_000) as _,
};
unsafe {
vm.allow_threads(|| unsafe {
libc::select(
0,
core::ptr::null_mut(),
core::ptr::null_mut(),
core::ptr::null_mut(),
&mut tv_delay,
)
};
});

// check for signals - preserve the exception (e.g., KeyboardInterrupt)
if let Err(exc) = vm.check_signals() {
Expand Down Expand Up @@ -710,13 +710,13 @@ mod _multiprocessing {
#[cfg(not(target_vendor = "apple"))]
{
loop {
let sem_ptr = self.handle.as_ptr();
// Py_BEGIN_ALLOW_THREADS / Py_END_ALLOW_THREADS
// RustPython doesn't have GIL, so we just do the wait
if let Some(ref dl) = deadline {
res = unsafe { libc::sem_timedwait(self.handle.as_ptr(), dl) };
res = if let Some(ref dl) = deadline {
vm.allow_threads(|| unsafe { libc::sem_timedwait(sem_ptr, dl) })
} else {
res = unsafe { libc::sem_wait(self.handle.as_ptr()) };
}
vm.allow_threads(|| unsafe { libc::sem_wait(sem_ptr) })
};

if res >= 0 {
break;
Expand Down Expand Up @@ -750,7 +750,8 @@ mod _multiprocessing {
} else {
// No timeout: use sem_wait (available on macOS)
loop {
res = unsafe { libc::sem_wait(self.handle.as_ptr()) };
let sem_ptr = self.handle.as_ptr();
res = vm.allow_threads(|| unsafe { libc::sem_wait(sem_ptr) });
if res >= 0 {
break;
}
Expand Down
20 changes: 11 additions & 9 deletions crates/stdlib/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,7 @@ mod decl {

loop {
let mut tv = timeout.map(sec_to_timeval);
let res = vm.allow_threads(|| {
super::select(nfds, &mut r, &mut w, &mut x, tv.as_mut())
});
let res = vm.allow_threads(|| super::select(nfds, &mut r, &mut w, &mut x, tv.as_mut()));

match res {
Ok(_) => break,
Expand Down Expand Up @@ -504,7 +502,9 @@ mod decl {
let deadline = timeout.map(|d| Instant::now() + d);
let mut poll_timeout = timeout_ms;
loop {
let res = unsafe { libc::poll(fds.as_mut_ptr(), fds.len() as _, poll_timeout) };
let res = vm.allow_threads(|| unsafe {
libc::poll(fds.as_mut_ptr(), fds.len() as _, poll_timeout)
});
match nix::Error::result(res) {
Ok(_) => break,
Err(nix::Error::EINTR) => vm.check_signals()?,
Expand Down Expand Up @@ -697,11 +697,13 @@ mod decl {

loop {
events.clear();
match epoll::wait(
epoll,
rustix::buffer::spare_capacity(&mut events),
poll_timeout.as_ref(),
) {
match vm.allow_threads(|| {
epoll::wait(
epoll,
rustix::buffer::spare_capacity(&mut events),
poll_timeout.as_ref(),
)
}) {
Ok(_) => break,
Err(rustix::io::Errno::INTR) => vm.check_signals()?,
Err(e) => return Err(e.into_pyexception(vm)),
Expand Down
4 changes: 3 additions & 1 deletion crates/vm/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1199,7 +1199,9 @@ impl ExecutingFrame<'_> {
}
}

if let Err(exception) = vm.check_signals() {
if vm.eval_breaker_tripped()
&& let Err(exception) = vm.check_signals()
{
#[cold]
fn handle_signal_exception(
frame: &mut ExecutingFrame<'_>,
Expand Down
5 changes: 5 additions & 0 deletions crates/vm/src/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ pub(crate) fn set_triggered() {
ANY_TRIGGERED.store(true, Ordering::Release);
}

#[inline(always)]
pub(crate) fn is_triggered() -> bool {
ANY_TRIGGERED.load(Ordering::Relaxed)
}

/// Reset all signal trigger state after fork in child process.
/// Stale triggers from the parent must not fire in the child.
#[cfg(unix)]
Expand Down
24 changes: 13 additions & 11 deletions crates/vm/src/stdlib/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1580,7 +1580,7 @@ mod _io {

fn lock(&self, vm: &VirtualMachine) -> PyResult<PyThreadMutexGuard<'_, BufferedData>> {
self.data()
.lock()
.lock_wrapped(|do_lock| vm.allow_threads(do_lock))
.ok_or_else(|| vm.new_runtime_error("reentrant call inside buffered io"))
}

Expand Down Expand Up @@ -2812,7 +2812,7 @@ mod _io {
vm: &VirtualMachine,
) -> PyResult<PyThreadMutexGuard<'_, Option<TextIOData>>> {
self.data
.lock()
.lock_wrapped(|do_lock| vm.allow_threads(do_lock))
.ok_or_else(|| vm.new_runtime_error("reentrant call inside textio"))
}

Expand Down Expand Up @@ -4158,7 +4158,7 @@ mod _io {
vm: &VirtualMachine,
) -> PyResult<PyThreadMutexGuard<'_, Option<IncrementalNewlineDecoderData>>> {
self.data
.lock()
.lock_wrapped(|do_lock| vm.allow_threads(do_lock))
.ok_or_else(|| vm.new_runtime_error("reentrant call inside nldecoder"))
}

Expand Down Expand Up @@ -5336,7 +5336,7 @@ mod fileio {
types::{Constructor, DefaultConstructor, Destructor, Initializer, Representable},
};
use crossbeam_utils::atomic::AtomicCell;
use std::io::{Read, Write};
use std::io::Read;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's check if the file exists and get its total line count
wc -l crates/vm/src/stdlib/io.rs

Repository: RustPython/RustPython

Length of output: 96


🏁 Script executed:

# Read the specific lines mentioned in the review (5339 and 5764-5786)
# Start with line 5339
sed -n '5335,5345p' crates/vm/src/stdlib/io.rs

Repository: RustPython/RustPython

Length of output: 427


🏁 Script executed:

# Read the lines 5764-5786 to see the full context of the issue
sed -n '5760,5790p' crates/vm/src/stdlib/io.rs

Repository: RustPython/RustPython

Length of output: 1218


🏁 Script executed:

# Also check around line 5767 mentioned in the comment
sed -n '5760,5800p' crates/vm/src/stdlib/io.rs

Repository: RustPython/RustPython

Length of output: 1533


🏁 Script executed:

# Search for crt_fd module usage and definition
rg "crt_fd" crates/vm/src/stdlib/io.rs | head -20

Repository: RustPython/RustPython

Length of output: 859


🏁 Script executed:

# Check the Rust std::io::Read documentation behavior - search for how read_to_end is used elsewhere
rg "read_to_end" crates/vm/src/stdlib/io.rs

Repository: RustPython/RustPython

Length of output: 114


🏁 Script executed:

# Look for crt_fd module definition
fd "crt_fd" crates/

Repository: RustPython/RustPython

Length of output: 92


🏁 Script executed:

# Check if there's a read function being used with crt_fd elsewhere
rg "crt_fd::read" crates/vm/src/stdlib/io.rs

Repository: RustPython/RustPython

Length of output: 222


🏁 Script executed:

# Search for DEFAULT_BUFFER_SIZE constant
rg "DEFAULT_BUFFER_SIZE" crates/vm/src/stdlib/io.rs | head -5

Repository: RustPython/RustPython

Length of output: 360


🏁 Script executed:

# Check the crt_fd module to understand the read function
cat crates/common/src/crt_fd.rs

Repository: RustPython/RustPython

Length of output: 9961


🏁 Script executed:

# Look for the context where crt_fd::read is used in that loop
rg -B5 -A5 "crt_fd::read\(handle" crates/vm/src/stdlib/io.rs

Repository: RustPython/RustPython

Length of output: 1070


Replace read_to_end() with explicit crt_fd::read() loop to preserve signal checking on EINTR.

The current code uses std::io::Read::read_to_end(), which internally retries on EINTR, preventing the outer error handler from triggering vm.check_signals(). This breaks the unbounded-read path (size < 0) signal handling.

Use a chunked read loop with crt_fd::read() instead, which surfaces EINTR to the error handler:

Replace read_to_end() with chunked crt_fd::read() loop
-                let mut bytes = vec![];
-                // Loop on EINTR (PEP 475)
-                loop {
-                    match vm.allow_threads(|| {
-                        let mut h = handle;
-                        h.read_to_end(&mut bytes)
-                    }) {
-                        Ok(_) => break,
-                        Err(e) if e.raw_os_error() == Some(libc::EINTR) => {
-                            vm.check_signals()?;
-                            continue;
-                        }
-                        // Non-blocking mode: return None if EAGAIN (only if no data read yet)
-                        Err(e) if e.raw_os_error() == Some(libc::EAGAIN) => {
-                            if bytes.is_empty() {
-                                return Ok(None);
-                            }
-                            break;
-                        }
-                        Err(e) => return Err(Self::io_error(zelf, e, vm)),
-                    }
-                }
+                let mut bytes = vec![];
+                loop {
+                    let mut chunk = vec![0; super::DEFAULT_BUFFER_SIZE];
+                    match vm.allow_threads(|| crt_fd::read(handle, &mut chunk)) {
+                        Ok(0) => break,
+                        Ok(n) => bytes.extend_from_slice(&chunk[..n]),
+                        Err(e) if e.raw_os_error() == Some(libc::EINTR) => {
+                            vm.check_signals()?;
+                            continue;
+                        }
+                        Err(e) if e.raw_os_error() == Some(libc::EAGAIN) => {
+                            if bytes.is_empty() {
+                                return Ok(None);
+                            }
+                            break;
+                        }
+                        Err(e) => return Err(Self::io_error(zelf, e, vm)),
+                    }
+                }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/vm/src/stdlib/io.rs` at line 5339, The code currently uses
std::io::Read::read_to_end(), which hides EINTR and prevents vm.check_signals()
from running on the unbounded-read path (size < 0); replace that call with an
explicit chunked loop that uses crt_fd::read() to fill a Vec<u8> (read
fixed-size chunks, e.g., 8KiB), append each successful chunk to the buffer,
break on EOF (0 bytes), and on read errors return Err so EINTR is propagated to
the outer error handler; make sure to reference and replace the read_to_end()
call site and keep vm.check_signals() semantics by letting crt_fd::read()
surface EINTR instead of swallowing it.


bitflags::bitflags! {
#[derive(Copy, Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -5740,12 +5740,12 @@ mod fileio {
"File or stream is not readable".to_owned(),
));
}
let mut handle = zelf.get_fd(vm)?;
let handle = zelf.get_fd(vm)?;
let bytes = if let Some(read_byte) = read_byte.to_usize() {
let mut bytes = vec![0; read_byte];
// Loop on EINTR (PEP 475)
let n = loop {
match handle.read(&mut bytes) {
match vm.allow_threads(|| crt_fd::read(handle, &mut bytes)) {
Ok(n) => break n,
Err(e) if e.raw_os_error() == Some(libc::EINTR) => {
vm.check_signals()?;
Expand All @@ -5764,7 +5764,10 @@ mod fileio {
let mut bytes = vec![];
// Loop on EINTR (PEP 475)
loop {
match handle.read_to_end(&mut bytes) {
match vm.allow_threads(|| {
let mut h = handle;
h.read_to_end(&mut bytes)
}) {
Ok(_) => break,
Err(e) if e.raw_os_error() == Some(libc::EINTR) => {
vm.check_signals()?;
Expand Down Expand Up @@ -5802,10 +5805,9 @@ mod fileio {
let handle = zelf.get_fd(vm)?;

let mut buf = obj.borrow_buf_mut();
let mut f = handle.take(buf.len() as _);
// Loop on EINTR (PEP 475)
let ret = loop {
match f.read(&mut buf) {
match vm.allow_threads(|| crt_fd::read(handle, &mut buf)) {
Ok(n) => break n,
Err(e) if e.raw_os_error() == Some(libc::EINTR) => {
vm.check_signals()?;
Expand Down Expand Up @@ -5835,11 +5837,11 @@ mod fileio {
));
}

let mut handle = zelf.get_fd(vm)?;
let handle = zelf.get_fd(vm)?;

// Loop on EINTR (PEP 475)
let len = loop {
match obj.with_ref(|b| handle.write(b)) {
match obj.with_ref(|b| vm.allow_threads(|| crt_fd::write(handle, b))) {
Ok(n) => break n,
Err(e) if e.raw_os_error() == Some(libc::EINTR) => {
vm.check_signals()?;
Expand Down
Loading
Loading
X Tutup