diff --git a/tests/fault_tolerance/cancellation/test_trtllm.py b/tests/fault_tolerance/cancellation/test_trtllm.py index 609ee3918d..ecfac89d37 100644 --- a/tests/fault_tolerance/cancellation/test_trtllm.py +++ b/tests/fault_tolerance/cancellation/test_trtllm.py @@ -362,3 +362,94 @@ def test_request_cancellation_trtllm_prefill_cancel( logger.info( "Completion request cancellation during prefill phase detected successfully" ) + + +@pytest.mark.trtllm_marker +@pytest.mark.gpu_1 +@pytest.mark.e2e +@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME) +def test_request_cancellation_trtllm_kv_transfer_cancel( + request, runtime_services, predownload_models +): + """ + End-to-end test for request cancellation during prefill to decode KV transfer phase. + + This test verifies that when a request is cancelled by the client during the KV transfer phase, + the system properly handles the cancellation and cleans up resources on the workers. + """ + + # Step 1: Start the frontend + with DynamoFrontendProcess(request) as frontend: + logger.info("Frontend started successfully") + + # Step 2: Start the prefill worker + with DynamoWorkerProcess(request, mode="prefill") as prefill_worker: + logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}") + + # Step 3: Start the decode worker + with DynamoWorkerProcess(request, mode="decode") as decode_worker: + logger.info(f"Decode Worker PID: {decode_worker.get_pid()}") + + # TODO: Why wait after worker ready fixes frontend 404 / 500 flakiness? + time.sleep(2) + + # Step 4: Test request cancellation during KV transfer phase + logger.info( + "Testing completion request cancellation during KV transfer phase..." + ) + + # Send request with long prompt + cancellable_req = send_cancellable_request( + "completion", use_long_prompt=True + ) + + # Poll for "Prefill Request ID" pattern in prefill worker + request_id, prefill_log_offset = poll_for_pattern( + process=prefill_worker, + pattern="Prefill Request ID: ", + match_type="contains", + ) + + # Poll for decode worker entry signaling start of KV transfer phase + _, decode_log_offset = poll_for_pattern( + process=decode_worker, + pattern=f"Decode Request ID: {request_id}", + poll_interval_ms=2, + ) + + # Cancel during KV transfer phase in decode worker + cancellable_req.cancel() + logger.info( + f"Cancelled request ID: {request_id} at beginning of decode" + ) + + # Poll for "Aborted Request ID" in decode worker + _, decode_log_offset = poll_for_pattern( + process=decode_worker, + pattern=f"Aborted Request ID: {request_id}", + log_offset=decode_log_offset, + ) + + # Verify frontend log has kill message + _, frontend_log_offset = poll_for_pattern( + process=frontend, + pattern="issued control message Kill to sender", + ) + + logger.info( + "Completion request cancellation at beginning of decode detected successfully" + ) + + # Verify the workers are still functional + cancellable_req = send_cancellable_request("chat_completion_stream") + _, decode_log_offset = poll_for_pattern( + process=decode_worker, + pattern="Decode Request ID: ", + log_offset=decode_log_offset, + match_type="contains", + ) + read_streaming_responses(cancellable_req, expected_count=5) + + logger.info( + "Workers are functional after cancellation during KV transfer" + )