
Possible resource leak / race condition in streamable_http_client #1805

@h-filzer

Description

Observed Issue

When using the MCP SDK with the streamable-http transport, CPU usage spikes and never comes back down after sending multiple requests and exiting the client context. I observed this behavior especially in Google adk-python when the MCP toolset tries to close the client.

When running the reproduction code below, I get:

Client-Side: Thread Leaks

Iteration 1: 1 thread  → 2 threads
Iteration 2: 2 threads → 7 threads
Iteration 3: 7 threads → 7 threads
Final: 7 threads (6 leaked asyncio_* threads)
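Threads named asyncio_N are the workers of the event loop's default ThreadPoolExecutor: blocking calls such as loop.getaddrinfo() (DNS resolution) are dispatched there through run_in_executor(None, ...). Idle workers stay alive and are only joined when the default executor is shut down, which asyncio.run() does on exit. Since the whole reproduction runs inside a single asyncio.run(), one thing worth ruling out is whether some of the counted threads are such idle workers rather than threads orphaned by the SDK. The following sketch is independent of MCP and only shows how these threads appear and go away; the helper name asyncio_worker_names is my own.

```python
import asyncio
import threading
import time


def asyncio_worker_names() -> list[str]:
    """Names of live threads started by asyncio's default executor (asyncio_N)."""
    return sorted(t.name for t in threading.enumerate() if t.name.startswith("asyncio_"))


async def main() -> tuple[list[str], list[str]]:
    loop = asyncio.get_running_loop()
    # Four concurrent blocking calls make the default executor start workers,
    # the same way concurrent loop.getaddrinfo() calls would.
    await asyncio.gather(*(loop.run_in_executor(None, time.sleep, 0.2) for _ in range(4)))
    idle = asyncio_worker_names()  # the work is done, but the workers are still alive
    # Shutting down the default executor joins its worker threads.
    await loop.shutdown_default_executor()
    return idle, asyncio_worker_names()


idle, after_shutdown = asyncio.run(main())
print("idle workers while the loop runs:", idle)
print("after shutdown_default_executor():", after_shutdown)  # []
```

If the asyncio_* count in the reproduction stops growing once it reaches the executor's worker limit, that would point to pool growth; if it keeps climbing across iterations, that would support a real leak.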

Server-Side: Exceptions

Session crashed: unhandled errors in a TaskGroup (1 sub-exception)

ClosedResourceError at session.py:349
  → _write_stream.send() fails - stream already closed

BrokenResourceError at streamable_http.py:638
  → SSE writer has no receiver
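Assuming the streams involved are anyio memory object streams (which the exception types suggest), the two server-side errors are the two different ways such a stream rejects a send: ClosedResourceError when the sending end itself was already closed, and BrokenResourceError when the sender is still open but every receiver is gone. A minimal sketch using only anyio; try_send is a hypothetical helper for illustration:

```python
import anyio
from anyio.abc import ObjectSendStream


async def try_send(send: ObjectSendStream[str], item: str) -> str:
    """Attempt a send and report which outcome occurred."""
    try:
        await send.send(item)
    except anyio.ClosedResourceError:
        return "ClosedResourceError"
    except anyio.BrokenResourceError:
        return "BrokenResourceError"
    return "sent"


async def main() -> tuple[str, str, str]:
    # Baseline: both ends open and there is room in the buffer.
    send, receive = anyio.create_memory_object_stream(1)
    ok = await try_send(send, "response")

    # The sending end was closed first (like the ClosedResourceError above).
    await send.aclose()
    closed = await try_send(send, "response")

    # The sender is open but its only receiver is gone (like the SSE writer case).
    send2, receive2 = anyio.create_memory_object_stream(1)
    await receive2.aclose()
    broken = await try_send(send2, "response")

    await receive.aclose()
    await send2.aclose()
    return ok, closed, broken


print(anyio.run(main))  # ('sent', 'ClosedResourceError', 'BrokenResourceError')
```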

Possible Cause

  1. Client sends multiple requests via write_stream.send()
  2. Client exits the async with streamable_http_client(...) block
  3. tg.cancel_scope.cancel() is called during cleanup
  4. Server is still processing requests (e.g., slow_echo with delay)
  5. Server tries to send responses via _write_stream.send()
  6. Stream is already closed → ClosedResourceError
  7. Background asyncio threads handling responses become orphaned
  8. Threads never terminate → memory/resource leak

Expected Behavior

  • No thread leaks after exiting streamable_http_client context
  • Graceful handling of client disconnection on server side

Impact

  • Memory leak from accumulated threads
  • Resource exhaustion in long-running applications

Affected Code Paths

  • mcp/client/streamable_http.py - streamable_http_client() cleanup
  • mcp/shared/session.py:237-238 - cancel_scope.cancel() and __aexit__
  • mcp/server/streamable_http.py:638 - SSE response handling

Example Code

"""
This script demonstrates a possible bug in the MCP SDK where cleanup of
streamable_http_client causes BrokenResourceError and leaks threads.

Bug?: When tg.cancel_scope.cancel() is called during cleanup, child tasks
spawned by post_writer (like _handle_json_response) are still trying to
use read_stream_writer. The stream gets closed before they finish,
causing BrokenResourceError and preventing proper httpx client cleanup.
"""

import asyncio
import threading
import httpx
from mcp.client.streamable_http import streamable_http_client
from mcp.shared.message import SessionMessage
from mcp.types import JSONRPCRequest, JSONRPCMessage


async def make_mcp_request(url: str, trigger_race: bool = False) -> None:
    """Make an MCP request and observe cleanup behavior."""

    print(f"[Before] Active threads: {threading.active_count()}")
    print(f"[Before] Thread names: {[t.name for t in threading.enumerate()]}")

    # Create custom httpx client with short timeouts
    http_client = httpx.AsyncClient(
        headers={"Authorization": "test"},
        timeout=httpx.Timeout(5.0, read=10.0),
    )

    try:
        async with streamable_http_client(
            url=url,
            http_client=http_client,
        ) as (read_stream, write_stream, get_session_id):
            print(f"[Connected] Session ID: {get_session_id()}")
            print(f"[Connected] Active threads: {threading.active_count()}")

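            # Send an initialize request (note: no notifications/initialized
            # notification is sent afterwards, so the MCP handshake is not completed)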
            init_request = JSONRPCRequest(
                jsonrpc="2.0",
                id="1",
                method="initialize",
                params={
                    "protocolVersion": "2024-11-05",
                    "capabilities": {},
                    "clientInfo": {"name": "test-client", "version": "1.0.0"},
                },
            )
            await write_stream.send(SessionMessage(JSONRPCMessage(init_request)))
            print("[Sent] Initialize request")

            # Read the response
            async for message in read_stream:
                print(f"[Received] {type(message).__name__}")
                if isinstance(message, Exception):
                    print(f"[Error in stream] {message}")
                break

            if trigger_race:
                # Send multiple requests quickly to create race during cleanup
                for i in range(5):
                    tool_request = JSONRPCRequest(
                        jsonrpc="2.0",
                        id=f"tool-{i}",
                        method="tools/call",
                        params={
                            "name": "slow_echo",
                            "arguments": {"message": f"test-{i}"},
                        },
                    )
                    await write_stream.send(SessionMessage(JSONRPCMessage(tool_request)))
                print("[Sent] 5 tool requests - exiting immediately to trigger race")
                # Exit immediately without reading responses - this should trigger the race

    except Exception as e:
        import traceback
        print(f"[Error] {type(e).__name__}: {e}")
        traceback.print_exc()
    finally:
        # This script created the httpx client, so close it explicitly
        await http_client.aclose()

    print(f"[After cleanup] Active threads: {threading.active_count()}")
    print(f"[After cleanup] Thread names: {[t.name for t in threading.enumerate()]}")

    # Wait and check if threads persist
    await asyncio.sleep(5)
    print(f"[After 5s] Active threads: {threading.active_count()}")
    print(f"[After 5s] Thread names: {[t.name for t in threading.enumerate()]}")


async def main():
    # Replace with your MCP server URL
    MCP_URL = "http://localhost:8000/mcp"

    print("=" * 60)
    print("MCP SDK Cleanup Race Condition Reproduction")
    print("=" * 60)

    # Run multiple iterations to accumulate leaked threads
    for i in range(3):
        print(f"\n--- Iteration {i + 1} ---")
        # Trigger race condition on iterations 2 and 3
        await make_mcp_request(MCP_URL, trigger_race=(i > 0))
        await asyncio.sleep(2)

    print("\n" + "=" * 60)
    print("Final State")
    print("=" * 60)
    print(f"Active threads: {threading.active_count()}")
    for t in threading.enumerate():
        print(f"  - {t.name} (daemon={t.daemon})")


if __name__ == "__main__":
    asyncio.run(main())


-----

"""Minimal MCP server for race condition testing."""

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Test Server")


@mcp.tool()
def echo(message: str) -> str:
    """Echo the message back."""
    return f"Echo: {message}"


@mcp.tool()
def slow_echo(message: str) -> str:
    """Echo with a delay to simulate work."""
    import time

    time.sleep(0.5)
    return f"Slow Echo: {message}"


if __name__ == "__main__":
    mcp.run(transport="streamable-http")

Python & MCP Python SDK

- Python: 3.13
- MCP SDK: 1.25.0
- Transport: streamable-http

Labels: bug, needs confirmation