Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/mcp/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ async def __call__(
async def _default_message_handler(
message: RequestResponder[types.ServerRequest, types.ClientResult] | types.ServerNotification | Exception,
) -> None:
if isinstance(message, Exception):
logger.warning("Unhandled exception in message handler: %s", message)
await anyio.lowlevel.checkpoint()


Expand Down
44 changes: 29 additions & 15 deletions src/mcp/client/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@

# Reconnection defaults
DEFAULT_RECONNECTION_DELAY_MS = 1000 # 1 second fallback when server doesn't provide retry
MAX_RECONNECTION_ATTEMPTS = 2 # Max retry attempts before giving up
MAX_RECONNECTION_ATTEMPTS = 5 # Max retry attempts before giving up


class StreamableHTTPError(Exception):
Expand Down Expand Up @@ -197,6 +197,7 @@ async def handle_get_stream(self, client: httpx.AsyncClient, read_stream_writer:
event_source.response.raise_for_status()
logger.debug("GET SSE connection established")

received_events = False
async for sse in event_source.aiter_sse():
# Track last event ID for reconnection
if sse.id:
Expand All @@ -205,10 +206,12 @@ async def handle_get_stream(self, client: httpx.AsyncClient, read_stream_writer:
if sse.retry is not None:
retry_interval_ms = sse.retry

received_events = True
await self._handle_sse_event(sse, read_stream_writer)

# Stream ended normally (server closed) - reset attempt counter
attempt = 0
# Only reset attempts if we actually received events;
# empty connections count toward MAX_RECONNECTION_ATTEMPTS
attempt = 0 if received_events else attempt + 1

except Exception: # pragma: lax no cover
logger.debug("GET stream error", exc_info=True)
Expand Down Expand Up @@ -364,25 +367,36 @@ async def _handle_sse_response(
await response.aclose()
return # Normal completion, no reconnect needed
except Exception:
logger.debug("SSE stream ended", exc_info=True) # pragma: no cover
logger.debug("SSE stream error", exc_info=True)

# Stream ended without response - reconnect if we received an event with ID
if last_event_id is not None: # pragma: no branch
# Stream ended without a complete response — attempt reconnection if possible
if last_event_id is not None:
logger.info("SSE stream disconnected, reconnecting...")
await self._handle_reconnection(ctx, last_event_id, retry_interval_ms)
if await self._handle_reconnection(ctx, last_event_id, retry_interval_ms):
return # Reconnection delivered the response

# No response delivered — unblock the waiting request with an error
error_data = ErrorData(code=INTERNAL_ERROR, message="SSE stream ended without a response")
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=original_request_id, error=error_data))
await ctx.read_stream_writer.send(error_msg)

async def _handle_reconnection(
self,
ctx: RequestContext,
last_event_id: str,
retry_interval_ms: int | None = None,
attempt: int = 0,
) -> None:
"""Reconnect with Last-Event-ID to resume stream after server disconnect."""
) -> bool:
"""Reconnect with Last-Event-ID to resume stream after server disconnect.

Returns:
True if the response was successfully delivered, False if max
reconnection attempts were exceeded without delivering a response.
"""
# Bail if max retries exceeded
if attempt >= MAX_RECONNECTION_ATTEMPTS: # pragma: no cover
if attempt >= MAX_RECONNECTION_ATTEMPTS:
logger.debug(f"Max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded")
return
return False

# Always wait - use server value or default
delay_ms = retry_interval_ms if retry_interval_ms is not None else DEFAULT_RECONNECTION_DELAY_MS
Expand Down Expand Up @@ -419,15 +433,15 @@ async def _handle_reconnection(
)
if is_complete:
await event_source.response.aclose()
return
return True

# Stream ended again without response - reconnect again (reset attempt counter)
# Stream ended again without response - reconnect again
logger.info("SSE stream disconnected, reconnecting...")
await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, 0)
return await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, attempt + 1)
except Exception as e: # pragma: no cover
logger.debug(f"Reconnection failed: {e}")
# Try to reconnect again if we still have an event ID
await self._handle_reconnection(ctx, last_event_id, retry_interval_ms, attempt + 1)
return await self._handle_reconnection(ctx, last_event_id, retry_interval_ms, attempt + 1)

async def post_writer(
self,
Expand Down
Loading