Skip to content

Commit 9ae3cb3

Browse files
qmfrederikk8s-ci-robot
authored andcommitted
Fix build warnings in the StreamDemuxerTests (#259)
* Make sure StreamDemuxer.Start() returns immediately. * Make MockWebSocket.RecieveAsync cancellable * Fix staging
1 parent 4bcfaeb commit 9ae3cb3

File tree

3 files changed

+24
-19
lines changed

3 files changed

+24
-19
lines changed

src/KubernetesClient/StreamDemuxer.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,9 @@ public Stream GetStream(byte? inputIndex, byte? outputIndex)
182182

183183
protected async Task RunLoop(CancellationToken cancellationToken)
184184
{
185+
// This is a background task. Immediately yield to the caller.
186+
await Task.Yield();
187+
185188
// Get a 1KB buffer
186189
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024 * 1024);
187190
// This maps remembers bytes skipped for each stream.

tests/KubernetesClient.Tests/Mock/MockWebSocket.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using Nito.AsyncEx;
12
using System;
23
using System.Collections.Concurrent;
34
using System.Net.WebSockets;
@@ -13,7 +14,7 @@ public class MockWebSocket : WebSocket
1314
private WebSocketState state;
1415
private string subProtocol;
1516
private ConcurrentQueue<MessageData> receiveBuffers = new ConcurrentQueue<MessageData>();
16-
private AutoResetEvent receiveEvent = new AutoResetEvent(false);
17+
private AsyncAutoResetEvent receiveEvent = new AsyncAutoResetEvent(false);
1718

1819
public MockWebSocket(string subProtocol = null)
1920
{
@@ -78,12 +79,13 @@ public override void Dispose()
7879
this.receiveEvent.Set();
7980
}
8081

81-
public override Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> buffer, CancellationToken cancellationToken)
82+
public override async Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> buffer, CancellationToken cancellationToken)
8283
{
8384
if (this.receiveBuffers.Count == 0)
8485
{
85-
this.receiveEvent.WaitOne();
86+
await this.receiveEvent.WaitAsync(cancellationToken).ConfigureAwait(false);
8687
}
88+
8789
int bytesReceived = 0;
8890
bool endOfMessage = true;
8991
WebSocketMessageType messageType = WebSocketMessageType.Close;
@@ -107,7 +109,8 @@ public override Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> buf
107109
received.Buffer = received.Buffer.Slice(buffer.Count);
108110
}
109111
}
110-
return Task.FromResult(new WebSocketReceiveResult(bytesReceived, messageType, endOfMessage));
112+
113+
return new WebSocketReceiveResult(bytesReceived, messageType, endOfMessage);
111114
}
112115

113116
public override Task SendAsync(ArraySegment<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken)

