Skip to content

Commit 1247a9a

Browse files
committed
[Fix #847] Alternative implementation
Signed-off-by: fjtirado <[email protected]>
1 parent 0d40db7 commit 1247a9a

File tree

9 files changed

+273
-127
lines changed

9 files changed

+273
-127
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import static io.serverlessworkflow.impl.WorkflowUtils.*;
19+
import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;
1920

2021
import io.serverlessworkflow.api.types.Input;
2122
import io.serverlessworkflow.api.types.ListenTo;
2223
import io.serverlessworkflow.api.types.Output;
2324
import io.serverlessworkflow.api.types.Schedule;
2425
import io.serverlessworkflow.api.types.Workflow;
2526
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
26-
import io.serverlessworkflow.impl.events.EventRegistrationInfo;
2727
import io.serverlessworkflow.impl.executors.TaskExecutor;
2828
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
2929
import io.serverlessworkflow.impl.resources.ResourceLoader;
@@ -41,11 +41,11 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
4141
private Optional<SchemaValidator> outputSchemaValidator = Optional.empty();
4242
private Optional<WorkflowFilter> inputFilter = Optional.empty();
4343
private Optional<WorkflowFilter> outputFilter = Optional.empty();
44-
private EventRegistrationInfo registrationInfo;
4544
private final WorkflowApplication application;
4645
private final TaskExecutor<?> taskExecutor;
4746
private final ResourceLoader resourceLoader;
4847
private final Map<String, TaskExecutor<?>> executors = new HashMap<>();
48+
private ScheduledEventConsumer scheculedConsumer;
4949

5050
private WorkflowDefinition(
5151
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
@@ -83,32 +83,18 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow,
8383
if (schedule != null) {
8484
ListenTo to = schedule.getOn();
8585
if (to != null) {
86-
definition.register(
87-
application.scheduler().eventConsumer(definition, application.modelFactory()::from),
88-
EventRegistrationBuilderInfo.from(application, to, x -> null));
86+
definition.scheculedConsumer =
87+
application
88+
.scheduler()
89+
.eventConsumer(
90+
definition,
91+
application.modelFactory()::from,
92+
EventRegistrationBuilderInfo.from(application, to, x -> null));
8993
}
9094
}
9195
return definition;
9296
}
9397

94-
private void register(ScheduledEventConsumer consumer, EventRegistrationBuilderInfo builderInfo) {
95-
WorkflowModelCollection model = application.modelFactory().createCollection();
96-
registrationInfo =
97-
EventRegistrationInfo.<WorkflowModel>build(
98-
builderInfo.registrations(),
99-
(ce, f) -> consumer.accept(ce, f, model),
100-
application.eventConsumer());
101-
registrationInfo
102-
.completableFuture()
103-
.thenAccept(
104-
x -> {
105-
EventRegistrationInfo prevRegistrationInfo = registrationInfo;
106-
register(consumer, builderInfo);
107-
consumer.start(model);
108-
prevRegistrationInfo.registrations().forEach(application.eventConsumer()::unregister);
109-
});
110-
}
111-
11298
public WorkflowInstance instance(Object input) {
11399
WorkflowModel inputModel = application.modelFactory().fromAny(input);
114100
inputSchemaValidator().ifPresent(v -> v.validate(inputModel));
@@ -159,8 +145,6 @@ public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor<?> ta
159145

160146
@Override
161147
public void close() {
162-
if (registrationInfo != null) {
163-
registrationInfo.registrations().forEach(application.eventConsumer()::unregister);
164-
}
148+
safeClose(scheculedConsumer);
165149
}
166150
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowScheduler.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
1920
import io.serverlessworkflow.impl.scheduler.ScheduledEventConsumer;
2021
import java.util.Collection;
2122
import java.util.function.Function;
2223

2324
public interface WorkflowScheduler {
24-
Collection<WorkflowInstance> scheduledInstances();
25+
Collection<WorkflowInstance> scheduledInstances(WorkflowDefinition def);
2526

2627
ScheduledEventConsumer eventConsumer(
27-
WorkflowDefinition definition, Function<CloudEvent, WorkflowModel> converter);
28+
WorkflowDefinition definition,
29+
Function<CloudEvent, WorkflowModel> converter,
30+
EventRegistrationBuilderInfo info);
2831
}

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,38 @@
2020
import io.serverlessworkflow.impl.WorkflowInstance;
2121
import io.serverlessworkflow.impl.WorkflowModel;
2222
import io.serverlessworkflow.impl.WorkflowScheduler;
23+
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
2324
import java.util.ArrayList;
2425
import java.util.Collection;
2526
import java.util.Collections;
27+
import java.util.Map;
28+
import java.util.concurrent.ConcurrentHashMap;
2629
import java.util.function.Function;
2730

2831
public class DefaultWorkflowScheduler implements WorkflowScheduler {
2932

30-
private Collection<WorkflowInstance> instances = new ArrayList<>();
33+
private Map<WorkflowDefinition, Collection<WorkflowInstance>> instances =
34+
new ConcurrentHashMap<>();
3135

3236
@Override
33-
public Collection<WorkflowInstance> scheduledInstances() {
34-
return Collections.unmodifiableCollection(instances);
37+
public Collection<WorkflowInstance> scheduledInstances(WorkflowDefinition definition) {
38+
return Collections.unmodifiableCollection(theInstances(definition));
3539
}
3640

3741
@Override
3842
public ScheduledEventConsumer eventConsumer(
39-
WorkflowDefinition definition, Function<CloudEvent, WorkflowModel> converter) {
40-
return new ScheduledEventConsumer(definition, converter) {
43+
WorkflowDefinition definition,
44+
Function<CloudEvent, WorkflowModel> converter,
45+
EventRegistrationBuilderInfo builderInfo) {
46+
return new ScheduledEventConsumer(definition, converter, builderInfo) {
4147
@Override
4248
protected void addScheduledInstance(WorkflowInstance instance) {
43-
instances.add(instance);
49+
theInstances(definition).add(instance);
4450
}
4551
};
4652
}
53+
54+
private Collection<WorkflowInstance> theInstances(WorkflowDefinition definition) {
55+
return instances.computeIfAbsent(definition, def -> new ArrayList<>());
56+
}
4757
}

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java

Lines changed: 84 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,32 +20,107 @@
2020
import io.serverlessworkflow.impl.WorkflowInstance;
2121
import io.serverlessworkflow.impl.WorkflowModel;
2222
import io.serverlessworkflow.impl.WorkflowModelCollection;
23-
import java.util.concurrent.CompletableFuture;
23+
import io.serverlessworkflow.impl.events.EventConsumer;
24+
import io.serverlessworkflow.impl.events.EventRegistration;
25+
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
26+
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
27+
import java.util.ArrayList;
28+
import java.util.Collection;
29+
import java.util.HashMap;
30+
import java.util.List;
31+
import java.util.Map;
2432
import java.util.function.Function;
2533

26-
public abstract class ScheduledEventConsumer {
34+
public abstract class ScheduledEventConsumer implements AutoCloseable {
2735

2836
private final Function<CloudEvent, WorkflowModel> converter;
2937
private final WorkflowDefinition definition;
38+
private final EventRegistrationBuilderInfo builderInfo;
39+
private final EventConsumer eventConsumer;
40+
private Map<EventRegistrationBuilder, List<CloudEvent>> correlatedEvents;
41+
private Collection<EventRegistration> registrations = new ArrayList<>();
3042

3143
protected ScheduledEventConsumer(
32-
WorkflowDefinition definition, Function<CloudEvent, WorkflowModel> converter) {
44+
WorkflowDefinition definition,
45+
Function<CloudEvent, WorkflowModel> converter,
46+
EventRegistrationBuilderInfo builderInfo) {
3347
this.definition = definition;
3448
this.converter = converter;
49+
this.builderInfo = builderInfo;
50+
this.eventConsumer = definition.application().eventConsumer();
51+
if (builderInfo.registrations().isAnd()
52+
&& builderInfo.registrations().registrations().size() > 1) {
53+
this.correlatedEvents = new HashMap<>();
54+
builderInfo
55+
.registrations()
56+
.registrations()
57+
.forEach(
58+
reg -> {
59+
correlatedEvents.put(reg, new ArrayList<>());
60+
registrations.add(
61+
eventConsumer.register(reg, ce -> consumeEvent(reg, (CloudEvent) ce)));
62+
});
63+
} else {
64+
builderInfo
65+
.registrations()
66+
.registrations()
67+
.forEach(
68+
reg -> registrations.add(eventConsumer.register(reg, ce -> start((CloudEvent) ce))));
69+
}
3570
}
3671

37-
public void accept(
38-
CloudEvent t, CompletableFuture<WorkflowModel> u, WorkflowModelCollection col) {
39-
WorkflowModel model = converter.apply(t);
40-
col.add(model);
41-
u.complete(model);
72+
private void consumeEvent(EventRegistrationBuilder reg, CloudEvent ce) {
73+
Collection<Collection<CloudEvent>> collections = new ArrayList<>();
74+
// to minimize the critical section, conversion is done later, here we are
75+
// performing
76+
// just collection, if any
77+
synchronized (correlatedEvents) {
78+
correlatedEvents.get(reg).add((CloudEvent) ce);
79+
while (satisfyCondition()) {
80+
Collection<CloudEvent> collection = new ArrayList<>();
81+
for (List<CloudEvent> values : correlatedEvents.values()) {
82+
collection.add(values.remove(0));
83+
}
84+
collections.add(collection);
85+
}
86+
}
87+
// convert and start outside synchronized
88+
collections.forEach(this::start);
4289
}
4390

44-
public void start(Object model) {
91+
private boolean satisfyCondition() {
92+
for (List<CloudEvent> values : correlatedEvents.values()) {
93+
if (values.isEmpty()) {
94+
return false;
95+
}
96+
}
97+
return true;
98+
}
99+
100+
protected void start(CloudEvent ce) {
101+
WorkflowModelCollection model = definition.application().modelFactory().createCollection();
102+
model.add(converter.apply(ce));
103+
start(model);
104+
}
105+
106+
protected void start(Collection<CloudEvent> ces) {
107+
WorkflowModelCollection model = definition.application().modelFactory().createCollection();
108+
ces.forEach(ce -> model.add(converter.apply(ce)));
109+
start(model);
110+
}
111+
112+
private void start(WorkflowModel model) {
45113
WorkflowInstance instance = definition.instance(model);
46114
addScheduledInstance(instance);
47115
instance.start();
48116
}
49117

118+
public void close() {
119+
if (correlatedEvents != null) {
120+
correlatedEvents.clear();
121+
}
122+
registrations.forEach(eventConsumer::unregister);
123+
}
124+
50125
protected abstract void addScheduledInstance(WorkflowInstance instace);
51126
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.test;
17+
18+
import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath;
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.awaitility.Awaitility.await;
21+
22+
import io.cloudevents.CloudEvent;
23+
import io.cloudevents.core.builder.CloudEventBuilder;
24+
import io.cloudevents.jackson.JsonCloudEventData;
25+
import io.serverlessworkflow.impl.WorkflowApplication;
26+
import io.serverlessworkflow.impl.WorkflowDefinition;
27+
import io.serverlessworkflow.impl.WorkflowInstance;
28+
import io.serverlessworkflow.impl.WorkflowStatus;
29+
import io.serverlessworkflow.impl.jackson.JsonUtils;
30+
import java.io.IOException;
31+
import java.net.URI;
32+
import java.time.Duration;
33+
import java.util.Collection;
34+
import java.util.List;
35+
import java.util.Map;
36+
import java.util.concurrent.ExecutionException;
37+
import org.junit.jupiter.api.AfterEach;
38+
import org.junit.jupiter.api.BeforeEach;
39+
import org.junit.jupiter.api.Test;
40+
41+
class ScheduleEventConsumerTest {
42+
43+
private WorkflowApplication appl;
44+
45+
@BeforeEach
46+
void init() throws IOException {
47+
appl = WorkflowApplication.builder().withListener(new TraceExecutionListener()).build();
48+
}
49+
50+
@AfterEach
51+
void tearDown() throws IOException {
52+
appl.close();
53+
}
54+
55+
@Test
56+
void testAllEvent() throws IOException, InterruptedException, ExecutionException {
57+
58+
WorkflowDefinition definition =
59+
appl.workflowDefinition(
60+
readWorkflowFromClasspath("workflows-samples/listen-start-all.yaml"));
61+
Collection<WorkflowInstance> instances = appl.scheduler().scheduledInstances(definition);
62+
appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Javierito"))));
63+
appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Fulanito"))));
64+
await()
65+
.pollDelay(Duration.ofMillis(20))
66+
.atMost(Duration.ofMillis(500))
67+
.until(
68+
() ->
69+
instances.stream().filter(i -> i.status() == WorkflowStatus.COMPLETED).count()
70+
== 1);
71+
assertThat((Collection) assertThat(instances).singleElement().actual().output().asJavaObject())
72+
.containsExactlyInAnyOrder("Javierito", "Fulanito");
73+
}
74+
75+
@Test
76+
void testOneEvent() throws IOException, InterruptedException, ExecutionException {
77+
WorkflowDefinition definition =
78+
appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/listen-start.yaml"));
79+
appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Javierito"))));
80+
appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Fulanito"))));
81+
Collection<WorkflowInstance> instances = appl.scheduler().scheduledInstances(definition);
82+
await()
83+
.pollDelay(Duration.ofMillis(20))
84+
.atMost(Duration.ofMillis(500))
85+
.until(
86+
() ->
87+
instances.stream().filter(i -> i.status() == WorkflowStatus.COMPLETED).count()
88+
== 2);
89+
List<Object> outputs = instances.stream().map(i -> i.output().asJavaObject()).toList();
90+
assertThat(outputs)
91+
.containsExactlyInAnyOrder(
92+
Map.of("recovered", "Javierito"), Map.of("recovered", "Fulanito"));
93+
}
94+
95+
@Test
96+
void testTogether() throws IOException, InterruptedException, ExecutionException {
97+
WorkflowDefinition oneDef =
98+
appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/listen-start.yaml"));
99+
WorkflowDefinition allDef =
100+
appl.workflowDefinition(
101+
readWorkflowFromClasspath("workflows-samples/listen-start-all.yaml"));
102+
appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Javierito"))));
103+
appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Fulanito"))));
104+
Collection<WorkflowInstance> oneDefInstances = appl.scheduler().scheduledInstances(oneDef);
105+
Collection<WorkflowInstance> allDefInstances = appl.scheduler().scheduledInstances(allDef);
106+
await()
107+
.pollDelay(Duration.ofMillis(20))
108+
.atMost(Duration.ofMillis(500))
109+
.until(
110+
() ->
111+
oneDefInstances.stream().filter(i -> i.status() == WorkflowStatus.COMPLETED).count()
112+
== 2
113+
&& allDefInstances.stream()
114+
.filter(i -> i.status() == WorkflowStatus.COMPLETED)
115+
.count()
116+
== 1);
117+
118+
List<Object> outputs = oneDefInstances.stream().map(i -> i.output().asJavaObject()).toList();
119+
assertThat(outputs)
120+
.containsExactlyInAnyOrder(
121+
Map.of("recovered", "Javierito"), Map.of("recovered", "Fulanito"));
122+
assertThat(
123+
(Collection)
124+
assertThat(allDefInstances).singleElement().actual().output().asJavaObject())
125+
.containsExactlyInAnyOrder("Javierito", "Fulanito");
126+
}
127+
128+
private CloudEvent buildCloudEvent(Object data) {
129+
return CloudEventBuilder.v1()
130+
.withId("1")
131+
.withType("com.example.hospital.events.patients.recover")
132+
.withSource(URI.create("http://www.fakejavieritotest.com"))
133+
.withData(JsonCloudEventData.wrap(JsonUtils.fromValue(data)))
134+
.build();
135+
}
136+
}

0 commit comments

Comments
 (0)