Skip to content

feat: client-side streamable-http transport supports resumability #380

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

FH-30
Copy link

@FH-30 FH-30 commented Jun 9, 2025

Description

Implemented for client-side streamable-http transport

  • option to attempt resumption up till max retry attempts on failure to get final JSON-RPC Response when server sends back an SSE response stream

Fixes #<issue_number> (if applicable)

Type of Change

  • Bug fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • MCP spec compatibility implementation
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update
  • Code refactoring (no functional changes)
  • Performance improvement
  • Tests only (no functional changes)
  • Other (please describe):

Checklist

  • My code follows the code style of this project
  • I have performed a self-review of my own code
  • I have added tests that prove my fix is effective or that my feature works
  • I have updated the documentation accordingly

MCP Spec Compliance

  • This PR implements a feature defined in the MCP specification
  • Link to relevant spec section: Link text
  • Implementation follows the specification exactly

Additional Information

Will add the tests in a later commit

Summary by CodeRabbit

  • New Features

    • Added support for automatic resumption of interrupted Server-Sent Events (SSE) connections, with configurable retry attempts to ensure seamless streaming and minimize event loss.
  • Style

    • Removed unnecessary blank lines in test files for improved code readability.

Copy link
Contributor

coderabbitai bot commented Jun 9, 2025

Walkthrough

This update introduces support for resuming interrupted Server-Sent Events (SSE) streams in the StreamableHTTP client by tracking the last received event ID and retrying failed connections. It adds a new resumption option, related methods, and updates SSE event handling to propagate event IDs. Minor whitespace cleanups are also included in test files.

Changes

Files Change Summary
client/transport/streamable_http.go Added SSE resumption support: new option, event ID tracking, retry logic, new methods, updated SSE event parsing and handler signatures.
client/transport/sse_test.go, client/transport/streamable_http_test.go Removed extraneous blank lines in SSE-related test cases.

Possibly related PRs

Suggested labels

type: enhancement, area: mcp spec

Suggested reviewers

  • rwjblue-glean

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 golangci-lint (1.64.8)

Error: you are using a configuration file for golangci-lint v2 with golangci-lint v1: please use golangci-lint v2
Failed executing command with error: you are using a configuration file for golangci-lint v2 with golangci-lint v1: please use golangci-lint v2

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
client/transport/streamable_http.go (2)

334-340: Improve retry loop readability and error message specificity

The retry logic is correct but could be more idiomatic and the error message more specific.

-		for range c.maxRetryCount {
-			resp, err, canRetry := c.performSSEResumption(ctx, lastEventId, resumptionCallback)
-			if err == nil || lastEventId == "" || !canRetry {
-				return resp, err
-			}
-		}
-		return nil, fmt.Errorf("failed to retrieve response after attempting resumption %d times", c.maxRetryCount)
+		for i := 0; i < c.maxRetryCount; i++ {
+			resp, err, canRetry := c.performSSEResumption(ctx, lastEventId, resumptionCallback)
+			if err == nil || lastEventId == "" || !canRetry {
+				return resp, err
+			}
+		}
+		return nil, fmt.Errorf("failed to resume SSE stream after %d retry attempts", c.maxRetryCount)

57-437: Consider adding tests for the resumption functionality

The resumption implementation is well-designed and follows the MCP specification. However, I notice that tests for this new functionality are not included in this PR. As mentioned in the PR objectives, tests are planned for a future commit.

Would you like me to help generate comprehensive test cases for the resumption functionality, including:

  • Successful resumption scenarios
  • Retry exhaustion cases
  • Error handling during resumption
  • Null byte filtering in event IDs
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8c7a09d and 21760bc.

📒 Files selected for processing (3)
  • client/transport/sse_test.go (2 hunks)
  • client/transport/streamable_http.go (10 hunks)
  • client/transport/streamable_http_test.go (2 hunks)
🔇 Additional comments (6)
client/transport/streamable_http_test.go (1)

420-420: LGTM - Whitespace cleanup

Removing trailing blank lines improves code consistency.

Also applies to: 440-440

client/transport/sse_test.go (1)

411-411: LGTM - Consistent whitespace cleanup

Removing trailing blank lines to maintain consistency with the changes in streamable_http_test.go.

Also applies to: 413-413, 452-452

client/transport/streamable_http.go (4)

57-75: Well-implemented resumption option

The WithResumption function properly validates input and follows the established pattern for configuration options. Good documentation referencing the MCP specification.


90-96: Appropriate struct fields for resumption support

The new fields resumptionEnabled and maxRetryCount are well-placed and properly support the resumption feature.


