Skip to content

Commit 6aadeea

Browse files
EvanCley and caoyifan.cyf authored
feat: implement PreheatFile and StatFile for scheduler (#4380) (#4381)
Co-authored-by: caoyifan.cyf <[email protected]>
1 parent 93a8ce6 commit 6aadeea

File tree

11 files changed

+737
-6
lines changed

11 files changed

+737
-6
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.23.8
44

55
require (
66
cloud.google.com/go/storage v1.50.0
7-
d7y.io/api/v2 v2.1.79
7+
d7y.io/api/v2 v2.1.80
88
github.com/MysteriousPotato/go-lockable v1.0.0
99
github.com/Showmax/go-fqdn v1.0.0
1010
github.com/VividCortex/mysqlerr v1.0.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ cloud.google.com/go/storage v1.50.0 h1:3TbVkzTooBvnZsk7WaAQfOsNrdoM8QHusXA1cpk6Q
6363
cloud.google.com/go/storage v1.50.0/go.mod h1:l7XeiD//vx5lfqE3RavfmU9yvk5Pp0Zhcv482poyafY=
6464
cloud.google.com/go/trace v1.11.6 h1:2O2zjPzqPYAHrn3OKl029qlqG6W8ZdYaOWRyr8NgMT4=
6565
cloud.google.com/go/trace v1.11.6/go.mod h1:GA855OeDEBiBMzcckLPE2kDunIpC72N+Pq8WFieFjnI=
66-
d7y.io/api/v2 v2.1.79 h1:LUPpjiuhHZgyezHBH7lG42pkqjcdbiTAHtEJ2Z7tUJg=
67-
d7y.io/api/v2 v2.1.79/go.mod h1:t6k27g8dFyH6sp3y2J1qHyk2YmoySd88qSbsEaqJ2Sw=
66+
d7y.io/api/v2 v2.1.80 h1:dH4w0dUXLpZ1sdp6QuWUVXkjQaUSX/pAOFFunkrcsUs=
67+
d7y.io/api/v2 v2.1.80/go.mod h1:t6k27g8dFyH6sp3y2J1qHyk2YmoySd88qSbsEaqJ2Sw=
6868
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
6969
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
7070
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=

internal/dflog/logger.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,24 @@ func WithStatImageAndTaskID(url, taskID string) *SugaredLoggerOnWith {
256256
}
257257
}
258258

259+
func WithPreheatFile(url string) *SugaredLoggerOnWith {
260+
return &SugaredLoggerOnWith{
261+
withArgs: []any{"url", url},
262+
}
263+
}
264+
265+
func WithStatFile(url string) *SugaredLoggerOnWith {
266+
return &SugaredLoggerOnWith{
267+
withArgs: []any{"url", url},
268+
}
269+
}
270+
271+
func WithStatFileAndTaskID(url, taskID string) *SugaredLoggerOnWith {
272+
return &SugaredLoggerOnWith{
273+
withArgs: []any{"url", url, "taskID", taskID},
274+
}
275+
}
276+
259277
func (log *SugaredLoggerOnWith) With(args ...any) *SugaredLoggerOnWith {
260278
args = append(args, log.withArgs...)
261279
return &SugaredLoggerOnWith{

internal/job/types.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ package job
1818

1919
import (
2020
"time"
21+
22+
"google.golang.org/protobuf/types/known/durationpb"
23+
24+
v2 "d7y.io/api/v2/pkg/apis/common/v2"
25+
dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
2126
)
2227

2328
// PreheatRequest defines the request parameters for preheating.
@@ -83,6 +88,24 @@ type GetTaskResponse struct {
8388
SchedulerClusterID uint `json:"scheduler_cluster_id"`
8489
}
8590

91+
// ListTaskEntriesRequest defines the request parameters for listing task entries.
92+
type ListTaskEntriesRequest struct {
93+
TaskID string `json:"task_id" validate:"required"`
94+
Url string `json:"url" validate:"omitempty"`
95+
Timeout *durationpb.Duration `json:"timeout" validate:"omitempty"`
96+
Header map[string]string `json:"header" validate:"omitempty"`
97+
CertificateChain [][]byte `json:"certificate_chain" validate:"omitempty"`
98+
ObjectStorage *v2.ObjectStorage `json:"object_storage" validate:"omitempty"`
99+
Hdfs *v2.HDFS `json:"hdfs" validate:"omitempty"`
100+
}
101+
102+
// ListTaskEntriesResponse defines the response parameters for listing task entries.
103+
type ListTaskEntriesResponse struct {
104+
Entries []*dfdaemonv2.Entry `json:"entries"`
105+
Recursive bool `json:"recursive"`
106+
SchedulerID uint `json:"scheduler_id"`
107+
}
108+
86109
// Peer represents the peer information.
87110
type Peer struct {
88111
ID string `json:"id"`

pkg/rpc/dfdaemon/client/client_v2.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ type V2 interface {
8989
// DeleteTask deletes task from p2p network.
9090
DeleteTask(context.Context, *dfdaemonv2.DeleteTaskRequest, ...grpc.CallOption) error
9191

92+
// ListTaskEntries lists task entries.
93+
ListTaskEntries(context.Context, *dfdaemonv2.ListTaskEntriesRequest, ...grpc.CallOption) (*dfdaemonv2.ListTaskEntriesResponse, error)
94+
9295
// DownloadPersistentCacheTask downloads persistent cache task from p2p network.
9396
DownloadPersistentCacheTask(context.Context, *dfdaemonv2.DownloadPersistentCacheTaskRequest, ...grpc.CallOption) (dfdaemonv2.DfdaemonUpload_DownloadPersistentCacheTaskClient, error)
9497

@@ -192,3 +195,11 @@ func (v *v2) DeletePersistentCacheTask(ctx context.Context, req *dfdaemonv2.Dele
192195
_, err := v.DfdaemonUploadClient.DeletePersistentCacheTask(ctx, req, opts...)
193196
return err
194197
}
198+
199+
// ListTaskEntries lists task entries from p2p network.
200+
func (v *v2) ListTaskEntries(ctx context.Context, req *dfdaemonv2.ListTaskEntriesRequest, opts ...grpc.CallOption) (*dfdaemonv2.ListTaskEntriesResponse, error) {
201+
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
202+
defer cancel()
203+
204+
return v.DfdaemonUploadClient.ListTaskEntries(ctx, req, opts...)
205+
}

scheduler/job/job.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ type Job interface {
5959
// GetTask retrieves task information from all hosts in the cluster.
6060
GetTask(context.Context, *internaljob.GetTaskRequest, *logger.SugaredLoggerOnWith) (*internaljob.GetTaskResponse, error)
6161

62+
// ListTaskEntries lists all task entries.
63+
ListTaskEntries(context.Context, *internaljob.ListTaskEntriesRequest, *logger.SugaredLoggerOnWith) (*internaljob.ListTaskEntriesResponse, error)
64+
6265
// PreheatSingleSeedPeer preheats job by single seed peer, scheduler will trigger seed peer to download task.
6366
PreheatSingleSeedPeer(context.Context, *internaljob.PreheatRequest, *logger.SugaredLoggerOnWith) (*internaljob.PreheatResponse, error)
6467

@@ -1034,3 +1037,47 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) {
10341037
SchedulerClusterID: j.config.Manager.SchedulerClusterID,
10351038
})
10361039
}
1040+
1041+
func (j *job) ListTaskEntries(ctx context.Context, req *internaljob.ListTaskEntriesRequest, log *logger.SugaredLoggerOnWith) (*internaljob.ListTaskEntriesResponse, error) {
1042+
advertiseIP := j.config.Server.AdvertiseIP.String()
1043+
1044+
selected, err := j.resource.SeedPeer().Select(ctx, req.TaskID)
1045+
if err != nil {
1046+
return nil, err
1047+
}
1048+
1049+
addr := fmt.Sprintf("%s:%d", selected.IP, selected.Port)
1050+
log.Infof("selected seed peer %s for task %s", addr, req.TaskID)
1051+
1052+
dfdaemonClient, err := dfdaemonclient.GetV2ByAddr(ctx, addr, j.dialOptions...)
1053+
if err != nil {
1054+
log.Errorf("[list-task-entries] get dfdaemon client failed: %s", err)
1055+
return nil, err
1056+
}
1057+
1058+
res, err := dfdaemonClient.ListTaskEntries(ctx, &dfdaemonv2.ListTaskEntriesRequest{
1059+
TaskId: req.TaskID,
1060+
Url: req.Url,
1061+
RequestHeader: req.Header,
1062+
Timeout: req.Timeout,
1063+
CertificateChain: req.CertificateChain,
1064+
ObjectStorage: req.ObjectStorage,
1065+
Hdfs: req.Hdfs,
1066+
RemoteIp: &advertiseIP,
1067+
})
1068+
if err != nil {
1069+
log.Errorf("[list-task-entries]list task entries failed: %s", err)
1070+
return nil, err
1071+
}
1072+
1073+
var recursive bool
1074+
if len(res.Entries) > 1 {
1075+
recursive = true
1076+
}
1077+
1078+
return &internaljob.ListTaskEntriesResponse{
1079+
Recursive: recursive,
1080+
Entries: res.Entries,
1081+
SchedulerID: j.config.Manager.SchedulerClusterID,
1082+
}, nil
1083+
}

scheduler/job/mocks/job_mock.go

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

scheduler/metrics/metrics.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,34 @@ var (
485485
Help: "Counter of the number of failed of the stating image.",
486486
})
487487

488+
PreheatFileCount = promauto.NewCounter(prometheus.CounterOpts{
489+
Namespace: types.MetricsNamespace,
490+
Subsystem: types.SchedulerMetricsName,
491+
Name: "preheat_file_total",
492+
Help: "Counter of the total preheat file.",
493+
})
494+
495+
PreheatFileFailureCount = promauto.NewCounter(prometheus.CounterOpts{
496+
Namespace: types.MetricsNamespace,
497+
Subsystem: types.SchedulerMetricsName,
498+
Name: "preheat_file_failure_total",
499+
Help: "Counter of the total preheat file failure.",
500+
})
501+
502+
StatFileCount = promauto.NewCounter(prometheus.CounterOpts{
503+
Namespace: types.MetricsNamespace,
504+
Subsystem: types.SchedulerMetricsName,
505+
Name: "stat_file_total",
506+
Help: "Counter of the number of the stating file.",
507+
})
508+
509+
StatFileFailureCount = promauto.NewCounter(prometheus.CounterOpts{
510+
Namespace: types.MetricsNamespace,
511+
Subsystem: types.SchedulerMetricsName,
512+
Name: "stat_file_failure_total",
513+
Help: "Counter of the number of failed of the stating file.",
514+
})
515+
488516
VersionGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
489517
Namespace: types.MetricsNamespace,
490518
Subsystem: types.SchedulerMetricsName,

scheduler/rpcserver/scheduler_server_v2.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -342,12 +342,42 @@ func (s *schedulerServerV2) StatImage(ctx context.Context, req *schedulerv2.Stat
342342
return resp, nil
343343
}
344344

345-
// TODO(EvanCley): Implement the following methods.
345+
// PreheatFile synchronously triggers an asynchronous preheat task for a file.
346+
//
347+
// This is a blocking call. The RPC will not return until the server has completed the
348+
// initial synchronous work: preparing the file URL.
349+
//
350+
// After this call successfully returns, a scheduler on the server begins the actual
351+
// preheating process, instructing peers to download the file in the background.
352+
//
353+
// A successful response (google.protobuf.Empty) confirms that the preparation is complete
354+
// and the asynchronous download task has been scheduled.
346355
func (s *schedulerServerV2) PreheatFile(ctx context.Context, req *schedulerv2.PreheatFileRequest) (*emptypb.Empty, error) {
356+
// Collect PreheatFileCount metrics.
357+
metrics.PreheatFileCount.Inc()
358+
if err := s.service.PreheatFile(ctx, req); err != nil {
359+
// Collect PreheatFileFailureCount metrics.
360+
metrics.PreheatFileFailureCount.Inc()
361+
return nil, err
362+
}
363+
347364
return new(emptypb.Empty), nil
348365
}
349366

350-
// TODO(EvanCley): Implement the following methods.
367+
// StatFile provides the detailed status of a file's distribution across peers.
368+
//
369+
// This is a blocking call that first queries the file/dir entries and then queries
370+
// all peers to collect the file's download state across the network.
371+
// The response includes the file status on each peer.
351372
func (s *schedulerServerV2) StatFile(ctx context.Context, req *schedulerv2.StatFileRequest) (*schedulerv2.StatFileResponse, error) {
352-
return nil, nil
373+
// Collect StatFileCount metrics.
374+
metrics.StatFileCount.Inc()
375+
resp, err := s.service.StatFile(ctx, req)
376+
if err != nil {
377+
// Collect StatFileFailureCount metrics.
378+
metrics.StatFileFailureCount.Inc()
379+
return nil, err
380+
}
381+
382+
return resp, nil
353383
}

0 commit comments

Comments
 (0)