Skip to content
This repository was archived by the owner on Mar 20, 2024. It is now read-only.

Commit cfb64c4

Browse files
hossambarakatJonJam
authored andcommitted
Update WatcherDelegatingHandler to stop eating first line (kubernetes-client#183) (kubernetes-client#190)
* Update WatcherDelegatingHandler to stop eating first line (kubernetes-client#183) * Override Read methods in PeekableStreamReader to avoid unpredicted behaviour (kubernetes-client#183) Override Read methods in PeekableStreamReader to avoid unpredicted behaviour (kubernetes-client#183)
1 parent c18d8b2 commit cfb64c4

File tree

2 files changed

+137
-5
lines changed

2 files changed

+137
-5
lines changed
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.IO;
4+
using System.Linq;
5+
using System.Net;
6+
using System.Net.Http;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using Microsoft.AspNetCore.WebUtilities;
10+
11+
namespace k8s
12+
{
13+
/// <summary>
14+
/// This HttpDelegatingHandler is to rewrite the response and return first line to autorest client
15+
/// then use WatchExt to create a watch object which interact with the replaced http response to get watch works.
16+
/// </summary>
17+
internal class WatcherDelegatingHandler : DelegatingHandler
18+
{
19+
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request,
20+
CancellationToken cancellationToken)
21+
{
22+
var originResponse = await base.SendAsync(request, cancellationToken);
23+
24+
if (originResponse.IsSuccessStatusCode)
25+
{
26+
var query = QueryHelpers.ParseQuery(request.RequestUri.Query);
27+
28+
if (query.TryGetValue("watch", out var values) && values.Any(v => v == "true"))
29+
{
30+
originResponse.Content = new LineSeparatedHttpContent(originResponse.Content);
31+
}
32+
}
33+
return originResponse;
34+
}
35+
36+
internal class LineSeparatedHttpContent : HttpContent
37+
{
38+
private readonly HttpContent _originContent;
39+
private Stream _originStream;
40+
41+
public LineSeparatedHttpContent(HttpContent originContent)
42+
{
43+
_originContent = originContent;
44+
}
45+
46+
internal PeekableStreamReader StreamReader { get; private set; }
47+
48+
protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
49+
{
50+
_originStream = await _originContent.ReadAsStreamAsync();
51+
52+
StreamReader = new PeekableStreamReader(_originStream);
53+
54+
var firstLine = await StreamReader.PeekLineAsync();
55+
56+
var writer = new StreamWriter(stream);
57+
58+
// using (writer) // leave open
59+
{
60+
await writer.WriteAsync(firstLine);
61+
await writer.FlushAsync();
62+
}
63+
}
64+
65+
protected override bool TryComputeLength(out long length)
66+
{
67+
length = 0;
68+
return false;
69+
}
70+
}
71+
internal class PeekableStreamReader : StreamReader
72+
{
73+
private Queue<string> _buffer;
74+
public PeekableStreamReader(Stream stream) : base(stream)
75+
{
76+
_buffer = new Queue<string>();
77+
}
78+
79+
public override string ReadLine()
80+
{
81+
if (_buffer.Count > 0)
82+
{
83+
return _buffer.Dequeue();
84+
}
85+
return base.ReadLine();
86+
}
87+
public override Task<string> ReadLineAsync()
88+
{
89+
if (_buffer.Count > 0)
90+
{
91+
return Task.FromResult(_buffer.Dequeue());
92+
}
93+
return base.ReadLineAsync();
94+
}
95+
public async Task<string> PeekLineAsync()
96+
{
97+
var line = await ReadLineAsync();
98+
_buffer.Enqueue(line);
99+
return line;
100+
}
101+
102+
public override int Read()
103+
{
104+
throw new NotImplementedException();
105+
}
106+
107+
public override int Read(char[] buffer, int index, int count)
108+
{
109+
throw new NotImplementedException();
110+
}
111+
public override Task<int> ReadAsync(char[] buffer, int index, int count)
112+
{
113+
throw new NotImplementedException();
114+
}
115+
public override int ReadBlock(char[] buffer, int index, int count)
116+
{
117+
throw new NotImplementedException();
118+
}
119+
public override Task<int> ReadBlockAsync(char[] buffer, int index, int count)
120+
{
121+
throw new NotImplementedException();
122+
}
123+
public override string ReadToEnd()
124+
{
125+
throw new NotImplementedException();
126+
}
127+
public override Task<string> ReadToEndAsync()
128+
{
129+
throw new NotImplementedException();
130+
}
131+
}
132+
}
133+
}

tests/KubernetesClient.Tests/WatchTests.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ await Assert.ThrowsAnyAsync<Exception>(() =>
8686
[Fact]
8787
public async Task SuriveBadLine()
8888
{
89-
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(4 /* first line of response is eaten by WatcherDelegatingHandler */);
89+
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(5);
9090
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
9191
AsyncManualResetEvent connectionClosed = new AsyncManualResetEvent();
9292

@@ -148,7 +148,7 @@ public async Task SuriveBadLine()
148148
Assert.Contains(WatchEventType.Added, events);
149149
Assert.Contains(WatchEventType.Modified, events);
150150

151-
Assert.Equal(2, errors);
151+
Assert.Equal(3, errors);
152152

153153
Assert.True(watcher.Watching);
154154

@@ -236,7 +236,6 @@ public async Task WatchAllEvents()
236236

237237
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
238238
{
239-
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse);
240239
await WriteStreamLine(httpContext, MockAddedEventStreamLine);
241240
await WriteStreamLine(httpContext, MockDeletedStreamLine);
242241
await WriteStreamLine(httpContext, MockModifiedStreamLine);
@@ -303,7 +302,7 @@ public async Task WatchAllEvents()
303302
[Fact]
304303
public async Task WatchEventsWithTimeout()
305304
{
306-
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(4 /* first line of response is eaten by WatcherDelegatingHandler */);
305+
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(5);
307306
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
308307
AsyncManualResetEvent connectionClosed = new AsyncManualResetEvent();
309308

@@ -362,7 +361,7 @@ public async Task WatchEventsWithTimeout()
362361
Assert.Contains(WatchEventType.Modified, events);
363362
Assert.Contains(WatchEventType.Error, events);
364363

365-
Assert.Equal(0, errors);
364+
Assert.Equal(1, errors);
366365

367366
Assert.True(watcher.Watching);
368367

0 commit comments

Comments
 (0)