Skip to content

Commit ac92836

Browse files
feat: support multi-priority and on/offline unified request schedule.
1 parent d99fa64 commit ac92836

39 files changed

+1155
-134
lines changed

xllm/core/common/global_flags.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,3 +199,9 @@ DEFINE_string(etcd_addr, "", "etcd adderss for save instance meta info");
199199
DEFINE_bool(enable_service_routing, false, "whether to use etcd.");
200200

201201
DEFINE_int32(heart_beat_interval, 3, "heart beat interval");
202+
203+
// Priority strategy applied to requests; the default value is "FCFS".
// NOTE(review): the set of accepted strategy names is not visible in this
// file -- confirm against the code that consumes this flag.
DEFINE_string(priority_strategy, "FCFS", "priority strategy for requests");
204+
205+
// Whether online requests may preempt offline requests; enabled by default.
DEFINE_bool(enable_on_preempt_off,
206+
true,
207+
"whether enable online preempt offline");

xllm/core/common/global_flags.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,3 +126,7 @@ DECLARE_int32(heart_beat_interval);
126126
DECLARE_int32(chunked_match_frequency);
127127

128128
DECLARE_bool(use_zero_evict);
129+
130+
// Priority strategy applied to requests (string flag).
DECLARE_string(priority_strategy);
131+
132+
// Whether online requests may preempt offline requests (bool flag).
DECLARE_bool(enable_on_preempt_off);

xllm/core/common/metrics.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,14 @@ DEFINE_GAUGE(num_running_requests, "Number of running requests in scheduler");
8888
DEFINE_GAUGE(num_waiting_requests, "Number of waiting requests in scheduler");
8989
DEFINE_GAUGE(num_preempted_requests,
9090
"Number of preempted requests in scheduler");
91+
// Preemption gauges, broken down by which kind of work does the preempting
// and which kind of request gets preempted. Name abbreviations, as spelled
// out in the description strings below:
//   offd = offline decode, ond = online decode, onp = online prefill;
//   the trailing on/off names the preempted side (online/offline request).
DEFINE_GAUGE(num_offd_preempt_off_requests,
92+
"Number of offline decode preempt offline requests in scheduler");
93+
DEFINE_GAUGE(num_ond_preempt_on_requests,
94+
"Number of online decode preempt online requests in scheduler");
95+
DEFINE_GAUGE(num_onp_preempt_off_requests,
96+
"Number of online prefill preempt offline requests in scheduler");
97+
DEFINE_GAUGE(num_ond_preempt_off_requests,
98+
"Number of online decode preempt offline requests in scheduler");
9199

92100
DEFINE_GAUGE(num_running_sequences, "Number of running sequences");
93101

xllm/core/common/metrics.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,10 @@ DECLARE_GAUGE(num_pending_requests);
149149
DECLARE_GAUGE(num_running_requests);
150150
DECLARE_GAUGE(num_waiting_requests);
151151
DECLARE_GAUGE(num_preempted_requests);
152+
// Preemption gauges by preemptor/preempted kind:
//   offd = offline decode, ond = online decode, onp = online prefill;
//   the trailing on/off names the preempted side (online/offline request).
DECLARE_GAUGE(num_offd_preempt_off_requests);
153+
DECLARE_GAUGE(num_ond_preempt_on_requests);
154+
DECLARE_GAUGE(num_onp_preempt_off_requests);
155+
DECLARE_GAUGE(num_ond_preempt_off_requests);
152156
DECLARE_GAUGE(num_running_sequences);
153157
DECLARE_GAUGE(kv_cache_utilization_perc);
154158
DECLARE_GAUGE(num_blocks_in_prefix_cache);

xllm/core/common/options.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ class Options {
113113
PROPERTY(bool, enable_service_routing) = false;
114114

115115
PROPERTY(std::optional<std::string>, tool_call_parser);
116+
117+
// Priority strategy applied to requests; defaults to "FCFS".
PROPERTY(std::string, priority_strategy) = "FCFS";
118+
119+
// Whether online requests may preempt offline requests; defaults to true.
PROPERTY(bool, enable_on_preempt_off) = true;
116120
};
117121

118122
} // namespace xllm

xllm/core/distributed_runtime/disagg_pd_service_impl.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,10 @@ std::shared_ptr<Request> DisaggPDServiceImpl::generate_request(
9494
req.x_request_id(),
9595
req.x_request_time(),
9696
std::move(req_state),
97-
req.service_req_id());
97+
req.service_req_id(),
98+
req.offline(),
99+
req.slo_ms(),
100+
req.priority());
98101

99102
// add one sequence, rest will be added by scheduler
100103
return new_request;

xllm/core/framework/block/block_manager.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,13 @@ limitations under the License.
3232
#include "common/metrics.h"
3333
#include "common/types.h"
3434
#include "framework/prefix_cache/prefix_cache.h"
35+
#include "framework/request/request.h"
36+
#include "framework/request/sequence.h"
37+
#include "scheduler/decode_priority_queue.h"
3538
#include "util/timer.h"
3639

