Skip to content

Commit c36ff32

Browse files
authored
Merge pull request #620 from patchlevel/stream-store
[Experimental] Stream Store
2 parents 1ff6007 + 21a31d7 commit c36ff32

31 files changed

+3818
-85
lines changed

Makefile

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,17 @@ test: phpunit
6868
benchmark: vendor ## run benchmarks
6969
DB_URL=sqlite3:///:memory: vendor/bin/phpbench run tests/Benchmark --report=default
7070

71-
.PHONY: benchmark-diff-test
72-
benchmark-diff-test: vendor ## run benchmarks
71+
.PHONY: benchmark-base
72+
benchmark-base: vendor ## run benchmarks
7373
DB_URL=sqlite3:///:memory: vendor/bin/phpbench run tests/Benchmark --revs=1 --report=default --progress=none --tag=base
74+
75+
.PHONY: benchmark-diff
76+
benchmark-diff: vendor ## run benchmarks
7477
DB_URL=sqlite3:///:memory: vendor/bin/phpbench run tests/Benchmark --revs=1 --report=diff --progress=none --ref=base
7578

79+
.PHONY: benchmark-diff-test
80+
benchmark-diff-test: benchmark-base benchmark-diff ## run benchmarks
81+
7682
.PHONY: dev
7783
dev: static test ## run dev tools
7884

baseline.xml

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2-
<files psalm-version="5.23.1@8471a896ccea3526b26d082f4461eeea467f10a4">
2+
<files psalm-version="5.25.0@01a8eb06b9e9cc6cfb6a320bf9fb14331919d505">
33
<file src="src/Aggregate/AggregateRootBehaviour.php">
44
<UnsafeInstantiation>
55
<code><![CDATA[new static()]]></code>
@@ -86,6 +86,11 @@
8686
<code><![CDATA[$dateTimeType->convertToPHPValue($data['recorded_on'], $platform)]]></code>
8787
</MixedArgument>
8888
</file>
89+
<file src="src/Store/StreamDoctrineDbalStoreStream.php">
90+
<ArgumentTypeCoercion>
91+
<code><![CDATA[$data['playhead'] === null ? null : (int)$data['playhead']]]></code>
92+
</ArgumentTypeCoercion>
93+
</file>
8994
<file src="src/Subscription/Store/DoctrineSubscriptionStore.php">
9095
<MixedArgument>
9196
<code><![CDATA[$context]]></code>
@@ -131,6 +136,14 @@
131136
<code><![CDATA[$store]]></code>
132137
</MissingConstructor>
133138
</file>
139+
<file src="tests/Benchmark/SimpleSetupStreamStoreBench.php">
140+
<MissingConstructor>
141+
<code><![CDATA[$multipleEventsId]]></code>
142+
<code><![CDATA[$repository]]></code>
143+
<code><![CDATA[$singleEventId]]></code>
144+
<code><![CDATA[$store]]></code>
145+
</MissingConstructor>
146+
</file>
134147
<file src="tests/Benchmark/SnapshotsBench.php">
135148
<MissingConstructor>
136149
<code><![CDATA[$adapter]]></code>
@@ -320,6 +333,63 @@
320333
)]]></code>
321334
</InternalMethod>
322335
</file>
336+
<file src="tests/Unit/Store/StreamDoctrineDbalStoreTest.php">
337+
<DeprecatedMethod>
338+
<code><![CDATA[addMethods]]></code>
339+
</DeprecatedMethod>
340+
<InternalMethod>
341+
<code><![CDATA[new DefaultSelectSQLBuilder(
342+
$abstractPlatform->reveal(),
343+
'FOR UPDATE',
344+
'SKIP LOCKED',
345+
)]]></code>
346+
<code><![CDATA[new DefaultSelectSQLBuilder(
347+
$abstractPlatform->reveal(),
348+
'FOR UPDATE',
349+
'SKIP LOCKED',
350+
)]]></code>
351+
<code><![CDATA[new DefaultSelectSQLBuilder(
352+
$abstractPlatform->reveal(),
353+
'FOR UPDATE',
354+
'SKIP LOCKED',
355+
)]]></code>
356+
<code><![CDATA[new DefaultSelectSQLBuilder(
357+
$abstractPlatform->reveal(),
358+
'FOR UPDATE',
359+
'SKIP LOCKED',
360+
)]]></code>
361+
<code><![CDATA[new DefaultSelectSQLBuilder(
362+
$abstractPlatform->reveal(),
363+
'FOR UPDATE',
364+
'SKIP LOCKED',
365+
)]]></code>
366+
<code><![CDATA[new DefaultSelectSQLBuilder(
367+
$abstractPlatform->reveal(),
368+
'FOR UPDATE',
369+
'SKIP LOCKED',
370+
)]]></code>
371+
<code><![CDATA[new DefaultSelectSQLBuilder(
372+
$abstractPlatform->reveal(),
373+
'FOR UPDATE',
374+
'SKIP LOCKED',
375+
)]]></code>
376+
<code><![CDATA[new DefaultSelectSQLBuilder(
377+
$abstractPlatform->reveal(),
378+
'FOR UPDATE',
379+
'SKIP LOCKED',
380+
)]]></code>
381+
<code><![CDATA[new DefaultSelectSQLBuilder(
382+
$abstractPlatform->reveal(),
383+
'FOR UPDATE',
384+
'SKIP LOCKED',
385+
)]]></code>
386+
<code><![CDATA[new DefaultSelectSQLBuilder(
387+
$abstractPlatform->reveal(),
388+
'FOR UPDATE',
389+
'SKIP LOCKED',
390+
)]]></code>
391+
</InternalMethod>
392+
</file>
323393
<file src="tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php">
324394
<PossiblyUndefinedArrayOffset>
325395
<code><![CDATA[$update1]]></code>

