Skip to content

Conversation

@krishung5
Copy link
Contributor

@krishung5 krishung5 commented Nov 21, 2025

Overview:

KV event consolidator for vLLM: #3725

This PR extends the KV Event Consolidator to support TensorRT-LLM, enabling deduplication of KV cache events from TensorRT-LLM and KVBM before publishing to the router.

Key Differences from vLLM Implementation:

  • Output Transport: TensorRT-LLM uses NATS for consolidated events (vLLM uses ZMQ)
  • Block Hash Format: TensorRT-LLM uses signed i64 hashes (vLLM uses unsigned u64)
  • Event Publisher: TensorRT-LLM uses Python ZMQ publisher (vLLM uses Rust)

closes DIS-944

Details:

Configuration Flow: Port Allocation

Shows how the consolidator ZMQ port is configured for TensorRT-LLM and passed to both KVBM and Router components

graph LR
    A[Set<br/>DYN_KVBM_TRTLLM_ZMQ_PORT=20081<br/>in environment]
    
    subgraph Main["Main Process"]
        B[main.py: should_enable_consolidator<br/>checks DYN_KVBM_KV_EVENTS_ENABLE_CONSOLIDATOR]
        C[main.py: get_consolidator_endpoints<br/>reads DYN_KVBM_TRTLLM_ZMQ_PORT from env]
        D[main.py: passes endpoint<br/>to get_publisher]
    end
    
    subgraph MPI["MPI Workers"]
        E[MPI Worker inherits<br/>DYN_KVBM_TRTLLM_ZMQ_PORT from env]
        F[Publisher reads env var<br/>creates ZmqKvEventPublisher]
    end
    
    subgraph KVBM["KVBM Components"]
        G[KvConnectorLeader<br/>reads DYN_KVBM_TRTLLM_ZMQ_PORT from env]
        H[KVBM Resources Init<br/>creates DynamoEventManager]
        I[DynamoEventManager<br/>creates & starts consolidator]
        J[KvEventConsolidator<br/>ZMQ Sub + NATS Pub]
        
        G --> H
        H --> I
        I --> J
    end
    
    K[NATS Stream<br/>namespace.dynamo.component.tensorrt_llm.kv_events]
    L[KV Router Indexer<br/>subscribes to NATS]
    
    A --> B
    A --> E
    A --> G
    B --> C
    C --> D
    D --> F
    F --> J
    J --> K
    K --> L
Loading

