fix: close all memory stream ends in client transport cleanup#2266
Draft
fix: close all memory stream ends in client transport cleanup#2266
Conversation
Client transports for SSE, WebSocket, and StreamableHTTP create 4 memory stream ends (2 paired streams) but only closed 2 in their finally blocks. anyio memory stream ends are independent — closing the writer does not close the reader. The unclosed ends leak and emit ResourceWarning when garbage collected. This caused flaky test failures in CI: a transport connection error in one test would leak streams, then GC in a later unrelated test would trigger ResourceWarning, which pytest promotes to a test failure. Fix follows the existing correct pattern in stdio.py: - sse.py: close all 4 stream ends in the existing finally block - streamable_http.py: close all 4 stream ends in the existing finally block (read_stream was previously never closed, even on happy path) - websocket.py: add try/finally wrapping the entire body, closing all 4 stream ends (previously had no cleanup at all — ws_connect failure leaked everything) Regression tests force gc.collect() after the transport context exits so leaked streams fail deterministically in the test that caused them.
The gc.collect() in these tests was picking up leaked PipeHandles from flaky stdio tests (TestChildProcessCleanup) on the same xdist worker, causing false failures on Windows CI. Now uses a custom sys.unraisablehook that filters for MemoryObject stream leaks specifically, ignoring unrelated resources leaked by other tests. Also adds explicit del exc_info in the SSE test since the traceback would otherwise keep leaked stream locals alive past gc.collect().
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Client transports for SSE, WebSocket, and StreamableHTTP create 4 anyio memory stream ends (2 paired streams) but only closed 2 in their
finallyblocks. anyio memory stream ends are independent — closing the writer does not close the reader. Unclosed stream ends leak and emitResourceWarningwhen garbage collected.This caused flaky CI failures: a transport connection error (404, 403,
ConnectError) in one test would leak streams, then GC in a later unrelated test would triggerResourceWarning, which pytest'sfilterwarnings = ["error"]promotes to a test failure — in whatever test happened to be running when GC fired, not the test that actually leaked.Fix
Follows the existing correct pattern in
stdio.py(which closes all 4 ends on both early-fail and normal-exit paths):sse.pyfinallyclosed 2 of 4finallycloses all 4streamable_http.pyfinallyclosed 2 of 4 —read_streamwas never closed, even on happy pathfinallycloses all 4websocket.pytry/finallyat all — ifws_connect()raised, all 4 leakedtry/finallythat closes all 4anyio's
aclose()is idempotent, so double-closing (e.g. when reader/writer tasks already closed their end) is safe.Tests
Added
tests/client/test_transport_stream_cleanup.pywith one regression test per transport. Each test triggers the error/exit path, then callsgc.collect()to force any leaked stream to emitResourceWarningdeterministically. All 3 tests fail onmainwithResourceWarning: Unclosed <MemoryObjectReceiveStream>and pass with this fix.CI Evidence of the Flakiness
test_tool_progressknock-on failure — streams leaked bytest_basic_resources(which got a 404), GC'd duringtest_tool_progresstest_call_toolknock-on failure — in-memory test that doesn't touch the network, failed because an earlier test on the same worker leaked resourcesAI Disclaimer