X Tutup
use lexopt::Arg::*; use lexopt::ValueExt; use rustpython_vm::{Settings, vm::CheckHashPycsMode}; use std::str::FromStr; use std::{cmp, env}; pub enum RunMode { Script(String), Command(String), Module(String), InstallPip(InstallPipMode), Repl, } pub enum InstallPipMode { /// Install pip using the ensurepip pip module. This has a higher chance of /// success, but may not install the latest version of pip. Ensurepip, /// Install pip using the get-pip.py script, which retrieves the latest pip version. /// This can be broken due to incompatibilities with cpython. GetPip, } impl FromStr for InstallPipMode { type Err = &'static str; fn from_str(s: &str) -> Result { match s { "ensurepip" => Ok(Self::Ensurepip), "get-pip" => Ok(Self::GetPip), _ => Err("--install-pip takes ensurepip or get-pip as first argument"), } } } #[derive(Default)] struct CliArgs { bytes_warning: u8, dont_write_bytecode: bool, debug: u8, ignore_environment: bool, inspect: bool, isolate: bool, optimize: u8, safe_path: bool, quiet: bool, random_hash_seed: bool, no_user_site: bool, no_site: bool, unbuffered: bool, verbose: u8, warning_control: Vec, implementation_option: Vec, check_hash_based_pycs: CheckHashPycsMode, #[cfg(feature = "flame-it")] profile_output: Option, #[cfg(feature = "flame-it")] profile_format: Option, } const USAGE_STRING: &str = "\ usage: {PROG} [option] ... [-c cmd | -m mod | file | -] [arg] ... Options (and corresponding environment variables): -b : issue warnings about converting bytes/bytearray to str and comparing bytes/bytearray with str or bytes with int. (-bb: issue errors) -B : don't write .pyc files on import; also PYTHONDONTWRITEBYTECODE=x -c cmd : program passed in as string (terminates option list) -d : turn on parser debugging output (for experts only, only works on debug builds); also PYTHONDEBUG=x -E : ignore PYTHON* environment variables (such as PYTHONPATH) -h : print this help message and exit (also -? or --help) -i : inspect interactively after running script; forces a prompt even if stdin does not appear to be a terminal; also PYTHONINSPECT=x -I : isolate Python from the user's environment (implies -E and -s) -m mod : run library module as a script (terminates option list) -O : remove assert and __debug__-dependent statements; add .opt-1 before .pyc extension; also PYTHONOPTIMIZE=x -OO : do -O changes and also discard docstrings; add .opt-2 before .pyc extension -P : don't prepend a potentially unsafe path to sys.path; also PYTHONSAFEPATH -q : don't print version and copyright messages on interactive startup -s : don't add user site directory to sys.path; also PYTHONNOUSERSITE=x -S : don't imply 'import site' on initialization -u : force the stdout and stderr streams to be unbuffered; this option has no effect on stdin; also PYTHONUNBUFFERED=x -v : verbose (trace import statements); also PYTHONVERBOSE=x can be supplied multiple times to increase verbosity -V : print the Python version number and exit (also --version) when given twice, print more information about the build -W arg : warning control; arg is action:message:category:module:lineno also PYTHONWARNINGS=arg -x : skip first line of source, allowing use of non-Unix forms of #!cmd -X opt : set implementation-specific option --check-hash-based-pycs always|default|never: control how Python invalidates hash-based .pyc files --help-env: print help about Python environment variables and exit --help-xoptions: print help about implementation-specific -X options and exit --help-all: print complete help information and exit RustPython extensions: Arguments: file : program read from script file - : program read from stdin (default; interactive mode if a tty) arg ...: arguments passed to program in sys.argv[1:] "; fn parse_args() -> Result<(CliArgs, RunMode, Vec), lexopt::Error> { let mut args = CliArgs::default(); let mut parser = lexopt::Parser::from_env(); fn argv(argv0: String, mut parser: lexopt::Parser) -> Result, lexopt::Error> { std::iter::once(Ok(argv0)) .chain(parser.raw_args()?.map(|arg| arg.string())) .collect() } while let Some(arg) = parser.next()? { match arg { Short('b') => args.bytes_warning += 1, Short('B') => args.dont_write_bytecode = true, Short('c') => { let cmd = parser.value()?.string()?; return Ok((args, RunMode::Command(cmd), argv("-c".to_owned(), parser)?)); } Short('d') => args.debug += 1, Short('E') => args.ignore_environment = true, Short('h' | '?') | Long("help") => help(parser), Short('i') => args.inspect = true, Short('I') => args.isolate = true, Short('m') => { let module = parser.value()?.string()?; let argv = argv("PLACEHOLDER".to_owned(), parser)?; return Ok((args, RunMode::Module(module), argv)); } Short('O') => args.optimize += 1, Short('P') => args.safe_path = true, Short('q') => args.quiet = true, Short('R') => args.random_hash_seed = true, Short('S') => args.no_site = true, Short('s') => args.no_user_site = true, Short('u') => args.unbuffered = true, Short('v') => args.verbose += 1, Short('V') | Long("version") => version(), Short('W') => args.warning_control.push(parser.value()?.string()?), // TODO: Short('x') => Short('X') => args.implementation_option.push(parser.value()?.string()?), Long("check-hash-based-pycs") => { args.check_hash_based_pycs = parser.value()?.parse()? } // TODO: make these more specific Long("help-env") => help(parser), Long("help-xoptions") => help(parser), Long("help-all") => help(parser), #[cfg(feature = "flame-it")] Long("profile-output") => args.profile_output = Some(parser.value()?), #[cfg(feature = "flame-it")] Long("profile-format") => args.profile_format = Some(parser.value()?.string()?), Long("install-pip") => { let (mode, argv) = if let Some(val) = parser.optional_value() { (val.parse()?, vec![val.string()?]) } else if let Ok(argv0) = parser.value() { let mode = argv0.parse()?; (mode, argv(argv0.string()?, parser)?) } else { ( InstallPipMode::Ensurepip, ["ensurepip", "--upgrade", "--default-pip"] .map(str::to_owned) .into(), ) }; return Ok((args, RunMode::InstallPip(mode), argv)); } Value(script_name) => { let script_name = script_name.string()?; let mode = if script_name == "-" { RunMode::Repl } else { RunMode::Script(script_name.clone()) }; return Ok((args, mode, argv(script_name, parser)?)); } _ => return Err(arg.unexpected()), } } Ok((args, RunMode::Repl, vec![])) } fn help(parser: lexopt::Parser) -> ! { let usage = USAGE_STRING.replace("{PROG}", parser.bin_name().unwrap_or("rustpython")); print!("{usage}"); std::process::exit(0); } fn version() -> ! { println!("Python {}", rustpython_vm::version::get_version()); std::process::exit(0); } /// Create settings by examining command line arguments and environment /// variables. pub fn parse_opts() -> Result<(Settings, RunMode), lexopt::Error> { let (args, mode, argv) = parse_args()?; let mut settings = Settings::default(); settings.isolated = args.isolate; settings.ignore_environment = settings.isolated || args.ignore_environment; settings.bytes_warning = args.bytes_warning.into(); settings.import_site = !args.no_site; let ignore_environment = settings.ignore_environment; if !ignore_environment { settings.path_list.extend(get_paths("RUSTPYTHONPATH")); settings.path_list.extend(get_paths("PYTHONPATH")); } // Now process command line flags: let get_env = |env| (!ignore_environment).then(|| env::var_os(env)).flatten(); let env_count = |env| { get_env(env).filter(|v| !v.is_empty()).map_or(0, |val| { val.to_str().and_then(|v| v.parse::().ok()).unwrap_or(1) }) }; settings.optimize = cmp::max(args.optimize, env_count("PYTHONOPTIMIZE")); settings.verbose = cmp::max(args.verbose, env_count("PYTHONVERBOSE")); settings.debug = cmp::max(args.debug, env_count("PYTHONDEBUG")); let env_bool = |env| get_env(env).is_some_and(|v| !v.is_empty()); settings.user_site_directory = !(settings.isolated || args.no_user_site || env_bool("PYTHONNOUSERSITE")); settings.quiet = args.quiet; settings.write_bytecode = !(args.dont_write_bytecode || env_bool("PYTHONDONTWRITEBYTECODE")); settings.safe_path = settings.isolated || args.safe_path || env_bool("PYTHONSAFEPATH"); settings.inspect = args.inspect || env_bool("PYTHONINSPECT"); settings.interactive = args.inspect; settings.buffered_stdio = !args.unbuffered; if let Some(val) = get_env("PYTHONINTMAXSTRDIGITS") { settings.int_max_str_digits = match val.to_str().and_then(|s| s.parse().ok()) { Some(digits @ (0 | 640..)) => digits, _ => { error!( "Fatal Python error: config_init_int_max_str_digits: PYTHONINTMAXSTRDIGITS: invalid limit; must be >= 640 or 0 for unlimited.\nPython runtime state: preinitialized" ); std::process::exit(1); } }; } settings.check_hash_pycs_mode = args.check_hash_based_pycs; if let Some(val) = get_env("PYTHONUTF8") && let Some(val_str) = val.to_str() && !val_str.is_empty() { settings.utf8_mode = match val_str { "1" => 1, "0" => 0, _ => { error!( "Fatal Python error: config_init_utf8_mode: \ PYTHONUTF8=N: N is missing or invalid\n\ Python runtime state: preinitialized" ); std::process::exit(1); } }; } let xopts = args.implementation_option.into_iter().map(|s| { let (name, value) = match s.split_once('=') { Some((name, value)) => (name.to_owned(), Some(value)), None => (s, None), }; match &*name { "dev" => settings.dev_mode = true, "faulthandler" => settings.faulthandler = true, "warn_default_encoding" => settings.warn_default_encoding = true, "utf8" => { settings.utf8_mode = match value { None => 1, Some("1") => 1, Some("0") => 0, _ => { error!( "Fatal Python error: config_init_utf8_mode: \ -X utf8=n: n is missing or invalid\n\ Python runtime state: preinitialized" ); std::process::exit(1); } }; } "no_sig_int" => settings.install_signal_handlers = false, "no_debug_ranges" => settings.code_debug_ranges = false, "int_max_str_digits" => { settings.int_max_str_digits = match value.unwrap().parse() { Ok(digits) if digits == 0 || digits >= 640 => digits, _ => { error!( "Fatal Python error: config_init_int_max_str_digits: \ -X int_max_str_digits: \ invalid limit; must be >= 640 or 0 for unlimited.\n\ Python runtime state: preinitialized" ); std::process::exit(1); } }; } "thread_inherit_context" => { settings.thread_inherit_context = match value { Some("1") => true, Some("0") => false, _ => { error!( "Fatal Python error: config_init_thread_inherit_context: \ -X thread_inherit_context=n: n is missing or invalid\n\ Python runtime state: preinitialized" ); std::process::exit(1); } }; } _ => {} } (name, value.map(str::to_owned)) }); settings.xoptions.extend(xopts); // Resolve utf8_mode if not explicitly set by PYTHONUTF8 or -X utf8. // Default to UTF-8 mode since RustPython's locale encoding detection // is incomplete. Users can set PYTHONUTF8=0 or -X utf8=0 to disable. if settings.utf8_mode < 0 { settings.utf8_mode = 1; } settings.warn_default_encoding = settings.warn_default_encoding || env_bool("PYTHONWARNDEFAULTENCODING"); settings.faulthandler = settings.faulthandler || env_bool("PYTHONFAULTHANDLER"); if env_bool("PYTHONNODEBUGRANGES") { settings.code_debug_ranges = false; } if let Some(val) = get_env("PYTHON_THREAD_INHERIT_CONTEXT") { settings.thread_inherit_context = match val.to_str() { Some("1") => true, Some("0") => false, _ => { error!( "Fatal Python error: config_init_thread_inherit_context: \ PYTHON_THREAD_INHERIT_CONTEXT=N: N is missing or invalid\n\ Python runtime state: preinitialized" ); std::process::exit(1); } }; } // Parse PYTHONIOENCODING=encoding[:errors] if let Some(val) = get_env("PYTHONIOENCODING") && let Some(val_str) = val.to_str() && !val_str.is_empty() { if let Some((enc, err)) = val_str.split_once(':') { if !enc.is_empty() { settings.stdio_encoding = Some(enc.to_owned()); } if !err.is_empty() { settings.stdio_errors = Some(err.to_owned()); } } else { settings.stdio_encoding = Some(val_str.to_owned()); } } if settings.dev_mode { settings.warnoptions.push("default".to_owned()); settings.faulthandler = true; } if settings.bytes_warning > 0 { let warn = if settings.bytes_warning > 1 { "error::BytesWarning" } else { "default::BytesWarning" }; settings.warnoptions.push(warn.to_owned()); } if let Some(val) = get_env("PYTHONWARNINGS") && let Some(val_str) = val.to_str() && !val_str.is_empty() { for warning in val_str.split(',') { let warning = warning.trim(); if !warning.is_empty() { settings.warnoptions.push(warning.to_owned()); } } } settings.warnoptions.extend(args.warning_control); settings.hash_seed = match (!args.random_hash_seed) .then(|| get_env("PYTHONHASHSEED")) .flatten() { Some(s) if s == "random" || s.is_empty() => None, Some(s) => { let seed = s.parse_with(|s| { s.parse::().map_err(|_| { "Fatal Python init error: PYTHONHASHSEED must be \ \"random\" or an integer in range [0; 4294967295]" }) })?; Some(seed) } None => None, }; settings.argv = argv; #[cfg(feature = "flame-it")] { settings.profile_output = args.profile_output; settings.profile_format = args.profile_format; } Ok((settings, mode)) } /// Helper function to retrieve a sequence of paths from an environment variable. fn get_paths(env_variable_name: &str) -> impl Iterator + '_ { env::var_os(env_variable_name) .filter(|v| !v.is_empty()) .into_iter() .flat_map(move |paths| { split_paths(&paths) .map(|path| { path.into_os_string() .into_string() .unwrap_or_else(|_| panic!("{env_variable_name} isn't valid unicode")) }) .collect::>() }) } #[cfg(not(target_os = "wasi"))] pub(crate) use env::split_paths; #[cfg(target_os = "wasi")] pub(crate) fn split_paths + ?Sized>( s: &T, ) -> impl Iterator + '_ { let s = s.as_ref().as_encoded_bytes(); s.split(|b| *b == b':').map(|x| { unsafe { std::ffi::OsStr::from_encoded_bytes_unchecked(x) } .to_owned() .into() }) }
X Tutup