Skip to content

Commit 11073da

Browse files
committed
[Fix #847] Alternative implementation
1 parent 0d40db7 commit 11073da

File tree

6 files changed

+125
-52
lines changed

6 files changed

+125
-52
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import static io.serverlessworkflow.impl.WorkflowUtils.*;
19+
import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;
1920

2021
import io.serverlessworkflow.api.types.Input;
2122
import io.serverlessworkflow.api.types.ListenTo;
2223
import io.serverlessworkflow.api.types.Output;
2324
import io.serverlessworkflow.api.types.Schedule;
2425
import io.serverlessworkflow.api.types.Workflow;
2526
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
26-
import io.serverlessworkflow.impl.events.EventRegistrationInfo;
2727
import io.serverlessworkflow.impl.executors.TaskExecutor;
2828
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
2929
import io.serverlessworkflow.impl.resources.ResourceLoader;
@@ -41,11 +41,11 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
4141
private Optional<SchemaValidator> outputSchemaValidator = Optional.empty();
4242
private Optional<WorkflowFilter> inputFilter = Optional.empty();
4343
private Optional<WorkflowFilter> outputFilter = Optional.empty();
44-
private EventRegistrationInfo registrationInfo;
4544
private final WorkflowApplication application;
4645
private final TaskExecutor<?> taskExecutor;
4746
private final ResourceLoader resourceLoader;
4847
private final Map<String, TaskExecutor<?>> executors = new HashMap<>();
48+
private ScheduledEventConsumer scheculedConsumer;
4949

5050
private WorkflowDefinition(
5151
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
@@ -83,32 +83,18 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow,
8383
if (schedule != null) {
8484
ListenTo to = schedule.getOn();
8585
if (to != null) {
86-
definition.register(
87-
application.scheduler().eventConsumer(definition, application.modelFactory()::from),
88-
EventRegistrationBuilderInfo.from(application, to, x -> null));
86+
definition.scheculedConsumer =
87+
application
88+
.scheduler()
89+
.eventConsumer(
90+
definition,
91+
application.modelFactory()::from,
92+
EventRegistrationBuilderInfo.from(application, to, x -> null));
8993
}
9094
}
9195
return definition;
9296
}
9397

94-
private void register(ScheduledEventConsumer consumer, EventRegistrationBuilderInfo builderInfo) {
95-
WorkflowModelCollection model = application.modelFactory().createCollection();
96-
registrationInfo =
97-
EventRegistrationInfo.<WorkflowModel>build(
98-
builderInfo.registrations(),
99-
(ce, f) -> consumer.accept(ce, f, model),
100-
application.eventConsumer());
101-
registrationInfo
102-
.completableFuture()
103-
.thenAccept(
104-
x -> {
105-
EventRegistrationInfo prevRegistrationInfo = registrationInfo;
106-
register(consumer, builderInfo);
107-
consumer.start(model);
108-
prevRegistrationInfo.registrations().forEach(application.eventConsumer()::unregister);
109-
});
110-
}
111-
11298
public WorkflowInstance instance(Object input) {
11399
WorkflowModel inputModel = application.modelFactory().fromAny(input);
114100
inputSchemaValidator().ifPresent(v -> v.validate(inputModel));
@@ -159,8 +145,6 @@ public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor<?> ta
159145

160146
@Override
161147
public void close() {
162-
if (registrationInfo != null) {
163-
registrationInfo.registrations().forEach(application.eventConsumer()::unregister);
164-
}
148+
safeClose(scheculedConsumer);
165149
}
166150
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowScheduler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
1920
import io.serverlessworkflow.impl.scheduler.ScheduledEventConsumer;
2021
import java.util.Collection;
2122
import java.util.function.Function;
@@ -24,5 +25,7 @@ public interface WorkflowScheduler {
2425
Collection<WorkflowInstance> scheduledInstances();
2526

2627
ScheduledEventConsumer eventConsumer(
27-
WorkflowDefinition definition, Function<CloudEvent, WorkflowModel> converter);
28+
WorkflowDefinition definition,
29+
Function<CloudEvent, WorkflowModel> converter,
30+
EventRegistrationBuilderInfo info);
2831
}

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@
2020
import io.serverlessworkflow.impl.WorkflowInstance;
2121
import io.serverlessworkflow.impl.WorkflowModel;
2222
import io.serverlessworkflow.impl.WorkflowScheduler;
23+
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
2324
import java.util.ArrayList;
2425
import java.util.Collection;
2526
import java.util.Collections;
2627
import java.util.function.Function;
2728

2829
public class DefaultWorkflowScheduler implements WorkflowScheduler {
2930

30-
private Collection<WorkflowInstance> instances = new ArrayList<>();
31+
private Collection<WorkflowInstance> instances = Collections.synchronizedCollection(new ArrayList<>());
3132

3233
@Override
3334
public Collection<WorkflowInstance> scheduledInstances() {
@@ -36,8 +37,10 @@ public Collection<WorkflowInstance> scheduledInstances() {
3637

3738
@Override
3839
public ScheduledEventConsumer eventConsumer(
39-
WorkflowDefinition definition, Function<CloudEvent, WorkflowModel> converter) {
40-
return new ScheduledEventConsumer(definition, converter) {
40+
WorkflowDefinition definition,
41+
Function<CloudEvent, WorkflowModel> converter,
42+
EventRegistrationBuilderInfo builderInfo) {
43+
return new ScheduledEventConsumer(definition, converter, builderInfo) {
4144
@Override
4245
protected void addScheduledInstance(WorkflowInstance instance) {
4346
instances.add(instance);

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java

Lines changed: 74 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,32 +20,97 @@
2020
import io.serverlessworkflow.impl.WorkflowInstance;
2121
import io.serverlessworkflow.impl.WorkflowModel;
2222
import io.serverlessworkflow.impl.WorkflowModelCollection;
23-
import java.util.concurrent.CompletableFuture;
23+
import io.serverlessworkflow.impl.events.EventConsumer;
24+
import io.serverlessworkflow.impl.events.EventRegistration;
25+
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
26+
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
27+
import java.util.ArrayList;
28+
import java.util.Collection;
29+
import java.util.HashMap;
30+
import java.util.List;
31+
import java.util.Map;
2432
import java.util.function.Function;
2533

26-
public abstract class ScheduledEventConsumer {
34+
public abstract class ScheduledEventConsumer implements AutoCloseable {
2735

2836
private final Function<CloudEvent, WorkflowModel> converter;
2937
private final WorkflowDefinition definition;
38+
private final EventRegistrationBuilderInfo builderInfo;
39+
private final EventConsumer eventConsumer;
40+
private Map<EventRegistrationBuilder, List<CloudEvent>> correlatedEvents;
41+
private Collection<EventRegistration> registrations = new ArrayList<>();
3042

3143
protected ScheduledEventConsumer(
32-
WorkflowDefinition definition, Function<CloudEvent, WorkflowModel> converter) {
44+
WorkflowDefinition definition,
45+
Function<CloudEvent, WorkflowModel> converter,
46+
EventRegistrationBuilderInfo builderInfo) {
3347
this.definition = definition;
3448
this.converter = converter;
49+
this.builderInfo = builderInfo;
50+
this.eventConsumer = definition.application().eventConsumer();
51+
if (builderInfo.registrations().isAnd()) {
52+
this.correlatedEvents = new HashMap<>();
53+
builderInfo
54+
.registrations()
55+
.registrations()
56+
.forEach(
57+
reg -> {
58+
correlatedEvents.put(reg, new ArrayList<>());
59+
registrations.add(
60+
eventConsumer.register(
61+
reg,
62+
ce -> {
63+
Collection<WorkflowModelCollection> collections = new ArrayList<>();
64+
synchronized (correlatedEvents) {
65+
correlatedEvents.get(reg).add((CloudEvent) ce);
66+
while (satisfyCondition()) {
67+
WorkflowModelCollection collection =
68+
definition.application().modelFactory().createCollection();
69+
for (List<CloudEvent> values : correlatedEvents.values()) {
70+
collection.add(converter.apply(values.remove(0)));
71+
}
72+
collections.add(collection);
73+
}
74+
}
75+
collections.forEach(this::start);
76+
}));
77+
});
78+
} else {
79+
builderInfo
80+
.registrations()
81+
.registrations()
82+
.forEach(
83+
reg -> registrations.add(eventConsumer.register(reg, ce -> start((CloudEvent) ce))));
84+
}
3585
}
3686

37-
public void accept(
38-
CloudEvent t, CompletableFuture<WorkflowModel> u, WorkflowModelCollection col) {
39-
WorkflowModel model = converter.apply(t);
40-
col.add(model);
41-
u.complete(model);
87+
private boolean satisfyCondition() {
88+
for (List<CloudEvent> values : correlatedEvents.values()) {
89+
if (values.isEmpty()) {
90+
return false;
91+
}
92+
}
93+
return true;
4294
}
4395

44-
public void start(Object model) {
96+
private void start(CloudEvent ce) {
97+
WorkflowModelCollection model = definition.application().modelFactory().createCollection();
98+
model.add(converter.apply(ce));
99+
start(model);
100+
}
101+
102+
private void start(WorkflowModel model) {
45103
WorkflowInstance instance = definition.instance(model);
46104
addScheduledInstance(instance);
47105
instance.start();
48106
}
49107

108+
public void close() {
109+
if (correlatedEvents != null) {
110+
correlatedEvents.clear();
111+
}
112+
registrations.forEach(eventConsumer::unregister);
113+
}
114+
50115
protected abstract void addScheduledInstance(WorkflowInstance instace);
51116
}

impl/test/src/test/java/io/serverlessworkflow/impl/test/ScheduleEventTest.java

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,21 +32,21 @@
3232
import java.util.List;
3333
import java.util.Map;
3434
import java.util.concurrent.ExecutionException;
35-
import org.junit.jupiter.api.AfterAll;
36-
import org.junit.jupiter.api.BeforeAll;
35+
import org.junit.jupiter.api.AfterEach;
36+
import org.junit.jupiter.api.BeforeEach;
3737
import org.junit.jupiter.api.Test;
3838

3939
class ScheduleEventTest {
4040

41-
private static WorkflowApplication appl;
41+
private WorkflowApplication appl;
4242

43-
@BeforeAll
44-
static void init() throws IOException {
45-
appl = WorkflowApplication.builder().build();
43+
@BeforeEach
44+
void init() throws IOException {
45+
appl = WorkflowApplication.builder().withListener(new TraceExecutionListener()).build();
4646
}
4747

48-
@AfterAll
49-
static void tearDown() throws IOException {
48+
@AfterEach
49+
void tearDown() throws IOException {
5050
appl.close();
5151
}
5252

@@ -65,8 +65,26 @@ void testStartUsingEvent() throws IOException, InterruptedException, ExecutionEx
6565
.atMost(Duration.ofMillis(200))
6666
.until(() -> instances.size() == 2);
6767
List<Object> outputs = instances.stream().map(i -> i.output().asJavaObject()).toList();
68-
assertThat(outputs.get(0)).isEqualTo(Map.of("recovered", "Javierito"));
69-
assertThat(outputs.get(1)).isEqualTo(Map.of("recovered", "Fulanito"));
68+
assertThat(outputs)
69+
.containsExactlyInAnyOrder(
70+
Map.of("recovered", "Javierito"), Map.of("recovered", "Fulanito"));
71+
}
72+
73+
@Test
74+
void testStartUsingConsecutiveEvent()
75+
throws IOException, InterruptedException, ExecutionException {
76+
appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/listen-start.yaml"));
77+
appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Javierito"))));
78+
appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Fulanito"))));
79+
Collection<WorkflowInstance> instances = appl.scheduler().scheduledInstances();
80+
await()
81+
.pollDelay(Duration.ofMillis(25))
82+
.atMost(Duration.ofMillis(500))
83+
.until(() -> instances.stream().filter(i -> i.output() != null).count() == 2);
84+
List<Object> outputs = instances.stream().map(i -> i.output().asJavaObject()).toList();
85+
assertThat(outputs)
86+
.containsExactlyInAnyOrder(
87+
Map.of("recovered", "Javierito"), Map.of("recovered", "Fulanito"));
7088
}
7189

7290
private CloudEvent buildCloudEvent(Object data) {

impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@ public class TraceExecutionListener implements WorkflowExecutionListener {
3737
private static final Logger logger = LoggerFactory.getLogger(TraceExecutionListener.class);
3838

3939
public void onWorkflowStarted(WorkflowStartedEvent ev) {
40-
4140
logger.info(
42-
"Workflow definition {} with id {} started at {}",
41+
"Workflow definition {} with id {} started at {} with data {}",
4342
ev.workflowContext().definition().workflow().getDocument().getName(),
4443
ev.workflowContext().instanceData().id(),
45-
ev.eventDate());
44+
ev.eventDate(),
45+
ev.workflowContext().instanceData().input());
4646
}
4747

4848
public void onWorkflowResumed(WorkflowResumedEvent ev) {

0 commit comments

Comments
 (0)