Skip to content

Commit b3797d6

Browse files
authored
Interceptor refactoring to support headers (#311)
1 parent 72f746b commit b3797d6

32 files changed

+1249
-706
lines changed
Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,26 @@
1919

2020
package io.temporal.common.interceptors;
2121

22-
public interface ActivityInterceptor {
23-
ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next);
22+
import io.temporal.api.common.v1.Payload;
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
26+
public class Header {
27+
private final Map<String, Payload> values;
28+
29+
public Header(io.temporal.api.common.v1.Header header) {
30+
values = header.getFieldsMap();
31+
}
32+
33+
public static Header empty() {
34+
return new Header(new HashMap<>());
35+
}
36+
37+
public Header(Map<String, Payload> values) {
38+
this.values = values;
39+
}
40+
41+
public Map<String, Payload> getValues() {
42+
return values;
43+
}
2444
}

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInterceptor.java renamed to temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkerInterceptor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020
package io.temporal.common.interceptors;
2121

2222
/**
23-
* Intercepts workflow execution.
23+
* Intercepts workflow and activity executions.
2424
*
2525
* <p>TODO(maxim): JavaDoc with sample
2626
*/
27-
public interface WorkflowInterceptor {
27+
public interface WorkerInterceptor {
2828
WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInterceptor next);
29+
30+
ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next);
2931
}

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java

Lines changed: 91 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,91 @@
2424
* restrictions on the workflow code should be obeyed.
2525
*/
2626
public interface WorkflowInboundCallsInterceptor {
27+
28+
final class WorkflowInput {
29+
private final Header header;
30+
private final Object[] arguments;
31+
32+
public WorkflowInput(Header header, Object[] arguments) {
33+
this.header = header;
34+
this.arguments = arguments;
35+
}
36+
37+
public Header getHeader() {
38+
return header;
39+
}
40+
41+
public Object[] getArguments() {
42+
return arguments;
43+
}
44+
}
45+
46+
final class WorkflowOutput {
47+
private final Object result;
48+
49+
public WorkflowOutput(Object result) {
50+
this.result = result;
51+
}
52+
53+
public Object getResult() {
54+
return result;
55+
}
56+
}
57+
58+
final class SignalInput {
59+
private final String signalName;
60+
private final Object[] arguments;
61+
private final long EventId;
62+
63+
public SignalInput(String signalName, Object[] arguments, long eventId) {
64+
this.signalName = signalName;
65+
this.arguments = arguments;
66+
EventId = eventId;
67+
}
68+
69+
public String getSignalName() {
70+
return signalName;
71+
}
72+
73+
public Object[] getArguments() {
74+
return arguments;
75+
}
76+
77+
public long getEventId() {
78+
return EventId;
79+
}
80+
}
81+
82+
final class QueryInput {
83+
private final String queryName;
84+
private final Object[] arguments;
85+
86+
public QueryInput(String signalName, Object[] arguments) {
87+
this.queryName = signalName;
88+
this.arguments = arguments;
89+
}
90+
91+
public String getQueryName() {
92+
return queryName;
93+
}
94+
95+
public Object[] getArguments() {
96+
return arguments;
97+
}
98+
}
99+
100+
final class QueryOutput {
101+
private final Object result;
102+
103+
public QueryOutput(Object result) {
104+
this.result = result;
105+
}
106+
107+
public Object getResult() {
108+
return result;
109+
}
110+
}
111+
27112
/**
28113
* Called when workflow class is instantiated.
29114
*
@@ -36,8 +121,11 @@ public interface WorkflowInboundCallsInterceptor {
36121
*
37122
* @return result of the workflow execution.
38123
*/
39-
Object execute(Object[] arguments);
124+
WorkflowOutput execute(WorkflowInput input);
125+
126+
/** Called when signal is delivered to a workflow execution. */
127+
void handleSignal(SignalInput input);
40128

41-
/** Called when signal is delivered to the workflow instance. */
42-
void processSignal(String signalName, Object[] arguments, long EventId);
129+
/** Called when a workflow is queried. */
130+
QueryOutput handleQuery(QueryInput input);
43131
}

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptorBase.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,17 @@ public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
3333
}
3434

3535
@Override
36-
public Object execute(Object[] arguments) {
37-
return next.execute(arguments);
36+
public WorkflowOutput execute(WorkflowInput input) {
37+
return next.execute(input);
3838
}
3939

4040
@Override
41-
public void processSignal(String signalName, Object[] arguments, long eventId) {
42-
next.processSignal(signalName, arguments, eventId);
41+
public void handleSignal(SignalInput input) {
42+
next.handleSignal(input);
43+
}
44+
45+
@Override
46+
public QueryOutput handleQuery(QueryInput input) {
47+
return next.handleQuery(input);
4348
}
4449
}

0 commit comments

Comments
 (0)