Skip to content

Update WatcherDelegatingHandler to stop eating first line (#183) #190

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 76 additions & 12 deletions src/KubernetesClient/WatcherDelegatingHandler.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.IO;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
Expand All @@ -24,8 +26,8 @@ protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage
var query = QueryHelpers.ParseQuery(request.RequestUri.Query);

if (query.TryGetValue("watch", out var values) && values.Any(v => v == "true"))
{
originResponse.Content = new LineSeparatedHttpContent(originResponse.Content);
{
originResponse.Content = new LineSeparatedHttpContent(originResponse.Content);
}
}
return originResponse;
Expand All @@ -41,18 +43,19 @@ public LineSeparatedHttpContent(HttpContent originContent)
_originContent = originContent;
}

internal StreamReader StreamReader { get; private set; }
internal PeekableStreamReader StreamReader { get; private set; }

protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
_originStream = await _originContent.ReadAsStreamAsync();

StreamReader = new StreamReader(_originStream);

var firstLine = await StreamReader.ReadLineAsync();
var writer = new StreamWriter(stream);

// using (writer) // leave open
StreamReader = new PeekableStreamReader(_originStream);

var firstLine = await StreamReader.PeekLineAsync();

var writer = new StreamWriter(stream);

// using (writer) // leave open
{
await writer.WriteAsync(firstLine);
await writer.FlushAsync();
Expand All @@ -64,6 +67,67 @@ protected override bool TryComputeLength(out long length)
length = 0;
return false;
}
}
internal class PeekableStreamReader : StreamReader
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worried that if someone calls any of the other ReadBlock, Read etc methods this is going to break...

At the very least let's override those messages and have them throw an exception so that we detect that it's happening...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me. I have updated the pull request to override all the remaining Read methods to throw NotImplementedException

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brendanburns The build is failing but I have no clue why and I don't have permission to re-trigger the build. Would you please point me to what should I check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I managed to re-trigger the build and all pass now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brendanburns, Was wondering if you have time to review the updated request

{
private Queue<string> _buffer;
public PeekableStreamReader(Stream stream) : base(stream)
{
_buffer = new Queue<string>();
}

public override string ReadLine()
{
if (_buffer.Count > 0)
{
return _buffer.Dequeue();
}
return base.ReadLine();
}
public override Task<string> ReadLineAsync()
{
if (_buffer.Count > 0)
{
return Task.FromResult(_buffer.Dequeue());
}
return base.ReadLineAsync();
}
public async Task<string> PeekLineAsync()
{
var line = await ReadLineAsync();
_buffer.Enqueue(line);
return line;
}

public override int Read()
{
throw new NotImplementedException();
}

public override int Read(char[] buffer, int index, int count)
{
throw new NotImplementedException();
}
public override Task<int> ReadAsync(char[] buffer, int index, int count)
{
throw new NotImplementedException();
}
public override int ReadBlock(char[] buffer, int index, int count)
{
throw new NotImplementedException();
}
public override Task<int> ReadBlockAsync(char[] buffer, int index, int count)
{
throw new NotImplementedException();
}
public override string ReadToEnd()
{
throw new NotImplementedException();
}
public override Task<string> ReadToEndAsync()
{
throw new NotImplementedException();
}
}
}
}
}
}
9 changes: 4 additions & 5 deletions tests/KubernetesClient.Tests/WatchTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ await Assert.ThrowsAnyAsync<Exception>(() =>
[Fact]
public async Task SuriveBadLine()
{
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(4 /* first line of response is eaten by WatcherDelegatingHandler */);
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(5);
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
AsyncManualResetEvent connectionClosed = new AsyncManualResetEvent();

Expand Down Expand Up @@ -148,7 +148,7 @@ public async Task SuriveBadLine()
Assert.Contains(WatchEventType.Added, events);
Assert.Contains(WatchEventType.Modified, events);

Assert.Equal(2, errors);
Assert.Equal(3, errors);

Assert.True(watcher.Watching);

Expand Down Expand Up @@ -236,7 +236,6 @@ public async Task WatchAllEvents()

using (var server = new MockKubeApiServer(testOutput, async httpContext =>
{
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse);
await WriteStreamLine(httpContext, MockAddedEventStreamLine);
await WriteStreamLine(httpContext, MockDeletedStreamLine);
await WriteStreamLine(httpContext, MockModifiedStreamLine);
Expand Down Expand Up @@ -303,7 +302,7 @@ public async Task WatchAllEvents()
[Fact]
public async Task WatchEventsWithTimeout()
{
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(4 /* first line of response is eaten by WatcherDelegatingHandler */);
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(5);
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
AsyncManualResetEvent connectionClosed = new AsyncManualResetEvent();

Expand Down Expand Up @@ -362,7 +361,7 @@ public async Task WatchEventsWithTimeout()
Assert.Contains(WatchEventType.Modified, events);
Assert.Contains(WatchEventType.Error, events);

Assert.Equal(0, errors);
Assert.Equal(1, errors);

Assert.True(watcher.Watching);

Expand Down