tests/KubernetesClient.Tests/StreamDemuxerTests.cs

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@ public StreamDemuxerTests(ITestOutputHelper testOutput)
2424
public async Task SendDataRemoteCommand()
2525
{
2626
using (MockWebSocket ws = new MockWebSocket())
27+
using (StreamDemuxer demuxer = new StreamDemuxer(ws))
2728
{
2829
List<byte> sentBuffer = new List<byte>();
2930
ws.MessageSent += (sender, args) =>
3031
{
3132
sentBuffer.AddRange(args.Data.Buffer);
3233
};
3334

34-
StreamDemuxer demuxer = new StreamDemuxer(ws);
35-
Task.Run(() => demuxer.Start());
35+
demuxer.Start();
3636

3737
byte channelIndex = 12;
3838
var stream = demuxer.GetStream(channelIndex, channelIndex);
@@ -50,15 +50,15 @@ public async Task SendDataRemoteCommand()
5050
public async Task SendMultipleDataRemoteCommand()
5151
{
5252
using (MockWebSocket ws = new MockWebSocket())
53+
using (StreamDemuxer demuxer = new StreamDemuxer(ws))
5354
{
5455
List<byte> sentBuffer = new List<byte>();
5556
ws.MessageSent += (sender, args) =>
5657
{
5758
sentBuffer.AddRange(args.Data.Buffer);
5859
};
5960

60-
StreamDemuxer demuxer = new StreamDemuxer(ws);
61-
Task.Run(() => demuxer.Start());
61+
demuxer.Start();
6262

6363
byte channelIndex = 12;
6464
var stream = demuxer.GetStream(channelIndex, channelIndex);
@@ -80,9 +80,9 @@ public async Task SendMultipleDataRemoteCommand()
8080
public async Task ReceiveDataRemoteCommand()
8181
{
8282
using (MockWebSocket ws = new MockWebSocket())
83+
using (StreamDemuxer demuxer = new StreamDemuxer(ws))
8384
{
84-
StreamDemuxer demuxer = new StreamDemuxer(ws);
85-
Task.Run(() => demuxer.Start());
85+
demuxer.Start();
8686

8787
List<byte> receivedBuffer = new List<byte>();
8888
byte channelIndex = 12;
@@ -129,9 +129,9 @@ public async Task ReceiveDataRemoteCommand()
129129
public async Task ReceiveDataPortForward()
130130
{
131131
using (MockWebSocket ws = new MockWebSocket())
132+
using (StreamDemuxer demuxer = new StreamDemuxer(ws, StreamType.PortForward))
132133
{
133-
StreamDemuxer demuxer = new StreamDemuxer(ws, StreamType.PortForward);
134-
Task.Run(() => demuxer.Start());
134+
demuxer.Start();
135135

136136
List<byte> receivedBuffer = new List<byte>();
137137
byte channelIndex = 12;
@@ -179,9 +179,9 @@ public async Task ReceiveDataPortForward()
179179
public async Task ReceiveDataPortForwardOneByteMessage()
180180
{
181181
using (MockWebSocket ws = new MockWebSocket())
182+
using (StreamDemuxer demuxer = new StreamDemuxer(ws, StreamType.PortForward))
182183
{
183-
StreamDemuxer demuxer = new StreamDemuxer(ws, StreamType.PortForward);
184-
Task.Run(() => demuxer.Start());
184+
demuxer.Start();
185185

186186
List<byte> receivedBuffer = new List<byte>();
187187
byte channelIndex = 12;
@@ -227,9 +227,9 @@ public async Task ReceiveDataPortForwardOneByteMessage()
227227
public async Task ReceiveDataRemoteCommandMultipleStream()
228228
{
229229
using (MockWebSocket ws = new MockWebSocket())
230+
using (StreamDemuxer demuxer = new StreamDemuxer(ws))
230231
{
231-
StreamDemuxer demuxer = new StreamDemuxer(ws);
232-
Task.Run(() => demuxer.Start());
232+
demuxer.Start();
233233

234234
List<byte> receivedBuffer1 = new List<byte>();
235235
byte channelIndex1 = 1;
@@ -304,9 +304,9 @@ public async Task ReceiveDataRemoteCommandMultipleStream()
304304
public async Task ReceiveDataPortForwardMultipleStream()
305305
{
306306
using (MockWebSocket ws = new MockWebSocket())
307+
using (StreamDemuxer demuxer = new StreamDemuxer(ws, StreamType.PortForward))
307308
{
308-
StreamDemuxer demuxer = new StreamDemuxer(ws, StreamType.PortForward);
309-
Task.Run(() => demuxer.Start());
309+
demuxer.Start();
310310

311311
List<byte> receivedBuffer1 = new List<byte>();
312312
byte channelIndex1 = 1;
@@ -379,7 +379,6 @@ public async Task ReceiveDataPortForwardMultipleStream()
379379
}
380380
}
381381

382-
383382
private static byte[] GenerateRandomBuffer(int length, byte channelIndex, byte content, bool portForward)
384383
{
385384
var buffer = GenerateRandomBuffer(length, content);

0 commit comments

Comments
 (0)