From 9a9815eba62a4063d746286f040bf54826615220 Mon Sep 17 00:00:00 2001
From: Sai Kiran Polisetty
Date: Mon, 13 Oct 2025 20:18:23 +0530
Subject: [PATCH 01/20] Update

---
 src/ensemble_scheduler/ensemble_scheduler.cc | 101 +++++++++++++++++++
 src/ensemble_scheduler/ensemble_scheduler.h  |   6 ++
 2 files changed, 107 insertions(+)

diff --git a/src/ensemble_scheduler/ensemble_scheduler.cc b/src/ensemble_scheduler/ensemble_scheduler.cc
index ca6535ddf..ea7145706 100644
--- a/src/ensemble_scheduler/ensemble_scheduler.cc
+++ b/src/ensemble_scheduler/ensemble_scheduler.cc
@@ -28,6 +28,7 @@
 
 #include "ensemble_scheduler.h"
 
+#include 
 #include 
 
 #include "cuda_utils.h"
@@ -370,6 +371,13 @@ class EnsembleContext {
 
   size_t inflight_step_counter_;
 
+  // Backpressure support: Limits memory growth from decoupled models.
+  // Tracks inflight responses per step; blocks producers when downstream
+  // consumers are overloaded. Only active if max_inflight_responses_ > 0.
+  std::vector<size_t> step_inflight_response_counts_;
+  std::vector<std::unique_ptr<std::mutex>> step_mutexes_;
+  std::vector<std::unique_ptr<std::condition_variable>> step_cvs_;
+
   // pointer that either points to 'pruned_tensor_to_step_' or to
   // 'info_->tensor_to_step_' if all ensemble outputs are requested
   std::unordered_map<std::string, std::set<size_t>>* tensor_to_step_;
@@ -505,6 +513,20 @@ EnsembleContext::EnsembleContext(
     }
   }
 
+  // Initialize backpressure tracking if enabled.
+  size_t num_steps = info_->steps_.size();
+  step_inflight_response_counts_.resize(num_steps, 0);
+
+  if (info_->max_inflight_responses_ > 0) {
+    step_mutexes_.resize(num_steps);
+    step_cvs_.resize(num_steps);
+
+    for (size_t i = 0; i < num_steps; i++) {
+      step_mutexes_[i].reset(new std::mutex());
+      step_cvs_[i].reset(new std::condition_variable());
+    }
+  }
+
   if (ensemble_status_.IsOk()) {
     request_id_ = lrequest->Id();
     correlation_id_ = lrequest->CorrelationId();
@@ -669,6 +691,46 @@ EnsembleContext::ResponseComplete(
   auto pool = step_raw_ptr->ctx_->CallbackPool();
   auto fn = [response, flags, step_raw_ptr]() {
     auto step_ptr = std::unique_ptr<Step>(step_raw_ptr);
+    auto& context = step_ptr->ctx_;
+    size_t this_step_idx = step_ptr->step_idx_;
+    const auto& istep = context->info_->steps_[this_step_idx];
+
+    // Block this producer if downstream consumers are overloaded.
+    // Prevents memory exhaustion by limiting concurrent inflight responses.
+    if (context->info_->max_inflight_responses_ > 0 &&
+        !context->step_cvs_.empty()) {
+      for (const auto& output_pair : istep.output_to_tensor_) {
+        const auto& tensor_name = output_pair.second;
+        const auto& downstream_steps = (*context->tensor_to_step_)[tensor_name];
+
+        for (const auto& downstream_step_idx : downstream_steps) {
+          std::unique_lock<std::mutex> lk(
+              *context->step_mutexes_[downstream_step_idx]);
+
+          // Block if downstream inflight count >= limit. Timeout after 300s to
+          // prevent any deadlock. Unblocks when downstream completes a request.
+          auto timeout = std::chrono::seconds(300);
+          bool capacity_available =
+              context->step_cvs_[downstream_step_idx]->wait_for(
+                  lk, timeout, [&] {
+                    return context->step_inflight_response_counts_
+                               [downstream_step_idx] <
+                           context->info_->max_inflight_responses_;
+                  });
+
+          if (!capacity_available) {
+            LOG_ERROR
+                << "[Internal Error] Ensemble '"
+                << context->info_->ensemble_name_ << "' step " << this_step_idx
+                << " blocked waiting for downstream step "
+                << downstream_step_idx << " (inflight: "
+                << context->step_inflight_response_counts_[downstream_step_idx]
+                << " >= limit: " << context->info_->max_inflight_responses_
+                << ") for 300 seconds. Proceeding to avoid deadlock.";
+          }
+        }
+      }
+    }
 
     step_ptr->response_flags_ = flags;
     step_ptr->response_ = response;
@@ -907,6 +969,15 @@ EnsembleContext::UpdateEnsembleState(
   if (completed_step->response_flags_ &
       TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
     inflight_step_counter_--;
+
+    size_t completed_step_idx = completed_step->step_idx_;
+    step_inflight_response_counts_[completed_step_idx]--;
+
+    // Notify any producer threads blocked waiting for this step's capacity
+    if (info_->max_inflight_responses_ > 0 && !step_cvs_.empty()) {
+      std::lock_guard<std::mutex> lk(*step_mutexes_[completed_step_idx]);
+      step_cvs_[completed_step_idx]->notify_one();
+    }
   }
   RETURN_IF_ERROR(ConsumeResponse(completed_step));
   updated_tensors->swap(completed_step->updated_tensors_);
@@ -950,6 +1021,10 @@ EnsembleContext::GetNextSteps(
   for (const auto& idx : next_step_idx) {
     steps->emplace_back();
     RETURN_IF_ERROR(InitStep(idx.first, idx.second, &(steps->back())));
+
+    // Track as inflight. Checked by producers for backpressure; decremented on
+    // completion.
+    step_inflight_response_counts_[idx.first]++;
   }
   inflight_step_counter_ += steps->size();
 
@@ -1602,6 +1677,32 @@ EnsembleScheduler::EnsembleScheduler(
     }
   }
   callback_pool_ = is_->EnsembleCallbackPool();
+
+  // Parse backpressure configuration. Limits concurrent responses from
+  // decoupled steps to prevent memory growth.
+  if (config.parameters().contains("max_ensemble_inflight_responses")) {
+    const auto& param =
+        config.parameters().at("max_ensemble_inflight_responses");
+    const std::string& value = param.string_value();
+    try {
+      const int64_t size = std::stoll(value);
+      if (size > 0) {
+        info_->max_inflight_responses_ = static_cast<size_t>(size);
+        LOG_INFO << "Ensemble model '" << config.name()
+                 << "' configured with max_ensemble_inflight_responses: "
+                 << info_->max_inflight_responses_;
+      } else {
+        LOG_ERROR
+            << "Ignoring 'max_ensemble_inflight_responses' for ensemble model '"
+            << config.name() << "': value must be positive, got " << size;
+      }
+    }
+    catch (const std::invalid_argument& ia) {
+      LOG_ERROR
+          << "Failed to parse 'max_ensemble_inflight_responses' for ensemble '"
+          << config.name() << "': " << ia.what();
+    }
+  }
 }
 
 EnsembleScheduler::~EnsembleScheduler()
diff --git a/src/ensemble_scheduler/ensemble_scheduler.h b/src/ensemble_scheduler/ensemble_scheduler.h
index 9527ab802..33bc5c109 100644
--- a/src/ensemble_scheduler/ensemble_scheduler.h
+++ b/src/ensemble_scheduler/ensemble_scheduler.h
@@ -83,6 +83,12 @@ struct EnsembleInfo {
 
   // backward path, ensemble tensor to the step that provides its data
   std::unordered_map<std::string, size_t> tensor_to_prev_step_;
+
+  // Maximum concurrent inflight responses from steps to downstream
+  // consumers. Prevents memory growth by blocking producers when limit reached.
+  // Value of 0 means unlimited (default). Configured via parameter
+  // 'max_ensemble_inflight_responses' in ensemble config.pbtxt.
+  size_t max_inflight_responses_ = 0;
 };
 
 // Scheduler that implements ensemble scheduling.
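[Usage note] As of this first commit, the limit is read from the ensemble model's parameters map and parsed from its string_value in the EnsembleScheduler constructor shown above. A minimal config.pbtxt sketch for enabling it could look like the following; the model name and the value "8" are placeholders, and the required ensemble_scheduling step list is elided:

    name: "ensemble_model"
    platform: "ensemble"
    parameters: {
      key: "max_ensemble_inflight_responses"
      value: { string_value: "8" }
    }
    ensemble_scheduling {
      step [ ... ]
    }
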
From 4c8162483c06c71d60d93e5692271f887e3f2c60 Mon Sep 17 00:00:00 2001
From: Sai Kiran Polisetty
Date: Mon, 13 Oct 2025 23:32:47 +0530
Subject: [PATCH 02/20] Update src/ensemble_scheduler/ensemble_scheduler.cc

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 src/ensemble_scheduler/ensemble_scheduler.cc | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/ensemble_scheduler/ensemble_scheduler.cc b/src/ensemble_scheduler/ensemble_scheduler.cc
index ea7145706..136fa3056 100644
--- a/src/ensemble_scheduler/ensemble_scheduler.cc
+++ b/src/ensemble_scheduler/ensemble_scheduler.cc
@@ -522,8 +522,8 @@ EnsembleContext::EnsembleContext(
     step_cvs_.resize(num_steps);
 
     for (size_t i = 0; i < num_steps; i++) {
-      step_mutexes_[i].reset(new std::mutex());
-      step_cvs_[i].reset(new std::condition_variable());
+      step_mutexes_[i] = std::make_unique<std::mutex>();
+      step_cvs_[i] = std::make_unique<std::condition_variable>();
     }
   }
 

From 7b5af6641d3a57c13c2e7c37ff7062125ee6ab4a Mon Sep 17 00:00:00 2001
From: Sai Kiran Polisetty
Date: Mon, 13 Oct 2025 23:46:22 +0530
Subject: [PATCH 03/20] Update

---
 src/ensemble_scheduler/ensemble_scheduler.cc | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/src/ensemble_scheduler/ensemble_scheduler.cc b/src/ensemble_scheduler/ensemble_scheduler.cc
index 136fa3056..ab3d1c2a3 100644
--- a/src/ensemble_scheduler/ensemble_scheduler.cc
+++ b/src/ensemble_scheduler/ensemble_scheduler.cc
@@ -46,6 +46,9 @@ class EnsembleContext;
 
 using IterationCount = size_t;
 
+// Timeout for mutex blocking to prevent potential deadlocks
+constexpr int kMutexTimeoutSeconds = 300;
+
 // Check if the model is configured to preserve the order of responses.
 // This is critical for async execution of ResponseComplete callbacks.
 inline bool
@@ -707,9 +710,9 @@ EnsembleContext::ResponseComplete(
           std::unique_lock<std::mutex> lk(
               *context->step_mutexes_[downstream_step_idx]);
 
-          // Block if downstream inflight count >= limit. Timeout after 300s to
-          // prevent any deadlock. Unblocks when downstream completes a request.
-          auto timeout = std::chrono::seconds(300);
+          // Block if downstream inflight count >= limit. Timeout to prevent
+          // potential deadlock. Unblocks when downstream completes a request.
+          auto timeout = std::chrono::seconds(kMutexTimeoutSeconds);
           bool capacity_available =
               context->step_cvs_[downstream_step_idx]->wait_for(
                   lk, timeout, [&] {
@@ -726,7 +729,8 @@ EnsembleContext::ResponseComplete(
                 << downstream_step_idx << " (inflight: "
                 << context->step_inflight_response_counts_[downstream_step_idx]
                 << " >= limit: " << context->info_->max_inflight_responses_
-                << ") for 300 seconds. Proceeding to avoid deadlock.";
+                << ") for " << kMutexTimeoutSeconds
+                << " seconds. Proceeding to avoid deadlock.";
           }
         }
       }
@@ -1697,10 +1701,10 @@ EnsembleScheduler::EnsembleScheduler(
             << config.name() << "': value must be positive, got " << size;
       }
     }
-    catch (const std::invalid_argument& ia) {
+    catch (const std::exception& e) {
       LOG_ERROR
           << "Failed to parse 'max_ensemble_inflight_responses' for ensemble '"
-          << config.name() << "': " << ia.what();
+          << config.name() << "': " << e.what();
     }
   }
 }

From 9558ae81637f5a84f296c1e48d925a524e4f1fd5 Mon Sep 17 00:00:00 2001
From: Sai Kiran Polisetty
Date: Tue, 14 Oct 2025 15:28:53 +0530
Subject: [PATCH 04/20] Update

---
 src/ensemble_scheduler/ensemble_scheduler.cc | 28 ++++++++++++++------
 1 file changed, 20 insertions(+), 8 deletions(-)

diff --git a/src/ensemble_scheduler/ensemble_scheduler.cc b/src/ensemble_scheduler/ensemble_scheduler.cc
index ab3d1c2a3..6854c7d53 100644
--- a/src/ensemble_scheduler/ensemble_scheduler.cc
+++ b/src/ensemble_scheduler/ensemble_scheduler.cc
@@ -711,17 +711,25 @@ EnsembleContext::ResponseComplete(
               *context->step_mutexes_[downstream_step_idx]);
 
           // Block if downstream inflight count >= limit. Timeout to prevent
-          // potential deadlock. Unblocks when downstream completes a request.
+          // potential deadlock. Unblocks when downstream completes a request
+          // or request is cancelled.
           auto timeout = std::chrono::seconds(kMutexTimeoutSeconds);
+          auto cancelled = [&]() {
+            auto& req = context->request_tracker_->Request();
+            return (req == nullptr) || req->IsCancelled();
+          };
+
           bool capacity_available =
               context->step_cvs_[downstream_step_idx]->wait_for(
                   lk, timeout, [&] {
-                    return context->step_inflight_response_counts_
-                               [downstream_step_idx] <
-                           context->info_->max_inflight_responses_;
+                    return cancelled() ||
+                           (context->step_inflight_response_counts_
+                                [downstream_step_idx] <
+                            context->info_->max_inflight_responses_);
                   });
 
-          if (!capacity_available) {
+          // Log error only if timeout occurred (not cancellation).
+          if (!capacity_available && !cancelled()) {
             LOG_ERROR
                 << "[Internal Error] Ensemble '"
                 << context->info_->ensemble_name_ << "' step " << this_step_idx
@@ -975,11 +983,12 @@ EnsembleContext::UpdateEnsembleState(
     inflight_step_counter_--;
 
     size_t completed_step_idx = completed_step->step_idx_;
-    step_inflight_response_counts_[completed_step_idx]--;
 
-    // Notify any producer threads blocked waiting for this step's capacity
+    // Decrement step_inflight_response_counts_, then notify any producer
+    // threads blocked waiting for this step's capacity
     if (info_->max_inflight_responses_ > 0 && !step_cvs_.empty()) {
       std::lock_guard<std::mutex> lk(*step_mutexes_[completed_step_idx]);
+      step_inflight_response_counts_[completed_step_idx]--;
       step_cvs_[completed_step_idx]->notify_one();
     }
   }
   RETURN_IF_ERROR(ConsumeResponse(completed_step));
@@ -1028,7 +1037,10 @@ EnsembleContext::GetNextSteps(
 
     // Track as inflight. Checked by producers for backpressure; decremented on
    // completion.
- step_inflight_response_counts_[idx.first]++; + if (info_->max_inflight_responses_ > 0 && !step_mutexes_.empty()) { + std::lock_guard lk(*step_mutexes_[idx.first]); + step_inflight_response_counts_[idx.first]++; + } } inflight_step_counter_ += steps->size(); From 90cbb7a41b32a75c7dad6fd42330e4d111483164 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Tue, 14 Oct 2025 20:01:20 +0530 Subject: [PATCH 05/20] Update error message --- src/ensemble_scheduler/ensemble_scheduler.cc | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/ensemble_scheduler/ensemble_scheduler.cc b/src/ensemble_scheduler/ensemble_scheduler.cc index 6854c7d53..3e72a5859 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.cc +++ b/src/ensemble_scheduler/ensemble_scheduler.cc @@ -1709,14 +1709,18 @@ EnsembleScheduler::EnsembleScheduler( << info_->max_inflight_responses_; } else { LOG_ERROR - << "Ignoring 'max_ensemble_inflight_responses' for ensemble model '" - << config.name() << "': value must be positive, got " << size; + << "Ensemble model '" << config.name() + << "': max_ensemble_inflight_responses must be greater than 0. " + << "Received '" << size << "'. Falling back to default value (" + << info_->max_inflight_responses_ << ")."; } } catch (const std::exception& e) { - LOG_ERROR - << "Failed to parse 'max_ensemble_inflight_responses' for ensemble '" - << config.name() << "': " << e.what(); + LOG_ERROR << "Ensemble model '" << config.name() + << "': failed to parse max_ensemble_inflight_responses='" + << value << "': " << e.what() + << ". Falling back to default value (" + << info_->max_inflight_responses_ << ")."; } } } From 4befcd9c75dc77123f35f85a2e34f77762b68fda Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Wed, 15 Oct 2025 00:20:33 +0530 Subject: [PATCH 06/20] Update Parameter validation --- src/ensemble_scheduler/ensemble_scheduler.cc | 69 +++++++++++++------- src/ensemble_scheduler/ensemble_scheduler.h | 4 ++ 2 files changed, 50 insertions(+), 23 deletions(-) diff --git a/src/ensemble_scheduler/ensemble_scheduler.cc b/src/ensemble_scheduler/ensemble_scheduler.cc index 3e72a5859..7a5ba1ca4 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.cc +++ b/src/ensemble_scheduler/ensemble_scheduler.cc @@ -1536,12 +1536,52 @@ EnsembleContext::ScheduleSteps( } // namespace +Status +EnsembleScheduler::ValidateConfig(const inference::ModelConfig& config) +{ + // Validate max_ensemble_inflight_responses parameter if present + if (config.parameters().contains("max_ensemble_inflight_responses")) { + const auto& param = + config.parameters().at("max_ensemble_inflight_responses"); + const std::string& value = param.string_value(); + + try { + const int parsed = std::stoi(value); + if (parsed <= 0) { + return Status( + Status::Code::INVALID_ARG, + "Invalid 'max_ensemble_inflight_responses' for ensemble model '" + + config.name() + "': value must be positive, got " + + std::to_string(parsed)); + } + } + catch (const std::out_of_range& e) { + return Status( + Status::Code::INVALID_ARG, + "Invalid 'max_ensemble_inflight_responses' for ensemble model '" + + config.name() + "': value exceeds maximum allowed (" + + std::to_string(INT_MAX) + ")"); + } + catch (const std::invalid_argument& e) { + return Status( + Status::Code::INVALID_ARG, + "Invalid 'max_ensemble_inflight_responses' for ensemble model '" + + config.name() + "': cannot parse value '" + value + "'"); + } + } + + return Status::Success; +} + Status EnsembleScheduler::Create( InferenceStatsAggregator* const 
stats_aggregator, InferenceServer* const server, const ModelIdentifier& model_id, const inference::ModelConfig& config, std::unique_ptr* scheduler) { + // Validate configuration before constructing scheduler + RETURN_IF_ERROR(ValidateConfig(config)); + scheduler->reset( new EnsembleScheduler(stats_aggregator, server, model_id, config)); return Status::Success; @@ -1696,32 +1736,15 @@ EnsembleScheduler::EnsembleScheduler( // Parse backpressure configuration. Limits concurrent responses from // decoupled steps to prevent memory growth. + // Configuration is already validated in ValidateConfig() if (config.parameters().contains("max_ensemble_inflight_responses")) { const auto& param = config.parameters().at("max_ensemble_inflight_responses"); - const std::string& value = param.string_value(); - try { - const int64_t size = std::stoll(value); - if (size > 0) { - info_->max_inflight_responses_ = static_cast(size); - LOG_INFO << "Ensemble model '" << config.name() - << "' configured with max_ensemble_inflight_responses: " - << info_->max_inflight_responses_; - } else { - LOG_ERROR - << "Ensemble model '" << config.name() - << "': max_ensemble_inflight_responses must be greater than 0. " - << "Received '" << size << "'. Falling back to default value (" - << info_->max_inflight_responses_ << ")."; - } - } - catch (const std::exception& e) { - LOG_ERROR << "Ensemble model '" << config.name() - << "': failed to parse max_ensemble_inflight_responses='" - << value << "': " << e.what() - << ". Falling back to default value (" - << info_->max_inflight_responses_ << ")."; - } + info_->max_inflight_responses_ = + static_cast(std::stoi(param.string_value())); + LOG_INFO << "Ensemble model '" << config.name() + << "' configured with max_ensemble_inflight_responses: " + << info_->max_inflight_responses_; } } diff --git a/src/ensemble_scheduler/ensemble_scheduler.h b/src/ensemble_scheduler/ensemble_scheduler.h index 33bc5c109..ed86ffdaa 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.h +++ b/src/ensemble_scheduler/ensemble_scheduler.h @@ -122,6 +122,10 @@ class EnsembleScheduler : public Scheduler { InferenceServer* const server, const ModelIdentifier& model_id, const inference::ModelConfig& config); + // Validates ensemble configuration parameters before construction. + // Returns error Status if configuration is invalid. + static Status ValidateConfig(const inference::ModelConfig& config); + void CacheLookUp( std::unique_ptr& request, std::unique_ptr& cached_response); From 2bff78bd20fb1ab29cd88093193920b95eca0ef2 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Sat, 18 Oct 2025 00:22:43 +0530 Subject: [PATCH 07/20] Move max_inflight_responses to config.proto --- src/ensemble_scheduler/ensemble_scheduler.cc | 73 +++++--------------- src/ensemble_scheduler/ensemble_scheduler.h | 8 +-- 2 files changed, 20 insertions(+), 61 deletions(-) diff --git a/src/ensemble_scheduler/ensemble_scheduler.cc b/src/ensemble_scheduler/ensemble_scheduler.cc index 7a5ba1ca4..723d119a9 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.cc +++ b/src/ensemble_scheduler/ensemble_scheduler.cc @@ -379,7 +379,7 @@ class EnsembleContext { // consumers are overloaded. Only active if max_inflight_responses_ > 0. 
std::vector step_inflight_response_counts_; std::vector> step_mutexes_; - std::vector> step_cvs_; + std::vector> step_cv_vec_; // pointer that either points to 'pruned_tensor_to_step_' or to // 'info_->tensor_to_step_' if all ensemble outputs are requested @@ -522,11 +522,11 @@ EnsembleContext::EnsembleContext( if (info_->max_inflight_responses_ > 0) { step_mutexes_.resize(num_steps); - step_cvs_.resize(num_steps); + step_cv_vec_.resize(num_steps); for (size_t i = 0; i < num_steps; i++) { step_mutexes_[i] = std::make_unique(); - step_cvs_[i] = std::make_unique(); + step_cv_vec_[i] = std::make_unique(); } } @@ -701,7 +701,7 @@ EnsembleContext::ResponseComplete( // Block this producer if downstream consumers are overloaded. // Prevents memory exhaustion by limiting concurrent inflight responses. if (context->info_->max_inflight_responses_ > 0 && - !context->step_cvs_.empty()) { + !context->step_cv_vec_.empty()) { for (const auto& output_pair : istep.output_to_tensor_) { const auto& tensor_name = output_pair.second; const auto& downstream_steps = (*context->tensor_to_step_)[tensor_name]; @@ -720,7 +720,7 @@ EnsembleContext::ResponseComplete( }; bool capacity_available = - context->step_cvs_[downstream_step_idx]->wait_for( + context->step_cv_vec_[downstream_step_idx]->wait_for( lk, timeout, [&] { return cancelled() || (context->step_inflight_response_counts_ @@ -986,10 +986,10 @@ EnsembleContext::UpdateEnsembleState( // Decrement step_inflight_response_counts_, then notify any producer // threads blocked waiting for this step's capacity - if (info_->max_inflight_responses_ > 0 && !step_cvs_.empty()) { + if (info_->max_inflight_responses_ > 0 && !step_cv_vec_.empty()) { std::lock_guard lk(*step_mutexes_[completed_step_idx]); step_inflight_response_counts_[completed_step_idx]--; - step_cvs_[completed_step_idx]->notify_one(); + step_cv_vec_[completed_step_idx]->notify_one(); } } RETURN_IF_ERROR(ConsumeResponse(completed_step)); @@ -1536,52 +1536,12 @@ EnsembleContext::ScheduleSteps( } // namespace -Status -EnsembleScheduler::ValidateConfig(const inference::ModelConfig& config) -{ - // Validate max_ensemble_inflight_responses parameter if present - if (config.parameters().contains("max_ensemble_inflight_responses")) { - const auto& param = - config.parameters().at("max_ensemble_inflight_responses"); - const std::string& value = param.string_value(); - - try { - const int parsed = std::stoi(value); - if (parsed <= 0) { - return Status( - Status::Code::INVALID_ARG, - "Invalid 'max_ensemble_inflight_responses' for ensemble model '" + - config.name() + "': value must be positive, got " + - std::to_string(parsed)); - } - } - catch (const std::out_of_range& e) { - return Status( - Status::Code::INVALID_ARG, - "Invalid 'max_ensemble_inflight_responses' for ensemble model '" + - config.name() + "': value exceeds maximum allowed (" + - std::to_string(INT_MAX) + ")"); - } - catch (const std::invalid_argument& e) { - return Status( - Status::Code::INVALID_ARG, - "Invalid 'max_ensemble_inflight_responses' for ensemble model '" + - config.name() + "': cannot parse value '" + value + "'"); - } - } - - return Status::Success; -} - Status EnsembleScheduler::Create( InferenceStatsAggregator* const stats_aggregator, InferenceServer* const server, const ModelIdentifier& model_id, const inference::ModelConfig& config, std::unique_ptr* scheduler) { - // Validate configuration before constructing scheduler - RETURN_IF_ERROR(ValidateConfig(config)); - scheduler->reset( new EnsembleScheduler(stats_aggregator, server, 
model_id, config)); return Status::Success; @@ -1734,17 +1694,16 @@ EnsembleScheduler::EnsembleScheduler( } callback_pool_ = is_->EnsembleCallbackPool(); - // Parse backpressure configuration. Limits concurrent responses from - // decoupled steps to prevent memory growth. - // Configuration is already validated in ValidateConfig() - if (config.parameters().contains("max_ensemble_inflight_responses")) { - const auto& param = - config.parameters().at("max_ensemble_inflight_responses"); + // Backpressure configuration from protobuf field. Limits concurrent responses + // from decoupled steps to prevent memory growth. Value of 0 means unlimited. + if (config.has_ensemble_scheduling()) { info_->max_inflight_responses_ = - static_cast(std::stoi(param.string_value())); - LOG_INFO << "Ensemble model '" << config.name() - << "' configured with max_ensemble_inflight_responses: " - << info_->max_inflight_responses_; + config.ensemble_scheduling().max_inflight_responses(); + if (info_->max_inflight_responses_ > 0) { + LOG_INFO << "Ensemble model '" << config.name() + << "' configured with max_inflight_responses: " + << info_->max_inflight_responses_; + } } } diff --git a/src/ensemble_scheduler/ensemble_scheduler.h b/src/ensemble_scheduler/ensemble_scheduler.h index ed86ffdaa..2bf3d90ea 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.h +++ b/src/ensemble_scheduler/ensemble_scheduler.h @@ -84,10 +84,10 @@ struct EnsembleInfo { // backward path, ensemble tensor to the step that provides its data std::unordered_map tensor_to_prev_step_; - // Maximum concurrent inflight responses from steps to downstream - // consumers. Prevents memory growth by blocking producers when limit reached. - // Value of 0 means unlimited (default). Configured via parameter - // 'max_ensemble_inflight_responses' in ensemble config.pbtxt. + // Maximum concurrent inflight responses from steps to downstream consumers. + // Prevents memory growth by blocking producers when limit is reached. + // Default value is 0, which indicates unlimited (no backpressure applied). + // Configured via 'max_inflight_responses' parameter in config.pbtxt. size_t max_inflight_responses_ = 0; }; From ea4187b1dcbab82f3dc8b1948e07ddbf031ba8be Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Sat, 18 Oct 2025 00:25:24 +0530 Subject: [PATCH 08/20] Update --- src/ensemble_scheduler/ensemble_scheduler.h | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/ensemble_scheduler/ensemble_scheduler.h b/src/ensemble_scheduler/ensemble_scheduler.h index 2bf3d90ea..407470102 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.h +++ b/src/ensemble_scheduler/ensemble_scheduler.h @@ -122,10 +122,6 @@ class EnsembleScheduler : public Scheduler { InferenceServer* const server, const ModelIdentifier& model_id, const inference::ModelConfig& config); - // Validates ensemble configuration parameters before construction. - // Returns error Status if configuration is invalid. 
- static Status ValidateConfig(const inference::ModelConfig& config); - void CacheLookUp( std::unique_ptr& request, std::unique_ptr& cached_response); From 1e155bb1f89495ba321cddb5cdac469b65507af2 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Sat, 18 Oct 2025 02:32:50 +0530 Subject: [PATCH 09/20] Move blocking logic to EnsembleContext::ScheduleSteps() --- src/ensemble_scheduler/ensemble_scheduler.cc | 82 ++++++++------------ 1 file changed, 33 insertions(+), 49 deletions(-) diff --git a/src/ensemble_scheduler/ensemble_scheduler.cc b/src/ensemble_scheduler/ensemble_scheduler.cc index 723d119a9..ec8c636e9 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.cc +++ b/src/ensemble_scheduler/ensemble_scheduler.cc @@ -694,55 +694,6 @@ EnsembleContext::ResponseComplete( auto pool = step_raw_ptr->ctx_->CallbackPool(); auto fn = [response, flags, step_raw_ptr]() { auto step_ptr = std::unique_ptr(step_raw_ptr); - auto& context = step_ptr->ctx_; - size_t this_step_idx = step_ptr->step_idx_; - const auto& istep = context->info_->steps_[this_step_idx]; - - // Block this producer if downstream consumers are overloaded. - // Prevents memory exhaustion by limiting concurrent inflight responses. - if (context->info_->max_inflight_responses_ > 0 && - !context->step_cv_vec_.empty()) { - for (const auto& output_pair : istep.output_to_tensor_) { - const auto& tensor_name = output_pair.second; - const auto& downstream_steps = (*context->tensor_to_step_)[tensor_name]; - - for (const auto& downstream_step_idx : downstream_steps) { - std::unique_lock lk( - *context->step_mutexes_[downstream_step_idx]); - - // Block if downstream inflight count >= limit. Timeout to prevent - // potential deadlock. Unblocks when downstream completes a request - // or request is cancelled. - auto timeout = std::chrono::seconds(kMutexTimeoutSeconds); - auto cancelled = [&]() { - auto& req = context->request_tracker_->Request(); - return (req == nullptr) || req->IsCancelled(); - }; - - bool capacity_available = - context->step_cv_vec_[downstream_step_idx]->wait_for( - lk, timeout, [&] { - return cancelled() || - (context->step_inflight_response_counts_ - [downstream_step_idx] < - context->info_->max_inflight_responses_); - }); - - // Log error only if timeout occurred (not cancellation). - if (!capacity_available && !cancelled()) { - LOG_ERROR - << "[Internal Error] Ensemble '" - << context->info_->ensemble_name_ << "' step " << this_step_idx - << " blocked waiting for downstream step " - << downstream_step_idx << " (inflight: " - << context->step_inflight_response_counts_[downstream_step_idx] - << " >= limit: " << context->info_->max_inflight_responses_ - << ") for " << kMutexTimeoutSeconds - << " seconds. Proceeding to avoid deadlock."; - } - } - } - } step_ptr->response_flags_ = flags; step_ptr->response_ = response; @@ -1483,6 +1434,39 @@ EnsembleContext::ScheduleSteps( { for (auto& step : steps) { step->ctx_ = context; + size_t this_step_idx = step->step_idx_; + + // Block if this step is overloaded. 
+ if (context->info_->max_inflight_responses_ > 0 && + !context->step_cv_vec_.empty()) { + std::unique_lock lk(*context->step_mutexes_[this_step_idx]); + + auto timeout = std::chrono::seconds(kMutexTimeoutSeconds); + auto cancelled = [&]() { + auto& req = context->request_tracker_->Request(); + return (req == nullptr) || req->IsCancelled(); + }; + + bool capacity_available = context->step_cv_vec_[this_step_idx]->wait_for( + lk, timeout, [&] { + return cancelled() || + (context->step_inflight_response_counts_[this_step_idx] < + context->info_->max_inflight_responses_); + }); + + // Log error only if timeout occurred (not cancellation). + if (!capacity_available && !cancelled()) { + LOG_ERROR << "[Internal Error] Ensemble '" + << context->info_->ensemble_name_ + << "' unable to schedule step " << this_step_idx + << " (inflight: " + << context->step_inflight_response_counts_[this_step_idx] + << " >= limit: " << context->info_->max_inflight_responses_ + << ") for " << kMutexTimeoutSeconds + << " seconds. Proceeding to avoid deadlock."; + } + } + bool should_schedule = false; // Must release lock before InferAsync to avoid deadlock, as the same thread // will be calling request/response callbacks on cache hits, which will From 3c0e83debde1ac6f84679f233cccbc23298d10d3 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Sat, 18 Oct 2025 02:35:34 +0530 Subject: [PATCH 10/20] Fix pre-commit --- src/ensemble_scheduler/ensemble_scheduler.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ensemble_scheduler/ensemble_scheduler.cc b/src/ensemble_scheduler/ensemble_scheduler.cc index ec8c636e9..588fbcaa2 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.cc +++ b/src/ensemble_scheduler/ensemble_scheduler.cc @@ -1435,7 +1435,7 @@ EnsembleContext::ScheduleSteps( for (auto& step : steps) { step->ctx_ = context; size_t this_step_idx = step->step_idx_; - + // Block if this step is overloaded. if (context->info_->max_inflight_responses_ > 0 && !context->step_cv_vec_.empty()) { @@ -1447,8 +1447,8 @@ EnsembleContext::ScheduleSteps( return (req == nullptr) || req->IsCancelled(); }; - bool capacity_available = context->step_cv_vec_[this_step_idx]->wait_for( - lk, timeout, [&] { + bool capacity_available = + context->step_cv_vec_[this_step_idx]->wait_for(lk, timeout, [&] { return cancelled() || (context->step_inflight_response_counts_[this_step_idx] < context->info_->max_inflight_responses_); From 2cd6c7bcf6bb2c650b8acdf347de704115d6b297 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Fri, 24 Oct 2025 20:31:26 +0530 Subject: [PATCH 11/20] Update --- src/ensemble_scheduler/ensemble_scheduler.cc | 56 ++++++++++---------- src/ensemble_scheduler/ensemble_scheduler.h | 6 +-- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/src/ensemble_scheduler/ensemble_scheduler.cc b/src/ensemble_scheduler/ensemble_scheduler.cc index 588fbcaa2..60a5ca2c6 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.cc +++ b/src/ensemble_scheduler/ensemble_scheduler.cc @@ -375,9 +375,9 @@ class EnsembleContext { size_t inflight_step_counter_; // Backpressure support: Limits memory growth from decoupled models. - // Tracks inflight responses per step; blocks producers when downstream - // consumers are overloaded. Only active if max_inflight_responses_ > 0. - std::vector step_inflight_response_counts_; + // Tracks inflight requests per step; blocks producers when downstream + // consumers are overloaded. Only active if max_inflight_requests_ > 0. 
+ std::vector step_inflight_request_counts_; std::vector> step_mutexes_; std::vector> step_cv_vec_; @@ -518,9 +518,9 @@ EnsembleContext::EnsembleContext( // Initialize backpressure tracking if enabled. size_t num_steps = info_->steps_.size(); - step_inflight_response_counts_.resize(num_steps, 0); + step_inflight_request_counts_.resize(num_steps, 0); - if (info_->max_inflight_responses_ > 0) { + if (info_->max_inflight_requests_ > 0) { step_mutexes_.resize(num_steps); step_cv_vec_.resize(num_steps); @@ -935,11 +935,11 @@ EnsembleContext::UpdateEnsembleState( size_t completed_step_idx = completed_step->step_idx_; - // Decrement step_inflight_response_counts_, then notify any producer + // Decrement step_inflight_request_counts_, then notify any producer // threads blocked waiting for this step's capacity - if (info_->max_inflight_responses_ > 0 && !step_cv_vec_.empty()) { + if (info_->max_inflight_requests_ > 0 && !step_cv_vec_.empty()) { std::lock_guard lk(*step_mutexes_[completed_step_idx]); - step_inflight_response_counts_[completed_step_idx]--; + step_inflight_request_counts_[completed_step_idx]--; step_cv_vec_[completed_step_idx]->notify_one(); } } @@ -985,13 +985,6 @@ EnsembleContext::GetNextSteps( for (const auto& idx : next_step_idx) { steps->emplace_back(); RETURN_IF_ERROR(InitStep(idx.first, idx.second, &(steps->back()))); - - // Track as inflight. Checked by producers for backpressure; decremented on - // completion. - if (info_->max_inflight_responses_ > 0 && !step_mutexes_.empty()) { - std::lock_guard lk(*step_mutexes_[idx.first]); - step_inflight_response_counts_[idx.first]++; - } } inflight_step_counter_ += steps->size(); @@ -1436,8 +1429,8 @@ EnsembleContext::ScheduleSteps( step->ctx_ = context; size_t this_step_idx = step->step_idx_; - // Block if this step is overloaded. - if (context->info_->max_inflight_responses_ > 0 && + // Apply backpressure to downstream steps only, not the entry step + if ((this_step_idx != 0) && context->info_->max_inflight_requests_ > 0 && !context->step_cv_vec_.empty()) { std::unique_lock lk(*context->step_mutexes_[this_step_idx]); @@ -1450,8 +1443,8 @@ EnsembleContext::ScheduleSteps( bool capacity_available = context->step_cv_vec_[this_step_idx]->wait_for(lk, timeout, [&] { return cancelled() || - (context->step_inflight_response_counts_[this_step_idx] < - context->info_->max_inflight_responses_); + (context->step_inflight_request_counts_[this_step_idx] < + context->info_->max_inflight_requests_); }); // Log error only if timeout occurred (not cancellation). @@ -1460,8 +1453,8 @@ EnsembleContext::ScheduleSteps( << context->info_->ensemble_name_ << "' unable to schedule step " << this_step_idx << " (inflight: " - << context->step_inflight_response_counts_[this_step_idx] - << " >= limit: " << context->info_->max_inflight_responses_ + << context->step_inflight_request_counts_[this_step_idx] + << " >= limit: " << context->info_->max_inflight_requests_ << ") for " << kMutexTimeoutSeconds << " seconds. Proceeding to avoid deadlock."; } @@ -1496,6 +1489,15 @@ EnsembleContext::ScheduleSteps( std::unique_ptr request = std::move(step->request_); auto step_status = context->is_->InferAsync(request); if (step_status.IsOk()) { + // Increment inflight counter AFTER successful scheduling. Always + // increment for ALL steps (including step 0) to ensure symmetry with + // decrement and prevent underflow when steps complete. 
+ if (context->info_->max_inflight_requests_ > 0 && + !context->step_mutexes_.empty()) { + std::lock_guard lk( + *context->step_mutexes_[this_step_idx]); + context->step_inflight_request_counts_[this_step_idx]++; + } step.release(); continue; } else { @@ -1678,15 +1680,15 @@ EnsembleScheduler::EnsembleScheduler( } callback_pool_ = is_->EnsembleCallbackPool(); - // Backpressure configuration from protobuf field. Limits concurrent responses + // Backpressure configuration from protobuf field. Limits concurrent requests // from decoupled steps to prevent memory growth. Value of 0 means unlimited. if (config.has_ensemble_scheduling()) { - info_->max_inflight_responses_ = - config.ensemble_scheduling().max_inflight_responses(); - if (info_->max_inflight_responses_ > 0) { + info_->max_inflight_requests_ = + config.ensemble_scheduling().max_inflight_requests(); + if (info_->max_inflight_requests_ > 0) { LOG_INFO << "Ensemble model '" << config.name() - << "' configured with max_inflight_responses: " - << info_->max_inflight_responses_; + << "' configured with max_inflight_requests: " + << info_->max_inflight_requests_; } } } diff --git a/src/ensemble_scheduler/ensemble_scheduler.h b/src/ensemble_scheduler/ensemble_scheduler.h index 407470102..e86f28afe 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.h +++ b/src/ensemble_scheduler/ensemble_scheduler.h @@ -84,11 +84,11 @@ struct EnsembleInfo { // backward path, ensemble tensor to the step that provides its data std::unordered_map tensor_to_prev_step_; - // Maximum concurrent inflight responses from steps to downstream consumers. + // Maximum concurrent inflight requests from steps to downstream consumers. // Prevents memory growth by blocking producers when limit is reached. // Default value is 0, which indicates unlimited (no backpressure applied). - // Configured via 'max_inflight_responses' parameter in config.pbtxt. - size_t max_inflight_responses_ = 0; + // Configured via 'max_inflight_requests' field in ensemble_scheduling. + size_t max_inflight_requests_ = 0; }; // Scheduler that implements ensemble scheduling. From 58eaa557fb78ae0f17da908df7f4e6aceb28895c Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Fri, 24 Oct 2025 21:43:49 +0530 Subject: [PATCH 12/20] Update comment --- src/ensemble_scheduler/ensemble_scheduler.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ensemble_scheduler/ensemble_scheduler.h b/src/ensemble_scheduler/ensemble_scheduler.h index e86f28afe..ff7828e42 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.h +++ b/src/ensemble_scheduler/ensemble_scheduler.h @@ -84,7 +84,7 @@ struct EnsembleInfo { // backward path, ensemble tensor to the step that provides its data std::unordered_map tensor_to_prev_step_; - // Maximum concurrent inflight requests from steps to downstream consumers. + // Maximum concurrent inflight requests to ensemble steps (downstream consumers). // Prevents memory growth by blocking producers when limit is reached. // Default value is 0, which indicates unlimited (no backpressure applied). // Configured via 'max_inflight_requests' field in ensemble_scheduling. 
From 35d6700d0ad8bacc6acd069acbaecbd146d3aef1 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Fri, 24 Oct 2025 21:45:57 +0530 Subject: [PATCH 13/20] Fix pre-commit --- src/ensemble_scheduler/ensemble_scheduler.h | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/ensemble_scheduler/ensemble_scheduler.h b/src/ensemble_scheduler/ensemble_scheduler.h index ff7828e42..9362e5aec 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.h +++ b/src/ensemble_scheduler/ensemble_scheduler.h @@ -84,10 +84,11 @@ struct EnsembleInfo { // backward path, ensemble tensor to the step that provides its data std::unordered_map tensor_to_prev_step_; - // Maximum concurrent inflight requests to ensemble steps (downstream consumers). - // Prevents memory growth by blocking producers when limit is reached. - // Default value is 0, which indicates unlimited (no backpressure applied). - // Configured via 'max_inflight_requests' field in ensemble_scheduling. + // Maximum concurrent inflight requests to ensemble steps (downstream + // consumers). Prevents memory growth by blocking producers when limit is + // reached. Default value is 0, which indicates unlimited (no backpressure + // applied). Configured via 'max_inflight_requests' field in + // ensemble_scheduling. size_t max_inflight_requests_ = 0; }; From f1ff2f7b72918f61592461e3d534d03f8b29ed69 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Tue, 28 Oct 2025 22:45:51 +0530 Subject: [PATCH 14/20] Enhancements --- src/constants.h | 2 + src/ensemble_scheduler/ensemble_scheduler.cc | 160 +++++++++++-------- 2 files changed, 97 insertions(+), 65 deletions(-) diff --git a/src/constants.h b/src/constants.h index 208def668..ed7cf980e 100644 --- a/src/constants.h +++ b/src/constants.h @@ -95,6 +95,8 @@ constexpr int MAX_GRPC_MESSAGE_SIZE = INT32_MAX; constexpr uint64_t SEQUENCE_IDLE_DEFAULT_MICROSECONDS = 1000 * 1000; constexpr size_t CUDA_IPC_STRUCT_SIZE = 64; +constexpr int kMutexTimeoutSeconds = 300; + #ifdef TRITON_ENABLE_METRICS // MetricModelReporter expects a device ID for GPUs, but we reuse this device // ID for other metrics as well such as for CPU and Response Cache metrics diff --git a/src/ensemble_scheduler/ensemble_scheduler.cc b/src/ensemble_scheduler/ensemble_scheduler.cc index 60a5ca2c6..c865a7323 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.cc +++ b/src/ensemble_scheduler/ensemble_scheduler.cc @@ -31,6 +31,7 @@ #include #include +#include "constants.h" #include "cuda_utils.h" #include "metrics.h" #include "model.h" @@ -46,9 +47,6 @@ class EnsembleContext; using IterationCount = size_t; -// Timeout for mutex blocking to prevent potential deadlocks -constexpr int kMutexTimeoutSeconds = 300; - // Check if the model is configured to preserve the order of responses. // This is critical for async execution of ResponseComplete callbacks. inline bool @@ -154,6 +152,81 @@ class RequestTracker { triton::common::ThreadPool* const callback_pool_; }; +// Limits concurrent inflight requests for a single ensemble step. +// Tracks inflight requests count and blocks producers when limit is reached. +class StepInflightRequestLimiter { + public: + StepInflightRequestLimiter() : inflight_count_(0), max_inflight_(0) {} + + void SetLimit(size_t limit) { max_inflight_ = limit; } + + // Wait until capacity is available or request is cancelled. + // No-op if limit not configured (max_inflight_ == 0). 
+ void WaitForCapacity( + RequestTracker* request_tracker, const size_t step_idx, + const std::string& ensemble_name) + { + // No limit configured, no blocking + if (max_inflight_ == 0) { + return; + } + + std::unique_lock lk(mutex_); + auto timeout = std::chrono::seconds(kMutexTimeoutSeconds); + + auto is_request_cancelled = [&]() { + auto& req = request_tracker->Request(); + return (req == nullptr) || req->IsCancelled(); + }; + + bool capacity_available = cv_.wait_for(lk, timeout, [&] { + return is_request_cancelled() || (inflight_count_ < max_inflight_); + }); + + // Log error if timeout occurred (not cancellation), but proceed anyway + // to avoid deadlock. Caller always continues after this call. + if (!capacity_available && !is_request_cancelled()) { + LOG_ERROR << "[Internal Error] Ensemble '" << ensemble_name + << "' unable to schedule step " << step_idx + << " (inflight: " << inflight_count_ + << " >= limit: " << max_inflight_ << ") for " + << kMutexTimeoutSeconds + << " seconds. Proceeding to avoid deadlock."; + } + } + + // Increment inflight count after successfully scheduling a request. + // No-op if limit not configured (max_inflight_ == 0). + void IncrementInflightCount() + { + // No limit configured, no tracking needed + if (max_inflight_ == 0) { + return; + } + std::lock_guard lk(mutex_); + inflight_count_++; + } + + // Decrement inflight count when a request completes, and notify waiting + // producers. No-op if limit not configured (max_inflight_ == 0). + void DecrementInflightCount() + { + // No limit configured, no tracking needed + if (max_inflight_ == 0) { + return; + } + std::lock_guard lk(mutex_); + inflight_count_--; + cv_.notify_one(); + } + + private: + size_t inflight_count_; + size_t max_inflight_; + std::mutex mutex_; + std::condition_variable cv_; +}; + // Step is used as 'userp' and keeps ensemble context alive // until no more internal requests are inflight. // Step contains metadata, and status for the @@ -374,12 +447,9 @@ class EnsembleContext { size_t inflight_step_counter_; - // Backpressure support: Limits memory growth from decoupled models. - // Tracks inflight requests per step; blocks producers when downstream - // consumers are overloaded. Only active if max_inflight_requests_ > 0. - std::vector step_inflight_request_counts_; - std::vector> step_mutexes_; - std::vector> step_cv_vec_; + // Inflight request limiters for each ensemble step. + std::vector> + step_inflight_request_limiters_; // pointer that either points to 'pruned_tensor_to_step_' or to // 'info_->tensor_to_step_' if all ensemble outputs are requested @@ -516,18 +586,13 @@ EnsembleContext::EnsembleContext( } } - // Initialize backpressure tracking if enabled. + // Initialize backpressure managers for each step. 
size_t num_steps = info_->steps_.size(); - step_inflight_request_counts_.resize(num_steps, 0); - - if (info_->max_inflight_requests_ > 0) { - step_mutexes_.resize(num_steps); - step_cv_vec_.resize(num_steps); - - for (size_t i = 0; i < num_steps; i++) { - step_mutexes_[i] = std::make_unique(); - step_cv_vec_[i] = std::make_unique(); - } + step_inflight_request_limiters_.resize(num_steps); + for (size_t i = 0; i < num_steps; i++) { + step_inflight_request_limiters_[i] = + std::make_unique(); + step_inflight_request_limiters_[i]->SetLimit(info_->max_inflight_requests_); } if (ensemble_status_.IsOk()) { @@ -932,16 +997,8 @@ EnsembleContext::UpdateEnsembleState( if (completed_step->response_flags_ & TRITONSERVER_RESPONSE_COMPLETE_FINAL) { inflight_step_counter_--; - - size_t completed_step_idx = completed_step->step_idx_; - - // Decrement step_inflight_request_counts_, then notify any producer - // threads blocked waiting for this step's capacity - if (info_->max_inflight_requests_ > 0 && !step_cv_vec_.empty()) { - std::lock_guard lk(*step_mutexes_[completed_step_idx]); - step_inflight_request_counts_[completed_step_idx]--; - step_cv_vec_[completed_step_idx]->notify_one(); - } + step_inflight_request_limiters_[completed_step->step_idx_] + ->DecrementInflightCount(); } RETURN_IF_ERROR(ConsumeResponse(completed_step)); updated_tensors->swap(completed_step->updated_tensors_); @@ -1429,35 +1486,12 @@ EnsembleContext::ScheduleSteps( step->ctx_ = context; size_t this_step_idx = step->step_idx_; - // Apply backpressure to downstream steps only, not the entry step - if ((this_step_idx != 0) && context->info_->max_inflight_requests_ > 0 && - !context->step_cv_vec_.empty()) { - std::unique_lock lk(*context->step_mutexes_[this_step_idx]); - - auto timeout = std::chrono::seconds(kMutexTimeoutSeconds); - auto cancelled = [&]() { - auto& req = context->request_tracker_->Request(); - return (req == nullptr) || req->IsCancelled(); - }; - - bool capacity_available = - context->step_cv_vec_[this_step_idx]->wait_for(lk, timeout, [&] { - return cancelled() || - (context->step_inflight_request_counts_[this_step_idx] < - context->info_->max_inflight_requests_); - }); - - // Log error only if timeout occurred (not cancellation). - if (!capacity_available && !cancelled()) { - LOG_ERROR << "[Internal Error] Ensemble '" - << context->info_->ensemble_name_ - << "' unable to schedule step " << this_step_idx - << " (inflight: " - << context->step_inflight_request_counts_[this_step_idx] - << " >= limit: " << context->info_->max_inflight_requests_ - << ") for " << kMutexTimeoutSeconds - << " seconds. Proceeding to avoid deadlock."; - } + // Apply backpressure to downstream steps only, not the entry step. + // Step 0 is scheduled on handler thread and should never block. + if (this_step_idx != 0) { + context->step_inflight_request_limiters_[this_step_idx]->WaitForCapacity( + context->request_tracker_, this_step_idx, + context->info_->ensemble_name_); } bool should_schedule = false; @@ -1492,12 +1526,8 @@ EnsembleContext::ScheduleSteps( // Increment inflight counter AFTER successful scheduling. Always // increment for ALL steps (including step 0) to ensure symmetry with // decrement and prevent underflow when steps complete. 
- if (context->info_->max_inflight_requests_ > 0 && - !context->step_mutexes_.empty()) { - std::lock_guard lk( - *context->step_mutexes_[this_step_idx]); - context->step_inflight_request_counts_[this_step_idx]++; - } + context->step_inflight_request_limiters_[this_step_idx] + ->IncrementInflightCount(); step.release(); continue; } else { From 2685d49d0df827bfbdabc84ccc48774de37f9d3a Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Tue, 28 Oct 2025 22:54:56 +0530 Subject: [PATCH 15/20] Update --- src/ensemble_scheduler/ensemble_scheduler.h | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/ensemble_scheduler/ensemble_scheduler.h b/src/ensemble_scheduler/ensemble_scheduler.h index 9362e5aec..8b3985442 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.h +++ b/src/ensemble_scheduler/ensemble_scheduler.h @@ -84,10 +84,11 @@ struct EnsembleInfo { // backward path, ensemble tensor to the step that provides its data std::unordered_map tensor_to_prev_step_; - // Maximum concurrent inflight requests to ensemble steps (downstream - // consumers). Prevents memory growth by blocking producers when limit is - // reached. Default value is 0, which indicates unlimited (no backpressure - // applied). Configured via 'max_inflight_requests' field in + // Maximum concurrent inflight requests allowed at each ensemble step + // (downstream consumer). This limit is applied per step, not globally for the + // entire ensemble model. Prevents memory growth by blocking producers when + // the limit is reached. Default value is 0, which indicates unlimited (no + // backpressure applied). Configured via 'max_inflight_requests' field in // ensemble_scheduling. size_t max_inflight_requests_ = 0; }; From 381b847346a70155183719a16407423e2ad21ae5 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Thu, 30 Oct 2025 14:31:10 +0530 Subject: [PATCH 16/20] Update --- src/ensemble_scheduler/ensemble_scheduler.cc | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/ensemble_scheduler/ensemble_scheduler.cc b/src/ensemble_scheduler/ensemble_scheduler.cc index c865a7323..74e97e8af 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.cc +++ b/src/ensemble_scheduler/ensemble_scheduler.cc @@ -156,9 +156,10 @@ class RequestTracker { // Tracks inflight requests count and blocks producers when limit is reached. class StepInflightRequestLimiter { public: - StepInflightRequestLimiter() : inflight_count_(0), max_inflight_(0) {} - - void SetLimit(size_t limit) { max_inflight_ = limit; } + explicit StepInflightRequestLimiter(const size_t max_inflight) + : inflight_count_(0), max_inflight_(max_inflight) + { + } // Wait until capacity is available or request is cancelled. // No-op if limit not configured (max_inflight_ == 0). @@ -222,7 +223,7 @@ class StepInflightRequestLimiter { private: size_t inflight_count_; - size_t max_inflight_; + const size_t max_inflight_; std::mutex mutex_; std::condition_variable cv_; }; @@ -588,11 +589,10 @@ EnsembleContext::EnsembleContext( // Initialize backpressure managers for each step. 
size_t num_steps = info_->steps_.size(); - step_inflight_request_limiters_.resize(num_steps); for (size_t i = 0; i < num_steps; i++) { - step_inflight_request_limiters_[i] = - std::make_unique(); - step_inflight_request_limiters_[i]->SetLimit(info_->max_inflight_requests_); + step_inflight_request_limiters_.emplace_back( + std::make_unique( + info_->max_inflight_requests_)); } if (ensemble_status_.IsOk()) { From 53ec73392b274862cb7aa3dbfa81541960ffa10a Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Fri, 31 Oct 2025 16:39:26 +0530 Subject: [PATCH 17/20] Update --- src/ensemble_scheduler/ensemble_scheduler.cc | 35 +++++++++++--------- src/ensemble_scheduler/ensemble_scheduler.h | 11 +++--- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/src/ensemble_scheduler/ensemble_scheduler.cc b/src/ensemble_scheduler/ensemble_scheduler.cc index 74e97e8af..d29413f5a 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.cc +++ b/src/ensemble_scheduler/ensemble_scheduler.cc @@ -449,6 +449,7 @@ class EnsembleContext { size_t inflight_step_counter_; // Inflight request limiters for each ensemble step. + // Only allocated when max_inflight_requests_ > 0. std::vector> step_inflight_request_limiters_; @@ -587,12 +588,14 @@ EnsembleContext::EnsembleContext( } } - // Initialize backpressure managers for each step. - size_t num_steps = info_->steps_.size(); - for (size_t i = 0; i < num_steps; i++) { - step_inflight_request_limiters_.emplace_back( - std::make_unique( - info_->max_inflight_requests_)); + // Initialize step inflight request limiters for each step. + if (info_->max_inflight_requests_ > 0) { + size_t num_steps = info_->steps_.size(); + for (size_t i = 0; i < num_steps; i++) { + step_inflight_request_limiters_.emplace_back( + std::make_unique( + info_->max_inflight_requests_)); + } } if (ensemble_status_.IsOk()) { @@ -997,8 +1000,10 @@ EnsembleContext::UpdateEnsembleState( if (completed_step->response_flags_ & TRITONSERVER_RESPONSE_COMPLETE_FINAL) { inflight_step_counter_--; - step_inflight_request_limiters_[completed_step->step_idx_] - ->DecrementInflightCount(); + if (!step_inflight_request_limiters_.empty()) { + step_inflight_request_limiters_[completed_step->step_idx_] + ->DecrementInflightCount(); + } } RETURN_IF_ERROR(ConsumeResponse(completed_step)); updated_tensors->swap(completed_step->updated_tensors_); @@ -1486,9 +1491,8 @@ EnsembleContext::ScheduleSteps( step->ctx_ = context; size_t this_step_idx = step->step_idx_; - // Apply backpressure to downstream steps only, not the entry step. - // Step 0 is scheduled on handler thread and should never block. - if (this_step_idx != 0) { + // Apply step inflight request limiters if configured. + if (!context->step_inflight_request_limiters_.empty()) { context->step_inflight_request_limiters_[this_step_idx]->WaitForCapacity( context->request_tracker_, this_step_idx, context->info_->ensemble_name_); @@ -1526,8 +1530,10 @@ EnsembleContext::ScheduleSteps( // Increment inflight counter AFTER successful scheduling. Always // increment for ALL steps (including step 0) to ensure symmetry with // decrement and prevent underflow when steps complete. 
- context->step_inflight_request_limiters_[this_step_idx] - ->IncrementInflightCount(); + if (!context->step_inflight_request_limiters_.empty()) { + context->step_inflight_request_limiters_[this_step_idx] + ->IncrementInflightCount(); + } step.release(); continue; } else { @@ -1710,8 +1716,7 @@ EnsembleScheduler::EnsembleScheduler( } callback_pool_ = is_->EnsembleCallbackPool(); - // Backpressure configuration from protobuf field. Limits concurrent requests - // from decoupled steps to prevent memory growth. Value of 0 means unlimited. + // Parse the configuration for max_inflight_requests from the protobuf field. if (config.has_ensemble_scheduling()) { info_->max_inflight_requests_ = config.ensemble_scheduling().max_inflight_requests(); diff --git a/src/ensemble_scheduler/ensemble_scheduler.h b/src/ensemble_scheduler/ensemble_scheduler.h index 8b3985442..e272adbd4 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.h +++ b/src/ensemble_scheduler/ensemble_scheduler.h @@ -84,11 +84,12 @@ struct EnsembleInfo { // backward path, ensemble tensor to the step that provides its data std::unordered_map tensor_to_prev_step_; - // Maximum concurrent inflight requests allowed at each ensemble step - // (downstream consumer). This limit is applied per step, not globally for the - // entire ensemble model. Prevents memory growth by blocking producers when - // the limit is reached. Default value is 0, which indicates unlimited (no - // backpressure applied). Configured via 'max_inflight_requests' field in + // The maximum number of concurrent inflight requests allowed at each ensemble + // step per inference request. This limit is applied per step, not globally + // for the entire ensemble model. This limit prevents unbounded memory growth + // when ensemble steps produce responses faster than downstream steps can + // consume them. Default value is 0, which indicates that no limit is + // enforced. Configured via 'max_inflight_requests' field in // ensemble_scheduling. size_t max_inflight_requests_ = 0; }; From b165755e67889dcc5a3926ce5d41ea5081f6d704 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Sat, 1 Nov 2025 00:14:23 +0530 Subject: [PATCH 18/20] Update src/ensemble_scheduler/ensemble_scheduler.h Co-authored-by: Yingge He <157551214+yinggeh@users.noreply.github.com> --- src/ensemble_scheduler/ensemble_scheduler.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ensemble_scheduler/ensemble_scheduler.h b/src/ensemble_scheduler/ensemble_scheduler.h index e272adbd4..fca3317e0 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.h +++ b/src/ensemble_scheduler/ensemble_scheduler.h @@ -85,7 +85,7 @@ struct EnsembleInfo { std::unordered_map tensor_to_prev_step_; // The maximum number of concurrent inflight requests allowed at each ensemble - // step per inference request. This limit is applied per step, not globally + // step per inference request. This limit is applied per step and per inference request, not globally // for the entire ensemble model. This limit prevents unbounded memory growth // when ensemble steps produce responses faster than downstream steps can // consume them. 
Default value is 0, which indicates that no limit is From caf637c8b27c3ba80acc9ef1034bdb8f62fce0a4 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Sat, 1 Nov 2025 00:37:16 +0530 Subject: [PATCH 19/20] Update --- src/ensemble_scheduler/ensemble_scheduler.h | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/ensemble_scheduler/ensemble_scheduler.h b/src/ensemble_scheduler/ensemble_scheduler.h index fca3317e0..2d252445e 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.h +++ b/src/ensemble_scheduler/ensemble_scheduler.h @@ -85,12 +85,12 @@ struct EnsembleInfo { std::unordered_map tensor_to_prev_step_; // The maximum number of concurrent inflight requests allowed at each ensemble - // step per inference request. This limit is applied per step and per inference request, not globally - // for the entire ensemble model. This limit prevents unbounded memory growth - // when ensemble steps produce responses faster than downstream steps can - // consume them. Default value is 0, which indicates that no limit is - // enforced. Configured via 'max_inflight_requests' field in - // ensemble_scheduling. + // step per inference request. This limit is applied per step and per + // inference request, not globally for the entire ensemble model. This limit + // prevents unbounded memory growth when ensemble steps produce responses + // faster than downstream steps can consume them. Default value is 0, which + // indicates that no limit is enforced. Configured via 'max_inflight_requests' + // field in ensemble_scheduling. size_t max_inflight_requests_ = 0; }; From 9187e19a73c316f9025bf4ecbdd48ea770008c38 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Sat, 1 Nov 2025 01:28:55 +0530 Subject: [PATCH 20/20] Update --- src/ensemble_scheduler/ensemble_scheduler.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ensemble_scheduler/ensemble_scheduler.cc b/src/ensemble_scheduler/ensemble_scheduler.cc index d29413f5a..4a89ab82f 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.cc +++ b/src/ensemble_scheduler/ensemble_scheduler.cc @@ -591,6 +591,7 @@ EnsembleContext::EnsembleContext( // Initialize step inflight request limiters for each step. if (info_->max_inflight_requests_ > 0) { size_t num_steps = info_->steps_.size(); + step_inflight_request_limiters_.reserve(num_steps); for (size_t i = 0; i < num_steps; i++) { step_inflight_request_limiters_.emplace_back( std::make_unique(
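[Usage note] By the end of the series the scheduler reads the limit from the protobuf field instead of the parameters map (config.ensemble_scheduling().max_inflight_requests(), applied per step and per inference request). The matching config.proto change is not part of this diff, so the field placement below is inferred from the scheduler code; the model name and the value 64 are placeholders:

    name: "ensemble_model"
    platform: "ensemble"
    ensemble_scheduling {
      max_inflight_requests: 64
      step [ ... ]
    }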