Skip to content

Commit a7e38ec

Browse files
Add summary to side effect and mutable side effect (#2699)
Add summary to side effect and mutable side effect
1 parent fdf9994 commit a7e38ec

27 files changed

+640
-120
lines changed

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -748,9 +748,19 @@ public DynamicUpdateHandler getHandler() {
748748

749749
<R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func);
750750

751+
<R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func, SideEffectOptions options);
752+
751753
<R> R mutableSideEffect(
752754
String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func);
753755

756+
<R> R mutableSideEffect(
757+
String id,
758+
Class<R> resultClass,
759+
Type resultType,
760+
BiPredicate<R, R> updated,
761+
Func<R> func,
762+
MutableSideEffectOptions options);
763+
754764
int getVersion(String changeId, int minSupported, int maxSupported);
755765

756766
void continueAsNew(ContinueAsNewInput input);

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import com.uber.m3.tally.Scope;
44
import io.temporal.common.SearchAttributeUpdate;
55
import io.temporal.workflow.Functions.Func;
6+
import io.temporal.workflow.MutableSideEffectOptions;
67
import io.temporal.workflow.Promise;
8+
import io.temporal.workflow.SideEffectOptions;
79
import io.temporal.workflow.TimerOptions;
810
import java.lang.reflect.Type;
911
import java.time.Duration;
@@ -88,12 +90,29 @@ public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
8890
return next.sideEffect(resultClass, resultType, func);
8991
}
9092

93+
@Override
94+
public <R> R sideEffect(
95+
Class<R> resultClass, Type resultType, Func<R> func, SideEffectOptions options) {
96+
return next.sideEffect(resultClass, resultType, func, options);
97+
}
98+
9199
@Override
92100
public <R> R mutableSideEffect(
93101
String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
94102
return next.mutableSideEffect(id, resultClass, resultType, updated, func);
95103
}
96104

