diff --git a/tests/test_engine_core_idle_polling.py b/tests/test_engine_core_idle_polling.py new file mode 100644 index 000000000..522da1fd9 --- /dev/null +++ b/tests/test_engine_core_idle_polling.py @@ -0,0 +1,68 @@ +# SPDX-License-Identifier: Apache-2.0 +"""Regression tests for EngineCore idle polling behavior.""" + +import pytest + + +class _IdleScheduler: + def has_requests(self): + return False + + def _close_batch_generator(self): + pass + + +class _RequestCapturingScheduler: + def __init__(self): + self.requests = [] + + def add_request(self, request): + self.requests.append(request) + + +@pytest.mark.anyio +async def test_engine_loop_uses_idle_interval_when_scheduler_is_empty(monkeypatch): + """An empty scheduler should not keep polling at the active 1ms interval.""" + import vllm_mlx.engine_core as engine_core + from vllm_mlx.engine_core import EngineConfig, EngineCore + + engine = object.__new__(EngineCore) + engine.config = EngineConfig(step_interval=0.001, idle_step_interval=0.25) + engine.scheduler = _IdleScheduler() + engine._running = True + engine._request_event = None + engine._steps_executed = 0 + + sleeps = [] + + async def fake_sleep(delay): + sleeps.append(delay) + engine._running = False + + monkeypatch.setattr(engine_core.asyncio, "sleep", fake_sleep) + + await engine._engine_loop() + + assert sleeps == [0.25] + + +@pytest.mark.anyio +async def test_add_request_wakes_idle_engine_loop(): + """Adding work should wake the idle loop instead of waiting for timeout.""" + import asyncio + + from vllm_mlx.engine_core import EngineConfig, EngineCore + + engine = object.__new__(EngineCore) + engine.config = EngineConfig() + engine.scheduler = _RequestCapturingScheduler() + engine._output_collectors = {} + engine._stream_states = {} + engine._finished_events = {} + engine._request_event = asyncio.Event() + + request_id = await engine.add_request("hello", request_id="req-1") + + assert request_id == "req-1" + assert len(engine.scheduler.requests) == 1 + assert engine._request_event.is_set() diff --git a/vllm_mlx/engine_core.py b/vllm_mlx/engine_core.py index 1662e01c6..4b0ee654d 100644 --- a/vllm_mlx/engine_core.py +++ b/vllm_mlx/engine_core.py @@ -36,6 +36,35 @@ def _is_stream_thread_error(error: Exception) -> bool: return "no Stream(" in message or "no Stream(gpu" in message +def _clear_request_event(request_event: Optional[asyncio.Event]) -> None: + if request_event is not None: + request_event.clear() + + +def _set_request_event(request_event: Optional[asyncio.Event]) -> None: + if request_event is not None: + request_event.set() + + +async def _wait_for_idle_or_request( + request_event: Optional[asyncio.Event], timeout: float +) -> None: + if timeout <= 0: + await asyncio.sleep(0) + return + + if request_event is None: + await asyncio.sleep(timeout) + return + + try: + await asyncio.wait_for(request_event.wait(), timeout=timeout) + except asyncio.TimeoutError: + pass + finally: + request_event.clear() + + @dataclass class EngineConfig: """Configuration for the engine.""" @@ -43,6 +72,7 @@ class EngineConfig: model_name: str = "" scheduler_config: Optional[SchedulerConfig] = None step_interval: float = 0.001 # 1ms between steps + idle_step_interval: float = 0.1 # 100ms idle wait when scheduler is empty stream_interval: int = 1 # Tokens to batch before streaming (1=every token) gpu_memory_utilization: float = 0.90 # Fraction of device memory for allocation @@ -108,6 +138,7 @@ def __init__( # Engine state self._running = False self._task: Optional[asyncio.Task] = None + self._request_event: Optional[asyncio.Event] = None self._start_time: Optional[float] = None self._steps_executed = 0 @@ -119,6 +150,7 @@ async def start(self) -> None: return self._running = True + self._request_event = asyncio.Event() self._start_time = time.time() self._task = asyncio.create_task(self._engine_loop()) logger.info("Engine started") @@ -222,8 +254,8 @@ def _close_batch_generator_on_worker() -> None: _bind_worker_streams_once() self.scheduler._close_batch_generator() - step_interval = self.config.step_interval stream_interval = self.config.stream_interval + idle_step_interval = self.config.idle_step_interval use_simple_streaming = stream_interval == 1 # Emergency memory pressure threshold — dynamic based on gpu_memory_utilization @@ -241,6 +273,7 @@ def _close_batch_generator_on_worker() -> None: while self._running: try: if self.scheduler.has_requests(): + _clear_request_event(getattr(self, "_request_event", None)) if use_worker_thread: try: output = await loop.run_in_executor( @@ -314,8 +347,11 @@ def _close_batch_generator_on_worker() -> None: # making the server unresponsive to all HTTP requests. await asyncio.sleep(0) else: - # No work, yield control - await asyncio.sleep(step_interval) + # No work; wait longer than the active loop but wake + # immediately when add_request signals new work. + await _wait_for_idle_or_request( + getattr(self, "_request_event", None), idle_step_interval + ) except asyncio.CancelledError: raise @@ -380,6 +416,7 @@ async def add_request( # Add to scheduler self.scheduler.add_request(request) + _set_request_event(getattr(self, "_request_event", None)) return request_id