@@ -9,6 +9,7 @@ use cidr::Ipv4Inet;
9
9
use log:: { debug, info, trace} ;
10
10
use network:: node:: request:: SetupNodeRequest ;
11
11
use tokio:: time;
12
+ use tokio_stream:: wrappers:: ReceiverStream ;
12
13
use tonic:: { transport:: Server , Request , Response , Status } ;
13
14
use uuid:: Uuid ;
14
15
@@ -17,13 +18,81 @@ mod config;
17
18
use config:: { GrpcServerConfig , NodeAgentConfig } ;
18
19
use network:: node:: setup_node;
19
20
use node_manager:: NodeSystem ;
21
+ use workload_manager:: workload_manager:: WorkloadManager ;
20
22
23
+ use proto:: agent:: {
24
+ instance_service_server:: InstanceService , instance_service_server:: InstanceServiceServer ,
25
+ Instance , InstanceStatus , SignalInstruction ,
26
+ } ;
21
27
use proto:: scheduler:: {
22
28
node_service_client:: NodeServiceClient , NodeRegisterRequest , NodeRegisterResponse , NodeStatus ,
23
29
Resource , ResourceSummary , Status as SchedulerStatus ,
24
30
} ;
25
31
26
32
const NUMBER_OF_CONNECTION_ATTEMPTS : u16 = 10 ;
33
+
34
+ ///
35
+ /// This Struct implement the Instance service from Node Agent proto file
36
+ ///
37
+ #[ derive( Debug ) ]
38
+ pub struct InstanceServiceController {
39
+ workload_manager : WorkloadManager ,
40
+ }
41
+
42
+ impl InstanceServiceController {
43
+ pub fn new ( node_id : String ) -> Self {
44
+ Self {
45
+ workload_manager : WorkloadManager :: new ( node_id) ,
46
+ }
47
+ }
48
+ }
49
+
50
+ #[ tonic:: async_trait]
51
+ impl InstanceService for InstanceServiceController {
52
+ type createStream = ReceiverStream < Result < InstanceStatus , Status > > ;
53
+
54
+ async fn create (
55
+ & self ,
56
+ request : Request < Instance > ,
57
+ ) -> Result < Response < Self :: createStream > , Status > {
58
+ let instance = request. into_inner ( ) ;
59
+ // let receiver = self.workload_manager.create(instance).await?;
60
+ let receiver = self . workload_manager . create ( instance) . await ;
61
+
62
+ Ok ( Response :: new ( ReceiverStream :: new ( receiver) ) )
63
+ }
64
+
65
+ async fn signal ( & self , request : Request < SignalInstruction > ) -> Result < Response < ( ) > , Status > {
66
+ let signal_instruction = request. into_inner ( ) ;
67
+
68
+ Ok ( Response :: new (
69
+ self . workload_manager . signal ( signal_instruction) . await ?,
70
+ ) )
71
+ }
72
+ }
73
+
74
+ ///
75
+ /// This function starts the grpc server of the Node Agent.
76
+ /// The server listens and responds to requests from the Scheduler.
77
+ /// The default port is 50053.
78
+ ///
79
+ fn create_grpc_server ( config : GrpcServerConfig , node_id : String ) -> tokio:: task:: JoinHandle < ( ) > {
80
+ let addr = format ! ( "http://{}:{}" , config. host, config. port)
81
+ . parse ( )
82
+ . unwrap ( ) ;
83
+ let instance_service_controller = InstanceServiceController :: new ( node_id) ;
84
+
85
+ info ! ( "Node Agent server listening on {}" , addr) ;
86
+
87
+ tokio:: spawn ( async move {
88
+ Server :: builder ( )
89
+ . add_service ( InstanceServiceServer :: new ( instance_service_controller) )
90
+ . serve ( addr)
91
+ . await
92
+ . unwrap ( )
93
+ } )
94
+ }
95
+
27
96
///
28
97
/// This function allows you to connect to the scheduler's grpc server.
29
98
///
@@ -183,6 +252,7 @@ fn create_grpc_client(config: GrpcServerConfig, node_id: String) -> tokio::task:
183
252
}
184
253
} )
185
254
}
255
+
186
256
#[ tokio:: main]
187
257
async fn main ( ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
188
258
env_logger:: init ( ) ;
@@ -205,8 +275,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
205
275
206
276
// start grpc server and client
207
277
let client_handler = create_grpc_client ( config. client , node_id. clone ( ) ) ;
278
+ let server_handler = create_grpc_server ( config. server , node_id. clone ( ) ) ;
208
279
209
280
client_handler. await ?;
281
+ server_handler. await ?;
210
282
211
283
info ! ( "Shutting down node agent" ) ;
212
284
0 commit comments