@@ -45,17 +45,23 @@ protected function configure(): void
4545 'How much time should elapse before the next job is executed in milliseconds ' ,
4646 1000 ,
4747 )
48+ ->addOption (
49+ 'stream ' ,
50+ null ,
51+ InputOption::VALUE_REQUIRED ,
52+ 'Watch messages from a specific stream (e.g. "stream-*") ' ,
53+ )
4854 ->addOption (
4955 'aggregate ' ,
5056 null ,
5157 InputOption::VALUE_REQUIRED ,
52- 'filter aggregate name ' ,
58+ 'Filter aggregate name ' ,
5359 )
5460 ->addOption (
5561 'aggregate-id ' ,
5662 null ,
5763 InputOption::VALUE_REQUIRED ,
58- 'filter aggregate id ' ,
64+ 'Filter aggregate id ' ,
5965 );
6066 }
6167
@@ -64,9 +70,16 @@ protected function execute(InputInterface $input, OutputInterface $output): int
6470 $ console = new OutputStyle ($ input , $ output );
6571
6672 $ sleep = InputHelper::positiveIntOrZero ($ input ->getOption ('sleep ' ));
73+ $ stream = InputHelper::nullableString ($ input ->getOption ('stream ' ));
6774 $ aggregate = InputHelper::nullableString ($ input ->getOption ('aggregate ' ));
6875 $ aggregateId = InputHelper::nullableString ($ input ->getOption ('aggregate-id ' ));
6976
77+ if ($ stream !== null && ($ aggregate !== null || $ aggregateId !== null )) {
78+ $ console ->error ('You can only provide stream or aggregate and aggregate-id ' );
79+
80+ return 1 ;
81+ }
82+
7083 $ index = $ this ->currentIndex ();
7184
7285 if ($ this ->store instanceof SubscriptionStore) {
@@ -75,6 +88,10 @@ protected function execute(InputInterface $input, OutputInterface $output): int
7588
7689 $ criteriaBuilder = new CriteriaBuilder ();
7790
91+ if ($ stream !== null ) {
92+ $ criteriaBuilder ->streamName ($ stream );
93+ }
94+
7895 if ($ this ->store instanceof StreamStore) {
7996 if ($ aggregate !== null || $ aggregateId !== null ) {
8097 if ($ aggregate === null || $ aggregateId === null ) {
0 commit comments