Skip to content

Commit d1e0f6b

Browse files
vitarb and mfateev authored
Avoid activity task prefetching when handlers are busy (#286)
Co-authored-by: Maxim Fateev <[email protected]>
1 parent bf1e538 commit d1e0f6b

File tree

8 files changed: 503 additions and 68 deletions.

temporal-sdk/src/main/java/io/temporal/internal/sync/TestWorkflowEnvironmentInternal.java

Lines changed: 14 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -72,13 +72,21 @@ public TestWorkflowEnvironmentInternal(TestEnvironmentOptions options) {
7272
service = new TestWorkflowService();
7373
timeLockingInterceptor = new TimeLockingInterceptor(service);
7474
service.lockTimeSkipping("TestWorkflowEnvironmentInternal constructor");
75-
workflowServiceStubs =
76-
WorkflowServiceStubs.newInstance(
77-
service,
78-
WorkflowServiceStubsOptions.newBuilder()
79-
.setMetricsScope(options.getMetricsScope())
80-
.build());
8175

76+
if (this.testEnvironmentOptions.isUseExternalService()) {
77+
workflowServiceStubs =
78+
WorkflowServiceStubs.newInstance(
79+
WorkflowServiceStubsOptions.newBuilder()
80+
.setTarget(this.testEnvironmentOptions.getTarget())
81+
.build());
82+
} else {
83+
workflowServiceStubs =
84+
WorkflowServiceStubs.newInstance(
85+
service,
86+
WorkflowServiceStubsOptions.newBuilder()
87+
.setMetricsScope(options.getMetricsScope())
88+
.build());
89+
}
8290
WorkflowClient client = WorkflowClient.newInstance(workflowServiceStubs, workflowClientOptions);
8391
workerFactory = WorkerFactory.newInstance(client, options.getWorkerFactoryOptions());
8492
}

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java

Lines changed: 28 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -32,10 +32,11 @@
3232
import io.temporal.internal.common.ProtobufTimeUtils;
3333
import io.temporal.internal.metrics.MetricsType;
3434
import io.temporal.serviceclient.WorkflowServiceStubs;
35+
import java.util.concurrent.Semaphore;
3536
import org.slf4j.Logger;
3637
import org.slf4j.LoggerFactory;
3738

38-
final class ActivityPollTask implements Poller.PollTask<PollActivityTaskQueueResponse> {
39+
final class ActivityPollTask implements Poller.PollTask<ActivityTask> {
3940

4041
private final WorkflowServiceStubs service;
4142
private final String namespace;
@@ -44,6 +45,7 @@ final class ActivityPollTask implements Poller.PollTask<PollActivityTaskQueueRes
4445
private static final Logger log = LoggerFactory.getLogger(ActivityPollTask.class);
4546
private final double taskQueueActivitiesPerSecond;
4647
private final Scope metricsScope;
48+
private final Semaphore pollSemaphore;
4749

4850
public ActivityPollTask(
4951
WorkflowServiceStubs service,
@@ -58,10 +60,11 @@ public ActivityPollTask(
5860
this.options = options;
5961
this.metricsScope = options.getMetricsScope();
6062
this.taskQueueActivitiesPerSecond = taskQueueActivitiesPerSecond;
63+
this.pollSemaphore = new Semaphore(options.getTaskExecutorThreadPoolSize());
6164
}
6265

6366
@Override
64-
public PollActivityTaskQueueResponse poll() {
67+
public ActivityTask poll() {
6568
PollActivityTaskQueueRequest.Builder pollRequest =
6669
PollActivityTaskQueueRequest.newBuilder()
6770
.setNamespace(namespace)
@@ -86,31 +89,40 @@ public PollActivityTaskQueueResponse poll() {
8689
if (log.isTraceEnabled()) {
8790
log.trace("poll request begin: " + pollRequest);
8891
}
89-
PollActivityTaskQueueResponse result;
92+
PollActivityTaskQueueResponse response;
93+
boolean isSuccessful = false;
94+
95+
try {
96+
pollSemaphore.acquire();
97+
} catch (InterruptedException e) {
98+
return null;
99+
}
90100
try {
91-
result =
101+
response =
92102
service
93103
.blockingStub()
94104
.withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
95105
.pollActivityTaskQueue(pollRequest.build());
106+
107+
if (response == null || response.getTaskToken().isEmpty()) {
108+
metricsScope.counter(MetricsType.ACTIVITY_POLL_NO_TASK_COUNTER).inc(1);
109+
return null;
110+
}
111+
metricsScope
112+
.timer(MetricsType.ACTIVITY_SCHEDULE_TO_START_LATENCY)
113+
.record(
114+
ProtobufTimeUtils.toM3Duration(
115+
response.getStartedTime(), response.getCurrentAttemptScheduledTime()));
116+
isSuccessful = true;
96117
} catch (StatusRuntimeException e) {
97118
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE
98119
&& e.getMessage().startsWith("UNAVAILABLE: Channel shutdown")) {
99120
return null;
100121
}
101122
throw e;
123+
} finally {
124+
if (!isSuccessful) pollSemaphore.release();
102125
}
103-
104-
if (result == null || result.getTaskToken().isEmpty()) {
105-
metricsScope.counter(MetricsType.ACTIVITY_POLL_NO_TASK_COUNTER).inc(1);
106-
return null;
107-
}
108-
metricsScope
109-
.timer(MetricsType.ACTIVITY_SCHEDULE_TO_START_LATENCY)
110-
.record(
111-
ProtobufTimeUtils.toM3Duration(
112-
result.getStartedTime(), result.getCurrentAttemptScheduledTime()));
113-
114-
return result;
126+
return new ActivityTask(response, pollSemaphore::release);
115127
}
116128
}
temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityTask.java

Lines changed: 45 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.internal.worker;
21+
22+
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
23+
import io.temporal.workflow.Functions;
24+
25+
final class ActivityTask {
26+
private final PollActivityTaskQueueResponse response;
27+
private final Functions.Proc completionHandle;
28+
29+
public ActivityTask(PollActivityTaskQueueResponse response, Functions.Proc completionHandle) {
30+
this.response = response;
31+
this.completionHandle = completionHandle;
32+
}
33+
34+
public PollActivityTaskQueueResponse getResponse() {
35+
return response;
36+
}
37+
38+
/**
39+
* Completion handle function that must be called by the handler whenever activity processing is
40+
* completed.
41+
*/
42+
public Functions.Proc getCompletionHandle() {
43+
return completionHandle;
44+
}
45+
}

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java

Lines changed: 30 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -94,7 +94,7 @@ public ActivityWorker(
9494
public void start() {
9595
if (handler.isAnyTypeSupported()) {
9696
poller =
97-
new Poller<>(
97+
new Poller<ActivityTask>(
9898
options.getIdentity(),
9999
new ActivityPollTask(
100100
service, namespace, taskQueue, options, taskQueueActivitiesPerSecond),
@@ -151,8 +151,7 @@ public boolean isSuspended() {
151151
return poller.isSuspended();
152152
}
153153

154-
private class TaskHandlerImpl
155-
implements PollTaskExecutor.TaskHandler<PollActivityTaskQueueResponse> {
154+
private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<ActivityTask> {
156155

157156
final ActivityTaskHandler handler;
158157

@@ -161,44 +160,43 @@ private TaskHandlerImpl(ActivityTaskHandler handler) {
161160
}
162161

163162
@Override
164-
public void handle(PollActivityTaskQueueResponse task) throws Exception {
165-
163+
public void handle(ActivityTask task) throws Exception {
164+
PollActivityTaskQueueResponse r = task.getResponse();
166165
Scope metricsScope =
167166
options
168167
.getMetricsScope()
169168
.tagged(
170169
ImmutableMap.of(
171170
MetricsTag.ACTIVITY_TYPE,
172-
task.getActivityType().getName(),
171+
r.getActivityType().getName(),
173172
MetricsTag.WORKFLOW_TYPE,
174-
task.getWorkflowType().getName()));
175-
176-
metricsScope
177-
.timer(MetricsType.ACTIVITY_SCHEDULE_TO_START_LATENCY)
178-
.record(
179-
ProtobufTimeUtils.toM3Duration(
180-
task.getStartedTime(), task.getCurrentAttemptScheduledTime()));
173+
r.getWorkflowType().getName()));
174+
try {
175+
metricsScope
176+
.timer(MetricsType.ACTIVITY_SCHEDULE_TO_START_LATENCY)
177+
.record(
178+
ProtobufTimeUtils.toM3Duration(
179+
r.getStartedTime(), r.getCurrentAttemptScheduledTime()));
181180

182-
// The following tags are for logging.
183-
MDC.put(LoggerTag.ACTIVITY_ID, task.getActivityId());
184-
MDC.put(LoggerTag.ACTIVITY_TYPE, task.getActivityType().getName());
185-
MDC.put(LoggerTag.WORKFLOW_ID, task.getWorkflowExecution().getWorkflowId());
186-
MDC.put(LoggerTag.RUN_ID, task.getWorkflowExecution().getRunId());
181+
// The following tags are for logging.
182+
MDC.put(LoggerTag.ACTIVITY_ID, r.getActivityId());
183+
MDC.put(LoggerTag.ACTIVITY_TYPE, r.getActivityType().getName());
184+
MDC.put(LoggerTag.WORKFLOW_ID, r.getWorkflowExecution().getWorkflowId());
185+
MDC.put(LoggerTag.RUN_ID, r.getWorkflowExecution().getRunId());
187186

188-
propagateContext(task);
187+
propagateContext(r);
189188

190-
try {
191189
Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_EXEC_LATENCY).start();
192190
ActivityTaskHandler.Result response;
193191
try {
194-
response = handler.handle(task, metricsScope, false);
192+
response = handler.handle(r, metricsScope, false);
195193
} finally {
196194
sw.stop();
197195
}
198-
sendReply(task, response, metricsScope);
196+
sendReply(r, response, metricsScope);
199197

200198
Duration duration =
201-
ProtobufTimeUtils.toM3DurationSinceNow(task.getCurrentAttemptScheduledTime());
199+
ProtobufTimeUtils.toM3DurationSinceNow(r.getCurrentAttemptScheduledTime());
202200
metricsScope.timer(MetricsType.ACTIVITY_E2E_LATENCY).record(duration);
203201

204202
} catch (FailureWrapperException e) {
@@ -213,15 +211,16 @@ public void handle(PollActivityTaskQueueResponse task) throws Exception {
213211
canceledRequest.setDetails(info.getDetails());
214212
}
215213
sendReply(
216-
task,
217-
new Result(task.getActivityId(), null, null, canceledRequest.build(), null),
214+
r,
215+
new Result(r.getActivityId(), null, null, canceledRequest.build(), null),
218216
metricsScope);
219217
}
220218
} finally {
221219
MDC.remove(LoggerTag.ACTIVITY_ID);
222220
MDC.remove(LoggerTag.ACTIVITY_TYPE);
223221
MDC.remove(LoggerTag.WORKFLOW_ID);
224222
MDC.remove(LoggerTag.RUN_ID);
223+
task.getCompletionHandle().apply();
225224
}
226225
}
227226

@@ -243,17 +242,18 @@ void propagateContext(PollActivityTaskQueueResponse response) {
243242
}
244243

245244
@Override
246-
public Throwable wrapFailure(PollActivityTaskQueueResponse task, Throwable failure) {
247-
WorkflowExecution execution = task.getWorkflowExecution();
245+
public Throwable wrapFailure(ActivityTask t, Throwable failure) {
246+
PollActivityTaskQueueResponse response = t.getResponse();
247+
WorkflowExecution execution = response.getWorkflowExecution();
248248
return new RuntimeException(
249-
"Failure processing activity task. WorkflowId="
249+
"Failure processing activity response. WorkflowId="
250250
+ execution.getWorkflowId()
251251
+ ", RunId="
252252
+ execution.getRunId()
253253
+ ", ActivityType="
254-
+ task.getActivityType().getName()
254+
+ response.getActivityType().getName()
255255
+ ", ActivityId="
256-
+ task.getActivityId(),
256+
+ response.getActivityId(),
257257
failure);
258258
}
259259

temporal-sdk/src/main/java/io/temporal/internal/worker/Poller.java

Lines changed: 4 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,6 @@
2929
import java.util.concurrent.ArrayBlockingQueue;
3030
import java.util.concurrent.CountDownLatch;
3131
import java.util.concurrent.RejectedExecutionException;
32-
import java.util.concurrent.Semaphore;
3332
import java.util.concurrent.ThreadPoolExecutor;
3433
import java.util.concurrent.TimeUnit;
3534
import java.util.concurrent.atomic.AtomicReference;
@@ -260,24 +259,14 @@ public void run() {
260259
}
261260

262261
private class PollExecutionTask implements Poller.ThrowingRunnable {
263-
private final Semaphore pollSemaphore;
264-
265-
PollExecutionTask() {
266-
this.pollSemaphore = new Semaphore(pollerOptions.getPollThreadCount());
267-
}
268262

269263
@Override
270264
public void run() throws Exception {
271-
try {
272-
pollSemaphore.acquire();
273-
T task = pollTask.poll();
274-
if (task == null) {
275-
return;
276-
}
277-
taskExecutor.process(task);
278-
} finally {
279-
pollSemaphore.release();
265+
T task = pollTask.poll();
266+
if (task == null) {
267+
return;
280268
}
269+
taskExecutor.process(task);
281270
}
282271
}
283272
}

0 commit comments

Comments
 (0)