Update Kueue and Jobset controller default limit value #502

Merged
11 changes: 10 additions & 1 deletion src/xpk/commands/cluster.py
@@ -42,12 +42,14 @@
    get_gke_server_config,
    zone_to_region,
)
from ..core.jobset import update_jobset_resources_if_necessary
from ..core.kjob import apply_kjob_crds, prepare_kjob, verify_kjob_installed
from ..core.kueue import (
    cluster_preheat_yml,
    install_kueue_crs,
    install_kueue_on_cluster,
    wait_for_kueue_available,
    update_kueue_resources_if_necessary,
)
from ..core.nap import enable_autoprovisioning_on_cluster
from ..core.network import (
@@ -170,7 +172,6 @@ def cluster_adapt(args) -> None:
  install_kueue(args, system, autoprovisioning_config)

  install_kjob(args)

  if system.accelerator_type == AcceleratorType['GPU']:
    prepare_gpus(args, system)

@@ -308,6 +309,9 @@ def cluster_create(args) -> None:
  set_jobset_on_cluster_code = set_jobset_on_cluster(args)
  if set_jobset_on_cluster_code != 0:
    xpk_exit(set_jobset_on_cluster_code)
  update_jobset_resources_code = update_jobset_resources_if_necessary(args)
  if update_jobset_resources_code != 0:
    xpk_exit(update_jobset_resources_code)

  set_pathways_job_on_cluster_code = set_pathways_job_on_cluster(args)
  if set_pathways_job_on_cluster_code != 0:
@@ -957,6 +961,11 @@ def install_kueue(args, system: SystemCharacteristics, autoprovisioning_config):
  if enable_kueue_credentials_code != 0:
    xpk_exit(enable_kueue_credentials_code)

  xpk_print('Update Kueue Controller Manager resources')
  update_kueue_resources_code = update_kueue_resources_if_necessary(args)
  if update_kueue_resources_code != 0:
    xpk_exit(update_kueue_resources_code)


def prepare_gpus(args, system: SystemCharacteristics):
  xpk_print('Installing NCCL Plugin for cluster')
143 changes: 143 additions & 0 deletions src/xpk/core/jobset.py
@@ -0,0 +1,143 @@
"""
Copyright 2024 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import math

from ..utils.console import xpk_exit, xpk_print
from ..utils.file import write_tmp_file
from ..core.kueue import (
    MEMORY_SIZE_PER_VM,
    MIN_MEMORY_LIMIT_SIZE,
)
from .commands import (
    run_command_for_value,
    run_command_with_updates_retry,
)

jobset_controller_manager_yml = """
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app.kubernetes.io/component: manager
    app.kubernetes.io/created-by: jobset
    app.kubernetes.io/instance: controller-manager
    app.kubernetes.io/managed-by: kustomize
    app.kubernetes.io/name: deployment
    app.kubernetes.io/part-of: jobset
    control-plane: controller-manager
  name: jobset-controller-manager
  namespace: jobset-system
spec:
  replicas: 1
  selector:
    matchLabels:
      control-plane: controller-manager
  template:
    metadata:
      annotations:
        kubectl.kubernetes.io/default-container: manager
      labels:
        control-plane: controller-manager
    spec:
      containers:
      - args:
        - --config=/controller_manager_config.yaml
        - --zap-log-level=2
        command:
        - /manager
        image: registry.k8s.io/jobset/jobset:v0.8.0
        livenessProbe:
          httpGet:
            path: /healthz
            port: 8081
          initialDelaySeconds: 15
          periodSeconds: 20
        name: manager
        ports:
        - containerPort: 9443
          name: webhook-server
          protocol: TCP
        readinessProbe:
          httpGet:
            path: /readyz
            port: 8081
          initialDelaySeconds: 5
          periodSeconds: 10
        resources:
          limits:
            memory: {memory_limit_size}
          requests:
            cpu: 500m
            memory: 128Mi
        securityContext:
          allowPrivilegeEscalation: false
          capabilities:
            drop:
            - ALL
        volumeMounts:
        - mountPath: /controller_manager_config.yaml
          name: manager-config
          subPath: controller_manager_config.yaml
        - mountPath: /tmp/k8s-webhook-server/serving-certs
          name: cert
          readOnly: true
      securityContext:
        runAsNonRoot: true
      serviceAccountName: jobset-controller-manager
      terminationGracePeriodSeconds: 10
      volumes:
      - configMap:
          name: jobset-manager-config
        name: manager-config
      - name: cert
        secret:
          defaultMode: 420
          secretName: jobset-webhook-server-cert
"""


def update_jobset_resources_if_necessary(args):
  """Update the jobset manifest to increase the resources for the jobset controller manager.

  Args:
    args: user provided arguments for running the command.

  Returns:
    0 if successful and 1 otherwise.
  """
  # Get total number of nodes
  cmd_total_node_num = 'kubectl get node --no-headers | wc -l'
  return_code, out = run_command_for_value(
      cmd_total_node_num, 'Count total nodes', args
  )
  if return_code != 0:
    xpk_exit(1)
  # 1.2MiB per VM or 4GiB (whichever is greater).
  new_memory_limit = (
      f'{max(math.ceil(int(out) * MEMORY_SIZE_PER_VM), MIN_MEMORY_LIMIT_SIZE)}Mi'
  )
  yml_string = jobset_controller_manager_yml.format(
      memory_limit_size=new_memory_limit,
  )
  tmp = write_tmp_file(yml_string)
  command = f'kubectl apply -f {str(tmp.file.name)}'

  task = 'Updating jobset Controller Manager resources'
  return_code = run_command_with_updates_retry(command, task, args)
  if return_code != 0:
    xpk_print(f'{task} returned ERROR {return_code}')
  return return_code
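
Note: the jobset function above and the Kueue function in the next file apply the same sizing rule, memory limit = max(ceil(node_count * 1.2 MiB), 4096 MiB). The standalone sketch below only reproduces that arithmetic; the constants are copied from this PR, and the node counts are made up purely for illustration.

import math

MEMORY_SIZE_PER_VM = 1.2  # MiB budgeted per node, as defined in src/xpk/core/kueue.py
MIN_MEMORY_LIMIT_SIZE = 4096  # 4GiB floor expressed in MiB, as defined in src/xpk/core/kueue.py


def controller_memory_limit(node_count: int) -> str:
  # Same expression used by update_jobset_resources_if_necessary and
  # update_kueue_resources_if_necessary.
  return f'{max(math.ceil(node_count * MEMORY_SIZE_PER_VM), MIN_MEMORY_LIMIT_SIZE)}Mi'


for nodes in (100, 3000, 10000):
  print(nodes, controller_memory_limit(nodes))
# 100   -> 4096Mi  (120 < 4096, floor applies)
# 3000  -> 4096Mi  (3600 < 4096, floor applies)
# 10000 -> 12000Mi (ceil(10000 * 1.2) = 12000)
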
129 changes: 129 additions & 0 deletions src/xpk/core/kueue.py
@@ -16,6 +16,7 @@

from argparse import Namespace

import math
import packaging
from packaging.version import Version

@@ -43,6 +44,8 @@
CLUSTER_QUEUE_NAME = 'cluster-queue'
LOCAL_QUEUE_NAME = 'multislice-queue'
WAIT_FOR_KUEUE_TIMEOUT = '5m'
MEMORY_SIZE_PER_VM = 1.2
MIN_MEMORY_LIMIT_SIZE = 4096

packaging.version.VERSION_PATTERN = r'^v\d+\.\d+\.\d+$'

Expand Down Expand Up @@ -166,6 +169,99 @@
        command: [ "sleep", "inf" ]
"""

kueue_controller_manager_yml = """
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app.kubernetes.io/component: controller
    app.kubernetes.io/name: kueue
    control-plane: controller-manager
  name: kueue-controller-manager
  namespace: kueue-system
spec:
  replicas: 1
  selector:
    matchLabels:
      control-plane: controller-manager
  template:
    metadata:
      annotations:
        kubectl.kubernetes.io/default-container: manager
      labels:
        app.kubernetes.io/component: controller
        app.kubernetes.io/name: kueue
        control-plane: controller-manager
    spec:
      containers:
      - args:
        - --config=/controller_manager_config.yaml
        - --zap-log-level=2
        command:
        - /manager
        image: registry.k8s.io/kueue/kueue:v0.10.0
        imagePullPolicy: Always
        livenessProbe:
          httpGet:
            path: /healthz
            port: 8081
          initialDelaySeconds: 15
          periodSeconds: 20
        name: manager
        ports:
        - containerPort: 8082
          name: visibility
          protocol: TCP
        - containerPort: 9443
          name: webhook-server
          protocol: TCP
        readinessProbe:
          httpGet:
            path: /readyz
            port: 8081
          initialDelaySeconds: 5
          periodSeconds: 10
        resources:
          limits:
            cpu: 500m
            memory: {memory_limit_size}
          requests:
            cpu: 500m
            memory: 512Mi
        securityContext:
          allowPrivilegeEscalation: false
        volumeMounts:
        - mountPath: /tmp/k8s-webhook-server/serving-certs
          name: cert
          readOnly: true
        - mountPath: /controller_manager_config.yaml
          name: manager-config
          subPath: controller_manager_config.yaml
      - args:
        - --secure-listen-address=0.0.0.0:8443
        - --upstream=http://127.0.0.1:8080/
        - --logtostderr=true
        - --v=10
        image: registry.k8s.io/kubebuilder/kube-rbac-proxy:v0.16.0
        name: kube-rbac-proxy
        ports:
        - containerPort: 8443
          name: https
          protocol: TCP
      securityContext:
        runAsNonRoot: true
      serviceAccountName: kueue-controller-manager
      terminationGracePeriodSeconds: 10
      volumes:
      - name: cert
        secret:
          defaultMode: 420
          secretName: kueue-webhook-server-cert
      - configMap:
          name: kueue-manager-config
        name: manager-config
"""


def verify_kueuectl(args: Namespace) -> None:
"""Verify if kueuectl is installed.
@@ -386,3 +482,36 @@ def get_kueue_covered_resources_config(
      total_chips=total_chips,
  )
  return config_string


def update_kueue_resources_if_necessary(args):
  """Update the kueue manifest to increase the resources for the kueue controller manager.

  Args:
    args: user provided arguments for running the command.

  Returns:
    0 if successful and 1 otherwise.
  """
  # Get total number of nodes
  cmd_total_node_num = 'kubectl get node --no-headers | wc -l'
  return_code, out = run_command_for_value(
      cmd_total_node_num, 'Count total nodes', args
  )
  if return_code != 0:
    xpk_exit(1)
  # 1.2MiB per VM or 4GiB (whichever is greater).
  new_memory_limit = (
      f'{max(math.ceil(int(out) * MEMORY_SIZE_PER_VM), MIN_MEMORY_LIMIT_SIZE)}Mi'
  )
  yml_string = kueue_controller_manager_yml.format(
      memory_limit_size=new_memory_limit,
  )
  tmp = write_tmp_file(yml_string)
  command = f'kubectl apply -f {str(tmp.file.name)}'

  task = 'Updating Kueue Controller Manager resources'
  return_code = run_command_with_updates_retry(command, task, args)
  if return_code != 0:
    xpk_print(f'{task} returned ERROR {return_code}')
  return return_code
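
Note: a hedged sketch, not part of this diff, of how the applied limit could be read back after either update runs. It reuses the run_command_for_value helper already imported in this module; the deployment names, namespaces, and container index follow the manifests above, and the helper name get_controller_memory_limit is hypothetical.

def get_controller_memory_limit(args, deployment: str, namespace: str) -> str:
  # Reads the manager container's memory limit from the live Deployment.
  cmd = (
      f'kubectl get deployment {deployment} -n {namespace} -o'
      " jsonpath='{.spec.template.spec.containers[0].resources.limits.memory}'"
  )
  return_code, out = run_command_for_value(cmd, 'Read controller memory limit', args)
  return out if return_code == 0 else ''

# Example (assumed usage):
#   get_controller_memory_limit(args, 'kueue-controller-manager', 'kueue-system')
#   get_controller_memory_limit(args, 'jobset-controller-manager', 'jobset-system')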