Add example of running nccl tests in multihost slurm mode #417
Open. Wants to merge 30 commits into base: develop
- 4b929e9 kjob storage configuration (mbobrovskyi, Feb 17, 2025)
- cd5924f Wait for run-filestore. (mbobrovskyi, Mar 3, 2025)
- 933c198 Merge branch 'develop' into mbobrovskyi/kjob-storage-configuration (pawloch00, Mar 4, 2025)
- 776b9d8 draft (pawloch00, Mar 5, 2025)
- 80daa8e decorator (pawloch00, Mar 5, 2025)
- 39386f5 :fix (pawloch00, Mar 5, 2025)
- 92ff4ff Merge branch 'develop' into mbobrovskyi/kjob-storage-configuration (mbobrovskyi, Mar 5, 2025)
- fa687cb Merge branch 'develop' of https://github.com/AI-Hypercomputer/xpk int… (pawloch00, Mar 5, 2025)
- 3c5ea31 Merge branch 'mbobrovskyi/kjob-storage-configuration' of https://gith… (pawloch00, Mar 5, 2025)
- e8ffd62 Merge branch 'develop' of https://github.com/AI-Hypercomputer/xpk int… (pawloch00, Mar 5, 2025)
- 890cef6 fixes (pawloch00, Mar 5, 2025)
- 8231917 fix multihost (pawloch00, Mar 6, 2025)
- 6be77b7 add basic exmaple (pawloch00, Mar 7, 2025)
- d18390e message.py (pawloch00, Mar 10, 2025)
- 8b0bede print env (pawloch00, Mar 10, 2025)
- 7d2ec93 debug (pawloch00, Mar 11, 2025)
- 5e9b9c8 Merge branch 'develop' of https://github.com/AI-Hypercomputer/xpk int… (pawloch00, Mar 11, 2025)
- a63d630 fixes (pawloch00, Mar 12, 2025)
- 6280d89 changes for running on a3 ultra (pawloch00, Mar 12, 2025)
- a44a830 fixes for running multihost (pawloch00, Mar 13, 2025)
- d22f003 fix (pawloch00, Mar 13, 2025)
- a612935 fix (pawloch00, Mar 13, 2025)
- c253794 fix (pawloch00, Mar 13, 2025)
- 5c896b8 rm example changes (pawloch00, Mar 13, 2025)
- c37297f newline fix (pawloch00, Mar 13, 2025)
- 8b2b181 fix whitespace and remove debug prints (pawloch00, Mar 13, 2025)
- 79e32b4 fix whitespace (pawloch00, Mar 13, 2025)
- 2efd45c fix linting (pawloch00, Mar 13, 2025)
- 7b3ab87 add example for running nccl tests from xpk run (pawloch00, Mar 13, 2025)
- bccff0c rm nccl directory with obsolete tests (pawloch00, Mar 13, 2025)
22 changes: 17 additions & 5 deletions examples/llama-3.1-finetuning-multi-host/train.py
@@ -17,8 +17,13 @@
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, Trainer, DataCollatorForLanguageModeling
import torch

from torch.distributed.elastic.multiprocessing.errors import record
# 1. Data Loading and Preprocessing
import os

for name, value in os.environ.items():
    print(f"{name}: {value}")

dataset = load_dataset("json", data_files="training_data.jsonl", split="train")


@@ -75,7 +80,7 @@ def tokenize_function(example):
learning_rate=2e-5,
num_train_epochs=3,
logging_dir="./logs",
logging_steps=10,
logging_steps=1,
save_steps=100,
eval_strategy="no",
fp16=False,
@@ -94,6 +99,13 @@ def tokenize_function(example):
data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False),
)

trainer.train()
trainer.save_model("./llama3-finetuned-final")
tokenizer.save_pretrained("./llama3-finetuned-final")

@record
def train_func():
    trainer.train()
    trainer.save_model("./llama3-finetuned-final")
    tokenizer.save_pretrained("./llama3-finetuned-final")


if __name__ == "__main__":
    train_func()
55 changes: 55 additions & 0 deletions examples/nccl-batch/README.MD
@@ -0,0 +1,55 @@
# Running NCCL tests on GPU clusters using xpk slurm mode.

This document provides an introduction to running tests for the NVIDIA Collective Communications Library (NCCL). NCCL is a high-performance, multi-GPU communications library used in deep learning and other applications. The test suite helps verify the correct functionality and performance of NCCL on your system. Please visit the [NCCL tests GitHub repository](https://github.com/NVIDIA/nccl-tests?tab=readme-ov-file#nccl-tests) to learn more about NCCL and how to run the tests.

The steps in this document are designed to run on A3 Ultra and A3 Mega machines (`DEVICE_TYPE=h200-141gb-8` or `DEVICE_TYPE=h100-mega-80gb-8`).
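
The commands in the following steps assume a handful of environment variables are already exported. A minimal sketch with hypothetical placeholder values (substitute your own project, zone, cluster name, and reservation):

```bash
# Hypothetical placeholder values -- adjust to your own environment.
export PROJECT_ID=my-gcp-project          # GCP project that hosts the cluster
export COMPUTE_ZONE=us-central1-a         # compute zone for the cluster
export CLUSTER_NAME=nccl-test-cluster     # name of the GKE cluster
export DEVICE_TYPE=h200-141gb-8           # A3 Ultra; use h100-mega-80gb-8 for A3 Mega
export CLUSTER_NUM_NODES=4                # nodes provisioned at cluster creation
export RESERVATION_ID=my-reservation      # capacity reservation to consume
export NUM_NODES=4                        # nodes used by the NCCL test run (xpk run --nodes)
```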

### 1. Create cluster

Skip this step if you have already provisioned a GKE cluster with A3 Ultra or A3 Mega machines.

The first step is to create a cluster with A3 Ultra or A3 Mega machines. Execute the command below:

```bash
python3 xpk.py cluster create \
--cluster=$CLUSTER_NAME --device-type=$DEVICE_TYPE \
--zone=$COMPUTE_ZONE --project=$PROJECT_ID \
--num-nodes=$CLUSTER_NUM_NODES --reservation=$RESERVATION_ID
```

### 2. Run NCCL workload

The command to run NCCL tests on A3 clusters depends on the type of machine.


#### A3 Mega

It is necessary to configure the batch working directory and batch image before running NCCL tests in slurm mode. First, set the working directory:

```bash
python3 xpk.py config set batch-working-directory /home/nccl
```

Then set the batch image; the Docker image provided by Google will be used:
```bash
python3 xpk.py config set batch-image us-docker.pkg.dev/gce-ai-infra/gpudirect-tcpxo/nccl-plugin-gpudirecttcpx-dev:v1.0.8-1
```

The `xpk run` command is used to start the NCCL tests:

```bash
python3 xpk.py run \
--cluster $CLUSTER_NAME --zone $COMPUTE_ZONE \
--project $PROJECT_ID \
--nodes $NUM_NODES \
--gpus-per-task nvidia.com/gpu:8 \
examples/nccl-batch/a3mega.slurm
```

### Troubleshooting

If you are getting a 403 Forbidden error when creating the Docker image, make sure to add `us-docker.pkg.dev` to the list of gcloud credential helpers using this command:

```bash
gcloud auth configure-docker us-docker.pkg.dev
```
46 changes: 46 additions & 0 deletions examples/nccl-batch/a3mega.slurm
@@ -0,0 +1,46 @@
#!/bin/bash
#SBATCH --job-name=nccl-tests
#SBATCH --cpus-per-task=8
#SBATCH --nodes=4
#SBATCH --ntasks=1

set -x
apt update -y
apt install -y iputils-ping
/scripts/container_entry.sh daemon &

export POSTFIX=$(hostname --fqdn | cut -d . -f 2-)
export WORKERS_BASENAME=$(hostname --fqdn | cut -d . -f 1 | rev | cut -d - -f 2- | rev )
export NODE_RANK=$SLURM_JOB_ID-1

for i in `seq 0 $(($SLURM_NNODES-1))`; do
OTHER=${WORKERS_BASENAME}-${i}.${POSTFIX}
until ssh -p 222 -o StrictHostKeyChecking=no $OTHER hostname; do
echo "Waiting for ${OTHER}..."
sleep 10
done
echo ${OTHER} port=222 slots=8 | tee -a /tmp/hostfile;
done

cat /tmp/hostfile
echo "nodepool size"
echo $(( ${SLURM_GPUS} * "${SLURM_NNODES}" ))
if [[ "${NODE_RANK}" -eq "0" ]]; then
export NCCL_TESTS_SPLIT_MASK="0x0";
export NCCL_LIB_DIR=$LD_LIBRARY_PATH
ENV_VARS=$(echo ${!NCCL*} ${!OMPI*} LD_LIBRARY_PATH PATH | sed 's/ / -x /g')
mpirun --hostfile /tmp/hostfile \
-x $ENV_VARS \
--allow-run-as-root \
-np $(( ${SLURM_GPUS} * "${SLURM_NNODES}" )) \
--mca orte_keep_fqdn_hostnames 1 \
--mca btl tcp,self \
--mca btl_tcp_if_include eth0 \
--mca plm_rsh_agent "ssh -q -o LogLevel=ERROR -o StrictHostKeyChecking=no -p 222" \
taskset -c 32-63 /scripts/demo_mpi_entry_with_config_profile.sh all_gather_perf \
-b 1K -e 8G -f 2 -g 1 -w 5 --iters 100
else
while ping -c 1 ${WORKERS_BASENAME}-0.${POSTFIX}; do
sleep 5
done
fi
29 changes: 0 additions & 29 deletions examples/nccl-workload/nccl.md

This file was deleted.

84 changes: 0 additions & 84 deletions examples/nccl-workload/nccl.sh

This file was deleted.

20 changes: 9 additions & 11 deletions src/xpk/commands/batch.py
@@ -22,12 +22,7 @@
from ..core.kueue import LOCAL_QUEUE_NAME
from ..utils.console import xpk_exit, xpk_print
from .common import set_cluster_command
from ..core.kjob import (
AppProfileDefaults,
prepare_kjob,
Kueue_TAS_annotation,
get_gcsfuse_annotation,
)
from ..core.kjob import AppProfileDefaults, prepare_kjob, Kueue_TAS_annotation, add_annotation_to_job, get_gcsfuse_annotation
from .kind import set_local_cluster_command
import re

@@ -58,14 +53,17 @@ def batch(args: Namespace) -> None:


def submit_job(args: Namespace) -> None:

create_xpk_k8s_service_account()

cmd = (
'kubectl kjob create slurm'
f' --profile {AppProfileDefaults.NAME.value}'
f' --localqueue {LOCAL_QUEUE_NAME}'
f' --pod-template-annotation {Kueue_TAS_annotation}'
'kubectl kjob create slurm --profile'
f' {AppProfileDefaults.NAME.value} \\\n --localqueue'
f' {LOCAL_QUEUE_NAME} \\\n --pod-template-annotation'
f' {Kueue_TAS_annotation} '
' --first-node-ip'
)

cmd = add_annotation_to_job(args, cmd)
gcsfuse_annotation = get_gcsfuse_annotation(args)
if gcsfuse_annotation is not None:
cmd += f' --pod-template-annotation {gcsfuse_annotation}'
20 changes: 8 additions & 12 deletions src/xpk/commands/run.py
@@ -22,12 +22,7 @@
from ..core.kueue import LOCAL_QUEUE_NAME
from ..utils.console import xpk_exit, xpk_print
from .common import set_cluster_command
from ..core.kjob import (
AppProfileDefaults,
prepare_kjob,
Kueue_TAS_annotation,
get_gcsfuse_annotation,
)
from ..core.kjob import JobTemplateDefaults, AppProfileDefaults, prepare_kjob, Kueue_TAS_annotation, add_annotation_to_job, get_gcsfuse_annotation
from .kind import set_local_cluster_command


@@ -58,13 +53,14 @@ def run(args: Namespace) -> None:

def submit_job(args: Namespace) -> None:
cmd = (
'kubectl kjob create slurm'
f' --profile {AppProfileDefaults.NAME.value}'
f' --localqueue {LOCAL_QUEUE_NAME}'
f' --pod-template-annotation {Kueue_TAS_annotation}'
' --wait'
' --rm'
'kubectl kjob create slurm --profile'
f' {AppProfileDefaults.NAME.value} '
f' --localqueue {LOCAL_QUEUE_NAME} '
f" --pod-template-annotation '{Kueue_TAS_annotation}'"
f' --container-name {JobTemplateDefaults.CONTAINER_NAME.value}'
' --wait --rm --first-node-ip'
)
cmd = add_annotation_to_job(args, cmd)

gcsfuse_annotation = get_gcsfuse_annotation(args)
if gcsfuse_annotation is not None:
8 changes: 4 additions & 4 deletions src/xpk/commands/workload.py
@@ -14,6 +14,7 @@
limitations under the License.
"""

from ..core.network import get_subnetworks_for_a3mega, get_subnetworks_for_a3ultra
from ..core.cluster import (
create_xpk_k8s_service_account,
get_cluster_credentials,
@@ -26,6 +27,7 @@
get_main_container_docker_image,
get_user_workload_container,
)

from ..core.docker_resources import get_volumes
from ..core.gcloud_context import add_zone_and_project
from ..core.kueue import LOCAL_QUEUE_NAME
@@ -617,13 +619,11 @@ def workload_create(args) -> None:
)

if args.device_type == cluster_gcluster.a3mega_device_type:
sub_networks = [f'{args.cluster}-gpunet-{i}-subnet' for i in range(8)]
sub_networks = get_subnetworks_for_a3mega(args.cluster)
yml_string = tcpxo_decorator.decorate_jobset(yml_string, sub_networks)

if args.device_type == cluster_gcluster.a3ultra_device_type:
sub_networks = [f'{args.cluster}-sub-1'] + [
f'{args.cluster}-rdma-sub-{i}' for i in range(8)
]
sub_networks = get_subnetworks_for_a3ultra(args.cluster)
yml_string = rdma_decorator.decorate_jobset(yml_string, sub_networks)

if len(gcs_fuse_storages) + len(gcpfilestore_storages) > 0:
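
For context on the `workload.py` change above: the inline subnet-name expressions are replaced by helpers imported from `..core.network`. Below is a minimal sketch of what those helpers presumably return, inferred from the removed inline code and the call sites `get_subnetworks_for_a3mega(args.cluster)` and `get_subnetworks_for_a3ultra(args.cluster)`; the actual implementations in this PR may differ.

```python
# Hypothetical sketch inferred from the inline expressions removed in this diff;
# the real helpers live in src/xpk/core/network.py and may differ.
def get_subnetworks_for_a3mega(cluster_name: str) -> list[str]:
  # A3 Mega nodes use eight GPU-NIC subnets named after the cluster.
  return [f'{cluster_name}-gpunet-{i}-subnet' for i in range(8)]


def get_subnetworks_for_a3ultra(cluster_name: str) -> list[str]:
  # A3 Ultra nodes use one primary subnet plus eight RDMA subnets.
  return [f'{cluster_name}-sub-1'] + [
      f'{cluster_name}-rdma-sub-{i}' for i in range(8)
  ]
```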