Conversation

weizhehuang0827
Collaborator

Implement multi-priority and online/offline unified request scheduling.

@weizhehuang0827
Collaborator Author

Note that a simpler alternative strategy is to keep using a deque for the decode queue and sort it directly each time scheduling runs; in theory, that has higher complexity.

const std::string& service_request_id,
bool offline,
int32_t slo_ms,
xllm::proto::Priority priority)
Collaborator

Create a new enum Priority to replace xllm::proto::Priority; we'd better avoid using protocol buffer types during serving.


int32_t slo_ms = 0;

xllm::proto::Priority priority = xllm::proto::Priority::NORMAL;
Collaborator

As mentioned above, a new enum Priority type should be used here.

std::shared_ptr<Request> request = prefill_request_queue_.pop();

auto poped_result = prefill_request_queue_.try_pop();
// OPTIMIZE: later, change this to multiple attempts to read online prefill requests
Collaborator

nit: emmm... comments in English. :)

Collaborator Author

Sorry :(


DEFINE_string(priority_strategy, "FCFS", "priority strategy for requests");

DEFINE_bool(enable_on_preempt_off,
Collaborator

nit: rename the flag, as it may be misread as enable, on_preempt, off at first glance.

Use enable_online_preempt_offline or a similar name.

num_blocks_can_evict += seq->kv_state().num_kv_blocks();
}
if ((num_blocks_needed <= num_blocks_can_evict) ||
has_enough_blocks(num_blocks_needed - num_blocks_can_evict)) {
Collaborator

We don't need to call has_enough_blocks every time. Before check_if_enough_to_evict is called, the allocate operation must have failed, and the entire execution path runs serially with no concurrency (in the PD separation scenario there is concurrency when allocating blocks for remote prompts, but that only allocates blocks and never releases them). Therefore, the number of free blocks will not increase.

The has_enough_blocks function invokes the prefix_cache eviction logic on every call, and this may cost too much.

Based on the above considerations, we can place check_if_enough_to_evict directly in the continuous scheduler file, since we only need to judge whether the total num_kv_blocks in running_queue_to_evict meets the size requirement of num_request_to_evict.
BlockManagerImpl is inherently for managing block allocations; it is not appropriate to pass parameters such as DecodePriorityQueue into it.

while (!running_queue_.empty() &&
size_t& num_offd_preempt_off_requests,
size_t& num_ond_preempt_on_requests,
size_t& num_ond_preempt_off_requests,
Collaborator

nit: offd and ond, it's not immediately clear what they mean. :(

Collaborator Author

I just wanted to keep the names short and aligned with num_preempt_requests, orz. OK, I will change them to full names.

size_t& num_preempted_requests) {
// Do nothing: have new prefill requests to handle, or have no running
// requests
if (!running_sequences_.empty() || running_queue_.empty()) {
Collaborator

emmm... What is the purpose of removing this logic? In ContinuousScheduler, prefill will be executed with high priority.

Collaborator Author

I moved this logic to prepare_batch and removed the redundant second empty check. I want to wrap the two handle_decode_requests calls to ensure that only prefill or decode is processed.

size_t updated_num_tokens =
sequence->num_tokens() + options_.num_speculative_tokens() + 1;
sequence->num_tokens() + options_.num_speculative_tokens();
Collaborator

the target number of newly generated tokens is options_.num_speculative_tokens() + 1

Collaborator Author

In my test script, I confirmed that the new token has already been added to the sequence's num_tokens, so there is no need to add 1 again. There was a pre-existing logic difference between ContinuousScheduler and ChunkedPrefillScheduler here.

@@ -537,7 +691,8 @@ std::vector<Batch> ContinuousScheduler::schedule_request(
return batch;
}

if (!waiting_priority_queue_.empty() || !running_queue_.empty()) {
if (!waiting_priority_queue_.empty() || !running_queue_->empty() ||
!waiting_priority_queue_offline_.empty()) {
Collaborator

Add running_queue_offline_ ?

Collaborator Author

Yes!

poped_result = prefill_request_queue_offline_.try_pop();
if (!poped_result.has_value()) {
// no offline request, sleep for a while and try again
absl::SleepFor(absl::Milliseconds(100));
Collaborator

this may block online requests.

A more appropriate strategy is to design the pop interface as a blocking interface with a timeout (for example, it automatically returns null after 100ms), and then use try_pop to read offline requests. This way, what we delay are offline requests rather than online requests.

absl::optional<std::shared_ptr<Request>> poped_result =
    prefill_request_queue_.pop(absl::Milliseconds(100));
if (poped_result.has_value()) {
   // nothing
} else {
   poped_result = prefill_request_queue_offline_.try_pop();
}

pop function:

absl::optional<T> pop(absl::Duration timeout) {
  absl::MutexLock lock(&mutex_);

  bool has_value = mutex_.AwaitWithTimeout(
      absl::Condition(
          +[](std::queue<T>* queue) { return !queue->empty(); },
          &queue_),
      timeout);

  if (!has_value) {
    return absl::nullopt;
  }

  T value = std::move(queue_.front());
  queue_.pop();
  return value;
}
