Skip to content

Commit fe23cbc

Browse files
authored
Merge pull request #623 from patchlevel/cli-stream-store-support
add stream store support in cli commands
2 parents c36ff32 + 677b9f4 commit fe23cbc

File tree

2 files changed

+36
-4
lines changed

2 files changed

+36
-4
lines changed

src/Console/Command/ShowCommand.php

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
use Patchlevel\EventSourcing\Console\OutputStyle;
99
use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer;
1010
use Patchlevel\EventSourcing\Serializer\EventSerializer;
11+
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
12+
use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion;
1113
use Patchlevel\EventSourcing\Store\Store;
1214
use Symfony\Component\Console\Attribute\AsCommand;
1315
use Symfony\Component\Console\Command\Command;
@@ -41,6 +43,12 @@ protected function configure(): void
4143
'How many messages should be displayed',
4244
10,
4345
)
46+
->addOption(
47+
'stream',
48+
null,
49+
InputOption::VALUE_REQUIRED,
50+
'Show messages from a specific stream (e.g. "stream-*")',
51+
)
4452
->addOption(
4553
'forward',
4654
null,
@@ -53,11 +61,18 @@ protected function execute(InputInterface $input, OutputInterface $output): int
5361
{
5462
$limit = InputHelper::positiveIntOrZero($input->getOption('limit'));
5563
$forward = InputHelper::bool($input->getOption('forward'));
64+
$stream = InputHelper::nullableString($input->getOption('stream'));
5665

5766
$console = new OutputStyle($input, $output);
5867

59-
$maxCount = $this->store->count();
60-
$stream = $this->store->load(null, null, null, !$forward);
68+
$criteria = null;
69+
70+
if ($stream !== null) {
71+
$criteria = new Criteria(new StreamCriterion($stream));
72+
}
73+
74+
$maxCount = $this->store->count($criteria);
75+
$stream = $this->store->load($criteria, null, null, !$forward);
6176

6277
$currentCount = 0;
6378

src/Console/Command/WatchCommand.php

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,23 @@ protected function configure(): void
4545
'How much time should elapse before the next job is executed in milliseconds',
4646
1000,
4747
)
48+
->addOption(
49+
'stream',
50+
null,
51+
InputOption::VALUE_REQUIRED,
52+
'Watch messages from a specific stream (e.g. "stream-*")',
53+
)
4854
->addOption(
4955
'aggregate',
5056
null,
5157
InputOption::VALUE_REQUIRED,
52-
'filter aggregate name',
58+
'Filter aggregate name',
5359
)
5460
->addOption(
5561
'aggregate-id',
5662
null,
5763
InputOption::VALUE_REQUIRED,
58-
'filter aggregate id',
64+
'Filter aggregate id',
5965
);
6066
}
6167

@@ -64,9 +70,16 @@ protected function execute(InputInterface $input, OutputInterface $output): int
6470
$console = new OutputStyle($input, $output);
6571

6672
$sleep = InputHelper::positiveIntOrZero($input->getOption('sleep'));
73+
$stream = InputHelper::nullableString($input->getOption('stream'));
6774
$aggregate = InputHelper::nullableString($input->getOption('aggregate'));
6875
$aggregateId = InputHelper::nullableString($input->getOption('aggregate-id'));
6976

77+
if ($stream !== null && ($aggregate !== null || $aggregateId !== null)) {
78+
$console->error('You can only provide stream or aggregate and aggregate-id');
79+
80+
return 1;
81+
}
82+
7083
$index = $this->currentIndex();
7184

7285
if ($this->store instanceof SubscriptionStore) {
@@ -75,6 +88,10 @@ protected function execute(InputInterface $input, OutputInterface $output): int
7588

7689
$criteriaBuilder = new CriteriaBuilder();
7790

91+
if ($stream !== null) {
92+
$criteriaBuilder->streamName($stream);
93+
}
94+
7895
if ($this->store instanceof StreamStore) {
7996
if ($aggregate !== null || $aggregateId !== null) {
8097
if ($aggregate === null || $aggregateId === null) {

0 commit comments

Comments
 (0)