Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion proxy_agent_shared/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ serde_json = "1.0.91" # json Deserializer
serde-xml-rs = "0.8.1" # xml Deserializer with xml attribute
regex = "1.11" # match file name
thiserror = "1.0.64"
tokio = { version = "1", features = ["rt", "macros", "sync", "time"] }
tokio = { version = "1", features = ["rt", "macros", "net", "sync", "time"] }
tokio-util = "0.7.11"
log = { version = "0.4.26", features = ["std"] }
ctor = "0.3.6" # used for test setup and clean up
Expand Down
217 changes: 178 additions & 39 deletions proxy_agent_shared/src/telemetry/event_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,52 @@ impl VmMetaData {
}
}

/// Configuration for limiting EventReader behavior
#[derive(Default, Clone)]
pub struct EventReaderLimits {
pub max_events_per_round: Option<usize>,
pub max_event_file_size_bytes: Option<u64>,
pub version: Option<String>,
}

impl EventReaderLimits {
pub fn new() -> Self {
EventReaderLimits::default()
}

pub fn with_max_events_per_round(mut self, max: usize) -> Self {
self.max_events_per_round = Some(max);
self
}

pub fn with_max_event_file_size_bytes(mut self, max: u64) -> Self {
self.max_event_file_size_bytes = Some(max);
self
}

pub fn with_version(mut self, version: String) -> Self {
self.version = Some(version);
self
}
}

pub struct EventReader {
dir_path: PathBuf,
delay_start: bool,
cancellation_token: CancellationToken,
common_state: CommonState,
execution_mode: String,
event_name: String,
limits: EventReaderLimits,
}

impl EventReader {
/// Create a new EventReader without limits on event file size and max events per round.
/// The event reader will read the event files from the specified directory.
/// If delay_start is true, the event reader will delay start for 60 seconds.
/// The common_state is used to store the vm metadata.
/// The execution_mode is used to indicate the mode of the agent.
/// The event_name is used to indicate the name of the event reader.
pub fn new(
dir_path: PathBuf,
delay_start: bool,
Expand All @@ -84,6 +120,28 @@ impl EventReader {
common_state,
execution_mode,
event_name,
limits: EventReaderLimits::default(),
}
}

/// Create a new EventReader with limits configuration.
pub fn new_with_limits(
dir_path: PathBuf,
delay_start: bool,
cancellation_token: CancellationToken,
common_state: CommonState,
execution_mode: String,
event_name: String,
limits: EventReaderLimits,
) -> EventReader {
EventReader {
dir_path,
delay_start,
cancellation_token,
common_state,
execution_mode,
event_name,
limits,
}
}

Expand Down Expand Up @@ -145,24 +203,28 @@ impl EventReader {
}
}

if let Ok(Some(vm_meta_data)) = self.common_state.get_vm_meta_data().await {
let _processed = self
.process_events(&wire_server_client, &vm_meta_data)
.await;
}

self.process_once(&wire_server_client).await;
tokio::time::sleep(interval).await;
}
}

/// Process the event files from the directory once.
pub async fn process_once(&self, wire_server_client: &WireServerClient) -> usize {
if let Ok(Some(vm_meta_data)) = self.common_state.get_vm_meta_data().await {
self.process_events(wire_server_client, &vm_meta_data).await
} else {
0
}
}

async fn process_events(
&self,
wire_server_client: &WireServerClient,
vm_meta_data: &VmMetaData,
) -> usize {
let event_count: usize;
// get all .json event files in the directory
match misc_helpers::search_files(&self.dir_path, r"^(.*\.json)$") {
// get all [0-9]+.json event filenames with numbers in the directory
match misc_helpers::search_files(&self.dir_path, r"^[0-9]+\.json$") {
Ok(files) => {
let file_count = files.len();
event_count = self
Expand All @@ -185,7 +247,7 @@ impl EventReader {
event_count
}

async fn update_vm_meta_data(
pub async fn update_vm_meta_data(
&self,
wire_server_client: &WireServerClient,
imds_client: &ImdsClient,
Expand Down Expand Up @@ -240,6 +302,42 @@ impl EventReader {
) -> usize {
let mut num_events_logged = 0;
for file in files {
if let Some(max_events) = self.limits.max_events_per_round {
if num_events_logged >= max_events {
logger_manager::write_warn(format!(
"EventReader:: Reached the max number of events to be read per round: {}. Stop processing file {} this round.",
max_events,
file.display()
));
// do not delete this event json file, will try process it at next round
continue;
}
}

match file.metadata() {
Err(e) => {
logger_manager::write_warn(format!(
"EventReader:: Failed to get metadata for file {}: {}",
file.display(),
e
));
continue;
}
Ok(metadata) => {
if let Some(max_size) = self.limits.max_event_file_size_bytes {
if metadata.len() > max_size {
logger_manager::write_warn(format!(
"EventReader:: File {} exceeds the size limit of {} bytes, skip it.",
file.display(),
max_size
));
// clean up the file to avoid blocking further processing
Self::clean_file(file);
continue;
}
}
}
}
match misc_helpers::json_read_from_file::<Vec<Event>>(&file) {
Ok(events) => {
num_events_logged += events.len();
Expand All @@ -248,13 +346,13 @@ impl EventReader {
}
Err(e) => {
logger_manager::write_warn(format!(
"Failed to read events from file {}: {}",
"EventReader:: Failed to read events from file {}: {}",
file.display(),
e
));
}
}
Self::clean_files(file);
Self::clean_file(file);
}
num_events_logged
}
Expand All @@ -277,6 +375,7 @@ impl EventReader {
vm_meta_data.clone(),
self.execution_mode.clone(),
self.event_name.clone(),
self.limits.version.clone(),
));

if telemetry_data.get_size() >= Self::MAX_MESSAGE_SIZE {
Expand Down Expand Up @@ -337,7 +436,7 @@ impl EventReader {
}
}

fn clean_files(file: PathBuf) {
fn clean_file(file: PathBuf) {
match remove_file(&file) {
Ok(_) => {
logger_manager::write_info(format!("Removed File: {}", file.display()));
Expand All @@ -351,15 +450,6 @@ impl EventReader {
}
}
}

#[cfg(test)]
async fn get_vm_meta_data(&self) -> VmMetaData {
if let Ok(Some(vm_meta_data)) = self.common_state.get_vm_meta_data().await {
vm_meta_data
} else {
VmMetaData::empty()
}
}
}

#[cfg(test)]
Expand All @@ -383,14 +473,6 @@ mod tests {
let port = 7071u16;
let cancellation_token = CancellationToken::new();
let common_state = CommonState::start_new();
let event_reader = EventReader {
dir_path: events_dir.clone(),
delay_start: false,
cancellation_token: cancellation_token.clone(),
common_state: common_state.clone(),
execution_mode: "Test".to_string(),
event_name: "test_event_reader_thread".to_string(),
};
let wire_server_client = WireServerClient::new(ip, port);
let imds_client = ImdsClient::new(ip, port);
tokio::spawn(server_mock::start(
Expand All @@ -401,6 +483,16 @@ mod tests {
tokio::time::sleep(Duration::from_millis(100)).await;
logger_manager::write_info("server_mock started.".to_string());

let event_reader = EventReader::new(
events_dir.clone(),
false,
cancellation_token.clone(),
common_state.clone(),
"Test".to_string(),
"test_event_reader_thread".to_string(),
);

// refresh vm metadata
match event_reader
.update_vm_meta_data(&wire_server_client, &imds_client)
.await
Expand All @@ -413,7 +505,7 @@ mod tests {
}
}

// Write 10 events to events dir
// Write events to events dir
let message = r#"{\"method\":\"GET\",\"url\":\"/machine/37569ad2-69a3-44fd-b653-813e62a177cf/68938c06%2D5233%2D4ff9%2Da173%2D0ac0a2754f8a.%5FWS2022?comp=config&type=hostingEnvironmentConfig&incarnation=2\",\"ip\":\"168.63.129.16\",\"port\":80,\"userId\":999,\"userName\":\"WS2022$\",\"processName\":\"C:\\\\WindowsAzure\\\\GuestAgent_2.7.41491.1071_2023-03-02_185502\\\\WindowsAzureGuestAgent.exe\",\"runAsElevated\":true,\"responseStatus\":\"200 OK\",\"elapsedTime\":8}"#;
let mut events: Vec<Event> = Vec::new();
for _ in [0; 10] {
Expand All @@ -429,28 +521,75 @@ mod tests {
let mut file_path = events_dir.to_path_buf();
file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano()));
misc_helpers::json_write_to_file(&events, &file_path).unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
let mut file_path = events_dir.to_path_buf();
file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano()));
misc_helpers::json_write_to_file(&events, &file_path).unwrap();

// test EventReader with limits
let event_reader_limits = EventReaderLimits::new()
.with_max_event_file_size_bytes(1024 * 10)
.with_max_events_per_round(10)
.with_version("test_version".to_string());
let event_reader_with_limits = EventReader::new_with_limits(
events_dir.clone(),
false,
cancellation_token.clone(),
common_state.clone(),
"Test".to_string(),
"test_event_reader_thread".to_string(),
event_reader_limits.clone(),
);
// Check the events processed
let vm_meta_data = event_reader.get_vm_meta_data().await;
let events_processed = event_reader
.process_events(&wire_server_client, &vm_meta_data)
let events_processed = event_reader_with_limits
.process_once(&wire_server_client)
.await;
logger_manager::write_info(format!("Send {} events from event files", events_processed));
//Should be 10 events processed and read into events Vector
assert_eq!(events_processed, 10, "Events processed should be 10");
let files = misc_helpers::get_files(&events_dir).unwrap();
assert_eq!(1, files.len(), "Must still have 1 event file.");
// test EventReader with limits - second round
let events_processed = event_reader_with_limits
.process_once(&wire_server_client)
.await;
logger_manager::write_info(format!("Send {} events from event files", events_processed));
//Should be 10 events written and read into events Vector
//Should be 10 events processed and read into events Vector
assert_eq!(events_processed, 10, "Events processed should be 10");
let files = misc_helpers::get_files(&events_dir).unwrap();
assert!(files.is_empty(), "Events files not cleaned up.");
assert!(files.is_empty(), "Must have no event files.");

// Write 2 event files again for next test
tokio::time::sleep(Duration::from_millis(1)).await;
let mut file_path = events_dir.to_path_buf();
file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano()));
misc_helpers::json_write_to_file(&events, &file_path).unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
let mut file_path = events_dir.to_path_buf();
file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano()));
misc_helpers::json_write_to_file(&events, &file_path).unwrap();
let files = misc_helpers::get_files(&events_dir).unwrap();
assert_eq!(2, files.len(), "Must have 2 event files.");

// test EventReader without limits
let events_processed = event_reader.process_once(&wire_server_client).await;
logger_manager::write_info(format!("Send {} events from event files", events_processed));
//Should be 20 events processed and read into events Vector
assert_eq!(events_processed, 20, "Events processed should be 20");
let files = misc_helpers::get_files(&events_dir).unwrap();
assert!(files.is_empty(), "Must have no event files.");

// Test not processing the non-json files
// Test not processing the non-json files, nor the file name containing non-numeric characters
let mut file_path = events_dir.to_path_buf();
file_path.push(format!(
"{}.notjson",
misc_helpers::get_date_time_unix_nano()
));
misc_helpers::json_write_to_file(&events, &file_path).unwrap();
let events_processed = event_reader
.process_events(&wire_server_client, &vm_meta_data)
.await;
let mut file_path = events_dir.to_path_buf();
file_path.push(format!("a{}.json", misc_helpers::get_date_time_unix_nano()));
misc_helpers::json_write_to_file(&events, &file_path).unwrap();
let events_processed = event_reader.process_once(&wire_server_client).await;
assert_eq!(0, events_processed, "events_processed must be 0.");
let files = misc_helpers::get_files(&events_dir).unwrap();
assert!(
Expand Down
9 changes: 8 additions & 1 deletion proxy_agent_shared/src/telemetry/telemetry_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,25 @@ impl TelemetryEvent {
vm_meta_data: VmMetaData,
execution_mode: String,
event_name: String,
ga_version: Option<String>,
) -> Self {
// if ga_version is provided, append event_log.version to event_name
// if ga_version is None, use event_log.Version as ga_version and keep event_name unchanged
let (ga_version, event_name) = match ga_version {
Some(version) => (version, format!("{}-{}", event_name, event_log.Version)),
None => (event_log.Version.to_string(), event_name),
};
TelemetryEvent {
event_pid: event_log.EventPid.parse::<u64>().unwrap_or(0),
event_tid: event_log.EventTid.parse::<u64>().unwrap_or(0),
ga_version: event_log.Version.to_string(),
task_name: event_log.TaskName.to_string(),
opcode_name: event_log.TimeStamp.to_string(),
capability_used: event_log.EventLevel.to_string(),
context1: event_log.Message.to_string(),
context2: event_log.TimeStamp.to_string(),
context3: event_log.OperationId.to_string(),

ga_version,
execution_mode,
event_name,
os_version: current_info::get_long_os_version(),
Expand Down
Loading