X Tutup
Skip to content

Commit 7fbd0a9

Browse files
committed
More fork safety (RustPython#7380)
* apply more allow_threads
* Simplify STW thread state transitions
  - Fix park_detached_threads: successful CAS no longer sets all_suspended=false, avoiding unnecessary polling rounds
  - Replace park_timeout(50µs) with park() in wait_while_suspended
  - Remove redundant self-suspension in attach_thread and detach_thread; the STW controller handles DETACHED→SUSPENDED via park_detached_threads
  - Add double-check under mutex before condvar wait to prevent lost wakes
  - Remove dead stats_detach_wait_yields field and add_detach_wait_yields
* Representable for ThreadHandle
* Set ThreadHandle state to Running in parent thread after spawn
  Like CPython's ThreadHandle_start, set RUNNING state in the parent thread immediately after spawn() succeeds, rather than in the child. This eliminates a race where join() could see Starting state if called before the child thread executes. Also reverts the macOS skip for test_start_new_thread_failed since the root cause is fixed.
* Set ThreadHandle state to Running in parent thread after spawn
* Add debug_assert for thread state in start_the_world
* Unskip now-passing test_get_event_loop_thread and test_start_new_thread_at_finalization
* Wrap IO locks and file ops in allow_threads
  Add lock_wrapped to ThreadMutex for detaching thread state while waiting on contended locks. Use it for buffered and text IO locks. Wrap FileIO read/write in allow_threads via crt_fd to prevent STW hangs on blocking file operations.
* Use std::sync for thread start/ready events
  Replace parking_lot Mutex/Condvar with std::sync (pthread-based) for started_event and handle_ready_event. This prevents hangs in forked children where parking_lot's global HASHTABLE may be corrupted.
1 parent ca69664 commit 7fbd0a9

File tree

12 files changed

+923
-250
lines changed

12 files changed

+923
-250
lines changed

Lib/test/test_asyncio/test_events.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2876,7 +2876,6 @@ def test_get_event_loop_after_set_none(self):
28762876
policy.set_event_loop(None)
28772877
self.assertRaises(RuntimeError, policy.get_event_loop)
28782878

2879-
@unittest.expectedFailure # TODO: RUSTPYTHON; - mock.patch doesn't work correctly with threading.current_thread
28802879
@mock.patch('asyncio.events.threading.current_thread')
28812880
def test_get_event_loop_thread(self, m_current_thread):
28822881

Lib/test/test_threading.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1162,8 +1162,6 @@ def import_threading():
11621162
self.assertEqual(out, b'')
11631163
self.assertEqual(err, b'')
11641164

1165-
# TODO: RUSTPYTHON - __del__ not called during interpreter finalization (no cyclic GC)
1166-
@unittest.expectedFailure
11671165
def test_start_new_thread_at_finalization(self):
11681166
code = """if 1:
11691167
import _thread

crates/common/src/lock/thread_mutex.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,18 @@ impl<R: RawMutex, G: GetThreadId> RawThreadMutex<R, G> {
5454
.is_some()
5555
}
5656

57+
/// Like `lock()` but wraps the blocking wait in `wrap_fn`.
58+
/// The caller can use this to detach thread state while waiting.
///
/// `wrap_fn` receives a closure that performs the blocking `self.mutex.lock()`
/// call; it is expected to invoke that closure.
///
/// Returns `false` without touching the underlying mutex when the calling
/// thread's id already equals the stored owner; otherwise returns `true`
/// after `wrap_fn` has returned and the owner has been set to the calling thread.
59+
pub fn lock_wrapped<F: FnOnce(&dyn Fn())>(&self, wrap_fn: F) -> bool {
60+
let id = self.get_thread_id.nonzero_thread_id().get();
61+
// Already owned by this thread: report that instead of blocking on ourselves.
if self.owner.load(Ordering::Relaxed) == id {
62+
return false;
63+
}
64+
// The blocking acquisition happens inside whatever context `wrap_fn` sets up.
wrap_fn(&|| self.mutex.lock());
65+
// NOTE(review): ownership is recorded unconditionally once `wrap_fn` returns;
// this relies on `wrap_fn` having actually called the closure — confirm at call sites.
self.owner.store(id, Ordering::Relaxed);
66+
true
67+
}
68+
5769
/// Returns `Some(true)` if able to successfully lock without blocking, `Some(false)`
5870
/// otherwise, and `None` when the mutex is already locked on the current thread.
5971
pub fn try_lock(&self) -> Option<bool> {
@@ -135,6 +147,23 @@ impl<R: RawMutex, G: GetThreadId, T: ?Sized> ThreadMutex<R, G, T> {
135147
None
136148
}
137149
}
150+
151+
/// Like `lock()` but wraps the blocking wait in `wrap_fn`.
152+
/// The caller can use this to detach thread state while waiting.
///
/// Forwards `wrap_fn` to the raw mutex's `lock_wrapped`. Returns `Some(guard)`
/// when that call reports `true`, and `None` when it reports `false`.
153+
pub fn lock_wrapped<F: FnOnce(&dyn Fn())>(
154+
&self,
155+
wrap_fn: F,
156+
) -> Option<ThreadMutexGuard<'_, R, G, T>> {
157+
if self.raw.lock_wrapped(wrap_fn) {
158+
// The raw lock was taken; hand out a guard tied to this mutex.
Some(ThreadMutexGuard {
159+
mu: self,
160+
marker: PhantomData,
161+
})
162+
} else {
163+
// The raw lock reported `false`, so no guard is produced.
None
164+
}
165+
}
166+
138167
pub fn try_lock(&self) -> Result<ThreadMutexGuard<'_, R, G, T>, TryLockThreadError> {
139168
match self.raw.try_lock() {
140169
Some(true) => Ok(ThreadMutexGuard {

crates/stdlib/src/multiprocessing.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -484,15 +484,15 @@ mod _multiprocessing {
484484
tv_sec: (delay / 1_000_000) as _,
485485
tv_usec: (delay % 1_000_000) as _,
486486
};
487-
unsafe {
487+
vm.allow_threads(|| unsafe {
488488
libc::select(
489489
0,
490490
core::ptr::null_mut(),
491491
core::ptr::null_mut(),
492492
core::ptr::null_mut(),
493493
&mut tv_delay,
494494
)
495-
};
495+
});
496496

497497
// check for signals - preserve the exception (e.g., KeyboardInterrupt)
498498
if let Err(exc) = vm.check_signals() {
@@ -710,13 +710,13 @@ mod _multiprocessing {
710710
#[cfg(not(target_vendor = "apple"))]
711711
{
712712
loop {
713+
let sem_ptr = self.handle.as_ptr();
713714
// Py_BEGIN_ALLOW_THREADS / Py_END_ALLOW_THREADS
714-
// RustPython doesn't have GIL, so we just do the wait
715-
if let Some(ref dl) = deadline {
716-
res = unsafe { libc::sem_timedwait(self.handle.as_ptr(), dl) };
715+
res = if let Some(ref dl) = deadline {
716+
vm.allow_threads(|| unsafe { libc::sem_timedwait(sem_ptr, dl) })
717717
} else {
718-
res = unsafe { libc::sem_wait(self.handle.as_ptr()) };
719-
}
718+
vm.allow_threads(|| unsafe { libc::sem_wait(sem_ptr) })
719+
};
720720

721721
if res >= 0 {
722722
break;
@@ -750,7 +750,8 @@ mod _multiprocessing {
750750
} else {
751751
// No timeout: use sem_wait (available on macOS)
752752
loop {
753-
res = unsafe { libc::sem_wait(self.handle.as_ptr()) };
753+
let sem_ptr = self.handle.as_ptr();
754+
res = vm.allow_threads(|| unsafe { libc::sem_wait(sem_ptr) });
754755
if res >= 0 {
755756
break;
756757
}

crates/stdlib/src/select.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -280,9 +280,7 @@ mod decl {
280280

281281
loop {
282282
let mut tv = timeout.map(sec_to_timeval);
283-
let res = vm.allow_threads(|| {
284-
super::select(nfds, &mut r, &mut w, &mut x, tv.as_mut())
285-
});
283+
let res = vm.allow_threads(|| super::select(nfds, &mut r, &mut w, &mut x, tv.as_mut()));
286284

287285
match res {
288286
Ok(_) => break,
@@ -504,7 +502,9 @@ mod decl {
504502
let deadline = timeout.map(|d| Instant::now() + d);
505503
let mut poll_timeout = timeout_ms;
506504
loop {
507-
let res = unsafe { libc::poll(fds.as_mut_ptr(), fds.len() as _, poll_timeout) };
505+
let res = vm.allow_threads(|| unsafe {
506+
libc::poll(fds.as_mut_ptr(), fds.len() as _, poll_timeout)
507+
});
508508
match nix::Error::result(res) {
509509
Ok(_) => break,
510510
Err(nix::Error::EINTR) => vm.check_signals()?,
@@ -697,11 +697,13 @@ mod decl {
697697

698698
loop {
699699
events.clear();
700-
match epoll::wait(
701-
epoll,
702-
rustix::buffer::spare_capacity(&mut events),
703-
poll_timeout.as_ref(),
704-
) {
700+
match vm.allow_threads(|| {
701+
epoll::wait(
702+
epoll,
703+
rustix::buffer::spare_capacity(&mut events),
704+
poll_timeout.as_ref(),
705+
)
706+
}) {
705707
Ok(_) => break,
706708
Err(rustix::io::Errno::INTR) => vm.check_signals()?,
707709
Err(e) => return Err(e.into_pyexception(vm)),

crates/vm/src/frame.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1199,7 +1199,9 @@ impl ExecutingFrame<'_> {
11991199
}
12001200
}
12011201

1202-
if let Err(exception) = vm.check_signals() {
1202+
if vm.eval_breaker_tripped()
1203+
&& let Err(exception) = vm.check_signals()
1204+
{
12031205
#[cold]
12041206
fn handle_signal_exception(
12051207
frame: &mut ExecutingFrame<'_>,

crates/vm/src/signal.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ pub(crate) fn set_triggered() {
9191
ANY_TRIGGERED.store(true, Ordering::Release);
9292
}
9393

94+
/// Returns the current value of the `ANY_TRIGGERED` flag.
///
/// The flag is read with `Ordering::Relaxed`.
/// NOTE(review): presumably intended as a cheap pre-check before the full
/// signal handling path — confirm no caller relies on it for synchronization.
#[inline(always)]
95+
pub(crate) fn is_triggered() -> bool {
96+
ANY_TRIGGERED.load(Ordering::Relaxed)
97+
}
98+
9499
/// Reset all signal trigger state after fork in child process.
95100
/// Stale triggers from the parent must not fire in the child.
96101
#[cfg(unix)]

crates/vm/src/stdlib/io.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1580,7 +1580,7 @@ mod _io {
15801580

15811581
fn lock(&self, vm: &VirtualMachine) -> PyResult<PyThreadMutexGuard<'_, BufferedData>> {
15821582
self.data()
1583-
.lock()
1583+
.lock_wrapped(|do_lock| vm.allow_threads(do_lock))
15841584
.ok_or_else(|| vm.new_runtime_error("reentrant call inside buffered io"))
15851585
}
15861586

@@ -2812,7 +2812,7 @@ mod _io {
28122812
vm: &VirtualMachine,
28132813
) -> PyResult<PyThreadMutexGuard<'_, Option<TextIOData>>> {
28142814
self.data
2815-
.lock()
2815+
.lock_wrapped(|do_lock| vm.allow_threads(do_lock))
28162816
.ok_or_else(|| vm.new_runtime_error("reentrant call inside textio"))
28172817
}
28182818

@@ -4158,7 +4158,7 @@ mod _io {
41584158
vm: &VirtualMachine,
41594159
) -> PyResult<PyThreadMutexGuard<'_, Option<IncrementalNewlineDecoderData>>> {
41604160
self.data
4161-
.lock()
4161+
.lock_wrapped(|do_lock| vm.allow_threads(do_lock))
41624162
.ok_or_else(|| vm.new_runtime_error("reentrant call inside nldecoder"))
41634163
}
41644164

@@ -5336,7 +5336,7 @@ mod fileio {
53365336
types::{Constructor, DefaultConstructor, Destructor, Initializer, Representable},
53375337
};
53385338
use crossbeam_utils::atomic::AtomicCell;
5339-
use std::io::{Read, Write};
5339+
use std::io::Read;
53405340

53415341
bitflags::bitflags! {
53425342
#[derive(Copy, Clone, Debug, PartialEq)]
@@ -5740,12 +5740,12 @@ mod fileio {
57405740
"File or stream is not readable".to_owned(),
57415741
));
57425742
}
5743-
let mut handle = zelf.get_fd(vm)?;
5743+
let handle = zelf.get_fd(vm)?;
57445744
let bytes = if let Some(read_byte) = read_byte.to_usize() {
57455745
let mut bytes = vec![0; read_byte];
57465746
// Loop on EINTR (PEP 475)
57475747
let n = loop {
5748-
match handle.read(&mut bytes) {
5748+
match vm.allow_threads(|| crt_fd::read(handle, &mut bytes)) {
57495749
Ok(n) => break n,
57505750
Err(e) if e.raw_os_error() == Some(libc::EINTR) => {
57515751
vm.check_signals()?;
@@ -5764,7 +5764,10 @@ mod fileio {
57645764
let mut bytes = vec![];
57655765
// Loop on EINTR (PEP 475)
57665766
loop {
5767-
match handle.read_to_end(&mut bytes) {
5767+
match vm.allow_threads(|| {
5768+
let mut h = handle;
5769+
h.read_to_end(&mut bytes)
5770+
}) {
57685771
Ok(_) => break,
57695772
Err(e) if e.raw_os_error() == Some(libc::EINTR) => {
57705773
vm.check_signals()?;
@@ -5802,10 +5805,9 @@ mod fileio {
58025805
let handle = zelf.get_fd(vm)?;
58035806

58045807
let mut buf = obj.borrow_buf_mut();
5805-
let mut f = handle.take(buf.len() as _);
58065808
// Loop on EINTR (PEP 475)
58075809
let ret = loop {
5808-
match f.read(&mut buf) {
5810+
match vm.allow_threads(|| crt_fd::read(handle, &mut buf)) {
58095811
Ok(n) => break n,
58105812
Err(e) if e.raw_os_error() == Some(libc::EINTR) => {
58115813
vm.check_signals()?;
@@ -5835,11 +5837,11 @@ mod fileio {
58355837
));
58365838
}
58375839

5838-
let mut handle = zelf.get_fd(vm)?;
5840+
let handle = zelf.get_fd(vm)?;
58395841

58405842
// Loop on EINTR (PEP 475)
58415843
let len = loop {
5842-
match obj.with_ref(|b| handle.write(b)) {
5844+
match obj.with_ref(|b| vm.allow_threads(|| crt_fd::write(handle, b))) {
58435845
Ok(n) => break n,
58445846
Err(e) if e.raw_os_error() == Some(libc::EINTR) => {
58455847
vm.check_signals()?;

0 commit comments

Comments
 (0)
X Tutup