Skip to content

Commit 4384a68

Browse files
committed
image pull before run
Signed-off-by: Dmitrii Tikhomirov <[email protected]>
1 parent cdc2557 commit 4384a68

File tree

1 file changed

+112
-36
lines changed

1 file changed

+112
-36
lines changed

impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java

Lines changed: 112 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,16 @@
1515
*/
1616
package io.serverlessworkflow.impl.container.executors;
1717

18-
import static io.serverlessworkflow.api.types.ContainerLifetime.ContainerCleanupPolicy.*;
18+
import static io.serverlessworkflow.api.types.ContainerLifetime.*;
1919

2020
import com.github.dockerjava.api.DockerClient;
2121
import com.github.dockerjava.api.command.CreateContainerCmd;
2222
import com.github.dockerjava.api.command.CreateContainerResponse;
23+
import com.github.dockerjava.api.command.PullImageResultCallback;
2324
import com.github.dockerjava.api.command.WaitContainerResultCallback;
24-
import com.github.dockerjava.api.exception.DockerClientException;
2525
import com.github.dockerjava.core.DefaultDockerClientConfig;
2626
import com.github.dockerjava.core.DockerClientImpl;
27+
import com.github.dockerjava.core.NameParser;
2728
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
2829
import io.serverlessworkflow.api.types.Container;
2930
import io.serverlessworkflow.impl.TaskContext;
@@ -32,7 +33,6 @@
3233
import io.serverlessworkflow.impl.WorkflowModel;
3334
import io.serverlessworkflow.impl.WorkflowUtils;
3435
import io.serverlessworkflow.impl.WorkflowValueResolver;
35-
import java.io.IOException;
3636
import java.time.Duration;
3737
import java.util.ArrayList;
3838
import java.util.List;
@@ -71,48 +71,124 @@ private ContainerRunner(
7171
CompletableFuture<WorkflowModel> startSync(
7272
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
7373

74-
StringExpressionResolver resolver =
75-
new StringExpressionResolver(workflowContext, taskContext, input);
74+
try {
75+
var resolver = new StringExpressionResolver(workflowContext, taskContext, input);
76+
applyPropertySetters(resolver);
77+
pullImageIfNeeded(container.getName());
7678

77-
propertySetters.forEach(setter -> setter.accept(resolver));
79+
String id = createAndStartContainer();
80+
int exit = waitAccordingToLifetime(id, workflowContext, taskContext, input);
7881

79-
CreateContainerResponse createContainerResponse = createContainerCmd.exec();
80-
String containerId = createContainerResponse.getId();
82+
return mapExitCode(exit, input);
8183

82-
if (containerId == null || containerId.isEmpty()) {
83-
return failed("Container creation failed: empty container ID");
84+
} catch (InterruptedException ie) {
85+
Thread.currentThread().interrupt();
86+
return failed("Interrupted while waiting for container");
87+
} catch (Exception e) {
88+
return failed("Container run failed: " + e.getMessage());
8489
}
90+
}
91+
92+
private void applyPropertySetters(StringExpressionResolver resolver) {
93+
for (var setter : propertySetters) setter.accept(resolver);
94+
}
95+
96+
private void pullImageIfNeeded(String imageRef) throws InterruptedException {
97+
NameParser.ReposTag rt = NameParser.parseRepositoryTag(imageRef);
98+
NameParser.HostnameReposName hr = NameParser.resolveRepositoryName(imageRef);
8599

86-
dockerClient.startContainerCmd(containerId).exec();
87-
88-
int exitCode;
89-
try (WaitContainerResultCallback resultCallback =
90-
dockerClient.waitContainerCmd(containerId).exec(new WaitContainerResultCallback())) {
91-
if (container.getLifetime() != null
92-
&& container.getLifetime().getCleanup() != null
93-
&& container.getLifetime().getCleanup().equals(EVENTUALLY)) {
94-
try {
95-
WorkflowValueResolver<Duration> durationResolver =
96-
WorkflowUtils.fromTimeoutAfter(
97-
definition.application(), container.getLifetime().getAfter());
98-
Duration timeout = durationResolver.apply(workflowContext, taskContext, input);
99-
exitCode = resultCallback.awaitStatusCode(timeout.toMillis(), TimeUnit.MILLISECONDS);
100-
} catch (DockerClientException e) {
101-
return failed(
102-
String.format("Error while waiting for container to finish: %s ", e.getMessage()));
103-
} finally {
104-
dockerClient.removeContainerCmd(containerId).withForce(true).exec();
100+
String repository = hr.reposName;
101+
String tag = rt.tag != null ? rt.tag : "latest";
102+
103+
dockerClient
104+
.pullImageCmd(repository)
105+
.withTag(tag)
106+
.exec(new PullImageResultCallback())
107+
.awaitCompletion();
108+
}
109+
110+
private String createAndStartContainer() {
111+
CreateContainerResponse resp = createContainerCmd.exec();
112+
String id = resp.getId();
113+
if (id == null || id.isEmpty()) {
114+
throw new IllegalStateException("Container creation failed: empty ID");
115+
}
116+
dockerClient.startContainerCmd(id).exec();
117+
return id;
118+
}
119+
120+
private int waitAccordingToLifetime(
121+
String id, WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input)
122+
throws Exception {
123+
124+
var lifetime = container.getLifetime();
125+
var policy = lifetime != null ? lifetime.getCleanup() : null;
126+
127+
try (var cb = dockerClient.waitContainerCmd(id).exec(new WaitContainerResultCallback())) {
128+
129+
if (policy == ContainerCleanupPolicy.EVENTUALLY) {
130+
Duration timeout = resolveAfter(lifetime, workflowContext, taskContext, input);
131+
int exit = cb.awaitStatusCode(timeout.toMillis(), TimeUnit.MILLISECONDS);
132+
133+
if (isRunning(id)) {
134+
safeStop(id, Duration.ofSeconds(10));
105135
}
136+
safeRemove(id);
137+
return exit;
138+
106139
} else {
107-
exitCode = resultCallback.awaitStatusCode();
140+
int exit = cb.awaitStatusCode();
141+
if (policy == ContainerCleanupPolicy.ALWAYS) {
142+
safeRemove(id);
143+
}
144+
return exit;
108145
}
109-
} catch (IOException e) {
110-
return failed(
111-
String.format("Error while waiting for container to finish: %s ", e.getMessage()));
112146
}
147+
}
148+
149+
private Duration resolveAfter(
150+
io.serverlessworkflow.api.types.ContainerLifetime lifetime,
151+
WorkflowContext workflowContext,
152+
TaskContext taskContext,
153+
WorkflowModel input) {
154+
155+
if (lifetime == null || lifetime.getAfter() == null) {
156+
return Duration.ZERO;
157+
}
158+
WorkflowValueResolver<Duration> r =
159+
WorkflowUtils.fromTimeoutAfter(definition.application(), lifetime.getAfter());
160+
return r.apply(workflowContext, taskContext, input);
161+
}
162+
163+
private boolean isRunning(String id) {
164+
try {
165+
var st = dockerClient.inspectContainerCmd(id).exec().getState();
166+
return st != null && Boolean.TRUE.equals(st.getRunning());
167+
} catch (Exception e) {
168+
return false; // must be already removed
169+
}
170+
}
171+
172+
private void safeStop(String id, Duration timeout) {
173+
try {
174+
dockerClient.stopContainerCmd(id).withTimeout((int) Math.max(1, timeout.toSeconds())).exec();
175+
} catch (Exception ignore) {
176+
// we can ignore this
177+
}
178+
}
179+
180+
// must be removed because of withAutoRemove(true), but just in case
181+
private void safeRemove(String id) {
182+
try {
183+
dockerClient.removeContainerCmd(id).withForce(true).exec();
184+
} catch (Exception ignore) {
185+
// we can ignore this
186+
}
187+
}
113188

114-
return switch (exitCode) {
115-
case 0 -> CompletableFuture.completedFuture(input);
189+
private static <T> CompletableFuture<T> mapExitCode(int exit, T ok) {
190+
return switch (exit) {
191+
case 0 -> CompletableFuture.completedFuture(ok);
116192
case 1 -> failed("General error (exit code 1)");
117193
case 2 -> failed("Shell syntax error (exit code 2)");
118194
case 126 -> failed("Command found but not executable (exit code 126)");
@@ -121,7 +197,7 @@ CompletableFuture<WorkflowModel> startSync(
121197
case 137 -> failed("Killed by SIGKILL (exit code 137)");
122198
case 139 -> failed("Segmentation fault (exit code 139)");
123199
case 143 -> failed("Terminated by SIGTERM (exit code 143)");
124-
default -> failed("Process exited with code " + exitCode);
200+
default -> failed("Process exited with code " + exit);
125201
};
126202
}
127203

0 commit comments

Comments
 (0)