diff --git a/omlx/engine_core.py b/omlx/engine_core.py index 2ed4c586b..55cba30e4 100644 --- a/omlx/engine_core.py +++ b/omlx/engine_core.py @@ -31,6 +31,38 @@ _global_mlx_executor: concurrent.futures.ThreadPoolExecutor | None = None +# Module-global keep-alive registries for per-engine MLX resources (#1248). +# +# PROVEN ROOT CAUSE (native crash frame, 2026-05-30): MLX's @mx.compile graph +# cache — mlx::core::detail::CompilerCache — is a C++ `thread_local`. Every +# thread that runs compiled functions gets its own CompilerCache holding the +# compiled graphs, which reference Python objects (captured constants/arrays). +# +# Before #1248, all engines shared ONE process-global executor whose worker +# thread NEVER exits mid-process, so its thread_local CompilerCache is never +# destructed while the server runs. #1248 gave each EngineCore its own +# ThreadPoolExecutor, and EngineCore.close() called executor.shutdown(wait=True) +# — which EXITS that worker thread. On thread exit, dyld runs the thread_local +# destructor ~CompilerCache(), which frees the cached graphs' Python tuples +# from a thread-exit handler (no GIL held, and after deep_reset/gc already +# released those objects) -> use-after-free -> asynchronous native SIGSEGV. +# Crash frame: _pthread_exit -> dyld finalizeList -> ~CompilerCache -> +# tupledealloc (KERN_INVALID_ADDRESS). It is DeepSeek-V4-only because V4 is the +# only model with module-scope @mx.compile graphs (deepseek_v4_model.py / +# hyper_connection.py) that populate the per-engine thread's cache. It is +# sync-immune (a C++ thread-exit destructor, nothing to do with GPU streams) — +# which is why every prior synchronize/flush/stream fix only relocated it. +# +# FIX: never let a per-engine worker thread exit mid-process. Hold a strong +# reference to every per-engine executor (and its stream) here so the thread — +# and its thread_local CompilerCache — lives for the process lifetime, +# restoring the good-baseline invariant. No synchronization (sync-immune). +# Growth is O(engines ever created): one idle thread + two small handles per +# model load, reclaimed at process exit — negligible for a model server. +# Do NOT shut these down and do NOT add synchronization. +_immortal_engine_streams: list = [] +_immortal_engine_executors: list = [] + def _init_mlx_thread() -> None: """Replace generation_stream with a thread-local stream on the executor thread. @@ -141,6 +173,14 @@ def __init__( max_workers=1, thread_name_prefix=f"mlx-engine-{self._engine_id[:8]}", ) + # Pin both immortal so this worker thread — and its thread_local MLX + # CompilerCache — is NEVER destructed mid-process at unload. Exiting the + # thread runs ~CompilerCache() which frees @mx.compile graphs' Python + # objects unsafely at thread-exit -> DeepSeek-V4 unload SIGSEGV. See the + # registry note above (#1248). close() drops EngineCore's refs but these + # registries keep the thread + stream alive for the process lifetime. + _immortal_engine_streams.append(self._mlx_stream) + _immortal_engine_executors.append(self._mlx_executor) # Create scheduler with per-engine stream scheduler_config = self.config.scheduler_config or SchedulerConfig() @@ -717,9 +757,15 @@ def close(self) -> None: except RuntimeError: pass - if self._mlx_executor is not None: - self._mlx_executor.shutdown(wait=True) - self._mlx_executor = None + # Do NOT shut down the per-engine executor: shutdown(wait=True) exits + # the worker thread, which runs MLX's thread_local CompilerCache + # destructor and frees @mx.compile graphs' Python objects from a dyld + # thread-exit handler -> use-after-free SIGSEGV for DeepSeek V4 (proven + # via native crash frame). The executor is already idle here (shutdown + + # deep_reset were dispatched and joined above). Just drop our reference; + # _immortal_engine_executors keeps the thread alive for the process + # lifetime, matching the good-baseline single-global-thread behavior. + self._mlx_executor = None # Clear output collectors for collector in self._output_collectors.values(): diff --git a/tests/test_per_engine_threads.py b/tests/test_per_engine_threads.py index 284e426bd..ec4b741a2 100644 --- a/tests/test_per_engine_threads.py +++ b/tests/test_per_engine_threads.py @@ -144,7 +144,15 @@ def test_engine_passes_stream_to_scheduler(self): engine.close() - def test_close_shuts_down_executor(self): + def test_close_pins_executor_immortal_not_shut_down(self): + """close() must NOT shut down the per-engine executor: exiting its + worker thread runs MLX's thread_local CompilerCache destructor, which + frees @mx.compile graphs' Python objects unsafely at thread-exit -> + DeepSeek-V4 unload SIGSEGV (#1248). The executor is pinned immortal so + its worker thread stays alive for the process lifetime; close() only + drops EngineCore's own reference.""" + import omlx.engine_core as ec + mock_model = MagicMock() mock_model.model_type = "test" mock_tokenizer = MagicMock() @@ -155,10 +163,129 @@ def test_close_shuts_down_executor(self): engine = EngineCore(mock_model, mock_tokenizer) executor = engine._mlx_executor + assert executor in ec._immortal_engine_executors engine.close() + # EngineCore drops its reference, but the executor (and its worker + # thread) must stay alive — NOT shut down. assert engine._mlx_executor is None - assert executor._shutdown + assert executor in ec._immortal_engine_executors + assert not executor._shutdown + + +class TestImmortalStreamRegistry: + """Per-engine streams must be pinned immortal so the underlying Metal + command queue is never recycled at unload (#1248 regression / DeepSeek-V4 + unload SIGSEGV). The fix restores the good-baseline immortal-queue + invariant by holding a process-lifetime strong reference to every + per-engine stream in a module-global registry.""" + + def test_registry_exists(self): + import omlx.engine_core as ec + + assert hasattr(ec, "_immortal_engine_streams") + assert isinstance(ec._immortal_engine_streams, list) + assert hasattr(ec, "_immortal_engine_executors") + assert isinstance(ec._immortal_engine_executors, list) + + def test_stream_registered_on_init(self): + import omlx.engine_core as ec + + mock_model = MagicMock() + mock_model.model_type = "test" + mock_tokenizer = MagicMock() + mock_tokenizer.eos_token_id = 0 + + before = len(ec._immortal_engine_streams) + with patch("omlx.engine_core.get_registry") as mock_registry: + mock_registry.return_value.acquire.return_value = True + + engine = EngineCore(mock_model, mock_tokenizer) + # The engine's stream must be held by the registry immediately. + assert engine._mlx_stream in ec._immortal_engine_streams + assert len(ec._immortal_engine_streams) == before + 1 + + engine.close() + + def test_stream_retained_after_close(self): + """close() must NOT drop the registry's strong reference: the stream + (and its Metal queue) stays alive for the process lifetime even after + the engine is closed and dereferenced.""" + import gc + import omlx.engine_core as ec + + mock_model = MagicMock() + mock_model.model_type = "test" + mock_tokenizer = MagicMock() + mock_tokenizer.eos_token_id = 0 + + with patch("omlx.engine_core.get_registry") as mock_registry: + mock_registry.return_value.acquire.return_value = True + + engine = EngineCore(mock_model, mock_tokenizer) + stream = engine._mlx_stream + engine.close() + + # Drop the only other strong references and force collection. + del engine + gc.collect() + + # Registry must still hold the stream -> Metal queue immortal. + assert stream in ec._immortal_engine_streams + + def test_registry_growth_bounded_by_engine_count(self): + """Growth is O(engines created), one entry per engine, regardless of + whether close() was called.""" + import omlx.engine_core as ec + + mock_tokenizer = MagicMock() + mock_tokenizer.eos_token_id = 0 + + with patch("omlx.engine_core.get_registry") as mock_registry: + mock_registry.return_value.acquire.return_value = True + + before = len(ec._immortal_engine_streams) + engines = [] + for _ in range(3): + m = MagicMock() + m.model_type = "test" + engines.append(EngineCore(m, mock_tokenizer)) + + assert len(ec._immortal_engine_streams) == before + 3 + + for e in engines: + e.close() + + # close() does not remove entries; count is unchanged. + assert len(ec._immortal_engine_streams) == before + 3 + + def test_close_does_not_shut_down_global_executor(self): + """Guardrail: an engine close() must NEVER shut down the process-global + executor (get_mlx_executor). (The per-engine executor is also no longer + shut down — it is pinned immortal — but this test specifically protects + the shared global executor.)""" + import omlx.engine_core as ec + + mock_model = MagicMock() + mock_model.model_type = "test" + mock_tokenizer = MagicMock() + mock_tokenizer.eos_token_id = 0 + + # Materialize the global executor and confirm it is distinct from the + # per-engine one and survives close(). + global_exec = ec.get_mlx_executor() + + with patch("omlx.engine_core.get_registry") as mock_registry: + mock_registry.return_value.acquire.return_value = True + + engine = EngineCore(mock_model, mock_tokenizer) + assert engine._mlx_executor is not global_exec + engine.close() + + assert not global_exec._shutdown + # Same singleton is still returned and usable. + assert ec.get_mlx_executor() is global_exec + assert not global_exec._shutdown class TestConcurrentStreamIsolation: