Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions xllm/core/common/global_flags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,3 +199,9 @@ DEFINE_string(etcd_addr, "", "etcd adderss for save instance meta info");
DEFINE_bool(enable_service_routing, false, "whether to use etcd.");

DEFINE_int32(heart_beat_interval, 3, "heart beat interval");

// Name of the policy used to order requests; defaults to "FCFS".
// NOTE(review): create_comparator() in priority_comparator.cpp recognizes
// "FCFS", "priority" and "deadline" and aborts on anything else. Presumably
// this flag is what gets forwarded there - confirm against the scheduler.
DEFINE_string(priority_strategy, "FCFS", "priority strategy for requests");

// Per the help text: whether online requests may preempt offline ones.
// Enabled by default.
DEFINE_bool(enable_online_preempt_offline,
true,
"whether enable online preempt offline");
4 changes: 4 additions & 0 deletions xllm/core/common/global_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,7 @@ DECLARE_int32(heart_beat_interval);
DECLARE_int32(chunked_match_frequency);

DECLARE_bool(use_zero_evict);

// Request ordering policy name (definition lives in global_flags.cpp).
DECLARE_string(priority_strategy);

// Whether online requests may preempt offline ones (definition lives in
// global_flags.cpp).
DECLARE_bool(enable_online_preempt_offline);
8 changes: 8 additions & 0 deletions xllm/core/common/metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ DEFINE_GAUGE(num_running_requests, "Number of running requests in scheduler");
DEFINE_GAUGE(num_waiting_requests, "Number of waiting requests in scheduler");
DEFINE_GAUGE(num_preempted_requests,
"Number of preempted requests in scheduler");
// Preemption gauges broken down by which kind of request does the preempting
// and which kind is preempted, as spelled out in each description string.
// NOTE(review): the code that updates these gauges is outside this view -
// confirm the exact semantics in the scheduler.
DEFINE_GAUGE(num_offline_decode_preempt_offline_requests,
"Number of offline decode preempt offline requests in scheduler");
DEFINE_GAUGE(num_online_decode_preempt_online_requests,
"Number of online decode preempt online requests in scheduler");
DEFINE_GAUGE(num_online_prefill_preempt_offline_requests,
"Number of online prefill preempt offline requests in scheduler");
DEFINE_GAUGE(num_online_decode_preempt_offline_requests,
"Number of online decode preempt offline requests in scheduler");

DEFINE_GAUGE(num_running_sequences, "Number of running sequences");

Expand Down
4 changes: 4 additions & 0 deletions xllm/core/common/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ DECLARE_GAUGE(num_pending_requests);
DECLARE_GAUGE(num_running_requests);
DECLARE_GAUGE(num_waiting_requests);
DECLARE_GAUGE(num_preempted_requests);
// Preemption gauges split by preempting/preempted request kind; the matching
// DEFINE_GAUGE entries (with descriptions) live in metrics.cpp.
DECLARE_GAUGE(num_offline_decode_preempt_offline_requests);
DECLARE_GAUGE(num_online_decode_preempt_online_requests);
DECLARE_GAUGE(num_online_prefill_preempt_offline_requests);
DECLARE_GAUGE(num_online_decode_preempt_offline_requests);
DECLARE_GAUGE(num_running_sequences);
DECLARE_GAUGE(kv_cache_utilization_perc);
DECLARE_GAUGE(num_blocks_in_prefix_cache);
Expand Down
4 changes: 4 additions & 0 deletions xllm/core/common/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ class Options {
PROPERTY(bool, enable_service_routing) = false;

PROPERTY(std::optional<std::string>, tool_call_parser);

// Name of the request ordering policy; defaults to "FCFS".
PROPERTY(std::string, priority_strategy) = "FCFS";

// Whether online requests may preempt offline requests; enabled by default.
PROPERTY(bool, enable_online_preempt_offline) = true;
};

} // namespace xllm
14 changes: 9 additions & 5 deletions xllm/core/distributed_runtime/disagg_pd_service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,15 @@ std::shared_ptr<Request> DisaggPDServiceImpl::generate_request(
output_callback,
batch_output_callback);

auto new_request = std::make_shared<Request>(req.req_id(),
req.x_request_id(),
req.x_request_time(),
std::move(req_state),
req.service_req_id());
auto new_request = std::make_shared<Request>(
req.req_id(),
req.x_request_id(),
req.x_request_time(),
std::move(req_state),
req.service_req_id(),
req.offline(),
req.slo_ms(),
static_cast<xllm::RequestPriority>(req.priority()));

