Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 49 additions & 3 deletions omlx/engine_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,38 @@

_global_mlx_executor: concurrent.futures.ThreadPoolExecutor | None = None

# Module-global keep-alive registries for per-engine MLX resources (#1248).
#
# PROVEN ROOT CAUSE (native crash frame, 2026-05-30): MLX's @mx.compile graph
# cache — mlx::core::detail::CompilerCache — is a C++ `thread_local`. Every
# thread that runs compiled functions gets its own CompilerCache holding the
# compiled graphs, which reference Python objects (captured constants/arrays).
#
# Before #1248, all engines shared ONE process-global executor whose worker
# thread NEVER exits mid-process, so its thread_local CompilerCache is never
# destructed while the server runs. #1248 gave each EngineCore its own
# ThreadPoolExecutor, and EngineCore.close() called executor.shutdown(wait=True)
# — which EXITS that worker thread. On thread exit, dyld runs the thread_local
# destructor ~CompilerCache(), which frees the cached graphs' Python tuples
# from a thread-exit handler (no GIL held, and after deep_reset/gc already
# released those objects) -> use-after-free -> asynchronous native SIGSEGV.
# Crash frame: _pthread_exit -> dyld finalizeList -> ~CompilerCache ->
# tupledealloc (KERN_INVALID_ADDRESS). It is DeepSeek-V4-only because V4 is the
# only model with module-scope @mx.compile graphs (deepseek_v4_model.py /
# hyper_connection.py) that populate the per-engine thread's cache. The crash
# is unaffected by GPU synchronization ("sync-immune"): it happens in a C++
# thread-exit destructor and has nothing to do with GPU streams — which is why
# every prior synchronize/flush/stream fix only relocated it.
#
# FIX: never let a per-engine worker thread exit mid-process. Hold a strong
# reference to every per-engine executor (and its stream) here so the thread —
# and its thread_local CompilerCache — lives for the process lifetime,
# restoring the good-baseline invariant. No synchronization (sync-immune).
# Growth is O(engines ever created): one idle thread + two small handles per
# model load, reclaimed at process exit — negligible for a model server.
# Do NOT shut these down and do NOT add synchronization.
# Strong references to every per-engine MLX stream; appended to in
# EngineCore.__init__ and intentionally never removed.
_immortal_engine_streams: list = []
# Strong references to every per-engine executor, so its worker thread is not
# torn down when an engine is closed; appended to in EngineCore.__init__ and
# intentionally never removed.
_immortal_engine_executors: list[concurrent.futures.ThreadPoolExecutor] = []


