1515 */
1616package io .serverlessworkflow .impl .container .executors ;
1717
18- import static io .serverlessworkflow .api .types .ContainerLifetime .ContainerCleanupPolicy . *;
18+ import static io .serverlessworkflow .api .types .ContainerLifetime .*;
1919
2020import com .github .dockerjava .api .DockerClient ;
2121import com .github .dockerjava .api .command .CreateContainerCmd ;
2222import com .github .dockerjava .api .command .CreateContainerResponse ;
23+ import com .github .dockerjava .api .command .PullImageResultCallback ;
2324import com .github .dockerjava .api .command .WaitContainerResultCallback ;
24- import com .github .dockerjava .api .exception .DockerClientException ;
2525import com .github .dockerjava .core .DefaultDockerClientConfig ;
2626import com .github .dockerjava .core .DockerClientImpl ;
27+ import com .github .dockerjava .core .NameParser ;
2728import com .github .dockerjava .httpclient5 .ApacheDockerHttpClient ;
2829import io .serverlessworkflow .api .types .Container ;
2930import io .serverlessworkflow .impl .TaskContext ;
3233import io .serverlessworkflow .impl .WorkflowModel ;
3334import io .serverlessworkflow .impl .WorkflowUtils ;
3435import io .serverlessworkflow .impl .WorkflowValueResolver ;
35- import java .io .IOException ;
3636import java .time .Duration ;
3737import java .util .ArrayList ;
3838import java .util .List ;
@@ -71,48 +71,122 @@ private ContainerRunner(
7171 CompletableFuture <WorkflowModel > startSync (
7272 WorkflowContext workflowContext , TaskContext taskContext , WorkflowModel input ) {
7373
74- StringExpressionResolver resolver =
75- new StringExpressionResolver (workflowContext , taskContext , input );
74+ try {
75+ var resolver = new StringExpressionResolver (workflowContext , taskContext , input );
76+ applyPropertySetters (resolver );
77+ pullImageIfNeeded (container .getImage ());
7678
77- propertySetters .forEach (setter -> setter .accept (resolver ));
79+ String id = createAndStartContainer ();
80+ int exit = waitAccordingToLifetime (id , workflowContext , taskContext , input );
7881
79- CreateContainerResponse createContainerResponse = createContainerCmd .exec ();
80- String containerId = createContainerResponse .getId ();
82+ return mapExitCode (exit , input );
83+ } catch (InterruptedException ie ) {
84+ Thread .currentThread ().interrupt ();
85+ return failed ("Interrupted while waiting for container" );
86+ } catch (Exception e ) {
87+ return failed ("Container run failed: " + e .getMessage ());
88+ }
89+ }
90+
91+ private void applyPropertySetters (StringExpressionResolver resolver ) {
92+ for (var setter : propertySetters ) setter .accept (resolver );
93+ }
8194
82- if (containerId == null || containerId .isEmpty ()) {
83- return failed ("Container creation failed: empty container ID" );
95+ private void pullImageIfNeeded (String imageRef ) throws InterruptedException {
96+ NameParser .ReposTag rt = NameParser .parseRepositoryTag (imageRef );
97+ NameParser .HostnameReposName hr = NameParser .resolveRepositoryName (imageRef );
98+
99+ String repository = hr .reposName ;
100+ String tag = rt .tag != null && rt .tag .isEmpty () ? rt .tag : "latest" ;
101+ dockerClient
102+ .pullImageCmd (repository )
103+ .withTag (tag )
104+ .exec (new PullImageResultCallback ())
105+ .awaitCompletion ();
106+ }
107+
108+ private String createAndStartContainer () {
109+ CreateContainerResponse resp = createContainerCmd .exec ();
110+ String id = resp .getId ();
111+ if (id == null || id .isEmpty ()) {
112+ throw new IllegalStateException ("Container creation failed: empty ID" );
84113 }
114+ dockerClient .startContainerCmd (id ).exec ();
115+ return id ;
116+ }
85117
86- dockerClient .startContainerCmd (containerId ).exec ();
87-
88- int exitCode ;
89- try (WaitContainerResultCallback resultCallback =
90- dockerClient .waitContainerCmd (containerId ).exec (new WaitContainerResultCallback ())) {
91- if (container .getLifetime () != null
92- && container .getLifetime ().getCleanup () != null
93- && container .getLifetime ().getCleanup ().equals (EVENTUALLY )) {
94- try {
95- WorkflowValueResolver <Duration > durationResolver =
96- WorkflowUtils .fromTimeoutAfter (
97- definition .application (), container .getLifetime ().getAfter ());
98- Duration timeout = durationResolver .apply (workflowContext , taskContext , input );
99- exitCode = resultCallback .awaitStatusCode (timeout .toMillis (), TimeUnit .MILLISECONDS );
100- } catch (DockerClientException e ) {
101- return failed (
102- String .format ("Error while waiting for container to finish: %s " , e .getMessage ()));
103- } finally {
104- dockerClient .removeContainerCmd (containerId ).withForce (true ).exec ();
118+ private int waitAccordingToLifetime (
119+ String id , WorkflowContext workflowContext , TaskContext taskContext , WorkflowModel input )
120+ throws Exception {
121+
122+ var lifetime = container .getLifetime ();
123+ var policy = lifetime != null ? lifetime .getCleanup () : null ;
124+
125+ try (var cb = dockerClient .waitContainerCmd (id ).exec (new WaitContainerResultCallback ())) {
126+
127+ if (policy == ContainerCleanupPolicy .EVENTUALLY ) {
128+ Duration timeout = resolveAfter (lifetime , workflowContext , taskContext , input );
129+ int exit = cb .awaitStatusCode (timeout .toMillis (), TimeUnit .MILLISECONDS );
130+
131+ if (isRunning (id )) {
132+ safeStop (id , Duration .ofSeconds (10 ));
105133 }
134+ safeRemove (id );
135+ return exit ;
136+
106137 } else {
107- exitCode = resultCallback .awaitStatusCode ();
138+ int exit = cb .awaitStatusCode ();
139+ if (policy == ContainerCleanupPolicy .ALWAYS ) {
140+ safeRemove (id );
141+ }
142+ return exit ;
108143 }
109- } catch (IOException e ) {
110- return failed (
111- String .format ("Error while waiting for container to finish: %s " , e .getMessage ()));
112144 }
145+ }
146+
147+ private Duration resolveAfter (
148+ io .serverlessworkflow .api .types .ContainerLifetime lifetime ,
149+ WorkflowContext workflowContext ,
150+ TaskContext taskContext ,
151+ WorkflowModel input ) {
152+
153+ if (lifetime == null || lifetime .getAfter () == null ) {
154+ return Duration .ZERO ;
155+ }
156+ WorkflowValueResolver <Duration > r =
157+ WorkflowUtils .fromTimeoutAfter (definition .application (), lifetime .getAfter ());
158+ return r .apply (workflowContext , taskContext , input );
159+ }
160+
161+ private boolean isRunning (String id ) {
162+ try {
163+ var st = dockerClient .inspectContainerCmd (id ).exec ().getState ();
164+ return st != null && Boolean .TRUE .equals (st .getRunning ());
165+ } catch (Exception e ) {
166+ return false ; // must be already removed
167+ }
168+ }
169+
170+ private void safeStop (String id , Duration timeout ) {
171+ try {
172+ dockerClient .stopContainerCmd (id ).withTimeout ((int ) Math .max (1 , timeout .toSeconds ())).exec ();
173+ } catch (Exception ignore ) {
174+ // we can ignore this
175+ }
176+ }
177+
178+ // must be removed because of withAutoRemove(true), but just in case
179+ private void safeRemove (String id ) {
180+ try {
181+ dockerClient .removeContainerCmd (id ).withForce (true ).exec ();
182+ } catch (Exception ignore ) {
183+ // we can ignore this
184+ }
185+ }
113186
114- return switch (exitCode ) {
115- case 0 -> CompletableFuture .completedFuture (input );
187+ private static <T > CompletableFuture <T > mapExitCode (int exit , T ok ) {
188+ return switch (exit ) {
189+ case 0 -> CompletableFuture .completedFuture (ok );
116190 case 1 -> failed ("General error (exit code 1)" );
117191 case 2 -> failed ("Shell syntax error (exit code 2)" );
118192 case 126 -> failed ("Command found but not executable (exit code 126)" );
@@ -121,7 +195,7 @@ CompletableFuture<WorkflowModel> startSync(
121195 case 137 -> failed ("Killed by SIGKILL (exit code 137)" );
122196 case 139 -> failed ("Segmentation fault (exit code 139)" );
123197 case 143 -> failed ("Terminated by SIGTERM (exit code 143)" );
124- default -> failed ("Process exited with code " + exitCode );
198+ default -> failed ("Process exited with code " + exit );
125199 };
126200 }
127201
0 commit comments