fix(engine): per-engine threads to eliminate cross-engine stream contamination#1304
Open
ivaniguarans wants to merge 1 commit into
Open
fix(engine): per-engine threads to eliminate cross-engine stream contamination#1304ivaniguarans wants to merge 1 commit into
ivaniguarans wants to merge 1 commit into
Conversation
…amination Replace the shared _global_mlx_executor with per-EngineCore ThreadPoolExecutor + mx.Stream, and fix the MTP patch reading the module-level generation_stream instead of the per-engine stream.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
When multiple LM engines run concurrently, they share a single
_global_mlx_executorwithmax_workers=1, serializing allscheduler.step()calls through one thread. More critically, the MTP patch reads the module-levelgeneration_streamviasys.modulesfor its forward passes, bypassing whatever stream theBatchGeneratorwas instantiated with. If two MTP-capable engines run simultaneously, their MTP forwards land on the same module-level stream regardless of which engine dispatched them — a stream-ordering violation that upstream'sBatchGenerator(stream=...)parameter (mlx-lm 0.31.3) was designed to prevent.This PR gives each
EngineCoreits ownThreadPoolExecutorandmx.Stream, passes the stream throughSchedulerintoBatchGenerator, and removes the_get_generation_stream()indirection from the MTP patch so MTP operations inherit the correct per-engine stream from the enclosingBatchGeneratorcontext — the same pattern upstream'sGenerationBatch._step()already uses.The global executor is retained for non-LM engines (TTS, STT, embedding, reranker) that still rely on
get_mlx_executor()and_init_mlx_thread.Changes
engine_core.py:EngineCore.__init__creates a per-engineThreadPoolExecutor+mx.new_thread_local_stream()and passes the stream toScheduler.close()shuts down the per-engine executor after scheduler cleanup. Added_ensure_wired_limit()so the process-globalmx.set_wired_limit()runs once rather than racing across concurrentBatchGeneratorinits.scheduler.py:Scheduler.__init__accepts an optionalstreamparameter (falls back to the module-levelgeneration_streamwhen not provided). All 37 internal references togeneration_stream— sync barriers, cache clears,mx.stream()context managers,BatchGeneratorcreation — now useself._stream.batch_generator.py(MTP patch): Removed_get_generation_stream()and the 4 explicitwith mx.stream(...)wrappers that pushed the module-level stream. MTP forwards now inherit the per-engine stream from the enclosingBatchGeneratorcontext, matchingGenerationBatch._step()'s existing pattern.Concurrent throughput
Two models generating simultaneously vs sequentially:
Sub-2x is expected — Metal command buffers still serialize on one GPU. The win is CPU-side overlap (prefill + decode can be submitted concurrently) and eliminating head-of-line blocking where one engine's long prefill stalls another's token emission.
Test plan
tests/test_per_engine_threads.py(10 tests): verifiesSchedulerstores and uses explicit streams, regex-scans theSchedulerclass body for baregeneration_streamreferences, confirms eachEngineCoregets a distinct executor/stream, validates executor shutdown onclose(), and asserts the MTP patch no longer contains_get_generation_streamor anygeneration_streamreference.tests/test_engine_core.py: existing executor tests now assertis not(distinct executors) and concurrent execution (both executors active simultaneously).Related to #1248