diff --git a/proxy_agent_shared/Cargo.toml b/proxy_agent_shared/Cargo.toml index 182100b3..3fc52c9b 100644 --- a/proxy_agent_shared/Cargo.toml +++ b/proxy_agent_shared/Cargo.toml @@ -16,7 +16,7 @@ serde_json = "1.0.91" # json Deserializer serde-xml-rs = "0.8.1" # xml Deserializer with xml attribute regex = "1.11" # match file name thiserror = "1.0.64" -tokio = { version = "1", features = ["rt", "macros", "sync", "time"] } +tokio = { version = "1", features = ["rt", "macros", "net", "sync", "time"] } tokio-util = "0.7.11" log = { version = "0.4.26", features = ["std"] } ctor = "0.3.6" # used for test setup and clean up diff --git a/proxy_agent_shared/src/telemetry/event_reader.rs b/proxy_agent_shared/src/telemetry/event_reader.rs index ac3d526f..e99170aa 100644 --- a/proxy_agent_shared/src/telemetry/event_reader.rs +++ b/proxy_agent_shared/src/telemetry/event_reader.rs @@ -59,6 +59,35 @@ impl VmMetaData { } } +/// Configuration for limiting EventReader behavior +#[derive(Default, Clone)] +pub struct EventReaderLimits { + pub max_events_per_round: Option, + pub max_event_file_size_bytes: Option, + pub version: Option, +} + +impl EventReaderLimits { + pub fn new() -> Self { + EventReaderLimits::default() + } + + pub fn with_max_events_per_round(mut self, max: usize) -> Self { + self.max_events_per_round = Some(max); + self + } + + pub fn with_max_event_file_size_bytes(mut self, max: u64) -> Self { + self.max_event_file_size_bytes = Some(max); + self + } + + pub fn with_version(mut self, version: String) -> Self { + self.version = Some(version); + self + } +} + pub struct EventReader { dir_path: PathBuf, delay_start: bool, @@ -66,9 +95,16 @@ pub struct EventReader { common_state: CommonState, execution_mode: String, event_name: String, + limits: EventReaderLimits, } impl EventReader { + /// Create a new EventReader without limits on event file size and max events per round. + /// The event reader will read the event files from the specified directory. + /// If delay_start is true, the event reader will delay start for 60 seconds. + /// The common_state is used to store the vm metadata. + /// The execution_mode is used to indicate the mode of the agent. + /// The event_name is used to indicate the name of the event reader. pub fn new( dir_path: PathBuf, delay_start: bool, @@ -84,6 +120,28 @@ impl EventReader { common_state, execution_mode, event_name, + limits: EventReaderLimits::default(), + } + } + + /// Create a new EventReader with limits configuration. + pub fn new_with_limits( + dir_path: PathBuf, + delay_start: bool, + cancellation_token: CancellationToken, + common_state: CommonState, + execution_mode: String, + event_name: String, + limits: EventReaderLimits, + ) -> EventReader { + EventReader { + dir_path, + delay_start, + cancellation_token, + common_state, + execution_mode, + event_name, + limits, } } @@ -145,24 +203,28 @@ impl EventReader { } } - if let Ok(Some(vm_meta_data)) = self.common_state.get_vm_meta_data().await { - let _processed = self - .process_events(&wire_server_client, &vm_meta_data) - .await; - } - + self.process_once(&wire_server_client).await; tokio::time::sleep(interval).await; } } + /// Process the event files from the directory once. + pub async fn process_once(&self, wire_server_client: &WireServerClient) -> usize { + if let Ok(Some(vm_meta_data)) = self.common_state.get_vm_meta_data().await { + self.process_events(wire_server_client, &vm_meta_data).await + } else { + 0 + } + } + async fn process_events( &self, wire_server_client: &WireServerClient, vm_meta_data: &VmMetaData, ) -> usize { let event_count: usize; - // get all .json event files in the directory - match misc_helpers::search_files(&self.dir_path, r"^(.*\.json)$") { + // get all [0-9]+.json event filenames with numbers in the directory + match misc_helpers::search_files(&self.dir_path, r"^[0-9]+\.json$") { Ok(files) => { let file_count = files.len(); event_count = self @@ -185,7 +247,7 @@ impl EventReader { event_count } - async fn update_vm_meta_data( + pub async fn update_vm_meta_data( &self, wire_server_client: &WireServerClient, imds_client: &ImdsClient, @@ -240,6 +302,42 @@ impl EventReader { ) -> usize { let mut num_events_logged = 0; for file in files { + if let Some(max_events) = self.limits.max_events_per_round { + if num_events_logged >= max_events { + logger_manager::write_warn(format!( + "EventReader:: Reached the max number of events to be read per round: {}. Stop processing file {} this round.", + max_events, + file.display() + )); + // do not delete this event json file, will try process it at next round + continue; + } + } + + match file.metadata() { + Err(e) => { + logger_manager::write_warn(format!( + "EventReader:: Failed to get metadata for file {}: {}", + file.display(), + e + )); + continue; + } + Ok(metadata) => { + if let Some(max_size) = self.limits.max_event_file_size_bytes { + if metadata.len() > max_size { + logger_manager::write_warn(format!( + "EventReader:: File {} exceeds the size limit of {} bytes, skip it.", + file.display(), + max_size + )); + // clean up the file to avoid blocking further processing + Self::clean_file(file); + continue; + } + } + } + } match misc_helpers::json_read_from_file::>(&file) { Ok(events) => { num_events_logged += events.len(); @@ -248,13 +346,13 @@ impl EventReader { } Err(e) => { logger_manager::write_warn(format!( - "Failed to read events from file {}: {}", + "EventReader:: Failed to read events from file {}: {}", file.display(), e )); } } - Self::clean_files(file); + Self::clean_file(file); } num_events_logged } @@ -277,6 +375,7 @@ impl EventReader { vm_meta_data.clone(), self.execution_mode.clone(), self.event_name.clone(), + self.limits.version.clone(), )); if telemetry_data.get_size() >= Self::MAX_MESSAGE_SIZE { @@ -337,7 +436,7 @@ impl EventReader { } } - fn clean_files(file: PathBuf) { + fn clean_file(file: PathBuf) { match remove_file(&file) { Ok(_) => { logger_manager::write_info(format!("Removed File: {}", file.display())); @@ -351,15 +450,6 @@ impl EventReader { } } } - - #[cfg(test)] - async fn get_vm_meta_data(&self) -> VmMetaData { - if let Ok(Some(vm_meta_data)) = self.common_state.get_vm_meta_data().await { - vm_meta_data - } else { - VmMetaData::empty() - } - } } #[cfg(test)] @@ -383,14 +473,6 @@ mod tests { let port = 7071u16; let cancellation_token = CancellationToken::new(); let common_state = CommonState::start_new(); - let event_reader = EventReader { - dir_path: events_dir.clone(), - delay_start: false, - cancellation_token: cancellation_token.clone(), - common_state: common_state.clone(), - execution_mode: "Test".to_string(), - event_name: "test_event_reader_thread".to_string(), - }; let wire_server_client = WireServerClient::new(ip, port); let imds_client = ImdsClient::new(ip, port); tokio::spawn(server_mock::start( @@ -401,6 +483,16 @@ mod tests { tokio::time::sleep(Duration::from_millis(100)).await; logger_manager::write_info("server_mock started.".to_string()); + let event_reader = EventReader::new( + events_dir.clone(), + false, + cancellation_token.clone(), + common_state.clone(), + "Test".to_string(), + "test_event_reader_thread".to_string(), + ); + + // refresh vm metadata match event_reader .update_vm_meta_data(&wire_server_client, &imds_client) .await @@ -413,7 +505,7 @@ mod tests { } } - // Write 10 events to events dir + // Write events to events dir let message = r#"{\"method\":\"GET\",\"url\":\"/machine/37569ad2-69a3-44fd-b653-813e62a177cf/68938c06%2D5233%2D4ff9%2Da173%2D0ac0a2754f8a.%5FWS2022?comp=config&type=hostingEnvironmentConfig&incarnation=2\",\"ip\":\"168.63.129.16\",\"port\":80,\"userId\":999,\"userName\":\"WS2022$\",\"processName\":\"C:\\\\WindowsAzure\\\\GuestAgent_2.7.41491.1071_2023-03-02_185502\\\\WindowsAzureGuestAgent.exe\",\"runAsElevated\":true,\"responseStatus\":\"200 OK\",\"elapsedTime\":8}"#; let mut events: Vec = Vec::new(); for _ in [0; 10] { @@ -429,28 +521,75 @@ mod tests { let mut file_path = events_dir.to_path_buf(); file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano())); misc_helpers::json_write_to_file(&events, &file_path).unwrap(); + tokio::time::sleep(Duration::from_millis(1)).await; + let mut file_path = events_dir.to_path_buf(); + file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano())); + misc_helpers::json_write_to_file(&events, &file_path).unwrap(); + // test EventReader with limits + let event_reader_limits = EventReaderLimits::new() + .with_max_event_file_size_bytes(1024 * 10) + .with_max_events_per_round(10) + .with_version("test_version".to_string()); + let event_reader_with_limits = EventReader::new_with_limits( + events_dir.clone(), + false, + cancellation_token.clone(), + common_state.clone(), + "Test".to_string(), + "test_event_reader_thread".to_string(), + event_reader_limits.clone(), + ); // Check the events processed - let vm_meta_data = event_reader.get_vm_meta_data().await; - let events_processed = event_reader - .process_events(&wire_server_client, &vm_meta_data) + let events_processed = event_reader_with_limits + .process_once(&wire_server_client) + .await; + logger_manager::write_info(format!("Send {} events from event files", events_processed)); + //Should be 10 events processed and read into events Vector + assert_eq!(events_processed, 10, "Events processed should be 10"); + let files = misc_helpers::get_files(&events_dir).unwrap(); + assert_eq!(1, files.len(), "Must still have 1 event file."); + // test EventReader with limits - second round + let events_processed = event_reader_with_limits + .process_once(&wire_server_client) .await; logger_manager::write_info(format!("Send {} events from event files", events_processed)); - //Should be 10 events written and read into events Vector + //Should be 10 events processed and read into events Vector assert_eq!(events_processed, 10, "Events processed should be 10"); let files = misc_helpers::get_files(&events_dir).unwrap(); - assert!(files.is_empty(), "Events files not cleaned up."); + assert!(files.is_empty(), "Must have no event files."); + + // Write 2 event files again for next test + tokio::time::sleep(Duration::from_millis(1)).await; + let mut file_path = events_dir.to_path_buf(); + file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano())); + misc_helpers::json_write_to_file(&events, &file_path).unwrap(); + tokio::time::sleep(Duration::from_millis(1)).await; + let mut file_path = events_dir.to_path_buf(); + file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano())); + misc_helpers::json_write_to_file(&events, &file_path).unwrap(); + let files = misc_helpers::get_files(&events_dir).unwrap(); + assert_eq!(2, files.len(), "Must have 2 event files."); + + // test EventReader without limits + let events_processed = event_reader.process_once(&wire_server_client).await; + logger_manager::write_info(format!("Send {} events from event files", events_processed)); + //Should be 20 events processed and read into events Vector + assert_eq!(events_processed, 20, "Events processed should be 20"); + let files = misc_helpers::get_files(&events_dir).unwrap(); + assert!(files.is_empty(), "Must have no event files."); - // Test not processing the non-json files + // Test not processing the non-json files, nor the file name containing non-numeric characters let mut file_path = events_dir.to_path_buf(); file_path.push(format!( "{}.notjson", misc_helpers::get_date_time_unix_nano() )); misc_helpers::json_write_to_file(&events, &file_path).unwrap(); - let events_processed = event_reader - .process_events(&wire_server_client, &vm_meta_data) - .await; + let mut file_path = events_dir.to_path_buf(); + file_path.push(format!("a{}.json", misc_helpers::get_date_time_unix_nano())); + misc_helpers::json_write_to_file(&events, &file_path).unwrap(); + let events_processed = event_reader.process_once(&wire_server_client).await; assert_eq!(0, events_processed, "events_processed must be 0."); let files = misc_helpers::get_files(&events_dir).unwrap(); assert!( diff --git a/proxy_agent_shared/src/telemetry/telemetry_event.rs b/proxy_agent_shared/src/telemetry/telemetry_event.rs index 79149d70..dbb06952 100644 --- a/proxy_agent_shared/src/telemetry/telemetry_event.rs +++ b/proxy_agent_shared/src/telemetry/telemetry_event.rs @@ -91,11 +91,17 @@ impl TelemetryEvent { vm_meta_data: VmMetaData, execution_mode: String, event_name: String, + ga_version: Option, ) -> Self { + // if ga_version is provided, append event_log.version to event_name + // if ga_version is None, use event_log.Version as ga_version and keep event_name unchanged + let (ga_version, event_name) = match ga_version { + Some(version) => (version, format!("{}-{}", event_name, event_log.Version)), + None => (event_log.Version.to_string(), event_name), + }; TelemetryEvent { event_pid: event_log.EventPid.parse::().unwrap_or(0), event_tid: event_log.EventTid.parse::().unwrap_or(0), - ga_version: event_log.Version.to_string(), task_name: event_log.TaskName.to_string(), opcode_name: event_log.TimeStamp.to_string(), capability_used: event_log.EventLevel.to_string(), @@ -103,6 +109,7 @@ impl TelemetryEvent { context2: event_log.TimeStamp.to_string(), context3: event_log.OperationId.to_string(), + ga_version, execution_mode, event_name, os_version: current_info::get_long_os_version(),