Skip to content

Remove batching support from StreamableHttpServerTransport #372

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public override async Task SendMessageAsync(JsonRpcMessage message, Cancellation
id = messageWithId.Id.ToString();
}

var json = JsonSerializer.Serialize(message, McpJsonUtilities.DefaultOptions.GetTypeInfo(typeof(JsonRpcMessage)));
var json = JsonSerializer.Serialize(message, McpJsonUtilities.JsonContext.Default.JsonRpcMessage);

using var _ = await _sendLock.LockAsync(cancellationToken).ConfigureAwait(false);
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ public async ValueTask<bool> RunAsync(CancellationToken cancellationToken)
// The incomingChannel is null to handle the potential client GET request to handle unsolicited JsonRpcMessages.
if (incomingChannel is not null)
{
await OnPostBodyReceivedAsync(httpBodies.Input, cancellationToken).ConfigureAwait(false);
var message = await JsonSerializer.DeserializeAsync(httpBodies.Input.AsStream(),
McpJsonUtilities.JsonContext.Default.JsonRpcMessage, cancellationToken).ConfigureAwait(false);
await OnMessageReceivedAsync(message, cancellationToken).ConfigureAwait(false);
}

if (_pendingRequests.Count == 0)
Expand Down Expand Up @@ -72,24 +74,6 @@ public async ValueTask DisposeAsync()
}
}

private async ValueTask OnPostBodyReceivedAsync(PipeReader streamableHttpRequestBody, CancellationToken cancellationToken)
{
if (!await IsJsonArrayAsync(streamableHttpRequestBody, cancellationToken).ConfigureAwait(false))
{
var message = await JsonSerializer.DeserializeAsync(streamableHttpRequestBody.AsStream(), McpJsonUtilities.JsonContext.Default.JsonRpcMessage, cancellationToken).ConfigureAwait(false);
await OnMessageReceivedAsync(message, cancellationToken).ConfigureAwait(false);
}
else
{
// Batched JSON-RPC message
var messages = JsonSerializer.DeserializeAsyncEnumerable(streamableHttpRequestBody.AsStream(), McpJsonUtilities.JsonContext.Default.JsonRpcMessage, cancellationToken).ConfigureAwait(false);
await foreach (var message in messages.WithCancellation(cancellationToken))
{
await OnMessageReceivedAsync(message, cancellationToken).ConfigureAwait(false);
}
}
}

private async ValueTask OnMessageReceivedAsync(JsonRpcMessage? message, CancellationToken cancellationToken)
{
if (message is null)
Expand All @@ -108,27 +92,4 @@ private async ValueTask OnMessageReceivedAsync(JsonRpcMessage? message, Cancella
Throw.IfNull(incomingChannel);
await incomingChannel.WriteAsync(message, cancellationToken).ConfigureAwait(false);
}

private async ValueTask<bool> IsJsonArrayAsync(PipeReader requestBody, CancellationToken cancellationToken)
{
// REVIEW: Should we bother trimming whitespace before checking for '['?
var firstCharacterResult = await requestBody.ReadAtLeastAsync(1, cancellationToken).ConfigureAwait(false);

try
{
if (firstCharacterResult.Buffer.Length == 0)
{
return false;
}

Span<byte> firstCharBuffer = stackalloc byte[1];
firstCharacterResult.Buffer.Slice(0, 1).CopyTo(firstCharBuffer);
return firstCharBuffer[0] == (byte)'[';
}
finally
{
// Never consume data when checking for '['. System.Text.Json still needs to consume it.
requestBody.AdvanceTo(firstCharacterResult.Buffer.Start);
}
}
}
2 changes: 1 addition & 1 deletion src/ModelContextProtocol/Shared/McpSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public McpSession(
StdioClientSessionTransport or StdioServerTransport => "stdio",
StreamClientSessionTransport or StreamServerTransport => "stream",
SseClientSessionTransport or SseResponseStreamTransport => "sse",
StreamableHttpServerTransport or StreamableHttpPostTransport => "http",
StreamableHttpClientSessionTransport or StreamableHttpServerTransport or StreamableHttpPostTransport => "http",
_ => "unknownTransport"
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,39 +182,6 @@ public async Task InitializeJsonRpcRequest_IsHandled_WithCompleteSseResponse()
await CallInitializeAndValidateAsync();
}

[Fact]
public async Task BatchedJsonRpcRequests_IsHandled_WithCompleteSseResponse()
{
await StartAsync();

using var response = await HttpClient.PostAsync("", JsonContent($"[{InitializeRequest},{EchoRequest}]"), TestContext.Current.CancellationToken);
Assert.Equal(HttpStatusCode.OK, response.StatusCode);

var eventCount = 0;
await foreach (var sseEvent in ReadSseAsync(response.Content))
{
var jsonRpcResponse = JsonSerializer.Deserialize(sseEvent, GetJsonTypeInfo<JsonRpcResponse>());
Assert.NotNull(jsonRpcResponse);
var responseId = Assert.IsType<long>(jsonRpcResponse.Id.Id);

switch (responseId)
{
case 1:
AssertServerInfo(jsonRpcResponse);
break;
case 2:
AssertEchoResponse(jsonRpcResponse);
break;
default:
throw new Exception($"Unexpected response ID: {jsonRpcResponse.Id}");
}

eventCount++;
}

Assert.Equal(2, eventCount);
}

[Fact]
public async Task SingleJsonRpcRequest_ThatThrowsIsHandled_WithCompleteSseResponse()
{
Expand Down