Skip to content

Commit 0d40db7

Browse files
committed
[Fix #847] Implementing schedule.on
Signed-off-by: fjtirado <[email protected]>
1 parent eb92e70 commit 0d40db7

File tree

11 files changed

+497
-128
lines changed

11 files changed

+497
-128
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.serverlessworkflow.impl.resources.DefaultResourceLoaderFactory;
3131
import io.serverlessworkflow.impl.resources.ResourceLoaderFactory;
3232
import io.serverlessworkflow.impl.resources.StaticResource;
33+
import io.serverlessworkflow.impl.scheduler.DefaultWorkflowScheduler;
3334
import io.serverlessworkflow.impl.schema.SchemaValidator;
3435
import io.serverlessworkflow.impl.schema.SchemaValidatorFactory;
3536
import java.util.ArrayList;
@@ -59,6 +60,7 @@ public class WorkflowApplication implements AutoCloseable {
5960
private final Collection<EventPublisher> eventPublishers;
6061
private final boolean lifeCycleCEPublishingEnabled;
6162
private final WorkflowModelFactory modelFactory;
63+
private final WorkflowScheduler scheduler;
6264

6365
private WorkflowApplication(Builder builder) {
6466
this.taskFactory = builder.taskFactory;
@@ -75,6 +77,7 @@ private WorkflowApplication(Builder builder) {
7577
this.eventPublishers = builder.eventPublishers;
7678
this.lifeCycleCEPublishingEnabled = builder.lifeCycleCEPublishingEnabled;
7779
this.modelFactory = builder.modelFactory;
80+
this.scheduler = builder.scheduler;
7881
}
7982

8083
public TaskExecutorFactory taskFactory() {
@@ -142,6 +145,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
142145
private SchemaValidatorFactory schemaValidatorFactory;
143146
private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition();
144147
private WorkflowInstanceIdFactory idFactory;
148+
private WorkflowScheduler scheduler;
145149
private ExecutorServiceFactory executorFactory = new DefaultExecutorServiceFactory();
146150
private EventConsumer<?, ?> eventConsumer;
147151
private Collection<EventPublisher> eventPublishers = new ArrayList<>();
@@ -167,6 +171,11 @@ public Builder withExpressionFactory(ExpressionFactory factory) {
167171
return this;
168172
}
169173

174+
public Builder withScheduler(WorkflowScheduler scheduler) {
175+
this.scheduler = scheduler;
176+
return this;
177+
}
178+
170179
public Builder withResourceLoaderFactory(ResourceLoaderFactory resourceLoader) {
171180
this.resourceLoaderFactory = resourceLoader;
172181
return this;
@@ -257,6 +266,9 @@ public WorkflowApplication build() {
257266
if (idFactory == null) {
258267
idFactory = new MonotonicUlidWorkflowInstanceIdFactory();
259268
}
269+
if (scheduler == null) {
270+
scheduler = new DefaultWorkflowScheduler();
271+
}
260272
return new WorkflowApplication(this);
261273
}
262274
}
@@ -313,4 +325,8 @@ public ExecutorService executorService() {
313325
public boolean isLifeCycleCEPublishingEnabled() {
314326
return lifeCycleCEPublishingEnabled;
315327
}
328+
329+
public WorkflowScheduler scheduler() {
330+
return scheduler;
331+
}
316332
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,16 @@
1818
import static io.serverlessworkflow.impl.WorkflowUtils.*;
1919

2020
import io.serverlessworkflow.api.types.Input;
21+
import io.serverlessworkflow.api.types.ListenTo;
2122
import io.serverlessworkflow.api.types.Output;
23+
import io.serverlessworkflow.api.types.Schedule;
2224
import io.serverlessworkflow.api.types.Workflow;
25+
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
26+
import io.serverlessworkflow.impl.events.EventRegistrationInfo;
2327
import io.serverlessworkflow.impl.executors.TaskExecutor;
2428
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
2529
import io.serverlessworkflow.impl.resources.ResourceLoader;
30+
import io.serverlessworkflow.impl.scheduler.ScheduledEventConsumer;
2631
import io.serverlessworkflow.impl.schema.SchemaValidator;
2732
import java.nio.file.Path;
2833
import java.util.HashMap;
@@ -36,6 +41,7 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
3641
private Optional<SchemaValidator> outputSchemaValidator = Optional.empty();
3742
private Optional<WorkflowFilter> inputFilter = Optional.empty();
3843
private Optional<WorkflowFilter> outputFilter = Optional.empty();
44+
private EventRegistrationInfo registrationInfo;
3945
private final WorkflowApplication application;
4046
private final TaskExecutor<?> taskExecutor;
4147
private final ResourceLoader resourceLoader;
@@ -46,14 +52,16 @@ private WorkflowDefinition(
4652
this.workflow = workflow;
4753
this.application = application;
4854
this.resourceLoader = resourceLoader;
49-
if (workflow.getInput() != null) {
50-
Input input = workflow.getInput();
55+
56+
Input input = workflow.getInput();
57+
if (input != null) {
5158
this.inputSchemaValidator =
5259
getSchemaValidator(application.validatorFactory(), resourceLoader, input.getSchema());
5360
this.inputFilter = buildWorkflowFilter(application, input.getFrom());
5461
}
55-
if (workflow.getOutput() != null) {
56-
Output output = workflow.getOutput();
62+
63+
Output output = workflow.getOutput();
64+
if (output != null) {
5765
this.outputSchemaValidator =
5866
getSchemaValidator(application.validatorFactory(), resourceLoader, output.getSchema());
5967
this.outputFilter = buildWorkflowFilter(application, output.getAs());
@@ -68,8 +76,37 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow)
6876
}
6977

7078
static WorkflowDefinition of(WorkflowApplication application, Workflow workflow, Path path) {
71-
return new WorkflowDefinition(
72-
application, workflow, application.resourceLoaderFactory().getResourceLoader(path));
79+
WorkflowDefinition definition =
80+
new WorkflowDefinition(
81+
application, workflow, application.resourceLoaderFactory().getResourceLoader(path));
82+
Schedule schedule = workflow.getSchedule();
83+
if (schedule != null) {
84+
ListenTo to = schedule.getOn();
85+
if (to != null) {
86+
definition.register(
87+
application.scheduler().eventConsumer(definition, application.modelFactory()::from),
88+
EventRegistrationBuilderInfo.from(application, to, x -> null));
89+
}
90+
}
91+
return definition;
92+
}
93+
94+
private void register(ScheduledEventConsumer consumer, EventRegistrationBuilderInfo builderInfo) {
95+
WorkflowModelCollection model = application.modelFactory().createCollection();
96+
registrationInfo =
97+
EventRegistrationInfo.<WorkflowModel>build(
98+
builderInfo.registrations(),
99+
(ce, f) -> consumer.accept(ce, f, model),
100+
application.eventConsumer());
101+
registrationInfo
102+
.completableFuture()
103+
.thenAccept(
104+
x -> {
105+
EventRegistrationInfo prevRegistrationInfo = registrationInfo;
106+
register(consumer, builderInfo);
107+
consumer.start(model);
108+
prevRegistrationInfo.registrations().forEach(application.eventConsumer()::unregister);
109+
});
73110
}
74111

75112
public WorkflowInstance instance(Object input) {
@@ -121,5 +158,9 @@ public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor<?> ta
121158
}
122159

123160
@Override
124-
public void close() {}
161+
public void close() {
162+
if (registrationInfo != null) {
163+
registrationInfo.registrations().forEach(application.eventConsumer()::unregister);
164+
}
165+
}
125166
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.impl.scheduler.ScheduledEventConsumer;
20+
import java.util.Collection;
21+
import java.util.function.Function;
22+
23+
public interface WorkflowScheduler {
24+
Collection<WorkflowInstance> scheduledInstances();
25+
26+
ScheduledEventConsumer eventConsumer(
27+
WorkflowDefinition definition, Function<CloudEvent, WorkflowModel> converter);
28+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.events;
17+
18+
import java.util.Collection;
19+
20+
public record EventRegistrationBuilderCollection(
21+
Collection<EventRegistrationBuilder> registrations, boolean isAnd) {}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.events;
17+
18+
import io.serverlessworkflow.api.types.AllEventConsumptionStrategy;
19+
import io.serverlessworkflow.api.types.AnyEventConsumptionStrategy;
20+
import io.serverlessworkflow.api.types.EventConsumptionStrategy;
21+
import io.serverlessworkflow.api.types.EventFilter;
22+
import io.serverlessworkflow.api.types.ListenTo;
23+
import io.serverlessworkflow.api.types.OneEventConsumptionStrategy;
24+
import io.serverlessworkflow.api.types.Until;
25+
import io.serverlessworkflow.impl.WorkflowApplication;
26+
import io.serverlessworkflow.impl.WorkflowPredicate;
27+
import java.util.Collection;
28+
import java.util.List;
29+
import java.util.function.Function;
30+
import java.util.stream.Collectors;
31+
32+
public record EventRegistrationBuilderInfo(
33+
EventRegistrationBuilderCollection registrations,
34+
EventRegistrationBuilderCollection untilRegistrations,
35+
WorkflowPredicate until) {
36+
37+
public static EventRegistrationBuilderInfo from(
38+
WorkflowApplication application,
39+
ListenTo to,
40+
Function<Until, WorkflowPredicate> predBuilder) {
41+
EventRegistrationBuilderCollection registrations;
42+
EventRegistrationBuilderCollection untilRegistrations = null;
43+
WorkflowPredicate until = null;
44+
if (to.getAllEventConsumptionStrategy() != null) {
45+
registrations = allEvents(to.getAllEventConsumptionStrategy(), application);
46+
} else if (to.getAnyEventConsumptionStrategy() != null) {
47+
AnyEventConsumptionStrategy any = to.getAnyEventConsumptionStrategy();
48+
registrations = anyEvents(any, application);
49+
Until untilDesc = any.getUntil();
50+
if (untilDesc != null) {
51+
until = predBuilder.apply(untilDesc);
52+
if (until == null) {
53+
if (untilDesc.getAnyEventUntilConsumed() != null) {
54+
EventConsumptionStrategy strategy = untilDesc.getAnyEventUntilConsumed();
55+
if (strategy.getAllEventConsumptionStrategy() != null) {
56+
untilRegistrations =
57+
allEvents(strategy.getAllEventConsumptionStrategy(), application);
58+
} else if (strategy.getAnyEventConsumptionStrategy() != null) {
59+
untilRegistrations =
60+
anyEvents(strategy.getAnyEventConsumptionStrategy(), application);
61+
} else if (strategy.getOneEventConsumptionStrategy() != null) {
62+
untilRegistrations = oneEvent(strategy.getOneEventConsumptionStrategy(), application);
63+
}
64+
}
65+
}
66+
}
67+
} else {
68+
registrations = oneEvent(to.getOneEventConsumptionStrategy(), application);
69+
}
70+
return new EventRegistrationBuilderInfo(registrations, untilRegistrations, until);
71+
}
72+
73+
private static EventRegistrationBuilderCollection allEvents(
74+
AllEventConsumptionStrategy allStrategy, WorkflowApplication application) {
75+
return new EventRegistrationBuilderCollection(from(allStrategy.getAll(), application), true);
76+
}
77+
78+
private static EventRegistrationBuilderCollection anyEvents(
79+
AnyEventConsumptionStrategy anyStrategy, WorkflowApplication application) {
80+
List<EventFilter> eventFilters = anyStrategy.getAny();
81+
return new EventRegistrationBuilderCollection(
82+
eventFilters.isEmpty() ? registerToAll(application) : from(eventFilters, application),
83+
false);
84+
}
85+
86+
private static EventRegistrationBuilderCollection oneEvent(
87+
OneEventConsumptionStrategy oneStrategy, WorkflowApplication application) {
88+
return new EventRegistrationBuilderCollection(
89+
List.of(from(oneStrategy.getOne(), application)), true);
90+
}
91+
92+
private static Collection<EventRegistrationBuilder> registerToAll(
93+
WorkflowApplication application) {
94+
return application.eventConsumer().listenToAll(application);
95+
}
96+
97+
private static Collection<EventRegistrationBuilder> from(
98+
List<EventFilter> filters, WorkflowApplication application) {
99+
return filters.stream().map(filter -> from(filter, application)).collect(Collectors.toList());
100+
}
101+
102+
private static EventRegistrationBuilder from(
103+
EventFilter filter, WorkflowApplication application) {
104+
return application.eventConsumer().listen(filter, application);
105+
}
106+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.events;
17+
18+
import io.cloudevents.CloudEvent;
19+
import java.util.ArrayList;
20+
import java.util.Collection;
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.function.BiConsumer;
23+
24+
public record EventRegistrationInfo(
25+
CompletableFuture<?> completableFuture, Collection<EventRegistration> registrations) {
26+
27+
public static final <T> EventRegistrationInfo build(
28+
EventRegistrationBuilderCollection builderInfo,
29+
BiConsumer<CloudEvent, CompletableFuture<T>> consumer,
30+
EventConsumer eventConsumer) {
31+
Collection<EventRegistration> registrations = new ArrayList();
32+
CompletableFuture<T>[] futures =
33+
builderInfo.registrations().stream()
34+
.map(reg -> toCompletable(reg, registrations, consumer, eventConsumer))
35+
.toArray(size -> new CompletableFuture[size]);
36+
return new EventRegistrationInfo(
37+
builderInfo.isAnd() ? CompletableFuture.allOf(futures) : CompletableFuture.anyOf(futures),
38+
registrations);
39+
}
40+
41+
private static final <T> CompletableFuture<T> toCompletable(
42+
EventRegistrationBuilder regBuilder,
43+
Collection<EventRegistration> registrations,
44+
BiConsumer<CloudEvent, CompletableFuture<T>> ceConsumer,
45+
EventConsumer eventConsumer) {
46+
final CompletableFuture<T> future = new CompletableFuture<>();
47+
registrations.add(
48+
eventConsumer.register(regBuilder, ce -> ceConsumer.accept((CloudEvent) ce, future)));
49+
return future;
50+
}
51+
}

0 commit comments

Comments
 (0)