diff --git a/examples/llama-3.1-finetuning-multi-host/train.py b/examples/llama-3.1-finetuning-multi-host/train.py
index bdf59c62f..b5d6d445e 100644
--- a/examples/llama-3.1-finetuning-multi-host/train.py
+++ b/examples/llama-3.1-finetuning-multi-host/train.py
@@ -17,8 +17,13 @@ from datasets import load_dataset
 from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, Trainer, DataCollatorForLanguageModeling
 import torch
-
+from torch.distributed.elastic.multiprocessing.errors import record
 # 1. Data Loading and Preprocessing
+import os
+
+for name, value in os.environ.items():
+    print(f"{name}: {value}")
+
 dataset = load_dataset("json", data_files="training_data.jsonl", split="train")
@@ -75,7 +80,7 @@ def tokenize_function(example):
     learning_rate=2e-5,
     num_train_epochs=3,
     logging_dir="./logs",
-    logging_steps=10,
+    logging_steps=1,
     save_steps=100,
     eval_strategy="no",
     fp16=False,
@@ -94,6 +99,13 @@ def tokenize_function(example):
     data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False),
 )
-trainer.train()
-trainer.save_model("./llama3-finetuned-final")
-tokenizer.save_pretrained("./llama3-finetuned-final")
+
+@record
+def train_func():
+    trainer.train()
+    trainer.save_model("./llama3-finetuned-final")
+    tokenizer.save_pretrained("./llama3-finetuned-final")
+
+
+if __name__ == "__main__":
+    train_func()
diff --git a/examples/nccl-batch/README.MD b/examples/nccl-batch/README.MD
new file mode 100644
index 000000000..372b0fe7a
--- /dev/null
+++ b/examples/nccl-batch/README.MD
@@ -0,0 +1,55 @@
+# Running NCCL tests on GPU clusters using xpk Slurm mode
+
+This document provides an introduction to running tests for the NVIDIA Collective Communications Library (NCCL). NCCL is a high-performance, multi-GPU communications library used in deep learning and other applications. The test suite helps verify the correct functionality and performance of NCCL on your system. Please visit the [NCCL tests GitHub repository](https://github.com/NVIDIA/nccl-tests?tab=readme-ov-file#nccl-tests) to learn more about NCCL and how to run the tests.
+
+Steps presented in this document are designed to run on A3 Ultra and A3 Mega machines (`DEVICE_TYPE=h200-141gb-8` or `DEVICE_TYPE=h100-mega-80gb-8`).
+
+### 1. Create cluster
+
+Skip this step if you have already provisioned a GKE cluster with A3 Ultra or A3 Mega machines.
+
+The first step is to create a cluster with A3 Ultra or A3 Mega machines. Execute the command below:
+
+```
+python3 xpk.py cluster create \
+    --cluster=$CLUSTER_NAME --device-type=$DEVICE_TYPE \
+    --zone=$COMPUTE_ZONE --project=$PROJECT_ID \
+    --num-nodes=$CLUSTER_NUM_NODES --reservation=$RESERVATION_ID
+```
+
+### 2. Run NCCL workload
+
+The command to run NCCL tests on A3 clusters depends on the type of machine.
+
+
+#### A3 Mega
+
+It is necessary to set the batch working directory to run NCCL tests in Slurm mode:
+
+```bash
+python3 xpk.py config set batch-working-directory /home/nccl
+```
+
+The Docker image provided by Google will be used as the batch image:
+```bash
+python3 xpk.py config set batch-image us-docker.pkg.dev/gce-ai-infra/gpudirect-tcpxo/nccl-plugin-gpudirecttcpx-dev:v1.0.8-1
+```
+
+The `xpk run` command is used to start the NCCL tests:
+
+```bash
+python3 xpk.py run \
+    --cluster $CLUSTER_NAME --zone $COMPUTE_ZONE \
+    --project $PROJECT_ID \
+    --nodes $NUM_NODES \
+    --gpus-per-task nvidia.com/gpu:8 \
+    examples/nccl-batch/a3mega.slurm
+```
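+
+The commands in this section assume the shell variables used throughout this document are already exported. A minimal sketch (the values below are placeholders for illustration, not defaults):
+
+```bash
+# Hypothetical values -- substitute your own project, zone, and cluster.
+export PROJECT_ID=my-gcp-project
+export COMPUTE_ZONE=us-central1-a
+export CLUSTER_NAME=my-a3mega-cluster
+export NUM_NODES=4  # keep consistent with '#SBATCH --nodes=4' in a3mega.slurm
+```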
+
+### Troubleshooting
+
+If you get a 403 Forbidden error when pulling the Docker image, make sure to add `us-docker.pkg.dev` to the list of gcloud credential helpers using this command:
+
+```bash
+gcloud auth configure-docker us-docker.pkg.dev
+```
\ No newline at end of file
diff --git a/examples/nccl-batch/a3mega.slurm b/examples/nccl-batch/a3mega.slurm
new file mode 100644
index 000000000..7ddb7f261
--- /dev/null
+++ b/examples/nccl-batch/a3mega.slurm
@@ -0,0 +1,46 @@
+#!/bin/bash
+#SBATCH --job-name=nccl-tests
+#SBATCH --cpus-per-task=8
+#SBATCH --nodes=4
+#SBATCH --ntasks=1
+
+set -x
+apt update -y
+apt install -y iputils-ping
+/scripts/container_entry.sh daemon &
+
+# Helper variables used to build the FQDN of every worker
+export POSTFIX=$(hostname --fqdn | cut -d . -f 2-)
+export WORKERS_BASENAME=$(hostname --fqdn | cut -d . -f 1 | rev | cut -d - -f 2- | rev )
+export NODE_RANK=$((SLURM_JOB_ID-1))
+
+# Wait until every worker is reachable over SSH, then add it to the hostfile
+for i in `seq 0 $(($SLURM_NNODES-1))`; do
+  OTHER=${WORKERS_BASENAME}-${i}.${POSTFIX}
+  until ssh -p 222 -o StrictHostKeyChecking=no $OTHER hostname; do
+    echo "Waiting for ${OTHER}..."
+    sleep 10
+  done
+  echo ${OTHER} port=222 slots=8 | tee -a /tmp/hostfile;
+done
+
+cat /tmp/hostfile
+echo "nodepool size"
+echo $((SLURM_GPUS * SLURM_NNODES))
+# Only the head node (rank 0) launches mpirun; the other nodes idle until it exits
+if [[ "${NODE_RANK}" -eq "0" ]]; then
+  export NCCL_TESTS_SPLIT_MASK="0x0";
+  export NCCL_LIB_DIR=$LD_LIBRARY_PATH
+  ENV_VARS=$(echo ${!NCCL*} ${!OMPI*} LD_LIBRARY_PATH PATH | sed 's/ / -x /g')
+  mpirun --hostfile /tmp/hostfile \
+    -x $ENV_VARS \
+    --allow-run-as-root \
+    -np $((SLURM_GPUS * SLURM_NNODES)) \
+    --mca orte_keep_fqdn_hostnames 1 \
+    --mca btl tcp,self \
+    --mca btl_tcp_if_include eth0 \
+    --mca plm_rsh_agent "ssh -q -o LogLevel=ERROR -o StrictHostKeyChecking=no -p 222" \
+    taskset -c 32-63 /scripts/demo_mpi_entry_with_config_profile.sh all_gather_perf \
+    -b 1K -e 8G -f 2 -g 1 -w 5 --iters 100
+else
+  while ping -c 1 ${WORKERS_BASENAME}-0.${POSTFIX}; do
+    sleep 5
+  done
+fi
diff --git a/examples/nccl-workload/nccl.md b/examples/nccl-workload/nccl.md
deleted file mode 100644
index 05e9ce705..000000000
--- a/examples/nccl-workload/nccl.md
+++ /dev/null
@@ -1,29 +0,0 @@
-# Running NCCL tests on GPU clusters using xpk
-
-This document provides an introduction to running tests for the NVIDIA Collective Communications Library (NCCL). NCCL is a high-performance, multi-GPU communications library used in deep learning and other applications. The test suite helps verify the correct functionality and performance of NCCL on your system. Please visit [NCCL tests github](https://github.com/NVIDIA/nccl-tests?tab=readme-ov-file#nccl-tests) to learn more about NCCL and running it.
-
-Steps presented in this document are designed to run on A3 ultra machines (`DEVICE_TYPE=h200-141gb-8`).
-
-### 1. Create cluster
-
-First step is to create a cluster with A3 ultra machine. Execute below step:
-
-```
-python3 xpk.py cluster create \
-    --cluster=$CLUSTER_NAME --device-type=$DEVICE_TYPE \
-    --zone=$COMPUTE_ZONE --project=$PROJECT_ID \
-    --num-nodes=$CLUSTER_NUM_NODES --reservation=$RESERVATION_ID
-```
-
-### 2. Run NCCL workload
-
-To run NCCL tests on created cluster a workload will be submitted using xpk as follows:
-
-```
-python3 xpk.py workload create \
-    --workload=nccl-test --command="./examples/nccl/nccl.sh" \
-    --base-docker-image=us-docker.pkg.dev/gce-ai-infra/gpudirect-gib/nccl-plugin-gib-diagnostic:v1.0.3 \
-    --cluster=$CLUSTER_NAME --device-type=$DEVICE_TYPE \
-    --zone=$COMPUTE_ZONE --project=$PROJECT_ID \
-    --num-nodes=$WORKLOAD_NUM_NODES
-```
diff --git a/examples/nccl-workload/nccl.sh b/examples/nccl-workload/nccl.sh
deleted file mode 100755
index 873c602c2..000000000
--- a/examples/nccl-workload/nccl.sh
+++ /dev/null
@@ -1,84 +0,0 @@
-# """
-# Copyright 2025 Google LLC
-
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-
-# https://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# """
-
-# This is a script to execute a nccl test. See https://github.com/NVIDIA/nccl-tests for more details
-
-#!/bin/bash
-
-set -x
-echo "Starting workload container for $NNODES benchmark"
-
-# Load all the cuda libs
-/sbin/ldconfig
-
-# Install ping
-apt update -y
-apt install -y iputils-ping
-
-# Start sshd
-/scripts/container_entry.sh daemon &
-
-# Get helper variables to form all hostnames
-export POSTFIX=$(hostname --fqdn | cut -d . -f 2-)
-export WORKERS_BASENAME=$(hostname --fqdn | cut -d . -f 1 | rev | cut -d - -f 2- | rev )
-export NODE_RANK=$JOB_COMPLETION_INDEX
-
-
-# For every worker, wait till online and add to hostfile
-for i in `seq 0 $(($NNODES-1))`; do
-  OTHER=${WORKERS_BASENAME}-${i}.${POSTFIX}
-  until ssh -p 222 -o StrictHostKeyChecking=no $OTHER hostname; do
-    echo Waiting for ${OTHER}...
-    sleep 10
-  done
-  echo ${OTHER} port=222 slots=8 | tee -a /tmp/hostfile;
-done
-
-cat /tmp/hostfile
-
-# Launch from head node
-if [[ "${NODE_RANK}" -eq "0" ]]; then
-
-  # World Level = 0x0, Rail Aligned = 0x7
-  export NCCL_TESTS_SPLIT_MASK="0x0";
-
-  # Force use of libnccl-gib
-  export NCCL_NET=gIB
-
-  # Set all the correct libnccl-gib environment variables
-  source /usr/local/gib/scripts/set_nccl_env.sh
-
-  # Get all relevant NCCL / env vars to pass to all workers
-  ENV_VARS=$(echo ${!NCCL*} ${!OMPI*} LD_LIBRARY_PATH PATH | sed 's/ / -x /g')
-
-  mpirun --hostfile /tmp/hostfile \
-    -x $ENV_VARS \
-    --allow-run-as-root \
-    -mca plm_rsh_no_tree_spawn 1 \
-    --mca orte_keep_fqdn_hostnames 1 \
-    --mca btl self,tcp \
-    --mca btl_tcp_if_include eth0 \
-    --bind-to none \
-    --mca plm_rsh_agent "ssh -q -o LogLevel=ERROR -o StrictHostKeyChecking=no -p 222" \
-    /third_party/nccl-tests/build/all_gather_perf -b 1K -e 8G -f 2 -g 1 -w 5 --iters 100 -c 1
-
-else
-  while ping -c 1 ${WORKERS_BASENAME}-0.${POSTFIX}; do
-    sleep 5
-  done
-fi
-
-exit 0
\ No newline at end of file
diff --git a/src/xpk/commands/batch.py b/src/xpk/commands/batch.py
index 2168b3478..203afcb96 100644
--- a/src/xpk/commands/batch.py
+++ b/src/xpk/commands/batch.py
@@ -22,12 +22,7 @@ from ..core.kueue import LOCAL_QUEUE_NAME
 from ..utils.console import xpk_exit, xpk_print
 from .common import set_cluster_command
-from ..core.kjob import (
-    AppProfileDefaults,
-    prepare_kjob,
-    Kueue_TAS_annotation,
-    get_gcsfuse_annotation,
-)
+from ..core.kjob import AppProfileDefaults, prepare_kjob, Kueue_TAS_annotation, add_annotation_to_job, get_gcsfuse_annotation
 from .kind import set_local_cluster_command
 import re
@@ -58,14 +53,17 @@ def batch(args: Namespace) -> None:
 
 def submit_job(args: Namespace) -> None:
+
+  create_xpk_k8s_service_account()
+
   cmd = (
-      'kubectl kjob create slurm'
-      f' --profile {AppProfileDefaults.NAME.value}'
-      f' --localqueue {LOCAL_QUEUE_NAME}'
-      f' --pod-template-annotation {Kueue_TAS_annotation}'
+      'kubectl kjob create slurm --profile'
+      f' {AppProfileDefaults.NAME.value} \\\n --localqueue'
+      f' {LOCAL_QUEUE_NAME} \\\n --pod-template-annotation'
+      f' {Kueue_TAS_annotation} '
       ' --first-node-ip'
   )
-
+  cmd = add_annotation_to_job(args, cmd)
   gcsfuse_annotation = get_gcsfuse_annotation(args)
   if gcsfuse_annotation is not None:
     cmd += f' --pod-template-annotation {gcsfuse_annotation}'
diff --git a/src/xpk/commands/run.py b/src/xpk/commands/run.py
index a998e8cde..89cca4945 100644
--- a/src/xpk/commands/run.py
+++ b/src/xpk/commands/run.py
@@ -22,12 +22,7 @@ from ..core.kueue import LOCAL_QUEUE_NAME
 from ..utils.console import xpk_exit, xpk_print
 from .common import set_cluster_command
-from ..core.kjob import (
-    AppProfileDefaults,
-    prepare_kjob,
-    Kueue_TAS_annotation,
-    get_gcsfuse_annotation,
-)
+from ..core.kjob import JobTemplateDefaults, AppProfileDefaults, prepare_kjob, Kueue_TAS_annotation, add_annotation_to_job, get_gcsfuse_annotation
 from .kind import set_local_cluster_command
@@ -58,13 +53,14 @@ def run(args: Namespace) -> None:
 
 def submit_job(args: Namespace) -> None:
   cmd = (
-      'kubectl kjob create slurm'
-      f' --profile {AppProfileDefaults.NAME.value}'
-      f' --localqueue {LOCAL_QUEUE_NAME}'
-      f' --pod-template-annotation {Kueue_TAS_annotation}'
-      ' --wait'
-      ' --rm'
+      'kubectl kjob create slurm --profile'
+      f' {AppProfileDefaults.NAME.value} '
+      f' --localqueue {LOCAL_QUEUE_NAME} '
+      f" --pod-template-annotation '{Kueue_TAS_annotation}'"
+      f' --container-name {JobTemplateDefaults.CONTAINER_NAME.value}'
+      ' --wait --rm --first-node-ip'
   )
+  cmd = add_annotation_to_job(args, cmd)
 
   gcsfuse_annotation = get_gcsfuse_annotation(args)
   if gcsfuse_annotation is not None:
diff --git a/src/xpk/commands/workload.py b/src/xpk/commands/workload.py
index c197aadbf..b7923e3e9 100644
--- a/src/xpk/commands/workload.py
+++ b/src/xpk/commands/workload.py
@@ -14,6 +14,7 @@
 limitations under the License.
 """
+from ..core.network import get_subnetworks_for_a3mega, get_subnetworks_for_a3ultra
 from ..core.cluster import (
     create_xpk_k8s_service_account,
     get_cluster_credentials,
@@ -26,6 +27,7 @@
     get_main_container_docker_image,
     get_user_workload_container,
 )
+
 from ..core.docker_resources import get_volumes
 from ..core.gcloud_context import add_zone_and_project
 from ..core.kueue import LOCAL_QUEUE_NAME
@@ -617,13 +619,11 @@ def workload_create(args) -> None:
   )
   if args.device_type == cluster_gcluster.a3mega_device_type:
-    sub_networks = [f'{args.cluster}-gpunet-{i}-subnet' for i in range(8)]
+    sub_networks = get_subnetworks_for_a3mega(args.cluster)
     yml_string = tcpxo_decorator.decorate_jobset(yml_string, sub_networks)
   if args.device_type == cluster_gcluster.a3ultra_device_type:
-    sub_networks = [f'{args.cluster}-sub-1'] + [
-        f'{args.cluster}-rdma-sub-{i}' for i in range(8)
-    ]
+    sub_networks = get_subnetworks_for_a3ultra(args.cluster)
     yml_string = rdma_decorator.decorate_jobset(yml_string, sub_networks)
   if len(gcs_fuse_storages) + len(gcpfilestore_storages) > 0:
diff --git a/src/xpk/core/kjob.py b/src/xpk/core/kjob.py
index b17a6d907..c0958a42d 100644
--- a/src/xpk/core/kjob.py
+++ b/src/xpk/core/kjob.py
@@ -14,16 +14,21 @@
 limitations under the License.
 """
-import os
+from ..core.network import get_subnetworks_for_a3mega, get_subnetworks_for_a3ultra
+from ..core.capacity import H100_MEGA_DEVICE_TYPE, H200_DEVICE_TYPE
+from ..utils.yaml import literal_string
 from argparse import Namespace
 import yaml
+from .workload_decorators.tcpxo_decorator import decorate_job_template_with_a3mega
+from .workload_decorators.rdma_decorator import decorate_job_template_with_a3ultra
+import os
+from ..utils.console import xpk_print, xpk_exit
 from kubernetes import client as k8s_client
 from kubernetes.client import ApiClient
 from kubernetes.client.rest import ApiException
 from .cluster import setup_k8s_env, XPK_SA, DEFAULT_NAMESPACE
 from .storage import get_auto_mount_storages, get_auto_mount_gcsfuse_storages
-from ..utils.console import xpk_print, xpk_exit
 from .commands import run_command_for_value, run_kubectl_apply, run_command_with_updates
 from .config import XpkConfig, KJOB_SHELL_IMAGE, KJOB_SHELL_INTERACTIVE_COMMAND, KJOB_SHELL_WORKING_DIRECTORY, KJOB_BATCH_IMAGE, KJOB_BATCH_WORKING_DIRECTORY
 from .resources import get_cluster_system_characteristics, SystemCharacteristics, AcceleratorType
@@ -69,6 +74,7 @@ class PodTemplateDefaults(Enum):
       completionMode: Indexed
       template:
         spec:
+          dnsPolicy: ClusterFirstWithHostNet
           tolerations:
             - operator: "Exists"
               key: nvidia.com/gpu
@@ -135,6 +141,60 @@
 Kueue_TAS_annotation = "kueue.x-k8s.io/podset-preferred-topology=cloud.google.com/gce-topology-host"
+
+
+def get_a3ultra_pod_template_annotations(args: Namespace) -> list[str]:
+  sub_networks = get_subnetworks_for_a3ultra(args.cluster)
+  interfaces = [
+      "[\n",
+      '    {"interfaceName":"eth0","network":"default"},',
+      *[
+          f'    {{"interfaceName":"eth{i + 1}","network":"{sub_networks[i]}"}}{"," if i<8 else ""}'
+          for i in range(9)
+      ],
+      "]",
+  ]
+  interfaces_joined = interfaces[0] + "\n".join(interfaces[1:])
+  interfaces = (
+      f"networking.gke.io/interfaces=$'{literal_string(interfaces_joined)}'"
+  )
+  return [
+      "networking.gke.io/default-interface=eth0",
+      interfaces,
+  ]
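+
+# Illustrative only: for a cluster named "demo", the helper above yields two
+# annotation strings (subnet names follow get_subnetworks_for_a3ultra):
+#   networking.gke.io/default-interface=eth0
+#   networking.gke.io/interfaces=$'[
+#     {"interfaceName":"eth0","network":"default"},
+#     {"interfaceName":"eth1","network":"demo-sub-1"},
+#     {"interfaceName":"eth2","network":"demo-rdma-sub-0"},
+#     ...
+#     {"interfaceName":"eth9","network":"demo-rdma-sub-7"}]'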
+
+
+def get_a3mega_pod_template_annotations(args: Namespace) -> tuple[str, str]:
+  """Returns the tcpxo-daemon devices and network interfaces annotations for the A3 Mega pod template."""
+  sub_networks = get_subnetworks_for_a3mega(args.cluster)
+  interfaces = [
+      "[\n",
+      '    {"interfaceName":"eth0","network":"default"},',
+      *[
+          f'    {{"interfaceName":"eth{i + 1}","network":"{sub_networks[i]}"}}{"," if i<7 else ""}'
+          for i in range(8)
+      ],
+      "]",
+  ]
+  joined = (
+      "- path: /dev/nvidia0\n"
+      "- path: /dev/nvidia1\n"
+      "- path: /dev/nvidia2\n"
+      "- path: /dev/nvidia3\n"
+      "- path: /dev/nvidia4\n"
+      "- path: /dev/nvidia5\n"
+      "- path: /dev/nvidia6\n"
+      "- path: /dev/nvidia7\n"
+      "- path: /dev/nvidiactl\n"
+      "- path: /dev/nvidia-uvm\n"
+      "- path: /dev/dmabuf_import_helper"
+  )
+  interfaces_joined = interfaces[0] + "\n".join(interfaces[1:])
+  tcpxo = f"devices.gke.io/container.tcpxo-daemon=$'{joined}'"
+  interfaces = (
+      f"networking.gke.io/interfaces=$'{literal_string(interfaces_joined)}'"
+  )
+  return tcpxo, interfaces
+
+
 def verify_kjob_installed(args: Namespace) -> int:
   """Check if kjob is installed. If not provide user with proper communicate and exit.
   Args:
@@ -176,7 +236,9 @@ def get_pod_template_interactive_command() -> str:
   return pod_command
 
-def create_app_profile_instance(args: Namespace, volume_bundles: [str]) -> int:
+def create_app_profile_instance(
+    args: Namespace, volume_bundles: list[str]
+) -> int:
   """Create new AppProfile instance on cluster with default settings.
 
   Args:
@@ -196,6 +258,17 @@
   )
+
+
+def decorate_job_template_with_gpu(yml_string: str, gpu_type: str) -> str:
+  job_spec = yaml.safe_load(yml_string)["template"]
+  if gpu_type == H100_MEGA_DEVICE_TYPE:
+    job_spec = decorate_job_template_with_a3mega(job_spec)
+  if gpu_type == H200_DEVICE_TYPE:
+    job_spec = decorate_job_template_with_a3ultra(job_spec)
+  job_template_dict = yaml.safe_load(yml_string)
+  job_template_dict["template"] = job_spec
+  return yaml.dump(job_template_dict, sort_keys=False)
+
+
 def create_job_template_instance(
     args: Namespace,
     system: SystemCharacteristics | None,
@@ -215,7 +288,6 @@
     working_directory = config.get(KJOB_BATCH_WORKING_DIRECTORY)
     if working_directory is None or len(working_directory) == 0:
       working_directory = JobTemplateDefaults.WORKING_DIRECTORY.value
-
   resources = (
       job_resources_template.format(gpu_per_node=system.chips_per_vm)
       if system is not None
@@ -229,19 +301,22 @@
       and system.accelerator_type == AcceleratorType["GPU"]
       else ""
   )
+  yml_string = job_template_yaml.format(
+      name=JobTemplateDefaults.NAME.value,
+      parallelism=JobTemplateDefaults.PARALLELISM.value,
+      completions=JobTemplateDefaults.COMPLETIONS.value,
+      container_name=JobTemplateDefaults.CONTAINER_NAME.value,
+      image=job_image,
+      working_directory=working_directory,
+      resources=resources,
+      node_selector=node_selector,
+      service_account=service_account,
+  )
+  if system is not None and system.accelerator_type == AcceleratorType["GPU"]:
+    yml_string = decorate_job_template_with_gpu(yml_string, system.device_type)
   return run_kubectl_apply(
-      yml_string=job_template_yaml.format(
-          name=JobTemplateDefaults.NAME.value,
-          parallelism=JobTemplateDefaults.PARALLELISM.value,
-          completions=JobTemplateDefaults.COMPLETIONS.value,
-          container_name=JobTemplateDefaults.CONTAINER_NAME.value,
-          image=job_image,
-          working_directory=working_directory,
-          resources=resources,
-          node_selector=node_selector,
-          service_account=service_account,
-      ),
+      yml_string,
       task="Creating JobTemplate",
       args=args,
   )
@@ -278,8 +353,6 @@ def create_pod_template_instance(args: Namespace, service_account: str) -> int:
 
 def prepare_kjob(args: Namespace) -> int:
-  xpk_print("Preparing kjob")
-
   system = get_cluster_system_characteristics(args)
 
   k8s_api_client = setup_k8s_env(args)
@@ -394,3 +467,36 @@ def get_gcsfuse_annotation(args: Namespace) -> str | None:
   if len(gcsfuse_storages) > 0:
     return "gke-gcsfuse/volumes=true"
   return None
+
+
+def add_h100_mega_annotations(args, cmd: str) -> str:
+  tcpxo, interfaces = get_a3mega_pod_template_annotations(args)
+  cmd += f" --pod-template-annotation {tcpxo} \\\n"
+  cmd += (
+      ' --pod-template-annotation networking.gke.io/default-interface="eth0"'
+      " \\\n"
+  )
+  cmd += f" --pod-template-annotation {interfaces} "
+  return cmd
+
+
+def add_h200_ultra_annotations(args, cmd: str) -> str:
+  eth0, interfaces = get_a3ultra_pod_template_annotations(args)
+  cmd += f" --pod-template-annotation {eth0} \\\n"
+  cmd += f" --pod-template-annotation {interfaces} "
+  return cmd
+
+
+def get_gpu_type_from_cluster(args) -> str:
+  system = get_cluster_system_characteristics(args)
+  return system.device_type
+
+
+def add_annotation_to_job(args, cmd: str) -> str:
+  gpu_type = get_gpu_type_from_cluster(args)
+
+  if gpu_type == H100_MEGA_DEVICE_TYPE:
+    return add_h100_mega_annotations(args, cmd)
+  if gpu_type == H200_DEVICE_TYPE:
+    return add_h200_ultra_annotations(args, cmd)
+  return cmd
diff --git a/src/xpk/core/network.py b/src/xpk/core/network.py
index 1d69d01f8..82be5f715 100644
--- a/src/xpk/core/network.py
+++ b/src/xpk/core/network.py
@@ -175,6 +175,16 @@ def create_cluster_subnet(args, index) -> int:
   return 0
+
+
+def get_subnetworks_for_a3mega(cluster_name: str) -> list[str]:
+  return [f'{cluster_name}-gpunet-{i}-subnet' for i in range(8)]
+
+
+def get_subnetworks_for_a3ultra(cluster_name: str) -> list[str]:
+  return [f'{cluster_name}-sub-1'] + [
+      f'{cluster_name}-rdma-sub-{i}' for i in range(8)
+  ]
+
+
 def create_cluster_firewall_rule(args, index) -> int:
   """Create one GKE Cluster firewall rule.
diff --git a/src/xpk/core/workload_decorators/rdma_decorator.py b/src/xpk/core/workload_decorators/rdma_decorator.py
index 290d3add5..8cb1792d0 100644
--- a/src/xpk/core/workload_decorators/rdma_decorator.py
+++ b/src/xpk/core/workload_decorators/rdma_decorator.py
@@ -18,6 +18,21 @@
 from ...utils.yaml import literal_string
+
+
+def decorate_job_template_with_a3ultra(job_manifest) -> dict:
+  spec = (
+      job_manifest.setdefault('spec', {})
+      .setdefault('template', {})
+      .setdefault('spec', {})
+  )
+  spec.setdefault('tolerations', [])
+  spec.setdefault('volumes', [])
+
+  add_volumes(job_manifest)
+  add_tolerations(job_manifest)
+  update_gpu_containers(job_manifest)
+  return job_manifest
+
+
 def decorate_jobset(jobset_manifest_str, sub_networks) -> str:
   """
   Decorates a JobSet manifest with the necessary components for rdma-daemon.
@@ -65,7 +80,7 @@ def add_annotations(job_manifest, sub_networks):
       ']',
   ]
   annotations.update({
-      'networking.gke.io/default-interface': 'eth0',
+      'networking.gke.io/default-interface': "'eth0'",
       'networking.gke.io/interfaces': literal_string('\n'.join(interfaces)),
   })
diff --git a/src/xpk/core/workload_decorators/tcpxo_decorator.py b/src/xpk/core/workload_decorators/tcpxo_decorator.py
index 0e34d33d8..827756eb8 100644
--- a/src/xpk/core/workload_decorators/tcpxo_decorator.py
+++ b/src/xpk/core/workload_decorators/tcpxo_decorator.py
@@ -21,6 +21,42 @@
 rxdm = 'v1.0.12'
+
+
+def decorate_job_template_with_a3mega(job_manifest: dict) -> dict:
+  spec = (
+      job_manifest.setdefault('spec', {})
+      .setdefault('template', {})
+      .setdefault('spec', {})
+  )
+  spec.setdefault('tolerations', [])
+  spec.setdefault('volumes', [])
+
+  add_volumes(job_manifest)
+  add_tolerations(job_manifest)
+  add_tcpxo_daemon_container(job_manifest)
+  update_gpu_containers(job_manifest)
+  return job_manifest
+
+
+def decorate_job(job_manifest: dict, sub_networks: list[str]) -> dict:
+  job_manifest.setdefault('spec', {}).setdefault('template', {}).setdefault(
+      'metadata', {}
+  ).setdefault('annotations', {})
+  spec = (
+      job_manifest.setdefault('spec', {})
+      .setdefault('template', {})
+      .setdefault('spec', {})
+  )
+  spec.setdefault('tolerations', [])
+  spec.setdefault('volumes', [])
+
+  add_annotations(job_manifest, sub_networks)
+  add_volumes(job_manifest)
+  add_tolerations(job_manifest)
+  add_tcpxo_daemon_container(job_manifest)
+  update_gpu_containers(job_manifest)
+  return job_manifest
+
+
 def decorate_jobset(jobset_manifest_str, sub_networks) -> str:
   """
   Decorates a JobSet manifest with the necessary components for tcpxo-daemon.
@@ -36,23 +72,7 @@ def decorate_jobset(jobset_manifest_str, sub_networks) -> str:
   for job in manifest['spec']['replicatedJobs']:
     job_manifest = job['template']
-    job_manifest.setdefault('spec', {}).setdefault('template', {}).setdefault(
-        'metadata', {}
-    ).setdefault('annotations', {})
-    spec = (
-        job_manifest.setdefault('spec', {})
-        .setdefault('template', {})
-        .setdefault('spec', {})
-    )
-    spec.setdefault('tolerations', [])
-    spec.setdefault('volumes', [])
-
-    add_annotations(job_manifest, sub_networks)
-    add_volumes(job_manifest)
-    add_tolerations(job_manifest)
-    add_tcpxo_daemon_container(job_manifest)
-    update_gpu_containers(job_manifest)
-
+    job_manifest = decorate_job(job_manifest, sub_networks)
   return yaml.dump(manifest, sort_keys=False)
@@ -135,8 +155,8 @@ def add_tcpxo_daemon_container(job_manifest):
       ],
       'env': [{'name': 'LD_LIBRARY_PATH', 'value': '/usr/local/nvidia/lib64'}],
   }
-  job_manifest['spec']['template']['spec']['containers'].insert(
-      0, tcpxo_daemon_container
+  job_manifest['spec']['template']['spec']['containers'].append(
+      tcpxo_daemon_container
   )
diff --git a/tools/Dockerfile-kjob b/tools/Dockerfile-kjob
index 3d4eb3609..9ebd2aa58 100644
--- a/tools/Dockerfile-kjob
+++ b/tools/Dockerfile-kjob
@@ -42,7 +42,7 @@ ENV PATH $PATH:/usr/local/go/bin:$GOPATH/bin
 # Clone the kjob repository
 ARG KJOB_BRANCH=main
-RUN git clone --branch ${KJOB_BRANCH} https://github.com/kubernetes-sigs/kjob.git /kjob
+RUN git clone --branch ${KJOB_BRANCH} https://github.com/pawloch00/kjob.git /kjob
 # Build the gcluster binary
 WORKDIR /kjob
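
For reference, the kjob helper image defined in `tools/Dockerfile-kjob` can be built locally; a minimal sketch (the image tag is illustrative, and `KJOB_BRANCH` defaults to `main` in the Dockerfile):

```bash
# Hypothetical tag -- adjust to your registry; KJOB_BRANCH is the Dockerfile's build arg.
docker build -f tools/Dockerfile-kjob -t kjob-tools:dev --build-arg KJOB_BRANCH=main .
```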