Skip to content

Commit 674cb15

Browse files
authored
Make supposedly unreachable code less reachable (#178)
* Move WriteJsonRpcMessageToBuffer to method * Fix indentation in McpClient * Make supposedly unreachable code less reachable * Guard against multiple RunAsync calls * Move RunAsync inside of Try in MapMcp
1 parent 25bcb44 commit 674cb15

File tree

7 files changed

+94
-80
lines changed

7 files changed

+94
-80
lines changed

src/ModelContextProtocol.AspNetCore/McpEndpointRouteBuilderExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ public static IEndpointConventionBuilder MapMcp(this IEndpointRouteBuilder endpo
4747
{
4848
throw new Exception($"Unreachable given good entropy! Session with ID '{sessionId}' has already been created.");
4949
}
50-
await using var server = McpServerFactory.Create(transport, mcpServerOptions.Value, loggerFactory, endpoints.ServiceProvider);
5150

5251
try
5352
{
5453
var transportTask = transport.RunAsync(cancellationToken: requestAborted);
54+
await using var server = McpServerFactory.Create(transport, mcpServerOptions.Value, loggerFactory, endpoints.ServiceProvider);
5555

5656
try
5757
{
@@ -85,7 +85,7 @@ public static IEndpointConventionBuilder MapMcp(this IEndpointRouteBuilder endpo
8585

8686
if (!_sessions.TryGetValue(sessionId.ToString(), out var transport))
8787
{
88-
await Results.BadRequest($"Session {sessionId} not found.").ExecuteAsync(context);
88+
await Results.BadRequest($"Session ID not found.").ExecuteAsync(context);
8989
return;
9090
}
9191

src/ModelContextProtocol/Client/McpClient.cs

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -82,29 +82,27 @@ public async Task ConnectAsync(CancellationToken cancellationToken = default)
8282
{
8383
// Connect transport
8484
_sessionTransport = await _clientTransport.ConnectAsync(cancellationToken).ConfigureAwait(false);
85-
// We don't want the ConnectAsync token to cancel the session after we've successfully connected.
86-
// The base class handles cleaning up the session in DisposeAsync without our help.
87-
StartSession(_sessionTransport, fullSessionCancellationToken: CancellationToken.None);
85+
StartSession(_sessionTransport);
8886

8987
// Perform initialization sequence
9088
using var initializationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
9189
initializationCts.CancelAfter(_options.InitializationTimeout);
9290

93-
try
94-
{
95-
// Send initialize request
96-
var initializeResponse = await SendRequestAsync<InitializeResult>(
97-
new JsonRpcRequest
98-
{
99-
Method = RequestMethods.Initialize,
100-
Params = new InitializeRequestParams()
91+
try
92+
{
93+
// Send initialize request
94+
var initializeResponse = await SendRequestAsync<InitializeResult>(
95+
new JsonRpcRequest
10196
{
102-
ProtocolVersion = _options.ProtocolVersion,
103-
Capabilities = _options.Capabilities ?? new ClientCapabilities(),
104-
ClientInfo = _options.ClientInfo
105-
}
106-
},
107-
initializationCts.Token).ConfigureAwait(false);
97+
Method = RequestMethods.Initialize,
98+
Params = new InitializeRequestParams()
99+
{
100+
ProtocolVersion = _options.ProtocolVersion,
101+
Capabilities = _options.Capabilities ?? new ClientCapabilities(),
102+
ClientInfo = _options.ClientInfo
103+
}
104+
},
105+
initializationCts.Token).ConfigureAwait(false);
108106

109107
// Store server information
110108
_logger.ServerCapabilitiesReceived(EndpointName,

src/ModelContextProtocol/Protocol/Transport/SseResponseStreamTransport.cs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,30 +32,30 @@ public sealed class SseResponseStreamTransport(Stream sseResponseStream, string
3232
/// <returns>A task representing the send loop that writes JSON-RPC messages to the SSE response stream.</returns>
3333
public Task RunAsync(CancellationToken cancellationToken)
3434
{
35-
void WriteJsonRpcMessageToBuffer(SseItem<IJsonRpcMessage?> item, IBufferWriter<byte> writer)
36-
{
37-
if (item.EventType == "endpoint")
38-
{
39-
writer.Write(Encoding.UTF8.GetBytes(messageEndpoint));
40-
return;
41-
}
42-
43-
JsonSerializer.Serialize(GetUtf8JsonWriter(writer), item.Data, McpJsonUtilities.DefaultOptions.GetTypeInfo<IJsonRpcMessage?>());
44-
}
45-
46-
IsConnected = true;
47-
4835
// The very first SSE event isn't really an IJsonRpcMessage, but there's no API to write a single item of a different type,
4936
// so we fib and special-case the "endpoint" event type in the formatter.
5037
if (!_outgoingSseChannel.Writer.TryWrite(new SseItem<IJsonRpcMessage?>(null, "endpoint")))
5138
{
5239
throw new InvalidOperationException($"You must call ${nameof(RunAsync)} before calling ${nameof(SendMessageAsync)}.");
5340
}
5441

42+
IsConnected = true;
43+
5544
var sseItems = _outgoingSseChannel.Reader.ReadAllAsync(cancellationToken);
5645
return _sseWriteTask = SseFormatter.WriteAsync(sseItems, sseResponseStream, WriteJsonRpcMessageToBuffer, cancellationToken);
5746
}
5847

48+
private void WriteJsonRpcMessageToBuffer(SseItem<IJsonRpcMessage?> item, IBufferWriter<byte> writer)
49+
{
50+
if (item.EventType == "endpoint")
51+
{
52+
writer.Write(Encoding.UTF8.GetBytes(messageEndpoint));
53+
return;
54+
}
55+
56+
JsonSerializer.Serialize(GetUtf8JsonWriter(writer), item.Data, McpJsonUtilities.DefaultOptions.GetTypeInfo<IJsonRpcMessage?>());
57+
}
58+
5959
/// <inheritdoc/>
6060
public ChannelReader<IJsonRpcMessage> MessageReader => _incomingChannel.Reader;
6161

src/ModelContextProtocol/Server/McpServer.cs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ internal sealed class McpServer : McpJsonRpcEndpoint, IMcpServer
1414
private readonly EventHandler? _toolsChangedDelegate;
1515
private readonly EventHandler? _promptsChangedDelegate;
1616

17-
private ITransport _sessionTransport;
1817
private string _endpointName;
18+
private int _started;
1919

2020
/// <summary>
2121
/// Creates a new instance of <see cref="McpServer"/>.
@@ -32,7 +32,6 @@ public McpServer(ITransport transport, McpServerOptions options, ILoggerFactory?
3232
Throw.IfNull(transport);
3333
Throw.IfNull(options);
3434

35-
_sessionTransport = transport;
3635
ServerOptions = options;
3736
Services = serviceProvider;
3837
_endpointName = $"Server ({options.ServerInfo.Name} {options.ServerInfo.Version})";
@@ -74,6 +73,8 @@ public McpServer(ITransport transport, McpServerOptions options, ILoggerFactory?
7473
SetPromptsHandler(options);
7574
SetResourcesHandler(options);
7675
SetSetLoggingLevelHandler(options);
76+
77+
StartSession(transport);
7778
}
7879

7980
public ServerCapabilities? ServerCapabilities { get; set; }
@@ -96,11 +97,16 @@ public McpServer(ITransport transport, McpServerOptions options, ILoggerFactory?
9697
/// <inheritdoc />
9798
public async Task RunAsync(CancellationToken cancellationToken = default)
9899
{
100+
if (Interlocked.Exchange(ref _started, 1) != 0)
101+
{
102+
throw new InvalidOperationException($"{nameof(RunAsync)} must only be called once.");
103+
}
104+
99105
try
100106
{
101-
// Start processing messages
102-
StartSession(_sessionTransport, fullSessionCancellationToken: cancellationToken);
103-
await MessageProcessingTask.ConfigureAwait(false);
107+
using var _ = cancellationToken.Register(static s => ((McpServer)s!).CancelSession(), this);
108+
// The McpServer ctor always calls StartSession, so MessageProcessingTask is always set.
109+
await MessageProcessingTask!.ConfigureAwait(false);
104110
}
105111
finally
106112
{

src/ModelContextProtocol/Shared/McpJsonRpcEndpoint.cs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ internal abstract class McpJsonRpcEndpoint : IAsyncDisposable
2222

2323
private McpSession? _session;
2424
private CancellationTokenSource? _sessionCts;
25-
private int _started;
2625

2726
private readonly SemaphoreSlim _disposeLock = new(1, 1);
2827
private bool _disposed;
@@ -61,18 +60,15 @@ public Task SendMessageAsync(IJsonRpcMessage message, CancellationToken cancella
6160
protected Task? MessageProcessingTask { get; set; }
6261

6362
[MemberNotNull(nameof(MessageProcessingTask))]
64-
protected void StartSession(ITransport sessionTransport, CancellationToken fullSessionCancellationToken = default)
63+
protected void StartSession(ITransport sessionTransport)
6564
{
66-
if (Interlocked.Exchange(ref _started, 1) != 0)
67-
{
68-
throw new InvalidOperationException("The MCP session has already stared.");
69-
}
70-
71-
_sessionCts = CancellationTokenSource.CreateLinkedTokenSource(fullSessionCancellationToken);
65+
_sessionCts = new CancellationTokenSource();
7266
_session = new McpSession(sessionTransport, EndpointName, _requestHandlers, _notificationHandlers, _logger);
7367
MessageProcessingTask = _session.ProcessMessagesAsync(_sessionCts.Token);
7468
}
7569

70+
protected void CancelSession() => _sessionCts?.Cancel();
71+
7672
public async ValueTask DisposeAsync()
7773
{
7874
using var _ = await _disposeLock.LockAsync().ConfigureAwait(false);

tests/ModelContextProtocol.Tests/Server/McpServerFactoryTests.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
1-
using ModelContextProtocol.Protocol.Transport;
2-
using ModelContextProtocol.Protocol.Types;
1+
using ModelContextProtocol.Protocol.Types;
32
using ModelContextProtocol.Server;
43
using ModelContextProtocol.Tests.Utils;
5-
using Moq;
64

75
namespace ModelContextProtocol.Tests.Server;
86

@@ -25,7 +23,8 @@ public McpServerFactoryTests(ITestOutputHelper testOutputHelper)
2523
public async Task Create_Should_Initialize_With_Valid_Parameters()
2624
{
2725
// Arrange & Act
28-
await using IMcpServer server = McpServerFactory.Create(Mock.Of<ITransport>(), _options, LoggerFactory);
26+
await using var transport = new TestServerTransport();
27+
await using IMcpServer server = McpServerFactory.Create(transport, _options, LoggerFactory);
2928

3029
// Assert
3130
Assert.NotNull(server);
@@ -39,9 +38,10 @@ public void Create_Throws_For_Null_ServerTransport()
3938
}
4039

4140
[Fact]
42-
public void Create_Throws_For_Null_Options()
41+
public async Task Create_Throws_For_Null_Options()
4342
{
4443
// Arrange, Act & Assert
45-
Assert.Throws<ArgumentNullException>("serverOptions", () => McpServerFactory.Create(Mock.Of<ITransport>(), null!, LoggerFactory));
44+
await using var transport = new TestServerTransport();
45+
Assert.Throws<ArgumentNullException>("serverOptions", () => McpServerFactory.Create(transport, null!, LoggerFactory));
4646
}
4747
}

0 commit comments

Comments
 (0)