@@ -416,13 +416,12 @@ void WorkerService::PullKVCache(::google::protobuf::RpcController* controller,
416416
417417void WorkerService::TransferBlocks (
418418 ::google::protobuf::RpcController* controller,
419- const ::xllm:: proto::BlockTransferInfos* req,
420- ::xllm:: proto::TransferStatus* resp,
419+ const proto::BlockTransferInfos* req,
420+ proto::TransferStatus* resp,
421421 ::google::protobuf::Closure* done) {
422422 brpc::ClosureGuard done_guard (done);
423423 std::vector<BlockTransferInfo> block_transfer_info;
424- uint64_t batch_id;
425- proto_to_block_transfer_info (*req, batch_id, block_transfer_info);
424+ uint64_t batch_id = proto_to_block_transfer_info (*req, block_transfer_info);
426425
427426 if (batch_id == 0x0 ) {
428427 resp->set_success_cnt (worker_->transfer_kv_blocks (block_transfer_info));
@@ -432,6 +431,114 @@ void WorkerService::TransferBlocks(
432431 return ;
433432}
434433
434+ class ServerStreamHandler : public brpc ::StreamInputHandler {
435+ private:
436+ std::promise<void > close_promise_;
437+ std::atomic<bool > promise_set_{false };
438+
439+ public:
440+ ~ServerStreamHandler () {
441+ if (!promise_set_.exchange (true )) {
442+ try {
443+ close_promise_.set_value ();
444+ } catch (const std::exception& e) {
445+ LOG (WARNING) << " Exception in destructor: " << e.what ();
446+ }
447+ }
448+ }
449+
450+ std::future<void > get_close_future () { return close_promise_.get_future (); }
451+
452+ int on_received_messages (brpc::StreamId id,
453+ butil::IOBuf* const messages[],
454+ size_t size) override {
455+ LOG (WARNING) << " ServerStreamHandler::on_received_messages not implement." ;
456+ return 0 ;
457+ }
458+
459+ void on_closed (brpc::StreamId id) override {
460+ if (!promise_set_.exchange (true )) {
461+ close_promise_.set_value ();
462+ }
463+ }
464+
465+ void on_idle_timeout (brpc::StreamId id) override {
466+ if (!promise_set_.exchange (true )) {
467+ LOG (WARNING) << " Stream idle timeout: " << id;
468+ close_promise_.set_value ();
469+ }
470+ }
471+ };
472+
473+ void WorkerService::PrefetchFromStorage (
474+ google::protobuf::RpcController* controller,
475+ const proto::BlockTransferInfos* req,
476+ proto::Status* resp,
477+ google::protobuf::Closure* done) {
478+ brpc::ClosureGuard done_guard (done);
479+ brpc::Controller* cntl = static_cast <brpc::Controller*>(controller);
480+
481+ auto stream_handler = std::make_unique<ServerStreamHandler>();
482+ auto stream_id = std::make_unique<brpc::StreamId>();
483+ brpc::StreamOptions stream_options;
484+ stream_options.handler = stream_handler.get ();
485+ if (brpc::StreamAccept (stream_id.get (), *cntl, &stream_options) != 0 ) {
486+ resp->set_ok (false );
487+ LOG (ERROR) << " Failed to accept stream!" ;
488+ return ;
489+ }
490+
491+ std::vector<BlockTransferInfo> block_transfer_info;
492+ proto_to_block_transfer_info (*req, block_transfer_info);
493+
494+ copy_threadpool_.schedule (
495+ [this ,
496+ block_transfer_info = std::move (block_transfer_info),
497+ stream_id = std::move (stream_id),
498+ stream_handler = std::move (stream_handler)]() mutable {
499+ Slice<BlockTransferInfo> transfer_slice{block_transfer_info};
500+ auto close_future = stream_handler->get_close_future ();
501+ bool is_completed = false ;
502+
503+ for (size_t i = 0 ; i < transfer_slice.size ();
504+ i += stream_copy_batch_size_) {
505+ auto current_slice = transfer_slice.slice (
506+ i, std::min (i + stream_copy_batch_size_, transfer_slice.size ()));
507+
508+ auto success_cnt = worker_->prefetch_from_storage (current_slice);
509+
510+ if (success_cnt != current_slice.size () ||
511+ i + stream_copy_batch_size_ >= transfer_slice.size ()) {
512+ is_completed = true ;
513+ }
514+
515+ butil::IOBuf buf;
516+ buf.append (std::to_string (success_cnt));
517+ if (brpc::StreamWrite (*stream_id.get (), buf) != 0 ) {
518+ brpc::StreamClose (*stream_id.get ());
519+ is_completed = false ;
520+ break ;
521+ }
522+
523+ if (is_completed) {
524+ if (success_cnt != 0 ) {
525+ butil::IOBuf buf_end;
526+ buf_end.append (" 0" );
527+ brpc::StreamWrite (*stream_id.get (), buf_end);
528+ }
529+ break ;
530+ }
531+ }
532+ if (is_completed) {
533+ close_future.wait ();
534+ }
535+ brpc::StreamClose (*stream_id.get ());
536+ });
537+
538+ resp->set_ok (true );
539+ return ;
540+ }
541+
435542void WorkerService::GetDeviceInfo (::google::protobuf::RpcController* controller,
436543 const proto::Empty* req,
437544 proto::DeviceInfo* resp,
0 commit comments