1818import com .fasterxml .jackson .databind .JsonNode ;
1919import com .fasterxml .jackson .databind .node .ArrayNode ;
2020import io .cloudevents .CloudEvent ;
21+ import io .serverlessworkflow .api .types .AllEventConsumptionStrategy ;
22+ import io .serverlessworkflow .api .types .AnyEventConsumptionStrategy ;
23+ import io .serverlessworkflow .api .types .EventConsumptionStrategy ;
2124import io .serverlessworkflow .api .types .EventFilter ;
2225import io .serverlessworkflow .api .types .ListenTask ;
2326import io .serverlessworkflow .api .types .ListenTaskConfiguration ;
2427import io .serverlessworkflow .api .types .ListenTaskConfiguration .ListenAndReadAs ;
2528import io .serverlessworkflow .api .types .ListenTo ;
29+ import io .serverlessworkflow .api .types .OneEventConsumptionStrategy ;
2630import io .serverlessworkflow .api .types .SubscriptionIterator ;
31+ import io .serverlessworkflow .api .types .Until ;
2732import io .serverlessworkflow .api .types .Workflow ;
2833import io .serverlessworkflow .impl .TaskContext ;
2934import io .serverlessworkflow .impl .WorkflowApplication ;
3035import io .serverlessworkflow .impl .WorkflowContext ;
3136import io .serverlessworkflow .impl .WorkflowFilter ;
3237import io .serverlessworkflow .impl .WorkflowPosition ;
38+ import io .serverlessworkflow .impl .WorkflowUtils ;
3339import io .serverlessworkflow .impl .events .CloudEventUtils ;
40+ import io .serverlessworkflow .impl .events .EventConsumer ;
3441import io .serverlessworkflow .impl .events .EventRegistration ;
3542import io .serverlessworkflow .impl .events .EventRegistrationBuilder ;
3643import io .serverlessworkflow .impl .json .JsonUtils ;
4047import java .util .List ;
4148import java .util .Optional ;
4249import java .util .concurrent .CompletableFuture ;
43- import java .util .function .Consumer ;
50+ import java .util .concurrent .atomic .AtomicBoolean ;
51+ import java .util .function .BiConsumer ;
4452import java .util .function .Function ;
4553import java .util .stream .Collectors ;
4654
4755public abstract class ListenExecutor extends RegularTaskExecutor <ListenTask > {
4856
49- protected final Collection <EventRegistrationBuilder > regBuilders ;
57+ protected final EventRegistrationBuilderCollection regBuilders ;
58+ protected final EventRegistrationBuilderCollection untilRegBuilders ;
5059 protected final Optional <WorkflowFilter > until ;
5160 protected final Optional <TaskExecutor <?>> loop ;
5261 protected final Function <CloudEvent , JsonNode > converter ;
62+ protected final EventConsumer eventConsumer ;
63+ protected final AtomicBoolean untilEvent = new AtomicBoolean (true );
64+
65+ private static record EventRegistrationBuilderCollection (
66+ Collection <EventRegistrationBuilder > registrations , boolean isAnd ) {}
5367
5468 public static class ListenExecutorBuilder extends RegularTaskExecutorBuilder <ListenTask > {
5569
56- private Collection < EventRegistrationBuilder > registrations ;
70+ private EventRegistrationBuilderCollection registrations ;
5771 private WorkflowFilter until ;
72+ private EventRegistrationBuilderCollection untilRegistrations ;
5873 private TaskExecutor <?> loop ;
5974 private Function <CloudEvent , JsonNode > converter = this ::defaultCEConverter ;
60- private boolean isAnd ;
75+
76+ private EventRegistrationBuilderCollection allEvents (AllEventConsumptionStrategy allStrategy ) {
77+ return new EventRegistrationBuilderCollection (from (allStrategy .getAll ()), true );
78+ }
79+
80+ private EventRegistrationBuilderCollection anyEvents (AnyEventConsumptionStrategy anyStrategy ) {
81+ List <EventFilter > eventFilters = anyStrategy .getAny ();
82+ return new EventRegistrationBuilderCollection (
83+ eventFilters .isEmpty () ? registerToAll () : from (eventFilters ), false );
84+ }
85+
86+ private EventRegistrationBuilderCollection oneEvent (OneEventConsumptionStrategy oneStrategy ) {
87+ return new EventRegistrationBuilderCollection (List .of (from (oneStrategy .getOne ())), false );
88+ }
6189
6290 protected ListenExecutorBuilder (
6391 WorkflowPosition position ,
@@ -69,15 +97,29 @@ protected ListenExecutorBuilder(
6997 ListenTaskConfiguration listen = task .getListen ();
7098 ListenTo to = listen .getTo ();
7199 if (to .getAllEventConsumptionStrategy () != null ) {
72- isAnd = true ;
73- registrations = from (to .getAllEventConsumptionStrategy ().getAll ());
100+ registrations = allEvents (to .getAllEventConsumptionStrategy ());
74101 } else if (to .getAnyEventConsumptionStrategy () != null ) {
75- isAnd = false ;
76- List <EventFilter > eventFilters = to .getAnyEventConsumptionStrategy ().getAny ();
77- registrations = eventFilters .isEmpty () ? registerToAll () : from (eventFilters );
102+ AnyEventConsumptionStrategy any = to .getAnyEventConsumptionStrategy ();
103+ registrations = anyEvents (any );
104+ Until untilDesc = any .getUntil ();
105+ if (untilDesc != null ) {
106+ if (untilDesc .getAnyEventUntilCondition () != null ) {
107+ until =
108+ WorkflowUtils .buildWorkflowFilter (
109+ application .expressionFactory (), untilDesc .getAnyEventUntilCondition ());
110+ } else if (untilDesc .getAnyEventUntilConsumed () != null ) {
111+ EventConsumptionStrategy strategy = untilDesc .getAnyEventUntilConsumed ();
112+ if (strategy .getAllEventConsumptionStrategy () != null ) {
113+ untilRegistrations = allEvents (strategy .getAllEventConsumptionStrategy ());
114+ } else if (strategy .getAnyEventConsumptionStrategy () != null ) {
115+ untilRegistrations = anyEvents (strategy .getAnyEventConsumptionStrategy ());
116+ } else if (strategy .getOneEventConsumptionStrategy () != null ) {
117+ untilRegistrations = oneEvent (strategy .getOneEventConsumptionStrategy ());
118+ }
119+ }
120+ }
78121 } else if (to .getOneEventConsumptionStrategy () != null ) {
79- isAnd = false ;
80- registrations = List .of (from (to .getOneEventConsumptionStrategy ().getOne ()));
122+ registrations = oneEvent (to .getOneEventConsumptionStrategy ());
81123 }
82124 SubscriptionIterator forEach = task .getForeach ();
83125 if (forEach != null ) {
@@ -116,7 +158,7 @@ private EventRegistrationBuilder from(EventFilter filter) {
116158
117159 @ Override
118160 public TaskExecutor <ListenTask > buildInstance () {
119- return isAnd ? new AndListenExecutor (this ) : new OrListenExecutor (this );
161+ return registrations . isAnd () ? new AndListenExecutor (this ) : new OrListenExecutor (this );
120162 }
121163 }
122164
@@ -160,8 +202,11 @@ protected void internalProcessCe(
160202 TaskContext taskContext ,
161203 CompletableFuture <JsonNode > future ) {
162204 arrayNode .add (node );
163- if (until .isEmpty ()
164- || until .filter (u -> u .apply (workflow , taskContext , arrayNode ).asBoolean ()).isPresent ()) {
205+ if ((until .isEmpty ()
206+ || until
207+ .filter (u -> u .apply (workflow , taskContext , arrayNode ).asBoolean ())
208+ .isPresent ())
209+ && untilEvent .get ()) {
165210 future .complete (arrayNode );
166211 }
167212 }
@@ -176,6 +221,65 @@ protected abstract void internalProcessCe(
176221 TaskContext taskContext ,
177222 CompletableFuture <JsonNode > future );
178223
224+ @ Override
225+ protected CompletableFuture <JsonNode > internalExecute (
226+ WorkflowContext workflow , TaskContext taskContext ) {
227+ ArrayNode output = JsonUtils .mapper ().createArrayNode ();
228+ Collection <EventRegistration > registrations = new ArrayList <>();
229+ if (untilRegBuilders != null ) {
230+ untilEvent .set (false );
231+ }
232+ CompletableFuture <?> combinedFuture =
233+ combine (
234+ toCompletables (
235+ regBuilders ,
236+ registrations ,
237+ (ce , future ) ->
238+ processCe (converter .apply (ce ), output , workflow , taskContext , future )));
239+ CompletableFuture <JsonNode > resultFuture =
240+ combinedFuture .thenApply (
241+ v -> {
242+ registrations .forEach (reg -> eventConsumer .unregister (reg ));
243+ return output ;
244+ });
245+ if (untilRegBuilders != null ) {
246+ Collection <EventRegistration > untilRegistrations = new ArrayList <>();
247+ CompletableFuture <?>[] futures =
248+ toCompletables (
249+ untilRegBuilders , untilRegistrations , (ce , future ) -> future .complete (null ));
250+ CompletableFuture <?> untilFuture =
251+ untilRegBuilders .isAnd ()
252+ ? CompletableFuture .allOf (futures )
253+ : CompletableFuture .anyOf (futures );
254+ untilFuture .thenAccept (
255+ v -> {
256+ untilEvent .set (true );
257+ combinedFuture .complete (null );
258+ untilRegistrations .forEach (reg -> eventConsumer .unregister (reg ));
259+ });
260+ }
261+ return resultFuture ;
262+ }
263+
264+ private <T > CompletableFuture <T >[] toCompletables (
265+ EventRegistrationBuilderCollection regCollection ,
266+ Collection <EventRegistration > registrations ,
267+ BiConsumer <CloudEvent , CompletableFuture <T >> consumer ) {
268+ return regCollection .registrations ().stream ()
269+ .map (reg -> toCompletable (reg , registrations , consumer ))
270+ .toArray (size -> new CompletableFuture [size ]);
271+ }
272+
273+ private <T > CompletableFuture <T > toCompletable (
274+ EventRegistrationBuilder regBuilder ,
275+ Collection <EventRegistration > registrations ,
276+ BiConsumer <CloudEvent , CompletableFuture <T >> ceConsumer ) {
277+ final CompletableFuture <T > future = new CompletableFuture <>();
278+ registrations .add (
279+ eventConsumer .register (regBuilder , ce -> ceConsumer .accept ((CloudEvent ) ce , future )));
280+ return future ;
281+ }
282+
179283 private void processCe (
180284 JsonNode node ,
181285 ArrayNode arrayNode ,
@@ -199,48 +303,13 @@ private void processCe(
199303 () -> internalProcessCe (node , arrayNode , workflow , taskContext , future ));
200304 }
201305
202- protected CompletableFuture <JsonNode > toCompletable (
203- WorkflowContext workflow ,
204- TaskContext taskContext ,
205- EventRegistrationBuilder regBuilder ,
206- Collection <EventRegistration > registrations ,
207- ArrayNode arrayNode ) {
208- final CompletableFuture <JsonNode > future = new CompletableFuture <>();
209- registrations .add (
210- workflow
211- .definition ()
212- .application ()
213- .eventConsumer ()
214- .register (
215- regBuilder ,
216- (Consumer <CloudEvent >)
217- (ce ->
218- processCe (converter .apply (ce ), arrayNode , workflow , taskContext , future ))));
219- return future ;
220- }
221-
222- @ Override
223- protected CompletableFuture <JsonNode > internalExecute (
224- WorkflowContext workflow , TaskContext taskContext ) {
225- ArrayNode output = JsonUtils .mapper ().createArrayNode ();
226- Collection <EventRegistration > registrations = new ArrayList <>();
227- return combine (
228- regBuilders .stream ()
229- .map (reg -> toCompletable (workflow , taskContext , reg , registrations , output ))
230- .toArray (size -> new CompletableFuture [size ]))
231- .thenApply (
232- v -> {
233- registrations .forEach (
234- reg -> workflow .definition ().application ().eventConsumer ().unregister (reg ));
235- return output ;
236- });
237- }
238-
239306 protected ListenExecutor (ListenExecutorBuilder builder ) {
240307 super (builder );
308+ this .eventConsumer = builder .application .eventConsumer ();
241309 this .regBuilders = builder .registrations ;
242310 this .until = Optional .ofNullable (builder .until );
243311 this .loop = Optional .ofNullable (builder .loop );
244312 this .converter = builder .converter ;
313+ this .untilRegBuilders = builder .untilRegistrations ;
245314 }
246315}
0 commit comments