deptrac.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ deptrac:
9292
collectors:
9393
- type: directory
9494
value: src/Subscription/.*
95+
- name: Test
96+
collectors:
97+
- type: directory
98+
value: src/Test/.*
9599

96100
ruleset:
97101
Aggregate:
@@ -175,7 +179,9 @@ deptrac:
175179
Store:
176180
- Aggregate
177181
- Attribute
182+
- Clock
178183
- Message
179184
- Metadata
180185
- Schema
181186
- Serializer
187+
Test:

docs/pages/store.md

Lines changed: 96 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ Each message contains an event and the associated headers.
88
More information about the message can be found [here](message.md).
99

1010
The store is optimized to efficiently store and load events for aggregates.
11-
We currently only offer one [doctrine dbal](https://www.doctrine-project.org/projects/dbal.html) store.
1211

1312
## Create DBAL connection
1413

@@ -29,8 +28,14 @@ $connection = DriverManager::getConnection(
2928

3029
## Configure Store
3130

31+
We currently offer two stores, both based on the [doctrine dbal](https://www.doctrine-project.org/projects/dbal.html) library.
32+
The default store is the `DoctrineDbalStore` and the new experimental store is the `StreamDoctrineDbalStore`.
33+
34+
### DoctrineDbalStore
35+
36+
This is the current default store for event sourcing.
3237
You can create a store with the `DoctrineDbalStore` class.
33-
The store needs a dbal connection, an event serializer, an aggregate registry and some options.
38+
The store needs a dbal connection, an event serializer and has some optional parameters like options.
3439

3540
```php
3641
use Doctrine\DBAL\Connection;
@@ -41,21 +46,17 @@ use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
4146
$store = new DoctrineDbalStore(
4247
$connection,
4348
DefaultEventSerializer::createFromPaths(['src/Event']),
44-
null,
45-
[/** options */],
4649
);
4750
```
4851
Following options are available in `DoctrineDbalStore`:
4952

50-
| Option | Type | Default | Description |
51-
|-------------------|------------------|------------|----------------------------------------------|
52-
| table_name | string | eventstore | The name of the table in the database |
53-
| aggregate_id_type | "uuid"|"string" | uuid | The type of the `aggregate_id` column |
54-
| locking | bool | true | If the store should use locking for writing |
55-
| lock_id | int | 133742 | The id of the lock |
56-
| lock_timeout | int | -1 | The timeout of the lock. -1 means no timeout |
57-
58-
## Schema
53+
| Option | Type | Default | Description |
54+
|-------------------|-----------------|------------|----------------------------------------------|
55+
| table_name | string | eventstore | The name of the table in the database |
56+
| aggregate_id_type | "uuid"/"string" | uuid | The type of the `aggregate_id` column |
57+
| locking | bool | true | If the store should use locking for writing |
58+
| lock_id | int | 133742 | The id of the lock |
59+
| lock_timeout | int | -1 | The timeout of the lock. -1 means no timeout |
5960

6061
The table structure of the `DoctrineDbalStore` looks like this:
6162

@@ -72,13 +73,59 @@ The table structure of the `DoctrineDbalStore` looks like this:
7273
| archived | bool | If the event is archived |
7374
| custom_headers | json | Custom headers for the event |
7475

75-
With the help of the `SchemaDirector`, the database structure can be created, updated and deleted.
76-
7776
!!! note
7877

7978
The default type of the `aggregate_id` column is `uuid` if the database supports it and `string` if not.
8079
You can change the type with the `aggregate_id_type` to `string` if you want use custom id.
8180

81+
### StreamDoctrineDbalStore
82+
83+
We offer a new experimental store called `StreamDoctrineDbalStore`.
84+
This store is decoupled from the aggregate and can be used to store events from other sources.
85+
The difference to the `DoctrineDbalStore` is that the `StreamDoctrineDbalStore` merge the aggregate id
86+
and the aggregate name into one column named `stream`. Additionally, the column `playhead` is nullable.
87+
This store introduces two new methods `streams` and `remove`.
88+
89+
The store needs a dbal connection, an event serializer and has some optional parameters like options.
90+
91+
```php
92+
use Doctrine\DBAL\Connection;
93+
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
94+
use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore;
95+
96+
/** @var Connection $connection */
97+
$store = new StreamDoctrineDbalStore(
98+
$connection,
99+
DefaultEventSerializer::createFromPaths(['src/Event']),
100+
);
101+
```
102+
Following options are available in `StreamDoctrineDbalStore`:
103+
104+
| Option | Type | Default | Description |
105+
|-------------------|-----------------|-------------|----------------------------------------------|
106+
| table_name | string | event_store | The name of the table in the database |
107+
| locking | bool | true | If the store should use locking for writing |
108+
| lock_id | int | 133742 | The id of the lock |
109+
| lock_timeout | int | -1 | The timeout of the lock. -1 means no timeout |
110+
111+
The table structure of the `StreamDoctrineDbalStore` looks like this:
112+
113+
| Column | Type | Description |
114+
|------------------|----------|--------------------------------------------------|
115+
| id | bigint | The index of the whole stream (autoincrement) |
116+
| stream | string | The name of the stream |
117+
| playhead | ?int | The current playhead of the aggregate |
118+
| event | string | The name of the event |
119+
| payload | json | The payload of the event |
120+
| recorded_on | datetime | The date when the event was recorded |
121+
| new_stream_start | bool | If the event is the first event of the aggregate |
122+
| archived | bool | If the event is archived |
123+
| custom_headers | json | Custom headers for the event |
124+
125+
## Schema
126+
127+
With the help of the `SchemaDirector`, the database structure can be created, updated and deleted.
128+
82129
!!! tip
83130

84131
You can also use doctrine migration to create and keep your schema in sync.
@@ -92,11 +139,11 @@ Additionally, it implements the `DryRunSchemaDirector` interface, to show the sq
92139
```php
93140
use Doctrine\DBAL\Connection;
94141
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
95-
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
142+
use Patchlevel\EventSourcing\Store\Store;
96143

97144
/**
98145
* @var Connection $connection
99-
* @var DoctrineDbalStore $store
146+
* @var Store $store
100147
*/
101148
$schemaDirector = new DoctrineSchemaDirector(
102149
$connection,
@@ -179,13 +226,13 @@ use Doctrine\Migrations\DependencyFactory;
179226
use Doctrine\Migrations\Provider\SchemaProvider;
180227
use Patchlevel\EventSourcing\Schema\DoctrineMigrationSchemaProvider;
181228
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
182-
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
229+
use Patchlevel\EventSourcing\Store\Store;
183230

184231
// event sourcing schema director configuration
185232

186233
/**
187234
* @var Connection $connection
188-
* @var DoctrineDbalStore $store
235+
* @var Store $store
189236
*/
190237
$schemaDirector = new DoctrineSchemaDirector(
191238
$connection,
@@ -355,11 +402,39 @@ $store->save(...$messages);
355402

356403
Use transactional method if you want call multiple save methods in a transaction.
357404

358-
### Delete & Update
405+
### Update
359406

360-
It is not possible to delete or update events.
407+
It is not possible to update events.
361408
In event sourcing, the events are immutable.
362409

410+
### Remove
411+
412+
You can remove a stream with the `remove` method.
413+
414+
```php
415+
use Patchlevel\EventSourcing\Store\StreamStore;
416+
417+
/** @var StreamStore $store */
418+
$store->remove('profile-*');
419+
```
420+
!!! note
421+
422+
The method is only available in the `StreamStore` like `StreamDoctrineDbalStore`.
423+
424+
### List Streams
425+
426+
You can list all streams with the `streams` method.
427+
428+
```php
429+
use Patchlevel\EventSourcing\Store\StreamStore;
430+
431+
/** @var StreamStore $store */
432+
$streams = $store->streams(); // ['profile-1', 'profile-2', 'profile-3']
433+
```
434+
!!! note
435+
436+
The method is only available in the `StreamStore` like `StreamDoctrineDbalStore`.
437+
363438
### Transaction
364439

365440
There is also the possibility of executing a function in a transaction.

phpbench.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
"partition": "subject_name",
4444
"cols":
4545
{
46-
"time-diff": "percent_diff(partition['result_time_avg'][1], partition['result_time_avg'][0])"
46+
"time-diff": "percent_diff(coalesce(partition['result_time_avg']?[1], 0), coalesce(partition['result_time_avg']?[0], 0))"
4747
}
4848
},
4949
"memory":
@@ -61,7 +61,7 @@
6161
"partition": "subject_name",
6262
"cols":
6363
{
64-
"memory-diff": "percent_diff(partition['result_mem_peak'][1], partition['result_mem_peak'][0])"
64+
"memory-diff": "percent_diff(coalesce(partition['result_mem_peak']?[1], 0), coalesce(partition['result_mem_peak']?[0], 0))"
6565
}
6666
}
6767
}

phpstan-baseline.neon

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,16 @@ parameters:
5050
count: 1
5151
path: src/Store/DoctrineDbalStoreStream.php
5252

53+
-
54+
message: "#^Parameter \\#2 \\$playhead of class Patchlevel\\\\EventSourcing\\\\Store\\\\StreamHeader constructor expects int\\<1, max\\>\\|null, int\\|null given\\.$#"
55+
count: 1
56+
path: src/Store/StreamDoctrineDbalStoreStream.php
57+
58+
-
59+
message: "#^Ternary operator condition is always true\\.$#"
60+
count: 1
61+
path: src/Store/StreamDoctrineDbalStoreStream.php
62+
5363
-
5464
message: "#^Parameter \\#3 \\$errorContext of class Patchlevel\\\\EventSourcing\\\\Subscription\\\\SubscriptionError constructor expects array\\<int, array\\{class\\: class\\-string, message\\: string, code\\: int\\|string, file\\: string, line\\: int, trace\\: array\\<int, array\\{file\\?\\: string, line\\?\\: int, function\\?\\: string, class\\?\\: string, type\\?\\: string, args\\?\\: array\\}\\>\\}\\>\\|null, mixed given\\.$#"
5565
count: 1

src/Aggregate/AggregateHeader.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,9 @@ public function __construct(
1919
public readonly DateTimeImmutable $recordedOn,
2020
) {
2121
}
22+
23+
public function streamName(): string
24+
{
25+
return StreamNameTranslator::streamName($this->aggregateName, $this->aggregateId);
26+
}
2227
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Patchlevel\EventSourcing\Aggregate;
6+
7+
use RuntimeException;
8+
9+
use function sprintf;
10+
11+
/** @experimental */
12+
final class InvalidAggregateStreamName extends RuntimeException
13+
{
14+
public function __construct(string $stream)
15+
{
16+
parent::__construct(sprintf('Invalid aggregate stream name "%s". Expected format is "[aggregateName]-[aggregateId]".', $stream));
17+
}
18+
}

0 commit comments

Comments
 (0)