X Tutup
Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ flame-it = ["rustpython-vm/flame-it", "rustpython-stdlib/flame-it", "flame", "fl
freeze-stdlib = ["stdlib", "rustpython-vm/freeze-stdlib", "rustpython-pylib?/freeze-stdlib"]
jit = ["rustpython-vm/jit"]
threading = ["rustpython-vm/threading", "rustpython-stdlib/threading"]
fork = ["rustpython-vm/fork", "rustpython-stdlib/fork"]
sqlite = ["rustpython-stdlib/sqlite"]
ssl = []
ssl-rustls = ["ssl", "rustpython-stdlib/ssl-rustls"]
Expand Down
1 change: 1 addition & 0 deletions crates/stdlib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ default = ["compiler", "host_env"]
host_env = ["rustpython-vm/host_env"]
compiler = ["rustpython-vm/compiler"]
threading = ["rustpython-common/threading", "rustpython-vm/threading"]
fork = ["rustpython-vm/fork"]
sqlite = ["dep:libsqlite3-sys"]
# SSL backends - default to rustls
ssl = []
Expand Down
2 changes: 1 addition & 1 deletion crates/stdlib/src/_asyncio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub(crate) mod _asyncio {
});

// Register fork handler to clear task state in child process
#[cfg(unix)]
#[cfg(feature = "fork")]
{
let on_fork = vm
.get_attribute_opt(module.to_owned().into(), vm.ctx.intern_str("_on_fork"))?
Expand Down
1 change: 1 addition & 0 deletions crates/vm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ flame-it = ["flame", "flamer"]
freeze-stdlib = ["encodings"]
jit = ["rustpython-jit"]
threading = ["rustpython-common/threading"]
fork = ["threading"]
gc = []
compiler = ["parser", "codegen", "rustpython-compiler"]
ast = ["ruff_python_ast", "ruff_text_size"]
Expand Down
2 changes: 1 addition & 1 deletion crates/vm/src/codecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl CodecsRegistry {
/// # Safety
/// Must only be called after fork() in the child process when no other
/// threads exist.
#[cfg(all(unix, feature = "threading"))]
#[cfg(feature = "fork")]
pub(crate) unsafe fn reinit_after_fork(&self) {
unsafe { crate::common::lock::reinit_rwlock_after_fork(&self.inner) };
}
Expand Down
4 changes: 2 additions & 2 deletions crates/vm/src/gc_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl GcGeneration {
/// # Safety
/// Must only be called after fork() in the child process when no other
/// threads exist.
#[cfg(all(unix, feature = "threading"))]
#[cfg(feature = "fork")]
unsafe fn reinit_stats_after_fork(&self) {
unsafe { crate::common::lock::reinit_mutex_after_fork(&self.stats) };
}
Expand Down Expand Up @@ -731,7 +731,7 @@ impl GcState {
/// # Safety
/// Must only be called after fork() in the child process when no other
/// threads exist. The calling thread must NOT hold any of these locks.
#[cfg(all(unix, feature = "threading"))]
#[cfg(feature = "fork")]
pub unsafe fn reinit_after_fork(&self) {
use crate::common::lock::{reinit_mutex_after_fork, reinit_rwlock_after_fork};

Expand Down
2 changes: 1 addition & 1 deletion crates/vm/src/intern.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl StringPool {
/// # Safety
/// Must only be called after fork() in the child process when no other
/// threads exist.
#[cfg(all(unix, feature = "threading"))]
#[cfg(feature = "fork")]
pub(crate) unsafe fn reinit_after_fork(&self) {
unsafe { crate::common::lock::reinit_rwlock_after_fork(&self.inner) };
}
Expand Down
4 changes: 2 additions & 2 deletions crates/vm/src/object/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ mod weakref_lock {

/// Reset all weakref stripe locks after fork in child process.
/// Locks held by parent threads would cause infinite spin in the child.
#[cfg(unix)]
#[cfg(feature = "fork")]
pub(crate) fn reset_all_after_fork() {
for lock in &LOCKS {
lock.store(0, Ordering::Release);
Expand All @@ -493,7 +493,7 @@ mod weakref_lock {

/// Reset weakref stripe locks after fork. Must be called before any
/// Python code runs in the child process.
#[cfg(all(unix, feature = "threading"))]
#[cfg(feature = "fork")]
pub(crate) fn reset_weakref_locks_after_fork() {
weakref_lock::reset_all_after_fork();
}
Expand Down
3 changes: 1 addition & 2 deletions crates/vm/src/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ pub(crate) fn is_triggered() -> bool {

/// Reset all signal trigger state after fork in child process.
/// Stale triggers from the parent must not fire in the child.
#[cfg(unix)]
#[cfg(feature = "host_env")]
#[cfg(feature = "fork")]
pub(crate) fn clear_after_fork() {
ANY_TRIGGERED.store(false, Ordering::Release);
for trigger in &TRIGGERS {
Expand Down
13 changes: 7 additions & 6 deletions crates/vm/src/stdlib/imp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ mod lock {
IMP_LOCK.lock();
}

#[cfg(feature = "fork")]
pub(super) fn release_lock_after_fork_parent() {
if IMP_LOCK.is_locked() && IMP_LOCK.is_owned_by_current_thread() {
unsafe { IMP_LOCK.unlock() };
Expand All @@ -53,7 +54,7 @@ mod lock {
/// # Safety
///
/// Must only be called from single-threaded child after fork().
#[cfg(unix)]
#[cfg(feature = "fork")]
pub(crate) unsafe fn reinit_after_fork() {
if IMP_LOCK.is_locked() && !IMP_LOCK.is_owned_by_current_thread() {
// Held by a dead thread — reset to unlocked.
Expand All @@ -65,7 +66,7 @@ mod lock {
/// behavior in the post-fork child:
/// 1) if ownership metadata is stale (dead owner / changed tid), reset;
/// 2) if current thread owns the lock, release it.
#[cfg(unix)]
#[cfg(feature = "fork")]
pub(super) unsafe fn after_fork_child_reinit_and_release() {
unsafe { reinit_after_fork() };
if IMP_LOCK.is_locked() && IMP_LOCK.is_owned_by_current_thread() {
Expand All @@ -75,22 +76,22 @@ mod lock {
}

/// Re-export for fork safety code in posix.rs
#[cfg(feature = "threading")]
#[cfg(feature = "fork")]
pub(crate) fn acquire_imp_lock_for_fork() {
lock::acquire_lock_for_fork();
}

#[cfg(feature = "threading")]
#[cfg(feature = "fork")]
pub(crate) fn release_imp_lock_after_fork_parent() {
lock::release_lock_after_fork_parent();
}

#[cfg(all(unix, feature = "threading"))]
#[cfg(feature = "fork")]
pub(crate) unsafe fn reinit_imp_lock_after_fork() {
unsafe { lock::reinit_after_fork() }
}

#[cfg(all(unix, feature = "threading"))]
#[cfg(feature = "fork")]
pub(crate) unsafe fn after_fork_child_imp_lock_release() {
unsafe { lock::after_fork_child_reinit_and_release() }
}
Expand Down
6 changes: 3 additions & 3 deletions crates/vm/src/stdlib/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* I/O core tools.
*/
pub(crate) use _io::module_def;
#[cfg(all(unix, feature = "threading"))]
#[cfg(feature = "fork")]
pub(crate) use _io::reinit_std_streams_after_fork;

cfg_if::cfg_if! {
Expand Down Expand Up @@ -4999,7 +4999,7 @@ mod _io {
///
/// Must only be called from the single-threaded child process immediately
/// after `fork()`, before any other thread is created.
#[cfg(all(unix, feature = "threading"))]
#[cfg(feature = "fork")]
pub unsafe fn reinit_std_streams_after_fork(vm: &VirtualMachine) {
for name in ["stdin", "stdout", "stderr"] {
let Ok(stream) = vm.sys_module.get_attr(name, vm) else {
Expand All @@ -5009,7 +5009,7 @@ mod _io {
}
}

#[cfg(all(unix, feature = "threading"))]
#[cfg(feature = "fork")]
fn reinit_io_locks(obj: &PyObject) {
use crate::common::lock::reinit_thread_mutex_after_fork;

Expand Down
26 changes: 13 additions & 13 deletions crates/vm/src/stdlib/posix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub mod module {
builtins::{PyDictRef, PyInt, PyListRef, PyTupleRef, PyUtf8Str},
convert::{IntoPyException, ToPyObject, TryFromObject},
exceptions::OSErrorBuilder,
function::{ArgMapping, Either, KwArgs, OptionalArg},
function::{ArgMapping, Either, OptionalArg},
ospath::{OsPath, OsPathOrFd},
stdlib::os::{
_os, DirFd, FollowSymlinks, SupportFunc, TargetIsDirectory, fs_metadata,
Expand Down Expand Up @@ -681,6 +681,7 @@ pub mod module {
)
}

#[cfg(feature = "fork")]
#[derive(FromArgs)]
struct RegisterAtForkArgs {
#[pyarg(named, optional)]
Expand All @@ -691,6 +692,7 @@ pub mod module {
after_in_child: OptionalArg<PyObjectRef>,
}

#[cfg(feature = "fork")]
impl RegisterAtForkArgs {
fn into_validated(
self,
Expand Down Expand Up @@ -724,10 +726,11 @@ pub mod module {
}
}

#[cfg(feature = "fork")]
#[pyfunction]
fn register_at_fork(
args: RegisterAtForkArgs,
_ignored: KwArgs,
_ignored: crate::function::KwArgs,
vm: &VirtualMachine,
) -> PyResult<()> {
let (before, after_in_parent, after_in_child) = args.into_validated(vm)?;
Expand All @@ -744,6 +747,7 @@ pub mod module {
Ok(())
}

#[cfg(feature = "fork")]
fn run_at_forkers(mut funcs: Vec<PyObjectRef>, reversed: bool, vm: &VirtualMachine) {
if !funcs.is_empty() {
if reversed {
Expand All @@ -761,34 +765,31 @@ pub mod module {
}
}

#[cfg(feature = "fork")]
fn py_os_before_fork(vm: &VirtualMachine) {
let before_forkers: Vec<PyObjectRef> = vm.state.before_forkers.lock().clone();
// functions must be executed in reversed order as they are registered
// only for before_forkers, refer: test_register_at_fork in test_posix

run_at_forkers(before_forkers, true, vm);

#[cfg(feature = "threading")]
crate::stdlib::imp::acquire_imp_lock_for_fork();

#[cfg(feature = "threading")]
vm.state.stop_the_world.stop_the_world(vm);
}

#[cfg(feature = "fork")]
fn py_os_after_fork_child(vm: &VirtualMachine) {
#[cfg(feature = "threading")]
vm.state.stop_the_world.reset_after_fork();

// Phase 1: Reset all internal locks FIRST.
// After fork(), locks held by dead parent threads would deadlock
// if we try to acquire them. This must happen before anything else.
#[cfg(feature = "threading")]
reinit_locks_after_fork(vm);

// Reinit per-object IO buffer locks on std streams.
// BufferedReader/Writer/TextIOWrapper use PyThreadMutex which can be
// held by dead parent threads, causing deadlocks on any IO in the child.
#[cfg(feature = "threading")]
unsafe {
crate::stdlib::io::reinit_std_streams_after_fork(vm)
};
Expand All @@ -798,17 +799,14 @@ pub mod module {
crate::stdlib::signal::_signal::clear_wakeup_fd_after_fork();

// Reset weakref stripe locks that may have been held during fork.
#[cfg(feature = "threading")]
crate::object::reset_weakref_locks_after_fork();

// Phase 3: Clean up thread state. Locks are now reinit'd so we can
// acquire them normally instead of using try_lock().
#[cfg(feature = "threading")]
crate::stdlib::thread::after_fork_child(vm);

// CPython parity: reinit import lock ownership metadata in child
// and release the lock acquired by PyOS_BeforeFork().
#[cfg(feature = "threading")]
unsafe {
crate::stdlib::imp::after_fork_child_imp_lock_release()
};
Expand All @@ -828,7 +826,7 @@ pub mod module {
/// After fork(), only the calling thread survives. Any locks held by other
/// (now-dead) threads would cause deadlocks. We unconditionally reset them
/// to unlocked by zeroing the raw lock bytes.
#[cfg(all(unix, feature = "threading"))]
#[cfg(feature = "fork")]
fn reinit_locks_after_fork(vm: &VirtualMachine) {
use rustpython_common::lock::reinit_mutex_after_fork;

Expand Down Expand Up @@ -861,11 +859,10 @@ pub mod module {
}
}

#[cfg(feature = "fork")]
fn py_os_after_fork_parent(vm: &VirtualMachine) {
#[cfg(feature = "threading")]
vm.state.stop_the_world.start_the_world(vm);

#[cfg(feature = "threading")]
crate::stdlib::imp::release_imp_lock_after_fork_parent();

let after_forkers_parent: Vec<PyObjectRef> = vm.state.after_forkers_parent.lock().clone();
Expand All @@ -874,6 +871,7 @@ pub mod module {

/// Best-effort number of OS threads in this process.
/// Returns <= 0 when unavailable.
#[cfg(feature = "fork")]
fn get_number_of_os_threads() -> isize {
#[cfg(target_os = "macos")]
{
Expand Down Expand Up @@ -952,6 +950,7 @@ pub mod module {

/// Warn if forking from a multi-threaded process.
/// `num_os_threads` should be captured before parent after-fork hooks run.
#[cfg(feature = "fork")]
fn warn_if_multi_threaded(name: &str, num_os_threads: isize, vm: &VirtualMachine) {
let num_threads = if num_os_threads > 0 {
num_os_threads as usize
Expand Down Expand Up @@ -998,6 +997,7 @@ pub mod module {
}
}

#[cfg(feature = "fork")]
#[pyfunction]
fn fork(vm: &VirtualMachine) -> PyResult<i32> {
if vm
Expand Down
2 changes: 1 addition & 1 deletion crates/vm/src/stdlib/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ pub(crate) mod _signal {

/// Reset wakeup fd after fork in child process.
/// The child must not write to the parent's wakeup fd.
#[cfg(unix)]
#[cfg(feature = "fork")]
pub(crate) fn clear_wakeup_fd_after_fork() {
WAKEUP.store(INVALID_WAKEUP, Ordering::Relaxed);
}
Expand Down
10 changes: 5 additions & 5 deletions crates/vm/src/stdlib/thread.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Implementation of the _thread module
#[cfg(unix)]
#[cfg(feature = "fork")]
pub(crate) use _thread::after_fork_child;
pub use _thread::get_ident;
#[cfg_attr(target_arch = "wasm32", allow(unused_imports))]
Expand Down Expand Up @@ -327,7 +327,7 @@ pub(crate) mod _thread {
current_thread_id()
}

#[cfg(all(unix, feature = "threading"))]
#[cfg(feature = "fork")]
#[pyfunction]
fn _stop_the_world_stats(vm: &VirtualMachine) -> PyResult<PyDictRef> {
let stats = vm.state.stop_the_world.stats_snapshot();
Expand Down Expand Up @@ -378,7 +378,7 @@ pub(crate) mod _thread {
Ok(d)
}

#[cfg(all(unix, feature = "threading"))]
#[cfg(feature = "fork")]
#[pyfunction]
fn _stop_the_world_reset_stats(vm: &VirtualMachine) {
vm.state.stop_the_world.reset_stats();
Expand Down Expand Up @@ -1050,7 +1050,7 @@ pub(crate) mod _thread {
///
/// Precondition: `reinit_locks_after_fork()` has already been called, so all
/// parking_lot-based locks in VmState are in unlocked state.
#[cfg(unix)]
#[cfg(feature = "fork")]
pub fn after_fork_child(vm: &VirtualMachine) {
let current_ident = get_ident();

Expand Down Expand Up @@ -1134,7 +1134,7 @@ pub(crate) mod _thread {
}

/// Reset a parking_lot::Mutex to unlocked state after fork.
#[cfg(unix)]
#[cfg(feature = "fork")]
fn reinit_parking_lot_mutex<T: ?Sized>(mutex: &parking_lot::Mutex<T>) {
unsafe { rustpython_common::lock::zero_reinit_after_fork(mutex.raw()) };
}
Expand Down
Loading
Loading
X Tutup