Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -735,9 +735,19 @@ public DynamicUpdateHandler getHandler() {

<R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func);

<R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func, SideEffectOptions options);

<R> R mutableSideEffect(
String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func);

<R> R mutableSideEffect(
String id,
Class<R> resultClass,
Type resultType,
BiPredicate<R, R> updated,
Func<R> func,
MutableSideEffectOptions options);

int getVersion(String changeId, int minSupported, int maxSupported);

void continueAsNew(ContinueAsNewInput input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import com.uber.m3.tally.Scope;
import io.temporal.common.SearchAttributeUpdate;
import io.temporal.workflow.Functions.Func;
import io.temporal.workflow.MutableSideEffectOptions;
import io.temporal.workflow.Promise;
import io.temporal.workflow.SideEffectOptions;
import io.temporal.workflow.TimerOptions;
import java.lang.reflect.Type;
import java.time.Duration;
Expand Down Expand Up @@ -88,12 +90,29 @@ public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
return next.sideEffect(resultClass, resultType, func);
}

@Override
public <R> R sideEffect(
Class<R> resultClass, Type resultType, Func<R> func, SideEffectOptions options) {
return next.sideEffect(resultClass, resultType, func, options);
}

@Override
public <R> R mutableSideEffect(
String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
return next.mutableSideEffect(id, resultClass, resultType, updated, func);
}

@Override
public <R> R mutableSideEffect(
String id,
Class<R> resultClass,
Type resultType,
BiPredicate<R, R> updated,
Func<R> func,
MutableSideEffectOptions options) {
return next.mutableSideEffect(id, resultClass, resultType, updated, func, options);
}

