Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions tests/test_engine_core_idle_polling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# SPDX-License-Identifier: Apache-2.0
"""Regression tests for EngineCore idle polling behavior."""

import pytest


class _IdleScheduler:
    """Scheduler stub that never has pending work.

    Provides only the two methods the idle-polling test needs the engine
    loop to find on its scheduler.
    """

    def has_requests(self) -> bool:
        """Return ``False`` unconditionally: there is never queued work."""
        return False

    def _close_batch_generator(self) -> None:
        """Do nothing; placeholder for the scheduler's cleanup hook."""


class _RequestCapturingScheduler:
    """Scheduler stub that records every request handed to ``add_request``."""

    def __init__(self) -> None:
        # Requests in the order they were added, for later inspection by tests.
        self.requests = []

    def add_request(self, request) -> None:
        """Append *request* to ``self.requests`` without processing it."""
        self.requests.append(request)


@pytest.mark.anyio
async def test_engine_loop_uses_idle_interval_when_scheduler_is_empty(monkeypatch):
    """An empty scheduler should not keep polling at the active 1ms interval."""
    import vllm_mlx.engine_core as engine_core
    from vllm_mlx.engine_core import EngineConfig, EngineCore

    # Build the engine without running __init__; only the attributes assigned
    # below exist on the instance.
    engine = object.__new__(EngineCore)
    engine.config = EngineConfig(step_interval=0.001, idle_step_interval=0.25)
    engine.scheduler = _IdleScheduler()
    engine._running = True
    # No wake-up event: the idle wait is expected to go through asyncio.sleep,
    # which is what this test intercepts.
    engine._request_event = None
    engine._steps_executed = 0

    sleeps = []

    async def fake_sleep(delay):
        # Record the requested delay and stop the loop after its first wait so
        # the test terminates without actually sleeping.
        sleeps.append(delay)
        engine._running = False

    # Replace ``sleep`` on the asyncio object that engine_core references.
    monkeypatch.setattr(engine_core.asyncio, "sleep", fake_sleep)

    await engine._engine_loop()

    # Exactly one wait, using the idle interval rather than step_interval.
    assert sleeps == [0.25]


@pytest.mark.anyio
async def test_add_request_wakes_idle_engine_loop():
    """Adding work should wake the idle loop instead of waiting for timeout."""
    import asyncio

    from vllm_mlx.engine_core import EngineConfig, EngineCore

    # Build the engine without running __init__; only the attributes assigned
    # below exist on the instance.
    engine = object.__new__(EngineCore)
    engine.config = EngineConfig()
    engine.scheduler = _RequestCapturingScheduler()
    engine._output_collectors = {}
    engine._stream_states = {}
    engine._finished_events = {}
    # Starts unset; add_request is expected to set it.
    engine._request_event = asyncio.Event()

    request_id = await engine.add_request("hello", request_id="req-1")

    # The caller-supplied id is returned, the request reached the scheduler,
    # and the wake-up event was signalled.
    assert request_id == "req-1"
    assert len(engine.scheduler.requests) == 1
    assert engine._request_event.is_set()
43 changes: 40 additions & 3 deletions vllm_mlx/engine_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,43 @@ def _is_stream_thread_error(error: Exception) -> bool:
return "no Stream(" in message or "no Stream(gpu" in message


def _clear_request_event(request_event: Optional[asyncio.Event]) -> None:
    """Reset *request_event* to the unset state; do nothing if it is ``None``."""
    if request_event is not None:
        request_event.clear()


def _set_request_event(request_event: Optional[asyncio.Event]) -> None:
    """Set *request_event* to wake any waiter; do nothing if it is ``None``."""
    if request_event is not None:
        request_event.set()


async def _wait_for_idle_or_request(
    request_event: Optional[asyncio.Event], timeout: float
) -> None:
    """Wait until *request_event* is set or *timeout* seconds pass.

    Args:
        request_event: Event signalling new work, or ``None`` when no event
            is available.
        timeout: Maximum time to wait, in seconds.

    Behavior:
        * ``timeout <= 0``: yield to the event loop once and return. The
          event is not inspected or cleared on this path.
        * ``request_event is None``: sleep for the full *timeout*.
        * Otherwise: wait on the event for at most *timeout* seconds. A
          timeout is swallowed, and the event is cleared on every exit from
          the wait, including when an exception propagates.
    """
    if timeout <= 0:
        await asyncio.sleep(0)
        return

    if request_event is None:
        await asyncio.sleep(timeout)
        return

    try:
        await asyncio.wait_for(request_event.wait(), timeout=timeout)
    except asyncio.TimeoutError:
        # Timing out is the normal "still idle" outcome, not an error.
        pass
    finally:
        # Reset so the next wait blocks until a fresh signal arrives.
        request_event.clear()


@dataclass
class EngineConfig:
"""Configuration for the engine."""

model_name: str = ""
scheduler_config: Optional[SchedulerConfig] = None
step_interval: float = 0.001 # 1ms between steps
idle_step_interval: float = 0.1 # 100ms idle wait when scheduler is empty
stream_interval: int = 1 # Tokens to batch before streaming (1=every token)
gpu_memory_utilization: float = 0.90 # Fraction of device memory for allocation

Expand Down Expand Up @@ -108,6 +138,7 @@ def __init__(
# Engine state
self._running = False
self._task: Optional[asyncio.Task] = None
self._request_event: Optional[asyncio.Event] = None
self._start_time: Optional[float] = None
self._steps_executed = 0

Expand All @@ -119,6 +150,7 @@ async def start(self) -> None:
return

self._running = True
self._request_event = asyncio.Event()
self._start_time = time.time()
self._task = asyncio.create_task(self._engine_loop())
logger.info("Engine started")
Expand Down Expand Up @@ -222,8 +254,8 @@ def _close_batch_generator_on_worker() -> None:
_bind_worker_streams_once()
self.scheduler._close_batch_generator()

step_interval = self.config.step_interval
stream_interval = self.config.stream_interval
idle_step_interval = self.config.idle_step_interval
use_simple_streaming = stream_interval == 1

# Emergency memory pressure threshold — dynamic based on gpu_memory_utilization
Expand All @@ -241,6 +273,7 @@ def _close_batch_generator_on_worker() -> None:
while self._running:
try:
if self.scheduler.has_requests():
_clear_request_event(getattr(self, "_request_event", None))
if use_worker_thread:
try:
output = await loop.run_in_executor(
Expand Down Expand Up @@ -314,8 +347,11 @@ def _close_batch_generator_on_worker() -> None:
# making the server unresponsive to all HTTP requests.
await asyncio.sleep(0)
else:
# No work, yield control
await asyncio.sleep(step_interval)
# No work; wait longer than the active loop but wake
# immediately when add_request signals new work.
await _wait_for_idle_or_request(
getattr(self, "_request_event", None), idle_step_interval
)

except asyncio.CancelledError:
raise
Expand Down Expand Up @@ -380,6 +416,7 @@ async def add_request(

# Add to scheduler
self.scheduler.add_request(request)
_set_request_event(getattr(self, "_request_event", None))

return request_id

Expand Down
Loading