Skip to content

Commit 41d9e94

Browse files
committed
[Fix #490] Listen & Wait Events unit test
1 parent 10e34fe commit 41d9e94

File tree

8 files changed

+194
-41
lines changed

8 files changed

+194
-41
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java

Lines changed: 50 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,34 +24,50 @@
2424
import java.util.AbstractCollection;
2525
import java.util.Collection;
2626
import java.util.Iterator;
27+
import java.util.List;
2728
import java.util.Map;
2829
import java.util.concurrent.ConcurrentHashMap;
2930
import java.util.concurrent.CopyOnWriteArrayList;
3031
import java.util.function.Consumer;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
3134

3235
public abstract class AbstractTypeConsumer
3336
implements EventConsumer<TypeEventRegistration, TypeEventRegistrationBuilder> {
3437

38+
private static final Logger logger = LoggerFactory.getLogger(AbstractTypeConsumer.class);
39+
40+
protected abstract void registerToAll(Consumer<CloudEvent> consumer);
41+
42+
protected abstract void unregisterFromAll();
43+
3544
protected abstract void register(String topicName, Consumer<CloudEvent> consumer);
3645

3746
protected abstract void unregister(String topicName);
3847

3948
private Map<String, CloudEventConsumer> registrations = new ConcurrentHashMap<>();
4049

4150
@Override
42-
public TypeEventRegistrationBuilder build(EventFilter register, WorkflowApplication application) {
51+
public TypeEventRegistrationBuilder listen(
52+
EventFilter register, WorkflowApplication application) {
4353
EventProperties properties = register.getWith();
4454
String type = properties.getType();
4555
return new TypeEventRegistrationBuilder(
4656
type, new DefaultCloudEventPredicate(properties, application.expressionFactory()));
4757
}
4858

59+
@Override
60+
public Collection<TypeEventRegistrationBuilder> listenToAll(WorkflowApplication application) {
61+
return List.of(new TypeEventRegistrationBuilder(null, null));
62+
}
63+
4964
private static class CloudEventConsumer extends AbstractCollection<TypeEventRegistration>
5065
implements Consumer<CloudEvent> {
5166
private Collection<TypeEventRegistration> registrations = new CopyOnWriteArrayList<>();
5267

5368
@Override
5469
public void accept(CloudEvent ce) {
70+
logger.debug("Received cloud event {}", ce);
5571
for (TypeEventRegistration registration : registrations) {
5672
if (registration.predicate().test(ce, registration.workflow(), registration.task())) {
5773
registration.consumer().accept(ce);
@@ -80,32 +96,42 @@ public TypeEventRegistration register(
8096
Consumer<CloudEvent> ce,
8197
WorkflowContext workflow,
8298
TaskContext taskContext) {
83-
TypeEventRegistration registration =
84-
new TypeEventRegistration(builder.type(), ce, builder.cePredicate(), workflow, taskContext);
85-
registrations
86-
.computeIfAbsent(
87-
registration.type(),
88-
k -> {
89-
CloudEventConsumer consumer = new CloudEventConsumer();
90-
register(k, consumer);
91-
return consumer;
92-
})
93-
.add(registration);
94-
return registration;
99+
if (builder.type() == null) {
100+
registerToAll(ce);
101+
return new TypeEventRegistration(null, ce, null, workflow, taskContext);
102+
} else {
103+
TypeEventRegistration registration =
104+
new TypeEventRegistration(
105+
builder.type(), ce, builder.cePredicate(), workflow, taskContext);
106+
registrations
107+
.computeIfAbsent(
108+
registration.type(),
109+
k -> {
110+
CloudEventConsumer consumer = new CloudEventConsumer();
111+
register(k, consumer);
112+
return consumer;
113+
})
114+
.add(registration);
115+
return registration;
116+
}
95117
}
96118

97119
@Override
98120
public void unregister(TypeEventRegistration registration) {
99-
registrations.computeIfPresent(
100-
registration.type(),
101-
(k, v) -> {
102-
v.remove(registration);
103-
if (v.isEmpty()) {
104-
unregister(registration.type());
105-
return null;
106-
} else {
107-
return v;
108-
}
109-
});
121+
if (registration.type() == null) {
122+
unregisterFromAll();
123+
} else {
124+
registrations.computeIfPresent(
125+
registration.type(),
126+
(k, v) -> {
127+
v.remove(registration);
128+
if (v.isEmpty()) {
129+
unregister(registration.type());
130+
return null;
131+
} else {
132+
return v;
133+
}
134+
});
135+
}
110136
}
111137
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@
2020
import io.serverlessworkflow.impl.TaskContext;
2121
import io.serverlessworkflow.impl.WorkflowApplication;
2222
import io.serverlessworkflow.impl.WorkflowContext;
23+
import java.util.Collection;
2324
import java.util.function.Consumer;
2425

2526
public interface EventConsumer<T extends EventRegistration, V extends EventRegistrationBuilder> {
2627

27-
V build(EventFilter filter, WorkflowApplication workflowApplication);
28+
V listen(EventFilter filter, WorkflowApplication workflowApplication);
29+
30+
Collection<V> listenToAll(WorkflowApplication workflowApplication);
2831

2932
T register(V builder, Consumer<CloudEvent> consumer, WorkflowContext context, TaskContext task);
3033

impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@
1919
import java.util.Map;
2020
import java.util.concurrent.CompletableFuture;
2121
import java.util.concurrent.ConcurrentHashMap;
22+
import java.util.concurrent.atomic.AtomicReference;
2223
import java.util.function.Consumer;
2324

2425
/*
25-
* Straighforward implementation of in memory event broker.
26+
* Straightforward implementation of in memory event broker.
2627
* User might invoke notifyCE to simulate event reception.
2728
*/
2829
public class InMemoryEvents extends AbstractTypeConsumer implements EventPublisher {
@@ -37,6 +38,8 @@ public static InMemoryEvents get() {
3738

3839
private Map<String, Consumer<CloudEvent>> topicMap = new ConcurrentHashMap<>();
3940

41+
private AtomicReference<Consumer<CloudEvent>> allConsumerRef = new AtomicReference<>();
42+
4043
@Override
4144
protected void register(String topicName, Consumer<CloudEvent> consumer) {
4245
topicMap.put(topicName, consumer);
@@ -51,10 +54,24 @@ protected void unregister(String topicName) {
5154
public CompletableFuture<Void> publish(CloudEvent ce) {
5255
return CompletableFuture.runAsync(
5356
() -> {
57+
Consumer<CloudEvent> allConsumer = allConsumerRef.get();
58+
if (allConsumer != null) {
59+
allConsumer.accept(ce);
60+
}
5461
Consumer<CloudEvent> consumer = topicMap.get(ce.getType());
5562
if (consumer != null) {
5663
consumer.accept(ce);
5764
}
5865
});
5966
}
67+
68+
@Override
69+
protected void registerToAll(Consumer<CloudEvent> consumer) {
70+
allConsumerRef.set(consumer);
71+
}
72+
73+
@Override
74+
protected void unregisterFromAll() {
75+
allConsumerRef.set(null);
76+
}
6077
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.serverlessworkflow.impl.events.EventPropertiesFilter;
3030
import io.serverlessworkflow.impl.resources.ResourceLoader;
3131
import java.net.URI;
32+
import java.util.UUID;
3233
import java.util.concurrent.CompletableFuture;
3334

3435
public class EmitExecutor extends RegularTaskExecutor<EmitTask> {
@@ -75,22 +76,27 @@ protected CompletableFuture<JsonNode> internalExecute(
7576

7677
private CloudEvent buildCloudEvent(WorkflowContext workflow, TaskContext taskContext) {
7778
io.cloudevents.core.v1.CloudEventBuilder ceBuilder = CloudEventBuilder.v1();
78-
props
79-
.idFilter()
80-
.map(filter -> filter.apply(workflow, taskContext))
81-
.ifPresent(value -> ceBuilder.withId(value));
79+
ceBuilder.withId(
80+
props
81+
.idFilter()
82+
.map(filter -> filter.apply(workflow, taskContext))
83+
.orElse(UUID.randomUUID().toString()));
84+
ceBuilder.withSource(
85+
props
86+
.sourceFilter()
87+
.map(filter -> filter.apply(workflow, taskContext))
88+
.map(URI::create)
89+
.orElse(URI.create("reference-impl")));
90+
ceBuilder.withType(
91+
props
92+
.typeFilter()
93+
.map(filter -> filter.apply(workflow, taskContext))
94+
.orElseThrow(
95+
() -> new IllegalArgumentException("Type is required for emitting events")));
8296
props
8397
.timeFilter()
8498
.map(filter -> filter.apply(workflow, taskContext))
8599
.ifPresent(value -> ceBuilder.withTime(value));
86-
props
87-
.sourceFilter()
88-
.map(filter -> filter.apply(workflow, taskContext))
89-
.ifPresent(value -> ceBuilder.withSource(URI.create(value)));
90-
props
91-
.typeFilter()
92-
.map(filter -> filter.apply(workflow, taskContext))
93-
.ifPresent(value -> ceBuilder.withType(value));
94100
props
95101
.subjectFilter()
96102
.map(filter -> filter.apply(workflow, taskContext))

impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ protected ListenExecutorBuilder(
7373
registrations = from(to.getAllEventConsumptionStrategy().getAll());
7474
} else if (to.getAnyEventConsumptionStrategy() != null) {
7575
isAnd = false;
76-
registrations = from(to.getAnyEventConsumptionStrategy().getAny());
76+
List<EventFilter> eventFilters = to.getAnyEventConsumptionStrategy().getAny();
77+
registrations = eventFilters.isEmpty() ? registerToAll() : from(eventFilters);
7778
} else if (to.getOneEventConsumptionStrategy() != null) {
7879
isAnd = false;
7980
registrations = List.of(from(to.getOneEventConsumptionStrategy().getOne()));
@@ -97,6 +98,10 @@ protected ListenExecutorBuilder(
9798
}
9899
}
99100

101+
private Collection<EventRegistrationBuilder> registerToAll() {
102+
return application.eventConsumer().listenToAll(application);
103+
}
104+
100105
private JsonNode defaultCEConverter(CloudEvent ce) {
101106
return CloudEventUtils.toJsonNode(ce.getData());
102107
}
@@ -106,7 +111,7 @@ private Collection<EventRegistrationBuilder> from(List<EventFilter> filters) {
106111
}
107112

108113
private EventRegistrationBuilder from(EventFilter filter) {
109-
return application.eventConsumer().build(filter, application);
114+
return application.eventConsumer().listen(filter, application);
110115
}
111116

112117
@Override
@@ -175,7 +180,7 @@ private void processCe(
175180
WorkflowContext workflow,
176181
TaskContext taskContext,
177182
CompletableFuture<JsonNode> future) {
178-
arrayNode.add(arrayNode);
183+
arrayNode.add(node);
179184
loop.ifPresentOrElse(
180185
t -> {
181186
SubscriptionIterator forEach = task.getForeach();
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import com.fasterxml.jackson.databind.JsonNode;
21+
import com.fasterxml.jackson.databind.ObjectMapper;
22+
import com.fasterxml.jackson.databind.node.ObjectNode;
23+
import io.serverlessworkflow.api.WorkflowReader;
24+
import io.serverlessworkflow.impl.json.JsonUtils;
25+
import java.io.IOException;
26+
import java.util.Map;
27+
import java.util.concurrent.CompletableFuture;
28+
import org.junit.jupiter.api.BeforeAll;
29+
import org.junit.jupiter.api.Test;
30+
31+
public class EventDefinitionTest {
32+
33+
private static WorkflowApplication appl;
34+
35+
@BeforeAll
36+
static void init() {
37+
appl = WorkflowApplication.builder().build();
38+
}
39+
40+
@Test
41+
void testAnyEvent() throws IOException {
42+
WorkflowDefinition listenDefinition =
43+
appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("listen.yaml"));
44+
WorkflowDefinition emitDefinition =
45+
appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("emit.yaml"));
46+
WorkflowInstance waitingInstance = listenDefinition.instance(Map.of());
47+
CompletableFuture<JsonNode> future = waitingInstance.start();
48+
assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.RUNNING);
49+
emitDefinition.instance(Map.of()).start().join();
50+
assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.COMPLETED);
51+
assertThat(future).isCompleted();
52+
assertThat(waitingInstance.outputAsJsonNode()).isEqualTo(cruellaDeVil());
53+
}
54+
55+
private static JsonNode cruellaDeVil() {
56+
ObjectMapper mapper = JsonUtils.mapper();
57+
ObjectNode node = mapper.createObjectNode();
58+
node.set(
59+
"client", mapper.createObjectNode().put("firstName", "Cruella").put("lastName", "de Vil"));
60+
node.set(
61+
"items",
62+
mapper
63+
.createArrayNode()
64+
.add(mapper.createObjectNode().put("breed", "dalmatian").put("quantity", 101)));
65+
return mapper.createArrayNode().add(node);
66+
}
67+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
document:
2+
dsl: '1.0.0-alpha5'
3+
namespace: test
4+
name: emit
5+
version: '0.1.0'
6+
do:
7+
- emitEvent:
8+
emit:
9+
event:
10+
with:
11+
source: https://petstore.com
12+
type: com.petstore.order.placed.v1
13+
data:
14+
client:
15+
firstName: Cruella
16+
lastName: de Vil
17+
items:
18+
- breed: dalmatian
19+
quantity: 101
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
document:
2+
dsl: '1.0.0-alpha5'
3+
namespace: test
4+
name: listen-to-any
5+
version: '0.1.0'
6+
do:
7+
- callDoctor:
8+
listen:
9+
to:
10+
any: []

0 commit comments

Comments
 (0)