Skip to content

Commit a84083a

Browse files
committed
[DEVEX-227] Refactored State Store extracting interface and moving the overloads to extension methods
1 parent cc77373 commit a84083a

File tree

3 files changed

+104
-50
lines changed

3 files changed

+104
-50
lines changed

src/Kurrent.Client/Streams/DecisionMaking/Aggregate.cs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,33 +3,36 @@
33

44
namespace Kurrent.Client.Streams.DecisionMaking;
55

6-
public interface IAggregate<TEvent>: IState<TEvent>{
6+
public interface IAggregate<in TEvent> : IState<TEvent> {
77
Message[] DequeueUncommittedMessages();
88
}
99

10-
public class Aggregate : Aggregate<object>;
10+
public interface IAggregate : IAggregate<object>;
1111

12-
public abstract class Aggregate<TEvent>: IAggregate<TEvent> where TEvent : notnull {
12+
public class Aggregate : Aggregate<object>, IAggregate;
13+
14+
public abstract class Aggregate<TEvent> : IAggregate<TEvent> where TEvent : notnull {
1315
readonly Queue<Message> _uncommittedEvents = new();
1416

1517
public virtual void Apply(TEvent @event) { }
1618

17-
Message[] IAggregate<TEvent>.DequeueUncommittedMessages()
18-
{
19+
Message[] IAggregate<TEvent>.DequeueUncommittedMessages() {
1920
var dequeuedEvents = _uncommittedEvents.ToArray();
2021

2122
_uncommittedEvents.Clear();
2223

2324
return dequeuedEvents;
2425
}
2526

26-
protected void Enqueue(TEvent @event) {
27-
Apply(@event);
28-
_uncommittedEvents.Enqueue(Message.From(@event));
27+
protected void Enqueue(TEvent message) {
28+
Apply(message);
29+
_uncommittedEvents.Enqueue(Message.From(message));
2930
}
3031

3132
protected void Enqueue(Message message) {
32-
Apply((TEvent)message.Data);
33+
if (message.Data is TEvent @event)
34+
Apply(@event);
35+
3336
_uncommittedEvents.Enqueue(message);
3437
}
3538
}

src/Kurrent.Client/Streams/DecisionMaking/AggregateStore.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ Task<IWriteResult> HandleAsync(
3535

3636
public static class AggregateStoreExtensions {
3737
public static Task<IWriteResult> AddAsync<TAggregate, TEvent>(
38-
IAggregateStore<TAggregate, TEvent> aggregateStore,
38+
this IAggregateStore<TAggregate, TEvent> aggregateStore,
3939
string streamName,
4040
TAggregate aggregate,
4141
CancellationToken ct = default
@@ -48,7 +48,7 @@ public static Task<IWriteResult> AddAsync<TAggregate, TEvent>(
4848
);
4949

5050
public static Task<IWriteResult> HandleAsync<TAggregate, TEvent>(
51-
IAggregateStore<TAggregate, TEvent> aggregateStore,
51+
this IAggregateStore<TAggregate, TEvent> aggregateStore,
5252
string streamName,
5353
Func<TAggregate, CancellationToken, ValueTask> handle,
5454
CancellationToken ct = default
@@ -61,7 +61,7 @@ public static Task<IWriteResult> HandleAsync<TAggregate, TEvent>(
6161
);
6262

6363
public static Task<IWriteResult> HandleAsync<TAggregate, TEvent>(
64-
IAggregateStore<TAggregate, TEvent> aggregateStore,
64+
this IAggregateStore<TAggregate, TEvent> aggregateStore,
6565
string streamName,
6666
Action<TAggregate> handle,
6767
CancellationToken ct = default
@@ -77,7 +77,7 @@ public static Task<IWriteResult> HandleAsync<TAggregate, TEvent>(
7777
);
7878

7979
public static Task<IWriteResult> HandleAsync<TAggregate, TEvent>(
80-
IAggregateStore<TAggregate, TEvent> aggregateStore,
80+
this IAggregateStore<TAggregate, TEvent> aggregateStore,
8181
string streamName,
8282
Action<TAggregate> handle,
8383
DecideOptions<TAggregate>? decideOption,

src/Kurrent.Client/Streams/DecisionMaking/StateStore.cs

Lines changed: 88 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -13,81 +13,132 @@ public class StateStoreOptions<TState> where TState : notnull {
1313
public GetStreamStateOptions<TState>? GetStreamStateOptions { get; set; }
1414
}
1515

16-
public class StateStore<TState, TEvent>(KurrentClient client, StateStoreOptions<TState> options)
16+
public interface IStateStore<TState, in TEvent>
1717
where TState : notnull
1818
where TEvent : notnull {
19-
public Task<StateAtPointInTime<TState>> Get(string streamName, CancellationToken ct = default) =>
20-
client.GetStateAsync(streamName, options.StateBuilder, options.GetStreamStateOptions, ct);
19+
Task<StateAtPointInTime<TState>> Get(
20+
string streamName,
21+
CancellationToken ct = default
22+
);
2123

22-
public Task<IWriteResult> AddAsync(string streamName, IEnumerable<TEvent> events, CancellationToken ct = default) =>
23-
AddAsync(
24-
streamName,
25-
events,
26-
new AppendToStreamOptions { ExpectedStreamState = StreamState.NoStream },
27-
ct
28-
);
24+
Task<IWriteResult> AddAsync(
25+
string streamName,
26+
IEnumerable<TEvent> events,
27+
AppendToStreamOptions? appendToStreamOptions,
28+
CancellationToken ct = default
29+
);
2930

30-
public Task<IWriteResult> AddAsync(
31+
Task<IWriteResult> UpdateAsync(
3132
string streamName,
3233
IEnumerable<TEvent> events,
33-
AppendToStreamOptions appendToStreamOptions,
34+
AppendToStreamOptions? appendToStreamOptions,
3435
CancellationToken ct = default
35-
) {
36-
if (appendToStreamOptions.ExpectedStreamState == null && appendToStreamOptions.ExpectedStreamRevision == null)
37-
appendToStreamOptions.ExpectedStreamState = StreamState.NoStream;
36+
);
3837

39-
return client.AppendToStreamAsync(streamName, events.Cast<object>(), appendToStreamOptions, ct);
40-
}
38+
Task<IWriteResult> Handle(
39+
string streamName,
40+
CommandHandler<TState> handle,
41+
DecideOptions<TState>? decideOptions,
42+
CancellationToken ct = default
43+
);
44+
}
4145

42-
public Task<IWriteResult> UpdateAsync(
46+
public interface IStateStore<TState> : IStateStore<TState, object>
47+
where TState : notnull;
48+
49+
public static class StateStoreExtensions {
50+
public static Task<IWriteResult> AddAsync<TState, TEvent>(
51+
this IStateStore<TState, TEvent> stateStore,
4352
string streamName,
4453
IEnumerable<TEvent> events,
4554
CancellationToken ct = default
46-
) =>
47-
UpdateAsync(
55+
) where TState : notnull where TEvent : notnull =>
56+
stateStore.AddAsync(
57+
streamName,
58+
events,
59+
new AppendToStreamOptions { ExpectedStreamState = StreamState.NoStream },
60+
ct
61+
);
62+
63+
public static Task<IWriteResult> UpdateAsync<TState, TEvent>(
64+
this IStateStore<TState, TEvent> stateStore,
65+
string streamName,
66+
IEnumerable<TEvent> events,
67+
CancellationToken ct = default
68+
) where TState : notnull where TEvent : notnull =>
69+
stateStore.UpdateAsync(
4870
streamName,
4971
events,
5072
new AppendToStreamOptions { ExpectedStreamState = StreamState.StreamExists },
5173
ct
5274
);
5375

54-
public Task<IWriteResult> UpdateAsync(
76+
public static Task<IWriteResult> UpdateAsync<TState, TEvent>(
77+
this IStateStore<TState, TEvent> stateStore,
5578
string streamName,
5679
IEnumerable<TEvent> events,
5780
StreamRevision expectedStreamRevision,
5881
CancellationToken ct = default
59-
) =>
60-
UpdateAsync(
82+
) where TState : notnull where TEvent : notnull =>
83+
stateStore.UpdateAsync(
6184
streamName,
6285
events,
6386
new AppendToStreamOptions { ExpectedStreamRevision = expectedStreamRevision },
6487
ct
6588
);
6689

67-
public Task<IWriteResult> UpdateAsync(
90+
public static Task<IWriteResult> Handle<TState, TEvent>(
91+
this IStateStore<TState, TEvent> stateStore,
92+
string streamName,
93+
CommandHandler<TState> handle,
94+
CancellationToken ct = default
95+
) where TState : notnull where TEvent : notnull =>
96+
stateStore.Handle(
97+
streamName,
98+
handle,
99+
null,
100+
ct
101+
);
102+
}
103+
104+
public class StateStore<TState>(KurrentClient client, StateStoreOptions<TState> options)
105+
: StateStore<TState, object>(client, options), IStateStore<TState>
106+
where TState : notnull;
107+
108+
public class StateStore<TState, TEvent>(KurrentClient client, StateStoreOptions<TState> options)
109+
: IStateStore<TState, TEvent>
110+
where TState : notnull
111+
where TEvent : notnull {
112+
public Task<StateAtPointInTime<TState>> Get(string streamName, CancellationToken ct = default) =>
113+
client.GetStateAsync(streamName, options.StateBuilder, options.GetStreamStateOptions, ct);
114+
115+
public Task<IWriteResult> AddAsync(
68116
string streamName,
69117
IEnumerable<TEvent> events,
70-
AppendToStreamOptions appendToStreamOptions,
118+
AppendToStreamOptions? appendToStreamOptions,
71119
CancellationToken ct = default
72120
) {
121+
appendToStreamOptions ??= new AppendToStreamOptions();
122+
73123
if (appendToStreamOptions.ExpectedStreamState == null && appendToStreamOptions.ExpectedStreamRevision == null)
74-
appendToStreamOptions.ExpectedStreamState = StreamState.StreamExists;
124+
appendToStreamOptions.ExpectedStreamState = StreamState.NoStream;
75125

76126
return client.AppendToStreamAsync(streamName, events.Cast<object>(), appendToStreamOptions, ct);
77127
}
78128

79-
public Task<IWriteResult> Handle(
129+
public Task<IWriteResult> UpdateAsync(
80130
string streamName,
81-
CommandHandler<TState> handle,
131+
IEnumerable<TEvent> events,
132+
AppendToStreamOptions? appendToStreamOptions,
82133
CancellationToken ct = default
83-
) =>
84-
client.DecideAsync(
85-
streamName,
86-
handle,
87-
options.StateBuilder,
88-
new DecideOptions<TState>(),
89-
ct
90-
);
134+
) {
135+
appendToStreamOptions ??= new AppendToStreamOptions();
136+
137+
if (appendToStreamOptions.ExpectedStreamState == null && appendToStreamOptions.ExpectedStreamRevision == null)
138+
appendToStreamOptions.ExpectedStreamState = StreamState.StreamExists;
139+
140+
return client.AppendToStreamAsync(streamName, events.Cast<object>(), appendToStreamOptions, ct);
141+
}
91142

92143
public Task<IWriteResult> Handle(
93144
string streamName,
@@ -99,7 +150,7 @@ public Task<IWriteResult> Handle(
99150
streamName,
100151
handle,
101152
options.StateBuilder,
102-
new DecideOptions<TState>(),
153+
decideOptions ?? new DecideOptions<TState>(),
103154
ct
104155
);
105156
}

0 commit comments

Comments
 (0)