2424import io .serverlessworkflow .api .types .Input ;
2525import io .serverlessworkflow .api .types .Output ;
2626import io .serverlessworkflow .api .types .TaskBase ;
27+ import io .serverlessworkflow .api .types .TaskTimeout ;
28+ import io .serverlessworkflow .api .types .Timeout ;
2729import io .serverlessworkflow .api .types .Workflow ;
2830import io .serverlessworkflow .impl .TaskContext ;
2931import io .serverlessworkflow .impl .WorkflowApplication ;
3032import io .serverlessworkflow .impl .WorkflowContext ;
3133import io .serverlessworkflow .impl .WorkflowDefinition ;
34+ import io .serverlessworkflow .impl .WorkflowError ;
35+ import io .serverlessworkflow .impl .WorkflowException ;
3236import io .serverlessworkflow .impl .WorkflowFilter ;
3337import io .serverlessworkflow .impl .WorkflowModel ;
3438import io .serverlessworkflow .impl .WorkflowMutablePosition ;
3539import io .serverlessworkflow .impl .WorkflowPosition ;
3640import io .serverlessworkflow .impl .WorkflowPredicate ;
3741import io .serverlessworkflow .impl .WorkflowStatus ;
42+ import io .serverlessworkflow .impl .WorkflowUtils ;
43+ import io .serverlessworkflow .impl .WorkflowValueResolver ;
3844import io .serverlessworkflow .impl .lifecycle .TaskCancelledEvent ;
3945import io .serverlessworkflow .impl .lifecycle .TaskCompletedEvent ;
4046import io .serverlessworkflow .impl .lifecycle .TaskFailedEvent ;
4147import io .serverlessworkflow .impl .lifecycle .TaskRetriedEvent ;
4248import io .serverlessworkflow .impl .lifecycle .TaskStartedEvent ;
4349import io .serverlessworkflow .impl .resources .ResourceLoader ;
4450import io .serverlessworkflow .impl .schema .SchemaValidator ;
51+ import java .time .Duration ;
4552import java .time .Instant ;
4653import java .util .Iterator ;
4754import java .util .Map ;
55+ import java .util .Objects ;
4856import java .util .Optional ;
4957import java .util .concurrent .CancellationException ;
5058import java .util .concurrent .CompletableFuture ;
5159import java .util .concurrent .CompletionException ;
60+ import java .util .concurrent .TimeUnit ;
5261
5362public abstract class AbstractTaskExecutor <T extends TaskBase > implements TaskExecutor <T > {
5463
@@ -62,6 +71,7 @@ public abstract class AbstractTaskExecutor<T extends TaskBase> implements TaskEx
6271 private final Optional <SchemaValidator > outputSchemaValidator ;
6372 private final Optional <SchemaValidator > contextSchemaValidator ;
6473 private final Optional <WorkflowPredicate > ifFilter ;
74+ private final Optional <WorkflowValueResolver <Duration >> timeout ;
6575
6676 public abstract static class AbstractTaskExecutorBuilder <
6777 T extends TaskBase , V extends AbstractTaskExecutor <T >>
@@ -80,6 +90,7 @@ public abstract static class AbstractTaskExecutorBuilder<
8090 protected final Workflow workflow ;
8191 protected final ResourceLoader resourceLoader ;
8292 private final WorkflowDefinition definition ;
93+ private Optional <WorkflowValueResolver <Duration >> timeout ;
8394
8495 private V instance ;
8596
@@ -113,6 +124,28 @@ protected AbstractTaskExecutorBuilder(
113124 getSchemaValidator (application .validatorFactory (), resourceLoader , export .getSchema ());
114125 }
115126 this .ifFilter = application .expressionFactory ().buildIfFilter (task );
127+ this .timeout = getTaskTimeout ();
128+ }
129+
130+ private Optional <WorkflowValueResolver <Duration >> getTaskTimeout () {
131+ TaskTimeout timeout = task .getTimeout ();
132+ if (timeout != null ) {
133+ Timeout timeoutDef = timeout .getTaskTimeoutDefinition ();
134+ if (timeoutDef == null && timeout .getTaskTimeoutReference () != null ) {
135+ timeoutDef =
136+ Objects .requireNonNull (
137+ Objects .requireNonNull (
138+ workflow .getUse ().getTimeouts (),
139+ "Timeout reference "
140+ + timeout .getTaskTimeoutReference ()
141+ + " specified, but use timeout was not defined" )
142+ .getAdditionalProperties ()
143+ .get (timeout .getTaskTimeoutReference ()),
144+ "Timeout reference " + timeout .getTaskTimeoutReference () + "cannot be found" );
145+ }
146+ return Optional .of (WorkflowUtils .fromTimeoutAfter (application , timeoutDef .getAfter ()));
147+ }
148+ return Optional .empty ();
116149 }
117150
118151 protected final TransitionInfoBuilder next (
@@ -171,6 +204,7 @@ protected AbstractTaskExecutor(AbstractTaskExecutorBuilder<T, ?> builder) {
171204 this .outputSchemaValidator = builder .outputSchemaValidator ;
172205 this .contextSchemaValidator = builder .contextSchemaValidator ;
173206 this .ifFilter = builder .ifFilter ;
207+ this .timeout = builder .timeout ;
174208 }
175209
176210 protected final CompletableFuture <TaskContext > executeNext (
@@ -200,7 +234,7 @@ public CompletableFuture<TaskContext> apply(
200234 } else if (taskContext .isCompleted ()) {
201235 return executeNext (completable , workflowContext );
202236 } else if (ifFilter .map (f -> f .test (workflowContext , taskContext , input )).orElse (true )) {
203- return executeNext (
237+ completable =
204238 completable
205239 .thenCompose (workflowContext .instance ()::suspendedCheck )
206240 .thenApply (
@@ -247,8 +281,22 @@ public CompletableFuture<TaskContext> apply(
247281 l .onTaskCompleted (
248282 new TaskCompletedEvent (workflowContext , taskContext )));
249283 return t ;
250- }),
251- workflowContext );
284+ });
285+ if (timeout .isPresent ()) {
286+ completable =
287+ completable
288+ .orTimeout (
289+ timeout
290+ .map (t -> t .apply (workflowContext , taskContext , input ))
291+ .orElseThrow ()
292+ .toMillis (),
293+ TimeUnit .MILLISECONDS )
294+ .exceptionallyCompose (
295+ e ->
296+ CompletableFuture .failedFuture (
297+ new WorkflowException (WorkflowError .timeout ().build (), e )));
298+ }
299+ return executeNext (completable , workflowContext );
252300 } else {
253301 taskContext .transition (getSkipTransition ());
254302 return executeNext (completable , workflowContext );
0 commit comments