diff --git a/.gitignore b/.gitignore index 4160575..6fe8679 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ __pycache__/ *.so # Distribution / packaging +wandb/ .Python build/ develop-eggs/ diff --git a/pipegoose/distributed/parallel_context.py b/pipegoose/distributed/parallel_context.py index 652d0f3..6992e53 100644 --- a/pipegoose/distributed/parallel_context.py +++ b/pipegoose/distributed/parallel_context.py @@ -125,8 +125,8 @@ def __init__( self.init_global_dist(rank, world_size, backend, host, port) self.init_parallel_groups() - # if torch.cuda.is_available(): - # self.set_device() + if torch.cuda.is_available() and backend == "nccl": + self.set_device() self.map_rank_to_device() @@ -261,17 +261,20 @@ def set_seed(self, seed: int): def map_rank_to_device(self): """Map global rank to device.""" + rank_tensor = torch.zeros(len(self._local_ranks), dtype=torch.long) + rank_tensor = rank_tensor.cuda() if torch.cuda.is_available() else rank_tensor for idx, local_rank in enumerate(self._local_ranks.values()): rank_tensor[idx] = local_rank rank_tensor_list = [ - torch.zeros(rank_tensor.size(), dtype=torch.long) for _ in range(self.get_world_size(ParallelMode.GLOBAL)) + torch.zeros(rank_tensor.size(), dtype=torch.long).cuda() if torch.cuda.is_available() else torch.zeros(rank_tensor.size(), dtype=torch.long) + for _ in range(self.get_world_size(ParallelMode.GLOBAL)) ] dist.all_gather(tensor_list=rank_tensor_list, tensor=rank_tensor) - + for _rank, _rank_tensor in enumerate(rank_tensor_list): modes_and_ranks = {mode: rank for mode, rank in zip(self._local_ranks.keys(), _rank_tensor.tolist())} self._ranks_to_device[tuple(modes_and_ranks.items())] = _rank diff --git a/pipegoose/nn/tensor_parallel/_functional.py b/pipegoose/nn/tensor_parallel/_functional.py index 93cdd5f..409486e 100644 --- a/pipegoose/nn/tensor_parallel/_functional.py +++ b/pipegoose/nn/tensor_parallel/_functional.py @@ -25,7 +25,10 @@ def backward(ctx: Any, grad: torch.Tensor) -> Tuple[torch.Tensor, None, None]: all_reduce(grad, parallel_context=parallel_context, parallel_mode=ParallelMode.TENSOR) - return (grad, None, None) + return ( + grad, + None, + ) class _Gather(Function): diff --git a/pipegoose/nn/tensor_parallel/linear.py b/pipegoose/nn/tensor_parallel/linear.py index 53a5d68..c38e3b9 100644 --- a/pipegoose/nn/tensor_parallel/linear.py +++ b/pipegoose/nn/tensor_parallel/linear.py @@ -32,17 +32,16 @@ def __init__( if bias is True: self.bias = nn.Parameter(torch.randn(out_per_partition)) - + else: + self.bias = None + def _get_output_per_partition(self, out_features: int, parallel_context: ParallelContext) -> int: local_world_size = parallel_context.get_world_size(ParallelMode.TENSOR) return out_features // local_world_size def forward(self, input: torch.Tensor) -> torch.Tensor: input_parallel = broadcast_to_tensor_group(input, self.parallel_context) - outputs = F.linear(input_parallel, self.weight) - - if self.bias is not None: - outputs = outputs + self.bias + outputs = F.linear(input_parallel, self.weight, self.bias) if self.gather_output: outputs = gather_to_tensor_group(outputs, dim=-1, parallel_context=self.parallel_context) diff --git a/pipegoose/nn/tensor_parallel/parallel_mapping.py b/pipegoose/nn/tensor_parallel/parallel_mapping.py index 875981c..8f1228f 100644 --- a/pipegoose/nn/tensor_parallel/parallel_mapping.py +++ b/pipegoose/nn/tensor_parallel/parallel_mapping.py @@ -34,6 +34,8 @@ class ParallelMapping: Row(("mlp.dense_4h_to_h", "self_attention.dense")), LMHead(("lm_head",)), ], + "debug_single_mlp": [Column(("debug_single_mlp",))], + } @staticmethod diff --git a/pipegoose/nn/tensor_parallel/tensor_parallel.py b/pipegoose/nn/tensor_parallel/tensor_parallel.py index b0130bd..fb2cbca 100644 --- a/pipegoose/nn/tensor_parallel/tensor_parallel.py +++ b/pipegoose/nn/tensor_parallel/tensor_parallel.py @@ -6,10 +6,7 @@ from pipegoose.distributed.parallel_context import ParallelContext from pipegoose.nn.parallel import Parallel from pipegoose.nn.tensor_parallel.parallelizer import ( - EmbeddingParallelizer, - LayerNormParallelizer, LinearParallelizer, - LMHeadParallelizer, ModuleParallelizer, ) @@ -17,7 +14,8 @@ class TensorParallel(Parallel): """Turn a 🤗 transformers model into a tensor parallel model.""" - PARALLELIZERS = [EmbeddingParallelizer, LinearParallelizer, LayerNormParallelizer, LMHeadParallelizer] + # PARALLELIZERS = [EmbeddingParallelizer, LinearParallelizer, LayerNormParallelizer, LMHeadParallelizer] + PARALLELIZERS = [LinearParallelizer] def __init__(self, module: nn.Module, parallel_context: ParallelContext): self.module = module @@ -33,6 +31,11 @@ def parallelize(self) -> nn.Module: # multiple times. so we filter out and retain the non-repetitive modules (leaf modules) leaf_modules = self._get_leaf_modules(module) for module_name, leaf_module in leaf_modules: + # NOTE: just skip parallelizing query_key_value in attention + # for debugging purposes + if "query_key_value" in module_name: + continue + parallelizer = self._find_parallelizer(module_name, leaf_module) if parallelizer is not None: parallelizer(module_name, leaf_module, module, self.parallel_context).parallelize() diff --git a/pipegoose/utils/logger.py b/pipegoose/utils/logger.py new file mode 100644 index 0000000..32d45ef --- /dev/null +++ b/pipegoose/utils/logger.py @@ -0,0 +1,154 @@ +import datetime +import inspect +import sys +import os +import wandb +import glob +import re +import os + +class Logger: + # https://github.com/Cadene/bootstrap.pytorch/blob/master/bootstrap/lib/logger.py + """ The Logger class is a singleton. It contains all the utilities + for logging variables in a key-value dictionary. + It can also be considered as a replacement for the print function. + + .. code-block:: python + + Logger(dir_logs='logs/mnist') + Logger().flush() # write the logs.json + Logger()("Launching training procedures") # written to logs.txt + > [I 2018-07-23 18:58:31] ...trap/engines/engine.py.80: Launching training procedures + """ + + DEBUG = -1 + INFO = 0 + SUMMARY = 1 + WARNING = 2 + ERROR = 3 + SYSTEM = 4 + _instance = None + indicator = {DEBUG: 'D', INFO: 'I', SUMMARY: 'S', WARNING: 'W', ERROR: 'E', SYSTEM: 'S'} + + class Colors: + END = '\033[0m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' + GREY = 30 + RED = 31 + GREEN = 32 + YELLOW = 33 + BLUE = 34 + PURPLE = 35 + SKY = 36 + WHITE = 37 + BACKGROUND = 10 + LIGHT = 60 + + @staticmethod + def code(value): + return '\033[{}m'.format(value) + + colorcode = { + DEBUG: Colors.code(Colors.GREEN), + INFO: Colors.code(Colors.GREY + Colors.LIGHT), + SUMMARY: Colors.code(Colors.BLUE + Colors.LIGHT), + WARNING: Colors.code(Colors.YELLOW + Colors.LIGHT), + ERROR: Colors.code(Colors.RED + Colors.LIGHT), + SYSTEM: Colors.code(Colors.WHITE + Colors.LIGHT) + } + + compactjson = True + log_level = None # log level + dir_logs = None + path_json = None + path_txt = None + file_txt = None + name = None + max_lineno_width = 3 + + def __new__(cls, dir_logs=None, name='logs'): + if Logger._instance is None: + Logger._instance = object.__new__(Logger) + + if dir_logs: + Logger._instance.name = name + Logger._instance.dir_logs = dir_logs + Logger._instance.path_txt = os.path.join(dir_logs, '{}.txt'.format(name)) + Logger._instance.file_txt = open(os.path.join(dir_logs, '{}.txt'.format(name)), 'a+') + # NOTE: Support json or CSV ? + # Logger._instance.path_json = os.path.join(dir_logs, '{}.json'.format(name)) + # Logger._instance.reload_json() + else: + Logger._instance.log_message('No logs files will be created (dir_logs attribute is empty)', + log_level=Logger.WARNING) + + return Logger._instance + + def __call__(self, *args, **kwargs): + return self.log_message(*args, **kwargs, stack_displacement=2) + + def log_message(self, *message, log_level=INFO, break_line=True, print_header=True, stack_displacement=1, + raise_error=True, adaptive_width=True): + + if self.dir_logs and not self.file_txt: + raise Exception('Critical: Log file not defined. Do you have write permissions for {}?'.format(self.dir_logs)) + + caller_info = inspect.getframeinfo(inspect.stack()[stack_displacement][0]) + message = ' '.join([str(m) for m in list(message)]) + + if print_header: + message_header = '[{} {:%Y-%m-%d %H:%M:%S}]'.format(self.indicator[log_level], + datetime.datetime.now()) + filename = caller_info.filename + if adaptive_width: + # allows the lineno_width to grow when necessary + lineno_width = len(str(caller_info.lineno)) + self.max_lineno_width = max(lineno_width, self.max_lineno_width) + else: + # manually fix it to 3 numbers + lineno_width = 3 + + if len(filename) > 28 - self.max_lineno_width: + filename = '...{}'.format(filename[-22 - (self.max_lineno_width - lineno_width):]) + + message_locate = '{}.{}:'.format(filename, caller_info.lineno) + message_logger = '{} {} {}'.format(message_header, message_locate, message) + message_screen = '{}{}{}{} {} {}'.format(self.Colors.BOLD, + self.colorcode[log_level], + message_header, + self.Colors.END, + message_locate, + message) + else: + message_logger = message + message_screen = message + + if break_line: + print(message_screen) + if self.dir_logs: + self.file_txt.write('%s\n' % message_logger) + else: + print(message_screen, end='') + sys.stdout.flush() + if self.dir_logs: + self.file_txt.write(message_logger) + + if self.dir_logs: + self.file_txt.flush() + if log_level == self.ERROR and raise_error: + raise Exception(message) + + def update_log_file(self, path_src, path_dst): + """ + Append content of file at path_src to file at path_dst + """ + + with open(path_src, 'r') as f: + lines_src = f.readlines() + + with open(path_dst, 'r') as f: + lines_dst = f.readlines() + + with open(path_dst, 'w') as f: + f.writelines(lines_src + ["\n"] + lines_dst) \ No newline at end of file diff --git a/tests/convergence/debug_batch.pt b/tests/convergence/debug_batch.pt new file mode 100644 index 0000000..1f696bb Binary files /dev/null and b/tests/convergence/debug_batch.pt differ diff --git a/tests/convergence/debug_target.pt b/tests/convergence/debug_target.pt new file mode 100644 index 0000000..076bb06 Binary files /dev/null and b/tests/convergence/debug_target.pt differ diff --git a/tests/convergence/model.pt b/tests/convergence/model.pt new file mode 100644 index 0000000..24e9582 Binary files /dev/null and b/tests/convergence/model.pt differ diff --git a/tests/convergence/run_tp.py b/tests/convergence/run_tp.py new file mode 100644 index 0000000..b3c5bf5 --- /dev/null +++ b/tests/convergence/run_tp.py @@ -0,0 +1,210 @@ +from copy import deepcopy + +import torch +import torch.distributed as dist +from datasets import load_dataset +from torch.optim import SGD +from torch.utils.data import DataLoader +from torch.utils.data.distributed import DistributedSampler +from transformers import AutoModelForCausalLM, AutoTokenizer + +from pipegoose.distributed.parallel_context import ParallelContext +from pipegoose.distributed.parallel_mode import ParallelMode +from pipegoose.nn import TensorParallel +from pipegoose.utils.logger import Logger + +def get_model_params_size(model, fp_bytes=4): + params_size = 0 + for p in model.parameters(): + params_size += p.numel() + params_gb = params_size * fp_bytes / 2**30 + return params_gb + + +def set_seed(seed): + torch.manual_seed(seed) + torch.cuda.manual_seed_all(seed) + + +if __name__ == "__main__": + import wandb + + DATA_PARALLEL_SIZE = 1 + TENSOR_PARALLEL_SIZE = 2 + PIPELINE_PARALLEL_SIZE = 1 + MODEL = "bigscience/bloom-560m" + DATASET = "imdb" + NUM_EPOCHS = 4 + LR = 1e-3 + SEED = 69 + BATCH_SIZE = 4 + CONTEXT_LENGTH = 1024 + + torch.cuda.empty_cache() + set_seed(SEED) + + Logger()(f"device_count: {torch.cuda.device_count()}") + Logger()(f"is available: {torch.cuda.is_available()}") + + parallel_context = ParallelContext.from_torch( + data_parallel_size=DATA_PARALLEL_SIZE, + tensor_parallel_size=TENSOR_PARALLEL_SIZE, + pipeline_parallel_size=PIPELINE_PARALLEL_SIZE, + ) + rank = parallel_context.get_global_rank() + + Logger()(f"rank={rank}, initialized parallel_context") + + train_dataset = load_dataset("imdb", split="train[:130]") + train_dataset = train_dataset.map(lambda x: {"text": x["text"][:10]}) # for demonstration purposes + + dp_rank = parallel_context.get_local_rank(ParallelMode.DATA) + train_sampler = DistributedSampler(train_dataset, num_replicas=DATA_PARALLEL_SIZE, rank=dp_rank, seed=SEED) + train_dataloader = DataLoader( + train_dataset, + batch_size=BATCH_SIZE // DATA_PARALLEL_SIZE, + shuffle=False, + sampler=train_sampler, + ) + + val_dataset = load_dataset("imdb", split="test[:130]") + val_dataset = val_dataset.map(lambda x: {"text": x["text"][:10]}) # for demonstration purposes + val_sampler = DistributedSampler(val_dataset, num_replicas=DATA_PARALLEL_SIZE, rank=dp_rank, seed=SEED) + val_dataloader = DataLoader( + val_dataset, + batch_size=BATCH_SIZE // DATA_PARALLEL_SIZE, + shuffle=False, + sampler=val_sampler, + ) + + model = AutoModelForCausalLM.from_pretrained(MODEL) + ref_model = deepcopy(model) + tokenizer = AutoTokenizer.from_pretrained(MODEL) + tokenizer.pad_token = tokenizer.eos_token + tokenizer.add_special_tokens({"pad_token": "[PAD]"}) + + Logger()(f"rank={rank}, model size before parallelizing: {round(get_model_params_size(model), 3)} GB") + + dist.barrier() + + model = TensorParallel(model, parallel_context).parallelize() + # model = DataParallel(model, parallel_context).parallelize() + optim = SGD(model.parameters(), lr=LR) + # optim = DistributedOptimizer(optim, parallel_context) + model.to("cuda") + device = next(model.parameters()).device + + Logger()(f"rank={rank}, model size after parallelizing: {round(get_model_params_size(model), 3)} GB") + Logger()(f"rank={rank}, model is moved to device: {device}") + + ref_model.to(device) + # if DATA_PARALLEL_SIZE > 1: + # ref_model = torch.nn.parallel.DistributedDataParallel(ref_model) + + ref_optim = SGD(ref_model.parameters(), lr=LR) + + model.train() + ref_model.train() + step = 0 + dist.barrier() + + if rank == 0: + + def get_time_name(): + import datetime + + today = datetime.datetime.now() + return today.strftime("%d/%m/%Y_%H:%M:%S") + + wandb.init( + project="pipegoose", + name=f"{get_time_name()}.test_dp_tp_zero1_converegence", + config={ + "data_parallel_size": DATA_PARALLEL_SIZE, + "tensor_parallel_size": TENSOR_PARALLEL_SIZE, + "pipeline_parallel_size": PIPELINE_PARALLEL_SIZE, + "model": MODEL, + "dataset": DATASET, + "epochs": NUM_EPOCHS, + "learning_rate": LR, + "seed": SEED, + "batch_size": BATCH_SIZE, + }, + ) + + for epoch in range(NUM_EPOCHS): + train_sampler.set_epoch(epoch) + Logger()(f"rank={rank}, epoch={epoch}") + + for batch in train_dataloader: + inputs = tokenizer( + batch["text"], + padding=True, + truncation=True, + max_length=CONTEXT_LENGTH, + return_tensors="pt", + ) + inputs = {name: tensor.to(device) for name, tensor in inputs.items()} + labels = inputs["input_ids"] + + outputs = model(**inputs, labels=labels) + ref_outputs = ref_model(**inputs, labels=labels) + + optim.zero_grad() + outputs.loss.backward() + optim.step() + + ref_optim.zero_grad() + ref_outputs.loss.backward() + ref_optim.step() + + Logger()(f"epoch={epoch}, step={step}, rank={rank}, train_loss={outputs.loss}, ref_train_loss={ref_outputs.loss}") + + if rank == 0: + wandb.log( + { + "train_loss": outputs.loss, + "ref_train_loss": ref_outputs.loss, + "step": step, + "epoch": epoch, + } + ) + + step += 1 + + model.eval() + ref_model.eval() + dist.barrier() + + step = 0 + val_sampler.set_epoch(1) + + for batch in val_dataloader: + inputs = tokenizer( + batch["text"], + padding=True, + truncation=True, + max_length=CONTEXT_LENGTH, + return_tensors="pt", + ) + inputs = {name: tensor.to(device) for name, tensor in inputs.items()} + labels = inputs["input_ids"] + + outputs = model(**inputs, labels=labels) + ref_outputs = ref_model(**inputs, labels=labels) + + Logger()(f"rank={rank}, val_loss={outputs.loss}, ref_val_loss={ref_outputs.loss}, step={step}") + + if rank == 0: + wandb.log( + { + "val_loss": outputs.loss, + "ref_val_loss": ref_outputs.loss, + "step": step, + } + ) + + step += 1 + + wandb.finish() + model.cpu() diff --git a/tests/convergence/run_tp_mnist.py b/tests/convergence/run_tp_mnist.py new file mode 100644 index 0000000..1918526 --- /dev/null +++ b/tests/convergence/run_tp_mnist.py @@ -0,0 +1,232 @@ +from copy import deepcopy + +import torch +import torch.distributed as dist +from torch.optim import SGD +from torch.utils.data import DataLoader, random_split +import torch.nn as nn +from torchvision import datasets, transforms + +from pipegoose.distributed.parallel_context import ParallelContext +from pipegoose.distributed.parallel_mode import ParallelMode +from pipegoose.nn import TensorParallel +from pipegoose.utils.logger import Logger + +class NN(nn.Module): + def __init__(self, input_size, output_size): + super(NN, self).__init__() + self.debug_single_mlp = nn.Linear(input_size, output_size) + + def forward(self, x): + x = torch.flatten(x, 1) + x = self.debug_single_mlp(x) + return x + +class MNISTloader: + def __init__( + self, + batch_size: int = 64, + data_dir: str = "./data/", + num_workers: int = 0, + pin_memory: bool = False, + shuffle: bool = False, + train_val_split: float = 0.1, + ): + self.batch_size = batch_size + self.data_dir = data_dir + self.num_workers = num_workers + self.pin_memory = pin_memory + self.shuffle = shuffle + self.train_val_split = train_val_split + + self.setup() + + def setup(self): + transform = transforms.Compose( + [ + transforms.Resize((32, 32)), + transforms.ToTensor(), + transforms.Normalize(mean=[0.5], std=[0.5]), + ] + ) + + self.train_dataset = datasets.MNIST( + self.data_dir, train=True, download=True, transform=transform + ) + val_split = int(len(self.train_dataset) * self.train_val_split) + train_split = len(self.train_dataset) - val_split + + self.train_dataset, self.val_dataset = random_split( + self.train_dataset, [train_split, val_split] + ) + self.test_dataset = datasets.MNIST( + self.data_dir, train=False, download=True, transform=transform + ) + + print( + "Image Shape: {}".format(self.train_dataset[0][0].numpy().shape), + end="\n\n", + ) + print("Training Set: {} samples".format(len(self.train_dataset))) + print("Validation Set: {} samples".format(len(self.val_dataset))) + print("Test Set: {} samples".format(len(self.test_dataset))) + + def load(self): + train_loader = DataLoader( + dataset=self.train_dataset, + batch_size=self.batch_size, + num_workers=self.num_workers, + pin_memory=self.pin_memory, + shuffle=self.shuffle, + ) + + val_loader = DataLoader( + dataset=self.val_dataset, + batch_size=self.batch_size, + num_workers=self.num_workers, + pin_memory=self.pin_memory, + shuffle=self.shuffle, + ) + + test_loader = DataLoader( + dataset=self.test_dataset, + batch_size=self.batch_size, + num_workers=self.num_workers, + pin_memory=self.pin_memory, + shuffle=self.shuffle, + ) + + return train_loader, val_loader, test_loader + +def set_seed(seed): + torch.manual_seed(seed) + torch.cuda.manual_seed_all(seed) + +if __name__ == "__main__": + import wandb + + DATA_PARALLEL_SIZE = 1 + TENSOR_PARALLEL_SIZE = 2 + PIPELINE_PARALLEL_SIZE = 1 + NUM_EPOCHS = 30 + LR = 2e-1 + SEED = 42 + BATCH_SIZE = 1024 + + torch.cuda.empty_cache() + set_seed(SEED) + + Logger()(f"device_count: {torch.cuda.device_count()}") + Logger()(f"is available: {torch.cuda.is_available()}") + + parallel_context = ParallelContext.from_torch( + data_parallel_size=DATA_PARALLEL_SIZE, + tensor_parallel_size=TENSOR_PARALLEL_SIZE, + pipeline_parallel_size=PIPELINE_PARALLEL_SIZE, + ) + rank = parallel_context.get_global_rank() + + Logger()(f"rank={rank}, initialized parallel_context") + + dp_rank = parallel_context.get_local_rank(ParallelMode.DATA) + # train_dataloader, _, _ = MNISTloader(batch_size=BATCH_SIZE).load() + # for batch_idx, (debug_batch, debug_target) in enumerate(train_dataloader): + # break + # Dump batch of data to reload later + # torch.save(debug_batch, "debug_batch.pt") + # torch.save(debug_target, "debug_target.pt") + + # Load batch of data + debug_batch = torch.load("debug_batch.pt") + debug_target = torch.load("debug_target.pt") + + model = NN(input_size=32 * 32, output_size=10) + model.load_state_dict(torch.load("model.pt")) + ref_model = deepcopy(model) + + dist.barrier() + + model = TensorParallel(model, parallel_context).parallelize() + optim = SGD(model.parameters(), lr=LR) + criterion = nn.CrossEntropyLoss() + + model.to("cuda") + device = next(model.parameters()).device + + Logger()(f"rank={rank}, model is moved to device: {device}") + + ref_model.to(device) + ref_optim = SGD(ref_model.parameters(), lr=LR) + ref_criterion = nn.CrossEntropyLoss() + + + model.train() + ref_model.train() + step = 0 + dist.barrier() + + if rank == 0: + + def get_time_name(): + import datetime + + today = datetime.datetime.now() + return today.strftime("%d/%m/%Y_%H:%M:%S") + + wandb.init( + project="pipegoose", + name=f"{get_time_name()}.test_tp_mnist_converegence", + config={ + "data_parallel_size": DATA_PARALLEL_SIZE, + "tensor_parallel_size": TENSOR_PARALLEL_SIZE, + "pipeline_parallel_size": PIPELINE_PARALLEL_SIZE, + "model": "NN", + "dataset": "MNIST", + "epochs": NUM_EPOCHS, + "learning_rate": LR, + "seed": SEED, + "batch_size": BATCH_SIZE, + }, + ) + + # wandb log image + # wandb.log({"examples": [wandb.Image(img.numpy()) for img in debug_batch]}) + + for epoch in range(NUM_EPOCHS): + Logger()(f"rank={rank}, epoch={epoch}") + + train_loss_running, train_acc_running = 0, 0 + + inputs, labels = debug_batch.to(device), debug_target.to(device) + + outputs = model(inputs) + _, predictions = torch.max(outputs, dim=1) + loss = criterion(outputs, labels) + + ref_outputs = ref_model(inputs) + _, ref_predictions = torch.max(ref_outputs, dim=1) + ref_loss = ref_criterion(ref_outputs, labels) + + optim.zero_grad() + loss.backward() + optim.step() + + ref_optim.zero_grad() + ref_loss.backward() + ref_optim.step() + + Logger()(f"epoch={epoch}, rank={rank}, train_loss={loss}, ref_train_loss={ref_loss}") + + if rank == 0: + wandb.log( + { + "train_loss": loss, + "ref_train_loss": ref_loss, + "epoch": epoch, + } + ) + + + dist.barrier() + wandb.finish() + model.cpu() diff --git a/tests/convergence/run_tp_small.py b/tests/convergence/run_tp_small.py new file mode 100644 index 0000000..360e76f --- /dev/null +++ b/tests/convergence/run_tp_small.py @@ -0,0 +1,117 @@ +from copy import deepcopy + +import torch +import torch.distributed as dist +from torch.optim import SGD +from torch.utils.data import DataLoader, random_split +import torch.nn as nn +from torchvision import datasets, transforms + +from pipegoose.distributed.parallel_context import ParallelContext +from pipegoose.distributed.parallel_mode import ParallelMode +from pipegoose.nn import TensorParallel +from pipegoose.utils.logger import Logger +import torch.nn.functional as F +import numpy as np +import random + +class NN(nn.Module): + def __init__(self, input_size, output_size): + super(NN, self).__init__() + self.debug_single_mlp = nn.Linear(input_size, output_size) + + def forward(self, x): + x = self.debug_single_mlp(x) + return x + +def set_random_seed(seed: int): + torch.manual_seed(seed) + if torch.cuda.is_available(): + torch.cuda.manual_seed(seed) + np.random.seed(seed) + random.seed(seed) + +if __name__ == "__main__": + DATA_PARALLEL_SIZE = 1 + TENSOR_PARALLEL_SIZE = 2 + PIPELINE_PARALLEL_SIZE = 1 + NUM_EPOCHS = 30 + LR = 2e-1 + SEED = 42 + + torch.cuda.empty_cache() + + Logger()(f"device_count: {torch.cuda.device_count()}") + Logger()(f"is available: {torch.cuda.is_available()}") + + parallel_context = ParallelContext.from_torch( + data_parallel_size=DATA_PARALLEL_SIZE, + tensor_parallel_size=TENSOR_PARALLEL_SIZE, + pipeline_parallel_size=PIPELINE_PARALLEL_SIZE, + seed=SEED, + backend="nccl" + ) + + rank = parallel_context.get_local_rank(ParallelMode.TENSOR) + set_random_seed(SEED + rank) + + Logger()(f"rank={rank}, initialized parallel_context") + + BATCH_SIZE = 1 + IN_FEATURES = 4 + OUT_FEATURES = 6 + + X = torch.randn(BATCH_SIZE, IN_FEATURES, device="cuda", requires_grad=True) + L_weight = torch.randn(BATCH_SIZE, OUT_FEATURES, device="cuda") + + # Rank 0 brodcast X and W to other rank + dist.broadcast(X, src=0) + dist.broadcast(L_weight, src=0) + + Logger()(f"[rank {rank}]: {X}") + Logger()(f"[rank {rank}]: {L_weight}") + + model = NN(input_size=IN_FEATURES, output_size=OUT_FEATURES) + model_ref = deepcopy(model) + + dist.barrier() + + model = TensorParallel(model, parallel_context).parallelize() + model.to("cuda") + device = next(model.parameters()).device + model_ref.to(device) + Logger()(f"[rank {rank}]: model is moved to device: {device}") + + # Reference + Y_ref = model_ref(X) + L_ref = torch.mul(Y_ref, L_weight).sum() + # Manually compute the gradient + dLdW_ref = torch.matmul(L_weight.t(), X) + dLdX_ref = torch.matmul(L_weight, model_ref.debug_single_mlp.weight) + + dist.barrier() + + # Distributed + Logger()("===========FORWARD===========") + Y = model(X) + L = torch.mul(Y, L_weight).sum() + Y.retain_grad() + + Logger()("===========BACKWARD===========") + L.backward() + + #HACK: we need to divide by world size because we are calling L.backward() on rank 0 and 1 + # Too lazy to find a way to merge into a single matrix + dLdX = X.grad / dist.get_world_size() + + if rank == 0: + #NOTE: tests inspired from https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/mpu/tests/test_layers.py#L173 + Logger()(f"error Y_ref - Y: {Y_ref.sub(Y).abs().max()}") + Logger()(f"error L_ref - L: {L_ref.sub(L).abs().max()}") + Logger()(f"error dLdX_ref - dLdX: {dLdX_ref.sub(dLdX).abs().max()}") + + dist.barrier() + + dLdW_ref = torch.split(dLdW_ref, OUT_FEATURES // dist.get_world_size(), dim=0)[rank].contiguous() + dLdW = model.debug_single_mlp.weight.grad + Logger()(f"error dLdW_ref - dLdW (rank {rank}): {dLdW_ref.sub(dLdW).abs().max()}") \ No newline at end of file diff --git a/tests/convergence/sandbox.ipynb b/tests/convergence/sandbox.ipynb new file mode 100644 index 0000000..5f5282e --- /dev/null +++ b/tests/convergence/sandbox.ipynb @@ -0,0 +1,250 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " NN(\n", + " (out): Linear(in_features=10, out_features=5, bias=True)\n", + ")\n", + "out Linear(in_features=10, out_features=5, bias=True)\n" + ] + } + ], + "source": [ + "import torch\n", + "import torch.nn as nn\n", + "\n", + "class NN(nn.Module):\n", + " def __init__(self, input_size, output_size):\n", + " super(NN, self).__init__()\n", + " self.out = nn.Linear(input_size, output_size)\n", + "\n", + " def forward(self, x):\n", + " x = torch.flatten(x, 1)\n", + " x = self.out(x)\n", + " return x\n", + "\n", + "# Example of using named_children\n", + "model = NN(input_size=10, output_size=5)\n", + "for name, module in model.named_modules():\n", + " print(name, module)" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Image Shape: (1, 32, 32)\n", + "\n", + "Training Set: 54000 samples\n", + "Validation Set: 6000 samples\n", + "Test Set: 10000 samples\n", + "torch.Size([1, 1, 32, 32])\n" + ] + } + ], + "source": [ + "from torchvision import datasets, transforms\n", + "from torch.utils.data import DataLoader, random_split\n", + "\n", + "\n", + "class MNISTloader:\n", + " def __init__(\n", + " self,\n", + " batch_size: int = 64,\n", + " data_dir: str = \"./data/\",\n", + " num_workers: int = 0,\n", + " pin_memory: bool = False,\n", + " shuffle: bool = False,\n", + " train_val_split: float = 0.1,\n", + " ):\n", + " self.batch_size = batch_size\n", + " self.data_dir = data_dir\n", + " self.num_workers = num_workers\n", + " self.pin_memory = pin_memory\n", + " self.shuffle = shuffle\n", + " self.train_val_split = train_val_split\n", + "\n", + " self.setup()\n", + "\n", + " def setup(self):\n", + " transform = transforms.Compose(\n", + " [\n", + " transforms.Resize((32, 32)),\n", + " transforms.ToTensor(),\n", + " transforms.Normalize(mean=[0.5], std=[0.5]),\n", + " ]\n", + " )\n", + "\n", + " self.train_dataset = datasets.MNIST(\n", + " self.data_dir, train=True, download=True, transform=transform\n", + " )\n", + " val_split = int(len(self.train_dataset) * self.train_val_split)\n", + " train_split = len(self.train_dataset) - val_split\n", + "\n", + " self.train_dataset, self.val_dataset = random_split(\n", + " self.train_dataset, [train_split, val_split]\n", + " )\n", + " self.test_dataset = datasets.MNIST(\n", + " self.data_dir, train=False, download=True, transform=transform\n", + " )\n", + "\n", + " print(\n", + " \"Image Shape: {}\".format(self.train_dataset[0][0].numpy().shape),\n", + " end=\"\\n\\n\",\n", + " )\n", + " print(\"Training Set: {} samples\".format(len(self.train_dataset)))\n", + " print(\"Validation Set: {} samples\".format(len(self.val_dataset)))\n", + " print(\"Test Set: {} samples\".format(len(self.test_dataset)))\n", + "\n", + " def load(self):\n", + " train_loader = DataLoader(\n", + " dataset=self.train_dataset,\n", + " batch_size=self.batch_size,\n", + " num_workers=self.num_workers,\n", + " pin_memory=self.pin_memory,\n", + " shuffle=self.shuffle,\n", + " )\n", + "\n", + " val_loader = DataLoader(\n", + " dataset=self.val_dataset,\n", + " batch_size=self.batch_size,\n", + " num_workers=self.num_workers,\n", + " pin_memory=self.pin_memory,\n", + " shuffle=self.shuffle,\n", + " )\n", + "\n", + " test_loader = DataLoader(\n", + " dataset=self.test_dataset,\n", + " batch_size=self.batch_size,\n", + " num_workers=self.num_workers,\n", + " pin_memory=self.pin_memory,\n", + " shuffle=self.shuffle,\n", + " )\n", + "\n", + " return train_loader, val_loader, test_loader\n", + "\n", + "\n", + "# Load only 1 image\n", + "train_loader, val_loader, test_loader = MNISTloader(batch_size=1).load()\n", + "for batch_idx, (data, target) in enumerate(train_loader):\n", + " print(data.shape)\n", + " break\n" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "torch.Size([1, 1, 32, 32])\n" + ] + } + ], + "source": [ + "print(data.shape)" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + }, + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "import matplotlib.pyplot as plt\n", + "\n", + "# Plot the first image\n", + "plt.imshow(data[0][0], cmap=\"gray\")" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "import torch\n", + "import torch.nn as nn\n", + "import torch.nn.functional as F\n", + "\n", + "n = 1024\n", + "m = 1024\n", + "\n", + "X = torch.randn(m,m)\n", + "W = nn.Parameter(torch.randn(n, m))\n", + "b = nn.Parameter(torch.randn(n))\n", + "\n", + "\n", + "ref = F.linear(X, W, b)\n", + "out = F.linear(X, W) + b\n", + "\n", + "torch.testing.assert_close(ref, out, msg=lambda msg: f\"{name}:\\n{msg}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "env-pipegoose", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.2" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/tests/convergence/train_mlp.py b/tests/convergence/train_mlp.py new file mode 100644 index 0000000..0d9078a --- /dev/null +++ b/tests/convergence/train_mlp.py @@ -0,0 +1,138 @@ +import torch +import torch.nn as nn +import torch.optim as optim +from torchvision import datasets, transforms +from torch.utils.data import DataLoader, random_split + +from pipegoose.utils.logger import Logger + +def seed_everything(seed: int): + torch.manual_seed(seed) + torch.cuda.manual_seed_all(seed) + +class NN(nn.Module): + def __init__(self, input_size, output_size): + super(NN, self).__init__() + self.debug_single_mlp = nn.Linear(input_size, output_size) + + def forward(self, x): + x = torch.flatten(x, 1) + x = self.debug_single_mlp(x) + return x + +class MNISTloader: + def __init__( + self, + batch_size: int = 64, + data_dir: str = "./data/", + num_workers: int = 0, + pin_memory: bool = False, + shuffle: bool = False, + train_val_split: float = 0.1, + ): + self.batch_size = batch_size + self.data_dir = data_dir + self.num_workers = num_workers + self.pin_memory = pin_memory + self.shuffle = shuffle + self.train_val_split = train_val_split + + self.setup() + + def setup(self): + transform = transforms.Compose( + [ + transforms.Resize((32, 32)), + transforms.ToTensor(), + transforms.Normalize(mean=[0.5], std=[0.5]), + ] + ) + + self.train_dataset = datasets.MNIST( + self.data_dir, train=True, download=True, transform=transform + ) + val_split = int(len(self.train_dataset) * self.train_val_split) + train_split = len(self.train_dataset) - val_split + + self.train_dataset, self.val_dataset = random_split( + self.train_dataset, [train_split, val_split] + ) + self.test_dataset = datasets.MNIST( + self.data_dir, train=False, download=True, transform=transform + ) + + print( + "Image Shape: {}".format(self.train_dataset[0][0].numpy().shape), + end="\n\n", + ) + print("Training Set: {} samples".format(len(self.train_dataset))) + print("Validation Set: {} samples".format(len(self.val_dataset))) + print("Test Set: {} samples".format(len(self.test_dataset))) + + def load(self): + train_loader = DataLoader( + dataset=self.train_dataset, + batch_size=self.batch_size, + num_workers=self.num_workers, + pin_memory=self.pin_memory, + shuffle=self.shuffle, + ) + + val_loader = DataLoader( + dataset=self.val_dataset, + batch_size=self.batch_size, + num_workers=self.num_workers, + pin_memory=self.pin_memory, + shuffle=self.shuffle, + ) + + test_loader = DataLoader( + dataset=self.test_dataset, + batch_size=self.batch_size, + num_workers=self.num_workers, + pin_memory=self.pin_memory, + shuffle=self.shuffle, + ) + + return train_loader, val_loader, test_loader + +if __name__ == "__main__": + seed_everything(42) + LR = 0.001 + EPOCHS = 30 + + model = NN(input_size=32 * 32, output_size=10) + device = torch.device("cuda") + optimizer = optim.SGD(model.parameters(), LR) + criterion = nn.CrossEntropyLoss() + train_loader, _, _ = MNISTloader(train_val_split=0.).load() + + model = model.to(device) + + for epoch in range(EPOCHS): + + train_loss_running, train_acc_running = 0, 0 + + for inputs, labels in train_loader: + + inputs, labels = inputs.to(device), labels.to(device) + + optimizer.zero_grad() + + outputs = model(inputs) + + _, predictions = torch.max(outputs, dim=1) + loss = criterion(outputs, labels) + + loss.backward() + optimizer.step() + + train_loss_running += loss.item() * inputs.shape[0] + train_acc_running += torch.sum(predictions == labels.data) + + train_loss = train_loss_running / len(train_loader.sampler) + train_acc = train_acc_running / len(train_loader.sampler) + + info = "Epoch: {:3}/{} \t train_loss: {:.3f} \t train_acc: {:.3f}" + Logger()(info.format(epoch + 1, EPOCHS, train_loss, train_acc)) + torch.save(model.state_dict(), "model.pt") \ No newline at end of file