@Override
public int getVersion(String changeId, int minSupported, int maxSupported) {
return next.getVersion(changeId, minSupported, maxSupported);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,13 @@ Functions.Proc1<RuntimeException> newTimer(
* executing operations that rely on non-global dependencies and can fail.
*
* @param func function that is called once to return a value.
* @param userMetadata user metadata to be associated with the side effect.
* @param callback function that accepts the result of the side effect.
*/
void sideEffect(Func<Optional<Payloads>> func, Functions.Proc1<Optional<Payloads>> callback);
void sideEffect(
Func<Optional<Payloads>> func,
UserMetadata userMetadata,
Functions.Proc1<Optional<Payloads>> callback);

/**
* {@code mutableSideEffect} is similar to {@code sideEffect} in allowing calls of
Expand Down Expand Up @@ -253,6 +257,7 @@ Functions.Proc1<RuntimeException> newTimer(
*/
void mutableSideEffect(
String id,
UserMetadata userMetadata,
Func1<Optional<Payloads>, Optional<Payloads>> func,
Functions.Proc1<Optional<Payloads>> callback);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,16 +311,19 @@ private void handleTimerCallback(Functions.Proc1<RuntimeException> callback, His

@Override
public void sideEffect(
Func<Optional<Payloads>> func, Functions.Proc1<Optional<Payloads>> callback) {
workflowStateMachines.sideEffect(func, callback);
Func<Optional<Payloads>> func,
UserMetadata metadata,
Functions.Proc1<Optional<Payloads>> callback) {
workflowStateMachines.sideEffect(func, metadata, callback);
}

@Override
public void mutableSideEffect(
String id,
UserMetadata metadata,
Func1<Optional<Payloads>, Optional<Payloads>> func,
Functions.Proc1<Optional<Payloads>> callback) {
workflowStateMachines.mutableSideEffect(id, func, callback);
workflowStateMachines.mutableSideEffect(id, metadata, func, callback);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.history.v1.MarkerRecordedEventAttributes;
import io.temporal.api.sdk.v1.UserMetadata;
import io.temporal.common.converter.DefaultDataConverter;
import io.temporal.common.converter.StdConverterBackwardsCompatAdapter;
import io.temporal.workflow.Functions;
Expand All @@ -24,6 +25,7 @@ final class MutableSideEffectStateMachine {
static final String MUTABLE_SIDE_EFFECT_MARKER_NAME = "MutableSideEffect";

private final String id;
private UserMetadata metadata;
private final Functions.Func<Boolean> replaying;
private final Functions.Proc1<CancellableCommand> commandSink;

Expand Down Expand Up @@ -174,7 +176,8 @@ State createMarker() {
.setMarkerName(MUTABLE_SIDE_EFFECT_MARKER_NAME)
.putAllDetails(details)
.build();
addCommand(StateMachineCommandUtils.createRecordMarker(markerAttributes));
addCommand(StateMachineCommandUtils.createRecordMarker(markerAttributes, metadata));
metadata = null; // only used once
currentSkipCount = 0;
return State.MARKER_COMMAND_CREATED;
}
Expand Down Expand Up @@ -244,18 +247,22 @@ void cancelCommandNotifyCachedResult() {
/** Creates new MutableSideEffectStateMachine */
public static MutableSideEffectStateMachine newInstance(
String id,
UserMetadata metadata,
Functions.Func<Boolean> replaying,
Functions.Proc1<CancellableCommand> commandSink,
Functions.Proc1<StateMachine> stateMachineSink) {
return new MutableSideEffectStateMachine(id, replaying, commandSink, stateMachineSink);
return new MutableSideEffectStateMachine(
id, metadata, replaying, commandSink, stateMachineSink);
}

private MutableSideEffectStateMachine(
String id,
UserMetadata metadata,
Functions.Func<Boolean> replaying,
Functions.Proc1<CancellableCommand> commandSink,
Functions.Proc1<StateMachine> stateMachineSink) {
this.id = Objects.requireNonNull(id);
this.metadata = metadata;
this.replaying = Objects.requireNonNull(replaying);
this.commandSink = Objects.requireNonNull(commandSink);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.temporal.api.enums.v1.CommandType;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.history.v1.MarkerRecordedEventAttributes;
import io.temporal.api.sdk.v1.UserMetadata;
import io.temporal.workflow.Functions;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -33,6 +34,7 @@ enum State {
static final String MARKER_DATA_KEY = "data";
static final String SIDE_EFFECT_MARKER_NAME = "SideEffect";

private UserMetadata metadata;
private final Functions.Proc1<Optional<Payloads>> callback;
private final Functions.Func<Optional<Payloads>> func;
private final Functions.Func<Boolean> replaying;
Expand Down Expand Up @@ -77,21 +79,24 @@ enum State {
* @param commandSink callback to send commands to
*/
public static void newInstance(
UserMetadata metadata,
Functions.Func<Boolean> replaying,
Functions.Func<Optional<Payloads>> func,
Functions.Proc1<Optional<Payloads>> callback,
Functions.Proc1<CancellableCommand> commandSink,
Functions.Proc1<StateMachine> stateMachineSink) {
new SideEffectStateMachine(replaying, func, callback, commandSink, stateMachineSink);
new SideEffectStateMachine(metadata, replaying, func, callback, commandSink, stateMachineSink);
}

private SideEffectStateMachine(
UserMetadata metadata,
Functions.Func<Boolean> replaying,
Functions.Func<Optional<Payloads>> func,
Functions.Proc1<Optional<Payloads>> callback,
Functions.Proc1<CancellableCommand> commandSink,
Functions.Proc1<StateMachine> stateMachineSink) {
super(STATE_MACHINE_DEFINITION, commandSink, stateMachineSink);
this.metadata = metadata;
this.replaying = replaying;
this.func = func;
this.callback = callback;
Expand Down Expand Up @@ -121,11 +126,18 @@ private State createMarkerCommand() {
.build();
transitionTo = State.MARKER_COMMAND_CREATED;
}
addCommand(

Command.Builder command =
Command.newBuilder()
.setCommandType(CommandType.COMMAND_TYPE_RECORD_MARKER)
.setRecordMarkerCommandAttributes(markerAttributes)
.build());
.setRecordMarkerCommandAttributes(markerAttributes);

if (metadata != null) {
command.setUserMetadata(metadata);
metadata = null;
}

addCommand(command.build());
return transitionTo;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,27 @@
import io.temporal.api.command.v1.Command;
import io.temporal.api.command.v1.RecordMarkerCommandAttributes;
import io.temporal.api.enums.v1.CommandType;
import io.temporal.api.sdk.v1.UserMetadata;
import javax.annotation.Nullable;

class StateMachineCommandUtils {
public static final Command RECORD_MARKER_FAKE_COMMAND =
createRecordMarker(RecordMarkerCommandAttributes.getDefaultInstance());
createRecordMarker(RecordMarkerCommandAttributes.getDefaultInstance(), null);

public static Command createRecordMarker(RecordMarkerCommandAttributes attributes) {
return Command.newBuilder()
.setCommandType(CommandType.COMMAND_TYPE_RECORD_MARKER)
.setRecordMarkerCommandAttributes(attributes)
.build();
public static Command createRecordMarker(
RecordMarkerCommandAttributes attributes, @Nullable UserMetadata metadata) {
Command.Builder command =
Command.newBuilder()
.setCommandType(CommandType.COMMAND_TYPE_RECORD_MARKER)
.setRecordMarkerCommandAttributes(attributes);
if (metadata != null) {
command.setUserMetadata(metadata);
}
return command.build();
}

public static Command createFakeMarkerCommand(String markerName) {
return createRecordMarker(
RecordMarkerCommandAttributes.newBuilder().setMarkerName(markerName).build());
RecordMarkerCommandAttributes.newBuilder().setMarkerName(markerName).build(), null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ State createMarkerExecuting() {
writeVersionChangeSA = sa != null;
RecordMarkerCommandAttributes markerAttributes =
VersionMarkerUtils.createMarkerAttributes(changeId, version, writeVersionChangeSA);
Command markerCommand = StateMachineCommandUtils.createRecordMarker(markerAttributes);
Command markerCommand = StateMachineCommandUtils.createRecordMarker(markerAttributes, null);
addCommand(markerCommand);
if (writeVersionChangeSA) {
hasWrittenVersionChangeSA = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1157,9 +1157,12 @@ public Random newRandom() {
}

public void sideEffect(
Functions.Func<Optional<Payloads>> func, Functions.Proc1<Optional<Payloads>> callback) {
Functions.Func<Optional<Payloads>> func,
UserMetadata userMetadata,
Functions.Proc1<Optional<Payloads>> callback) {
checkEventLoopExecuting();
SideEffectStateMachine.newInstance(
userMetadata,
this::isReplaying,
func,
(payloads) -> {
Expand All @@ -1179,6 +1182,7 @@ public void sideEffect(
*/
public void mutableSideEffect(
String id,
UserMetadata userMetadata,
Functions.Func1<Optional<Payloads>, Optional<Payloads>> func,
Functions.Proc1<Optional<Payloads>> callback) {
checkEventLoopExecuting();
Expand All @@ -1187,7 +1191,7 @@ public void mutableSideEffect(
id,
(idKey) ->
MutableSideEffectStateMachine.newInstance(
idKey, this::isReplaying, commandSink, stateMachineSink));
idKey, userMetadata, this::isReplaying, commandSink, stateMachineSink));
stateMachine.mutableSideEffect(
func,
(r) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,15 @@ public Promise<Void> newTimer(Duration delay, TimerOptions options) {

@Override
public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
return sideEffect(resultClass, resultType, func, SideEffectOptions.newBuilder().build());
}

@Override
public <R> R sideEffect(
Class<R> resultClass, Type resultType, Func<R> func, SideEffectOptions options) {
@Nullable
UserMetadata userMetadata =
makeUserMetaData(options.getSummary(), null, dataConverterWithCurrentWorkflowContext);
try {
CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
replayContext.sideEffect(
Expand All @@ -1039,6 +1048,7 @@ public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
readOnly = false;
}
},
userMetadata,
(p) ->
runner.executeInWorkflowThread(
"side-effect-callback", () -> result.complete(Objects.requireNonNull(p))));
Expand All @@ -1054,8 +1064,23 @@ public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
@Override
public <R> R mutableSideEffect(
String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
return mutableSideEffect(
id, resultClass, resultType, updated, func, MutableSideEffectOptions.newBuilder().build());
}

@Override
public <R> R mutableSideEffect(
String id,
Class<R> resultClass,
Type resultType,
BiPredicate<R, R> updated,
Func<R> func,
MutableSideEffectOptions options) {
@Nullable
UserMetadata userMetadata =
makeUserMetaData(options.getSummary(), null, dataConverterWithCurrentWorkflowContext);
try {
return mutableSideEffectImpl(id, resultClass, resultType, updated, func);
return mutableSideEffectImpl(id, userMetadata, resultClass, resultType, updated, func);
} catch (Exception e) {
// MutableSideEffect cannot throw normal exception as it can lead to non-deterministic
// behavior. So fail the workflow task by throwing an Error.
Expand All @@ -1064,11 +1089,17 @@ public <R> R mutableSideEffect(
}

private <R> R mutableSideEffectImpl(
String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
String id,
UserMetadata metadata,
Class<R> resultClass,
Type resultType,
BiPredicate<R, R> updated,
Func<R> func) {
CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
AtomicReference<R> unserializedResult = new AtomicReference<>();
replayContext.mutableSideEffect(
id,
metadata,
(storedBinary) -> {
Optional<R> stored =
storedBinary.map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,13 +525,31 @@ public static <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> fu
return getWorkflowOutboundInterceptor().sideEffect(resultClass, resultType, func);
}

public static <R> R sideEffect(
Class<R> resultClass, Type resultType, Func<R> func, SideEffectOptions options) {
assertNotReadOnly("side effect");
return getWorkflowOutboundInterceptor().sideEffect(resultClass, resultType, func, options);
}

public static <R> R mutableSideEffect(
String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
assertNotReadOnly("mutable side effect");
return getWorkflowOutboundInterceptor()
.mutableSideEffect(id, resultClass, resultType, updated, func);
}

public static <R> R mutableSideEffect(
String id,
Class<R> resultClass,
Type resultType,
BiPredicate<R, R> updated,
Func<R> func,
MutableSideEffectOptions options) {
assertNotReadOnly("mutable side effect");
return getWorkflowOutboundInterceptor()
.mutableSideEffect(id, resultClass, resultType, updated, func, options);
}

public static int getVersion(String changeId, int minSupported, int maxSupported) {
assertNotReadOnly("get version");
return getWorkflowOutboundInterceptor().getVersion(changeId, minSupported, maxSupported);
Expand Down
Loading
Loading