346-437: Excellent implementation of SSE resumption logic

The performSSEResumption method is comprehensive and handles all edge cases properly:

  • Correct use of GET request with Last-Event-Id header
  • Appropriate error handling with meaningful canRetry flag
  • Consistent OAuth and session management

559-566: Good security consideration for event ID handling

Excellent implementation that properly filters null bytes to prevent HTTP header injection vulnerabilities. The comment clearly explains the security rationale.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
client/transport/streamable_http.go (1)

372-374: Validate empty lastEventId parameter handling.

The check for empty lastEventId is good, but this condition should theoretically never occur based on the calling logic in SendRequest. Consider adding a more descriptive error message or debug logging.

-	if lastEventId == "" {
-		return nil, fmt.Errorf("sse resumption request requires a last event id"), false
-	}
+	if lastEventId == "" {
+		return nil, fmt.Errorf("sse resumption request requires a last event id (this should not happen)"), false
+	}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 21760bc and da980e8.

📒 Files selected for processing (1)
  • client/transport/streamable_http.go (10 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
client/transport/streamable_http.go (3)
client/transport/interface.go (2)
  • HTTPHeaderFunc (13-13)
  • JSONRPCResponse (41-50)
mcp/types.go (1)
  • JSONRPCResponse (324-328)
client/oauth.go (1)
  • OAuthAuthorizationRequiredError (61-61)
🔇 Additional comments (9)
client/transport/streamable_http.go (9)

57-74: Well-implemented resumption configuration option.

The function properly validates the retry count and provides clear documentation referencing the MCP specification. The parameter validation ensuring maxRetryCount >= 1 prevents invalid configurations.


90-95: Struct fields properly integrated for resumption support.

The new fields resumptionEnabled and maxRetryCount are appropriately added to support the resumption functionality without breaking existing code.


181-182: Header constants follow existing patterns.

The new headerKeyLastEventID constant is properly defined following the same naming convention as the existing headerKeySessionID.


346-439: Comprehensive resumption method with proper error handling.

The performSSEResumption method is well-implemented with appropriate OAuth handling, session management, and error classification via the canRetry boolean return value.


443-447: Method signature change is appropriate for resumption support.

The addition of the resumptionCallback parameter maintains backward compatibility since this is an internal method and enables the resumption functionality.


470-472: Event ID tracking correctly integrated.

The resumption callback is properly invoked when an event ID is present, enabling the retry mechanism to track progress through the SSE stream.


561-567: Excellent security practice with null byte filtering.

The null byte filtering prevents HTTP header injection attacks when the event ID is sent back in the Last-Event-Id header. This is a crucial security consideration.


507-571: Enhanced SSE parsing with proper event ID handling.

The modifications to readSSE correctly parse the id: field from SSE streams and properly reset the ID after each event. The implementation follows the SSE specification accurately.


323-340:

✅ Verification successful

Verify retry loop termination conditions.

The resumption logic looks correct, but ensure the loop termination is robust. The current implementation could potentially retry on non-recoverable errors.

Consider adding exponential backoff or different retry strategies:

-		for range c.maxRetryCount {
+		for attempt := 1; attempt <= c.maxRetryCount; attempt++ {
+			if attempt > 1 {
+				// Add exponential backoff delay
+				time.Sleep(time.Duration(attempt) * 100 * time.Millisecond)
+			}
 			resp, err, canRetry := c.performSSEResumption(ctx, lastEventId, resumptionCallback)

🏁 Script executed:

#!/bin/bash
# Search for similar retry patterns in the codebase to ensure consistency
rg -A 5 -B 5 "for.*range.*maxRetry" --type go

Length of output: 781


🏁 Script executed:

#!/bin/bash
# Locate the declaration of maxRetryCount in client/transport/streamable_http.go
rg -n "maxRetryCount" client/transport/streamable_http.go

Length of output: 385


Retry loop termination verified—no infinite retries
The loop runs exactly c.maxRetryCount times and exits early on success (err == nil), missing IDs (lastEventId == ""), or non-recoverable errors (canRetry == false). No change is strictly required.

Optional improvement: switch to an index-based loop to add exponential backoff:

-   for range c.maxRetryCount {
+   for attempt := 1; attempt <= c.maxRetryCount; attempt++ {
+       if attempt > 1 {
+           time.Sleep(time.Duration(attempt) * 100 * time.Millisecond)
+       }
        resp, err, canRetry := c.performSSEResumption(ctx, lastEventId, resumptionCallback)
        if err == nil || lastEventId == "" || !canRetry {
            return resp, err
        }
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant