Skip to content

Commit 0f2c55a

Browse files
committed
[Fix #782] Adding MVStore persistence
Signed-off-by: fjtirado <[email protected]>
1 parent 2b085e0 commit 0f2c55a

File tree

19 files changed

+594
-2
lines changed

19 files changed

+594
-2
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public class WorkflowApplication implements AutoCloseable {
6262
private final Collection<EventPublisher> eventPublishers;
6363
private final boolean lifeCycleCEPublishingEnabled;
6464

65-
private WorkflowApplication(Builder builder) {
65+
protected WorkflowApplication(Builder builder) {
6666
this.taskFactory = builder.taskFactory;
6767
this.exprFactory = builder.exprFactory;
6868
this.resourceLoaderFactory = builder.resourceLoaderFactory;
@@ -150,7 +150,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
150150
() -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap());
151151
private boolean lifeCycleCEPublishingEnabled = true;
152152

153-
private Builder() {}
153+
protected Builder() {}
154154

155155
public Builder withListener(WorkflowExecutionListener listener) {
156156
listeners.add(listener);

impl/persistence/api/pom.xml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
2+
<modelVersion>4.0.0</modelVersion>
3+
<parent>
4+
<groupId>io.serverlessworkflow</groupId>
5+
<artifactId>serverlessworkflow-persistence</artifactId>
6+
<version>8.0.0-SNAPSHOT</version>
7+
</parent>
8+
<artifactId>serverlessworkflow-persistence-api</artifactId>
9+
<name>Serverless Workflow :: Impl :: Pesistence:: API</name>
10+
<dependencies>
11+
<dependency>
12+
<groupId>io.serverlessworkflow</groupId>
13+
<artifactId>serverlessworkflow-impl-core</artifactId>
14+
</dependency>
15+
</dependencies>
16+
</project>
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.serverlessworkflow.impl.WorkflowInstance;
19+
import java.util.Optional;
20+
21+
public interface WorkflowIdPersistentReader extends WorkflowMinimumPersistenceReader {
22+
23+
/**
24+
* Allow recovering by process instance id
25+
*
26+
* @param workflowInstanceId
27+
* @return
28+
*/
29+
Optional<WorkflowInstance> findById(String workflowInstanceId);
30+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.serverlessworkflow.impl.WorkflowInstance;
19+
import java.util.stream.Stream;
20+
21+
public interface WorkflowMinimumPersistenceReader extends AutoCloseable {
22+
23+
/**
24+
* Allow streaming over all stored workflow instances
25+
*
26+
* @return
27+
*/
28+
Stream<WorkflowInstance> all();
29+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.serverlessworkflow.impl.WorkflowApplication;
19+
import io.serverlessworkflow.impl.WorkflowInstance;
20+
21+
public class WorkflowPersistenceApplication<T extends WorkflowMinimumPersistenceReader>
22+
extends WorkflowApplication {
23+
24+
private final T reader;
25+
private final WorkflowPersistenceWriter writer;
26+
27+
protected WorkflowPersistenceApplication(Builder<T> builder) {
28+
super(builder);
29+
this.reader = builder.reader;
30+
this.writer = builder.writer;
31+
if (builder.resumeAfterReboot) {
32+
reader.all().forEach(WorkflowInstance::resume);
33+
}
34+
}
35+
36+
public T persitenceReader() {
37+
return reader;
38+
}
39+
40+
public void close() {
41+
super.close();
42+
try {
43+
reader.close();
44+
} catch (Exception e) {
45+
}
46+
try {
47+
writer.close();
48+
} catch (Exception e) {
49+
}
50+
}
51+
52+
public static <T extends WorkflowMinimumPersistenceReader> Builder<T> builder(
53+
WorkflowPersistenceWriter writer, T reader) {
54+
return new Builder<>(writer, reader);
55+
}
56+
57+
public static class Builder<T extends WorkflowMinimumPersistenceReader>
58+
extends io.serverlessworkflow.impl.WorkflowApplication.Builder {
59+
60+
private final WorkflowPersistenceWriter writer;
61+
private final T reader;
62+
private boolean resumeAfterReboot = true;
63+
64+
protected Builder(WorkflowPersistenceWriter writer, T reader) {
65+
this.writer = writer;
66+
this.reader = reader;
67+
super.withListener(new WorkflowPersistenceListener(writer));
68+
}
69+
70+
public Builder<T> resumeAfterReboot(boolean resumeAfterReboot) {
71+
this.resumeAfterReboot = resumeAfterReboot;
72+
return this;
73+
}
74+
75+
public WorkflowPersistenceApplication<T> build() {
76+
return new WorkflowPersistenceApplication<>(this);
77+
}
78+
}
79+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent;
19+
import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent;
20+
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
21+
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
22+
import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent;
23+
import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent;
24+
import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent;
25+
26+
public class WorkflowPersistenceListener implements WorkflowExecutionListener {
27+
28+
private final WorkflowPersistenceWriter persistenceStore;
29+
30+
public WorkflowPersistenceListener(WorkflowPersistenceWriter persistenceStore) {
31+
this.persistenceStore = persistenceStore;
32+
}
33+
34+
@Override
35+
public void onWorkflowStarted(WorkflowStartedEvent ev) {
36+
persistenceStore.started(ev.workflowContext());
37+
}
38+
39+
@Override
40+
public void onWorkflowFailed(WorkflowFailedEvent ev) {
41+
persistenceStore.failed(ev.workflowContext(), ev.cause());
42+
}
43+
44+
@Override
45+
public void onWorkflowCancelled(WorkflowCancelledEvent ev) {
46+
persistenceStore.aborted(ev.workflowContext());
47+
}
48+
49+
@Override
50+
public void onWorkflowSuspended(WorkflowSuspendedEvent ev) {
51+
persistenceStore.suspended(ev.workflowContext());
52+
}
53+
54+
@Override
55+
public void onWorkflowResumed(WorkflowResumedEvent ev) {
56+
persistenceStore.resumed(ev.workflowContext());
57+
}
58+
59+
public void onTaskCompleted(TaskCompletedEvent ev) {
60+
persistenceStore.updated(ev.workflowContext(), ev.taskContext());
61+
}
62+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.serverlessworkflow.impl.TaskContextData;
19+
import io.serverlessworkflow.impl.WorkflowContextData;
20+
21+
public interface WorkflowPersistenceWriter extends AutoCloseable {
22+
23+
void started(WorkflowContextData workflowContext);
24+
25+
void completed(WorkflowContextData workflowContext);
26+
27+
void failed(WorkflowContextData workflowContext, Throwable ex);
28+
29+
void aborted(WorkflowContextData workflowContext);
30+
31+
void suspended(WorkflowContextData workflowContext);
32+
33+
void resumed(WorkflowContextData workflowContext);
34+
35+
void updated(WorkflowContextData workflowContext, TaskContextData taskContext);
36+
}

impl/persistence/bigmap/pom.xml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
2+
<modelVersion>4.0.0</modelVersion>
3+
<parent>
4+
<groupId>io.serverlessworkflow</groupId>
5+
<artifactId>serverlessworkflow-persistence</artifactId>
6+
<version>8.0.0-SNAPSHOT</version>
7+
</parent>
8+
<artifactId>serverlessworkflow-persistence-big-map</artifactId>
9+
<name>Serverless Workflow :: Impl :: Pesistence:: BigMap</name>
10+
<dependencies>
11+
<dependency>
12+
<groupId>io.serverlessworkflow</groupId>
13+
<artifactId>serverlessworkflow-persistence-api</artifactId>
14+
</dependency>
15+
</dependencies>
16+
</project>
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence.bigmap;
17+
18+
import io.serverlessworkflow.impl.WorkflowContextData;
19+
import io.serverlessworkflow.impl.WorkflowInstance;
20+
import io.serverlessworkflow.impl.persistence.WorkflowIdPersistentReader;
21+
import java.util.Optional;
22+
23+
public abstract class BigMapIdPersistenceStore<V> extends BigMapPersistenceStore<String, V>
24+
implements WorkflowIdPersistentReader {
25+
26+
@Override
27+
protected String key(WorkflowContextData workflowContext) {
28+
return workflowContext.instanceData().id();
29+
}
30+
31+
@Override
32+
public Optional<WorkflowInstance> findById(String workflowInstanceId) {
33+
return Optional.ofNullable(instances().get(workflowInstanceId)).map(this::unmarshall);
34+
}
35+
}

0 commit comments

Comments
 (0)