Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions omlx/engine_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,28 @@ async def _evict_idle_lru_for_prefill(
await self._unload_engine(victim)
evicted_any = True

def _other_entries_serving(self, model_id: str) -> bool:
"""True when any loaded entry other than ``model_id`` is serving.

Used by the settle barrier in ``_unload_engine``: the barrier's
freed-memory check is a delta of the process-global
``mx.get_active_memory()`` gauge, which only measures THIS unload
while no other engine is allocating concurrently.
"""
# Snapshot the items: admin unload routes call _unload_engine without
# the pool lock, so discover_models() can mutate _entries mid-iteration.
for mid, e in list(self._entries.items()):
if mid == model_id or e.engine is None:
continue
if e.in_use > 0:
return True
try:
if e.engine.has_active_requests():
return True
except AttributeError:
pass
return False

async def _unload_engine(self, model_id: str) -> None:
"""
Immediately stop and unload an engine with memory settle barrier.
Expand Down Expand Up @@ -759,6 +781,7 @@ async def _unload_engine(self, model_id: str) -> None:
settle_tolerance = max(2 * 1024**3, int(entry.estimated_size * 0.05))
min_expected_freed = max(0, entry.estimated_size - settle_tolerance)
settled = False
settle_indeterminate = False
for _settle_round in range(10):
active_now = mx.get_active_memory()
actual_freed = pre_unload_active - active_now
Expand All @@ -770,6 +793,23 @@ async def _unload_engine(self, model_id: str) -> None:
f"(need>={format_size(min_expected_freed)}) - settled"
)
break
if self._other_entries_serving(model_id):
# actual_freed is a delta of the process-global MLX gauge,
# so while another engine allocates (prefill/KV growth) the
# amount freed by THIS unload is unmeasurable — the delta can
# even read negative. Burning settle rounds here serializes
# gc/synchronize/clear_cache against live decode for seconds,
# under memory pressure, with the enforcer holding the pool
# lock. Bail out instead: pre-load admission re-reads the
# live gauge, so nothing downstream trusts this sample.
settle_indeterminate = True
logger.info(
f"Settle for '{model_id}' indeterminate under concurrent "
f"activity (freed={format_size(actual_freed)}, "
f"need>={format_size(min_expected_freed)}); skipping "
f"settle wait"
)
break
logger.debug(
f"Settle round {_settle_round + 1} for '{model_id}': "
f"freed={format_size(actual_freed)} "
Expand All @@ -791,6 +831,16 @@ async def _unload_engine(self, model_id: str) -> None:
f"(expected>={format_size(min_expected_freed)}), "
f"active_memory: {format_size(active_now)} (settled)"
)
elif settle_indeterminate:
# Settle wait skipped (logged above). Emergency reclaim is
# deliberately skipped too: its gc + synchronize + clear_cache
# rounds would stall the live engines that made the measurement
# indeterminate in the first place. Recovery is not lost:
# _wake_process_memory_enforcer() below triggers an immediate
# enforcer re-poll, and pre-load admission re-reads the live gauge
# alongside the tracked accumulator (the #1623 max() in
# get_engine), so any unreleased memory stays visible to both.
pass
else:
# Barrier timed out - try emergency reclaim
logger.warning(
Expand Down
110 changes: 110 additions & 0 deletions tests/test_engine_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import asyncio
import json
import logging
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch

Expand Down Expand Up @@ -1829,6 +1830,115 @@ def mock_get_active():
assert pool._entries["model-a"].engine is None
assert pool._current_model_memory == 0

@pytest.mark.asyncio
async def test_settle_bails_out_under_concurrent_activity(
    self, pool_with_loaded_model, caplog
):
    """Regression test for 1774: settle must bail when a peer is serving.

    While another engine is serving, the global freed delta cannot be
    measured (it can even read negative as the other engine allocates).
    The barrier is expected to give up after a single sample rather than
    spend 10 settle rounds plus emergency reclaim — roughly 8s of
    gc/synchronize/clear_cache serialized against live decode while the
    pool lock is held.
    """
    pool = pool_with_loaded_model

    # Make the second entry look like it is actively serving requests.
    busy_engine = MagicMock()
    busy_engine.has_active_requests = MagicMock(return_value=True)
    pool._entries["model-b"].engine = busy_engine

    # The global gauge climbs by 1 GiB per read (concurrent prefill/KV
    # growth), so freed = pre_unload - active_now is negative each round.
    gauge_reads = 0

    def rising_gauge():
        nonlocal gauge_reads
        reading = (10 + gauge_reads) * 1024**3
        gauge_reads += 1
        return reading

    observed_sleeps: list[float] = []

    async def record_sleep(duration, *args, **kwargs):
        observed_sleeps.append(duration)

    with (
        patch("omlx.engine_pool.mx") as mock_mx,
        patch("omlx.engine_pool.get_mlx_executor", return_value=None),
        patch("asyncio.sleep", side_effect=record_sleep),
        caplog.at_level(logging.DEBUG, logger="omlx.engine_pool"),
    ):
        mock_mx.get_active_memory = rising_gauge
        mock_mx.synchronize = MagicMock()
        mock_mx.clear_cache = MagicMock()

        await pool._unload_engine("model-a")

    captured_log = caplog.text
    assert "indeterminate under concurrent activity" in captured_log
    # No settle-round sleeps, no timeout warning, no emergency reclaim.
    assert 0.5 not in observed_sleeps
    assert "Settle barrier timed out" not in captured_log
    assert "Emergency reclaim" not in captured_log
    # Only the initial pre-barrier release cycle touched the executor.
    assert mock_mx.synchronize.call_count == 1
    assert mock_mx.clear_cache.call_count == 1
    # The unload itself still finishes and is reflected in accounting.
    assert pool._entries["model-a"].engine is None
    assert pool._current_model_memory == 0

@pytest.mark.asyncio
async def test_settle_still_waits_when_pool_otherwise_idle(
    self, pool_with_loaded_model, caplog
):
    """Idle-pool settle behavior stays as before (#768 protection).

    When no other entry is serving, a barrier that is never satisfied
    still runs all of its settle rounds and then escalates to emergency
    reclaim.
    """
    pool = pool_with_loaded_model

    # Gauge climbs by 1 GiB per read, so the freed delta never reaches
    # the expected amount.
    gauge_reads = 0

    def rising_gauge():
        nonlocal gauge_reads
        reading = (10 + gauge_reads) * 1024**3
        gauge_reads += 1
        return reading

    with (
        patch("omlx.engine_pool.mx") as mock_mx,
        patch("omlx.engine_pool.get_mlx_executor", return_value=None),
        patch("asyncio.sleep", new_callable=AsyncMock),
        caplog.at_level(logging.DEBUG, logger="omlx.engine_pool"),
    ):
        mock_mx.get_active_memory = rising_gauge
        mock_mx.synchronize = MagicMock()
        mock_mx.clear_cache = MagicMock()

        await pool._unload_engine("model-a")

    captured_log = caplog.text
    assert "indeterminate under concurrent activity" not in captured_log
    assert "Settle barrier timed out" in captured_log
    # Full barrier preserved: 1 initial release cycle + 10 settle rounds
    # + 3 emergency-reclaim rounds on the executor.
    assert mock_mx.synchronize.call_count == 14
    assert mock_mx.clear_cache.call_count == 14

def test_other_entries_serving_in_use_lease_counts(
    self, pool_with_loaded_model
):
    """An in-use lease alone marks another entry as serving.

    A lease that was acquired but is not yet active must count as
    activity, matching how the eviction paths already treat it (#1667).
    """
    pool = pool_with_loaded_model
    peer = pool._entries["model-b"]

    # Baseline: the fixture's model-b is not reported as serving.
    assert pool._other_entries_serving("model-a") is False

    # An engine with no active requests and no lease is still idle.
    idle_engine = MagicMock()
    idle_engine.has_active_requests = MagicMock(return_value=False)
    peer.engine = idle_engine
    assert pool._other_entries_serving("model-a") is False

    # Taking a lease flips the result even without active requests.
    peer.in_use = 1
    assert pool._other_entries_serving("model-a") is True


class TestEnginePoolInUseLease:
"""Tests for the acquire-vs-use in-use lease (#1667).
Expand Down