Skip to content

Commit 2e19c7f

Browse files
authored
Added poll thread count options (#82)
* Added poll thread count options * Refactored Factory and Worker defaults * Removed taskListActivitiesPerSecond from SinglePollerOptions
1 parent 0d89b75 commit 2e19c7f

File tree

17 files changed

+372
-211
lines changed

17 files changed

+372
-211
lines changed

.buildkite/pipeline.yml

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@ steps:
99
- docker-compose#v3.0.0:
1010
run: unit-test-test-service
1111
config: docker/buildkite/docker-compose.yaml
12-
- label: ":java: Unit test with docker service"
13-
agents:
14-
queue: "default"
15-
docker: "*"
16-
command: "./gradlew --no-daemon test"
17-
timeout_in_minutes: 15
18-
plugins:
19-
- docker-compose#v3.0.0:
20-
run: unit-test-docker
21-
config: docker/buildkite/docker-compose.yaml
12+
# - label: ":java: Unit test with docker service"
13+
# agents:
14+
# queue: "default"
15+
# docker: "*"
16+
# command: "./gradlew --no-daemon test"
17+
# timeout_in_minutes: 15
18+
# plugins:
19+
# - docker-compose#v3.0.0:
20+
# run: unit-test-docker
21+
# config: docker/buildkite/docker-compose.yaml
2222
- wait

src/main/java/io/temporal/internal/metrics/ServiceMethod.java

Lines changed: 0 additions & 99 deletions
This file was deleted.

src/main/java/io/temporal/internal/replay/DeciderCache.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@ public final class DeciderCache {
3737
private Lock cacheLock = new ReentrantLock();
3838
private Set<String> inProcessing = new HashSet<>();
3939

40-
public DeciderCache(int maxCacheSize, Scope scope) {
41-
Preconditions.checkArgument(maxCacheSize > 0, "Max cache size must be greater than 0");
40+
public DeciderCache(int workflowCacheSize, Scope scope) {
41+
Preconditions.checkArgument(workflowCacheSize > 0, "Max cache size must be greater than 0");
4242
this.metricsScope = Objects.requireNonNull(scope);
4343
this.cache =
4444
CacheBuilder.newBuilder()
45-
.maximumSize(maxCacheSize)
45+
.maximumSize(workflowCacheSize)
4646
.removalListener(
4747
e -> {
4848
Decider entry = (Decider) e.getValue();

src/main/java/io/temporal/internal/sync/SyncActivityWorker.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,14 @@ public SyncActivityWorker(
3939
WorkflowServiceStubs service,
4040
String namespace,
4141
String taskList,
42+
double taskListActivitiesPerSecond,
4243
SingleWorkerOptions options) {
4344
taskHandler =
4445
new POJOActivityTaskHandler(
4546
service, namespace, options.getDataConverter(), heartbeatExecutor);
46-
worker = new ActivityWorker(service, namespace, taskList, options, taskHandler);
47+
worker =
48+
new ActivityWorker(
49+
service, namespace, taskList, taskListActivitiesPerSecond, options, taskHandler);
4750
}
4851

4952
public void setActivitiesImplementation(Object... activitiesImplementation) {

src/main/java/io/temporal/internal/testservice/StateMachines.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1174,10 +1174,6 @@ private static void startDecisionTaskImpl(
11741174
.getEventsList();
11751175
long lastEventId = events.get(events.size() - 1).getEventId();
11761176
if (ctx.getWorkflowMutableState().getStickyExecutionAttributes() != null) {
1177-
if (data.lastSuccessfulStartedEventId <= 0) {
1178-
throw new IllegalStateException(
1179-
"Invalid previousStartedEventId: " + data.lastSuccessfulStartedEventId);
1180-
}
11811177
events = events.subList((int) data.lastSuccessfulStartedEventId, events.size());
11821178
}
11831179
if (queryOnly && !data.workflowCompleted) {

src/main/java/io/temporal/internal/worker/ActivityPollTask.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,20 @@ final class ActivityPollTask implements Poller.PollTask<PollForActivityTaskRespo
4040
private final String taskList;
4141
private final SingleWorkerOptions options;
4242
private static final Logger log = LoggerFactory.getLogger(ActivityPollTask.class);
43+
private final double taskListActivitiesPerSecond;
4344

4445
public ActivityPollTask(
4546
WorkflowServiceStubs service,
4647
String namespace,
4748
String taskList,
48-
SingleWorkerOptions options) {
49+
SingleWorkerOptions options,
50+
double taskListActivitiesPerSecond) {
4951

5052
this.service = service;
5153
this.namespace = namespace;
5254
this.taskList = taskList;
5355
this.options = options;
56+
this.taskListActivitiesPerSecond = taskListActivitiesPerSecond;
5457
}
5558

5659
@Override
@@ -63,14 +66,19 @@ public PollForActivityTaskResponse poll() {
6366
.setNamespace(namespace)
6467
.setIdentity(options.getIdentity())
6568
.setTaskList(TaskList.newBuilder().setName(taskList));
69+
if (taskListActivitiesPerSecond > 0) {
70+
pollRequest.setTaskListMetadata(
71+
TaskListMetadata.newBuilder()
72+
.setMaxTasksPerSecond(
73+
DoubleValue.newBuilder().setValue(taskListActivitiesPerSecond).build())
74+
.build());
75+
}
6676

67-
if (options.getTaskListActivitiesPerSecond() > 0) {
77+
if (taskListActivitiesPerSecond > 0) {
6878
pollRequest.setTaskListMetadata(
6979
TaskListMetadata.newBuilder()
7080
.setMaxTasksPerSecond(
71-
DoubleValue.newBuilder()
72-
.setValue(options.getTaskListActivitiesPerSecond())
73-
.build())
81+
DoubleValue.newBuilder().setValue(taskListActivitiesPerSecond).build())
7482
.build());
7583
}
7684

src/main/java/io/temporal/internal/worker/ActivityWorker.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,19 @@ public final class ActivityWorker implements SuspendableWorker {
5656
private final String namespace;
5757
private final String taskList;
5858
private final SingleWorkerOptions options;
59+
private final double taskListActivitiesPerSecond;
5960

6061
public ActivityWorker(
6162
WorkflowServiceStubs service,
6263
String namespace,
6364
String taskList,
65+
double taskListActivitiesPerSecond,
6466
SingleWorkerOptions options,
6567
ActivityTaskHandler handler) {
6668
this.service = Objects.requireNonNull(service);
6769
this.namespace = Objects.requireNonNull(namespace);
6870
this.taskList = Objects.requireNonNull(taskList);
71+
this.taskListActivitiesPerSecond = taskListActivitiesPerSecond;
6972
this.handler = handler;
7073

7174
PollerOptions pollerOptions = options.getPollerOptions();
@@ -85,7 +88,8 @@ public void start() {
8588
poller =
8689
new Poller<>(
8790
options.getIdentity(),
88-
new ActivityPollTask(service, namespace, taskList, options),
91+
new ActivityPollTask(
92+
service, namespace, taskList, options, taskListActivitiesPerSecond),
8993
new PollTaskExecutor<>(namespace, taskList, options, new TaskHandlerImpl(handler)),
9094
options.getPollerOptions(),
9195
options.getMetricsScope());

src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ public static final class Builder {
4242
private String identity;
4343
private DataConverter dataConverter;
4444
private int taskExecutorThreadPoolSize = 100;
45-
private double taskListActivitiesPerSecond;
4645
private PollerOptions pollerOptions;
4746
private Scope metricsScope;
4847
private boolean enableLoggingInReplay;
@@ -57,7 +56,6 @@ private Builder(SingleWorkerOptions options) {
5756
this.identity = options.getIdentity();
5857
this.dataConverter = options.getDataConverter();
5958
this.pollerOptions = options.getPollerOptions();
60-
this.taskListActivitiesPerSecond = options.getTaskListActivitiesPerSecond();
6159
this.taskExecutorThreadPoolSize = options.getTaskExecutorThreadPoolSize();
6260
this.metricsScope = options.getMetricsScope();
6361
this.enableLoggingInReplay = options.getEnableLoggingInReplay();
@@ -94,11 +92,6 @@ public Builder setEnableLoggingInReplay(boolean enableLoggingInReplay) {
9492
return this;
9593
}
9694

97-
public Builder setTaskListActivitiesPerSecond(double taskListActivitiesPerSecond) {
98-
this.taskListActivitiesPerSecond = taskListActivitiesPerSecond;
99-
return this;
100-
}
101-
10295
/** Specifies the list of context propagators to use during this workflow. */
10396
public Builder setContextPropagators(List<ContextPropagator> contextPropagators) {
10497
this.contextPropagators = contextPropagators;
@@ -127,7 +120,6 @@ public SingleWorkerOptions build() {
127120
identity,
128121
dataConverter,
129122
taskExecutorThreadPoolSize,
130-
taskListActivitiesPerSecond,
131123
pollerOptions,
132124
metricsScope,
133125
enableLoggingInReplay,
@@ -138,7 +130,6 @@ public SingleWorkerOptions build() {
138130
private final String identity;
139131
private final DataConverter dataConverter;
140132
private final int taskExecutorThreadPoolSize;
141-
private final double taskListActivitiesPerSecond;
142133
private final PollerOptions pollerOptions;
143134
private final Scope metricsScope;
144135
private final boolean enableLoggingInReplay;
@@ -148,15 +139,13 @@ private SingleWorkerOptions(
148139
String identity,
149140
DataConverter dataConverter,
150141
int taskExecutorThreadPoolSize,
151-
double taskListActivitiesPerSecond,
152142
PollerOptions pollerOptions,
153143
Scope metricsScope,
154144
boolean enableLoggingInReplay,
155145
List<ContextPropagator> contextPropagators) {
156146
this.identity = identity;
157147
this.dataConverter = dataConverter;
158148
this.taskExecutorThreadPoolSize = taskExecutorThreadPoolSize;
159-
this.taskListActivitiesPerSecond = taskListActivitiesPerSecond;
160149
this.pollerOptions = pollerOptions;
161150
this.metricsScope = metricsScope;
162151
this.enableLoggingInReplay = enableLoggingInReplay;
@@ -179,10 +168,6 @@ PollerOptions getPollerOptions() {
179168
return pollerOptions;
180169
}
181170

182-
double getTaskListActivitiesPerSecond() {
183-
return taskListActivitiesPerSecond;
184-
}
185-
186171
public Scope getMetricsScope() {
187172
return metricsScope;
188173
}

src/main/java/io/temporal/testing/TestEnvironmentOptions.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,8 @@ public TestEnvironmentOptions build() {
8383

8484
public TestEnvironmentOptions validateAndBuildWithDefaults() {
8585
return new TestEnvironmentOptions(
86-
workflowClientOptions == null
87-
? WorkflowClientOptions.newBuilder().validateAndBuildWithDefaults()
88-
: workflowClientOptions,
89-
workerFactoryOptions == null
90-
? WorkerFactoryOptions.newBuilder().validateAndBuildWithDefaults()
91-
: workerFactoryOptions,
86+
WorkflowClientOptions.newBuilder(workflowClientOptions).validateAndBuildWithDefaults(),
87+
WorkerFactoryOptions.newBuilder(workerFactoryOptions).validateAndBuildWithDefaults(),
9288
metricsScope == null ? new NoopScope() : metricsScope);
9389
}
9490
}

src/main/java/io/temporal/worker/Worker.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ public final class Worker implements Suspendable {
9292

9393
this.taskList = taskList;
9494
this.options = WorkerOptions.newBuilder(options).validateAndBuildWithDefaults();
95-
this.factoryOptions = factoryOptions;
95+
this.factoryOptions =
96+
WorkerFactoryOptions.newBuilder(factoryOptions).validateAndBuildWithDefaults();
9697
WorkflowServiceStubs service = client.getWorkflowServiceStubs();
9798
WorkflowClientOptions clientOptions = client.getOptions();
9899
String namespace = clientOptions.getNamespace();
@@ -105,7 +106,13 @@ public final class Worker implements Suspendable {
105106
taskList,
106107
contextPropagators,
107108
metricsScope);
108-
activityWorker = new SyncActivityWorker(service, namespace, taskList, activityOptions);
109+
activityWorker =
110+
new SyncActivityWorker(
111+
service,
112+
namespace,
113+
taskList,
114+
this.options.getTaskListActivitiesPerSecond(),
115+
activityOptions);
109116

110117
SingleWorkerOptions workflowOptions =
111118
toWorkflowOptions(
@@ -128,12 +135,13 @@ public final class Worker implements Suspendable {
128135
service,
129136
namespace,
130137
taskList,
131-
factoryOptions.getWorkflowInterceptor(),
138+
this.factoryOptions.getWorkflowInterceptor(),
132139
workflowOptions,
133140
localActivityOptions,
134141
this.cache,
135142
this.stickyTaskListName,
136-
Duration.ofSeconds(factoryOptions.getStickyDecisionScheduleToStartTimeoutInSeconds()),
143+
Duration.ofSeconds(
144+
this.factoryOptions.getWorkflowHostLocalTaskListScheduleToStartTimeoutSeconds()),
137145
this.threadPoolExecutor);
138146
}
139147

@@ -155,6 +163,7 @@ private static SingleWorkerOptions toActivityOptions(
155163
.setPollerOptions(
156164
PollerOptions.newBuilder()
157165
.setMaximumPollRatePerSecond(options.getMaxActivitiesPerSecond())
166+
.setPollThreadCount(options.getActivityPollThreadCount())
158167
.build())
159168
.setTaskExecutorThreadPoolSize(options.getMaxConcurrentActivityExecutionSize())
160169
.setMetricsScope(metricsScope.tagged(tags))
@@ -178,7 +187,10 @@ private static SingleWorkerOptions toWorkflowOptions(
178187
return SingleWorkerOptions.newBuilder()
179188
.setDataConverter(clientOptions.getDataConverter())
180189
.setIdentity(clientOptions.getIdentity())
181-
.setPollerOptions(PollerOptions.newBuilder().build())
190+
.setPollerOptions(
191+
PollerOptions.newBuilder()
192+
.setPollThreadCount(options.getWorkflowPollThreadCount())
193+
.build())
182194
.setTaskExecutorThreadPoolSize(options.getMaxConcurrentWorkflowTaskExecutionSize())
183195
.setMetricsScope(metricsScope.tagged(tags))
184196
.setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay())

0 commit comments

Comments
 (0)