diff --git a/megatron/__init__.py b/megatron/__init__.py index fe37132ffb..39d95e0741 100644 --- a/megatron/__init__.py +++ b/megatron/__init__.py @@ -12,35 +12,42 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging + import torch +import os + +logger = logging.getLogger(__name__) + +from .package_info import ( + __description__, + __contact_names__, + __url__, + __download_url__, + __keywords__, + __license__, + __package_name__, + __version__, +) -from .global_vars import get_args -from .global_vars import get_current_global_batch_size -from .global_vars import get_num_microbatches -from .global_vars import get_signal_handler -from .global_vars import update_num_microbatches -from .global_vars import get_tokenizer -from .global_vars import get_tensorboard_writer -from .global_vars import get_adlr_autoresume -from .global_vars import get_timers -from .initialize import initialize_megatron +if "MEGATRON_SETUP" not in os.environ: + from .global_vars import get_args + from .global_vars import get_current_global_batch_size + from .global_vars import get_num_microbatches + from .global_vars import get_signal_handler + from .global_vars import update_num_microbatches + from .global_vars import get_tokenizer + from .global_vars import get_tensorboard_writer + from .global_vars import get_adlr_autoresume + from .global_vars import get_timers + from .initialize import initialize_megatron def print_rank_0(message): - """If distributed is initialized, print only on rank 0.""" - if torch.distributed.is_initialized(): - if torch.distributed.get_rank() == 0: - print(message, flush=True) - else: - print(message, flush=True) + logger.info(str(message)) def is_last_rank(): return torch.distributed.get_rank() == ( torch.distributed.get_world_size() - 1) def print_rank_last(message): - """If distributed is initialized, print only on last 
rank.""" - if torch.distributed.is_initialized(): - if is_last_rank(): - print(message, flush=True) - else: - print(message, flush=True) + logger.info(str(message)) diff --git a/megatron/arguments.py b/megatron/arguments.py index e50ce48fe3..39d9dd21be 100644 --- a/megatron/arguments.py +++ b/megatron/arguments.py @@ -16,10 +16,13 @@ """Megatron arguments.""" import argparse +import logging import os import torch +logger = logging.getLogger(__name__) + def parse_args(extra_args_provider=None, defaults={}, ignore_unknown_args=False): """Parse all arguments.""" @@ -74,13 +77,12 @@ def parse_args(extra_args_provider=None, defaults={}, 'size ({})'.format(args.world_size, args.tensor_model_parallel_size, args.pipeline_model_parallel_size) args.data_parallel_size = args.world_size // model_parallel_size - if args.rank == 0: - print('using world size: {}, data-parallel-size: {}, ' - 'tensor-model-parallel size: {}, ' - 'pipeline-model-parallel size: {} '.format( - args.world_size, args.data_parallel_size, - args.tensor_model_parallel_size, - args.pipeline_model_parallel_size), flush=True) + logger.info('using world size: {}, data-parallel-size: {}, ' + 'tensor-model-parallel size: {}, ' + 'pipeline-model-parallel size: {} '.format( + args.world_size, args.data_parallel_size, + args.tensor_model_parallel_size, + args.pipeline_model_parallel_size)) if args.pipeline_model_parallel_size > 1: if args.pipeline_model_parallel_split_rank is not None: assert args.pipeline_model_parallel_split_rank < \ @@ -112,11 +114,9 @@ def parse_args(extra_args_provider=None, defaults={}, # arguments that are passed to the program. We check this by # ensuring the arg is set to None. 
if getattr(args, key) is not None: - if args.rank == 0: - print('WARNING: overriding default arguments for {key}:{v} \ - with {key}:{v2}'.format(key=key, v=defaults[key], - v2=getattr(args, key)), - flush=True) + logger.warning('Overriding default arguments for {key}:{v} \ + with {key}:{v2}'.format(key=key, v=defaults[key], + v2=getattr(args, key))) else: setattr(args, key, defaults[key]) @@ -125,9 +125,8 @@ def parse_args(extra_args_provider=None, defaults={}, assert args.micro_batch_size > 0 if args.global_batch_size is None: args.global_batch_size = args.micro_batch_size * args.data_parallel_size - if args.rank == 0: - print('setting global batch size to {}'.format( - args.global_batch_size), flush=True) + logger.info('setting global batch size to {}'.format( + args.global_batch_size)) assert args.global_batch_size > 0 if args.num_layers_per_virtual_pipeline_stage is not None: assert args.pipeline_model_parallel_size > 2, \ @@ -154,13 +153,10 @@ def parse_args(extra_args_provider=None, defaults={}, # be done in fp32. if not args.accumulate_allreduce_grads_in_fp32: args.accumulate_allreduce_grads_in_fp32 = True - if args.rank == 0: - print('accumulate and all-reduce gradients in fp32 for ' - 'bfloat16 data type.', flush=True) + logger.info('accumulate and all-reduce gradients in fp32 for ' + 'bfloat16 data type.') - if args.rank == 0: - print('using {} for parameters ...'.format(args.params_dtype), - flush=True) + logger.info('using {} for parameters ...'.format(args.params_dtype)) # If we do accumulation and all-reduces in fp32, we need to have local DDP # and we should make sure use-contiguous-buffers-in-local-ddp is not off. @@ -275,17 +271,14 @@ def parse_args(extra_args_provider=None, defaults={}, def _print_args(args): """Print arguments.""" - if args.rank == 0: - print('------------------------ arguments ------------------------', - flush=True) - str_list = [] - for arg in vars(args): - dots = '.' 
* (48 - len(arg)) - str_list.append(' {} {} {}'.format(arg, dots, getattr(args, arg))) - for arg in sorted(str_list, key=lambda x: x.lower()): - print(arg, flush=True) - print('-------------------- end of arguments ---------------------', - flush=True) + logger.info('------------------------ arguments ------------------------') + str_list = [] + for arg in vars(args): + dots = '.' * (48 - len(arg)) + str_list.append(' {} {} {}'.format(arg, dots, getattr(args, arg))) + for arg in sorted(str_list, key=lambda x: x.lower()): + logger.info(arg) + logger.info('-------------------- end of arguments ---------------------') def _check_arg_is_not_none(args, arg): @@ -350,8 +343,12 @@ def _add_network_size_args(parser): def _add_logging_args(parser): group = parser.add_argument_group(title='logging') + group.add_argument('--name', type=str, default=None, + help='A name for the experiment.') group.add_argument('--log-params-norm', action='store_true', help='If set, calculate and log parameters norm.') + group.add_argument('--log-scales', action='store_true', + help='Log the scales of parameters, gradients and activations.') group.add_argument('--log-num-zeros-in-grad', action='store_true', help='If set, calculate and log the number of zeros in gradient.') group.add_argument('--tensorboard-log-interval', type=int, default=1, @@ -708,6 +705,8 @@ def _add_data_args(parser): '1) a single data path, 2) multiple datasets in the' 'form: dataset1-weight dataset1-path dataset2-weight ' 'dataset2-path ...') + group.add_argument('--indexmap-path', type=str, default=None, + help='Path for intermediate data files') group.add_argument('--split', type=str, default='969, 30, 1', help='Comma-separated list of proportions for training,' ' validation, and test split. 
For example the split ' diff --git a/megatron/data/bert_dataset.py b/megatron/data/bert_dataset.py index 916a3be065..b2c9a7893d 100644 --- a/megatron/data/bert_dataset.py +++ b/megatron/data/bert_dataset.py @@ -118,7 +118,7 @@ def build_training_sample(sample, if binary_head: # We assume that we have at least two sentences in the sample assert len(sample) > 1 - assert target_seq_length <= max_seq_length + assert target_seq_length <= max_seq_length-2 # Divide sample into two segments (A and B). if binary_head: diff --git a/megatron/data/biencoder_dataset_utils.py b/megatron/data/biencoder_dataset_utils.py index f7b3b961b8..7e4956c40f 100644 --- a/megatron/data/biencoder_dataset_utils.py +++ b/megatron/data/biencoder_dataset_utils.py @@ -1,8 +1,10 @@ import os import time +from pathlib import Path import numpy as np import torch +import torch.distributed from megatron import get_args, get_tokenizer, mpu, print_rank_0 from megatron.data.dataset_utils import create_masked_lm_predictions, \ @@ -146,6 +148,12 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo indexmap_filename += '_1sentok' indexmap_filename += '.npy' + args=get_args() + if args.indexmap_path is not None: + indexmap_path=Path(get_args().indexmap_path).resolve() + indexmap_path.mkdir(parents=True, exist_ok=True) + indexmap_filename = indexmap_path/Path(indexmap_filename).name + # Build the indexed mapping if not exist. 
if mpu.get_data_parallel_rank() == 0 and \ not os.path.isfile(indexmap_filename): @@ -184,13 +192,15 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo '(seconds): {:4f}'.format( time.time() - start_time)) - # This should be a barrier but nccl barrier assumes - # device_index=rank which is not the case for model - # parallel case - counts = torch.cuda.LongTensor([1]) - torch.distributed.all_reduce(counts, group=mpu.get_data_parallel_group()) - assert counts[0].item() == torch.distributed.get_world_size( - group=mpu.get_data_parallel_group()) + # Wait until rank 0 generate the index file. + torch.distributed.barrier(device_ids=[int(os.environ['LOCAL_RANK'])], group=mpu.get_data_parallel_group()) + # It can take some time for the file to be visible on other nodes. + for i in range(120): + if os.path.isfile(indexmap_filename): + break + if i%10==0: + print_rank_0(" Waiting for index file...") + time.sleep(1.0) # Load indexed dataset. print_rank_0(' > loading indexed mapping from {}'.format( diff --git a/megatron/data/dataset_utils.py b/megatron/data/dataset_utils.py index 426e965c85..a7571abcb1 100644 --- a/megatron/data/dataset_utils.py +++ b/megatron/data/dataset_utils.py @@ -22,9 +22,11 @@ import os import time import collections +from pathlib import Path import numpy as np import torch +import torch.distributed from megatron import ( get_args, @@ -446,7 +448,7 @@ def build_train_valid_test_datasets(data_prefix, data_impl, splits_string, prefixes[i], data_impl, splits_string, datasets_train_valid_test_num_samples[i], max_seq_length, masked_lm_prob, short_seq_prob, - seed, skip_warmup, binary_head, dataset_type=dataset_type) + seed, skip_warmup, binary_head,max_seq_length_dec, dataset_type=dataset_type) if train_ds: train_datasets.append(train_ds) if valid_ds: @@ -661,6 +663,12 @@ def get_samples_mapping(indexed_dataset, indexmap_filename += '_{}s'.format(seed) indexmap_filename += '.npy' + args=get_args() + if args.indexmap_path is not
None: + indexmap_path=Path(args.indexmap_path).resolve() + indexmap_path.mkdir(parents=True, exist_ok=True) + indexmap_filename = indexmap_path/Path(indexmap_filename).name + # Build the indexed mapping if not exist. if torch.distributed.get_rank() == 0 and \ not os.path.isfile(indexmap_filename): @@ -696,15 +704,17 @@ def get_samples_mapping(indexed_dataset, print_rank_0(' > elasped time to build and save samples mapping ' '(seconds): {:4f}'.format( time.time() - start_time)) - # This should be a barrier but nccl barrier assumes - # device_index=rank which is not the case for model - # parallel case - counts = torch.cuda.LongTensor([1]) - torch.distributed.all_reduce(counts, group=mpu.get_data_parallel_group()) - torch.distributed.all_reduce(counts, group=mpu.get_pipeline_model_parallel_group()) - assert counts[0].item() == ( - torch.distributed.get_world_size() // - torch.distributed.get_world_size(group=mpu.get_tensor_model_parallel_group())) + + # Wait until rank 0 generate the index file. + print_rank_0(f"Barrier device {int(os.environ['LOCAL_RANK'])}") + torch.distributed.barrier(device_ids=[int(os.environ['LOCAL_RANK'])], group=mpu.get_data_parallel_group()) + # It can take some time for the file to be visible on other nodes. + for i in range(120): + if os.path.isfile(indexmap_filename): + break + if i%10==0: + print_rank_0(" Waiting for index file...") + time.sleep(1.0) # Load indexed dataset.
print_rank_0(' > loading indexed mapping from {}'.format( diff --git a/megatron/data/gpt_dataset.py b/megatron/data/gpt_dataset.py index e6c64e975d..fa1efb6ace 100644 --- a/megatron/data/gpt_dataset.py +++ b/megatron/data/gpt_dataset.py @@ -17,11 +17,13 @@ import os import time +from pathlib import Path import numpy as np import torch +import torch.distributed -from megatron import mpu, print_rank_0 +from megatron import mpu, print_rank_0, get_args from megatron.data.blendable_dataset import BlendableDataset from megatron.data.dataset_utils import get_datasets_weights_and_num_samples from megatron.data.dataset_utils import get_train_valid_test_split_ @@ -211,6 +213,14 @@ def _build_index_mappings(name, data_prefix, documents, sizes, sample_idx_filename = _filename + '_sample_idx.npy' shuffle_idx_filename = _filename + '_shuffle_idx.npy' + args=get_args() + if args.indexmap_path is not None: + indexmap_path=Path(args.indexmap_path).resolve() + indexmap_path.mkdir(parents=True, exist_ok=True) + doc_idx_filename = indexmap_path/Path(doc_idx_filename).name + sample_idx_filename = indexmap_path/Path(sample_idx_filename).name + shuffle_idx_filename = indexmap_path/Path(shuffle_idx_filename).name + # Build the indexed mapping if not exist. 
if torch.distributed.get_rank() == 0: if (not os.path.isfile(doc_idx_filename)) or \ @@ -293,15 +303,16 @@ def _build_index_mappings(name, data_prefix, documents, sizes, print_rank_0(' > elasped time to build and save shuffle-idx mapping' ' (seconds): {:4f}'.format(time.time() - start_time)) - # This should be a barrier but nccl barrier assumes - # device_index=rank which is not the case for model - # parallel case - counts = torch.cuda.LongTensor([1]) - torch.distributed.all_reduce(counts, group=mpu.get_data_parallel_group()) - torch.distributed.all_reduce(counts, group=mpu.get_pipeline_model_parallel_group()) - assert counts[0].item() == ( - torch.distributed.get_world_size() // - torch.distributed.get_world_size(group=mpu.get_tensor_model_parallel_group())) + # Wait until rank 0 generate the index file. + torch.distributed.barrier(device_ids=[int(os.environ['LOCAL_RANK'])], group=mpu.get_data_parallel_group()) + + # It can take some time for the file to be visible on other nodes. + for i in range(120): + if os.path.isfile(doc_idx_filename) and os.path.isfile(sample_idx_filename) and os.path.isfile(shuffle_idx_filename): + break + if i%10==0: + print_rank_0(" Waiting for index files...") + time.sleep(1.0) # Load mappings.
start_time = time.time() diff --git a/megatron/data/indexed_dataset.py b/megatron/data/indexed_dataset.py index 1251066232..3bae8f1a6f 100644 --- a/megatron/data/indexed_dataset.py +++ b/megatron/data/indexed_dataset.py @@ -18,7 +18,7 @@ import numpy as np import torch -from megatron import print_rank_0 +#from megatron import print_rank_0 def __best_fitting_dtype(vocab_size=None): @@ -401,21 +401,21 @@ def __init__(self, path, skip_warmup=False): offset = stream.tell() if not skip_warmup: - print_rank_0(" warming up index mmap file...") + #print_rank_0(" warming up index mmap file...") _warmup_mmap_file(path) self._bin_buffer_mmap = np.memmap(path, mode='r', order='C') self._bin_buffer = memoryview(self._bin_buffer_mmap) - print_rank_0(" reading sizes...") + #print_rank_0(" reading sizes...") self._sizes = np.frombuffer( self._bin_buffer, dtype=np.int32, count=self._len, offset=offset) - print_rank_0(" reading pointers...") + #print_rank_0(" reading pointers...") self._pointers = np.frombuffer(self._bin_buffer, dtype=np.int64, count=self._len, offset=offset + self._sizes.nbytes) - print_rank_0(" reading document index...") + #print_rank_0(" reading document index...") self._doc_idx = np.frombuffer(self._bin_buffer, dtype=np.int64, count=self._doc_count, offset=offset + self._sizes.nbytes + self._pointers.nbytes) @@ -462,11 +462,11 @@ def _do_init(self, path, skip_warmup): self._index = self.Index(index_file_path(self._path), skip_warmup) if not skip_warmup: - print_rank_0(" warming up data mmap file...") + #print_rank_0(" warming up data mmap file...") _warmup_mmap_file(data_file_path(self._path)) - print_rank_0(" creating numpy buffer of mmap...") + #print_rank_0(" creating numpy buffer of mmap...") self._bin_buffer_mmap = np.memmap(data_file_path(self._path), mode='r', order='C') - print_rank_0(" creating memory view of numpy buffer...") + #print_rank_0(" creating memory view of numpy buffer...") self._bin_buffer = memoryview(self._bin_buffer_mmap) def 
__del__(self): diff --git a/megatron/data/realm_dataset_utils.py b/megatron/data/realm_dataset_utils.py index aecf5549a7..e313c42f53 100644 --- a/megatron/data/realm_dataset_utils.py +++ b/megatron/data/realm_dataset_utils.py @@ -1,8 +1,10 @@ import os import time +from pathlib import Path import numpy as np import torch +import torch.distributed -from megatron import mpu, print_rank_0 +from megatron import get_args, mpu, print_rank_0 from megatron.data.dataset_utils import create_masked_lm_predictions, pad_and_convert_to_numpy @@ -136,6 +138,12 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo indexmap_filename += '_1sentok' indexmap_filename += '.npy' + args=get_args() + if args.indexmap_path is not None: + indexmap_path=Path(get_args().indexmap_path).resolve() + indexmap_path.mkdir(parents=True, exist_ok=True) + indexmap_filename = indexmap_path/Path(indexmap_filename).name + # Build the indexed mapping if not exist. if mpu.get_data_parallel_rank() == 0 and \ not os.path.isfile(indexmap_filename): @@ -174,13 +182,15 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo '(seconds): {:4f}'.format( time.time() - start_time)) - # This should be a barrier but nccl barrier assumes - # device_index=rank which is not the case for model - # parallel case - counts = torch.cuda.LongTensor([1]) - torch.distributed.all_reduce(counts, group=mpu.get_data_parallel_group()) - assert counts[0].item() == torch.distributed.get_world_size( - group=mpu.get_data_parallel_group()) + # Wait until rank 0 generate the index file. + torch.distributed.barrier(device_ids=[int(os.environ['LOCAL_RANK'])], group=mpu.get_data_parallel_group()) + # It can take some time for the file to be visible on other nodes. + for i in range(120): + if os.path.isfile(indexmap_filename): + break + if i%10==0: + print_rank_0(" Waiting for index file...") + time.sleep(1.0) # Load indexed dataset.
print_rank_0(' > loading indexed mapping from {}'.format( diff --git a/megatron/fused_kernels/__init__.py b/megatron/fused_kernels/__init__.py index 0a234f20ab..c1354cdc1e 100644 --- a/megatron/fused_kernels/__init__.py +++ b/megatron/fused_kernels/__init__.py @@ -24,7 +24,7 @@ # leading to recompilation of fused kernels. Set it to empty string # to avoid recompilation and assign arch flags explicity in # extra_cuda_cflags below -os.environ["TORCH_CUDA_ARCH_LIST"] = "" +#os.environ["TORCH_CUDA_ARCH_LIST"] = "" def load(args): diff --git a/megatron/global_vars.py b/megatron/global_vars.py index 59f5960adc..7977db864d 100644 --- a/megatron/global_vars.py +++ b/megatron/global_vars.py @@ -267,9 +267,5 @@ def log(self, names, normalizer=1.0, reset=True): elapsed_time = self.timers[name].elapsed( reset=reset) * 1000.0 / normalizer string += ' | {}: {:.2f}'.format(name, elapsed_time) - if torch.distributed.is_initialized(): - if torch.distributed.get_rank() == ( - torch.distributed.get_world_size() - 1): - print(string, flush=True) - else: - print(string, flush=True) + from megatron import print_rank_last + print_rank_last(string) diff --git a/megatron/initialize.py b/megatron/initialize.py index 5c4c4e54b0..03ea26c5f1 100644 --- a/megatron/initialize.py +++ b/megatron/initialize.py @@ -31,7 +31,8 @@ from megatron.global_vars import set_global_variables from megatron.mpu import (set_tensor_model_parallel_rank, set_tensor_model_parallel_world_size) - +import logging +logger = logging.getLogger(__name__) def initialize_megatron(extra_args_provider=None, args_defaults={}, ignore_unknown_args=False, allow_no_cuda=False): @@ -61,7 +62,7 @@ def finish_mpu_init(): # Random seeds for reproducibility. if args.rank == 0: - print('> setting random seeds to {} ...'.format(args.seed)) + logger.info('> setting random seeds to {} ...'.format(args.seed)) _set_random_seed(args.seed, args.data_parallel_random_init) # Set pytorch JIT layer fusion options. 
@@ -101,11 +102,11 @@ def _compile_dependencies(): # TODO: move this to ninja if torch.distributed.get_rank() == 0: start_time = time.time() - print('> compiling dataset index builder ...') + logger.info('> compiling dataset index builder ...') from megatron.data.dataset_utils import compile_helper compile_helper() - print('>>> done with dataset index builder. Compilation time: {:.3f} ' - 'seconds'.format(time.time() - start_time), flush=True) + logger.info('>>> done with dataset index builder. Compilation time: {:.3f} ' + 'seconds'.format(time.time() - start_time)) # ================== # Load fused kernels @@ -124,29 +125,37 @@ def _compile_dependencies(): if not ((args.fp16 or args.bf16) and custom_kernel_constraint and args.masked_softmax_fusion): - if args.rank == 0: - print('WARNING: constraints for invoking optimized' - ' fused softmax kernel are not met. We default' - ' back to unfused kernel invocations.', flush=True) - + logger.warning('constraints for invoking optimized' + ' fused softmax kernel are not met. We default' + ' back to unfused kernel invocations.') + + start_time = time.time() + logger.info(f'> compiling and loading fused kernels ...') + fused_kernels.load(args) + logger.info("Compile done, waiting for others to compile") + torch.distributed.barrier() # Always build on rank zero first. - if torch.distributed.get_rank() == 0: - start_time = time.time() - print('> compiling and loading fused kernels ...', flush=True) - fused_kernels.load(args) - torch.distributed.barrier() - else: - torch.distributed.barrier() - fused_kernels.load(args) + # if torch.distributed.get_rank() == 0: + # start_time = time.time() + # logger.info(f'> compiling and loading fused kernels ... 
(rank = {torch.distributed.get_rank()})') + # fused_kernels.load(args) + # logger.info("Compile done, waiting on barrier") + # torch.distributed.barrier() + # logger.info("Barrier done, waiting for others to compile") + # else: + # logger.info("Waiting for rank 0 to compile") + # torch.distributed.barrier() + # logger.info(f'> compiling and loading fused kernels ... (rank = {torch.distributed.get_rank()})') + # fused_kernels.load(args) + # logger.info("Compile done, waiting on barrier") # Simple barrier to make sure all ranks have passed the # compilation phase successfully before moving on to the # rest of the program. We think this might ensure that # the lock is released. - torch.distributed.barrier() - if torch.distributed.get_rank() == 0: - print('>>> done with compiling and loading fused kernels. ' - 'Compilation time: {:.3f} seconds'.format( - time.time() - start_time), flush=True) + #torch.distributed.barrier() + logger.info('>>> done with compiling and loading fused kernels. ' + 'Compilation time: {:.3f} seconds'.format( + time.time() - start_time)) @@ -158,15 +167,15 @@ def _initialize_distributed(): if torch.distributed.is_initialized(): if args.rank == 0: - print('torch distributed is already initialized, ' - 'skipping initialization ...', flush=True) + logger.info('torch distributed is already initialized, ' + 'skipping initialization ...') args.rank = torch.distributed.get_rank() args.world_size = torch.distributed.get_world_size() else: if args.rank == 0: - print('> initializing torch distributed ...', flush=True) + logger.info('> initializing torch distributed ...') # Manually set the device ids. 
if device_count > 0: device = args.rank % device_count @@ -176,17 +185,17 @@ def _initialize_distributed(): else: args.local_rank = device torch.cuda.set_device(device) - # Call the init process - torch.distributed.init_process_group( - backend=args.distributed_backend, - world_size=args.world_size, rank=args.rank, - timeout=timedelta(minutes=10)) + # Call the init process + torch.distributed.init_process_group( + backend=args.distributed_backend, + world_size=args.world_size, rank=args.rank, + timeout=timedelta(minutes=10)) # Set the tensor model-parallel, pipeline model-parallel, and # data-parallel communicators. if device_count > 0: if mpu.model_parallel_is_initialized(): - print('model parallel is already initialized') + logger.info('model parallel is already initialized') else: mpu.initialize_model_parallel(args.tensor_model_parallel_size, args.pipeline_model_parallel_size, diff --git a/megatron/metrics.py b/megatron/metrics.py new file mode 100644 index 0000000000..8883a9dbb5 --- /dev/null +++ b/megatron/metrics.py @@ -0,0 +1,73 @@ +import logging +import math + +import torch +from megatron.global_vars import get_args + +logger = logging.getLogger(__name__) + +_iteration=0 +_metrics={} +_LOGGING_WIDTH=50 + +def next_iteration(iteration:int): + global _iteration, _metrics + _metrics={} + _iteration=iteration + + +def record_scale(name:str,x:torch.Tensor,grad=True, bias=None): + global _metrics + if get_log_scales(): + _metrics[f"{name}.scale" if grad else name]=get_scale(x if bias is None else x+bias) + if grad and x.requires_grad: + x.register_hook(lambda g: record_scale(f"{name}.grad",g,False)) + + +def get_scale(x): + return x.detach().float().pow(2).mean().pow(0.5) + + +def get_log_scales(): + args=get_args() + return args.log_scales and (_iteration+1) % args.log_interval == 0 + + +def log_metrics(): + metrics = {} + for key, value in _metrics.items(): + metrics_ = metrics + keys = key.split(".") + for prefix in keys[:-1]: + if prefix not in metrics_: + 
metrics_[prefix] = {} + metrics_ = metrics_[prefix] + metrics_[keys[-1]] = _format_value(value) + _log_dicts(metrics) + + +def _log_dicts(metrics, indent=0): + for key, value in metrics.items(): + key_ = key.rjust(len(key) + indent) + + # Merge keys when there is only one entry. + while isinstance(value, dict) and len(value) == 1: + for value_key, value_ in value.items(): + key_ = ".".join([key_, value_key]) + value = value_ + if isinstance(value, dict): + logger.info(key_ + ":") + _log_dicts(value, indent + 2) + else: + sep = _LOGGING_WIDTH - len(value) - len(key_) - 2 + logger.info(f"{key_.ljust(len(key_)+sep,'.')} {value}") + + +def _format_value(value, precision=5,max_leading_zeros=3): + decimals = 0 if value == 0 or not math.isfinite(value) else precision - math.floor(math.log10(abs(value))) + + if 0 <= decimals <= precision + max_leading_zeros: + value = f"{value:.{decimals}f}" + else: + value = f"{value:.{precision}e}" + return value \ No newline at end of file diff --git a/megatron/model/bert_model.py b/megatron/model/bert_model.py index 3ff5039d5f..a649885760 100644 --- a/megatron/model/bert_model.py +++ b/megatron/model/bert_model.py @@ -15,10 +15,12 @@ """BERT model.""" +import logging import torch from megatron import get_args from megatron import mpu +from megatron.metrics import record_scale from megatron.model.enums import AttnMaskType from megatron.model.language_model import parallel_lm_logits from megatron.model.language_model import get_language_model @@ -67,18 +69,20 @@ class BertLMHead(MegatronModule): """ def __init__(self, mpu_vocab_size, hidden_size, init_method, - layernorm_epsilon, parallel_output): + layernorm_epsilon, parallel_output, name_=""): super(BertLMHead, self).__init__() + self.name_=name_ args = get_args() self.bias = torch.nn.Parameter(torch.zeros(mpu_vocab_size)) + self.bias.name_=f"{self.name_}.logits.linear_bias" mpu.set_tensor_model_parallel_attributes(self.bias, True, 0, 1) self.parallel_output = parallel_output - 
self.dense = get_linear_layer(hidden_size, hidden_size, init_method) - self.layernorm = LayerNorm(hidden_size, eps=layernorm_epsilon) + self.dense = get_linear_layer(hidden_size, hidden_size, init_method, name_=f"{self.name_}.dense") + self.layernorm = LayerNorm(hidden_size, eps=layernorm_epsilon, name_=f"{self.name_}.layer_norm") self.gelu = torch.nn.functional.gelu if args.openai_gelu: self.gelu = openai_gelu @@ -86,13 +90,16 @@ def __init__(self, mpu_vocab_size, hidden_size, init_method, self.gelu = erf_gelu def forward(self, hidden_states, word_embeddings_weight): + record_scale(f"{self.name_}.hidden",hidden_states) hidden_states = self.dense(hidden_states) hidden_states = self.gelu(hidden_states) + record_scale(f"{self.name_}.gelu",hidden_states) hidden_states = self.layernorm(hidden_states) output = parallel_lm_logits(hidden_states, word_embeddings_weight, self.parallel_output, bias=self.bias) + record_scale(f"{self.name_}.logits",output) return output @@ -129,9 +136,11 @@ def __init__(self, add_binary_head=True, parallel_output=True, pre_process=True, - post_process=True): + post_process=True, + name_="bert"): super(BertModel, self).__init__() args = get_args() + self.name_=name_ self.fp16_lm_cross_entropy = args.fp16_lm_cross_entropy self.add_binary_head = add_binary_head @@ -150,18 +159,20 @@ def __init__(self, init_method=init_method, scaled_init_method=scaled_init_method, pre_process=self.pre_process, - post_process=self.post_process) + post_process=self.post_process, + name_=self.name_) self.initialize_word_embeddings(init_method_normal) if self.post_process: self.lm_head = BertLMHead( self.word_embeddings_weight().size(0), - args.hidden_size, init_method, args.layernorm_epsilon, parallel_output) + args.hidden_size, init_method, args.layernorm_epsilon, parallel_output, + name_=f"{self.name_}.output_layer.lm_head") self._lm_head_key = 'lm_head' self.binary_head = None if self.add_binary_head: self.binary_head = get_linear_layer(args.hidden_size, 2, - 
init_method) + init_method, name_=f"{self.name_}.output_layer.sop_head.binary_head") self._binary_head_key = 'binary_head' def set_input_tensor(self, input_tensor): diff --git a/megatron/model/fused_layer_norm.py b/megatron/model/fused_layer_norm.py index 5c7e066b5d..8c71138bad 100644 --- a/megatron/model/fused_layer_norm.py +++ b/megatron/model/fused_layer_norm.py @@ -23,6 +23,7 @@ from torch.nn import init import importlib +from megatron.metrics import record_scale try: from apex.contrib.layer_norm.layer_norm import FastLayerNormFN HAVE_PERSIST_LAYER_NORM = True @@ -67,8 +68,9 @@ def backward(ctx, grad_output): class MixedFusedLayerNorm(torch.nn.Module): - def __init__(self, normalized_shape, eps=1e-5, no_persist_layer_norm=True): + def __init__(self, normalized_shape, eps=1e-5, no_persist_layer_norm=True, name_=""): super(MixedFusedLayerNorm, self).__init__() + self.name_=name_ global fused_mix_prec_layer_norm_cuda fused_mix_prec_layer_norm_cuda = importlib.import_module( @@ -89,7 +91,9 @@ def __init__(self, normalized_shape, eps=1e-5, no_persist_layer_norm=True): self.normalized_shape = torch.Size(normalized_shape) self.eps = eps self.weight = Parameter(torch.Tensor(*normalized_shape)) + self.weight.name_=f"{self.name_}.layer_norm_weight" self.bias = Parameter(torch.Tensor(*normalized_shape)) + self.bias.name_=f"{self.name_}.layer_norm_bias" self.reset_parameters() self.no_persist_layer_norm = no_persist_layer_norm @@ -101,11 +105,13 @@ def reset_parameters(self): def forward(self, input): if self.no_persist_layer_norm: - return FusedLayerNormAffineFunction.apply( + output= FusedLayerNormAffineFunction.apply( input, self.weight, self.bias, self.normalized_shape, self.eps) else: - return FastLayerNormFN.apply( + output= FastLayerNormFN.apply( input, self.weight, self.bias, self.eps) + record_scale(self.name_, output) + return output diff --git a/megatron/model/language_model.py b/megatron/model/language_model.py index 96e1a51353..84004a29f7 100644 --- 
a/megatron/model/language_model.py +++ b/megatron/model/language_model.py @@ -21,6 +21,7 @@ from megatron import get_args from megatron import mpu from .module import MegatronModule +from megatron.metrics import record_scale from megatron.model.enums import LayerType, AttnMaskType from megatron.model.transformer import ParallelTransformer from megatron.model.utils import get_linear_layer @@ -48,7 +49,7 @@ def get_language_model(num_tokentypes, add_pooler, scaled_init_method=None, add_encoder=True, add_decoder=False, decoder_attn_mask_type=AttnMaskType.causal, - pre_process=True, post_process=True): + pre_process=True, post_process=True, name_=""): """Build language model and return along with the key to save.""" args = get_args() @@ -70,7 +71,8 @@ def get_language_model(num_tokentypes, add_pooler, decoder_attn_mask_type=decoder_attn_mask_type, add_pooler=add_pooler, pre_process=pre_process, - post_process=post_process + post_process=post_process, + name_=name_ ) # key used for checkpoints. language_model_key = 'language_model' @@ -90,16 +92,20 @@ class Pooler(MegatronModule): bias is set to zero. """ - def __init__(self, hidden_size, init_method): + def __init__(self, hidden_size, init_method, name_=""): super(Pooler, self).__init__() - self.dense = get_linear_layer(hidden_size, hidden_size, init_method) + self.name_=name_ + self.dense = get_linear_layer(hidden_size, hidden_size, init_method, name_=f"{self.name_}.dense") def forward(self, hidden_states, sequence_index=0): # hidden_states: [b, s, h] # sequence_index: index of the token to pool. 
+ record_scale(f"{self.name_}.input",hidden_states) pooled = hidden_states[:, sequence_index, :] + record_scale(f"{self.name_}.pooled",pooled) pooled = self.dense(pooled) pooled = torch.tanh(pooled) + record_scale(f"{self.name_}.tanh",pooled) return pooled @@ -123,7 +129,8 @@ def __init__(self, max_sequence_length, embedding_dropout_prob, init_method, - num_tokentypes=0): + num_tokentypes=0, + name_=""): super(Embedding, self).__init__() self.hidden_size = hidden_size @@ -131,17 +138,22 @@ def __init__(self, self.num_tokentypes = num_tokentypes args = get_args() + self.name_=name_ # Word embeddings (parallel). self.word_embeddings = mpu.VocabParallelEmbedding( vocab_size, self.hidden_size, init_method=self.init_method) self._word_embeddings_key = 'word_embeddings' + self.word_embeddings.name_=f"{self.name_}.word_embeddings" + self.word_embeddings.weight.name_=f"{self.word_embeddings.name_}.embedding_weight" # Position embedding (serial). self.position_embeddings = torch.nn.Embedding( max_sequence_length, self.hidden_size) self._position_embeddings_key = 'position_embeddings' + self.position_embeddings.name_=f"{self.name_}.position_embeddings" + self.position_embeddings.weight.name_=f"{self.position_embeddings.name_}.embedding_weight" # Initialize the position embeddings. self.init_method(self.position_embeddings.weight) @@ -153,6 +165,8 @@ def __init__(self, if self.num_tokentypes > 0: self.tokentype_embeddings = torch.nn.Embedding(self.num_tokentypes, self.hidden_size) + self.tokentype_embeddings.name_=f"{self.name_}.tokentype_embeddings" + self.tokentype_embeddings.weight.name_=f"{self.tokentype_embeddings.name_}.embedding_weight" # Initialize the token-type embeddings. self.init_method(self.tokentype_embeddings.weight) else: @@ -190,17 +204,24 @@ def add_tokentype_embeddings(self, num_tokentypes): def forward(self, input_ids, position_ids, tokentype_ids=None): # Embeddings. 
+ args=get_args() words_embeddings = self.word_embeddings(input_ids) + record_scale(self.word_embeddings.name_,words_embeddings) position_embeddings = self.position_embeddings(position_ids) + record_scale(self.position_embeddings.name_,position_embeddings) embeddings = words_embeddings + position_embeddings if tokentype_ids is not None: assert self.tokentype_embeddings is not None - embeddings = embeddings + self.tokentype_embeddings(tokentype_ids) + tokentype_embeddings=self.tokentype_embeddings(tokentype_ids) + record_scale(self.tokentype_embeddings.name_,tokentype_embeddings) + embeddings = embeddings + tokentype_embeddings else: assert self.tokentype_embeddings is None + record_scale(f"{self.name_}.embeddings",embeddings) # Dropout. embeddings = self.embedding_dropout(embeddings) + record_scale(f"{self.name_}.dropout",embeddings) return embeddings @@ -290,9 +311,11 @@ def __init__(self, decoder_attn_mask_type=AttnMaskType.causal, add_pooler=False, pre_process=True, - post_process=True): + post_process=True, + name_=""): super(TransformerLanguageModel, self).__init__() args = get_args() + self.name_ = name_ self.pre_process = pre_process self.post_process = post_process @@ -313,7 +336,8 @@ def __init__(self, args.max_position_embeddings, args.hidden_dropout, self.init_method, - self.num_tokentypes) + self.num_tokentypes, + name_=f"{self.name_}.input_layer.embedding") self._embedding_key = 'embedding' # Transformer. 
@@ -325,7 +349,8 @@ def __init__(self, output_layer_init_method, self_attn_mask_type=self.encoder_attn_mask_type, pre_process=self.pre_process, - post_process=self.post_process + post_process=self.post_process, + name_=self.name_, ) self._encoder_key = 'encoder' else: @@ -340,7 +365,9 @@ def __init__(self, layer_type=LayerType.decoder, self_attn_mask_type=self.decoder_attn_mask_type, pre_process=self.pre_process, - post_process=self.post_process) + post_process=self.post_process, + name_=f"{self.name_}.decoder" + ) self._decoder_key = 'decoder' else: self.decoder = None @@ -348,7 +375,7 @@ def __init__(self, if self.post_process: # Pooler. if self.add_pooler: - self.pooler = Pooler(self.hidden_size, self.init_method) + self.pooler = Pooler(self.hidden_size, self.init_method, name_=f"{self.name_}.output_layer.sop_head") self._pooler_key = 'pooler' def set_input_tensor(self, input_tensor): diff --git a/megatron/model/transformer.py b/megatron/model/transformer.py index 54dbe6fd5f..68cdb48c70 100644 --- a/megatron/model/transformer.py +++ b/megatron/model/transformer.py @@ -20,6 +20,7 @@ from megatron import get_args from megatron import mpu +from megatron.metrics import record_scale from .module import MegatronModule from megatron.model.enums import AttnMaskType, ModelType, LayerType, AttnType from megatron.model import LayerNorm @@ -50,9 +51,10 @@ class ParallelMLP(MegatronModule): state back into h hidden dimension. """ - def __init__(self, init_method, output_layer_init_method): + def __init__(self, init_method, output_layer_init_method, name_=""): super(ParallelMLP, self).__init__() args = get_args() + self.name_=name_ # Project to 4h. 
self.dense_h_to_4h = mpu.ColumnParallelLinear( @@ -60,7 +62,8 @@ def __init__(self, init_method, output_layer_init_method): args.ffn_hidden_size, gather_output=False, init_method=init_method, - skip_bias_add=True) + skip_bias_add=True, + name_=f"{name_}.dense_0") self.bias_gelu_fusion = args.bias_gelu_fusion self.activation_func = F.gelu @@ -75,7 +78,8 @@ def __init__(self, init_method, output_layer_init_method): args.hidden_size, input_is_parallel=True, init_method=output_layer_init_method, - skip_bias_add=True) + skip_bias_add=True, + name_=f"{name_}.dense_1") def forward(self, hidden_states): @@ -89,6 +93,7 @@ def forward(self, hidden_states): intermediate_parallel = \ self.activation_func(intermediate_parallel + bias_parallel) + record_scale(f"{self.name_}.gelu", intermediate_parallel) # [s, b, h] output, output_bias = self.dense_4h_to_h(intermediate_parallel) return output, output_bias @@ -104,9 +109,11 @@ class ParallelAttention(MegatronModule): def __init__(self, init_method, output_layer_init_method, layer_number, attention_type=AttnType.self_attn, - attn_mask_type=AttnMaskType.padding): + attn_mask_type=AttnMaskType.padding, + name_=""): super(ParallelAttention, self).__init__() args = get_args() + self.name_=name_ self.fp16 = args.fp16 self.bf16 = args.bf16 @@ -136,20 +143,23 @@ def __init__(self, init_method, args.hidden_size, 3 * projection_size, gather_output=False, - init_method=init_method) + init_method=init_method, + name_=f"{self.name_}.query_key_value") else: assert attention_type == AttnType.cross_attn self.query = mpu.ColumnParallelLinear( args.hidden_size, projection_size, gather_output=False, - init_method=init_method) + init_method=init_method, + name_=f"{self.name_}.query") self.key_value = mpu.ColumnParallelLinear( args.hidden_size, 2 * projection_size, gather_output=False, - init_method=init_method) + init_method=init_method, + name_=f"{self.name_}.key_value") coeff = None self.norm_factor = math.sqrt(self.hidden_size_per_attention_head) 
@@ -176,7 +186,8 @@ def __init__(self, init_method, args.hidden_size, input_is_parallel=True, init_method=output_layer_init_method, - skip_bias_add=True) + skip_bias_add=True, + name_=f"{self.name_}.dense") def _allocate_memory(self, inference_max_sequence_len, batch_size): @@ -252,6 +263,9 @@ def forward(self, hidden_states, attention_mask, self.hidden_size_per_attention_head) query_layer = query_layer.view(*new_tensor_shape) + record_scale(f"{self.name_}.query_layer", query_layer) + record_scale(f"{self.name_}.key_layer", key_layer) + record_scale(f"{self.name_}.value_layer", value_layer) # ================================== # Adjust key and value for inference @@ -310,6 +324,7 @@ def forward(self, hidden_states, attention_mask, # change view to [b, np, sq, sk] attention_scores = matmul_result.view(*output_size) + record_scale(f"{self.name_}.attention_scores", attention_scores) # =========================== # Attention probs and dropout @@ -318,6 +333,7 @@ def forward(self, hidden_states, attention_mask, # attention scores and attention mask [b, np, sq, sk] attention_probs = self.scale_mask_softmax(attention_scores, attention_mask) + record_scale(f"{self.name_}.attention_probs", attention_probs) # This is actually dropping out entire tokens to attend to, which might # seem a bit unusual, but is taken from the original Transformer paper. @@ -359,6 +375,8 @@ def forward(self, hidden_states, attention_mask, (self.hidden_size_per_partition,) context_layer = context_layer.view(*new_context_layer_shape) + record_scale(f"{self.name_}.context_layer", context_layer) + # ================= # Output. 
[sq, b, h] # ================= @@ -406,10 +424,12 @@ class ParallelTransformerLayer(MegatronModule): def __init__(self, init_method, output_layer_init_method, layer_number, layer_type=LayerType.encoder, - self_attn_mask_type=AttnMaskType.padding): + self_attn_mask_type=AttnMaskType.padding, + name_=""): args = get_args() super(ParallelTransformerLayer, self).__init__() + self.name_=name_ self.layer_number = layer_number self.layer_type = layer_type @@ -423,7 +443,9 @@ def __init__(self, init_method, output_layer_init_method, self.input_layernorm = LayerNorm( args.hidden_size, eps=args.layernorm_epsilon, - no_persist_layer_norm=args.no_persist_layer_norm) + name_=f"{self.name_}.input_layer_norm", + no_persist_layer_norm=args.no_persist_layer_norm, + ) # Self attention. self.self_attention = ParallelAttention( @@ -431,7 +453,8 @@ def __init__(self, init_method, output_layer_init_method, output_layer_init_method, layer_number, attention_type=AttnType.self_attn, - attn_mask_type=self_attn_mask_type) + attn_mask_type=self_attn_mask_type, + name_=f"{self.name_}.self_attention") self.hidden_dropout = args.hidden_dropout self.bias_dropout_fusion = args.bias_dropout_fusion @@ -439,23 +462,28 @@ def __init__(self, init_method, output_layer_init_method, self.post_attention_layernorm = LayerNorm( args.hidden_size, eps=args.layernorm_epsilon, - no_persist_layer_norm=args.no_persist_layer_norm) + no_persist_layer_norm=args.no_persist_layer_norm, + name_=f"{self.name_}.post_attention_layer_norm", + ) if self.layer_type == LayerType.decoder: self.inter_attention = ParallelAttention( init_method, output_layer_init_method, layer_number, - attention_type=AttnType.cross_attn) + attention_type=AttnType.cross_attn, + name_=f"{self.name_}.inter_attention") # Layernorm on the attention output. 
self.post_inter_attention_layernorm = LayerNorm( args.hidden_size, eps=args.layernorm_epsilon, - no_persist_layer_norm=args.no_persist_layer_norm) + no_persist_layer_norm=args.no_persist_layer_norm, + name_=f"{self.name_}.post_inter_attention_layer_norm", + ) # MLP self.mlp = ParallelMLP(init_method, - output_layer_init_method) + output_layer_init_method, name_=f"{self.name_}.mlp") def forward(self, hidden_states, attention_mask, encoder_output=None, enc_dec_attn_mask=None, @@ -471,11 +499,13 @@ def forward(self, hidden_states, attention_mask, attention_mask, inference_params=inference_params) + record_scale(f"{self.name_}.attention", attention_output, bias=attention_bias) # Residual connection. if self.apply_residual_connection_post_layernorm: residual = layernorm_output else: residual = hidden_states + record_scale(f"{self.name_}.attention_residual_input", residual) # jit scripting for a nn.module (with dropout) is not # trigerring the fusion kernel. For now, we use two @@ -489,6 +519,7 @@ def forward(self, hidden_states, attention_mask, else: bias_dropout_add_func = get_bias_dropout_add(self.training) + # re-enable torch grad to enable fused optimization. with torch.enable_grad(): layernorm_input = bias_dropout_add_func( @@ -497,6 +528,8 @@ def forward(self, hidden_states, attention_mask, residual, self.hidden_dropout) + record_scale(f"{self.name_}.attention_residual", layernorm_input) + # Layer norm post the self attention. 
layernorm_output = self.post_attention_layernorm(layernorm_input) @@ -505,11 +538,13 @@ def forward(self, hidden_states, attention_mask, self.inter_attention(layernorm_output, enc_dec_attn_mask, encoder_output=encoder_output) + record_scale(f"{self.name_}.inter_attention", attention_output, bias=attention_bias) # residual connection if self.apply_residual_connection_post_layernorm: residual = layernorm_output else: residual = layernorm_input + record_scale(f"{self.name_}.inter_attention_residual_input", residual) # re-enable torch grad to enable fused optimization. with torch.enable_grad(): @@ -518,6 +553,7 @@ def forward(self, hidden_states, attention_mask, attention_bias.expand_as(residual), residual, self.hidden_dropout) + record_scale(f"{self.name_}.inter_attention_residual", layernorm_input) # Layer norm post the decoder attention layernorm_output = self.post_inter_attention_layernorm(layernorm_input) @@ -530,6 +566,7 @@ def forward(self, hidden_states, attention_mask, residual = layernorm_output else: residual = layernorm_input + record_scale(f"{self.name_}.mlp_residual_input", residual) # re-enable torch grad to enable fused optimization. 
with torch.enable_grad(): @@ -539,6 +576,8 @@ def forward(self, hidden_states, attention_mask, residual, self.hidden_dropout) + record_scale(f"{self.name_}.mlp_residual", layernorm_input) + return output @@ -548,9 +587,11 @@ class ParallelTransformer(MegatronModule): def __init__(self, init_method, output_layer_init_method, layer_type=LayerType.encoder, self_attn_mask_type=AttnMaskType.padding, - pre_process=True, post_process=True): + pre_process=True, post_process=True, + name_=""): super(ParallelTransformer, self).__init__() args = get_args() + self.name_=name_ self.bf16 = args.bf16 self.fp32_residual_connection = args.fp32_residual_connection @@ -574,7 +615,8 @@ def build_layer(layer_number): output_layer_init_method, layer_number, layer_type=layer_type, - self_attn_mask_type=self_attn_mask_type) + self_attn_mask_type=self_attn_mask_type, + name_=f"{self.name_}.layer_{layer_number-1}.transformer_layer") if args.virtual_pipeline_model_parallel_size is not None: assert args.num_layers % args.virtual_pipeline_model_parallel_size == 0, \ 'num_layers_per_stage must be divisible by ' \ @@ -615,7 +657,9 @@ def build_layer(layer_number): self.final_layernorm = LayerNorm( args.hidden_size, eps=args.layernorm_epsilon, - no_persist_layer_norm=args.no_persist_layer_norm) + no_persist_layer_norm=args.no_persist_layer_norm, + name_=f"{self.name_}.output_layer.final_layer_norm", + ) def _get_layer(self, layer_number): return self.layers[layer_number] diff --git a/megatron/model/utils.py b/megatron/model/utils.py index 465e8aa4ff..d87616c6d9 100644 --- a/megatron/model/utils.py +++ b/megatron/model/utils.py @@ -18,8 +18,7 @@ import math import torch - -from megatron import get_args +from megatron.metrics import record_scale def init_method_normal(sigma): """Init method based on N(0, sigma).""" @@ -31,7 +30,7 @@ def init_(tensor): def scaled_init_method_normal(sigma, num_layers): """Init method based on N(0, sigma/sqrt(2*num_layers).""" - std = sigma / math.sqrt(2.0 * 
num_layers) + std = sigma / math.sqrt(2.0 * max(num_layers,1)) def init_(tensor): return torch.nn.init.normal_(tensor, mean=0.0, std=std) @@ -44,12 +43,26 @@ def attention_mask_func(attention_scores, attention_mask): return attention_scores -def get_linear_layer(rows, columns, init_method): +def get_linear_layer(rows, columns, init_method, name_=""): """Simple linear layer with weight initialization.""" layer = torch.nn.Linear(rows, columns) init_method(layer.weight) with torch.no_grad(): layer.bias.zero_() + layer.name_=name_ + layer.weight.name_=f"{name_}.linear_weight" + layer.bias.name_=f"{name_}.linear_bias" + + + old_forward=layer.forward + + def forward(input): + output=old_forward(input) + record_scale(layer.name_,output) + return output + + layer.forward=forward + return layer @torch.jit.script diff --git a/megatron/mpu/layers.py b/megatron/mpu/layers.py index 2d10eb6127..3611e8202a 100644 --- a/megatron/mpu/layers.py +++ b/megatron/mpu/layers.py @@ -36,7 +36,7 @@ from .utils import divide from .utils import split_tensor_along_last_dim from .utils import VocabUtility -from megatron import get_args +from megatron.metrics import get_args, get_log_scales, record_scale _MODEL_PARALLEL_ATTRIBUTE_DEFAULTS = {'tensor_model_parallel': False, @@ -257,8 +257,9 @@ class ColumnParallelLinear(torch.nn.Module): def __init__(self, input_size, output_size, bias=True, gather_output=True, init_method=init.xavier_normal_, stride=1, keep_master_weight_for_test=False, - skip_bias_add=False): + skip_bias_add=False,name_=""): super(ColumnParallelLinear, self).__init__() + self.name_=name_ # Keep input parameters self.input_size = input_size @@ -288,7 +289,7 @@ def __init__(self, input_size, output_size, bias=True, gather_output=True, device=torch.cuda.current_device(), dtype=args.params_dtype)) _initialize_affine_weight_gpu(self.weight, init_method, partition_dim=0, stride=stride) - + self.weight.name_=f"{self.name_}.linear_weight" if bias: if args.use_cpu_initialization: 
self.bias = Parameter(torch.empty( @@ -302,6 +303,7 @@ def __init__(self, input_size, output_size, bias=True, gather_output=True, # Always initialize bias to zero. with torch.no_grad(): self.bias.zero_() + self.bias.name_ = f"{self.name_}.linear_bias" else: self.register_parameter('bias', None) self.async_tensor_model_parallel_allreduce = ( @@ -334,6 +336,7 @@ def forward(self, input_): else: output = output_parallel output_bias = self.bias if self.skip_bias_add else None + record_scale(self.name_, output, bias=output_bias) return output, output_bias @@ -371,8 +374,9 @@ def __init__(self, input_size, output_size, bias=True, input_is_parallel=False, init_method=init.xavier_normal_, stride=1, keep_master_weight_for_test=False, - skip_bias_add=False): + skip_bias_add=False,name_=""): super(RowParallelLinear, self).__init__() + self.name_=name_ # Keep input parameters self.input_size = input_size @@ -402,6 +406,7 @@ def __init__(self, input_size, output_size, bias=True, device=torch.cuda.current_device(), dtype=args.params_dtype)) _initialize_affine_weight_gpu(self.weight, init_method, partition_dim=1, stride=stride) + self.weight.name_ = f"{self.name_}.linear_weight" if bias: if args.use_cpu_initialization: self.bias = Parameter(torch.empty(self.output_size, @@ -413,6 +418,7 @@ def __init__(self, input_size, output_size, bias=True, # Always initialize bias to zero. with torch.no_grad(): self.bias.zero_() + self.bias.name_ = f"{self.name_}.linear_bias" else: self.register_parameter('bias', None) @@ -434,5 +440,6 @@ def forward(self, input_): else: output = output_ output_bias = self.bias + record_scale(self.name_, output, bias=output_bias) return output, output_bias diff --git a/megatron/optimizer/__init__.py b/megatron/optimizer/__init__.py index 59278691de..3edc612ade 100644 --- a/megatron/optimizer/__init__.py +++ b/megatron/optimizer/__init__.py @@ -13,8 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from apex.optimizers import FusedAdam as Adam -from apex.optimizers import FusedSGD as SGD +import warnings + +try: + from apex.optimizers import FusedAdam as Adam + from apex.optimizers import FusedSGD as SGD +except ImportError: + warnings.warn("Apex not found") from megatron import get_args from megatron.model import LayerNorm @@ -52,6 +57,7 @@ def get_megatron_optimizer(model): # Base optimizer. param_groups = _get_params_for_weight_decay_optimization(model) + print("weight_decay", args.weight_decay) if args.optimizer == 'adam': optimizer = Adam(param_groups, lr=args.lr, diff --git a/megatron/optimizer/clip_grads.py b/megatron/optimizer/clip_grads.py index 36cd915644..7bf82cca25 100644 --- a/megatron/optimizer/clip_grads.py +++ b/megatron/optimizer/clip_grads.py @@ -17,9 +17,13 @@ import torch from torch._six import inf +import warnings -from apex.multi_tensor_apply import multi_tensor_applier -import amp_C +try: + from apex.multi_tensor_apply import multi_tensor_applier + import amp_C +except ImportError: + warnings.warn("Apex not found") from megatron import mpu from megatron.model.module import param_is_not_shared diff --git a/megatron/optimizer/optimizer.py b/megatron/optimizer/optimizer.py index 7ce2850530..f43372dc22 100644 --- a/megatron/optimizer/optimizer.py +++ b/megatron/optimizer/optimizer.py @@ -17,15 +17,20 @@ from abc import ABC from abc import abstractmethod +import warnings import torch -from apex.multi_tensor_apply import multi_tensor_applier -import amp_C +try: + from apex.multi_tensor_apply import multi_tensor_applier + import amp_C +except ImportError: + warnings.warn("Apex not found") from megatron import get_timers from megatron import mpu from megatron import print_rank_0 +from megatron.metrics import record_scale,get_log_scales from .clip_grads import clip_grad_norm_fp32, count_zeros_fp32 @@ -142,6 +147,13 @@ def state_dict(self): def load_state_dict(self, state_dict): pass + def _record_scales(self): + if get_log_scales(): + for group 
in self.optimizer.param_groups: + for p in group['params']: + name_=getattr(p, "name_", "unknown") + record_scale(f"optimizer.{name_}.scale", p, False) + record_scale(f"optimizer.{name_}.grad", p.grad, False) # Promote state so it can be retrieved or set via # "optimizer_instance.state" @@ -252,6 +264,8 @@ def __init__(self, optimizer, clip_grad, log_num_zeros_in_grad, float16_params_this_group.append(param) # Create a copy main_param = param.detach().clone().float() + if hasattr(param, "name_"): + main_param.name_=param.name_ # Copy tensor model parallel attributes. mpu.copy_tensor_model_parallel_attributes(main_param, param) @@ -432,6 +446,7 @@ def step(self): num_zeros_in_grad = self.count_zeros() if \ self.log_num_zeros_in_grad else None + self._record_scales() # Step the optimizer. self.optimizer.step() @@ -537,6 +552,7 @@ def step(self): num_zeros_in_grad = self.count_zeros() if \ self.log_num_zeros_in_grad else None + self._record_scales() # Update parameters. self.optimizer.step() diff --git a/megatron/package_info.py b/megatron/package_info.py new file mode 100644 index 0000000000..bd5decdef0 --- /dev/null +++ b/megatron/package_info.py @@ -0,0 +1,30 @@ +# coding=utf-8 +# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +MAJOR = 1 +MINOR = 1.5 + +# Use the following formatting: (major, minor) +VERSION = (MAJOR, MINOR) + +__version__ = '.'.join(map(str, VERSION)) +__package_name__ = 'megatron-lm' +__contact_names__ = 'NVIDIA INC' +__url__ = 'https://github.com/NVIDIA/Megatron-LM' +__download_url__ = 'https://github.com/NVIDIA/Megatron-LM/releases' +__description__ = 'Megatron-LM: Training Multi-Billion Parameter Language Models Using Model Parallelism.' +__license__ = 'See https://github.com/NVIDIA/Megatron-LM/blob/master/LICENSE' +__keywords__ = 'deep learning, Megatron, gpu, NLP, nvidia, pytorch, torch, language' + diff --git a/megatron/tokenizer/gpt2_tokenization.py b/megatron/tokenizer/gpt2_tokenization.py index 3f37e44908..fb6b44de37 100644 --- a/megatron/tokenizer/gpt2_tokenization.py +++ b/megatron/tokenizer/gpt2_tokenization.py @@ -22,7 +22,10 @@ import json import logging import os -import regex as re +try: + import regex as re +except ImportError: + pass from io import open try: diff --git a/megatron/training.py b/megatron/training.py index 0f458e30d7..978741aa54 100644 --- a/megatron/training.py +++ b/megatron/training.py @@ -25,7 +25,7 @@ import torch from torch.nn.parallel.distributed import DistributedDataParallel as torchDDP -from megatron import get_args +from megatron.metrics import get_args, get_log_scales, next_iteration, log_metrics from megatron import get_signal_handler from megatron import get_timers from megatron import get_tensorboard_writer @@ -51,7 +51,9 @@ from megatron.utils import calc_params_l2_norm from megatron.schedules import get_forward_backward_func from megatron.utils import report_memory +import logging +logger = logging.getLogger(__name__) def print_datetime(string): @@ -93,6 +95,7 @@ def pretrain(train_valid_test_dataset_provider, """ # Initalize and get arguments, timers, and Tensorboard writer. 
+ logger.info("Initializing megatron") initialize_megatron(extra_args_provider=extra_args_provider, args_defaults=args_defaults) @@ -104,7 +107,7 @@ def pretrain(train_valid_test_dataset_provider, torch.distributed.all_reduce(start_time_tensor, op=torch.distributed.ReduceOp.MIN) _TRAIN_START_TIME = start_time_tensor.item() - print_rank_0('time to initialize megatron (seconds): {:.3f}'.format( + logger.info('time to initialize megatron (seconds): {:.3f}'.format( time.time() - _TRAIN_START_TIME)) print_datetime('after megatron is initialized') @@ -474,11 +477,16 @@ def train_step(forward_step_func, data_iterator, return loss_reduced, skipped_iter, grad_norm, num_zeros_in_grad return {}, skipped_iter, grad_norm, num_zeros_in_grad +last_time=None +first_time=None +num_iters=None def training_log(loss_dict, total_loss_dict, learning_rate, iteration, loss_scale, report_memory_flag, skipped_iter, grad_norm, params_norm, num_zeros_in_grad): """Log training information such as losses, timing, ....""" + + global last_time, first_time, num_iters args = get_args() timers = get_timers() writer = get_tensorboard_writer() @@ -599,6 +607,9 @@ def add_to_logging(name): iteration, ) + if get_log_scales(): + log_metrics() + if iteration % args.log_interval == 0: elapsed_time = timers('interval-time').elapsed() elapsed_time_per_iteration = elapsed_time / total_iterations @@ -606,7 +617,10 @@ def add_to_logging(name): if args.log_timers_to_tensorboard: writer.add_scalar('iteration-time', elapsed_time_per_iteration, iteration) - log_string = ' iteration {:8d}/{:8d} |'.format( + log_string = '' + if args.name is not None: + log_string += ' {} |'.format(args.name) + log_string += ' iteration {:8d}/{:8d} |'.format( iteration, args.train_iters) log_string += ' consumed samples: {:12d} |'.format( args.consumed_train_samples) @@ -633,14 +647,29 @@ def add_to_logging(name): total_loss_dict[skipped_iters_key]) log_string += ' number of nan iterations: {:3d} |'.format( 
total_loss_dict[nan_iters_key]) + current_time=time.perf_counter() + # Skip the slower first batch to be more accurate + if first_time is None: + first_time=current_time + num_iters=0 + else: + num_iters+=1 + log_string += f' batch time: {1000*(current_time-last_time)/args.log_interval:.2f} ms |' + log_string += f' avg time: {1000*(current_time-first_time)/num_iters/args.log_interval:.2f} ms |' + last_time=current_time + + log_string += f' memory: {torch.cuda.memory_allocated():,} |' + log_string += f' max memory: {torch.cuda.max_memory_allocated():,} |' + log_string += f' max reserved: {torch.cuda.max_memory_reserved():,}' + torch.cuda.reset_peak_memory_stats() total_loss_dict[advanced_iters_key] = 0 total_loss_dict[skipped_iters_key] = 0 total_loss_dict[nan_iters_key] = 0 print_rank_last(log_string) - if report_memory_flag and learning_rate > 0.: - # Report memory after optimizer state has been initialized. - report_memory('(after {} iterations)'.format(iteration)) - report_memory_flag = False + #if report_memory_flag and learning_rate > 0.: + # # Report memory after optimizer state has been initialized. + # report_memory('(after {} iterations)'.format(iteration)) + # report_memory_flag = False timers.log(timers_to_log, normalizer=args.log_interval) return report_memory_flag @@ -681,6 +710,7 @@ def train(forward_step_func, model, optimizer, lr_scheduler, print_datetime('before the start of training step') report_memory_flag = True while iteration < args.train_iters: + next_iteration(iteration) update_num_microbatches(args.consumed_train_samples) loss_dict, skipped_iter, grad_norm, num_zeros_in_grad = \ train_step(forward_step_func, @@ -901,8 +931,10 @@ def build_train_valid_test_data_iterators( # Need to broadcast num_tokens and num_type_tokens. 
flags = torch.cuda.LongTensor( [int(do_train), int(do_valid), int(do_test)]) + logger.info("Broadcasting data iterator") else: flags = torch.cuda.LongTensor([0, 0, 0]) + logger.info("Waiting for data iterator") # Broadcast num tokens. torch.distributed.broadcast(flags, diff --git a/megatron/utils.py b/megatron/utils.py index 6289c405ac..2714a52c59 100644 --- a/megatron/utils.py +++ b/megatron/utils.py @@ -20,8 +20,11 @@ import torch from torch.nn.parallel import DistributedDataParallel as torchDDP -from apex.multi_tensor_apply import multi_tensor_applier -import amp_C +try: + from apex.multi_tensor_apply import multi_tensor_applier + import amp_C +except ImportError: + pass from megatron import get_args from megatron import print_rank_0 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000000..ec661c9cc7 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +pybind11 +torch +six +regex +numpy +ninja +nltk \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000000..9cd8161b78 --- /dev/null +++ b/setup.py @@ -0,0 +1,93 @@ +# coding=utf-8 +# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Setup for pip package.""" + +import os +import sys +import setuptools + +if sys.version_info < (3,): + raise Exception("Python 2 is not supported by Megatron.") + +os.environ["MEGATRON_SETUP"]="TRUE" + +from megatron.package_info import ( + __description__, + __contact_names__, + __url__, + __download_url__, + __keywords__, + __license__, + __package_name__, + __version__, +) + +with open("README.md", "r") as fh: + long_description = fh.read() + +############################################################################### +# Dependency Loading # +# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% # + + +def req_file(filename): + with open(filename) as f: + content = f.readlines() + return [x.strip() for x in content] + + +install_requires = req_file("requirements.txt") + +setuptools.setup( + name=__package_name__, + # Versions should comply with PEP440. For a discussion on single-sourcing + # the version across setup.py and the project code, see + # https://packaging.python.org/en/latest/single_source_version.html + version=__version__, + description=__description__, + long_description=long_description, + long_description_content_type="text/markdown", + # The project's main homepage. 
+ url=__url__, + author=__contact_names__, + maintainer=__contact_names__, + # The licence under which the project is released + license=__license__, + classifiers=[ + 'Intended Audience :: Developers', + 'Intended Audience :: Science/Research', + 'Intended Audience :: Information Technology', + # Indicate what your project relates to + 'Topic :: Scientific/Engineering :: Artificial Intelligence', + 'Topic :: Software Development :: Libraries :: Python Modules', + # Supported python versions + 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', + # Additional Setting + 'Environment :: Console', + 'Natural Language :: English', + 'Operating System :: OS Independent', + ], + python_requires='>=3.6', + packages=setuptools.find_packages(), + install_requires=install_requires, + # Add in any packaged data. + include_package_data=True, + zip_safe=False, + # PyPI package information. + keywords=__keywords__ +)