diff --git a/omlx/engine_pool.py b/omlx/engine_pool.py index 11d73df93..e839664e9 100644 --- a/omlx/engine_pool.py +++ b/omlx/engine_pool.py @@ -676,6 +676,28 @@ async def _evict_idle_lru_for_prefill( await self._unload_engine(victim) evicted_any = True + def _other_entries_serving(self, model_id: str) -> bool: + """True when any loaded entry other than ``model_id`` is serving. + + Used by the settle barrier in ``_unload_engine``: the barrier's + freed-memory check is a delta of the process-global + ``mx.get_active_memory()`` gauge, which only measures THIS unload + while no other engine is allocating concurrently. + """ + # Snapshot the items: admin unload routes call _unload_engine without + # the pool lock, so discover_models() can mutate _entries mid-iteration. + for mid, e in list(self._entries.items()): + if mid == model_id or e.engine is None: + continue + if e.in_use > 0: + return True + try: + if e.engine.has_active_requests(): + return True + except AttributeError: + pass + return False + async def _unload_engine(self, model_id: str) -> None: """ Immediately stop and unload an engine with memory settle barrier. @@ -759,6 +781,7 @@ async def _unload_engine(self, model_id: str) -> None: settle_tolerance = max(2 * 1024**3, int(entry.estimated_size * 0.05)) min_expected_freed = max(0, entry.estimated_size - settle_tolerance) settled = False + settle_indeterminate = False for _settle_round in range(10): active_now = mx.get_active_memory() actual_freed = pre_unload_active - active_now @@ -770,6 +793,23 @@ async def _unload_engine(self, model_id: str) -> None: f"(need>={format_size(min_expected_freed)}) - settled" ) break + if self._other_entries_serving(model_id): + # actual_freed is a delta of the process-global MLX gauge, + # so while another engine allocates (prefill/KV growth) the + # amount freed by THIS unload is unmeasurable — the delta can + # even read negative. Burning settle rounds here serializes + # gc/synchronize/clear_cache against live decode for seconds, + # under memory pressure, with the enforcer holding the pool + # lock. Bail out instead: pre-load admission re-reads the + # live gauge, so nothing downstream trusts this sample. + settle_indeterminate = True + logger.info( + f"Settle for '{model_id}' indeterminate under concurrent " + f"activity (freed={format_size(actual_freed)}, " + f"need>={format_size(min_expected_freed)}); skipping " + f"settle wait" + ) + break logger.debug( f"Settle round {_settle_round + 1} for '{model_id}': " f"freed={format_size(actual_freed)} " @@ -791,6 +831,16 @@ async def _unload_engine(self, model_id: str) -> None: f"(expected>={format_size(min_expected_freed)}), " f"active_memory: {format_size(active_now)} (settled)" ) + elif settle_indeterminate: + # Settle wait skipped (logged above). Emergency reclaim is + # deliberately skipped too: its gc + synchronize + clear_cache + # rounds would stall the live engines that made the measurement + # indeterminate in the first place. Recovery is not lost: + # _wake_process_memory_enforcer() below triggers an immediate + # enforcer re-poll, and pre-load admission re-reads the live gauge + # alongside the tracked accumulator (the #1623 max() in + # get_engine), so any unreleased memory stays visible to both. + pass else: # Barrier timed out - try emergency reclaim logger.warning( diff --git a/tests/test_engine_pool.py b/tests/test_engine_pool.py index 9e823420b..ec7042220 100644 --- a/tests/test_engine_pool.py +++ b/tests/test_engine_pool.py @@ -3,6 +3,7 @@ import asyncio import json +import logging from pathlib import Path from unittest.mock import AsyncMock, MagicMock, patch @@ -1829,6 +1830,115 @@ def mock_get_active(): assert pool._entries["model-a"].engine is None assert pool._current_model_memory == 0 + @pytest.mark.asyncio + async def test_settle_bails_out_under_concurrent_activity( + self, pool_with_loaded_model, caplog + ): + """1774 regression: with another engine serving, the global freed + delta is unmeasurable (it can read negative as the other engine + allocates). The barrier must bail after one sample instead of burning + 10 settle rounds + emergency reclaim — ~8s of gc/synchronize/ + clear_cache serialized against live decode, under the pool lock. + """ + pool = pool_with_loaded_model + + # Second entry actively serving. + other_engine = MagicMock() + other_engine.has_active_requests = MagicMock(return_value=True) + pool._entries["model-b"].engine = other_engine + + # Global gauge RISES during settle (concurrent prefill/KV growth), + # so freed = pre_unload - active_now is negative every round. + call_idx = [0] + + def rising_gauge(): + val = (10 + call_idx[0]) * 1024**3 + call_idx[0] += 1 + return val + + sleep_calls: list[float] = [] + + async def record_sleep(duration, *args, **kwargs): + sleep_calls.append(duration) + + with ( + patch("omlx.engine_pool.mx") as mock_mx, + patch("omlx.engine_pool.get_mlx_executor", return_value=None), + patch("asyncio.sleep", side_effect=record_sleep), + caplog.at_level(logging.DEBUG, logger="omlx.engine_pool"), + ): + mock_mx.get_active_memory = rising_gauge + mock_mx.synchronize = MagicMock() + mock_mx.clear_cache = MagicMock() + + await pool._unload_engine("model-a") + + assert "indeterminate under concurrent activity" in caplog.text + # No settle-round burn, no timeout warning, no emergency reclaim. + assert sleep_calls.count(0.5) == 0 + assert "Settle barrier timed out" not in caplog.text + assert "Emergency reclaim" not in caplog.text + # Only the initial pre-barrier release cycle touched the executor. + assert mock_mx.synchronize.call_count == 1 + assert mock_mx.clear_cache.call_count == 1 + # The unload itself still completes and is accounted. + assert pool._entries["model-a"].engine is None + assert pool._current_model_memory == 0 + + @pytest.mark.asyncio + async def test_settle_still_waits_when_pool_otherwise_idle( + self, pool_with_loaded_model, caplog + ): + """Idle-pool behavior is unchanged (#768 protection): with no other + entry serving, an unsatisfied barrier still burns its settle rounds + and escalates to emergency reclaim. + """ + pool = pool_with_loaded_model + + call_idx = [0] + + def rising_gauge(): + val = (10 + call_idx[0]) * 1024**3 + call_idx[0] += 1 + return val + + with ( + patch("omlx.engine_pool.mx") as mock_mx, + patch("omlx.engine_pool.get_mlx_executor", return_value=None), + patch("asyncio.sleep", new_callable=AsyncMock), + caplog.at_level(logging.DEBUG, logger="omlx.engine_pool"), + ): + mock_mx.get_active_memory = rising_gauge + mock_mx.synchronize = MagicMock() + mock_mx.clear_cache = MagicMock() + + await pool._unload_engine("model-a") + + assert "indeterminate under concurrent activity" not in caplog.text + assert "Settle barrier timed out" in caplog.text + # Full barrier behavior preserved: 1 initial release cycle + 10 settle + # rounds + 3 emergency-reclaim rounds on the executor. + assert mock_mx.synchronize.call_count == 14 + assert mock_mx.clear_cache.call_count == 14 + + def test_other_entries_serving_in_use_lease_counts( + self, pool_with_loaded_model + ): + """The in-use lease (acquired but not yet active) also marks the pool + as serving — eviction paths already treat it as activity (#1667). + """ + pool = pool_with_loaded_model + entry_b = pool._entries["model-b"] + + assert pool._other_entries_serving("model-a") is False + + entry_b.engine = MagicMock() + entry_b.engine.has_active_requests = MagicMock(return_value=False) + assert pool._other_entries_serving("model-a") is False + + entry_b.in_use = 1 + assert pool._other_entries_serving("model-a") is True + class TestEnginePoolInUseLease: """Tests for the acquire-vs-use in-use lease (#1667).