3740
namespace xllm {
38-
41+
// class DecodePriorityQueue;
3942
class BlockManager {
4043
public:
4144
struct Options {
@@ -59,6 +62,10 @@ class BlockManager {
5962

6063
virtual void cache(const Slice<int32_t>& token_ids,
6164
const Slice<Block>& blocks) = 0;
65+
// Reports whether evicting requests from `running_queue_to_evict` would leave
// enough blocks for `prefill_sequence`.
// `num_request_to_evict` is an in/out counter of the requests that would have
// to be evicted.
// NOTE(review): in BlockManagerImpl the counter is incremented without being
// reset first, so callers presumably need to pass it in as zero -- confirm.
virtual bool check_if_enough_to_evict(
66+
DecodePriorityQueue* running_queue_to_evict,
67+
Sequence* prefill_sequence,
68+
size_t& num_request_to_evict) = 0;
6269

6370
// get merged all dp rank KVCacheEvent
6471
virtual void get_merged_kvcache_event(KvCacheEvent* event) const = 0;

xllm/core/framework/block/block_manager_impl.cpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ BlockManagerImpl::BlockManagerImpl(const Options& options)
3030
}
3131

3232
size_t total_blocks = options_.num_blocks();
33+
block_size_ = options_.block_size();
3334
num_free_blocks_ = total_blocks;
3435
free_blocks_.reserve(total_blocks);
3536
for (int32_t i = 0; i < total_blocks; ++i) {
@@ -73,6 +74,33 @@ void BlockManagerImpl::deallocate(const Slice<Block>& blocks) {
7374
}
7475
}
7576

77+
// Checks whether evicting requests from `running_queue_to_evict` would leave
// enough blocks to hold `prefill_sequence`.
//
// The number of blocks required is ceil(num_tokens / block_size_) for the
// prefill sequence. The queue is walked from rbegin() to rend(); for every
// request visited, the KV blocks held by all of its sequences are added to the
// running total of evictable blocks.
//
// Params:
//   running_queue_to_evict: queue of candidate requests; must not be null.
//   prefill_sequence: sequence that needs the blocks; must not be null.
//   num_request_to_evict: in/out counter, incremented once per request
//     visited. It is NOT reset here, so pass in zero to obtain an absolute
//     count.
//
// Returns true as soon as the evictable blocks alone, or the evictable blocks
// plus whatever has_enough_blocks() can provide, cover the requirement.
// Returns false when the whole queue has been visited without reaching that
// point; the counter has then grown by the number of requests in the queue.
bool BlockManagerImpl::check_if_enough_to_evict(
    DecodePriorityQueue* running_queue_to_evict,
    Sequence* prefill_sequence,
    size_t& num_request_to_evict) {
  // Same guard style as BlockManagerPool::check_if_enough_to_evict.
  DCHECK(running_queue_to_evict != nullptr);
  DCHECK(prefill_sequence != nullptr);

  // Round up so a partially filled trailing block is counted.
  const size_t num_blocks_needed =
      (prefill_sequence->num_tokens() + block_size_ - 1) / block_size_;
  size_t num_blocks_can_evict = 0;

  for (auto it = running_queue_to_evict->rbegin();
       it != running_queue_to_evict->rend();
       ++it) {
    // Bind by const reference: copying the shared_ptr would cost an atomic
    // refcount increment/decrement for every visited request.
    const auto& request_to_preempt = *it;
    ++num_request_to_evict;

    // Every sequence of the request gives back its KV blocks when evicted.
    for (const auto& seq : request_to_preempt->sequences()) {
      num_blocks_can_evict += seq->kv_state().num_kv_blocks();
    }

    // Must be tested before the subtraction below: both operands are
    // unsigned, so `needed - can_evict` would wrap around otherwise.
    if (num_blocks_needed <= num_blocks_can_evict) {
      return true;
    }
    // Only the shortfall still has to be supplied by the block manager.
    if (has_enough_blocks(num_blocks_needed - num_blocks_can_evict)) {
      return true;
    }
  }
  return false;
}
103+
76104
bool BlockManagerImpl::has_enough_blocks(uint32_t num_blocks) {
77105
if (num_blocks <= num_free_blocks_) {
78106
return true;

xllm/core/framework/block/block_manager_impl.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ class BlockManagerImpl : public BlockManager {
4646

4747
void get_merged_kvcache_event(KvCacheEvent* event) const override;
4848

49+
// Reports whether evicting requests from `running_queue_to_evict` would leave
// enough blocks for `prefill_sequence`; `num_request_to_evict` is incremented
// once per request examined.
bool check_if_enough_to_evict(DecodePriorityQueue* running_queue_to_evict,
50+
Sequence* prefill_sequence,
51+
size_t& num_request_to_evict) override;
52+
4953
size_t num_blocks_in_prefix_cache() const override {
5054
if (options_.enable_prefix_cache()) {
5155
CHECK(prefix_cache_);
@@ -99,6 +103,9 @@ class BlockManagerImpl : public BlockManager {
99103
// free block count
100104
size_t num_free_blocks_ = 0;
101105

106+
// block size, copied from options_.block_size() in the constructor; used as
// the tokens-per-block divisor when computing how many blocks a sequence needs
107+
size_t block_size_ = 0;
108+
102109
// free block list
103110
std::vector<int32_t> free_blocks_;
104111
};

xllm/core/framework/block/block_manager_pool.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,16 @@ bool BlockManagerPool::allocate(Sequence* sequence) {
9292
return allocate(sequence, sequence->num_tokens());
9393
}
9494

95+
// Forwards the eviction check to the block manager selected by the dp rank of
// `prefill_sequence`. All three arguments are passed through unchanged, so
// `num_request_to_evict` is updated by the selected manager.
bool BlockManagerPool::check_if_enough_to_evict(
    DecodePriorityQueue* running_queue_to_evict,
    Sequence* prefill_sequence,
    size_t& num_request_to_evict) {
  DCHECK(prefill_sequence != nullptr);

  // The sequence's dp rank picks which per-rank manager answers the query.
  const int32_t rank = prefill_sequence->dp_rank();
  auto& manager = block_managers_[rank];
  return manager->check_if_enough_to_evict(
      running_queue_to_evict, prefill_sequence, num_request_to_evict);
}
104+
95105
bool BlockManagerPool::allocate(std::vector<Sequence*>& sequences) {
96106
for (auto* sequence : sequences) {
97107
DCHECK(sequence != nullptr);

0 commit comments

Comments
 (0)