@@ -65,6 +65,8 @@ type SSEServer struct {
65
65
66
66
keepAlive bool
67
67
keepAliveInterval time.Duration
68
+
69
+ mu sync.RWMutex
68
70
}
69
71
70
72
// SSEOption defines a function type for configuring SSEServer
@@ -189,18 +191,25 @@ func NewTestServer(server *MCPServer, opts ...SSEOption) *httptest.Server {
189
191
// Start begins serving SSE connections on the specified address.
190
192
// It sets up HTTP handlers for SSE and message endpoints.
191
193
func (s * SSEServer ) Start (addr string ) error {
194
+ s .mu .Lock ()
192
195
s .srv = & http.Server {
193
196
Addr : addr ,
194
197
Handler : s ,
195
198
}
199
+ s .mu .Unlock ()
196
200
197
201
return s .srv .ListenAndServe ()
198
202
}
199
203
200
204
// Shutdown gracefully stops the SSE server, closing all active sessions
201
205
// and shutting down the HTTP server.
202
206
func (s * SSEServer ) Shutdown (ctx context.Context ) error {
203
- if s .srv != nil {
207
+ s .mu .RLock ()
208
+ srv := s .srv
209
+ s .mu .RUnlock ()
210
+
211
+ if srv != nil {
212
+ // 关闭所有会话
204
213
s .sessions .Range (func (key , value interface {}) bool {
205
214
if session , ok := value .(* sseSession ); ok {
206
215
close (session .done )
@@ -209,7 +218,7 @@ func (s *SSEServer) Shutdown(ctx context.Context) error {
209
218
return true
210
219
})
211
220
212
- return s . srv .Shutdown (ctx )
221
+ return srv .Shutdown (ctx )
213
222
}
214
223
return nil
215
224
}
@@ -336,7 +345,9 @@ func (s *SSEServer) handleMessage(w http.ResponseWriter, r *http.Request) {
336
345
return
337
346
}
338
347
348
+ s .mu .RLock ()
339
349
sessionI , ok := s .sessions .Load (sessionID )
350
+ s .mu .RUnlock ()
340
351
if ! ok {
341
352
s .writeJSONRPCError (w , nil , mcp .INVALID_PARAMS , "Invalid session ID" )
342
353
return
0 commit comments