Skip to content

Commit 2084a38

Browse files
leavezgaoji
andauthored
Feat: Impl Server-Side Streamable HTTP transport (#273)
* Create logger.go * remove duplicated key * impl streamable http server * update * Update streamable_http.go * update protocol version * use HTTPContextFunc * impl SessionWithTools for sessions * rename * rename * add more validation * fix * add test * ut * Update http_transport_options.go * Update streamable_http.go * doc * doc * remove shared options for sse * doc * doc * ut * fix race * ut * doc * fix * Update README.md * Update streamable_http.go * update demo * fix compile * fix lint --------- Co-authored-by: gaoji <[email protected]>
1 parent 99720bb commit 2084a38

File tree

12 files changed

+1419
-265
lines changed

12 files changed

+1419
-265
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,10 @@ For examples, see the `examples/` directory.
531531

532532
## Extras
533533

534+
### Transports
535+
536+
MCP-Go supports stdio, SSE and streamable-HTTP transport layers.
537+
534538
### Session Management
535539

536540
MCP-Go provides a robust session management system that allows you to:

client/transport/streamable_http.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ func (c *StreamableHTTP) Close() error {
127127
}
128128

129129
const (
130-
initializeMethod = "initialize"
131130
headerKeySessionID = "Mcp-Session-Id"
132131
)
133132

@@ -198,7 +197,7 @@ func (c *StreamableHTTP) SendRequest(
198197
return nil, fmt.Errorf("request failed with status %d: %s", resp.StatusCode, body)
199198
}
200199

201-
if request.Method == initializeMethod {
200+
if request.Method == string(mcp.MethodInitialize) {
202201
// saved the received session ID in the response
203202
// empty session ID is allowed
204203
if sessionID := resp.Header.Get(headerKeySessionID); sessionID != "" {

examples/custom_context/main.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,8 @@ func NewMCPServer() *MCPServer {
122122
}
123123
}
124124

125-
func (s *MCPServer) ServeSSE(addr string) *server.SSEServer {
126-
return server.NewSSEServer(s.server,
127-
server.WithBaseURL(fmt.Sprintf("http://%s", addr)),
125+
func (s *MCPServer) ServeHTTP() *server.StreamableHTTPServer {
126+
return server.NewStreamableHTTPServer(s.server,
128127
server.WithHTTPContextFunc(authFromRequest),
129128
)
130129
}
@@ -135,12 +134,12 @@ func (s *MCPServer) ServeStdio() error {
135134

136135
func main() {
137136
var transport string
138-
flag.StringVar(&transport, "t", "stdio", "Transport type (stdio or sse)")
137+
flag.StringVar(&transport, "t", "stdio", "Transport type (stdio or http)")
139138
flag.StringVar(
140139
&transport,
141140
"transport",
142141
"stdio",
143-
"Transport type (stdio or sse)",
142+
"Transport type (stdio or http)",
144143
)
145144
flag.Parse()
146145

@@ -151,15 +150,15 @@ func main() {
151150
if err := s.ServeStdio(); err != nil {
152151
log.Fatalf("Server error: %v", err)
153152
}
154-
case "sse":
155-
sseServer := s.ServeSSE("localhost:8080")
156-
log.Printf("SSE server listening on :8080")
157-
if err := sseServer.Start(":8080"); err != nil {
153+
case "http":
154+
httpServer := s.ServeHTTP()
155+
log.Printf("HTTP server listening on :8080")
156+
if err := httpServer.Start(":8080"); err != nil {
158157
log.Fatalf("Server error: %v", err)
159158
}
160159
default:
161160
log.Fatalf(
162-
"Invalid transport type: %s. Must be 'stdio' or 'sse'",
161+
"Invalid transport type: %s. Must be 'stdio' or 'http'",
163162
transport,
164163
)
165164
}

examples/everything/main.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -475,17 +475,17 @@ func handleNotification(
475475

476476
func main() {
477477
var transport string
478-
flag.StringVar(&transport, "t", "stdio", "Transport type (stdio or sse)")
479-
flag.StringVar(&transport, "transport", "stdio", "Transport type (stdio or sse)")
478+
flag.StringVar(&transport, "t", "stdio", "Transport type (stdio or http)")
479+
flag.StringVar(&transport, "transport", "stdio", "Transport type (stdio or http)")
480480
flag.Parse()
481481

482482
mcpServer := NewMCPServer()
483483

484-
// Only check for "sse" since stdio is the default
485-
if transport == "sse" {
486-
sseServer := server.NewSSEServer(mcpServer, server.WithBaseURL("http://localhost:8080"))
487-
log.Printf("SSE server listening on :8080")
488-
if err := sseServer.Start(":8080"); err != nil {
484+
// Only check for "http" since stdio is the default
485+
if transport == "http" {
486+
httpServer := server.NewStreamableHTTPServer(mcpServer)
487+
log.Printf("HTTP server listening on :8080/mcp")
488+
if err := httpServer.Start(":8080"); err != nil {
489489
log.Fatalf("Server error: %v", err)
490490
}
491491
} else {

examples/simple_client/main.go

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ import (
1717
func main() {
1818
// Define command line flags
1919
stdioCmd := flag.String("stdio", "", "Command to execute for stdio transport (e.g. 'python server.py')")
20-
sseURL := flag.String("sse", "", "URL for SSE transport (e.g. 'http://localhost:8080/sse')")
20+
httpURL := flag.String("http", "", "URL for HTTP transport (e.g. 'http://localhost:8080/mcp')")
2121
flag.Parse()
2222

2323
// Validate flags
24-
if (*stdioCmd == "" && *sseURL == "") || (*stdioCmd != "" && *sseURL != "") {
25-
fmt.Println("Error: You must specify exactly one of --stdio or --sse")
24+
if (*stdioCmd == "" && *httpURL == "") || (*stdioCmd != "" && *httpURL != "") {
25+
fmt.Println("Error: You must specify exactly one of --stdio or --http")
2626
flag.Usage()
2727
os.Exit(1)
2828
}
@@ -51,11 +51,6 @@ func main() {
5151
// Create stdio transport with verbose logging
5252
stdioTransport := transport.NewStdio(command, nil, cmdArgs...)
5353

54-
// Start the transport
55-
if err := stdioTransport.Start(ctx); err != nil {
56-
log.Fatalf("Failed to start stdio transport: %v", err)
57-
}
58-
5954
// Create client with the transport
6055
c = client.NewClient(stdioTransport)
6156

@@ -78,20 +73,20 @@ func main() {
7873
}()
7974
}
8075
} else {
81-
fmt.Println("Initializing SSE client...")
82-
// Create SSE transport
83-
sseTransport, err := transport.NewSSE(*sseURL)
76+
fmt.Println("Initializing HTTP client...")
77+
// Create HTTP transport
78+
httpTransport, err := transport.NewStreamableHTTP(*httpURL)
8479
if err != nil {
85-
log.Fatalf("Failed to create SSE transport: %v", err)
86-
}
87-
88-
// Start the transport
89-
if err := sseTransport.Start(ctx); err != nil {
90-
log.Fatalf("Failed to start SSE transport: %v", err)
80+
log.Fatalf("Failed to create HTTP transport: %v", err)
9181
}
9282

9383
// Create client with the transport
94-
c = client.NewClient(sseTransport)
84+
c = client.NewClient(httpTransport)
85+
}
86+
87+
// Start the client
88+
if err := c.Start(ctx); err != nil {
89+
log.Fatalf("Failed to start client: %v", err)
9590
}
9691

9792
// Set up notification handler

mcp/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func (t *URITemplate) UnmarshalJSON(data []byte) error {
9696
type JSONRPCMessage any
9797

9898
// LATEST_PROTOCOL_VERSION is the most recent version of the MCP protocol.
99-
const LATEST_PROTOCOL_VERSION = "2024-11-05"
99+
const LATEST_PROTOCOL_VERSION = "2025-03-26"
100100

101101
// JSONRPC_VERSION is the version of JSON-RPC used by MCP.
102102
const JSONRPC_VERSION = "2.0"

server/http_transport_options.go

Lines changed: 0 additions & 178 deletions
Original file line numberDiff line numberDiff line change
@@ -3,187 +3,9 @@ package server
33
import (
44
"context"
55
"net/http"
6-
"net/url"
7-
"strings"
8-
"time"
96
)
107

118
// HTTPContextFunc is a function that takes an existing context and the current
129
// request and returns a potentially modified context based on the request
1310
// content. This can be used to inject context values from headers, for example.
1411
type HTTPContextFunc func(ctx context.Context, r *http.Request) context.Context
15-
16-
// httpTransportConfigurable is an internal interface for shared HTTP transport configuration.
17-
type httpTransportConfigurable interface {
18-
setBasePath(string)
19-
setDynamicBasePath(DynamicBasePathFunc)
20-
setKeepAliveInterval(time.Duration)
21-
setKeepAlive(bool)
22-
setContextFunc(HTTPContextFunc)
23-
setHTTPServer(*http.Server)
24-
setBaseURL(string)
25-
}
26-
27-
// HTTPTransportOption is a function that configures an httpTransportConfigurable.
28-
type HTTPTransportOption func(httpTransportConfigurable)
29-
30-
// Option interfaces and wrappers for server configuration
31-
// Base option interface
32-
type HTTPServerOption interface {
33-
isHTTPServerOption()
34-
}
35-
36-
// SSE-specific option interface
37-
type SSEOption interface {
38-
HTTPServerOption
39-
applyToSSE(*SSEServer)
40-
}
41-
42-
// StreamableHTTP-specific option interface
43-
type StreamableHTTPOption interface {
44-
HTTPServerOption
45-
applyToStreamableHTTP(*StreamableHTTPServer)
46-
}
47-
48-
// Common options that work with both server types
49-
type CommonHTTPServerOption interface {
50-
SSEOption
51-
StreamableHTTPOption
52-
}
53-
54-
// Wrapper for SSE-specific functional options
55-
type sseOption func(*SSEServer)
56-
57-
func (o sseOption) isHTTPServerOption() {}
58-
func (o sseOption) applyToSSE(s *SSEServer) { o(s) }
59-
60-
// Wrapper for StreamableHTTP-specific functional options
61-
type streamableHTTPOption func(*StreamableHTTPServer)
62-
63-
func (o streamableHTTPOption) isHTTPServerOption() {}
64-
func (o streamableHTTPOption) applyToStreamableHTTP(s *StreamableHTTPServer) { o(s) }
65-
66-
// Refactor commonOption to use a single apply func(httpTransportConfigurable)
67-
type commonOption struct {
68-
apply func(httpTransportConfigurable)
69-
}
70-
71-
func (o commonOption) isHTTPServerOption() {}
72-
func (o commonOption) applyToSSE(s *SSEServer) { o.apply(s) }
73-
func (o commonOption) applyToStreamableHTTP(s *StreamableHTTPServer) { o.apply(s) }
74-
75-
// TODO: This is a stub implementation of StreamableHTTPServer just to show how
76-
// to use it with the new options interfaces.
77-
type StreamableHTTPServer struct{}
78-
79-
// Add stub methods to satisfy httpTransportConfigurable
80-
81-
func (s *StreamableHTTPServer) setBasePath(string) {}
82-
func (s *StreamableHTTPServer) setDynamicBasePath(DynamicBasePathFunc) {}
83-
func (s *StreamableHTTPServer) setKeepAliveInterval(time.Duration) {}
84-
func (s *StreamableHTTPServer) setKeepAlive(bool) {}
85-
func (s *StreamableHTTPServer) setContextFunc(HTTPContextFunc) {}
86-
func (s *StreamableHTTPServer) setHTTPServer(srv *http.Server) {}
87-
func (s *StreamableHTTPServer) setBaseURL(baseURL string) {}
88-
89-
// Ensure the option types implement the correct interfaces
90-
var (
91-
_ httpTransportConfigurable = (*StreamableHTTPServer)(nil)
92-
_ SSEOption = sseOption(nil)
93-
_ StreamableHTTPOption = streamableHTTPOption(nil)
94-
_ CommonHTTPServerOption = commonOption{}
95-
)
96-
97-
// WithStaticBasePath adds a new option for setting a static base path.
98-
// This is useful for mounting the server at a known, fixed path.
99-
func WithStaticBasePath(basePath string) CommonHTTPServerOption {
100-
return commonOption{
101-
apply: func(c httpTransportConfigurable) {
102-
c.setBasePath(basePath)
103-
},
104-
}
105-
}
106-
107-
// DynamicBasePathFunc allows the user to provide a function to generate the
108-
// base path for a given request and sessionID. This is useful for cases where
109-
// the base path is not known at the time of SSE server creation, such as when
110-
// using a reverse proxy or when the base path is dynamically generated. The
111-
// function should return the base path (e.g., "/mcp/tenant123").
112-
type DynamicBasePathFunc func(r *http.Request, sessionID string) string
113-
114-
// WithDynamicBasePath accepts a function for generating the base path.
115-
// This is useful for cases where the base path is not known at the time of server creation,
116-
// such as when using a reverse proxy or when the server is mounted at a dynamic path.
117-
func WithDynamicBasePath(fn DynamicBasePathFunc) CommonHTTPServerOption {
118-
return commonOption{
119-
apply: func(c httpTransportConfigurable) {
120-
c.setDynamicBasePath(fn)
121-
},
122-
}
123-
}
124-
125-
// WithKeepAliveInterval sets the keep-alive interval for the transport.
126-
// When enabled, the server will periodically send ping events to keep the connection alive.
127-
func WithKeepAliveInterval(interval time.Duration) CommonHTTPServerOption {
128-
return commonOption{
129-
apply: func(c httpTransportConfigurable) {
130-
c.setKeepAliveInterval(interval)
131-
},
132-
}
133-
}
134-
135-
// WithKeepAlive enables or disables keep-alive for the transport.
136-
// When enabled, the server will send periodic keep-alive events to clients.
137-
func WithKeepAlive(keepAlive bool) CommonHTTPServerOption {
138-
return commonOption{
139-
apply: func(c httpTransportConfigurable) {
140-
c.setKeepAlive(keepAlive)
141-
},
142-
}
143-
}
144-
145-
// WithHTTPContextFunc sets a function that will be called to customize the context
146-
// for the server using the incoming request. This is useful for injecting
147-
// context values from headers or other request properties.
148-
func WithHTTPContextFunc(fn HTTPContextFunc) CommonHTTPServerOption {
149-
return commonOption{
150-
apply: func(c httpTransportConfigurable) {
151-
c.setContextFunc(fn)
152-
},
153-
}
154-
}
155-
156-
// WithBaseURL sets the base URL for the HTTP transport server.
157-
// This is useful for configuring the externally visible base URL for clients.
158-
func WithBaseURL(baseURL string) CommonHTTPServerOption {
159-
return commonOption{
160-
apply: func(c httpTransportConfigurable) {
161-
if baseURL != "" {
162-
u, err := url.Parse(baseURL)
163-
if err != nil {
164-
return
165-
}
166-
if u.Scheme != "http" && u.Scheme != "https" {
167-
return
168-
}
169-
if u.Host == "" || strings.HasPrefix(u.Host, ":") {
170-
return
171-
}
172-
if len(u.Query()) > 0 {
173-
return
174-
}
175-
}
176-
c.setBaseURL(strings.TrimSuffix(baseURL, "/"))
177-
},
178-
}
179-
}
180-
181-
// WithHTTPServer sets the HTTP server instance for the transport.
182-
// This is useful for advanced scenarios where you want to provide your own http.Server.
183-
func WithHTTPServer(srv *http.Server) CommonHTTPServerOption {
184-
return commonOption{
185-
apply: func(c httpTransportConfigurable) {
186-
c.setHTTPServer(srv)
187-
},
188-
}
189-
}

0 commit comments

Comments
 (0)