X Tutup
Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
gh-75572: Speed up test_xpickle
Run a long living subprocess which handles multiple requests instead of
running a new subprocess for each request.
  • Loading branch information
serhiy-storchaka committed Feb 2, 2026
commit c7bd567e41fb747ec38487256caa18a11166bf98
103 changes: 72 additions & 31 deletions Lib/test/test_xpickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io
import os
import pickle
import struct
import subprocess
import sys
import unittest
Expand Down Expand Up @@ -83,16 +84,27 @@ def have_python_version(py_version):
return py_executable_map.get(py_version, None)


@support.requires_resource('cpu')
def read_exact(f, n):
buf = b''
while len(buf) < n:
chunk = f.read(n - len(buf))
if not chunk:
raise EOFError
buf += chunk
return buf


class AbstractCompatTests(pickletester.AbstractPickleTests):
py_version = None
worker = None

@classmethod
def setUpClass(cls):
assert cls.py_version is not None, 'Needs a python version tuple'
if not have_python_version(cls.py_version):
py_version_str = ".".join(map(str, cls.py_version))
raise unittest.SkipTest(f'Python {py_version_str} not available')
cls.addClassCleanup(cls.close_worker)
# Override the default pickle protocol to match what xpickle worker
# will be running.
highest_protocol = highest_proto_for_py_version(cls.py_version)
Expand All @@ -101,8 +113,31 @@ def setUpClass(cls):
cls.enterClassContext(support.swap_attr(pickle, 'HIGHEST_PROTOCOL',
highest_protocol))

@staticmethod
def send_to_worker(python, data):
@classmethod
def start_worker(cls):
target = os.path.join(os.path.dirname(__file__), 'xpickle_worker.py')
worker = subprocess.Popen([*python, target],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
# For windows bpo-17023.
shell=is_windows)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should use text=True to fix this code below:

            _, stderr = worker.communicate()
            raise RuntimeError(stderr)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we do binary IO for stdin/stdout in normal case.

This code is only for troubleshooting. It is not executed when the tests pass.

cls.worker = worker

@classmethod
def close_worker(cls):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"stop_worker()" name would be better to have start/stop_worker methods.

worker = cls.worker
if worker is None:
return
cls.worker = None
worker.stdin.close()
worker.stdout.close()
worker.stderr.close()
worker.terminate()
worker.wait()

@classmethod
def send_to_worker(cls, python, data):
"""Bounce a pickled object through another version of Python.
This will send data to a child process where it will
be unpickled, then repickled and sent back to the parent process.
Expand All @@ -112,33 +147,40 @@ def send_to_worker(python, data):
Returns:
The pickled data received from the child process.
"""
target = os.path.join(os.path.dirname(__file__), 'xpickle_worker.py')
worker = subprocess.Popen([*python, target],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
# For windows bpo-17023.
shell=is_windows)
stdout, stderr = worker.communicate(data)
if worker.returncode == 0:
return stdout
# if the worker fails, it will write the exception to stdout
worker = cls.worker
if worker is None:
target = os.path.join(os.path.dirname(__file__), 'xpickle_worker.py')
worker = subprocess.Popen([*python, target],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not calling start_worker() here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, this is a remnant. I was not sure whether it is worth to move this code out into a method.

stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
# For windows bpo-17023.
shell=is_windows)
cls.worker = worker

try:
exception = pickle.loads(stdout)
except (pickle.UnpicklingError, EOFError):
worker.stdin.write(struct.pack('!i', len(data)) + data)
worker.stdin.flush()

size, = struct.unpack('!i', read_exact(worker.stdout, 4))
if size > 0:
return read_exact(worker.stdout, size)
# if the worker fails, it will write the exception to stdout
if size < 0:
stdout = read_exact(worker.stdout, -size)
try:
exception = pickle.loads(stdout)
except (pickle.UnpicklingError, EOFError):
pass
else:
if isinstance(exception, Exception):
# To allow for tests which test for errors.
raise exception
_, stderr = worker.communicate()
raise RuntimeError(stderr)
else:
if support.verbose > 1:
print()
print(f'{data = }')
print(f'{stdout = }')
print(f'{stderr = }')
if isinstance(exception, Exception):
# To allow for tests which test for errors.
raise exception
else:
raise RuntimeError(stderr)

except:
cls.close_worker()
raise

def dumps(self, arg, proto=0, **kwargs):
# Skip tests that require buffer_callback arguments since
Expand All @@ -148,9 +190,8 @@ def dumps(self, arg, proto=0, **kwargs):
self.skipTest('Test does not support "buffer_callback" argument.')
f = io.BytesIO()
p = self.pickler(f, proto, **kwargs)
p.dump((proto, arg))
f.seek(0)
data = bytes(f.read())
p.dump(arg)
data = struct.pack('!i', proto) + f.getvalue()
python = py_executable_map[self.py_version]
return self.send_to_worker(python, data)

Expand Down
35 changes: 29 additions & 6 deletions Lib/test/xpickle_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# pickles in a different Python version.
import os
import pickle
import struct
import sys


Expand All @@ -24,16 +25,38 @@
sources = f.read()
exec(sources, vars(test_module))

def read_exact(f, n):
buf = b''
while len(buf) < n:
chunk = f.read(n - len(buf))
if not chunk:
raise EOFError
buf += chunk
return buf

in_stream = getattr(sys.stdin, 'buffer', sys.stdin)
out_stream = getattr(sys.stdout, 'buffer', sys.stdout)

try:
message = pickle.load(in_stream)
protocol, obj = message
pickle.dump(obj, out_stream, protocol)
except Exception as e:
while True:
size, = struct.unpack('!i', read_exact(in_stream, 4))
if not size:
break
data = read_exact(in_stream, size)
protocol, = struct.unpack('!i', data[:4])
obj = pickle.loads(data[4:])
data = pickle.dumps(obj, protocol)
out_stream.write(struct.pack('!i', len(data)) + data)
out_stream.flush()
except Exception as exc:
# dump the exception to stdout and write to stderr, then exit
pickle.dump(e, out_stream)
sys.stderr.write(repr(e))
try:
data = pickle.dumps(exc)
out_stream.write(struct.pack('!i', -len(data)) + data)
out_stream.flush()
except Exception:
out_stream.write(struct.pack('!i', 0))
out_stream.flush()
sys.stderr.write(repr(exc))
sys.stderr.flush()
sys.exit(1)
Loading
X Tutup