diff --git a/deepnvme/README.md b/deepnvme/README.md
new file mode 100644
index 000000000..42a19aaa8
--- /dev/null
+++ b/deepnvme/README.md
@@ -0,0 +1,7 @@
+This folder contains performance micro-benchmarks that use DeepNVMe for various I/O data transfer scenarios. `GDS` mode transfers are supported on compatible platforms.
+
+ds_io: Sweep scripts for the `ds_io` I/O performance utility. `ds_io` is a `fio`-like utility for measuring read and write performance of raw I/O transfers involving CPU or GPU buffers.
+
+file_access: Scripts for measuring file accesses involving CPU or GPU buffers.
+
+zero_inference: Weight-offloading for LLM inference.
diff --git a/deepnvme/ds_io/ds_io_read_sweep.sh b/deepnvme/ds_io/ds_io_read_sweep.sh
new file mode 100755
index 000000000..bd4b8de19
--- /dev/null
+++ b/deepnvme/ds_io/ds_io_read_sweep.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+
+if [[ $# -lt 3 ]]; then
+    echo "Usage: $0 <xfer> <nvme_dir> <log_dir>"
+    exit 1
+fi
+
+XFER=$1
+NVME_DIR=$2
+LOG_DIR=$3
+
+./ds_io_sweep.sh "read" ${XFER} ${NVME_DIR} ${LOG_DIR}
diff --git a/deepnvme/ds_io/ds_io_sweep.sh b/deepnvme/ds_io/ds_io_sweep.sh
new file mode 100755
index 000000000..6a25f3d93
--- /dev/null
+++ b/deepnvme/ds_io/ds_io_sweep.sh
@@ -0,0 +1,91 @@
+#!/bin/bash
+# set -x
+if [[ $# -lt 4 ]]; then
+    echo "Usage: $0 <io_op> <xfer> <nvme_dir> <log_dir>"
+    exit 1
+fi
+
+IO_OP=$1
+XFER=$2
+NVME_DIR=$3
+LOG_DIR=$4
+
+
+if [[ ${IO_OP} == "read" ]]; then
+    io_op_opt="--read"
+elif [[ ${IO_OP} == "write" ]]; then
+    io_op_opt=""
+else
+    echo "Error: ${IO_OP} is an invalid op. Valid ops are [read, write]"
+    exit 1
+fi
+
+if [[ ${XFER} == "cpu" ]]; then
+    xfer_opt=""
+elif [[ ${XFER} == "gpu" ]]; then
+    xfer_opt="--gpu --use_accelerator_pin_memory"
+elif [[ ${XFER} == "gds" ]]; then
+    xfer_opt="--gpu --use_gds"
+else
+    echo "Error: ${XFER} is an invalid xfer. Valid xfers are [cpu, gpu, gds]"
+    exit 1
+fi
+
+NUM_DRIVES=$(ls -d ${NVME_DIR}* | wc -l)
+if [[ $NUM_DRIVES -lt 1 ]]; then
+    echo "Error: found no folders matching ${NVME_DIR}*"
+    exit 1
+fi
+
+
+
+mkdir -p ${LOG_DIR}
+IO_SIZE=1G
+
+for numjobs in 1 4 8; do
+    if ((numjobs < NUM_DRIVES)); then
+        continue # need at least one job per drive
+    fi
+    FTD_OPT="--folder_to_device_mapping "
+    drive_num=0
+    jobs_per_drive=$((numjobs/NUM_DRIVES))
+    if ((jobs_per_drive == 0)); then
+        jobs_per_drive=1
+    fi
+    for (( i=0; i<${numjobs}; i++ )); do
+        FTD_OPT="${FTD_OPT} ${NVME_DIR}${drive_num}:${i}"
+        if (( (i+1) % jobs_per_drive == 0 )); then
+            drive_num=$((drive_num+1))
+        fi
+    done
+    # echo ${FTD_OPT}
+    COMMON_OPTS="--io_size ${IO_SIZE} ${io_op_opt} ${xfer_opt} ${FTD_OPT}"
+    for ov in overlap sequential; do
+        if [[ ${ov} == "sequential" ]]; then
+            ov_opt="--sequential_requests"
+        else
+            ov_opt=""
+        fi
+        for sub in single block; do
+            if [[ ${sub} == "single" ]]; then
+                sub_opt="--single_submit"
+            else
+                sub_opt=""
+            fi
+            for io_para in 1 2 4 8; do
+                io_para_opt="--io_parallel ${io_para}"
+                for bs in 1M 2M; do
+                    bs_opt="--block_size ${bs}"
+                    for qd in 128; do
+                        qd_opt="--queue_depth ${qd}"
+                        RUN_OPTS="${ov_opt} ${sub_opt} ${io_para_opt} ${bs_opt} ${qd_opt}"
+                        LOG="${LOG_DIR}/${IO_OP}_${sub}_${ov}_t${io_para}_p${numjobs}_d${qd}_bs${bs}.txt"
+                        cmd="ds_io ${COMMON_OPTS} ${RUN_OPTS} &> ${LOG}"
+                        echo ${cmd}
+                        eval ${cmd}
+                    done
+                done
+            done
+        done
+    done
+done
\ No newline at end of file
diff --git a/deepnvme/ds_io/ds_io_write_sweep.sh b/deepnvme/ds_io/ds_io_write_sweep.sh
new file mode 100755
index 000000000..75ebd56b9
--- /dev/null
+++ b/deepnvme/ds_io/ds_io_write_sweep.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+
+if [[ $# -lt 3 ]]; then
+    echo "Usage: $0 <xfer> <nvme_dir> <log_dir>"
+    exit 1
+fi
+
+XFER=$1
+NVME_DIR=$2
+LOG_DIR=$3
+
+./ds_io_sweep.sh "write" ${XFER} ${NVME_DIR} ${LOG_DIR}
+exit
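For clarity, here is a minimal Python sketch of the `--folder_to_device_mapping` strings that the loop in `ds_io_sweep.sh` builds; the function name and example paths are hypothetical, only the mapping logic mirrors the script:

```python
def folder_to_device_mapping(nvme_dir, num_drives, numjobs):
    # Each of numjobs I/O jobs is pinned to one of num_drives mount
    # points; jobs are assigned round-robin, jobs_per_drive at a time.
    jobs_per_drive = max(numjobs // num_drives, 1)
    pairs, drive_num = [], 0
    for i in range(numjobs):
        pairs.append(f"{nvme_dir}{drive_num}:{i}")
        if (i + 1) % jobs_per_drive == 0:
            drive_num += 1
    return pairs

# 2 drives, 4 jobs -> ['/mnt/nvme0:0', '/mnt/nvme0:1', '/mnt/nvme1:2', '/mnt/nvme1:3']
print(folder_to_device_mapping("/mnt/nvme", 2, 4))
```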
diff --git a/deepnvme/model_checkpoint/save_model_utils.py b/deepnvme/model_checkpoint/save_model_utils.py
index be5c4d5bc..e7326821a 100644
--- a/deepnvme/model_checkpoint/save_model_utils.py
+++ b/deepnvme/model_checkpoint/save_model_utils.py
@@ -1,6 +1,6 @@
 import argparse
 import os
-from transformers import AutoModelForCausalLM
+from transformers import AutoModelForCausalLM, AutoTokenizer
 from transformers import T5ForConditionalGeneration
 from torch_save_utils import PINNED_BUFFER_MB
 
@@ -23,10 +23,13 @@ def _get_hf_model(tag):
     model_name = HF_MODELS_DICT[tag]
     if tag == TINY_T5:
         model = T5ForConditionalGeneration.from_pretrained(model_name)
+
     else:
         model = AutoModelForCausalLM.from_pretrained(model_name)
+    tokenizer = AutoTokenizer.from_pretrained(model_name)
+
-    return model, model_name, tag
+    return model, tokenizer, model_name, tag
 
 def get_model(model_tag):
     return _get_hf_model(model_tag)
@@ -108,6 +111,13 @@ def parse_arguments():
                         action='store_true',
                         help='Disable double buffering of i/o buffer.')
 
+    parser.add_argument('--safetensors',
+                        action='store_true',
+                        help='Use safetensors load/save.')
+
+    parser.add_argument('--regular_torch_save',
+                        action='store_true',
+                        help='Use vanilla torch.save.')
 
     #parser.add_argument('--single_writer', action='store_true', help='Disable parallel rank writes of data parallel (replicated) state')
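The two new flags select among three save paths in the benchmark driver shown next: DeepNVMe fast save (default), vanilla `torch.save` (`--regular_torch_save`), or safetensors (`--safetensors`). A minimal sketch of the timing pattern, using a made-up state dict; `torch.save` and `safetensors.torch.save_file` are the real APIs, everything else is illustrative:

```python
import time
import torch
import safetensors.torch

state = {"weight": torch.randn(1024, 1024)}  # stand-in for model.state_dict()

def timed_save(save_fn, path):
    # Return wall-clock seconds spent in the save call.
    start = time.time()
    save_fn(path)
    return time.time() - start

pt_sec = timed_save(lambda p: torch.save(state, p), "/tmp/ckpt.pt")
sft_sec = timed_save(lambda p: safetensors.torch.save_file(state, p), "/tmp/ckpt.safetensors")
print(f"torch.save: {pt_sec:.3f}s, safetensors: {sft_sec:.3f}s")
```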
diff --git a/deepnvme/model_checkpoint/torch_save_load_model.py b/deepnvme/model_checkpoint/torch_save_load_model.py
new file mode 100644
index 000000000..ee637f897
--- /dev/null
+++ b/deepnvme/model_checkpoint/torch_save_load_model.py
@@ -0,0 +1,73 @@
+# Credit https://github.com/sayakpaul
+from save_model_utils import get_model, validate_arguments, parse_arguments
+from torch_save_utils import load_io_ops, _test_ds_fast_save, test_save
+import safetensors.torch
+import os
+import time
+import torch
+
+def test_sft_save(file, buffer, args):  # args unused; kept for parity with the other save-path helpers
+    st = time.time()
+    safetensors.torch.save_file(filename=file, tensors=buffer)
+    return time.time() - st
+
+def main():
+    print(
+        'Performance test of fast model checkpointing via the torch.save() integration.'
+    )
+    print(f'torch version = {torch.__version__}')
+    torch.manual_seed(42)
+
+    args = parse_arguments()
+    if not validate_arguments(args):
+        quit()
+    load_io_ops(args)
+    model, tokenizer, model_name, ckpt_name = get_model(args.model)
+
+    inputs = tokenizer("I am good", return_tensors="pt").to("cuda")  # assumes CUDA is available
+
+    if args.half:
+        model = model.half()
+    if args.gpu:
+        model = model.to("cuda")
+
+    with torch.no_grad():
+        model.eval()
+        pre_logits = model(**inputs).logits
+
+    if not args.safetensors:
+        file = os.path.join(args.folder, f'{ckpt_name}.pt')
+    else:
+        file = os.path.join(args.folder, f'{ckpt_name}.safetensors')
+    if os.path.exists(file):
+        os.remove(file)
+    if not args.regular_torch_save and not args.safetensors:
+        write_sec = _test_ds_fast_save(file, model.state_dict(), args, False)
+    elif args.regular_torch_save:
+        write_sec = test_save(file, model.state_dict(), args)
+    else:
+        write_sec = test_sft_save(file, model.state_dict(), args)
+    ckpt_size = os.path.getsize(file)
+    gb_size = ckpt_size / (1024**3)
+    gb_per_sec = gb_size / write_sec
+    print(
+        f'{gb_size:5.2f} GB, {write_sec:5.2f} secs, {gb_per_sec:5.2f} GB/s'
+    )
+    st = time.time()
+    if args.safetensors:
+        loaded_sd = safetensors.torch.load_file(file, device="cuda")
+    else:
+        loaded_sd = torch.load(file, weights_only=True, map_location="cuda")
+    load_sec = time.time() - st
+    print(f"Loaded in {load_sec:5.2f} seconds.")
+    model.load_state_dict(loaded_sd)
+    with torch.no_grad():
+        model.eval()
+        post_logits = model(**inputs).logits
+
+    assert torch.allclose(pre_logits, post_logits, atol=1e-3, rtol=1e-3)
+    os.remove(file)
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/deepnvme/model_checkpoint/torch_save_model.py b/deepnvme/model_checkpoint/torch_save_model.py
index 9ac855ca6..d46b91a0e 100644
--- a/deepnvme/model_checkpoint/torch_save_model.py
+++ b/deepnvme/model_checkpoint/torch_save_model.py
@@ -57,7 +57,7 @@ def main():
     if not validate_arguments(args):
         quit()
     load_io_ops(args)
-    model, model_name, ckpt_name = get_model(args.model)
+    model, tokenizer, model_name, ckpt_name = get_model(args.model)
     if args.half:
         model = model.half()
     if args.gpu:
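The round-trip correctness check at the end of `torch_save_load_model.py` (save, reload, compare logits) is independent of the checkpoint format. A minimal self-contained sketch of the same pattern, using a tiny stand-in module instead of an HF model (all names here are illustrative):

```python
import torch

model = torch.nn.Linear(8, 8)
x = torch.randn(2, 8)
with torch.no_grad():
    pre = model(x)  # reference outputs before the checkpoint round trip

torch.save(model.state_dict(), "/tmp/sd.pt")
model.load_state_dict(torch.load("/tmp/sd.pt", weights_only=True))

with torch.no_grad():
    post = model(x)  # outputs after reloading the saved weights

# Loose tolerances match the benchmark's check, which must also absorb fp16 noise.
assert torch.allclose(pre, post, atol=1e-3, rtol=1e-3)
```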