Skip to content

Commit 143992c

Browse files
Merge pull request #126 from TransactionProcessing/task/#125_getlatestversionfromlastevent
Added GetLatestVersionFromLastEvent
2 parents 432dfa3 + 2afa17b commit 143992c

4 files changed

Lines changed: 76 additions & 4 deletions

File tree

Shared.EventStore/Aggregate/AggregateRepository.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,29 @@ public async Task<TAggregate> GetLatestVersion(Guid aggregateId,
6464
return this.ProcessEvents(aggregate, resolvedEvents);
6565
}
6666

67+
/// <summary>
68+
/// Gets the latest version from last event.
69+
/// </summary>
70+
/// <param name="aggregateId">The aggregate identifier.</param>
71+
/// <param name="cancellationToken">The cancellation token.</param>
72+
/// <returns></returns>
73+
public async Task<TAggregate> GetLatestVersionFromLastEvent(Guid aggregateId,
74+
CancellationToken cancellationToken)
75+
{
76+
TAggregate aggregate = new()
77+
{
78+
AggregateId = aggregateId
79+
};
80+
81+
String streamName = AggregateRepository<TAggregate, TDomainEvent>.GetStreamName(aggregate.AggregateId);
82+
83+
IList<ResolvedEvent> events = await this.EventStoreContext.GetEventsBackwardAsync(streamName, 1, cancellationToken);
84+
85+
aggregate = this.ProcessEvents(aggregate, events);
86+
87+
return aggregate;
88+
}
89+
6790
/// <summary>
6891
/// Gets the name of the stream.
6992
/// </summary>

Shared.EventStore/Aggregate/IAggregateRepository.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ Task<TAggregate> GetLatestVersion(Guid aggregateId,
2828
Task SaveChanges(TAggregate aggregate,
2929
CancellationToken cancellationToken);
3030

31+
/// <summary>
32+
/// Gets the latest version from last event.
33+
/// </summary>
34+
/// <param name="aggregateId">The aggregate identifier.</param>
35+
/// <returns></returns>
36+
Task<TAggregate> GetLatestVersionFromLastEvent(Guid aggregateId, CancellationToken cancellationToken);
37+
3138
#endregion
3239
}
3340
}

Shared.EventStore/EventStore/EventStoreContext.cs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,38 @@ public async Task<String> GetPartitionResultFromProjection(String projectionName
8080

8181
}
8282

83+
/// <summary>
84+
/// Gets the events backwards asynchronous.
85+
/// </summary>
86+
/// <param name="streamName">Name of the stream.</param>
87+
/// <param name="maxNumberOfEventsToRetrieve">The maximum number of events to retrieve.</param>
88+
/// <param name="cancellationToken">The cancellation token.</param>
89+
/// <returns></returns>
90+
public async Task<IList<ResolvedEvent>> GetEventsBackwardAsync(String streamName,
91+
Int32 maxNumberOfEventsToRetrieve,
92+
CancellationToken cancellationToken)
93+
{
94+
List<ResolvedEvent> resolvedEvents = new();
95+
96+
EventStoreClient.ReadStreamResult response = this.EventStoreClient.ReadStreamAsync(Direction.Backwards,
97+
streamName,
98+
StreamPosition.End,
99+
maxNumberOfEventsToRetrieve,
100+
resolveLinkTos: true,
101+
cancellationToken: cancellationToken);
102+
103+
if (await response.ReadState == ReadState.StreamNotFound)
104+
{
105+
return resolvedEvents;
106+
}
107+
108+
List<ResolvedEvent> events = await response.ToListAsync(cancellationToken);
109+
110+
resolvedEvents.AddRange(events);
111+
112+
return resolvedEvents;
113+
}
114+
83115
/// <summary>
84116
/// Gets the partition state from projection.
85117
/// </summary>
@@ -162,7 +194,7 @@ public async Task InsertEvents(String streamName,
162194
this.LogInformation($"About to append {aggregateEvents.Count} to Stream {streamName}");
163195
await this.EventStoreClient.AppendToStreamAsync(streamName, StreamRevision.FromInt64(expectedVersion), aggregateEvents.AsEnumerable(), cancellationToken: cancellationToken);
164196
}
165-
197+
166198
/// <summary>
167199
/// Reads the events.
168200
/// </summary>

Shared.EventStore/EventStore/IEventStoreContext.cs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
using System.Collections.Generic;
55
using System.Threading;
66
using System.Threading.Tasks;
7-
using DomainDrivenDesign.EventSourcing;
87
using global::EventStore.Client;
98

109
public interface IEventStoreContext
@@ -20,6 +19,17 @@ public interface IEventStoreContext
2019

2120
#region Methods
2221

22+
/// <summary>
23+
/// Gets the events backwards asynchronous.
24+
/// </summary>
25+
/// <param name="streamName">Name of the stream.</param>
26+
/// <param name="maxNumberOfEventsToRetrieve">The maximum number of events to retrieve.</param>
27+
/// <param name="cancellationToken">The cancellation token.</param>
28+
/// <returns></returns>
29+
Task<IList<ResolvedEvent>> GetEventsBackwardAsync(String streamName,
30+
Int32 maxNumberOfEventsToRetrieve,
31+
CancellationToken cancellationToken);
32+
2333
/// <summary>
2434
/// Gets the partition result from projection.
2535
/// </summary>
@@ -96,8 +106,8 @@ Task InsertEvents(String streamName,
96106
/// <param name="cancellationToken">The cancellation token.</param>
97107
/// <returns></returns>
98108
Task<List<ResolvedEvent>> ReadEvents(String streamName,
99-
Int64 fromVersion,
100-
CancellationToken cancellationToken);
109+
Int64 fromVersion,
110+
CancellationToken cancellationToken);
101111

102112
#endregion
103113
}

0 commit comments

Comments
 (0)