Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Lazy Loading of Pod Templates #17701

Merged
merged 11 commits into from
Feb 19, 2025
137 changes: 119 additions & 18 deletions docs/development/extensions-core/k8s-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Consider this an [EXPERIMENTAL](../experimental.md) feature mostly because it ha

## How it works

The K8s extension builds a pod spec for each task using the specified pod adapter. All jobs are natively restorable, they are decoupled from the Druid deployment, thus restarting pods or doing upgrades has no affect on tasks in flight. They will continue to run and when the overlord comes back up it will start tracking them again.
The K8s extension builds a pod spec for each task using the specified pod adapter. All jobs are natively restorable, they are decoupled from the Druid deployment, thus restarting pods or doing upgrades has no effect on tasks in flight. They will continue to run and when the overlord comes back up it will start tracking them again.


## Configuration
Expand Down Expand Up @@ -363,6 +363,19 @@ The custom template pod adapter allows you to specify a pod template file per ta

The base pod template must be specified as the runtime property `druid.indexer.runner.k8s.podTemplate.base: /path/to/basePodSpec.yaml`

The below runtime properties need to be passed to the Job's peon process.

```
druid.port=8100 (what port the peon should run on)
druid.peon.mode=remote
druid.service=druid/peon (for metrics reporting)
druid.indexer.task.baseTaskDir=/druid/data (this should match the argument to the ./peon.sh run command in the PodTemplate)
druid.indexer.runner.type=k8s
druid.indexer.task.encapsulatedTask=true
```

#### Example 1: Using a Pod Template that retrieves values from a ConfigMap

<details>
<summary>Example Pod Template that uses the regular druid docker image</summary>

Expand All @@ -372,7 +385,7 @@ kind: "PodTemplate"
template:
metadata:
annotations:
sidecar.istio.io/proxyCPU: "512m" # to handle a injected istio sidecar
sidecar.istio.io/proxyCPU: "512m" # to handle an injected istio sidecar
labels:
app.kubernetes.io/name: "druid-realtime-backend"
spec:
Expand Down Expand Up @@ -436,28 +449,17 @@ template:
```
</details>

The below runtime properties need to be passed to the Job's peon process.

```
druid.port=8100 (what port the peon should run on)
druid.peon.mode=remote
druid.service=druid/peon (for metrics reporting)
druid.indexer.task.baseTaskDir=/druid/data (this should match the argument to the ./peon.sh run command in the PodTemplate)
druid.indexer.runner.type=k8s
druid.indexer.task.encapsulatedTask=true
```

Any runtime property or JVM config used by the peon process can also be passed. E.G. below is a example of a ConfigMap that can be used to generate the `nodetype-config-volume` mount in the above template.
Any runtime property or JVM config used by the peon process can also be passed. E.G. below is an example of a ConfigMap that can be used to generate the `nodetype-config-volume` mount in the above template.

<details>
<summary>Example ConfigMap</summary>

```
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: druid-tiny-cluster-peons-config
namespace: default
apiVersion: v1
data:
jvm.config: |-
-server
Expand Down Expand Up @@ -494,8 +496,107 @@ data:
```
</details>

#### Example 2: Using a ConfigMap to upload the Pod Template file

Alternatively, we can mount the ConfigMap onto Overlord services, and use the ConfigMap to generate the pod template files we want.

<details>
<summary>Mounting to Overlord deployment</summary>

```yaml
volumeMounts:
- name: druid-pod-templates
mountPath: /path/to/podTemplate/directory

volumes:
- name: druid-pod-templates
configMap:
name: druid-pod-templates
```
</details>

<details>
<summary>Example ConfigMap that generates the Base Pod Template</summary>

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: druid-pod-templates
data:
basePodSpec.yaml: |-
apiVersion: "v1"
kind: "PodTemplate"
template:
metadata:
labels:
app.kubernetes.io/name: "druid-realtime-backend"
annotations:
sidecar.istio.io/proxyCPU: "512m"
spec:
containers:
- name: main
image: apache/druid:{{DRUIDVERSION}}
command:
- sh
- -c
- |
/peon.sh /druid/data 1
env:
- name: druid_port
value: 8100
- name: druid_plaintextPort
value: 8100
- name: druid_tlsPort
value: 8091
- name: druid_peon_mode
value: remote
- name: druid_service
value: "druid/peon"
- name: druid_indexer_task_baseTaskDir
value: /druid/data
- name: druid_indexer_runner_type
value: k8s
- name: druid_indexer_task_encapsulatedTask
value: true
ports:
- containerPort: 8091
name: druid-tls-port
protocol: TCP
- containerPort: 8100
name: druid-port
protocol: TCP
resources:
limits:
cpu: "1"
memory: 2400M
requests:
cpu: "1"
memory: 2400M
restartPolicy: "Never"
securityContext:
fsGroup: 1000
runAsGroup: 1000
runAsUser: 1000
tolerations:
- effect: NoExecute
key: node.kubernetes.io/not-ready
operator: Exists
tolerationSeconds: 300
- effect: NoExecute
key: node.kubernetes.io/unreachable
operator: Exists
tolerationSeconds: 300

```
</details>

