Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions deepnvme/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
This folder contains performance micro-benchmarks of using DeepNVMe for various I/O data transfer scenarios. `GDS` mode transfers are supported on compatible platforms.

ds_io: Sweep scripts for the `ds_io` I/O performance utility. `ds_io` is a `fio`-like utility for measuring read and write performance of raw I/O transfers involving CPU or GPU buffers.

file_access: Scripts for measuring file accesses involving CPU or GPU buffers.

zero_inference: Weight-offloading for LLM inference.
12 changes: 12 additions & 0 deletions deepnvme/ds_io/ds_io_read_sweep.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash
# Thin wrapper around ds_io_sweep.sh that runs the read-sweep variant.
# Usage: ds_io_read_sweep.sh <xfer [cpu|gpu|gds]> <nvme mount> <output log dir>

if [[ $# -lt 3 ]]; then
    echo "Usage: $0 <xfer [cpu|gpu|gds]> <nvme mount> <output log dir>"
    exit 1
fi

XFER=$1
NVME_DIR=$2
LOG_DIR=$3

# Quote arguments so paths containing spaces survive the hand-off; resolve the
# sweep script relative to this script's directory so it works from any cwd.
"$(dirname "$0")/ds_io_sweep.sh" "read" "${XFER}" "${NVME_DIR}" "${LOG_DIR}"
91 changes: 91 additions & 0 deletions deepnvme/ds_io/ds_io_sweep.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#!/bin/bash
# Sweep driver for the `ds_io` I/O benchmark: iterates over job counts,
# request ordering, submission mode, thread parallelism, block size, and
# queue depth, running one benchmark per combination and logging each run.
# Usage: ds_io_sweep.sh <op [read|write]> <xfer [gpu|cpu|gds]> <nvme mount> <output log dir>
# set -x
if [[ $# -lt 4 ]]; then
    echo "Usage: $0 <op [read|write]> <xfer [gpu|cpu|gds]> <nvme mount> <output log dir>"
    exit 1
fi

IO_OP=$1
XFER=$2
NVME_DIR=$3
LOG_DIR=$4

# ds_io performs writes by default; reads need an explicit flag.
if [[ ${IO_OP} == "read" ]]; then
    io_op_opt="--read"
elif [[ ${IO_OP} == "write" ]]; then
    io_op_opt=""
else
    echo "Error: ${IO_OP} is an invalid op. Valid ops are [read, write]"
    exit 1
fi

# Map the transfer mode onto ds_io buffer-placement flags.
if [[ ${XFER} == "cpu" ]]; then
    xfer_opt=""
elif [[ ${XFER} == "gpu" ]]; then
    xfer_opt="--gpu --use_accelerator_pin_memory"
elif [[ ${XFER} == "gds" ]]; then
    xfer_opt="--gpu --use_gds"
else
    # Fix: message previously said "invalid op" although it reports the xfer arg.
    echo "Error: ${XFER} is an invalid xfer. Valid xfers are [cpu, gpu, gds]"
    exit 1
fi

# Count mounts matching the prefix, e.g. /mnt/nvme0, /mnt/nvme1, ...
NUM_DRIVES=$(ls -d "${NVME_DIR}"* | wc -l)
if [[ $NUM_DRIVES -lt 1 ]]; then
    echo "Error: Found less than 1 folder in ${NVME_DIR}"
    exit 1
fi

mkdir -p "${LOG_DIR}"
IO_SIZE=1G

for numjobs in 1 4 8; do
    # Skip configurations that have fewer jobs than drives.
    if ((numjobs < NUM_DRIVES)); then
        continue
    fi
    # Build a job->drive mapping that spreads jobs evenly across drives.
    FTD_OPT="--folder_to_device_mapping "
    drive_num=0
    jobs_per_drive=$((numjobs/NUM_DRIVES))
    if ((jobs_per_drive == 0)); then
        jobs_per_drive=1
    fi
    for (( i=0; i<${numjobs}; i++ )); do
        FTD_OPT="${FTD_OPT} ${NVME_DIR}${drive_num}:${i}"
        if (( (i+1) % jobs_per_drive == 0 )); then
            drive_num=$((drive_num+1))
        fi
    done
    # echo ${FTD_OPT}
    COMMON_OPTS="--io_size ${IO_SIZE} ${io_op_opt} ${xfer_opt} ${FTD_OPT}"
    for ov in overlap sequential; do
        if [[ ${ov} == "sequential" ]]; then
            ov_opt="--sequential_requests"
        else
            ov_opt=""
        fi
        for sub in single block; do
            if [[ ${sub} == "single" ]]; then
                sub_opt="--single_submit"
            else
                sub_opt=""
            fi
            for io_para in 1 2 4 8; do
                io_para_opt="--io_parallel ${io_para}"
                for bs in 1M 2M; do
                    bs_opt="--block_size ${bs}"
                    for qd in 128; do
                        qd_opt="--queue_depth ${qd}"
                        RUN_OPTS="${ov_opt} ${sub_opt} ${io_para_opt} ${bs_opt} ${qd_opt}"
                        # Fix: "$IO_OPT_" expanded an undefined variable named
                        # IO_OPT_ (underscore is a valid name character), so the
                        # op prefix was silently dropped from every log name.
                        LOG="${LOG_DIR}/${IO_OP}_${sub}_${ov}_t${io_para}_p${numjobs}_d${qd}_bs${bs}.txt"
                        cmd="ds_io ${COMMON_OPTS} ${RUN_OPTS} &> ${LOG}"
                        echo ${cmd}
                        eval ${cmd}
                    done
                done
            done
        done
    done
done
13 changes: 13 additions & 0 deletions deepnvme/ds_io/ds_io_write_sweep.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash
# Thin wrapper around ds_io_sweep.sh that runs the write-sweep variant.
# Usage: ds_io_write_sweep.sh <xfer [cpu|gpu|gds]> <nvme mount> <output log dir>

if [[ $# -lt 3 ]]; then
    echo "Usage: $0 <xfer [cpu|gpu|gds]> <nvme mount> <output log dir>"
    exit 1
fi

XFER=$1
NVME_DIR=$2
LOG_DIR=$3

# Quote arguments so paths containing spaces survive the hand-off; resolve the
# sweep script relative to this script's directory so it works from any cwd.
# The original's trailing bare `exit` was redundant; falling off the end
# propagates the sweep's exit status the same way.
"$(dirname "$0")/ds_io_sweep.sh" "write" "${XFER}" "${NVME_DIR}" "${LOG_DIR}"
14 changes: 12 additions & 2 deletions deepnvme/model_checkpoint/save_model_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import argparse
import os
from transformers import AutoModelForCausalLM
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers import T5ForConditionalGeneration
from torch_save_utils import PINNED_BUFFER_MB

Expand All @@ -23,10 +23,13 @@ def _get_hf_model(tag):
model_name = HF_MODELS_DICT[tag]
if tag == TINY_T5:
model = T5ForConditionalGeneration.from_pretrained(model_name)

else:
model = AutoModelForCausalLM.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)


return model, model_name, tag
return model, tokenizer, model_name, tag

def get_model(model_tag):
    """Resolve *model_tag* to a loaded Hugging Face model bundle.

    Thin public wrapper over the private loader; returns whatever
    ``_get_hf_model`` produces for the tag.
    """
    bundle = _get_hf_model(model_tag)
    return bundle
Expand Down Expand Up @@ -108,6 +111,13 @@ def parse_arguments():
action='store_true',
help='Disable double buffering of i/o buffer.')

parser.add_argument('--safetensors',
action='store_true',
help='Use safetensors load/save.')

parser.add_argument('--regular_torch_save',
action='store_true',
help='Use vanilla torch.save.')

#parser.add_argument('--single_writer', action='store_true', help='Disable parallel rank writes of data parallel (replicated) state')

Expand Down
73 changes: 73 additions & 0 deletions deepnvme/model_checkpoint/torch_save_load_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Credit https://github.com/sayakpaul
from save_model_utils import get_model, validate_arguments, parse_arguments
from torch_save_utils import load_io_ops, _test_ds_fast_save, test_save
import safetensors.torch
import os
import time
import torch

def test_sft_save(file, buffer, args):
    """Save *buffer* (a dict of tensors) to *file* via safetensors.

    Returns the elapsed wall-clock time in seconds. ``args`` is accepted
    for signature parity with the other save helpers but is not used.
    """
    start = time.time()
    safetensors.torch.save_file(tensors=buffer, filename=file)
    elapsed = time.time() - start
    return elapsed

def main():
    """End-to-end save/load round-trip benchmark for model checkpointing.

    Saves the model state_dict with one of three backends (DeepNVMe fast
    save, vanilla torch.save, or safetensors), reports write throughput,
    reloads the checkpoint, and asserts the reloaded model reproduces the
    pre-save logits.
    """
    print(
        f'Performance test of torch.save() integration of fast model checkpointing.'
    )
    print(f'torch version = {torch.__version__}')
    torch.manual_seed(42)  # deterministic run-to-run behavior

    args = parse_arguments()
    if not validate_arguments(args):
        quit()
    load_io_ops(args)
    model, tokenizer, model_name, ckpt_name = get_model(args.model)

    # Fix: keep the inputs on the same device as the model. The original
    # moved inputs to CUDA unconditionally, which crashes with a device
    # mismatch when --gpu is not set (model stays on CPU).
    device = "cuda" if args.gpu else "cpu"
    inputs = tokenizer("I am good", return_tensors="pt").to(device)

    if args.half:
        model = model.half()
    if args.gpu:
        model = model.to("cuda")

    with torch.no_grad():
        model.eval()
        pre_logits = model(**inputs).logits

    # Checkpoint file extension follows the chosen serialization backend.
    if not args.safetensors:
        file = os.path.join(args.folder, f'{ckpt_name}.pt')
    else:
        file = os.path.join(args.folder, f'{ckpt_name}.safetensors')
    if os.path.exists(file):
        os.remove(file)

    # Select the save backend; the DeepNVMe fast-save path is the default.
    if not args.regular_torch_save and not args.safetensors:
        write_sec = _test_ds_fast_save(file, model.state_dict(), args, False)
    elif args.regular_torch_save:
        write_sec = test_save(file, model.state_dict(), args)
    else:
        write_sec = test_sft_save(file, model.state_dict(), args)

    ckpt_size = os.path.getsize(file)
    gb_size = ckpt_size / (1024**3)
    gb_per_sec = gb_size / write_sec
    print(
        f'{gb_size:5.2f} GB, {write_sec:5.2f} secs, {gb_per_sec:5.2f} GB/s'
    )

    # Reload onto the model's device (see device fix above) and time it.
    st = time.time()
    if args.safetensors:
        loaded_sd = safetensors.torch.load_file(file, device=device)
    else:
        loaded_sd = torch.load(file, weights_only=True, map_location=device)
    load_sec = time.time() - st
    print(f"Loaded in {load_sec:5.2f} seconds.")

    model.load_state_dict(loaded_sd)
    with torch.no_grad():
        model.eval()
        post_logits = model(**inputs).logits

    # Round-trip sanity check: reloaded weights must reproduce the logits.
    assert torch.allclose(pre_logits, post_logits, atol=1e-3, rtol=1e-3)
    os.remove(file)


# Script entry point: run the benchmark only when executed directly,
# not when imported as a module.
if __name__ == "__main__":
    main()
2 changes: 1 addition & 1 deletion deepnvme/model_checkpoint/torch_save_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def main():
if not validate_arguments(args):
quit()
load_io_ops(args)
model, model_name, ckpt_name = get_model(args.model)
model, tokenizer, model_name, ckpt_name = get_model(args.model)
if args.half:
model = model.half()
if args.gpu:
Expand Down