diff --git a/tests/integration_tests.py b/tests/integration_tests.py
index 902b1b35..7048439c 100755
--- a/tests/integration_tests.py
+++ b/tests/integration_tests.py
@@ -378,14 +378,22 @@ def build_test_list():
             [
                 [
                     "--checkpoint.enable_checkpoint",
-                    "--experimental.pipeline_parallel_degree 2",
+                    "--training.tensor_parallel_degree=2",
+                    "--experimental.context_parallel_degree=2",
+                    "--training.enable_cpu_offload",
+                    "--optimizer.early_step_in_backward",
+                ],
+                [
+                    "--training.tensor_parallel_degree=2",
+                    "--experimental.context_parallel_degree=2",
+                    "--training.data_parallel_replicate_degree=2",
                     "--training.enable_cpu_offload",
                     "--optimizer.early_step_in_backward",
                 ],
             ],
-            "Enable CPU Offload with PP",
-            "enable_cpu_offload+PP",
-            ngpu=4,
+            "Enable CPU Offload, Optimizer in backward with TP, DP, CP",
+            "cpu_offload+opt_in_bwd+TP+DP+CP",
+            ngpu=8,
         ),
         OverrideDefinitions(
             [
diff --git a/torchtitan/optimizer.py b/torchtitan/optimizer.py
index 4e205f04..2356b95a 100644
--- a/torchtitan/optimizer.py
+++ b/torchtitan/optimizer.py
@@ -81,30 +81,37 @@ def __init__(
     ) -> None:
         self.optimizers = []
         self.model_parts = model_parts
+        optim_dict = {}
         for model in self.model_parts:
             if name == "Adam":
                 # TODO: make the optimizer options configurable by toml/cmd args
-                optim_dict = {
-                    param: torch.optim.Adam([param], **optimizer_kwargs)
-                    for param in model.parameters()
-                }
+                optim_dict.update(
+                    {
+                        param: torch.optim.Adam([param], **optimizer_kwargs)
+                        for param in model.parameters()
+                    }
+                )
             elif name == "AdamW":
-                optim_dict = {
-                    param: torch.optim.AdamW([param], **optimizer_kwargs)
-                    for param in model.parameters()
-                }
+                optim_dict.update(
+                    {
+                        param: torch.optim.AdamW([param], **optimizer_kwargs)
+                        for param in model.parameters()
+                    }
+                )
             else:
                 raise NotImplementedError(f"Optimizer {name} not added.")
 
-            def optim_hook(param) -> None:
-                optim_dict[param].step()
-                optim_dict[param].zero_grad()
+        def optim_hook(param) -> None:
+            optim_dict[param].step()
+            optim_dict[param].zero_grad()
 
+        for model in self.model_parts:
             for param in model.parameters():
                 if param.requires_grad:
                     param.register_post_accumulate_grad_hook(optim_hook)
 
             self.optimizers.extend([optim_dict[param] for param in model.parameters()])
+
         self._validate_length(
             sum(
                 len([param for param in model.parameters()])
@@ -127,6 +134,10 @@ def build_optimizers(
     step() and zero_grad() method for all the child optimizers.
     """
     optim_in_bwd = job_config.optimizer.early_step_in_backward
+    if optim_in_bwd and job_config.experimental.pipeline_parallel_degree > 1:
+        raise NotImplementedError(
+            "OptimizersInBackwardContainer is not supported with pipeline parallelism"
+        )
     name = job_config.optimizer.name
     lr = job_config.optimizer.lr
     fused = job_config.optimizer.fused
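
For context (not part of the patch), below is a minimal standalone sketch of the optimizer-step-in-backward pattern that OptimizersInBackwardContainer builds on: one optimizer per parameter, stepped from a post-accumulate-grad hook. It assumes PyTorch >= 2.1 for Tensor.register_post_accumulate_grad_hook; the toy nn.Linear model and hyperparameters are illustrative only and not taken from torchtitan.

    import torch
    import torch.nn as nn

    # Toy stand-in for a model part; purely illustrative.
    model = nn.Linear(8, 4)

    # One optimizer per parameter, keyed by the parameter tensor itself.
    optim_dict = {p: torch.optim.AdamW([p], lr=1e-3) for p in model.parameters()}

    def optim_hook(param) -> None:
        # Runs right after this parameter's gradient is accumulated in backward,
        # so the update happens during backward and the gradient is freed early.
        optim_dict[param].step()
        optim_dict[param].zero_grad()

    for p in model.parameters():
        if p.requires_grad:
            p.register_post_accumulate_grad_hook(optim_hook)

    loss = model(torch.randn(2, 8)).sum()
    loss.backward()  # parameters update here; no separate optimizer.step()/zero_grad()

The patch above centralizes optim_dict across all model parts before registering the hooks, and explicitly rejects this mode in build_optimizers when pipeline_parallel_degree > 1.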