Unexpected behaviour of GetContinuousChangesAsync #198

Closed
@nefarius

So consider this simple background worker snippet listening for changes:

public class EventsChangeListener : BackgroundService
{
    private readonly EventsContext _context;
    private readonly ILogger<EventsChangeListener> _logger;

    public EventsChangeListener(EventsContext context, ILogger<EventsChangeListener> logger)
    {
        _context = context;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        ChangesFeedOptions opts = new() { IncludeDocs = true, Since = "now" };

        _logger.LogInformation("Starting change listener for {Source}", nameof(_context.Events));

        await foreach (ChangesFeedResponseResult<Doc> change in _context.Events.GetContinuousChangesAsync(
                           opts, null, stoppingToken))
        {
            _logger.LogInformation("Got change for ID {Id}, changes: {@Changes}", change.Id, change.Changes);
        }
    }
}

According to the documentation, this is the correct and expected usage (correct me if I'm wrong); however, the loop can exit at seemingly random intervals.

I was scratching my head over why my C# app wasn't getting change notifications as consistently as a similar Node.js app was.

It turns out that GetContinuousChangesAsync stops returning elements when the underlying stream gets closed, which on further inspection makes sense but is not obvious to the caller.

Is this a bug or "a feature" that simply should be documented better? A workaround could be as simple as:

        while (!stoppingToken.IsCancellationRequested)
        {
            await foreach (ChangesFeedResponseResult<Doc> change in _context.Events.GetContinuousChangesAsync(
                               opts, null, stoppingToken))
            {
                _logger.LogInformation("Got change for ID {Id}, changes: {@Changes}", change.Id, change.Changes);
            }
        }

I do not like this approach, though, because it allows changes to get lost: any changes that occur between the last connection closing and the new socket being opened (with ChangesFeedOptions opts = new() { Since = "now" }) will not be returned.

Shouldn't a proper implementation of GetContinuousChangesAsync resume on its own indefinitely until cancellation is signalled and take the last successful change sequence into account?
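
To illustrate what I mean, the resuming behaviour could be sketched as a small wrapper like the one below. This is only a minimal sketch, not how the library is implemented: ResumingChanges, ResumeAsync, openFeed and getSeq are hypothetical names, and it assumes every change carries a sequence value that can be passed back as Since on the next connection.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class ResumingChanges
{
    // Hypothetical helper: keeps re-opening a changes feed until cancellation,
    // resuming from the sequence of the last change that was handed to the caller.
    public static async IAsyncEnumerable<T> ResumeAsync<T>(
        Func<string, CancellationToken, IAsyncEnumerable<T>> openFeed, // opens a feed starting at "since"
        Func<T, string> getSeq,                                        // extracts the sequence of a change
        string since,                                                  // initial position, e.g. "now"
        TimeSpan retryDelay,                                           // pause between reconnects
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        while (!ct.IsCancellationRequested)
        {
            await foreach (T change in openFeed(since, ct).WithCancellation(ct))
            {
                // Remember where we are before yielding, so a reconnect
                // continues after this change instead of from "now".
                since = getSeq(change);
                yield return change;
            }

            // The feed ended (e.g. the underlying stream was closed).
            // Wait briefly, then reconnect; throws OperationCanceledException on cancellation.
            await Task.Delay(retryDelay, ct);
        }
    }
}
```

In the listener above, openFeed would create a ChangesFeedOptions with Since set to the value it receives and call GetContinuousChangesAsync, while getSeq would return the sequence of the change (assuming the result type exposes one). One gap remains even then: if the very first connection drops before any change arrives, the position is still "now", so changes in that window would be missed.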

Thanks for reading, cheers!