Skip to content

Watcher: notify the caller when the server closes the connection #184

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions gen/KubernetesWatchGenerator/IKubernetes.Watch.cs.template
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ namespace k8s
/// <param name="onError">
/// The action to invoke when an error occurs.
/// </param>
/// <param name="onClosed">
/// The action to invoke when the server closes the connection.
/// </param>
/// <param name="cancellationToken">
/// A <see cref="CancellationToken"/> which can be used to cancel the asynchronous operation.
/// </param>
Expand All @@ -55,6 +58,7 @@ namespace k8s
Dictionary<string, List<string>> customHeaders = null,
Action<WatchEventType, {{GetClassName operation}}> onEvent = null,
Action<Exception> onError = null,
Action onClosed = null,
CancellationToken cancellationToken = default(CancellationToken));

{{/.}}
Expand Down
3 changes: 2 additions & 1 deletion gen/KubernetesWatchGenerator/Kubernetes.Watch.cs.template
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ namespace k8s
Dictionary<string, List<string>> customHeaders = null,
Action<WatchEventType, {{GetClassName operation}}> onEvent = null,
Action<Exception> onError = null,
Action onClosed = null,
CancellationToken cancellationToken = default(CancellationToken))
{
string path = $"{{GetPathExpression .}}";
return WatchObjectAsync<{{GetClassName operation}}>(path: path, @continue: @continue, fieldSelector: fieldSelector, includeUninitialized: includeUninitialized, labelSelector: labelSelector, limit: limit, pretty: pretty, timeoutSeconds: timeoutSeconds, resourceVersion: resourceVersion, customHeaders: customHeaders, onEvent: onEvent, onError: onError, cancellationToken: cancellationToken);
return WatchObjectAsync<{{GetClassName operation}}>(path: path, @continue: @continue, fieldSelector: fieldSelector, includeUninitialized: includeUninitialized, labelSelector: labelSelector, limit: limit, pretty: pretty, timeoutSeconds: timeoutSeconds, resourceVersion: resourceVersion, customHeaders: customHeaders, onEvent: onEvent, onError: onError, onClosed: onClosed, cancellationToken: cancellationToken);
}

{{/.}}
Expand Down
5 changes: 4 additions & 1 deletion src/KubernetesClient/IKubernetes.Watch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,15 @@ public partial interface IKubernetes
/// <param name="onError">
/// The action to invoke when an error occurs.
/// </param>
/// <param name="onClosed">
/// The action to invoke when the server closes the connection.
/// </param>
/// <param name="cancellationToken">
/// A <see cref="CancellationToken"/> which can be used to cancel the asynchronous operation.
/// </param>
/// <returns>
/// A <see cref="Task"/> which represents the asynchronous operation, and returns a new watcher.
/// </returns>
Task<Watcher<T>> WatchObjectAsync<T>(string path, string @continue = null, string fieldSelector = null, bool? includeUninitialized = null, string labelSelector = null, int? limit = null, bool? pretty = null, int? timeoutSeconds = null, string resourceVersion = null, Dictionary<string, List<string>> customHeaders = null, Action<WatchEventType, T> onEvent = null, Action<Exception> onError = null, CancellationToken cancellationToken = default(CancellationToken));
Task<Watcher<T>> WatchObjectAsync<T>(string path, string @continue = null, string fieldSelector = null, bool? includeUninitialized = null, string labelSelector = null, int? limit = null, bool? pretty = null, int? timeoutSeconds = null, string resourceVersion = null, Dictionary<string, List<string>> customHeaders = null, Action<WatchEventType, T> onEvent = null, Action<Exception> onError = null, Action onClosed = null, CancellationToken cancellationToken = default(CancellationToken));
}
}
4 changes: 2 additions & 2 deletions src/KubernetesClient/Kubernetes.Watch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace k8s
public partial class Kubernetes
{
/// <inheritdoc/>
public async Task<Watcher<T>> WatchObjectAsync<T>(string path, string @continue = null, string fieldSelector = null, bool? includeUninitialized = null, string labelSelector = null, int? limit = null, bool? pretty = null, int? timeoutSeconds = null, string resourceVersion = null, Dictionary<string, List<string>> customHeaders = null, Action<WatchEventType, T> onEvent = null, Action<Exception> onError = null, CancellationToken cancellationToken = default(CancellationToken))
public async Task<Watcher<T>> WatchObjectAsync<T>(string path, string @continue = null, string fieldSelector = null, bool? includeUninitialized = null, string labelSelector = null, int? limit = null, bool? pretty = null, int? timeoutSeconds = null, string resourceVersion = null, Dictionary<string, List<string>> customHeaders = null, Action<WatchEventType, T> onEvent = null, Action<Exception> onError = null, Action onClosed = null, CancellationToken cancellationToken = default(CancellationToken))
{
// Tracing
bool _shouldTrace = ServiceClientTracing.IsEnabled;
Expand Down Expand Up @@ -152,7 +152,7 @@ public partial class Kubernetes
var stream = await httpResponse.Content.ReadAsStreamAsync().ConfigureAwait(false);
StreamReader reader = new StreamReader(stream);

return new Watcher<T>(reader, onEvent, onError);
return new Watcher<T>(reader, onEvent, onError, onClosed);
}
}
}
162 changes: 98 additions & 64 deletions src/KubernetesClient/Watcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,68 +29,36 @@ public class Watcher<T> : IDisposable
public bool Watching { get; private set; }

private readonly CancellationTokenSource _cts;
private readonly StreamReader _streamReader;

public Watcher(StreamReader streamReader, Action<WatchEventType, T> onEvent, Action<Exception> onError)
private readonly StreamReader _streamReader;
private readonly Task _watcherLoop;

/// <summary>
/// Initializes a new instance of the <see cref="Watcher{T}"/> class.
/// </summary>
/// <param name="streamReader">
/// A <see cref="StreamReader"/> from which to read the events.
/// </param>
/// <param name="onEvent">
/// The action to invoke when the server sends a new event.
/// </param>
/// <param name="onError">
/// The action to invoke when an error occurs.
/// </param>
/// <param name="onClosed">
/// The action to invoke when the server closes the connection.
/// </param>
public Watcher(StreamReader streamReader, Action<WatchEventType, T> onEvent, Action<Exception> onError, Action onClosed = null)
{
_streamReader = streamReader;
OnEvent += onEvent;
OnError += onError;

_cts = new CancellationTokenSource();

var token = _cts.Token;

Task.Run(async () =>
{
try
{
Watching = true;

while (!streamReader.EndOfStream)
{
if (token.IsCancellationRequested)
{
return;
}

var line = await streamReader.ReadLineAsync();
OnError += onError;
OnClosed += onClosed;

try
{
var genericEvent = SafeJsonConvert.DeserializeObject<k8s.Watcher<KubernetesObject>.WatchEvent>(line);

if (genericEvent.Object.Kind == "Status")
{
var statusEvent = SafeJsonConvert.DeserializeObject<k8s.Watcher<V1Status>.WatchEvent>(line);
var exception = new KubernetesException(statusEvent.Object);
this.OnError?.Invoke(exception);
}
else
{
var @event = SafeJsonConvert.DeserializeObject<k8s.Watcher<T>.WatchEvent>(line);
this.OnEvent?.Invoke(@event.Type, @event.Object);
}
}
catch (Exception e)
{
// error if deserialized failed or onevent throws
OnError?.Invoke(e);
}
}
}
catch (Exception e)
{
// error when transport error, IOException ect
OnError?.Invoke(e);
}
finally
{
Watching = false;
}
}, token);
_cts = new CancellationTokenSource();
_watcherLoop = this.WatcherLoop(_cts.Token);
}


/// <inheritdoc/>
public void Dispose()
{
_cts.Cancel();
Expand All @@ -105,13 +73,71 @@ public void Dispose()
/// <summary>
/// add/remove callbacks when any exception was caught during watching
/// </summary>
public event Action<Exception> OnError;
public event Action<Exception> OnError;

/// <summary>
/// The event which is raised when the server closes th econnection.
/// </summary>
public event Action OnClosed;

public class WatchEvent
{
public WatchEventType Type { get; set; }

public T Object { get; set; }
}

private async Task WatcherLoop(CancellationToken cancellationToken)
{
// Make sure we run async
await Task.Yield();

try
{
Watching = true;
string line;

// ReadLineAsync will return null when we've reached the end of the stream.
while ((line = await this._streamReader.ReadLineAsync().ConfigureAwait(false)) != null)
{
if (cancellationToken.IsCancellationRequested)
{
return;
}

try
{
var genericEvent = SafeJsonConvert.DeserializeObject<k8s.Watcher<KubernetesObject>.WatchEvent>(line);

if (genericEvent.Object.Kind == "Status")
{
var statusEvent = SafeJsonConvert.DeserializeObject<k8s.Watcher<V1Status>.WatchEvent>(line);
var exception = new KubernetesException(statusEvent.Object);
this.OnError?.Invoke(exception);
}
else
{
var @event = SafeJsonConvert.DeserializeObject<k8s.Watcher<T>.WatchEvent>(line);
this.OnEvent?.Invoke(@event.Type, @event.Object);
}
}
catch (Exception e)
{
// error if deserialized failed or onevent throws
OnError?.Invoke(e);
}
}
}
catch (Exception e)
{
// error when transport error, IOException ect
OnError?.Invoke(e);
}
finally
{
Watching = false;
OnClosed?.Invoke();
}
}
}

Expand All @@ -123,18 +149,22 @@ public static class WatcherExt
/// <typeparam name="T">type of the event object</typeparam>
/// <param name="response">the api response</param>
/// <param name="onEvent">a callback when any event raised from api server</param>
/// <param name="onError">a callbak when any exception was caught during watching</param>
/// <param name="onError">a callbak when any exception was caught during watching</param>
/// <param name="onClosed">
/// The action to invoke when the server closes the connection.
/// </param>
/// <returns>a watch object</returns>
public static Watcher<T> Watch<T>(this HttpOperationResponse response,
Action<WatchEventType, T> onEvent,
Action<Exception> onError = null)
Action<Exception> onError = null,
Action onClosed = null)
{
if (!(response.Response.Content is WatcherDelegatingHandler.LineSeparatedHttpContent content))
{
throw new KubernetesClientException("not a watchable request or failed response");
}

return new Watcher<T>(content.StreamReader, onEvent, onError);
return new Watcher<T>(content.StreamReader, onEvent, onError, onClosed);
}

/// <summary>
Expand All @@ -143,13 +173,17 @@ public static Watcher<T> Watch<T>(this HttpOperationResponse response,
/// <typeparam name="T">type of the event object</typeparam>
/// <param name="response">the api response</param>
/// <param name="onEvent">a callback when any event raised from api server</param>
/// <param name="onError">a callbak when any exception was caught during watching</param>
/// <param name="onError">a callbak when any exception was caught during watching</param>
/// <param name="onClosed">
/// The action to invoke when the server closes the connection.
/// </param>
/// <returns>a watch object</returns>
public static Watcher<T> Watch<T>(this HttpOperationResponse<T> response,
Action<WatchEventType, T> onEvent,
Action<Exception> onError = null)
Action<Exception> onError = null,
Action onClosed = null)
{
return Watch((HttpOperationResponse) response, onEvent, onError);
return Watch((HttpOperationResponse)response, onEvent, onError, onClosed);
}
}
}
Loading