// add one sequence, rest will be added by scheduler
return new_request;
Expand Down
5 changes: 4 additions & 1 deletion xllm/core/framework/block/block_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ limitations under the License.
#include "common/metrics.h"
#include "common/types.h"
#include "framework/prefix_cache/prefix_cache.h"
#include "framework/request/request.h"
#include "framework/request/sequence.h"
#include "scheduler/decode_priority_queue.h"
#include "util/timer.h"

namespace xllm {

// class DecodePriorityQueue;
class BlockManager {
public:
struct Options {
Expand Down
1 change: 1 addition & 0 deletions xllm/core/framework/block/block_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ BlockManagerImpl::BlockManagerImpl(const Options& options)
}

size_t total_blocks = options_.num_blocks();
block_size_ = options_.block_size();
num_free_blocks_ = total_blocks;
free_blocks_.reserve(total_blocks);
for (int32_t i = 0; i < total_blocks; ++i) {
Expand Down
3 changes: 3 additions & 0 deletions xllm/core/framework/block/block_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ class BlockManagerImpl : public BlockManager {
// free block count
size_t num_free_blocks_ = 0;

// block size
size_t block_size_ = 0;

// free block list
std::vector<int32_t> free_blocks_;
};
Expand Down
2 changes: 2 additions & 0 deletions xllm/core/framework/request/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ cc_library(
sequences_group.h
request_state.h
stopping_checker.h
priority_comparator.h
SRCS
finish_reason.cpp
incremental_decoder.cpp
Expand All @@ -32,6 +33,7 @@ cc_library(
sequences_group.cpp
request_state.cpp
stopping_checker.cpp
priority_comparator.cpp
DEPS
:kv_cache
:prefix_cache
Expand Down
54 changes: 54 additions & 0 deletions xllm/core/framework/request/priority_comparator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#include "priority_comparator.h"

#include "glog/logging.h"

namespace xllm {

// implement operator()
// First-come-first-served ordering: yields true when `a` was created strictly
// later than `b`.
// NOTE(review): presumably consumed by a max-heap style container so that the
// oldest request surfaces first - confirm at the call site.
bool FCFSComparator::operator()(const std::shared_ptr<Request>& a,
                                const std::shared_ptr<Request>& b) const {
  const auto created_a = a->created_time();
  const auto created_b = b->created_time();
  return created_a > created_b;
}

// Strict-priority ordering with creation time as the tie-breaker.
//
// Returns true when `a` should be ordered after `b`: either `a` carries a
// numerically larger (less urgent) priority, or both priorities are equal and
// `a` was created later.
//
// RequestPriority::DEFAULT (0) is treated as NORMAL. Without this
// normalization an unset priority would compare below HIGH (1) and therefore
// outrank every explicitly prioritized request, which contradicts the
// HIGH < NORMAL < LOW ordering this comparator documents.
bool StrictPriorityComparator::operator()(
    const std::shared_ptr<Request>& a,
    const std::shared_ptr<Request>& b) const {
  // Maps the "unspecified" value onto the same level the Request constructor
  // uses as its default.
  const auto effective_priority = [](const std::shared_ptr<Request>& request) {
    const RequestPriority priority = request->priority();
    return priority == RequestPriority::DEFAULT ? RequestPriority::NORMAL
                                                : priority;
  };

  const RequestPriority priority_a = effective_priority(a);
  const RequestPriority priority_b = effective_priority(b);
  if (priority_a != priority_b) {
    return priority_a > priority_b;  // HIGH(1) < NORMAL(2) < LOW(3)
  }
  // Same priority level: fall back to arrival order.
  return a->created_time() > b->created_time();
}

// Deadline ordering: compares the remaining SLO budget of each request,
// computed as slo_ms() minus elapsed_seconds() converted to milliseconds.
// Yields true when `a` has strictly more budget left than `b`.
// NOTE(review): elapsed_seconds() is sampled separately for each operand; if
// it reads a live clock, comparing a request with itself may not be exactly
// irreflexive - confirm against Request::elapsed_seconds().
bool DeadlineComparator::operator()(const std::shared_ptr<Request>& a,
                                    const std::shared_ptr<Request>& b) const {
  const auto remaining_ms_a = a->slo_ms() - a->elapsed_seconds() * 1000;
  const auto remaining_ms_b = b->slo_ms() - b->elapsed_seconds() * 1000;
  return remaining_ms_a > remaining_ms_b;
}

// Builds the comparison callable for the named strategy.
//
// Recognized names (case-sensitive): "FCFS", "priority", "deadline".
// Any other name is reported through LOG(FATAL); the trailing return only
// exists to give the function a value on every path.
std::function<bool(const std::shared_ptr<Request>&,
                   const std::shared_ptr<Request>&)>
create_comparator(const std::string& priority_strategy) {
  if (priority_strategy == "FCFS") {
    return FCFSComparator();
  }
  if (priority_strategy == "priority") {
    return StrictPriorityComparator();
  }
  if (priority_strategy == "deadline") {
    return DeadlineComparator();
  }

  LOG(FATAL) << "Unknown strategy: " << priority_strategy;
  return nullptr;
}

} // namespace xllm
36 changes: 36 additions & 0 deletions xllm/core/framework/request/priority_comparator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once
#include <functional>
#include <memory>
#include <string>

#include "common.pb.h"
#include "framework/request/request.h"

namespace xllm {
// Abstract interface for request-ordering predicates. Each concrete strategy
// supplies its own operator() over two shared Request pointers.
class PriorityComparator {
 public:
  virtual ~PriorityComparator() = default;

  // Compares request `a` against request `b`; what a true result means is
  // defined by the implementing strategy.
  virtual bool operator()(const std::shared_ptr<Request>& a,
                          const std::shared_ptr<Request>& b) const = 0;
};

// First-come-first-served strategy: compares requests by creation time only.
struct FCFSComparator : PriorityComparator {
  bool operator()(const std::shared_ptr<Request>& a,
                  const std::shared_ptr<Request>& b) const override;
};

// Strict-priority strategy: compares the requests' priority values first and
// uses creation time to break ties.
struct StrictPriorityComparator : PriorityComparator {
  bool operator()(const std::shared_ptr<Request>& a,
                  const std::shared_ptr<Request>& b) const override;
};

// Deadline strategy: compares requests by how much of their SLO budget
// (slo_ms minus elapsed time) is left.
struct DeadlineComparator : PriorityComparator {
  bool operator()(const std::shared_ptr<Request>& a,
                  const std::shared_ptr<Request>& b) const override;
};

// Returns the comparison callable for `priority_strategy`. The implementation
// accepts "FCFS", "priority" and "deadline" and logs a fatal error otherwise.
auto create_comparator(const std::string& priority_strategy)
    -> std::function<bool(const std::shared_ptr<Request>&,
                          const std::shared_ptr<Request>&)>;

} // namespace xllm
10 changes: 8 additions & 2 deletions xllm/core/framework/request/request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,19 @@ Request::Request(const std::string& request_id,
const std::string& x_request_id,
const std::string& x_request_time,
const RequestState& state,
const std::string& service_request_id)
const std::string& service_request_id,
bool offline,
int32_t slo_ms,
RequestPriority priority)
: request_id_(request_id),
service_request_id_(service_request_id),
x_request_id_(x_request_id),
x_request_time_(x_request_time),
state_(std::move(state)),
created_time_(absl::Now()) {
created_time_(absl::Now()),
offline_(offline),
priority_(priority),
slo_ms_(slo_ms) {
create_sequences_group();
}

Expand Down
17 changes: 16 additions & 1 deletion xllm/core/framework/request/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@ limitations under the License.

namespace xllm {

// Scheduling priority attached to a request.
// The numeric values are fixed explicitly: callers convert the integer
// priority field of incoming request messages with static_cast, so the values
// must not be renumbered.
// NOTE(review): DEFAULT presumably stands for "not specified" - confirm
// against the proto definition.
enum class RequestPriority {
  DEFAULT = 0,
  HIGH = 1,
  NORMAL = 2,
  LOW = 3,
};

class Request {
public:
Request(const std::string& request_id,
const std::string& x_request_id,
const std::string& x_request_time,
const RequestState& state,
const std::string& service_request_id = "");
const std::string& service_request_id = "",
bool offline = false,
int32_t slo_ms = 0,
RequestPriority priority = RequestPriority::NORMAL);

bool finished() const;

Expand Down Expand Up @@ -81,6 +86,10 @@ class Request {

const std::string& x_request_time() const { return x_request_time_; }

const bool offline() const { return offline_; }
const int32_t slo_ms() const { return slo_ms_; }
// Returns the scheduling priority this request was constructed with.
// Returned by value, so no `const` qualifier on the type.
RequestPriority priority() const { return priority_; }

RequestState& state() { return state_; }

void update_connection_status();
Expand Down Expand Up @@ -108,6 +117,12 @@ class Request {

std::atomic<bool> cancelled_{false};

// Scheduling attributes supplied through the constructor. They are declared
// in the same order the constructor's initializer list names them
// (offline_, priority_, slo_ms_) so initialization order matches the written
// order and -Wreorder stays quiet. The in-class defaults mirror the
// constructor's default arguments.

// Whether the request was submitted as an offline request.
bool offline_ = false;

// Scheduling priority of the request.
RequestPriority priority_ = RequestPriority::NORMAL;

// SLO budget in milliseconds; compared against elapsed time by the deadline
// comparator.
int32_t slo_ms_ = 0;

private:
void create_sequences_group();
};
Expand Down
20 changes: 20 additions & 0 deletions xllm/core/framework/request/request_params.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ RequestParams::RequestParams(const proto::CompletionRequest& request,
request_id = generate_completion_request_id();
x_request_id = x_rid;
x_request_time = x_rtime;
if (request.has_offline()) {
offline = request.offline();
}
if (request.has_slo_ms()) {
slo_ms = request.slo_ms();
}
if (request.has_priority()) {
priority = static_cast<xllm::RequestPriority>(request.priority());
}

if (request.has_service_request_id()) {
service_request_id = request.service_request_id();
Expand Down Expand Up @@ -186,6 +195,17 @@ void InitFromChatRequest(RequestParams& params, const ChatRequest& request) {
if (request.has_request_id()) {
params.request_id = request.request_id();
}

if (request.has_offline()) {
params.offline = request.offline();
}
if (request.has_slo_ms()) {
params.slo_ms = request.slo_ms();
}
if (request.has_priority()) {
params.priority = static_cast<xllm::RequestPriority>(request.priority());
}

if (request.has_service_request_id()) {
params.service_request_id = request.service_request_id();
}
Expand Down
8 changes: 8 additions & 0 deletions xllm/core/framework/request/request_params.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ limitations under the License.
#include <vector>

#include "chat.pb.h"
#include "common.pb.h"
#include "common/macros.h"
#include "completion.pb.h"
#include "core/common/macros.h"
#include "core/common/types.h"
#include "embedding.pb.h"
#include "multimodal.pb.h"
#include "request.h"
#include "request_output.h"

namespace xllm {
Expand Down Expand Up @@ -124,6 +126,12 @@ struct RequestParams {
std::vector<xllm::JsonTool> tools;
std::string tool_choice = "auto";
bool has_tools() const { return !tools.empty(); }

// Offline flag copied from the incoming request when it sets one; otherwise
// stays false.
bool offline = false;

// SLO value in milliseconds copied from the incoming request when present;
// otherwise stays 0.
int32_t slo_ms = 0;

// Scheduling priority converted from the incoming request's priority field
// when present; otherwise NORMAL.
RequestPriority priority = RequestPriority::NORMAL;
};

} // namespace xllm
5 changes: 4 additions & 1 deletion xllm/core/runtime/llm_master.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,10 @@ std::shared_ptr<Request> LLMMaster::generate_request(
sp.x_request_id,
sp.x_request_time,
std::move(req_state),
sp.service_request_id);
sp.service_request_id,
sp.offline,
sp.slo_ms,
sp.priority);

// add one sequence, rest will be added by scheduler
return request;
Expand Down
4 changes: 4 additions & 0 deletions xllm/core/runtime/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ struct Options {

// enable service routing mode.
PROPERTY(bool, enable_service_routing) = false;

// name of the request ordering policy, "FCFS" by default.
PROPERTY(std::string, priority_strategy) = "FCFS";

// whether online requests may preempt offline requests, on by default.
PROPERTY(bool, enable_online_preempt_offline) = true;
};

} // namespace runtime
Expand Down
3 changes: 3 additions & 0 deletions xllm/core/scheduler/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ cc_library(
async_response_processor.h
scheduler.h
scheduler_factory.h
decode_priority_queue.h
SRCS
chunked_prefill_scheduler.cpp
zero_eviction_scheduler.cpp
Expand All @@ -32,8 +33,10 @@ cc_library(
cc_test(
NAME
chunked_prefill_scheduler_test
continuous_scheduler_test
SRCS
chunked_prefill_scheduler_test.cpp
continuous_scheduler_test.cpp
DEPS
:scheduler
GTest::gtest_main
Expand Down
Loading