Initialization:

  1. Port Configuration (before TensorRT-LLM starts):

    • Sets DYN_KVBM_TRTLLM_ZMQ_PORT=20081 in environment (e.g., in launch script)
    • This environment variable is inherited by both main process and MPI workers
  2. Main Process (during TensorRT-LLM setup):

    • main.py checks should_enable_consolidator() (reads DYN_KVBM_KV_EVENTS_ENABLE_CONSOLIDATOR, which is true by default)
    • If enabled, calls get_consolidator_endpoints() which reads DYN_KVBM_TRTLLM_ZMQ_PORT from environment
    • Passes the bind endpoint (tcp://*:20081) to get_publisher() for the main process's ZmqKvEventPublisher
  3. MPI Workers:

    • Inherit DYN_KVBM_TRTLLM_ZMQ_PORT from environment
    • Each MPI worker's Publisher reads the environment variable directly to create its own ZmqKvEventPublisher
    • All workers bind to the same port (tcp://*:20081)
  4. Consolidator Setup (during KVBM initialization):

    • Consolidator subscribes to TensorRT-LLM's ZMQ port (tcp://127.0.0.1:20081)
    • Consolidator publishes consolidated events to NATS (not ZMQ like vLLM)
    • Router subscribes to NATS stream: namespace.dynamo.component.tensorrt_llm.kv_events
  5. KVBM Integration:

    • DynamoEventManager holds a handle to the same consolidator instance
    • Can send events directly to consolidator without ZMQ

Event Flow

Shows how KV events from TensorRT-LLM (G1) and KVBM (G2/G3) are deduplicated and sent to the Router via NATS

graph LR
    subgraph Sources["Event Sources"]
        S1[TensorRT-LLM G1<br/>GPU Cache<br/>Python ZMQ Publisher]
        S2[KVBM G2 CPU<br/>KVBM G3 Disk]
    end
    
    subgraph Consolidator["KVBM <br/>KV Event Consolidator"]
        C1[ZMQ<br/>Subscriber<br/>receives i64 hashes]
        C2[DynamoEventManager<br/>Direct API]
        C1 --> C3
        C2 --> C3
        C3[CacheStatusTracker<br/>Deduplication<br/>converts i64->u64]
        C3 --> C4[NATS<br/>Publisher<br/>RouterEvent format]
    end
    
    subgraph Router["KV Router"]
        R1[NATS<br/>Subscriber]
        R1 --> R2[Radix Tree<br/>Indexer]
    end
    
    S1 -->|KV Events<br/>via ZMQ<br/>msgpack format| C1
    S2 -->|Rust API<br/>calls| C2
    C4 -->|Consolidated Events<br/>RouterEvent format| R1
Loading

When TensorRT-LLM stores a block:

  1. TensorRT-LLM's Python ZmqKvEventPublisher emits BlockStored event via ZMQ (msgpack format)
  2. Consolidator's ZMQ subscriber receives event:
    • Block Hash Handling: TensorRT-LLM sends signed i64 hashes, consolidator converts to u64 using two's complement
    • Deserializes event batch: [timestamp, [events], data_parallel_rank]
  3. Consolidator checks tracker:
    • Computes SequenceHash from token content (same algorithm as vLLM)
    • If new block: stores in tracker, publishes RouterEvent to NATS
    • If already tracked: adds TensorRT-LLM as source, skips publishing

When KVBM stores the same block:

  1. DynamoEventManager sends event directly to consolidator (Rust API)
  2. Consolidator receives event and checks tracker:
    • Computes same SequenceHash from token content
    • Block already tracked (from TensorRT-LLM): adds KVBM as source, skips publishing

Result: Router receives only one store event per unique block via NATS, avoiding duplicate indexing

When TensorRT-LLM removes a block:

  1. TensorRT-LLM's Python publisher emits BlockRemoved event via ZMQ
  2. Consolidator's ZMQ subscriber receives event:
    • Converts i64 block hash to u64 for internal tracking
  3. Consolidator looks up block in tracker using external hash
  4. Removes TensorRT-LLM from block's sources
  5. Checks remaining sources:
    • If KVBM still has the block: skips publishing (block still exists)
    • If no sources remain: removes from tracker, publishes RouterEvent to NATS

When KVBM removes the same block:

  1. DynamoEventManager sends remove event to consolidator
  2. Consolidator looks up block in tracker
  3. Removes KVBM from block's sources
  4. Checks remaining sources:
    • If no sources remain: removes from tracker, publishes RouterEvent to NATS
    • (TensorRT-LLM already removed it earlier)

Result: Router receives remove event via NATS only after block is evicted from ALL caches

Testing

  • Extended e2e test to support both vLLM and TensorRT-LLM: test_consolidator_router_e2e.py
    • Parameterized tests run for both engines
    • Validates consolidator startup, event deduplication, and router integration

Where should the reviewer start?

Core consolidator files:

  • lib/llm/src/block_manager/kv_consolidator/ - Core consolidator logic
    • config.rs - Configuration and transport inference
    • subscriber.rs - ZMQ subscriber with i64/u64 hash handling
    • publisher.rs - NATS publisher for TensorRT-LLM (ZMQ publisher for vLLM)
    • tracker.rs - Deduplication logic with EventSource::Trtllm support

TensorRT-LLM integration:

  • components/src/dynamo/trtllm/publisher.py - Python ZMQ publisher
  • components/src/dynamo/trtllm/main.py - Consolidator endpoint configuration
  • lib/bindings/kvbm/python/kvbm/trtllm_integration/ - Python connector integration
  • lib/bindings/kvbm/src/block_manager/vllm/connector/trtllm_leader.rs - Rust connector leader

Configuration:

  • lib/bindings/kvbm/python/kvbm/trtllm_integration/consolidator_config.py - Endpoint configuration helper

Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)

  • closes GitHub issue: #xxx

Summary by CodeRabbit

Release Notes

  • New Features

    • Added ZMQ-based KV event consolidation support for TensorRT-LLM engines.
    • Introduced optional consolidation endpoint configuration for flexible event routing.
    • Extended support for multiple event transport mechanisms (ZMQ and NATS).
  • Dependencies

    • Added msgpack library for event serialization.
  • Tests

    • Enhanced test coverage to validate consolidation across both vLLM and TensorRT-LLM engines.

✏️ Tip: You can customize this high-level summary in your review settings.

@krishung5 krishung5 requested review from a team as code owners November 21, 2025 11:51
@github-actions github-actions bot added the feat label Nov 21, 2025
@krishung5
Copy link
Contributor Author

/ok to test 9e39854

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Nov 21, 2025

Walkthrough

Introduces optional KV event consolidation for TensorRT-LLM via ZMQ transport alongside existing vLLM support. Configuration helpers determine consolidation enablement, optional ZMQ endpoints route events, and dual transport types (ZMQ for vLLM/TRT, NATS for TRT) are supported with flexible block hash representations.

Changes

Cohort / File(s) Summary
Python Configuration and Integration
lib/bindings/kvbm/python/kvbm/trtllm_integration/consolidator_config.py, lib/bindings/kvbm/python/kvbm/trtllm_integration/connector/kvbm_connector_leader.py, components/src/dynamo/trtllm/main.py
Adds new consolidator_config module with helpers (is_truthy, should_enable_consolidator, get_consolidator_endpoints) to determine consolidation enablement via environment variables; extends DynamoKVBMConnectorLeader to accept and forward consolidator_trtllm_endpoint; main.py conditionally wires consolidation endpoint to publisher on initialization.
ZMQ Publisher Implementation
components/src/dynamo/trtllm/publisher.py
Introduces ZmqKvEventPublisher class for ZMQ-based KV event publishing in vLLM-compatible format; extends Publisher and get_publisher to accept optional zmq_endpoint; routes KV events to ZMQ when consolidator enabled, fallbacks to NATS; adds lifecycle management and state tracking for dual publishers.
Rust Consolidator Configuration
lib/llm/src/block_manager/kv_consolidator/config.rs, lib/llm/src/block_manager/kv_consolidator/mod.rs
Introduces ConsolidatorOutputTransport enum (Zmq, Nats); adds new_vllm and new_trtllm constructor methods to KvEventConsolidatorConfig; updates KvEventConsolidator to support dual publishers (publisher_zmq, publisher_nats) with transport-specific initialization; changes shutdown signature to consume self.
Rust Block Manager and Connectors
lib/bindings/kvbm/src/block_manager.rs, lib/bindings/kvbm/src/block_manager/vllm/connector/leader.rs, lib/bindings/kvbm/src/block_manager/vllm/connector/leader/recorder.rs, lib/bindings/kvbm/src/block_manager/vllm/connector/trtllm_leader.rs
Makes consolidator_config output_endpoint optional (Option); updates BlockManagerBuilder to dispatch consolidator config creation via EventSource match; updates EventSource import paths; extends PyTrtllmKvConnectorLeader signature to accept optional consolidator_trtllm_endpoint and conditionally applies consolidation.
Rust Event Publishing and Tracking
lib/llm/src/block_manager/kv_consolidator/publisher.rs, lib/llm/src/block_manager/kv_consolidator/subscriber.rs, lib/llm/src/block_manager/kv_consolidator/tracker.rs
Adds KvEventConsolidatorPublisherNats for NATS-based KV event publishing; updates BlockHash enum to support IntU64, IntI64, and Str variants with custom deserialization and to_u64() helper; extends CacheStatusTracker to resolve parent_hash via hash_mapping when handling new blocks.
Dependencies and Tests
container/deps/requirements.txt, tests/kvbm_integration/test_consolidator_router_e2e.py
Adds msgpack dependency; parameterizes E2E tests to run with both vLLM and TensorRT-LLM engines; introduces create_trtllm_config helper; updates fixtures and test logic to be engine-agnostic with conditional commands, flags, and assertions per engine.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

  • High-density logic changes: Publisher implementations (ZMQ and NATS), block hash deserialization and conversion, and consolidator configuration dispatch each require careful reasoning.
  • Cross-module dependencies: Changes span Python integration, Rust block manager, publisher/subscriber layers, and require verifying correctness of routing logic (ZMQ for TRT, NATS fallback).
  • Transport abstraction: The dual-publisher pattern (publisher_zmq and publisher_nats) and ConsolidatorOutputTransport enum introduce new abstraction that must be validated across engine sources.
  • Block hash handling: The new BlockHash variants with custom deserialization and to_u64() conversion logic necessitate validation against both signed and unsigned hash sources.
  • Test parameterization: Engine-type parameterization and dual-engine fixtures increase test complexity; assertions must cover both vLLM and TensorRT-LLM paths.

Areas requiring extra attention:

  • publisher.rs: NATS publisher loop, RouterEvent conversion, and tokens_hash computation with xxhash seeding
  • mod.rs: Conditional publisher initialization and shutdown logic for dual-transport handling
  • subscriber.rs: BlockHash deserialization and variant mapping correctness for signed/unsigned/hex inputs
  • trtllm_leader.rs: Optional consolidator endpoint threading and conditionality in block manager builder setup
  • test_consolidator_router_e2e.py: Engine-type parameterization coverage and assertion correctness for both engines

Poem

🐰 A consolidator hops with grace,
ZMQ speeds through cyberspace,
Block hashes dance in triples grand,
NATS and Zmq hand in hand,
Two engines run, one message flows,
Events consolidated—onward goes! 🎉

Pre-merge checks

✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and specifically summarizes the main change: extending KV Event Consolidator to support TensorRT-LLM.
Description check ✅ Passed The description includes Overview, Details, Configuration Flow with diagrams, Event Flow, Testing notes, and specific reviewer entry points. However, the 'Related Issues' section references a placeholder '#xxx' instead of the actual issue DIS-944.
Docstring Coverage ✅ Passed Docstring coverage is 96.92% which is sufficient. The required threshold is 80.00%.

Tip

📝 Customizable high-level summaries are now available in beta!

You can now customize how CodeRabbit generates the high-level summary in your pull requests — including its content, structure, tone, and formatting.

  • Provide your own instructions using the high_level_summary_instructions setting.
  • Format the summary however you like (bullet lists, tables, multi-section layouts, contributor stats, etc.).
  • Use high_level_summary_in_walkthrough to move the summary from the description to the walkthrough section.

Example instruction:

"Divide the high-level summary into five sections:

  1. 📝 Description — Summarize the main change in 50–60 words, explaining what was done.
  2. 📓 References — List relevant issues, discussions, documentation, or related PRs.
  3. 📦 Dependencies & Requirements — Mention any new/updated dependencies, environment variable changes, or configuration updates.
  4. 📊 Contributor Summary — Include a Markdown table showing contributions:
    | Contributor | Lines Added | Lines Removed | Files Changed |
  5. ✔️ Additional Notes — Add any extra reviewer context.
    Keep each section concise (under 200 words) and use bullet or numbered lists for clarity."

Note: This feature is currently in beta for Pro-tier users, and pricing will be announced later.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (14)
components/src/dynamo/trtllm/main.py (1)

238-255: Optional: gate consolidator setup on publish_events_and_metrics to avoid noisy logs

The consolidator endpoint discovery is always attempted, even when config.publish_events_and_metrics is false, which can yield warnings/logs about connector config and env vars in configurations that will never publish KV events.

Consider moving the trtllm_zmq_bind_endpoint setup under the if config.publish_events_and_metrics: block (or at least short‑circuiting when it’s false) so consolidator logic only runs when publishing is enabled. The current behavior is functionally fine; this is mainly to reduce confusion/noise in non‑publishing deployments.

Also applies to: 420-421

lib/llm/src/block_manager/kv_consolidator/tracker.rs (1)

341-361: Preserve original parent_hash when resolution fails to avoid regression

The new resolved_parent_hash logic correctly normalizes the parent to first_block_hash when the parent is known, fixing the cross‑source hash mismatch case described in the comment.

However, when parent_hash doesn’t exist in hash_mapping (e.g., parent event dropped or came from an unexpected source), resolved_parent_hash becomes None, whereas previously the original parent_hash would have been forwarded. That’s a mild behavior change and could make the router treat a block as root where it previously would at least see the (possibly unknown) parent ID.

You might want to fall back to the original value when resolution fails, e.g.:

let resolved_parent_hash = parent_hash.and_then(|ph| {
    self.hash_mapping
        .get(&ph)
        .and_then(|&parent_seq_hash| {
            self.blocks
                .get(&parent_seq_hash)
                .map(|parent_metadata| parent_metadata.first_block_hash.clone())
        })
        // If we can't resolve, keep the original hash
        .or(Some(ph))
});

Also consider adding a unit test that exercises the “TRTLLM parent hash” → “KVBM canonical parent hash” scenario that this block is meant to fix.

lib/llm/src/block_manager/kv_consolidator/config.rs (1)

10-17: Validate that engine_source=Kvbm is never used with this config

The new ConsolidatorOutputTransport and helpers (new_vllm, new_trtllm, output_transport) look reasonable and keep the existing struct layout intact.

output_transport uses unreachable!() for EventSource::Kvbm, assuming configs are always created via new_vllm/new_trtllm. Please double‑check that no call sites construct KvEventConsolidatorConfig with engine_source=Kvbm (e.g., via new(...) or serde) to avoid a surprising panic at runtime.

If that’s a concern, you could either assert in the constructors or map Kvbm to a specific transport instead of panicking.

Also applies to: 25-33, 58-75, 76-92

lib/bindings/kvbm/python/kvbm/trtllm_integration/connector/kvbm_connector_leader.py (1)

4-5: Use the consolidator_trtllm_endpoint parameter or remove it to avoid confusion

DynamoKVBMConnectorLeader.__init__ accepts consolidator_trtllm_endpoint but then immediately shadows it with a local trtllm_ep computed from env vars, so any caller‑supplied endpoint is ignored.

If you intend to support an explicit override, consider:

def __init__(self, llm_args: TorchLlmArgs, consolidator_trtllm_endpoint: Optional[str] = None):
    ...
    trtllm_ep = consolidator_trtllm_endpoint
    if trtllm_ep is None:
        from kvbm.trtllm_integration.consolidator_config import is_truthy
        consolidator_enabled = is_truthy(
            os.getenv("DYN_KVBM_KV_EVENTS_ENABLE_CONSOLIDATOR", "true")
        )
        if consolidator_enabled:
            zmq_port = os.getenv("DYN_KVBM_TRTLLM_ZMQ_PORT")
            ...

If an explicit override is not needed, dropping the parameter from the signature will make the API clearer and fix the “unused argument” warning.

Additionally, inside the except ValueError block you can use logger.exception(...) instead of logger.error(...) to automatically include the traceback for the invalid port value.

Finally, now that a module logger exists, you may eventually want to replace the print(f"KvConnectorLeader initialized with rank: {mappings.rank}") with a logger.info call, but that’s purely cosmetic.

Also applies to: 20-21, 28-32, 49-86, 87-94

lib/llm/src/block_manager/kv_consolidator/publisher.rs (1)

292-466: NATS publisher behavior is reasonable; consider reconnect/semantics tweaks and deduping hash logic

The NATS publisher implementation generally looks good:

  • Uses a slugified, TensorRT‑LLM‑specific stream (namespace.dynamo.component.tensorrt_llm.{KV_EVENT_SUBJECT}) consistent with the router’s expectations.
  • Drains CacheStatusTracker every 50ms and converts ConsolidatedEventRouterEvent, computing tokens_hash with the same xxHash3(seed=1337) scheme as the tracker.
  • Cleanly shuts down via CancellationToken.

A couple of refinements you may want to consider:

  1. Failure semantics & reconnects

    If NatsQueue::connect() fails, run_publisher_loop returns an error, the task logs it, and then exits. After that, the tracker’s event queue is never drained and consolidated events will pile up in memory while the consolidator keeps running.

    To match the ZMQ publisher’s “fail fast” behavior or to provide better resilience, you could either:

    • Panic on run_publisher_loop errors (like the ZMQ path), or
    • Add a simple reconnect/backoff loop around connect() so transient NATS issues don’t permanently disable publishing.

    Either way, clarifying the intended behavior (crash vs. degrade) would help.

  2. Unused sequence counter and duplicated hashing logic

    • sequence is created and passed into run_publisher_loop but never used (_sequence). You can drop this field entirely unless you plan to emit a sequence ID on NATS.
    • compute_tokens_hash duplicates the xxHash3 logic from the tracker. If possible, factoring this into a shared helper (or reusing compute_local_block_hash) would avoid drift if the hash seed or algorithm ever changes.

These are non‑blocking; the current implementation is functionally correct for the described TRT‑LLM KV events path.

Also applies to: 468-552

lib/bindings/kvbm/python/kvbm/trtllm_integration/consolidator_config.py (1)

76-106: Harden ZMQ port parsing and align with Rust’s u16 semantics

Right now env_port is fed directly to int(env_port), with no whitespace trim or range check. A malformed value (e.g. " abc", "70000") will either crash here or silently produce an invalid port.

Consider tightening this to:

  • Strip whitespace.
  • Validate 1 <= port <= 65535 (to match the Rust side’s parse::<u16> behavior in lib/bindings/kvbm/src/block_manager/vllm/connector/leader.rs).
  • Raise a clearer error if parsing or range validation fails.

For example:

-    env_port = os.getenv("DYN_KVBM_TRTLLM_ZMQ_PORT")
-    if env_port:
-        zmq_port = int(env_port)
+    env_port = os.getenv("DYN_KVBM_TRTLLM_ZMQ_PORT")
+    if env_port:
+        try:
+            zmq_port = int(env_port.strip())
+        except ValueError as exc:
+            raise ValueError(
+                f"Invalid DYN_KVBM_TRTLLM_ZMQ_PORT value {env_port!r}; expected integer"
+            ) from exc
+        if not (0 < zmq_port < 2**16):
+            raise ValueError(
+                f"DYN_KVBM_TRTLLM_ZMQ_PORT out of range [1, 65535]: {zmq_port}"
+            )

This keeps failures explicit and behavior consistent across Python and Rust.

lib/bindings/kvbm/src/block_manager/vllm/connector/trtllm_leader.rs (1)

451-475: Consider whether BlockManager worker_id should reflect the provided worker_id

KvConnectorLeader::new continues to call .worker_id(0) on BlockManagerBuilder, ignoring the worker_id: u64 argument, and PyTrtllmKvConnectorLeader::new passes that worker_id through unchanged.

If TRT‑LLM deployments ever use multiple leaders/workers concurrently, it might be clearer to forward the actual worker_id into the builder:

- let mut block_manager_builder = BlockManagerBuilder::new()
-     .worker_id(0)
+ let mut block_manager_builder = BlockManagerBuilder::new()
+     .worker_id(worker_id)

This would align the BlockManager’s notion of the worker with the Python‑side ID, similar to other connectors. If the hard‑coded 0 is intentional (e.g., single‑worker TRT‑LLM setups), a short comment explaining that assumption would also help future readers.

components/src/dynamo/trtllm/publisher.py (1)

71-110: Minor cleanups: redundant _to_signed_i64 and unused arguments

Two small nits you may want to address (low priority):

  • block_hashes and parent_hash are already run through _to_signed_i64 in _publish_kv_cache_events_task, then _to_signed_i64 is applied again inside ZmqKvEventPublisher.publish_stored / publish_removed. The second conversion is effectively a no‑op; you could drop it there and document that callers must provide signed 64‑bit ints.
  • event_id and num_block_tokens in ZmqKvEventPublisher.publish_stored, and event_id in publish_removed, are unused but required to mirror the NATS publisher’s interface. To quiet linters like Ruff, you could prefix them with _:
-    def publish_stored(
-        self,
-        event_id: int,
-        token_ids: list[int],
-        num_block_tokens: list[int],
+    def publish_stored(
+        self,
+        _event_id: int,
+        token_ids: list[int],
+        _num_block_tokens: list[int],
...
-    def publish_removed(self, event_id: int, block_hashes: list[int]):
+    def publish_removed(self, _event_id: int, block_hashes: list[int]):

This keeps the public signature stable while avoiding unused‑argument warnings.

lib/bindings/kvbm/src/block_manager.rs (1)

370-387: EventSource dispatch looks correct; clarify Kvbm branch semantics

The match on engine_source is consistent with the new multi‑engine behavior:

  • EventSource::Vllmnew_vllm(engine_ep, output_ep) with expect(...) to guard against missing output_endpoint.
  • EventSource::Trtllmnew_trtllm(engine_ep) ignoring output_ep as intended for the NATS publisher.
  • EventSource::Kvbmnew_vllm(engine_ep, output_ep) with a comment that KVBM doesn’t currently use the consolidator.

Two minor points:

  1. The expect messages are fine for now, but if these can be misconfigured by users, converting them into fallible config building (e.g., returning a Result) would produce better error messages than panics.
  2. The Kvbm arm plus comment is slightly confusing—either remove this arm until it’s actually needed, or expand the comment to explain the realistic scenario where EventSource::Kvbm would be set.

Otherwise, the wiring into config_builder.consolidator_config(consolidator_config) is clean.

tests/kvbm_integration/test_consolidator_router_e2e.py (5)

79-129: Unused engine argument in extract_consolidator_stats (Ruff ARG001)

extract_consolidator_stats(log_path: Path, engine: str = "vllm") currently ignores engine. That’s harmless but triggers Ruff ARG001 and is a minor readability wart.

Two simple options:

  • If you don’t need engine‑specific parsing yet, drop the parameter:

  • def extract_consolidator_stats(log_path: Path, engine: str = "vllm") -> dict:

  • def extract_consolidator_stats(log_path: Path) -> dict:
    
    and update call sites accordingly.
    
    
  • If you intend to keep it for future divergence between vLLM/TRT‑LLM logs, make the intent explicit and silence Ruff, e.g.:

    def extract_consolidator_stats(log_path: Path, engine: str = "vllm") -> dict:
        _ = engine  # reserved for future engine-specific parsing
        ...

Either way, it’s worth resolving so the tests stay Ruff‑clean.


147-160: Broad except Exception in wait_for_worker_registration (Ruff BLE001)

Catching Exception in the health‑poll loop is understandable to keep retrying, but Ruff flags it (BLE001) and it also risks hiding non‑network bugs.

You can narrow this without changing behavior much:

-        try:
-            response = requests.get(f"{frontend_url}/health", timeout=5)
-            health_data = response.json()
-            ...
-        except Exception as e:
-            logger.debug(f"Health check failed: {e}")
+        try:
+            response = requests.get(f"{frontend_url}/health", timeout=5)
+            health_data = response.json()
+            ...
+        except (requests.RequestException, ValueError) as e:
+            logger.debug(f"Health check failed: {e}")

This still covers network/HTTP errors and basic JSON parsing issues while letting unexpected exceptions surface.


224-233: Engine‑parameterized fixtures are well‑structured; handle Ruff’s unused runtime_services

  • engine_type fixture and the engine‑aware llm_worker fixture cleanly abstract over vLLM vs TRT‑LLM while sharing startup and log handling logic.
  • The tester fixture now correctly uses llm_worker["model_id"], guaranteeing the frontend requests target the actual running model.

Regarding Ruff’s ARG001 on runtime_services in llm_worker:

  • The parameter is intentionally unused value‑wise but required to ensure the runtime_services fixture has run (NATS/ETCD, etc.).

  • To keep that dependency while appeasing Ruff, add a no‑op use:

    def llm_worker(frontend_server, test_directory, runtime_services, engine_type):
        _ = runtime_services  # ensure fixture runs; value unused
        ...

Same pattern applies for runtime_services in test_remove_deduplication_across_sources.

Also applies to: 236-338, 340-346


489-582: STORE deduplication log parsing is engine‑agnostic and robust

The test_store_deduplication_across_sources logic:

  • Uses order‑agnostic regexes to count:

    • First‑source STORE events that will be published.
    • Second‑source DEDUP events.
    • Total STORE events and published STORE events.
  • Asserts:

    • Both first‑source and DEDUP events are present.
    • published_stores == first_source_stores.
    • total_stores_received == first_source_stores + dedup_stores.

Given the consolidator’s logs are shared across vLLM and TRT‑LLM, this should hold for both engines and nicely validates cross‑source deduplication.

Just be aware that if log message formats change (e.g., different wording around DEDUP), these regexes will need to be updated to keep tests stable.


583-803: REMOVE deduplication test correctly covers both engines; minor duplication of engine availability checks

  • test_remove_deduplication_across_sources parametrized over engine_type plus the explicit HAS_VLLM/HAS_TRTLLM skips ensures this heavier test runs for each available engine.

  • Worker startup:

    • vLLM: uses --num-gpu-blocks-override 30 plus KVBM CPU/Disk override env vars.
    • TRT‑LLM: relies on the TRT config + KVBM overrides and DYN_KVBM_TRTLLM_ZMQ_PORT.
  • Log analysis uses engine‑agnostic regexes for:

    • “removed from source X, still in N source(s)” (dedup keeps block in router).
    • “removed from last source X … will publish REMOVE event”.
  • Assertions ensure:

    • Some removes still present in other sources.
    • Some REMOVE events published.
    • Published removes equal last‑source removes.
    • No errors in worker/frontend logs.

Two small improvements:

  1. The engine availability checks at lines 603‑607 duplicate the logic encoded in the engine_type fixture; consider factoring that into a shared helper or reusing the fixture’s logic to avoid drift.
  2. Apply the same _ = runtime_services trick here to resolve Ruff’s ARG002 without changing fixture behavior.

Otherwise the test design is solid and aligns with the described REMOVE dedup semantics.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d821a8b and 9e39854.

📒 Files selected for processing (15)
  • components/src/dynamo/trtllm/main.py (2 hunks)
  • components/src/dynamo/trtllm/publisher.py (12 hunks)
  • container/deps/requirements.txt (1 hunks)
  • lib/bindings/kvbm/python/kvbm/trtllm_integration/connector/kvbm_connector_leader.py (3 hunks)
  • lib/bindings/kvbm/python/kvbm/trtllm_integration/consolidator_config.py (1 hunks)
  • lib/bindings/kvbm/src/block_manager.rs (5 hunks)
  • lib/bindings/kvbm/src/block_manager/vllm/connector/leader.rs (2 hunks)
  • lib/bindings/kvbm/src/block_manager/vllm/connector/leader/recorder.rs (3 hunks)
  • lib/bindings/kvbm/src/block_manager/vllm/connector/trtllm_leader.rs (5 hunks)
  • lib/llm/src/block_manager/kv_consolidator/config.rs (2 hunks)
  • lib/llm/src/block_manager/kv_consolidator/mod.rs (5 hunks)
  • lib/llm/src/block_manager/kv_consolidator/publisher.rs (2 hunks)
  • lib/llm/src/block_manager/kv_consolidator/subscriber.rs (5 hunks)
  • lib/llm/src/block_manager/kv_consolidator/tracker.rs (1 hunks)
  • tests/kvbm_integration/test_consolidator_router_e2e.py (19 hunks)
🧰 Additional context used
🧠 Learnings (13)
📓 Common learnings
Learnt from: oandreeva-nv
Repo: ai-dynamo/dynamo PR: 2989
File: lib/llm/src/block_manager/distributed/transfer.rs:6-6
Timestamp: 2025-09-18T21:47:44.143Z
Learning: For PR ai-dynamo/dynamo#2989, the ConnectorTransferBatcher architectural issues will be addressed in a follow-up PR by removing the duplicate batching logic and integrating distributed transfers with the existing TransferBatcher + LocalTransferManager pipeline, rather than adding bounded concurrency primitives like Semaphore.
📚 Learning: 2025-05-29T06:20:12.901Z
Learnt from: ryanolson
Repo: ai-dynamo/dynamo PR: 1093
File: lib/llm/src/block_manager/block/registry.rs:98-122
Timestamp: 2025-05-29T06:20:12.901Z
Learning: In lib/llm/src/block_manager/block/registry.rs, the background task spawned for handling unregister notifications uses detached concurrency by design. The JoinHandle is intentionally not stored as this represents a reasonable architectural tradeoff for a long-running cleanup task.

Applied to files:

  • lib/bindings/kvbm/src/block_manager/vllm/connector/trtllm_leader.rs
  • lib/llm/src/block_manager/kv_consolidator/subscriber.rs
  • lib/llm/src/block_manager/kv_consolidator/tracker.rs
📚 Learning: 2025-08-25T23:24:42.076Z
Learnt from: tzulingk
Repo: ai-dynamo/dynamo PR: 2666
File: components/backends/trtllm/src/dynamo/trtllm/publisher.py:0-0
Timestamp: 2025-08-25T23:24:42.076Z
Learning: WorkerMetricsPublisher.create_endpoint method signature has been updated in _core.pyi to include metrics_labels parameter: `def create_endpoint(self, component: str, metrics_labels: Optional[List[Tuple[str, str]]] = None)`, making the metrics_labels parameter optional with default value of None.

Applied to files:

  • components/src/dynamo/trtllm/publisher.py
📚 Learning: 2025-08-29T09:53:45.266Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 2756
File: tests/router/test_router_e2e_with_mockers.py:961-974
Timestamp: 2025-08-29T09:53:45.266Z
Learning: Indexer dumps in the KV router system are designed to never contain remove or clear events - they only contain "stored" events. Therefore, code that processes indexer dump events can safely assume the presence of event["event"]["data"]["stored"] structure without additional error handling for other event types.

Applied to files:

  • components/src/dynamo/trtllm/publisher.py
📚 Learning: 2025-06-05T01:04:24.775Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 1392
File: launch/dynamo-run/src/subprocess/vllm_v1_inc.py:71-71
Timestamp: 2025-06-05T01:04:24.775Z
Learning: The `create_endpoint` method in `WorkerMetricsPublisher` has backward compatibility maintained through pyo3 signature annotation `#[pyo3(signature = (component, dp_rank = None))]`, making the `dp_rank` parameter optional with a default value of `None`.

Applied to files:

  • components/src/dynamo/trtllm/publisher.py
📚 Learning: 2025-05-29T00:02:35.018Z
Learnt from: alec-flowers
Repo: ai-dynamo/dynamo PR: 1181
File: lib/llm/src/kv_router/publisher.rs:379-425
Timestamp: 2025-05-29T00:02:35.018Z
Learning: In lib/llm/src/kv_router/publisher.rs, the functions `create_stored_blocks` and `create_stored_block_from_parts` are correctly implemented and not problematic duplications of existing functionality elsewhere in the codebase.

Applied to files:

  • lib/llm/src/block_manager/kv_consolidator/mod.rs
  • lib/llm/src/block_manager/kv_consolidator/publisher.rs
  • lib/llm/src/block_manager/kv_consolidator/tracker.rs
📚 Learning: 2025-06-02T19:37:27.666Z
Learnt from: oandreeva-nv
Repo: ai-dynamo/dynamo PR: 1195
File: lib/llm/tests/block_manager.rs:150-152
Timestamp: 2025-06-02T19:37:27.666Z
Learning: In Rust/Tokio applications, when background tasks use channels for communication, dropping the sender automatically signals task termination when the receiver gets `None`. The `start_batching_publisher` function in `lib/llm/tests/block_manager.rs` demonstrates this pattern: when the `KVBMDynamoRuntimeComponent` is dropped, its `batch_tx` sender is dropped, causing `rx.recv()` to return `None`, which triggers cleanup and task termination.

Applied to files:

  • lib/llm/src/block_manager/kv_consolidator/mod.rs
  • lib/llm/src/block_manager/kv_consolidator/publisher.rs
📚 Learning: 2025-07-14T23:01:16.218Z
Learnt from: biswapanda
Repo: ai-dynamo/dynamo PR: 1890
File: examples/vllm/deploy/agg.yaml:63-70
Timestamp: 2025-07-14T23:01:16.218Z
Learning: In vLLM worker deployments, grep-based log checks for "VllmWorker.*has been initialized" are appropriate for readiness probes to verify worker startup, but should not be used for liveness probes which need to detect ongoing worker health.

Applied to files:

  • tests/kvbm_integration/test_consolidator_router_e2e.py
📚 Learning: 2025-07-25T22:34:11.384Z
Learnt from: nnshah1
Repo: ai-dynamo/dynamo PR: 2124
File: components/backends/vllm/deploy/disagg.yaml:54-60
Timestamp: 2025-07-25T22:34:11.384Z
Learning: In vLLM worker deployments, startup probes (with longer periods and higher failure thresholds like periodSeconds: 10, failureThreshold: 60) are used to handle the slow model loading startup phase, while liveness probes are intentionally kept aggressive (periodSeconds: 5, failureThreshold: 1) for quick failure detection once the worker is operational. This pattern separates startup concerns from operational health monitoring in GPU-heavy workloads.

Applied to files:

  • tests/kvbm_integration/test_consolidator_router_e2e.py
📚 Learning: 2025-06-13T22:07:24.843Z
Learnt from: kthui
Repo: ai-dynamo/dynamo PR: 1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:07:24.843Z
Learning: The codebase uses async-nats version 0.40, not the older nats crate. Error handling should use async_nats::error::Error variants, not nats::Error variants.

Applied to files:

  • lib/llm/src/block_manager/kv_consolidator/publisher.rs
📚 Learning: 2025-09-17T01:00:50.937Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3077
File: lib/llm/src/kv_router/subscriber.rs:334-336
Timestamp: 2025-09-17T01:00:50.937Z
Learning: PeaBrane identified that reordering tokio::select! arms in the indexer (moving dump_rx.recv() to be after event_rx.recv()) creates a natural barrier that ensures RouterEvents are always processed before dump requests, solving the ack-before-commit race condition. This leverages the existing biased directive and requires minimal code changes, aligning with their preference for contained solutions.

Applied to files:

  • lib/llm/src/block_manager/kv_consolidator/publisher.rs
📚 Learning: 2025-09-17T01:00:50.937Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3077
File: lib/llm/src/kv_router/subscriber.rs:334-336
Timestamp: 2025-09-17T01:00:50.937Z
Learning: PeaBrane suggested using tokio::select! arm ordering with the existing biased directive in the indexer to create a natural barrier for dump requests, ensuring KV events are drained before snapshotting. This approach leverages existing architecture (biased select) to solve race conditions with minimal code changes, which aligns with their preference for contained solutions.

Applied to files:

  • lib/llm/src/block_manager/kv_consolidator/publisher.rs
📚 Learning: 2025-10-14T00:58:05.744Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3597
File: lib/llm/src/kv_router/indexer.rs:437-441
Timestamp: 2025-10-14T00:58:05.744Z
Learning: In lib/llm/src/kv_router/indexer.rs, when a KvCacheEventData::Cleared event is received, the system intentionally clears all dp_ranks for the given worker_id by calling clear_all_blocks(worker.worker_id). This is the desired behavior and should not be scoped to individual dp_ranks.

Applied to files:

  • lib/llm/src/block_manager/kv_consolidator/tracker.rs
🧬 Code graph analysis (11)
lib/bindings/kvbm/src/block_manager/vllm/connector/trtllm_leader.rs (3)
lib/bindings/kvbm/src/block_manager.rs (5)
  • new (94-205)
  • new (258-263)
  • worker_id (265-268)
  • page_size (269-272)
  • leader (273-276)
lib/bindings/kvbm/src/block_manager/vllm/connector/leader.rs (2)
  • new (91-195)
  • new (572-607)
lib/llm/src/block_manager/kv_consolidator/config.rs (1)
  • new (46-56)
components/src/dynamo/trtllm/main.py (2)
lib/bindings/kvbm/src/block_manager.rs (1)
  • consolidator_config (290-298)
lib/bindings/kvbm/python/kvbm/trtllm_integration/consolidator_config.py (2)
  • get_consolidator_endpoints (76-106)
  • should_enable_consolidator (28-73)
components/src/dynamo/trtllm/publisher.py (3)
lib/bindings/python/src/dynamo/_core.pyi (1)
  • KvEventPublisher (716-766)
lib/llm/src/block_manager/kv_consolidator/subscriber.rs (1)
  • data_parallel_rank (40-42)
lib/llm/src/block_manager/kv_consolidator/mod.rs (1)
  • shutdown (147-168)
lib/bindings/kvbm/src/block_manager/vllm/connector/leader/recorder.rs (1)
lib/bindings/kvbm/src/block_manager.rs (1)
  • worker_id (265-268)
lib/llm/src/block_manager/kv_consolidator/mod.rs (2)
lib/llm/src/block_manager/kv_consolidator/config.rs (2)
  • output_transport (64-74)
  • new (46-56)
lib/llm/src/block_manager/kv_consolidator/publisher.rs (4)
  • new (150-175)
  • new (307-347)
  • shutdown (178-184)
  • shutdown (350-356)
lib/bindings/kvbm/python/kvbm/trtllm_integration/connector/kvbm_connector_leader.py (3)
lib/bindings/kvbm/python/kvbm/utils.py (1)
  • is_dyn_runtime_enabled (8-20)
lib/bindings/kvbm/src/block_manager.rs (2)
  • consolidator_config (290-298)
  • leader (273-276)
lib/bindings/kvbm/python/kvbm/trtllm_integration/consolidator_config.py (1)
  • is_truthy (14-25)
lib/bindings/kvbm/python/kvbm/trtllm_integration/consolidator_config.py (1)
lib/bindings/kvbm/src/block_manager/vllm/connector/leader.rs (1)
  • val (662-662)
tests/kvbm_integration/test_consolidator_router_e2e.py (2)
tests/kvbm_integration/common.py (2)
  • ApiTester (51-130)
  • check_logs_for_patterns (25-48)
tests/utils/managed_process.py (2)
  • ManagedProcess (71-568)
  • log_path (98-100)
lib/llm/src/block_manager/kv_consolidator/publisher.rs (4)
lib/llm/src/block_manager/kv_consolidator/mod.rs (3)
  • tracker (171-173)
  • new (90-102)
  • shutdown (147-168)
lib/llm/src/block_manager/kv_consolidator/tracker.rs (3)
  • new (155-163)
  • new (235-241)
  • drain_events (501-510)
lib/runtime/src/slug.rs (1)
  • slugify (28-38)
lib/runtime/src/transports/nats.rs (1)
  • new_without_consumer (463-481)
lib/bindings/kvbm/src/block_manager.rs (1)
lib/llm/src/block_manager/kv_consolidator/config.rs (2)
  • new_vllm (77-83)
  • new_trtllm (86-92)
lib/llm/src/block_manager/kv_consolidator/subscriber.rs (2)
lib/llm/src/block_manager/kv_consolidator/publisher.rs (6)
  • h (88-88)
  • h (490-490)
  • block_hash (81-82)
  • block_hash (127-127)
  • block_hash (483-484)
  • block_hash (518-518)
lib/llm/src/mocker/sequence.rs (1)
  • block_hashes (111-117)
🪛 Ruff (0.14.5)
components/src/dynamo/trtllm/publisher.py

73-73: Unused method argument: event_id

(ARG002)


75-75: Unused method argument: num_block_tokens

(ARG002)


100-100: Unused method argument: event_id

(ARG002)

lib/bindings/kvbm/python/kvbm/trtllm_integration/connector/kvbm_connector_leader.py

31-31: Unused method argument: consolidator_trtllm_endpoint

(ARG002)


70-73: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

lib/bindings/kvbm/python/kvbm/trtllm_integration/consolidator_config.py

96-96: Avoid specifying long messages outside the exception class

(TRY003)

tests/kvbm_integration/test_consolidator_router_e2e.py

79-79: Unused function argument: engine

(ARG001)


157-157: Do not catch blind exception: Exception

(BLE001)


236-236: Unused function argument: runtime_services

(ARG001)


585-585: Unused method argument: runtime_services

(ARG002)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: clippy (launch/dynamo-run)
  • GitHub Check: Build and Test - dynamo
  • GitHub Check: clippy (lib/bindings/python)
  • GitHub Check: tests (lib/bindings/python)
  • GitHub Check: tests (launch/dynamo-run)
  • GitHub Check: clippy (.)
  • GitHub Check: clippy (lib/runtime/examples)
  • GitHub Check: tests (lib/runtime/examples)
  • GitHub Check: tests (.)
🔇 Additional comments (20)
container/deps/requirements.txt (1)

19-19: Dependency addition verified and correctly placed.

The msgpack addition is correctly alphabetized and actively used in the new TensorRT-LLM consolidator code (components/src/dynamo/trtllm/publisher.py:129), where it serializes batch data for ZMQ message publishing. The unpinned version aligns with other general-purpose libraries in the file.

lib/bindings/kvbm/src/block_manager/vllm/connector/leader.rs (1)

25-26: Consolidator wiring for vLLM looks consistent with new API

Using kv_consolidator::EventSource and passing Some(output_ep) into BlockManagerBuilder::consolidator_config with EventSource::Vllm matches the updated builder signature and keeps recorder and non‑recorder leaders aligned.

No changes needed here.

Also applies to: 147-151

lib/bindings/kvbm/src/block_manager/vllm/connector/leader/recorder.rs (1)

6-6: Recorder leader’s consolidator and metrics identifiers match non‑recorder path

The recorder now:

  • Uses kv_consolidator::EventSource and passes EventSource::Vllm into consolidator_config with Some(output_ep), matching the non‑recorder leader.
  • Supplies Some(format!("worker-{}", worker_id)) to ConnectorSlotManager::new, aligning recorder metrics/identifiers with the regular leader.

This keeps both code paths consistent with the new consolidator API.

Also applies to: 156-160, 176-177

lib/llm/src/block_manager/kv_consolidator/publisher.rs (1)

64-137: Hash/token conversion for vLLM publisher looks sound but will drop non‑numeric hashes

Event::from_consolidated correctly:

  • Parses block_hash/parent_hash as u64 for vLLM compatibility.
  • Clamps token_ids and block_size to i32::MAX with warnings.

Be aware this means any consolidated event whose block_hash or parent_hash is not a decimal u64 string will be dropped (logged as an error and filtered out). That matches existing expectations for vLLM/TRTLLM/KVBM where hashes are numeric, but if another source ever injects non‑numeric hashes, they’ll silently never reach the router.

No change required; just calling this out because the behavior is now explicit.

lib/bindings/kvbm/python/kvbm/trtllm_integration/consolidator_config.py (1)

28-73: Consolidator enablement logic is robust and conservative

The env override + structural validation of arg_map/kv_connector_config/connector_module gives a clear “opt‑out” path and avoids surprising auto‑enables when TRT KVBM isn’t wired. This looks correct and matches the described behavior for optional consolidation.

lib/llm/src/block_manager/kv_consolidator/subscriber.rs (2)

107-151: BlockHash conversions for signed/unsigned integers are consistent with TRT‑LLM requirements

The BlockHash::IntU64 / IntI64 split plus to_u64 using *n as u64 for IntI64 cleanly normalizes vLLM u64 and TRT‑LLM i64 hashes into a single internal u64 space. Combined with the Display impl, this gives consistent string keys for the tracker and downstream publisher.

Aside from the Str handling called out separately, this looks correct and aligns with the two’s‑complement representation TRT‑LLM uses.


331-371: Use normalized hashes consistently when interacting with the tracker

The updated use of BlockHash::to_u64().to_string() for both:

  • parent_block_hash chaining, and
  • handle_store / handle_remove calls

ensures the tracker always sees a canonical decimal u64 string, regardless of whether the engine sent u64, i64, or hex strings. This is important for cross‑engine deduplication and for the publisher, which parses u64 from these strings.

Once the Str parsing behavior is hardened, this path looks solid.

lib/bindings/kvbm/src/block_manager/vllm/connector/trtllm_leader.rs (1)

67-153: TRT‑LLM consolidator wiring into BlockManagerBuilder looks correct

Passing consolidator_trtllm_endpoint through to BlockManagerBuilder::consolidator_config(trtllm_ep, None, EventSource::Trtllm) only when Some nicely keeps the consolidator optional and tags the source correctly. The async init keeps the existing pattern (wait on leader.wait_worker_sync_ready(); then build and install the slot manager) and just augments the builder when the endpoint is present.

This matches the intended flow where TRT‑LLM events are pushed into the consolidator via ZMQ and then emitted to NATS.

components/src/dynamo/trtllm/publisher.py (5)

41-151: ZMQ KV publisher format and wiring match consolidator expectations

ZmqKvEventPublisher’s design aligns with the Rust subscriber:

  • Binds a PUB socket on endpoints like "tcp://*:20081", while the subscriber connects to "tcp://127.0.0.1:PORT".
  • Emits msgpack batches [timestamp, [event], data_parallel_rank], exactly what VllmEventBatch in subscriber.rs deserializes.
  • Encodes BlockStored / BlockRemoved events with integer block hashes (after _to_signed_i64), which deserialize cleanly into the new BlockHash::{IntU64, IntI64} variants.

The shutdown path that closes the socket and terminates the context from Publisher.cleanup() looks correct and avoids leaving stray ZMQ resources around.


230-311: Conditional ZMQ vs NATS publisher selection keeps behavior backward‑compatible

The Publisher.__init__/initialize flow treats zmq_endpoint as a feature flag:

  • When zmq_endpoint is provided, a ZmqKvEventPublisher is created and kv_event_publisher is left as None, with a clear log message that the consolidator will handle publishing to NATS.
  • When zmq_endpoint is absent, the existing KvEventPublisher (NATS) path is used unchanged.

This is a clean split that preserves the pre‑existing behavior for deployments without consolidation while enabling the new TRT‑LLM → ZMQ → consolidator → NATS flow when configured.


434-537: Event routing logic correctly targets ZMQ when consolidation is enabled

The updated _publish_kv_cache_events_task:

  • Validates that at least one publisher (kv_event_publisher or zmq_kv_event_publisher) is available before entering the async loop.
  • For "stored" events, builds token_ids, num_block_tokens, and signed block_hashes, then:
    • Sends to ZmqKvEventPublisher.publish_stored when consolidator is enabled.
    • Falls back to KvEventPublisher.publish_stored otherwise.
  • For "removed" events, filters out partial blocks as before and routes to the appropriate publisher.

This cleanly reuses the existing filtering/partial‑block logic and just switches transport based on configuration, which is exactly what we want for this PR.


563-587: ZMQ publisher cleanup is integrated into Publisher lifecycle

Adding a ZMQ shutdown in Publisher.cleanup ensures the ZMQ resources are torn down along with the worker threads:

if self.zmq_kv_event_publisher:
    self.zmq_kv_event_publisher.shutdown()

This matches the context‑manager semantics in get_publisher and avoids resource leaks when TRT‑LLM components are stopped or restarted.


625-643: get_publisher context cleanly threads zmq_endpoint into Publisher

Extending get_publisher to accept zmq_endpoint and pass it through to Publisher(...) keeps the external API simple while enabling the consolidator wiring in components/src/dynamo/trtllm/main.py. The async context’s finally block already calls cleanup(), so ZMQ teardown is also covered.

lib/llm/src/block_manager/kv_consolidator/mod.rs (3)

13-101: Multi-transport consolidator state is well-structured

Refactoring KvEventConsolidator to hold separate publisher_zmq and publisher_nats options, and re-exporting ConsolidatorOutputTransport, makes the transport decision explicit and keeps the public surface tidy. Initializing both fields to None in new keeps construction simple, with start responsible for setting the appropriate one.


104-144: Transport-aware startup correctly chooses ZMQ vs NATS publishers

start() now logs the chosen transport and then:

  • For ConsolidatorOutputTransport::Zmq, creates a KvEventConsolidatorPublisher bound to config.consolidated_event_endpoint, stores it in publisher_zmq, and waits briefly for downstream subscribers to connect.
  • For ConsolidatorOutputTransport::Nats, creates a KvEventConsolidatorPublisherNats tied to the shared tracker and stores it in publisher_nats.

The subsequent ZMQ subscriber startup is unchanged and remains engine-agnostic, keyed only by engine_event_endpoint and engine_source. This matches the desired behavior: vLLM ⇒ ZMQ output, TRT‑LLM ⇒ NATS output.


146-167: Shutdown sequence cleanly stops subscriber and both publisher types

The new shutdown(self):

  • Cancels the listener via the CancellationToken.
  • Aborts and awaits the subscriber handle if present.
  • Awaits shutdown on whichever publishers (publisher_zmq and/or publisher_nats) were initialized.

This ensures all background tasks are terminated and avoids leaving dangling ZMQ/NATS tasks around the runtime. The move-by-value signature also prevents accidental reuse after shutdown.

tests/kvbm_integration/test_consolidator_router_e2e.py (3)

9-11: Engine‑agnostic test description and availability checks look good

  • Updated docstring explicitly mentions vLLM/TensorRT‑LLM and matches the new behavior.
  • HAS_VLLM / HAS_TRTLLM flags plus pytest.mark.skipif(not (HAS_VLLM or HAS_TRTLLM), ...) ensure the test suite runs if at least one engine is installed, which is exactly what you want here.

No changes needed.

Also applies to: 30-41


362-377: Basic and concurrent consolidator flow tests look correct for both engines

  • assert_no_errors_in_logs now tags worker logs with the engine name (VLLM Worker / TRTLLM Worker), which improves diagnostics while reusing the existing error patterns.

  • test_basic_consolidator_flow and test_consolidator_handles_concurrent_requests correctly:

    • Derive engine from llm_worker["engine"].
    • Exercise both engines via shared tester and llm_worker fixtures.
    • Assert consolidator startup and non‑zero store_events / published_events in logs.

This is a good, engine‑agnostic smoke test layer for the consolidator and router.

Also applies to: 405-447, 449-487


58-76: TRT‑LLM config helper and CLI wiring are consistent with the KVBM connector design

  • create_trtllm_config writes a per‑test trtllm_config.yaml under the test directory and sets:

    • kv_connector_config.connector_module = "kvbm.trtllm_integration.connector"
    • connector_scheduler_class = "DynamoKVBMConnectorLeader"
    • connector_worker_class = "DynamoKVBMConnectorWorker"

    which aligns with the TRT‑LLM KVBM integration specification.

  • Both in llm_worker and test_remove_deduplication_across_sources, the TRT‑LLM worker command passes:

    python -m dynamo.trtllm \
      --model-path MODEL \
      --served-model-name MODEL \
      --extra-engine-args /abs/path/trtllm_config.yaml \
      --publish-events-and-metrics

    plus DYN_KVBM_TRTLLM_ZMQ_PORT in the environment, matching the PR objectives.

The dynamo.trtllm CLI accepts a single YAML path via --extra-engine-args, and the config keys used (backend, cuda_graph_config, kv_cache_config, kv_connector_config) are confirmed to match the latest TensorRT‑LLM integration requirements.

lib/bindings/kvbm/src/block_manager.rs (1)

9-11: No breaking API change—cache_stats visibility was already internal

The cache_stats module is not re-exported anywhere in the codebase and is not exposed through lib.rs. All current uses are internal to the kvbm crate (e.g., in slot.rs). Changing pub mod cache_stats; to mod cache_stats; does not affect the public API, as the module was never part of the external bindings surface.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants