Changes from all commits (71 commits)
d2fd6a7  Initial MLI schemas and MessageHandler class (#607) (AlyssaCote, Jun 11, 2024)
3c9915c  Merge branch 'develop' into mli-feature (ankona, Jun 14, 2024)
38081da  ML Worker Manager MVP (#608) (ankona, Jun 20, 2024)
ab900b8  Remove device attribute from schemas (#619) (AlyssaCote, Jun 25, 2024)
a9ffb14  Merge branch 'develop' into mli-feature (ankona, Jul 2, 2024)
ee2c110  Merge branch 'develop' into mli-feature (ankona, Jul 2, 2024)
8a2f173  Add model metadata to request schema (#624) (AlyssaCote, Jul 3, 2024)
52abd32  Enable environment variable based configuration for ML Worker Manager… (AlyssaCote, Jul 10, 2024)
eace71e  FLI-based Worker Manager (#622) (al-rigazzi, Jul 15, 2024)
5fac3e2  Add ability to specify hardware policies on dragon run requests (#631) (ankona, Jul 17, 2024)
0030a4a  Revert "Add ability to specify hardware policies on dragon run reques… (ankona, Jul 17, 2024)
b6c2f2b  Merge latest develop into mli-feature (#640) (ankona, Jul 18, 2024)
272a1d7  Improve error handling in worker manager (#629) (AlyssaCote, Jul 18, 2024)
7169f1c  Schema performance improvements (#632) (AlyssaCote, Jul 18, 2024)
84101b3  New develop merger (#645) (al-rigazzi, Jul 19, 2024)
e225c07  merging develop (ankona, Jul 26, 2024)
9f482b1  Merge branch 'develop' into mli-feature (ankona, Jul 31, 2024)
263e3c7  Fix dragon installation issues (#652) (ankona, Aug 2, 2024)
0453b8b  Add FeatureStore descriptor to tensor & model keys (#633) (ankona, Aug 7, 2024)
99ed41c  Merge branch 'develop' into mli-feature (ankona, Aug 8, 2024)
74d6e78  Use `torch.from_numpy` instead of `torch.tensor` to reduce a copy (#661) (AlyssaCote, Aug 8, 2024)
391784c  MLI environment variables updated using new naming convention (#665) (AlyssaCote, Aug 14, 2024)
f7ef49b  Remove pydantic dependency from MLI code (#667) (AlyssaCote, Aug 20, 2024)
ef034d5  Enable specification of target hostname for a dragon task (#660) (ankona, Aug 26, 2024)
6d5518b  fix init reordering bug (#675) (ankona, Aug 26, 2024)
5d85995  Queue-based Worker Manager (#647) (al-rigazzi, Aug 28, 2024)
128598b  Refactor `exception_handler` to avoid unnecessary building and serial… (AlyssaCote, Aug 29, 2024)
8aa990c  SmartSim environment variables updated using new naming convention (#… (AlyssaCote, Aug 29, 2024)
f6d55d8  MLI file names conform to snake case (#689) (AlyssaCote, Aug 30, 2024)
a3795e7  Add event broadcasting capability (#672) (ankona, Sep 10, 2024)
28bfd8f  Update MLI docstrings part 1 (#692) (AlyssaCote, Sep 10, 2024)
61ab71d  Update MLI docstrings part 2 (#699) (AlyssaCote, Sep 10, 2024)
3e9bffa  Parametrize dragon install (#703) (ankona, Sep 12, 2024)
b4798da  Merge branch 'develop' into mli-feature (ankona, Sep 12, 2024)
b0b1db6  Ensure forks build latest public dragon in CI actions (#706) (ankona, Sep 18, 2024)
0ebd5ab  Clean up error handling in MLI (#698) (AlyssaCote, Sep 19, 2024)
7336031  Version used to run benchmarks (al-rigazzi, Sep 19, 2024)
77791d9  Merge branch 'mli-feature' into benchmark (al-rigazzi, Sep 19, 2024)
d43f7c7  MLI helper methods (#709) (AlyssaCote, Sep 19, 2024)
5ec287c  Bug fix (#715) (AlyssaCote, Sep 20, 2024)
ce02433  Committing pre-merge (al-rigazzi, Sep 25, 2024)
5217a0a  Merge branch 'mli-feature' of https://github.com/CrayLabs/SmartSim in… (al-rigazzi, Sep 25, 2024)
c1c4604  Working after merge (al-rigazzi, Sep 25, 2024)
3a5a8ce  Improve TF Worker throughput (al-rigazzi, Sep 25, 2024)
48caac2  Revert changes to throughput scripts (al-rigazzi, Sep 26, 2024)
f79f53c  Style, lint, changelog (al-rigazzi, Sep 26, 2024)
f569b9c  Add tests for TF worker (al-rigazzi, Sep 26, 2024)
50ce6ad  Improve standalone_worker_manager.py (al-rigazzi, Sep 26, 2024)
d49634f  Remove commented sections (al-rigazzi, Sep 26, 2024)
a8d501b  Style (al-rigazzi, Sep 26, 2024)
79b4954  Switch to Channel.make_process_local (al-rigazzi, Sep 26, 2024)
152d434  Add timeout and exc handling to WM response (al-rigazzi, Sep 26, 2024)
14b20f1  More commented lines to remove (al-rigazzi, Sep 26, 2024)
dbbdcd9  Remove debug information (al-rigazzi, Sep 26, 2024)
b9bdf99  Fix issue when keras layer has "resource" field (al-rigazzi, Sep 27, 2024)
3f571f4  Address first comments. (al-rigazzi, Oct 2, 2024)
dc7307a  Add ONNX worker (al-rigazzi, Oct 3, 2024)
dccc07d  Add onnx mock app (al-rigazzi, Oct 4, 2024)
a590a36  Style (al-rigazzi, Oct 4, 2024)
bf324d2  Add optional compile step for Torch model (al-rigazzi, Oct 4, 2024)
ca01cb1  Add integration of dragon-based event broadcasting (#710) (ankona, Oct 10, 2024)
dc31b75  Refine try-catch in onnx worker (al-rigazzi, Oct 11, 2024)
5afcfcf  Merge branch 'mli-feature' of https://github.com/CrayLabs/SmartSim in… (al-rigazzi, Oct 11, 2024)
4e6ddff  Use new ProtoClient in apps (al-rigazzi, Oct 11, 2024)
9947013  Style (al-rigazzi, Oct 11, 2024)
23de37f  Mypy (al-rigazzi, Oct 11, 2024)
10d59c8  Style (al-rigazzi, Oct 11, 2024)
ce5a306  Fix tests (al-rigazzi, Oct 14, 2024)
f23c267  Fix tests (al-rigazzi, Oct 14, 2024)
6a08b82  Merge branch 'v1.0' of https://github.com/CrayLabs/SmartSim into tf_w… (al-rigazzi, Oct 31, 2024)
da92472  Complete post-merge operations (al-rigazzi, Oct 31, 2024)
Files changed
2 changes: 1 addition & 1 deletion .github/workflows/run_tests.yml
@@ -120,7 +120,7 @@ jobs:
if: matrix.subset != 'dragon'
run: smart build --device cpu -v


- name: Install ML Runtimes (with dragon)
if: matrix.subset == 'dragon'
env:
5 changes: 5 additions & 0 deletions doc/changelog.md
@@ -13,6 +13,7 @@ Jump to:

Description

- Add `TensorFlowWorker` and `ONNXWorker`
- Fix symlink operation
- RequestBatch rewrite
- Fix regression on hostlist param to DragonRunRequest
@@ -51,6 +52,7 @@ To be released at some point in the future

Description

- Allow specifying Model and Ensemble parameters with
- Implement workaround for Tensorflow that allows RedisAI to build with GCC-14
- Add instructions for installing SmartSim on PML's Scylla

@@ -102,6 +104,9 @@ Description

Detailed Notes

- The serializer would fail if a parameter for a Model or Ensemble
was specified as a numpy dtype. The constructors for these
methods now validate that the input is number-like and convert
- On Frontier, the MIOPEN cache may need to be set prior to using
RedisAI in the ``smart validate``. The instructions for Frontier
have been updated accordingly.
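A note on the numpy dtype fix described in the detailed notes above: the sketch below illustrates the kind of number-like validation and conversion the changelog refers to. It is an illustration only, not SmartSim's actual implementation, and the helper name _validate_param is hypothetical.

import numbers

import numpy as np


def _validate_param(value):
    # Hypothetical sketch: accept anything number-like (including numpy
    # scalars such as np.float32) and convert it to a plain Python number
    # so downstream serialization cannot choke on a numpy dtype.
    if not isinstance(value, numbers.Number):
        raise TypeError(f"parameter must be number-like, got {type(value)}")
    if isinstance(value, np.generic):
        return value.item()  # np.float32(1.5) -> 1.5, np.int64(3) -> 3
    return value


print(_validate_param(np.float32(1.5)))  # 1.5 (plain Python float)
print(_validate_param(3))                # 3  (unchanged)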
14 changes: 14 additions & 0 deletions doc/installation_instructions/basic.rst
@@ -319,6 +319,20 @@ in combination to customize the Dragon installation. For example:
smart build --device cpu --dragon-repo userfork/dragon --dragon-version 0.91


``smart build`` supports installing a specific version of dragon. It exposes the
parameters ``--dragon-repo`` and ``--dragon-version``, which can be used alone or
in combination to customize the Dragon installation. For example:

.. code-block:: bash

# using the --dragon-repo and --dragon-version flags to customize the Dragon installation
smart build --device cpu --dragon-repo userfork/dragon # install Dragon from a specific repo
smart build --device cpu --dragon-version 0.10 # install a specific Dragon release

# combining both flags
smart build --device cpu --dragon-repo userfork/dragon --dragon-version 0.91


.. note::
Dragon is only supported on Linux systems. For further information, you
can read :ref:`the dedicated documentation page <dragon>`.
88 changes: 66 additions & 22 deletions ex/high_throughput_inference/mli_driver.py
@@ -1,31 +1,66 @@
import os
import argparse
import base64
import cloudpickle
import os
import shutil
import sys
from smartsim import Experiment
from smartsim._core.mli.infrastructure.worker.torch_worker import TorchWorker
from smartsim.status import TERMINAL_STATUSES
from smartsim.settings import DragonRunSettings
import time
import typing as t

import cloudpickle

from smartsim import Experiment
from smartsim.settings import DragonRunSettings
from smartsim.status import TERMINAL_STATUSES

parser = argparse.ArgumentParser("Mock application")
parser.add_argument("--log_max_batchsize", default=8, type=int)
parser.add_argument("--num_nodes_app", default=1, type=int)
parser.add_argument("--toolkit", default="torch", choices=["torch","tensorflow","onnx"], type=str)
args = parser.parse_args()

DEVICE = "gpu"
NUM_RANKS = 4
NUM_RANKS_PER_NODE = 1
NUM_NODES_APP = args.num_nodes_app
NUM_WORKERS = 1
BATCH_SIZE = 2
BATCH_TIMEOUT = 0.0
filedir = os.path.dirname(__file__)
worker_manager_script_name = os.path.join(filedir, "standalone_worker_manager.py")
app_script_name = os.path.join(filedir, "mock_app.py")
model_name = os.path.join(filedir, f"resnet50.{DEVICE}.pt")
if args.toolkit == "torch":
# keeping old name for backward compatibility
app_script_name = os.path.join(filedir, "mock_app.py")
model_name = os.path.join(filedir, f"resnet50.{DEVICE}.pt")
else:
app_script_name = os.path.join(filedir, f"mock_app_{args.toolkit}.py")

transport: t.Literal["hsta", "tcp"] = "hsta"

os.environ["SMARTSIM_DRAGON_TRANSPORT"] = transport

exp_path = os.path.join(filedir, f"MLI_proto_{transport.upper()}")
exp_path = os.path.join(
filedir,
"benchmark",
args.toolkit,
f"throughput_n{NUM_NODES_APP}_rpn{NUM_RANKS_PER_NODE}_timeout{BATCH_TIMEOUT}",
f"samples{2**args.log_max_batchsize}",
)
try:
shutil.rmtree(exp_path)
time.sleep(2)
except:
pass
os.makedirs(exp_path, exist_ok=True)
exp = Experiment("MLI_proto", launcher="dragon", exp_path=exp_path)
exp = Experiment("MLI_benchmark", launcher="dragon", exp_path=exp_path)

torch_worker_str = base64.b64encode(cloudpickle.dumps(TorchWorker)).decode("ascii")
if args.toolkit == "torch":
from smartsim._core.mli.infrastructure.worker.torch_worker import TorchWorker
worker_str = base64.b64encode(cloudpickle.dumps(TorchWorker)).decode("ascii")
elif args.toolkit == "tensorflow":
from smartsim._core.mli.infrastructure.worker.tensorflow_worker import TensorFlowWorker
worker_str = base64.b64encode(cloudpickle.dumps(TensorFlowWorker)).decode("ascii")
elif args.toolkit == "onnx":
from smartsim._core.mli.infrastructure.worker.onnx_worker import ONNXWorker
worker_str = base64.b64encode(cloudpickle.dumps(ONNXWorker)).decode("ascii")

worker_manager_rs: DragonRunSettings = exp.create_run_settings(
sys.executable,
@@ -34,35 +69,44 @@
"--device",
DEVICE,
"--worker_class",
torch_worker_str,
worker_str,
"--batch_size",
str(NUM_RANKS//NUM_WORKERS),
str(BATCH_SIZE),
"--batch_timeout",
str(0.00),
str(BATCH_TIMEOUT),
"--num_workers",
str(NUM_WORKERS)
str(NUM_WORKERS),
],
)

aff = []

worker_manager_rs.set_cpu_affinity(aff)

worker_manager_rs.set_gpu_affinity([0, 1, 2, 3])
worker_manager_rs.set_hostlist(["pinoak0037"])
worker_manager = exp.create_model("worker_manager", run_settings=worker_manager_rs)
worker_manager.attach_generator_files(to_copy=[worker_manager_script_name])

app_rs: DragonRunSettings = exp.create_run_settings(
sys.executable,
exe_args=[app_script_name, "--device", DEVICE, "--log_max_batchsize", str(6)],
exe_args=[
app_script_name,
"--device",
DEVICE,
"--log_max_batchsize",
str(args.log_max_batchsize),
],
)
app_rs.set_tasks_per_node(NUM_RANKS)

app_rs.set_tasks_per_node(NUM_RANKS_PER_NODE)
app_rs.set_nodes(NUM_NODES_APP)

app = exp.create_model("app", run_settings=app_rs)
app.attach_generator_files(to_copy=[app_script_name], to_symlink=[model_name])
if args.toolkit == "torch":
app.attach_generator_files(to_copy=[app_script_name], to_symlink=[model_name])

exp.generate(worker_manager, app, overwrite=True)
exp.start(worker_manager, app, block=False)
exp.start(worker_manager, block=False)
exp.start(app, block=False)

while True:
if exp.get_status(app)[0] in TERMINAL_STATUSES:
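One detail of the driver above worth calling out: the worker class is handed to the standalone worker manager as a CLI argument by cloudpickle-ing the class object and base64-encoding the bytes. Below is a minimal sketch of that round trip, assuming the receiving side simply reverses the encoding; the real standalone_worker_manager.py may do more, and DummyWorker is a stand-in class.

import base64

import cloudpickle


class DummyWorker:
    """Stand-in for TorchWorker / TensorFlowWorker / ONNXWorker."""


# driver side: serialize the class itself (not an instance) so the worker
# manager can instantiate it in a different process
worker_str = base64.b64encode(cloudpickle.dumps(DummyWorker)).decode("ascii")

# worker manager side (assumed): decode the CLI argument back into a class
worker_cls = cloudpickle.loads(base64.b64decode(worker_str))
print(worker_cls.__name__)  # DummyWorker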
77 changes: 27 additions & 50 deletions ex/high_throughput_inference/mock_app.py
@@ -24,55 +24,41 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

# isort: off
import dragon
from dragon import fli
from dragon.channels import Channel
import dragon.channels
from dragon.data.ddict.ddict import DDict
from dragon.globalservices.api_setup import connect_to_infrastructure
from dragon.utils import b64decode, b64encode

# isort: on

import argparse
import io

from mpi4py import MPI
import torch

from smartsim.log import get_logger
from smartsim._core.mli.client.protoclient import ProtoClient

torch.set_num_interop_threads(16)
torch.set_num_threads(1)

logger = get_logger("App")
logger.info("Started app")

from collections import OrderedDict

from smartsim.log import get_logger, log_to_file
from smartsim._core.mli.client.protoclient import ProtoClient

logger = get_logger("App")


CHECK_RESULTS_AND_MAKE_ALL_SLOWER = False


class ResNetWrapper:
"""Wrapper around a pre-rained ResNet model."""

def __init__(self, name: str, model: str):
"""Initialize the instance.

:param name: The name to use for the model
:param model: The path to the pre-trained PyTorch model"""
self._model = torch.jit.load(model)
self._model = None # torch.jit.load(model)
self._name = name
buffer = io.BytesIO()
scripted = torch.jit.trace(self._model, self.get_batch())
torch.jit.save(scripted, buffer)

with open(model, "rb") as model_file:
buffer = io.BytesIO(model_file.read())
self._serialized_model = buffer.getvalue()

# pylint: disable-next=no-self-use
def get_batch(self, batch_size: int = 32):
"""Create a random batch of data with the correct dimensions to
invoke a ResNet model.
@@ -96,6 +82,11 @@ def name(self) -> str:
return self._name


def log(msg: str, rank_: int) -> None:
if rank_ == 0:
logger.info(msg)


if __name__ == "__main__":

parser = argparse.ArgumentParser("Mock application")
@@ -105,38 +96,24 @@ def name(self) -> str:

resnet = ResNetWrapper("resnet50", f"resnet50.{args.device}.pt")

client = ProtoClient(timing_on=True)
client.set_model(resnet.name, resnet.model)
comm_world = MPI.COMM_WORLD
rank = comm_world.Get_rank()
client = ProtoClient(timing_on=True, rank=rank)

if rank == 0:
client.set_model(resnet.name, resnet.model)

if CHECK_RESULTS_AND_MAKE_ALL_SLOWER:
# TODO: adapt to non-Nvidia devices
torch_device = args.device.replace("gpu", "cuda")
pt_model = torch.jit.load(io.BytesIO(initial_bytes=(resnet.model))).to(
torch_device
)
comm_world.Barrier()

TOTAL_ITERATIONS = 100

for log2_bsize in range(args.log_max_batchsize + 1):
for log2_bsize in range(args.log_max_batchsize, args.log_max_batchsize + 1):
b_size: int = 2**log2_bsize
logger.info(f"Batch size: {b_size}")
for iteration_number in range(TOTAL_ITERATIONS + int(b_size == 1)):
logger.info(f"Iteration: {iteration_number}")
sample_batch = resnet.get_batch(b_size)
log(f"Batch size: {b_size}", rank)
for iteration_number in range(TOTAL_ITERATIONS):
sample_batch = resnet.get_batch(b_size).numpy()
remote_result = client.run_model(resnet.name, sample_batch)
comm_world.Barrier()
logger.info(client.perf_timer.get_last("total_time"))
if CHECK_RESULTS_AND_MAKE_ALL_SLOWER:
local_res = pt_model(sample_batch.to(torch_device))
err_norm = torch.linalg.vector_norm(
torch.flatten(remote_result).to(torch_device)
- torch.flatten(local_res),
ord=1,
).cpu()
res_norm = torch.linalg.vector_norm(remote_result, ord=1).item()
local_res_norm = torch.linalg.vector_norm(local_res, ord=1).item()
logger.info(
f"Avg norm of error {err_norm.item()/b_size} compared to result norm of {res_norm/b_size}:{local_res_norm/b_size}"
)
torch.cuda.synchronize()

client.perf_timer.print_timings(to_file=True)

client.perf_timer.print_timings(to_file=True, to_stdout=rank == 0)
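The refactored mock_app above follows a standard MPI pattern: rank 0 performs the one-time set_model call, a barrier ensures the model is registered before any rank uses it, then all ranks drive inference while only rank 0 logs. A minimal, self-contained sketch of that pattern follows, with a stand-in client since the real ProtoClient requires a running dragon infrastructure.

# run with e.g. `mpirun -n 4 python sketch.py`
from mpi4py import MPI


class FakeClient:
    """Stand-in for ProtoClient; methods mimic its call pattern only."""

    def set_model(self, name: str, model: bytes) -> None:
        print(f"registered model '{name}' once")

    def run_model(self, name: str, batch: list) -> int:
        return sum(batch)  # placeholder for remote inference


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
client = FakeClient()

if rank == 0:
    client.set_model("resnet50", b"<serialized model>")  # one-time setup

comm.Barrier()  # no rank calls run_model before the model exists

result = client.run_model("resnet50", [1, 2, 3])
if rank == 0:  # gate logging on rank 0 to keep output readable
    print(f"rank 0 got: {result}")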