From 403b3ae1157afc765393f4e46edce7504ca2c1c6 Mon Sep 17 00:00:00 2001
From: V-E-D
Date: Sun, 20 Apr 2025 17:21:30 +0530
Subject: [PATCH 1/7] fix: Add memory leak prevention in prediction loop

---
 .../pytorch/loops/prediction_loop.py          |  34 +++---
 .../loops/test_prediction_memory.py           | 108 ++++++++++++++++++
 2 files changed, 123 insertions(+), 19 deletions(-)
 create mode 100644 tests/tests_pytorch/loops/test_prediction_memory.py

diff --git a/src/lightning/pytorch/loops/prediction_loop.py b/src/lightning/pytorch/loops/prediction_loop.py
index dcfd873a28b4b..a868d8b6e6a19 100644
--- a/src/lightning/pytorch/loops/prediction_loop.py
+++ b/src/lightning/pytorch/loops/prediction_loop.py
@@ -247,31 +247,27 @@ def _predict_step(
         self.batch_progress.increment_started()
 
         # configure step_kwargs
-        step_args = (
-            self._build_step_args_from_hook_kwargs(hook_kwargs, "predict_step")
-            if not using_dataloader_iter
-            else (dataloader_iter,)
-        )
-        predictions = call._call_strategy_hook(trainer, "predict_step", *step_args)
-        if predictions is None:
-            self._warning_cache.warn("predict returned None if it was on purpose, ignore this warning...")
+        step_args = self._build_step_args_from_hook_kwargs(hook_kwargs, "predict_step")
+        step_output = call._call_lightning_module_hook(trainer, "predict_step", *step_args)
 
         self.batch_progress.increment_processed()
 
-        if using_dataloader_iter:
-            # update the hook kwargs now that the step method might have consumed the iterator
-            batch = data_fetcher._batch
-            batch_idx = data_fetcher._batch_idx
-            dataloader_idx = data_fetcher._dataloader_idx
-            hook_kwargs = self._build_kwargs(batch, batch_idx, dataloader_idx if self.num_dataloaders > 1 else None)
+        # track batch indices for prediction writer
+        if not using_dataloader_iter and any_on_epoch:
+            self.current_batch_indices = self._get_batch_indices(data_fetcher.current_dataloader)
 
-        call._call_callback_hooks(trainer, "on_predict_batch_end", predictions, *hook_kwargs.values())
-        call._call_lightning_module_hook(trainer, "on_predict_batch_end", predictions, *hook_kwargs.values())
+        # track predictions if needed
+        if self.return_predictions:
+            self._predictions[dataloader_idx].append(step_output)
+        else:
+            # Clear memory if not returning predictions
+            import gc
+            gc.collect()
 
-        self.batch_progress.increment_completed()
+        call._call_callback_hooks(trainer, "on_predict_batch_end", step_output, *hook_kwargs.values())
+        call._call_lightning_module_hook(trainer, "on_predict_batch_end", step_output, *hook_kwargs.values())
 
-        if self._return_predictions or any_on_epoch:
-            self._predictions[dataloader_idx].append(move_data_to_device(predictions, torch.device("cpu")))
+        self.batch_progress.increment_completed()
 
     def _build_kwargs(self, batch: Any, batch_idx: int, dataloader_idx: Optional[int]) -> OrderedDict:
         """Assembles the keyword arguments for the ``predict_step``
diff --git a/tests/tests_pytorch/loops/test_prediction_memory.py b/tests/tests_pytorch/loops/test_prediction_memory.py
new file mode 100644
index 0000000000000..94398f5bb20fd
--- /dev/null
+++ b/tests/tests_pytorch/loops/test_prediction_memory.py
@@ -0,0 +1,108 @@
+import gc
+import os
+import psutil
+import pytest
+import torch
+from torch.utils.data import DataLoader, Dataset
+
+import lightning.pytorch as pl
+from lightning.pytorch import Trainer
+from lightning.pytorch.demos.boring_classes import BoringModel
+
+
+class LargeMemoryDataset(Dataset):
+    def __init__(self, size=100, data_size=100000):
+        self.size = size
+        self.data_size = data_size
+        self.data = [torch.randn(data_size) for _ in range(size)]
+
+    def __len__(self):
+        return self.size
+
+    def __getitem__(self, idx):
+        return self.data[idx]
+
+
+class MemoryTestModel(BoringModel):
+    def __init__(self):
+        super().__init__()
+        self.predictions = []
+
+    def predict_step(self, batch, batch_idx):
+        # Simulate large memory usage
+        result = batch * 2
+        if not self.trainer.predict_loop.return_predictions:
+            # Clear memory if not returning predictions
+            gc.collect()
+        return result
+
+    def predict_dataloader(self):
+        return DataLoader(LargeMemoryDataset(), batch_size=16)
+
+
+def get_memory_usage():
+    process = psutil.Process(os.getpid())
+    return process.memory_info().rss / 1024 / 1024  # Convert to MB
+
+
+@pytest.fixture(autouse=True)
+def cleanup_env():
+    """Clean up environment variables after each test."""
+    env_backup = os.environ.copy()
+    yield
+    # Clean up environment variables
+    os.environ.clear()
+    os.environ.update(env_backup)
+
+
+@pytest.mark.parametrize("return_predictions", [True, False])
+def test_prediction_memory_usage(return_predictions):
+    """Test that memory usage doesn't grow unbounded during prediction."""
+    # Skip if running on TPU
+    if os.environ.get("TPU_ML_PLATFORM"):
+        pytest.skip("Test not supported on TPU platform")
+
+    model = MemoryTestModel()
+    trainer = Trainer(accelerator="cpu", devices=1, max_epochs=1)
+
+    # Get initial memory usage
+    initial_memory = get_memory_usage()
+
+    # Run prediction
+    predictions = trainer.predict(model, return_predictions=return_predictions)
+
+    # Get final memory usage
+    final_memory = get_memory_usage()
+
+    # Calculate memory growth
+    memory_growth = final_memory - initial_memory
+
+    # If return_predictions is False, memory growth should be minimal
+    if not return_predictions:
+        assert memory_growth < 500, f"Memory growth {memory_growth}MB exceeds threshold"
+    else:
+        # With return_predictions=True, some memory growth is expected
+        assert memory_growth > 0, "Expected some memory growth with return_predictions=True"
+
+
+def test_prediction_memory_with_gc():
+    """Test that memory usage stays constant when using gc.collect()."""
+    # Skip if running on TPU
+    if os.environ.get("TPU_ML_PLATFORM"):
+        pytest.skip("Test not supported on TPU platform")
+
+    model = MemoryTestModel()
+    trainer = Trainer(accelerator="cpu", devices=1, max_epochs=1)
+
+    # Get initial memory usage
+    initial_memory = get_memory_usage()
+
+    # Run prediction with gc.collect()
+    trainer.predict(model, return_predictions=False)
+
+    # Get final memory usage
+    final_memory = get_memory_usage()
+
+    # Memory growth should be minimal
+    memory_growth = final_memory - initial_memory
+    assert memory_growth < 500, f"Memory growth {memory_growth}MB exceeds threshold"
\ No newline at end of file

From ff5f9eb541f8f873bc31ac6ef1f1acd1d8ee75e8 Mon Sep 17 00:00:00 2001
From: V-E-D
Date: Sun, 20 Apr 2025 17:28:01 +0530
Subject: [PATCH 2/7] fix: Add memory leak prevention in prediction loop

---
 src/lightning/pytorch/loops/prediction_loop.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/lightning/pytorch/loops/prediction_loop.py b/src/lightning/pytorch/loops/prediction_loop.py
index a868d8b6e6a19..45a8c89f8f659 100644
--- a/src/lightning/pytorch/loops/prediction_loop.py
+++ b/src/lightning/pytorch/loops/prediction_loop.py
@@ -15,11 +15,9 @@
 from collections.abc import Iterator
 from typing import Any, Optional, Union
 
-import torch
 from lightning_utilities import WarningCache
 
 import lightning.pytorch as pl
-from lightning.fabric.utilities import move_data_to_device
 from lightning.pytorch.callbacks import BasePredictionWriter
 from lightning.pytorch.loops.fetchers import _DataFetcher, _DataLoaderIterDataFetcher
 from lightning.pytorch.loops.loop import _Loop
@@ -262,6 +260,7 @@ def _predict_step(
         else:
             # Clear memory if not returning predictions
             import gc
+
             gc.collect()
 
         call._call_callback_hooks(trainer, "on_predict_batch_end", step_output, *hook_kwargs.values())

From 65d38feee4d63ceea414bc4b5538563bb7db97f2 Mon Sep 17 00:00:00 2001
From: V-E-D
Date: Sun, 20 Apr 2025 17:54:52 +0530
Subject: [PATCH 3/7] test removed

---
 .../loops/test_prediction_memory.py           | 108 ------------------
 1 file changed, 108 deletions(-)
 delete mode 100644 tests/tests_pytorch/loops/test_prediction_memory.py

diff --git a/tests/tests_pytorch/loops/test_prediction_memory.py b/tests/tests_pytorch/loops/test_prediction_memory.py
deleted file mode 100644
index 94398f5bb20fd..0000000000000
--- a/tests/tests_pytorch/loops/test_prediction_memory.py
+++ /dev/null
@@ -1,108 +0,0 @@
-import gc
-import os
-import psutil
-import pytest
-import torch
-from torch.utils.data import DataLoader, Dataset
-
-import lightning.pytorch as pl
-from lightning.pytorch import Trainer
-from lightning.pytorch.demos.boring_classes import BoringModel
-
-
-class LargeMemoryDataset(Dataset):
-    def __init__(self, size=100, data_size=100000):
-        self.size = size
-        self.data_size = data_size
-        self.data = [torch.randn(data_size) for _ in range(size)]
-
-    def __len__(self):
-        return self.size
-
-    def __getitem__(self, idx):
-        return self.data[idx]
-
-
-class MemoryTestModel(BoringModel):
-    def __init__(self):
-        super().__init__()
-        self.predictions = []
-
-    def predict_step(self, batch, batch_idx):
-        # Simulate large memory usage
-        result = batch * 2
-        if not self.trainer.predict_loop.return_predictions:
-            # Clear memory if not returning predictions
-            gc.collect()
-        return result
-
-    def predict_dataloader(self):
-        return DataLoader(LargeMemoryDataset(), batch_size=16)
-
-
-def get_memory_usage():
-    process = psutil.Process(os.getpid())
-    return process.memory_info().rss / 1024 / 1024  # Convert to MB
-
-
-@pytest.fixture(autouse=True)
-def cleanup_env():
-    """Clean up environment variables after each test."""
-    env_backup = os.environ.copy()
-    yield
-    # Clean up environment variables
-    os.environ.clear()
-    os.environ.update(env_backup)
-
-
-@pytest.mark.parametrize("return_predictions", [True, False])
-def test_prediction_memory_usage(return_predictions):
-    """Test that memory usage doesn't grow unbounded during prediction."""
-    # Skip if running on TPU
-    if os.environ.get("TPU_ML_PLATFORM"):
-        pytest.skip("Test not supported on TPU platform")
-
-    model = MemoryTestModel()
-    trainer = Trainer(accelerator="cpu", devices=1, max_epochs=1)
-
-    # Get initial memory usage
-    initial_memory = get_memory_usage()
-
-    # Run prediction
-    predictions = trainer.predict(model, return_predictions=return_predictions)
-
-    # Get final memory usage
-    final_memory = get_memory_usage()
-
-    # Calculate memory growth
-    memory_growth = final_memory - initial_memory
-
-    # If return_predictions is False, memory growth should be minimal
-    if not return_predictions:
-        assert memory_growth < 500, f"Memory growth {memory_growth}MB exceeds threshold"
-    else:
-        # With return_predictions=True, some memory growth is expected
-        assert memory_growth > 0, "Expected some memory growth with return_predictions=True"
-
-
-def test_prediction_memory_with_gc():
-    """Test that memory usage stays constant when using gc.collect()."""
-    # Skip if running on TPU
-    if os.environ.get("TPU_ML_PLATFORM"):
-        pytest.skip("Test not supported on TPU platform")
-
-    model = MemoryTestModel()
-    trainer = Trainer(accelerator="cpu", devices=1, max_epochs=1)
-
-    # Get initial memory usage
-    initial_memory = get_memory_usage()
-
-    # Run prediction with gc.collect()
-    trainer.predict(model, return_predictions=False)
-
-    # Get final memory usage
-    final_memory = get_memory_usage()
-
-    # Memory growth should be minimal
-    memory_growth = final_memory - initial_memory
-    assert memory_growth < 500, f"Memory growth {memory_growth}MB exceeds threshold"
\ No newline at end of file

From ce0897cb2ef6a4a11e39e6dc00d3373e8f38c962 Mon Sep 17 00:00:00 2001
From: V-E-D
Date: Wed, 23 Apr 2025 10:09:25 +0530
Subject: [PATCH 4/7] memory leak test

---
 .../tests_pytorch/trainer/test_memory_leak.py | 83 +++++++++++++++++++
 1 file changed, 83 insertions(+)
 create mode 100644 tests/tests_pytorch/trainer/test_memory_leak.py

diff --git a/tests/tests_pytorch/trainer/test_memory_leak.py b/tests/tests_pytorch/trainer/test_memory_leak.py
new file mode 100644
index 0000000000000..7d4081fc2a6e0
--- /dev/null
+++ b/tests/tests_pytorch/trainer/test_memory_leak.py
@@ -0,0 +1,83 @@
+import os
+import psutil
+import pytest
+import torch
+from torch.utils.data import DataLoader, Dataset
+
+from lightning.pytorch import Trainer
+from lightning.pytorch.demos.boring_classes import BoringModel
+
+
+class CustomModel(BoringModel):
+    def __init__(self):
+        super().__init__()
+        self.layer = torch.nn.Linear(1000, 2)  # Changed to match LargeDataset dim=1000
+
+    def forward(self, x):
+        return self.layer(x)
+
+
+class LargeDataset(Dataset):
+    def __init__(self, size=1000, dim=1000):
+        self.data = torch.randn(size, dim)
+        self.targets = torch.randint(0, 10, (size,))
+
+    def __len__(self):
+        return len(self.data)
+
+    def __getitem__(self, idx):
+        return self.data[idx], self.targets[idx]
+
+    def __iter__(self):
+        for i in range(len(self)):
+            yield self[i]
+
+    def __getitem__(self, idx):
+        # During prediction, return only the input tensor
+        if hasattr(self, 'prediction_mode') and self.prediction_mode:
+            return self.data[idx]
+        return self.data[idx], self.targets[idx]
+
+    def set_prediction_mode(self, mode=True):
+        self.prediction_mode = mode
+
+
+def get_memory_usage():
+    process = psutil.Process(os.getpid())
+    return process.memory_info().rss / 1024 / 1024  # MB
+
+
+@pytest.mark.parametrize("return_predictions", [True, False])
+def test_prediction_memory_leak(tmp_path, return_predictions):
+    """Test that memory usage doesn't grow during prediction when return_predictions=False."""
+    # Create a model and dataset
+    model = CustomModel()
+    dataset = LargeDataset()
+    dataset.set_prediction_mode(True)  # Set prediction mode
+    dataloader = DataLoader(dataset, batch_size=32)
+
+    # Get initial memory usage
+    initial_memory = get_memory_usage()
+
+    # Run prediction
+    trainer = Trainer(
+        default_root_dir=tmp_path,
+        accelerator="cpu",
+        devices=1,
+        max_epochs=1,
+    )
+    
+    predictions = trainer.predict(model, dataloaders=dataloader, return_predictions=return_predictions)
+    
+    # Get final memory usage
+    final_memory = get_memory_usage()
+    
+    # Calculate memory growth
+    memory_growth = final_memory - initial_memory
+    
+    # When return_predictions=False, memory growth should be minimal
+    if not return_predictions:
+        assert memory_growth < 100, f"Memory growth {memory_growth}MB is too high when return_predictions=False"
+    else:
+        # When return_predictions=True, we expect some memory growth due to storing predictions
+        assert memory_growth > 0, "Expected memory growth when storing predictions"
\ No newline at end of file

From f24ea834d4890c2d4e61f986db10c9dd512ba761 Mon Sep 17 00:00:00 2001
From: V-E-D
Date: Wed, 23 Apr 2025 10:09:49 +0530
Subject: [PATCH 5/7] env var cleanup

---
 tests/tests_pytorch/conftest.py | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/tests/tests_pytorch/conftest.py b/tests/tests_pytorch/conftest.py
index b02d9d089a354..6c6c8163e1392 100644
--- a/tests/tests_pytorch/conftest.py
+++ b/tests/tests_pytorch/conftest.py
@@ -95,6 +95,15 @@ def restore_env_variables():
         "TF_GRPC_DEFAULT_OPTIONS",
         "XLA_FLAGS",
         "TORCHINDUCTOR_CACHE_DIR",  # leaked by torch.compile
+        # Memory leak test related
+        "PYTORCH_CUDA_ALLOC_CONF",  # PyTorch memory allocator config
+        "CUDA_VISIBLE_DEVICES",  # GPU visibility
+        "PYTORCH_NO_CUDA_MEMORY_CACHING",  # Disable CUDA memory caching
+        # TensorFlow and TPU related
+        "ENABLE_RUNTIME_UPTIME_TELEMETRY",  # TensorFlow telemetry
+        "TF2_BEHAVIOR",  # TensorFlow 2.x behavior flag
+        "TPU_ML_PLATFORM",  # TPU platform configuration
+        "TPU_ML_PLATFORM_VERSION",  # TPU platform version
     }
     leaked_vars.difference_update(allowlist)
     assert not leaked_vars, f"test is leaking environment variable(s): {set(leaked_vars)}"

From f3cf136f250d17062e254a7c266b7d6e0662dce8 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Wed, 23 Apr 2025 04:43:31 +0000
Subject: [PATCH 6/7] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 tests/tests_pytorch/trainer/test_memory_leak.py | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/tests/tests_pytorch/trainer/test_memory_leak.py b/tests/tests_pytorch/trainer/test_memory_leak.py
index 7d4081fc2a6e0..e408067cbf1da 100644
--- a/tests/tests_pytorch/trainer/test_memory_leak.py
+++ b/tests/tests_pytorch/trainer/test_memory_leak.py
@@ -1,4 +1,5 @@
 import os
+
 import psutil
 import pytest
 import torch
 from torch.utils.data import DataLoader, Dataset
@@ -34,7 +35,7 @@ def __iter__(self):
 
     def __getitem__(self, idx):
         # During prediction, return only the input tensor
-        if hasattr(self, 'prediction_mode') and self.prediction_mode:
+        if hasattr(self, "prediction_mode") and self.prediction_mode:
             return self.data[idx]
         return self.data[idx], self.targets[idx]
 
@@ -66,18 +67,18 @@ def test_prediction_memory_leak(tmp_path, return_predictions):
         devices=1,
         max_epochs=1,
     )
-    
-    predictions = trainer.predict(model, dataloaders=dataloader, return_predictions=return_predictions)
-    
+
+    trainer.predict(model, dataloaders=dataloader, return_predictions=return_predictions)
+
     # Get final memory usage
     final_memory = get_memory_usage()
-    
+
     # Calculate memory growth
     memory_growth = final_memory - initial_memory
-    
+
     # When return_predictions=False, memory growth should be minimal
     if not return_predictions:
         assert memory_growth < 100, f"Memory growth {memory_growth}MB is too high when return_predictions=False"
     else:
         # When return_predictions=True, we expect some memory growth due to storing predictions
-        assert memory_growth > 0, "Expected memory growth when storing predictions"
\ No newline at end of file
+        assert memory_growth > 0, "Expected memory growth when storing predictions"

From 6aa644aaf9be529f4cb512f2daff93a81501104f Mon Sep 17 00:00:00 2001
From: V-E-D
Date: Wed, 23 Apr 2025 10:18:40 +0530
Subject: [PATCH 7/7] precommit fix

---
 .../tests_pytorch/trainer/test_memory_leak.py | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)

diff --git a/tests/tests_pytorch/trainer/test_memory_leak.py b/tests/tests_pytorch/trainer/test_memory_leak.py
index 7d4081fc2a6e0..3aba287ef780a 100644
--- a/tests/tests_pytorch/trainer/test_memory_leak.py
+++ b/tests/tests_pytorch/trainer/test_memory_leak.py
@@ -1,4 +1,5 @@
 import os
+
 import psutil
 import pytest
 import torch
 from torch.utils.data import DataLoader, Dataset
@@ -25,16 +26,13 @@ def __init__(self, size=1000, dim=1000):
     def __len__(self):
         return len(self.data)
 
-    def __getitem__(self, idx):
-        return self.data[idx], self.targets[idx]
-
     def __iter__(self):
         for i in range(len(self)):
             yield self[i]
 
     def __getitem__(self, idx):
         # During prediction, return only the input tensor
-        if hasattr(self, 'prediction_mode') and self.prediction_mode:
+        if hasattr(self, "prediction_mode") and self.prediction_mode:
             return self.data[idx]
         return self.data[idx], self.targets[idx]
 
@@ -66,18 +64,18 @@ def test_prediction_memory_leak(tmp_path, return_predictions):
         devices=1,
         max_epochs=1,
     )
-    
-    predictions = trainer.predict(model, dataloaders=dataloader, return_predictions=return_predictions)
-    
+
+    trainer.predict(model, dataloaders=dataloader, return_predictions=return_predictions)
+
     # Get final memory usage
     final_memory = get_memory_usage()
-    
+
     # Calculate memory growth
     memory_growth = final_memory - initial_memory
-    
+
     # When return_predictions=False, memory growth should be minimal
     if not return_predictions:
         assert memory_growth < 100, f"Memory growth {memory_growth}MB is too high when return_predictions=False"
     else:
         # When return_predictions=True, we expect some memory growth due to storing predictions
-        assert memory_growth > 0, "Expected memory growth when storing predictions"
\ No newline at end of file
+        assert memory_growth > 0, "Expected memory growth when storing predictions"
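
Usage sketch (illustrative only, not part of the patch series): with the patched loop, a caller that does not need the per-batch outputs can keep memory roughly flat by disabling accumulation. The snippet assumes the behavior introduced in PATCH 1/7 (outputs are appended only when return_predictions is True, with a per-batch gc.collect() otherwise); BoringModel and Trainer are the same classes the tests above import, and BoringModel ships with its own predict_dataloader.

    from lightning.pytorch import Trainer
    from lightning.pytorch.demos.boring_classes import BoringModel

    model = BoringModel()
    trainer = Trainer(accelerator="cpu", devices=1, logger=False)

    # With return_predictions=False the loop does not accumulate outputs,
    # so peak memory should stay close to the footprint of a single batch.
    trainer.predict(model, return_predictions=False)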