@@ -39,9 +39,9 @@ constexpr size_t kRequestQueueSize = 100000;
39
39
ContinuousScheduler::ContinuousScheduler (Engine* engine, const Options& options)
40
40
: options_(options), engine_(engine), request_queue_(kRequestQueueSize ) {
41
41
CHECK (engine_ != nullptr );
42
- block_manager_ = engine_->block_manager_pool ();
43
- CHECK (block_manager_ != nullptr );
44
- enable_prefix_cache_ = block_manager_ ->options ().enable_prefix_cache ();
42
+ block_manager_pool_ = engine_->block_manager_pool ();
43
+ CHECK (block_manager_pool_ != nullptr );
44
+ enable_prefix_cache_ = block_manager_pool_ ->options ().enable_prefix_cache ();
45
45
46
46
last_batch_.resize (options_.dp_size ());
47
47
@@ -85,11 +85,11 @@ void ContinuousScheduler::handle_prefill_requests(
85
85
// they may contian many sequences, so we should check here.
86
86
while (!waiting_priority_queue_.empty () && remaining_seq_budget > 0 &&
87
87
remaining_token_budget > 0 &&
88
- block_manager_ ->kv_cache_utilization () <
88
+ block_manager_pool_ ->kv_cache_utilization () <
89
89
FLAGS_prefill_scheduling_memory_usage_threshold) {
90
90
std::shared_ptr<Request> request (waiting_priority_queue_.top ());
91
91
if (request->finished () || request->cancelled ()) {
92
- block_manager_ ->deallocate (request.get ());
92
+ block_manager_pool_ ->deallocate (request.get ());
93
93
// release the ownership of the request
94
94
finished_requests.emplace_back (request);
95
95
// remove the request from the priority queue
@@ -124,8 +124,8 @@ void ContinuousScheduler::handle_prefill_requests(
124
124
break ;
125
125
}
126
126
127
- if (!block_manager_ ->allocate (prefill_sequence.get ())) {
128
- block_manager_ ->deallocate (prefill_sequence.get ());
127
+ if (!block_manager_pool_ ->allocate (prefill_sequence.get ())) {
128
+ block_manager_pool_ ->deallocate (prefill_sequence.get ());
129
129
can_schedule = false ;
130
130
break ;
131
131
}
@@ -139,7 +139,7 @@ void ContinuousScheduler::handle_prefill_requests(
139
139
if (!can_schedule) {
140
140
for (auto & seq : prefill_sequences) {
141
141
// release shared blocks
142
- block_manager_ ->deallocate (seq);
142
+ block_manager_pool_ ->deallocate (seq);
143
143
}
144
144
break ;
145
145
}
@@ -161,13 +161,14 @@ void ContinuousScheduler::handle_prefill_requests(
161
161
}
162
162
163
163
if (running_sequences_.empty () && !waiting_priority_queue_.empty () &&
164
- running_queue_.empty () && block_manager_->kv_cache_utilization () == 0 ) {
164
+ running_queue_.empty () &&
165
+ block_manager_pool_->kv_cache_utilization () == 0 ) {
165
166
LOG (ERROR) << " Request prompt is too long, no enough memory to schedule "
166
167
" a single sequence." ;
167
168
// no enough memory to schedule single sequence, just finish the request
168
169
std::shared_ptr<Request> request (waiting_priority_queue_.top ());
169
170
waiting_priority_queue_.pop ();
170
- block_manager_ ->deallocate (request.get ());
171
+ block_manager_pool_ ->deallocate (request.get ());
171
172
response_processor_->process_failed_request (
172
173
request,
173
174
{StatusCode::RESOURCE_EXHAUSTED,
@@ -224,13 +225,13 @@ void ContinuousScheduler::handle_decode_requests(
224
225
size_t updated_num_tokens =
225
226
sequence->num_tokens () + options_.num_speculative_tokens () + 1 ;
226
227
// no blocks left
227
- if (!block_manager_ ->allocate (sequence.get (), updated_num_tokens)) {
228
+ if (!block_manager_pool_ ->allocate (sequence.get (), updated_num_tokens)) {
228
229
has_enough_blocks = false ;
229
230
break ;
230
231
}
231
232
232
233
if (sequence->if_cache_block_for_prefill ()) {
233
- block_manager_ ->cache (sequence.get ());
234
+ block_manager_pool_ ->cache (sequence.get ());
234
235
}
235
236
236
237
// update the allocated tokens for the sequence
@@ -279,7 +280,7 @@ void ContinuousScheduler::handle_decode_requests(
279
280
280
281
if (request_to_preempt.get () != request.get ()) {
281
282
++num_preempted_requests;
282
- block_manager_ ->deallocate (request_to_preempt.get ());
283
+ block_manager_pool_ ->deallocate (request_to_preempt.get ());
283
284
running_queue_.pop_back ();
284
285
// add preemptable request to waiting priority queue
285
286
request_to_preempt->set_preempted ();
@@ -339,7 +340,7 @@ void ContinuousScheduler::handle_abnormal_request(
339
340
<< " Running queue size is not 1, there maybe a bug of request "
340
341
" preemption logic. running_queue_.size ="
341
342
<< running_queue_.size ();
342
- if (util::sum (block_manager_ ->num_used_blocks ()) !=
343
+ if (util::sum (block_manager_pool_ ->num_used_blocks ()) !=
343
344
request->total_num_blocks ()) {
344
345
// blocks_exhausted is true.
345
346
// NOTE: consider dp > 1, here we need get all num blocks in use.
@@ -353,7 +354,7 @@ void ContinuousScheduler::handle_abnormal_request(
353
354
354
355
// request is too long, budget or memory no enough.
355
356
running_queue_.pop_front ();
356
- block_manager_ ->deallocate (request.get ());
357
+ block_manager_pool_ ->deallocate (request.get ());
357
358
response_processor_->process_failed_request (
358
359
request,
359
360
{StatusCode::RESOURCE_EXHAUSTED,
@@ -384,13 +385,13 @@ void ContinuousScheduler::handle_running_requests(
384
385
// check if the request can be expanded
385
386
if (request->expand_sequences ()) {
386
387
// cache the blocks to share among the sequences
387
- block_manager_ ->cache (request->sequences ()[0 ].get ());
388
+ block_manager_pool_ ->cache (request->sequences ()[0 ].get ());
388
389
}
389
390
390
391
// release blocks for finished sequences here
391
392
for (auto & sequence : request->sequences ()) {
392
393
if (sequence->finished ()) {
393
- block_manager_ ->deallocate (sequence.get ());
394
+ block_manager_pool_ ->deallocate (sequence.get ());
394
395
}
395
396
}
396
397
}
@@ -428,7 +429,7 @@ std::vector<Batch> ContinuousScheduler::prepare_batch() {
428
429
std::shared_ptr<Request> request = *it;
429
430
request->update_connection_status ();
430
431
if (request->finished () || request->cancelled ()) {
431
- block_manager_ ->deallocate (request.get ());
432
+ block_manager_pool_ ->deallocate (request.get ());
432
433
// release the ownership of the request
433
434
finished_requests.emplace_back (request);
434
435
// finished request is set to nullptr
@@ -516,11 +517,12 @@ std::vector<Batch> ContinuousScheduler::prepare_batch() {
516
517
517
518
GAUGE_SET (num_running_sequences, running_sequences_.size ());
518
519
519
- GAUGE_SET (kv_cache_utilization_perc, block_manager_->kv_cache_utilization ());
520
+ GAUGE_SET (kv_cache_utilization_perc,
521
+ block_manager_pool_->kv_cache_utilization ());
520
522
GAUGE_SET (num_blocks_in_prefix_cache,
521
- util::min (block_manager_ ->num_blocks_in_prefix_cache ()));
522
- GAUGE_SET (num_free_blocks, util::max (block_manager_ ->num_free_blocks ()));
523
- GAUGE_SET (num_used_blocks, util::min (block_manager_ ->num_used_blocks ()));
523
+ util::min (block_manager_pool_ ->num_blocks_in_prefix_cache ()));
524
+ GAUGE_SET (num_free_blocks, util::max (block_manager_pool_ ->num_free_blocks ()));
525
+ GAUGE_SET (num_used_blocks, util::min (block_manager_pool_ ->num_used_blocks ()));
524
526
return batches;
525
527
}
526
528
@@ -656,8 +658,8 @@ void ContinuousScheduler::process_batch_output(bool enable_schedule_overlap) {
656
658
get_num_occupied_slots (to_be_processed_sequences);
657
659
std::vector<int64_t > active_activation_size_in_bytes =
658
660
get_active_activation_in_bytes ();
659
- int64_t num_total_slots = block_manager_ ->options ().num_blocks () *
660
- block_manager_ ->options ().block_size ();
661
+ int64_t num_total_slots = block_manager_pool_ ->options ().num_blocks () *
662
+ block_manager_pool_ ->options ().block_size ();
661
663
for (int32_t dp_rank = 0 ; dp_rank < options_.dp_size (); ++dp_rank) {
662
664
double occupied_slots_ratio =
663
665
static_cast <double >(num_occupied_slots[dp_rank]) / num_total_slots;
@@ -712,16 +714,16 @@ void ContinuousScheduler::process_batch_output(bool enable_schedule_overlap) {
712
714
713
715
std::vector<Block> ContinuousScheduler::allocate_blocks_for (size_t token_num,
714
716
int32_t & dp_rank) {
715
- return block_manager_ ->allocate (token_num, dp_rank);
717
+ return block_manager_pool_ ->allocate (token_num, dp_rank);
716
718
}
717
719
718
720
std::vector<int64_t > ContinuousScheduler::get_num_occupied_slots (
719
721
std::vector<Sequence*>& sequences) const {
720
722
std::vector<int64_t > num_occupied_slots (options_.dp_size ());
721
723
std::vector<int64_t > num_unfilled_blocks (options_.dp_size ());
722
- std::vector<size_t > num_used_blocks = block_manager_ ->num_used_blocks ();
724
+ std::vector<size_t > num_used_blocks = block_manager_pool_ ->num_used_blocks ();
723
725
724
- const int block_size = block_manager_ ->options ().block_size ();
726
+ const int block_size = block_manager_pool_ ->options ().block_size ();
725
727
726
728
for (auto & sequence : sequences) {
727
729
const int32_t dp_rank = sequence->dp_rank ();
0 commit comments