Skip to content

Commit 87a8677

Browse files
committed
Attempts to fix #198
1 parent 0eeab6d commit 87a8677

File tree

1 file changed

+34
-22
lines changed

1 file changed

+34
-22
lines changed

src/CouchDB.Driver/CouchDatabase.cs

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -459,33 +459,45 @@ public async IAsyncEnumerable<ChangesFeedResponseResult<TSource>> GetContinuousC
459459
request = request.ApplyQueryParametersOptions(options);
460460
}
461461

462-
await using Stream stream = filter == null
463-
? await request.GetStreamAsync(cancellationToken, HttpCompletionOption.ResponseHeadersRead)
464-
.ConfigureAwait(false)
465-
: await request.QueryContinuousWithFilterAsync<TSource>(_queryProvider, filter, cancellationToken)
466-
.ConfigureAwait(false);
467-
468-
await foreach (var line in stream.ReadLinesAsync(cancellationToken))
462+
do
469463
{
470-
if (string.IsNullOrEmpty(line))
471-
{
472-
continue;
473-
}
474-
475-
MatchCollection matches = _feedChangeLineStartPattern.Matches(line);
476-
for (var i = 0; i < matches.Count; i++)
464+
await using Stream stream = filter == null
465+
? await request.GetStreamAsync(cancellationToken, HttpCompletionOption.ResponseHeadersRead)
466+
.ConfigureAwait(false)
467+
: await request.QueryContinuousWithFilterAsync<TSource>(_queryProvider, filter, cancellationToken)
468+
.ConfigureAwait(false);
469+
470+
var lastSequence = options?.Since ?? "0";
471+
472+
await foreach (var line in stream.ReadLinesAsync(cancellationToken))
477473
{
478-
var startIndex = matches[i].Index;
479-
var endIndex = i < matches.Count - 1 ? matches[i + 1].Index : line.Length;
480-
var lineLength = endIndex - startIndex;
481-
var substring = line.Substring(startIndex, lineLength);
482-
ChangesFeedResponseResult<TSource>? result = JsonConvert.DeserializeObject<ChangesFeedResponseResult<TSource>>(substring);
483-
if (string.IsNullOrWhiteSpace(_discriminator) || result.Document.SplitDiscriminator == _discriminator)
474+
if (string.IsNullOrEmpty(line))
475+
{
476+
continue;
477+
}
478+
479+
MatchCollection matches = _feedChangeLineStartPattern.Matches(line);
480+
for (var i = 0; i < matches.Count; i++)
484481
{
485-
yield return result;
482+
var startIndex = matches[i].Index;
483+
var endIndex = i < matches.Count - 1 ? matches[i + 1].Index : line.Length;
484+
var lineLength = endIndex - startIndex;
485+
var substring = line.Substring(startIndex, lineLength);
486+
ChangesFeedResponseResult<TSource>? result =
487+
JsonConvert.DeserializeObject<ChangesFeedResponseResult<TSource>>(substring);
488+
if (string.IsNullOrWhiteSpace(_discriminator) ||
489+
result.Document.SplitDiscriminator == _discriminator)
490+
{
491+
lastSequence = result.Seq;
492+
yield return result;
493+
}
486494
}
487495
}
488-
}
496+
497+
// stream broke, pick up listening after last successful processed sequence
498+
request = request.SetQueryParam("since", lastSequence);
499+
500+
} while (!cancellationToken.IsCancellationRequested);
489501
}
490502

491503
#endregion

0 commit comments

Comments
 (0)