def _init_mlx_thread() -> None:
"""Replace generation_stream with a thread-local stream on the executor thread.
Expand Down Expand Up @@ -141,6 +173,14 @@ def __init__(
max_workers=1,
thread_name_prefix=f"mlx-engine-{self._engine_id[:8]}",
)
# Pin both the stream and the executor in the module-global immortal
# registries so this worker thread — and its thread_local MLX CompilerCache —
# is NEVER destructed mid-process at unload. Exiting the thread runs
# ~CompilerCache(), which frees @mx.compile graphs' Python objects unsafely
# at thread exit -> DeepSeek-V4 unload SIGSEGV. See the registry note above
# (#1248). close() drops EngineCore's refs, but these registries keep the
# thread + stream alive for the process lifetime.
_immortal_engine_streams.append(self._mlx_stream)
_immortal_engine_executors.append(self._mlx_executor)

# Create scheduler with per-engine stream
scheduler_config = self.config.scheduler_config or SchedulerConfig()
Expand Down Expand Up @@ -717,9 +757,15 @@ def close(self) -> None:
except RuntimeError:
pass

if self._mlx_executor is not None:
self._mlx_executor.shutdown(wait=True)
self._mlx_executor = None
# Do NOT shut down the per-engine executor: shutdown(wait=True) exits
# the worker thread, which runs MLX's thread_local CompilerCache
# destructor and frees @mx.compile graphs' Python objects from a dyld
# thread-exit handler -> use-after-free SIGSEGV for DeepSeek V4 (proven
# via native crash frame). The executor is already idle here (shutdown +
# deep_reset were dispatched and joined above). Just drop our reference;
# _immortal_engine_executors keeps the thread alive for the process
# lifetime, matching the good-baseline single-global-thread behavior.
self._mlx_executor = None

# Clear output collectors
for collector in self._output_collectors.values():
Expand Down
131 changes: 129 additions & 2 deletions tests/test_per_engine_threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,15 @@ def test_engine_passes_stream_to_scheduler(self):

engine.close()

def test_close_shuts_down_executor(self):
def test_close_pins_executor_immortal_not_shut_down(self):
"""close() must NOT shut down the per-engine executor: exiting its
worker thread runs MLX's thread_local CompilerCache destructor, which
frees @mx.compile graphs' Python objects unsafely at thread-exit ->
DeepSeek-V4 unload SIGSEGV (#1248). The executor is pinned immortal so
its worker thread stays alive for the process lifetime; close() only
drops EngineCore's own reference."""
import omlx.engine_core as ec

mock_model = MagicMock()
mock_model.model_type = "test"
mock_tokenizer = MagicMock()
Expand All @@ -155,10 +163,129 @@ def test_close_shuts_down_executor(self):

engine = EngineCore(mock_model, mock_tokenizer)
executor = engine._mlx_executor
assert executor in ec._immortal_engine_executors
engine.close()

# EngineCore drops its reference, but the executor (and its worker
# thread) must stay alive — NOT shut down.
assert engine._mlx_executor is None
assert executor._shutdown
assert executor in ec._immortal_engine_executors
assert not executor._shutdown


class TestImmortalStreamRegistry:
    """Per-engine streams and executors must be pinned for the process lifetime.

    EngineCore registers its per-engine stream and executor in the
    module-global ``_immortal_engine_streams`` / ``_immortal_engine_executors``
    lists. Holding the executor keeps its worker thread alive, so MLX's
    thread_local CompilerCache is never destructed at unload (#1248
    regression / DeepSeek-V4 unload SIGSEGV). These tests check that both
    registries are populated on construction and left untouched by close().
    """

    @staticmethod
    def _mock_model_and_tokenizer():
        """Return a (model, tokenizer) pair of mocks used to build an EngineCore."""
        mock_model = MagicMock()
        mock_model.model_type = "test"
        mock_tokenizer = MagicMock()
        mock_tokenizer.eos_token_id = 0
        return mock_model, mock_tokenizer

    def test_registry_exists(self):
        import omlx.engine_core as ec

        assert hasattr(ec, "_immortal_engine_streams")
        assert isinstance(ec._immortal_engine_streams, list)
        assert hasattr(ec, "_immortal_engine_executors")
        assert isinstance(ec._immortal_engine_executors, list)

    def test_stream_registered_on_init(self):
        """Constructing an engine adds exactly one entry to each registry."""
        import omlx.engine_core as ec

        mock_model, mock_tokenizer = self._mock_model_and_tokenizer()

        streams_before = len(ec._immortal_engine_streams)
        executors_before = len(ec._immortal_engine_executors)
        with patch("omlx.engine_core.get_registry") as mock_registry:
            mock_registry.return_value.acquire.return_value = True

            engine = EngineCore(mock_model, mock_tokenizer)
            try:
                # The engine's stream and executor must be held by the
                # registries immediately after construction.
                assert engine._mlx_stream in ec._immortal_engine_streams
                assert len(ec._immortal_engine_streams) == streams_before + 1
                assert engine._mlx_executor in ec._immortal_engine_executors
                assert (
                    len(ec._immortal_engine_executors) == executors_before + 1
                )
            finally:
                # Always close, even when an assertion above fails.
                engine.close()

    def test_stream_retained_after_close(self):
        """close() must NOT drop the registries' strong references.

        The stream and the executor stay registered for the process lifetime
        even after the engine is closed and dereferenced.
        """
        import gc
        import omlx.engine_core as ec

        mock_model, mock_tokenizer = self._mock_model_and_tokenizer()

        with patch("omlx.engine_core.get_registry") as mock_registry:
            mock_registry.return_value.acquire.return_value = True

            engine = EngineCore(mock_model, mock_tokenizer)
            stream = engine._mlx_stream
            executor = engine._mlx_executor
            engine.close()

            # Drop the engine itself and force collection.
            del engine
            gc.collect()

            # Registries must still hold both objects.
            assert stream in ec._immortal_engine_streams
            assert executor in ec._immortal_engine_executors

    def test_registry_growth_bounded_by_engine_count(self):
        """Growth is O(engines created), one entry per engine, regardless of
        whether close() was called."""
        import omlx.engine_core as ec

        with patch("omlx.engine_core.get_registry") as mock_registry:
            mock_registry.return_value.acquire.return_value = True

            streams_before = len(ec._immortal_engine_streams)
            executors_before = len(ec._immortal_engine_executors)
            engines = []
            try:
                for _ in range(3):
                    model, tokenizer = self._mock_model_and_tokenizer()
                    engines.append(EngineCore(model, tokenizer))

                assert len(ec._immortal_engine_streams) == streams_before + 3
                assert (
                    len(ec._immortal_engine_executors) == executors_before + 3
                )
            finally:
                # Close whatever was created, even if an assertion failed.
                for e in engines:
                    e.close()

            # close() does not remove entries; counts are unchanged.
            assert len(ec._immortal_engine_streams) == streams_before + 3
            assert len(ec._immortal_engine_executors) == executors_before + 3

    def test_close_does_not_shut_down_global_executor(self):
        """Guardrail: an engine close() must NEVER shut down the process-global
        executor (get_mlx_executor). (The per-engine executor is also no longer
        shut down — it is pinned immortal — but this test specifically protects
        the shared global executor.)"""
        import omlx.engine_core as ec

        mock_model, mock_tokenizer = self._mock_model_and_tokenizer()

        # Materialize the global executor and confirm it is distinct from the
        # per-engine one and survives close().
        global_exec = ec.get_mlx_executor()

        with patch("omlx.engine_core.get_registry") as mock_registry:
            mock_registry.return_value.acquire.return_value = True

            engine = EngineCore(mock_model, mock_tokenizer)
            try:
                assert engine._mlx_executor is not global_exec
            finally:
                engine.close()

        # Same singleton is still returned and has not been shut down.
        assert ec.get_mlx_executor() is global_exec
        assert not global_exec._shutdown


class TestConcurrentStreamIsolation:
Expand Down