1818import com .fasterxml .jackson .databind .JsonNode ;
1919import com .fasterxml .jackson .databind .node .ArrayNode ;
2020import io .cloudevents .CloudEvent ;
21+ import io .serverlessworkflow .api .types .AllEventConsumptionStrategy ;
22+ import io .serverlessworkflow .api .types .AnyEventConsumptionStrategy ;
23+ import io .serverlessworkflow .api .types .EventConsumptionStrategy ;
2124import io .serverlessworkflow .api .types .EventFilter ;
2225import io .serverlessworkflow .api .types .ListenTask ;
2326import io .serverlessworkflow .api .types .ListenTaskConfiguration ;
2427import io .serverlessworkflow .api .types .ListenTaskConfiguration .ListenAndReadAs ;
2528import io .serverlessworkflow .api .types .ListenTo ;
29+ import io .serverlessworkflow .api .types .OneEventConsumptionStrategy ;
2630import io .serverlessworkflow .api .types .SubscriptionIterator ;
31+ import io .serverlessworkflow .api .types .Until ;
2732import io .serverlessworkflow .api .types .Workflow ;
2833import io .serverlessworkflow .impl .TaskContext ;
2934import io .serverlessworkflow .impl .WorkflowApplication ;
3035import io .serverlessworkflow .impl .WorkflowContext ;
3136import io .serverlessworkflow .impl .WorkflowFilter ;
3237import io .serverlessworkflow .impl .WorkflowPosition ;
38+ import io .serverlessworkflow .impl .WorkflowUtils ;
3339import io .serverlessworkflow .impl .events .CloudEventUtils ;
40+ import io .serverlessworkflow .impl .events .EventConsumer ;
3441import io .serverlessworkflow .impl .events .EventRegistration ;
3542import io .serverlessworkflow .impl .events .EventRegistrationBuilder ;
3643import io .serverlessworkflow .impl .json .JsonUtils ;
4047import java .util .List ;
4148import java .util .Optional ;
4249import java .util .concurrent .CompletableFuture ;
43- import java .util .function .Consumer ;
50+ import java .util .function .BiConsumer ;
4451import java .util .function .Function ;
4552import java .util .stream .Collectors ;
4653
4754public abstract class ListenExecutor extends RegularTaskExecutor <ListenTask > {
4855
49- protected final Collection <EventRegistrationBuilder > regBuilders ;
56+ protected final EventRegistrationBuilderCollection regBuilders ;
57+ protected final EventRegistrationBuilderCollection untilRegBuilders ;
5058 protected final Optional <WorkflowFilter > until ;
5159 protected final Optional <TaskExecutor <?>> loop ;
5260 protected final Function <CloudEvent , JsonNode > converter ;
61+ protected final EventConsumer eventConsumer ;
62+
63+ private static record EventRegistrationBuilderCollection (
64+ Collection <EventRegistrationBuilder > registrations , boolean isAnd ) {}
5365
5466 public static class ListenExecutorBuilder extends RegularTaskExecutorBuilder <ListenTask > {
5567
56- private Collection < EventRegistrationBuilder > registrations ;
68+ private EventRegistrationBuilderCollection registrations ;
5769 private WorkflowFilter until ;
70+ private EventRegistrationBuilderCollection untilRegistrations ;
5871 private TaskExecutor <?> loop ;
5972 private Function <CloudEvent , JsonNode > converter = this ::defaultCEConverter ;
60- private boolean isAnd ;
73+
74+ private EventRegistrationBuilderCollection allEvents (AllEventConsumptionStrategy allStrategy ) {
75+ return new EventRegistrationBuilderCollection (from (allStrategy .getAll ()), true );
76+ }
77+
78+ private EventRegistrationBuilderCollection anyEvents (AnyEventConsumptionStrategy anyStrategy ) {
79+ List <EventFilter > eventFilters = anyStrategy .getAny ();
80+ return new EventRegistrationBuilderCollection (
81+ eventFilters .isEmpty () ? registerToAll () : from (eventFilters ), false );
82+ }
83+
84+ private EventRegistrationBuilderCollection oneEvent (OneEventConsumptionStrategy oneStrategy ) {
85+ return new EventRegistrationBuilderCollection (List .of (from (oneStrategy .getOne ())), false );
86+ }
6187
6288 protected ListenExecutorBuilder (
6389 WorkflowPosition position ,
@@ -69,15 +95,29 @@ protected ListenExecutorBuilder(
6995 ListenTaskConfiguration listen = task .getListen ();
7096 ListenTo to = listen .getTo ();
7197 if (to .getAllEventConsumptionStrategy () != null ) {
72- isAnd = true ;
73- registrations = from (to .getAllEventConsumptionStrategy ().getAll ());
98+ registrations = allEvents (to .getAllEventConsumptionStrategy ());
7499 } else if (to .getAnyEventConsumptionStrategy () != null ) {
75- isAnd = false ;
76- List <EventFilter > eventFilters = to .getAnyEventConsumptionStrategy ().getAny ();
77- registrations = eventFilters .isEmpty () ? registerToAll () : from (eventFilters );
100+ AnyEventConsumptionStrategy any = to .getAnyEventConsumptionStrategy ();
101+ registrations = anyEvents (any );
102+ Until untilDesc = any .getUntil ();
103+ if (untilDesc != null ) {
104+ if (untilDesc .getAnyEventUntilCondition () != null ) {
105+ until =
106+ WorkflowUtils .buildWorkflowFilter (
107+ application .expressionFactory (), untilDesc .getAnyEventUntilCondition ());
108+ } else if (untilDesc .getAnyEventUntilConsumed () != null ) {
109+ EventConsumptionStrategy strategy = untilDesc .getAnyEventUntilConsumed ();
110+ if (strategy .getAllEventConsumptionStrategy () != null ) {
111+ untilRegistrations = allEvents (strategy .getAllEventConsumptionStrategy ());
112+ } else if (strategy .getAnyEventConsumptionStrategy () != null ) {
113+ untilRegistrations = anyEvents (strategy .getAnyEventConsumptionStrategy ());
114+ } else if (strategy .getOneEventConsumptionStrategy () != null ) {
115+ untilRegistrations = oneEvent (strategy .getOneEventConsumptionStrategy ());
116+ }
117+ }
118+ }
78119 } else if (to .getOneEventConsumptionStrategy () != null ) {
79- isAnd = false ;
80- registrations = List .of (from (to .getOneEventConsumptionStrategy ().getOne ()));
120+ registrations = oneEvent (to .getOneEventConsumptionStrategy ());
81121 }
82122 SubscriptionIterator forEach = task .getForeach ();
83123 if (forEach != null ) {
@@ -116,7 +156,7 @@ private EventRegistrationBuilder from(EventFilter filter) {
116156
117157 @ Override
118158 public TaskExecutor <ListenTask > buildInstance () {
119- return isAnd ? new AndListenExecutor (this ) : new OrListenExecutor (this );
159+ return registrations . isAnd () ? new AndListenExecutor (this ) : new OrListenExecutor (this );
120160 }
121161 }
122162
@@ -176,6 +216,59 @@ protected abstract void internalProcessCe(
176216 TaskContext taskContext ,
177217 CompletableFuture <JsonNode > future );
178218
219+ @ Override
220+ protected CompletableFuture <JsonNode > internalExecute (
221+ WorkflowContext workflow , TaskContext taskContext ) {
222+ ArrayNode output = JsonUtils .mapper ().createArrayNode ();
223+ Collection <EventRegistration > registrations = new ArrayList <>();
224+ CompletableFuture <?> combinedFuture =
225+ combine (
226+ toCompletables (
227+ regBuilders ,
228+ registrations ,
229+ (ce , future ) ->
230+ processCe (converter .apply (ce ), output , workflow , taskContext , future )));
231+ CompletableFuture <JsonNode > resultFuture =
232+ combinedFuture .thenApply (
233+ v -> {
234+ registrations .forEach (reg -> eventConsumer .unregister (reg ));
235+ return output ;
236+ });
237+ if (untilRegBuilders != null ) {
238+ Collection <EventRegistration > untilRegistrations = new ArrayList <>();
239+ CompletableFuture <?>[] futures =
240+ toCompletables (
241+ untilRegBuilders , untilRegistrations , (ce , future ) -> future .complete (null ));
242+ CompletableFuture <?> untilFuture =
243+ untilRegBuilders .isAnd ()
244+ ? CompletableFuture .allOf (futures )
245+ : CompletableFuture .anyOf (futures );
246+ untilFuture
247+ .thenAccept (v -> combinedFuture .complete (null ))
248+ .thenAccept (v -> untilRegistrations .forEach (reg -> eventConsumer .unregister (reg )));
249+ }
250+ return resultFuture ;
251+ }
252+
253+ private <T > CompletableFuture <T >[] toCompletables (
254+ EventRegistrationBuilderCollection regCollection ,
255+ Collection <EventRegistration > registrations ,
256+ BiConsumer <CloudEvent , CompletableFuture <T >> consumer ) {
257+ return regCollection .registrations ().stream ()
258+ .map (reg -> toCompletable (reg , registrations , consumer ))
259+ .toArray (size -> new CompletableFuture [size ]);
260+ }
261+
262+ private <T > CompletableFuture <T > toCompletable (
263+ EventRegistrationBuilder regBuilder ,
264+ Collection <EventRegistration > registrations ,
265+ BiConsumer <CloudEvent , CompletableFuture <T >> ceConsumer ) {
266+ final CompletableFuture <T > future = new CompletableFuture <>();
267+ registrations .add (
268+ eventConsumer .register (regBuilder , ce -> ceConsumer .accept ((CloudEvent ) ce , future )));
269+ return future ;
270+ }
271+
179272 private void processCe (
180273 JsonNode node ,
181274 ArrayNode arrayNode ,
@@ -199,48 +292,13 @@ private void processCe(
199292 () -> internalProcessCe (node , arrayNode , workflow , taskContext , future ));
200293 }
201294
202- protected CompletableFuture <JsonNode > toCompletable (
203- WorkflowContext workflow ,
204- TaskContext taskContext ,
205- EventRegistrationBuilder regBuilder ,
206- Collection <EventRegistration > registrations ,
207- ArrayNode arrayNode ) {
208- final CompletableFuture <JsonNode > future = new CompletableFuture <>();
209- registrations .add (
210- workflow
211- .definition ()
212- .application ()
213- .eventConsumer ()
214- .register (
215- regBuilder ,
216- (Consumer <CloudEvent >)
217- (ce ->
218- processCe (converter .apply (ce ), arrayNode , workflow , taskContext , future ))));
219- return future ;
220- }
221-
222- @ Override
223- protected CompletableFuture <JsonNode > internalExecute (
224- WorkflowContext workflow , TaskContext taskContext ) {
225- ArrayNode output = JsonUtils .mapper ().createArrayNode ();
226- Collection <EventRegistration > registrations = new ArrayList <>();
227- return combine (
228- regBuilders .stream ()
229- .map (reg -> toCompletable (workflow , taskContext , reg , registrations , output ))
230- .toArray (size -> new CompletableFuture [size ]))
231- .thenApply (
232- v -> {
233- registrations .forEach (
234- reg -> workflow .definition ().application ().eventConsumer ().unregister (reg ));
235- return output ;
236- });
237- }
238-
239295 protected ListenExecutor (ListenExecutorBuilder builder ) {
240296 super (builder );
297+ this .eventConsumer = builder .application .eventConsumer ();
241298 this .regBuilders = builder .registrations ;
242299 this .until = Optional .ofNullable (builder .until );
243300 this .loop = Optional .ofNullable (builder .loop );
244301 this .converter = builder .converter ;
302+ this .untilRegBuilders = builder .untilRegistrations ;
245303 }
246304}
0 commit comments