#### Lazy Loading of Pod Templates

Whenever the Overlord wants to spin up a Kubernetes task pod, it will first read the relevant pod template file, and then create a task pod according to the specifications of the pod template file. This is helpful when you want to make configuration changes to the task pods (e.g. increase/decrease CPU limit or resources). You can edit the pod template files directly, and the next task pod spun up by the Overlord will reflect these changes in its configurations.

#### Pod template selection

The pod template adapter can select which pod template should be used for a task using the [task runner execution config](#dynamic-config)

##### Select based on task type
Expand All @@ -512,7 +613,7 @@ Task specific pod templates can be specified as the runtime property
`druid.indexer.runner.k8s.podTemplate.{taskType}: /path/to/taskSpecificPodSpec.yaml` where {taskType} is the name of the
task type. For example, `index_parallel`.

If you are trying to use the default image's environment variable parsing feature to set runtime properties, you need to add a extra escape underscore when specifying pod templates.
If you are trying to use the default image's environment variable parsing feature to set runtime properties, you need to add an extra escape underscore when specifying pod templates.
For example, set the environment variable `druid_indexer_runner_k8s_podTemplate_index__kafka` when you set the runtime property `druid.indexer.runner.k8s.podTemplate.index_kafka`


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Supplier;
import io.fabric8.kubernetes.api.model.PodTemplate;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.k8s.overlord.taskadapter.PodTemplateWithName;
Expand All @@ -45,5 +46,5 @@ public interface PodTemplateSelectStrategy
* @param task The task for which the Pod template is determined.
* @return The PodTemplateWithName POJO that contains the name of the template selected and the template itself.
*/
@NotNull PodTemplateWithName getPodTemplateForTask(Task task, Map<String, PodTemplate> templates);
@NotNull PodTemplateWithName getPodTemplateForTask(Task task, Map<String, Supplier<PodTemplate>> templates);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import io.fabric8.kubernetes.api.model.PodTemplate;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
Expand Down Expand Up @@ -55,7 +56,7 @@ public SelectorBasedPodTemplateSelectStrategy(
* @return the template if a selector matches, otherwise fallback to base template
*/
@Override
public PodTemplateWithName getPodTemplateForTask(Task task, Map<String, PodTemplate> templates)
public PodTemplateWithName getPodTemplateForTask(Task task, Map<String, Supplier<PodTemplate>> templates)
{
String templateKey = selectors.stream()
.filter(selector -> selector.evaluate(task))
Expand All @@ -66,7 +67,8 @@ public PodTemplateWithName getPodTemplateForTask(Task task, Map<String, PodTempl
if (!templates.containsKey(templateKey)) {
templateKey = DruidK8sConstants.BASE_TEMPLATE_NAME;
}
return new PodTemplateWithName(templateKey, templates.get(templateKey));
Supplier<PodTemplate> podTemplateSupplier = templates.get(templateKey);
return new PodTemplateWithName(templateKey, podTemplateSupplier.get());
}

@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.k8s.overlord.execution;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.base.Supplier;
import io.fabric8.kubernetes.api.model.PodTemplate;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
Expand All @@ -42,10 +43,11 @@ public TaskTypePodTemplateSelectStrategy()
}

@Override
public PodTemplateWithName getPodTemplateForTask(Task task, Map<String, PodTemplate> templates)
public PodTemplateWithName getPodTemplateForTask(Task task, Map<String, Supplier<PodTemplate>> templates)
{
String templateKey = templates.containsKey(task.getType()) ? task.getType() : DruidK8sConstants.BASE_TEMPLATE_NAME;
return new PodTemplateWithName(templateKey, templates.get(templateKey));
Supplier<PodTemplate> podSupplier = templates.get(templateKey);
return new PodTemplateWithName(templateKey, podSupplier.get());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ public class DynamicConfigPodTemplateSelector implements PodTemplateSelector
+ ".k8s.podTemplate.";

private final Properties properties;
private HashMap<String, PodTemplate> podTemplates;
private Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef;
private final Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef;
// Supplier allows Overlord to read the most recent pod template file without calling initializeTemplatesFromFileSystem() again.
private HashMap<String, Supplier<PodTemplate>> podTemplates;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a one line comment to explain why the Supplier is defined here


public DynamicConfigPodTemplateSelector(
Properties properties,
Expand All @@ -56,25 +57,25 @@ public DynamicConfigPodTemplateSelector(
initializeTemplatesFromFileSystem();
}

private void initializeTemplatesFromFileSystem()
private void initializeTemplatesFromFileSystem() throws IAE
{
Set<String> taskAdapterTemplateKeys = getTaskAdapterTemplates(properties);
Set<String> taskAdapterTemplateKeys = getTaskAdapterTemplatesKeys(properties);
if (!taskAdapterTemplateKeys.contains("base")) {
throw new IAE(
"Pod template task adapter requires a base pod template to be specified under druid.indexer.runner.k8s.podTemplate.base");
}

HashMap<String, PodTemplate> podTemplateMap = new HashMap<>();
HashMap<String, Supplier<PodTemplate>> podTemplateMap = new HashMap<>();
for (String taskAdapterTemplateKey : taskAdapterTemplateKeys) {
Optional<PodTemplate> template = loadPodTemplate(taskAdapterTemplateKey, properties);
if (template.isPresent()) {
podTemplateMap.put(taskAdapterTemplateKey, template.get());
}
Supplier<PodTemplate> templateSupplier = () -> loadPodTemplate(taskAdapterTemplateKey, properties);
validateTemplateSupplier(templateSupplier);
podTemplateMap.put(taskAdapterTemplateKey, templateSupplier);
}

podTemplates = podTemplateMap;
}

private Set<String> getTaskAdapterTemplates(Properties properties)
private Set<String> getTaskAdapterTemplatesKeys(Properties properties)
{
Set<String> taskAdapterTemplates = new HashSet<>();

Expand All @@ -88,25 +89,34 @@ private Set<String> getTaskAdapterTemplates(Properties properties)
return taskAdapterTemplates;
}

private Optional<PodTemplate> loadPodTemplate(String key, Properties properties)
private PodTemplate loadPodTemplate(String key, Properties properties) throws IAE
{
String property = TASK_PROPERTY + key;
String podTemplateFile = properties.getProperty(property);
if (podTemplateFile == null) {
throw new IAE("Pod template file not specified for [%s]", property);

}

try {
return Optional.of(Serialization.unmarshal(
// Use Optional to assert unmarshal result is non-null.
Optional<PodTemplate> maybeTemplate = Optional.of(Serialization.unmarshal(
Files.newInputStream(new File(podTemplateFile).toPath()),
PodTemplate.class
));

return maybeTemplate.get();
}
catch (Exception e) {
throw new IAE(e, "Failed to load pod template file for [%s] at [%s]", property, podTemplateFile);
}
}

@SuppressWarnings("ResultOfMethodCallIgnored")
private void validateTemplateSupplier(Supplier<PodTemplate> templateSupplier) throws IAE
{
templateSupplier.get();
}

@Override
public Optional<PodTemplateWithName> getPodTemplateForTask(Task task)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public Job fromTask(Task task) throws IOException
task.getId()
);
}
PodTemplateWithName podTemplateWithName = podTemplateSelector.getPodTemplateForTask(task).get();
PodTemplateWithName podTemplateWithName = selectedPodTemplate.get();

return new JobBuilder()
.withNewMetadata()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.k8s.overlord.execution;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
Expand All @@ -41,14 +42,14 @@

public class SelectorBasedPodTemplateSelectStrategyTest
{
private Map<String, PodTemplate> templates;
private Map<String, Supplier<PodTemplate>> templates;

@Before
public void setup()
{
templates = ImmutableMap.of(
"mock",
new PodTemplate(null, null, new ObjectMeta()
() -> new PodTemplate(null, null, new ObjectMeta()
{
@Override
public String getName()
Expand All @@ -57,7 +58,7 @@ public String getName()
}
}, null),
"no_match",
new PodTemplate(null, null, new ObjectMeta()
() -> new PodTemplate(null, null, new ObjectMeta()
{
@Override
public String getName()
Expand All @@ -66,7 +67,7 @@ public String getName()
}
}, null),
"match",
new PodTemplate(null, null, new ObjectMeta()
() -> new PodTemplate(null, null, new ObjectMeta()
{
@Override
public String getName()
Expand All @@ -75,7 +76,7 @@ public String getName()
}
}, null),
"base",
new PodTemplate(null, "base", new ObjectMeta()
() -> new PodTemplate(null, "base", new ObjectMeta()
{
@Override
public String getName()
Expand Down
Loading
Loading