105+
@Override
106+
public <R> R mutableSideEffect(
107+
String id,
108+
Class<R> resultClass,
109+
Type resultType,
110+
BiPredicate<R, R> updated,
111+
Func<R> func,
112+
MutableSideEffectOptions options) {
113+
return next.mutableSideEffect(id, resultClass, resultType, updated, func, options);
114+
}
115+
97116
@Override
98117
public int getVersion(String changeId, int minSupported, int maxSupported) {
99118
return next.getVersion(changeId, minSupported, maxSupported);

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,13 @@ Functions.Proc1<RuntimeException> newTimer(
224224
* executing operations that rely on non-global dependencies and can fail.
225225
*
226226
* @param func function that is called once to return a value.
227+
* @param userMetadata user metadata to be associated with the side effect.
227228
* @param callback function that accepts the result of the side effect.
228229
*/
229-
void sideEffect(Func<Optional<Payloads>> func, Functions.Proc1<Optional<Payloads>> callback);
230+
void sideEffect(
231+
Func<Optional<Payloads>> func,
232+
UserMetadata userMetadata,
233+
Functions.Proc1<Optional<Payloads>> callback);
230234

231235
/**
232236
* {@code mutableSideEffect} is similar to {@code sideEffect} in allowing calls of
@@ -247,6 +251,7 @@ Functions.Proc1<RuntimeException> newTimer(
247251
*
248252
* @param id id of the side effect call. It links multiple calls together. Calls with different
249253
* ids are completely independent.
254+
* @param userMetadata user metadata to attach to the marker event.
250255
* @param func function that gets as input a result of a previous {@code mutableSideEffect} call.
251256
* The function executes its business logic (like checking config value) and if value didn't
252257
* change returns {@link Optional#empty()}. If value has changed and needs to be recorded in
@@ -256,6 +261,7 @@ Functions.Proc1<RuntimeException> newTimer(
256261
*/
257262
void mutableSideEffect(
258263
String id,
264+
UserMetadata userMetadata,
259265
Func1<Optional<Payloads>, Optional<Payloads>> func,
260266
Functions.Proc1<Optional<Payloads>> callback);
261267

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -315,16 +315,19 @@ private void handleTimerCallback(Functions.Proc1<RuntimeException> callback, His
315315

316316
@Override
317317
public void sideEffect(
318-
Func<Optional<Payloads>> func, Functions.Proc1<Optional<Payloads>> callback) {
319-
workflowStateMachines.sideEffect(func, callback);
318+
Func<Optional<Payloads>> func,
319+
UserMetadata metadata,
320+
Functions.Proc1<Optional<Payloads>> callback) {
321+
workflowStateMachines.sideEffect(func, metadata, callback);
320322
}
321323

322324
@Override
323325
public void mutableSideEffect(
324326
String id,
327+
UserMetadata metadata,
325328
Func1<Optional<Payloads>, Optional<Payloads>> func,
326329
Functions.Proc1<Optional<Payloads>> callback) {
327-
workflowStateMachines.mutableSideEffect(id, func, callback);
330+
workflowStateMachines.mutableSideEffect(id, metadata, func, callback);
328331
}
329332

330333
@Override

temporal-sdk/src/main/java/io/temporal/internal/statemachines/MutableSideEffectStateMachine.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.temporal.api.enums.v1.EventType;
99
import io.temporal.api.history.v1.HistoryEvent;
1010
import io.temporal.api.history.v1.MarkerRecordedEventAttributes;
11+
import io.temporal.api.sdk.v1.UserMetadata;
1112
import io.temporal.common.converter.DefaultDataConverter;
1213
import io.temporal.common.converter.StdConverterBackwardsCompatAdapter;
1314
import io.temporal.workflow.Functions;
@@ -24,6 +25,7 @@ final class MutableSideEffectStateMachine {
2425
static final String MUTABLE_SIDE_EFFECT_MARKER_NAME = "MutableSideEffect";
2526

2627
private final String id;
28+
private UserMetadata metadata;
2729
private final Functions.Func<Boolean> replaying;
2830
private final Functions.Proc1<CancellableCommand> commandSink;
2931

@@ -174,7 +176,8 @@ State createMarker() {
174176
.setMarkerName(MUTABLE_SIDE_EFFECT_MARKER_NAME)
175177
.putAllDetails(details)
176178
.build();
177-
addCommand(StateMachineCommandUtils.createRecordMarker(markerAttributes));
179+
addCommand(StateMachineCommandUtils.createRecordMarker(markerAttributes, metadata));
180+
metadata = null; // only used once
178181
currentSkipCount = 0;
179182
return State.MARKER_COMMAND_CREATED;
180183
}
@@ -244,18 +247,22 @@ void cancelCommandNotifyCachedResult() {
244247
/** Creates new MutableSideEffectStateMachine */
245248
public static MutableSideEffectStateMachine newInstance(
246249
String id,
250+
UserMetadata metadata,
247251
Functions.Func<Boolean> replaying,
248252
Functions.Proc1<CancellableCommand> commandSink,
249253
Functions.Proc1<StateMachine> stateMachineSink) {
250-
return new MutableSideEffectStateMachine(id, replaying, commandSink, stateMachineSink);
254+
return new MutableSideEffectStateMachine(
255+
id, metadata, replaying, commandSink, stateMachineSink);
251256
}
252257

253258
private MutableSideEffectStateMachine(
254259
String id,
260+
UserMetadata metadata,
255261
Functions.Func<Boolean> replaying,
256262
Functions.Proc1<CancellableCommand> commandSink,
257263
Functions.Proc1<StateMachine> stateMachineSink) {
258264
this.id = Objects.requireNonNull(id);
265+
this.metadata = metadata;
259266
this.replaying = Objects.requireNonNull(replaying);
260267
this.commandSink = Objects.requireNonNull(commandSink);
261268
}

temporal-sdk/src/main/java/io/temporal/internal/statemachines/SideEffectStateMachine.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.temporal.api.enums.v1.CommandType;
77
import io.temporal.api.enums.v1.EventType;
88
import io.temporal.api.history.v1.MarkerRecordedEventAttributes;
9+
import io.temporal.api.sdk.v1.UserMetadata;
910
import io.temporal.workflow.Functions;
1011
import java.util.HashMap;
1112
import java.util.Map;
@@ -33,6 +34,7 @@ enum State {
3334
static final String MARKER_DATA_KEY = "data";
3435
static final String SIDE_EFFECT_MARKER_NAME = "SideEffect";
3536

37+
private UserMetadata metadata;
3638
private final Functions.Proc1<Optional<Payloads>> callback;
3739
private final Functions.Func<Optional<Payloads>> func;
3840
private final Functions.Func<Boolean> replaying;
@@ -72,26 +74,30 @@ enum State {
7274
/**
7375
* Creates new SideEffect Marker
7476
*
77+
* @param metadata user metadata to attach to the side effect marker.
7578
* @param func used to produce side effect value. null if replaying.
7679
* @param callback returns side effect value or failure
7780
* @param commandSink callback to send commands to
7881
*/
7982
public static void newInstance(
83+
UserMetadata metadata,
8084
Functions.Func<Boolean> replaying,
8185
Functions.Func<Optional<Payloads>> func,
8286
Functions.Proc1<Optional<Payloads>> callback,
8387
Functions.Proc1<CancellableCommand> commandSink,
8488
Functions.Proc1<StateMachine> stateMachineSink) {
85-
new SideEffectStateMachine(replaying, func, callback, commandSink, stateMachineSink);
89+
new SideEffectStateMachine(metadata, replaying, func, callback, commandSink, stateMachineSink);
8690
}
8791

8892
private SideEffectStateMachine(
93+
UserMetadata metadata,
8994
Functions.Func<Boolean> replaying,
9095
Functions.Func<Optional<Payloads>> func,
9196
Functions.Proc1<Optional<Payloads>> callback,
9297
Functions.Proc1<CancellableCommand> commandSink,
9398
Functions.Proc1<StateMachine> stateMachineSink) {
9499
super(STATE_MACHINE_DEFINITION, commandSink, stateMachineSink);
100+
this.metadata = metadata;
95101
this.replaying = replaying;
96102
this.func = func;
97103
this.callback = callback;
@@ -121,11 +127,18 @@ private State createMarkerCommand() {
121127
.build();
122128
transitionTo = State.MARKER_COMMAND_CREATED;
123129
}
124-
addCommand(
130+
131+
Command.Builder command =
125132
Command.newBuilder()
126133
.setCommandType(CommandType.COMMAND_TYPE_RECORD_MARKER)
127-
.setRecordMarkerCommandAttributes(markerAttributes)
128-
.build());
134+
.setRecordMarkerCommandAttributes(markerAttributes);
135+
136+
if (metadata != null) {
137+
command.setUserMetadata(metadata);
138+
metadata = null;
139+
}
140+
141+
addCommand(command.build());
129142
return transitionTo;
130143
}
131144

temporal-sdk/src/main/java/io/temporal/internal/statemachines/StateMachineCommandUtils.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,27 @@
33
import io.temporal.api.command.v1.Command;
44
import io.temporal.api.command.v1.RecordMarkerCommandAttributes;
55
import io.temporal.api.enums.v1.CommandType;
6+
import io.temporal.api.sdk.v1.UserMetadata;
7+
import javax.annotation.Nullable;
68

79
class StateMachineCommandUtils {
810
public static final Command RECORD_MARKER_FAKE_COMMAND =
9-
createRecordMarker(RecordMarkerCommandAttributes.getDefaultInstance());
11+
createRecordMarker(RecordMarkerCommandAttributes.getDefaultInstance(), null);
1012

11-
public static Command createRecordMarker(RecordMarkerCommandAttributes attributes) {
12-
return Command.newBuilder()
13-
.setCommandType(CommandType.COMMAND_TYPE_RECORD_MARKER)
14-
.setRecordMarkerCommandAttributes(attributes)
15-
.build();
13+
public static Command createRecordMarker(
14+
RecordMarkerCommandAttributes attributes, @Nullable UserMetadata metadata) {
15+
Command.Builder command =
16+
Command.newBuilder()
17+
.setCommandType(CommandType.COMMAND_TYPE_RECORD_MARKER)
18+
.setRecordMarkerCommandAttributes(attributes);
19+
if (metadata != null) {
20+
command.setUserMetadata(metadata);
21+
}
22+
return command.build();
1623
}
1724

1825
public static Command createFakeMarkerCommand(String markerName) {
1926
return createRecordMarker(
20-
RecordMarkerCommandAttributes.newBuilder().setMarkerName(markerName).build());
27+
RecordMarkerCommandAttributes.newBuilder().setMarkerName(markerName).build(), null);
2128
}
2229
}

temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ State createMarkerExecuting() {
240240
writeVersionChangeSA = sa != null;
241241
RecordMarkerCommandAttributes markerAttributes =
242242
VersionMarkerUtils.createMarkerAttributes(changeId, version, writeVersionChangeSA);
243-
Command markerCommand = StateMachineCommandUtils.createRecordMarker(markerAttributes);
243+
Command markerCommand = StateMachineCommandUtils.createRecordMarker(markerAttributes, null);
244244
addCommand(markerCommand);
245245
if (writeVersionChangeSA) {
246246
hasWrittenVersionChangeSA = true;

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1157,9 +1157,12 @@ public Random newRandom() {
11571157
}
11581158

11591159
public void sideEffect(
1160-
Functions.Func<Optional<Payloads>> func, Functions.Proc1<Optional<Payloads>> callback) {
1160+
Functions.Func<Optional<Payloads>> func,
1161+
UserMetadata userMetadata,
1162+
Functions.Proc1<Optional<Payloads>> callback) {
11611163
checkEventLoopExecuting();
11621164
SideEffectStateMachine.newInstance(
1165+
userMetadata,
11631166
this::isReplaying,
11641167
func,
11651168
(payloads) -> {
@@ -1179,6 +1182,7 @@ public void sideEffect(
11791182
*/
11801183
public void mutableSideEffect(
11811184
String id,
1185+
UserMetadata userMetadata,
11821186
Functions.Func1<Optional<Payloads>, Optional<Payloads>> func,
11831187
Functions.Proc1<Optional<Payloads>> callback) {
11841188
checkEventLoopExecuting();
@@ -1187,7 +1191,7 @@ public void mutableSideEffect(
11871191
id,
11881192
(idKey) ->
11891193
MutableSideEffectStateMachine.newInstance(
1190-
idKey, this::isReplaying, commandSink, stateMachineSink));
1194+
idKey, userMetadata, this::isReplaying, commandSink, stateMachineSink));
11911195
stateMachine.mutableSideEffect(
11921196
func,
11931197
(r) -> {

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1027,6 +1027,15 @@ public Promise<Void> newTimer(Duration delay, TimerOptions options) {
10271027

10281028
@Override
10291029
public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
1030+
return sideEffect(resultClass, resultType, func, SideEffectOptions.newBuilder().build());
1031+
}
1032+
1033+
@Override
1034+
public <R> R sideEffect(
1035+
Class<R> resultClass, Type resultType, Func<R> func, SideEffectOptions options) {
1036+
@Nullable
1037+
UserMetadata userMetadata =
1038+
makeUserMetaData(options.getSummary(), null, dataConverterWithCurrentWorkflowContext);
10301039
try {
10311040
CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
10321041
replayContext.sideEffect(
@@ -1039,6 +1048,7 @@ public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
10391048
readOnly = false;
10401049
}
10411050
},
1051+
userMetadata,
10421052
(p) ->
10431053
runner.executeInWorkflowThread(
10441054
"side-effect-callback", () -> result.complete(Objects.requireNonNull(p))));
@@ -1054,8 +1064,23 @@ public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
10541064
@Override
10551065
public <R> R mutableSideEffect(
10561066
String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
1067+
return mutableSideEffect(
1068+
id, resultClass, resultType, updated, func, MutableSideEffectOptions.newBuilder().build());
1069+
}
1070+
1071+
@Override
1072+
public <R> R mutableSideEffect(
1073+
String id,
1074+
Class<R> resultClass,
1075+
Type resultType,
1076+
BiPredicate<R, R> updated,
1077+
Func<R> func,
1078+
MutableSideEffectOptions options) {
1079+
@Nullable
1080+
UserMetadata userMetadata =
1081+
makeUserMetaData(options.getSummary(), null, dataConverterWithCurrentWorkflowContext);
10571082
try {
1058-
return mutableSideEffectImpl(id, resultClass, resultType, updated, func);
1083+
return mutableSideEffectImpl(id, userMetadata, resultClass, resultType, updated, func);
10591084
} catch (Exception e) {
10601085
// MutableSideEffect cannot throw normal exception as it can lead to non-deterministic
10611086
// behavior. So fail the workflow task by throwing an Error.
@@ -1064,11 +1089,17 @@ public <R> R mutableSideEffect(
10641089
}
10651090

10661091
private <R> R mutableSideEffectImpl(
1067-
String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
1092+
String id,
1093+
UserMetadata metadata,
1094+
Class<R> resultClass,
1095+
Type resultType,
1096+
BiPredicate<R, R> updated,
1097+
Func<R> func) {
10681098
CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
10691099
AtomicReference<R> unserializedResult = new AtomicReference<>();
10701100
replayContext.mutableSideEffect(
10711101
id,
1102+
metadata,
10721103
(storedBinary) -> {
10731104
Optional<R> stored =
10741105
storedBinary.map(

0 commit comments

Comments
 (0)