From a69cdb75a033a8477442f6a4b16874d2fb44e60b Mon Sep 17 00:00:00 2001 From: Zhidong Peng Date: Mon, 5 Jan 2026 18:16:08 +0000 Subject: [PATCH 01/11] During provisioning time, default to disabled mode if the rules are not ready. --- proxy_agent/src/common/config.rs | 15 ------- proxy_agent/src/key_keeper.rs | 2 + proxy_agent/src/redirector.rs | 65 ++++++++++++++--------------- proxy_agent/src/redirector/linux.rs | 4 +- 4 files changed, 36 insertions(+), 50 deletions(-) diff --git a/proxy_agent/src/common/config.rs b/proxy_agent/src/common/config.rs index d9d1fdab..a9dae135 100644 --- a/proxy_agent/src/common/config.rs +++ b/proxy_agent/src/common/config.rs @@ -49,10 +49,6 @@ pub fn get_monitor_duration() -> Duration { pub fn get_poll_key_status_duration() -> Duration { Duration::from_secs(SYSTEM_CONFIG.get_poll_key_status_interval()) } -//TODO: remove this config/function once the contract is defined for HostGAPlugin -pub fn get_host_gaplugin_support() -> u8 { - SYSTEM_CONFIG.hostGAPluginSupport -} pub fn get_max_event_file_count() -> usize { SYSTEM_CONFIG.get_max_event_file_count() @@ -90,7 +86,6 @@ pub struct Config { latchKeyFolder: String, monitorIntervalInSeconds: u64, pollKeyStatusIntervalInSeconds: u64, - hostGAPluginSupport: u8, // 0 not support; 1 proxy only; 2 proxy + authentication check #[serde(skip_serializing_if = "Option::is_none")] maxEventFileCount: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ -170,10 +165,6 @@ impl Config { self.pollKeyStatusIntervalInSeconds } - pub fn get_host_gaplugin_support(&self) -> u8 { - self.hostGAPluginSupport - } - pub fn get_max_event_file_count(&self) -> usize { self.maxEventFileCount .unwrap_or(constants::DEFAULT_MAX_EVENT_FILE_COUNT) @@ -269,12 +260,6 @@ mod tests { "get_poll_key_status_interval mismatch" ); - assert_eq!( - 1u8, - config.get_host_gaplugin_support(), - "get_host_gaplugin_support mismatch" - ); - assert_eq!( constants::DEFAULT_MAX_EVENT_FILE_COUNT, config.get_max_event_file_count(), diff --git a/proxy_agent/src/key_keeper.rs b/proxy_agent/src/key_keeper.rs index 6047b59b..184631f9 100644 --- a/proxy_agent/src/key_keeper.rs +++ b/proxy_agent/src/key_keeper.rs @@ -619,6 +619,8 @@ impl KeyKeeper { Ok(()) } + /// update the redirector/eBPF policy based on the secure channel status + /// it should be called when the secure channel state is changed async fn update_redirector_policy(&self, status: KeyStatus) -> bool { // update the redirector policy map if !redirector::update_wire_server_redirect_policy( diff --git a/proxy_agent/src/redirector.rs b/proxy_agent/src/redirector.rs index ff949057..eedd0aa0 100644 --- a/proxy_agent/src/redirector.rs +++ b/proxy_agent/src/redirector.rs @@ -193,6 +193,7 @@ impl Redirector { logger::write_information(format!( "Success updated bpf skip_process map with pid={pid}." 
)); + let wireserver_mode = if let Ok(Some(rules)) = self .access_control_shared_state .get_wireserver_rules() @@ -200,45 +201,43 @@ impl Redirector { { rules.mode } else { - AuthorizationMode::Audit + // default to disabled if the rules are not ready + AuthorizationMode::Disabled }; - if wireserver_mode != AuthorizationMode::Disabled { - bpf_object.update_policy_elem_bpf_map( - "WireServer endpoints", - self.local_port, - constants::WIRE_SERVER_IP_NETWORK_BYTE_ORDER, //0x10813FA8 - 168.63.129.16 - constants::WIRE_SERVER_PORT, - )?; - logger::write_information( - "Success updated bpf map for WireServer support.".to_string(), - ); - } + bpf_object.update_redirect_policy( + constants::WIRE_SERVER_IP_NETWORK_BYTE_ORDER, //0x10813FA8 - 168.63.129.16 + constants::WIRE_SERVER_PORT, + self.local_port, + wireserver_mode != AuthorizationMode::Disabled, + ); let imds_mode = if let Ok(Some(rules)) = self.access_control_shared_state.get_imds_rules().await { rules.mode } else { - AuthorizationMode::Audit + // default to disabled if the rules are not ready + AuthorizationMode::Disabled }; - if imds_mode != AuthorizationMode::Disabled { - bpf_object.update_policy_elem_bpf_map( - "IMDS endpoints", - self.local_port, - constants::IMDS_IP_NETWORK_BYTE_ORDER, //0xFEA9FEA9, // 169.254.169.254 - constants::IMDS_PORT, - )?; - logger::write_information("Success updated bpf map for IMDS support.".to_string()); - } - if config::get_host_gaplugin_support() > 0 { - bpf_object.update_policy_elem_bpf_map( - "Host GAPlugin endpoints", - self.local_port, - constants::GA_PLUGIN_IP_NETWORK_BYTE_ORDER, //0x10813FA8, // 168.63.129.16 - constants::GA_PLUGIN_PORT, - )?; - logger::write_information( - "Success updated bpf map for Host GAPlugin support.".to_string(), - ); - } + bpf_object.update_redirect_policy( + constants::IMDS_IP_NETWORK_BYTE_ORDER, //0xFEA9FEA9, // 169.254.169.254 + constants::IMDS_PORT, + self.local_port, + imds_mode != AuthorizationMode::Disabled, + ); + + let ga_plugin_mode = + if let Ok(Some(rules)) = self.access_control_shared_state.get_hostga_rules().await { + rules.mode + } else { + // default to disabled if the rules are not ready + AuthorizationMode::Disabled + }; + + bpf_object.update_redirect_policy( + constants::GA_PLUGIN_IP_NETWORK_BYTE_ORDER, //0x10813FA8, // 168.63.129.16 + constants::GA_PLUGIN_PORT, + self.local_port, + ga_plugin_mode != AuthorizationMode::Disabled, + ); // programs self.attach_bpf_prog(&mut bpf_object)?; diff --git a/proxy_agent/src/redirector/linux.rs b/proxy_agent/src/redirector/linux.rs index 8cbb1d31..cf5e1776 100644 --- a/proxy_agent/src/redirector/linux.rs +++ b/proxy_agent/src/redirector/linux.rs @@ -301,7 +301,7 @@ impl BpfObject { ); } Err(err) => { - logger::write(format!("Failed to remove destination: {}:{} from policy_map with error: {}", ip_to_string(dest_ipv4), dest_port, err)); + logger::write(format!("Failed to remove destination: {}:{} from policy_map with error: {}. 
The policy_map may not contain this entry, skip and continue.", ip_to_string(dest_ipv4), dest_port, err)); } }; } else { @@ -334,7 +334,7 @@ impl BpfObject { return true; } Err(err) => { - logger::write(format!("Failed to insert destination: {}:{} to policy_map with error: {}", ip_to_string(dest_ipv4), dest_port, err)); + logger::write_error(format!("Failed to insert destination: {}:{} to policy_map with error: {}", ip_to_string(dest_ipv4), dest_port, err)); } } } From 521871281160c19bfa0dcd36a9fda3e5d5c5d578 Mon Sep 17 00:00:00 2001 From: Zhidong Peng Date: Mon, 5 Jan 2026 21:06:11 +0000 Subject: [PATCH 02/11] update_redirector_policy when provision finished --- proxy_agent/src/key_keeper.rs | 16 ++++++++ proxy_agent/src/provision.rs | 59 ++++++++++++++++++++++++++- proxy_agent/src/proxy/proxy_server.rs | 2 + proxy_agent/src/redirector.rs | 50 ++--------------------- proxy_agent/src/shared_state.rs | 4 ++ 5 files changed, 84 insertions(+), 47 deletions(-) diff --git a/proxy_agent/src/key_keeper.rs b/proxy_agent/src/key_keeper.rs index 184631f9..8ed359f1 100644 --- a/proxy_agent/src/key_keeper.rs +++ b/proxy_agent/src/key_keeper.rs @@ -253,6 +253,8 @@ impl KeyKeeper { provision::key_latched( EventThreadsSharedState{ cancellation_token: self.cancellation_token.clone(), common_state: self.common_state.clone(), + access_control_shared_state: self.access_control_shared_state.clone(), + redirector_shared_state: self.redirector_shared_state.clone(), key_keeper_shared_state: self.key_keeper_shared_state.clone(), provision_shared_state: self.provision_shared_state.clone(), agent_status_shared_state: self.agent_status_shared_state.clone(), @@ -290,6 +292,8 @@ impl KeyKeeper { provision::start_event_threads(EventThreadsSharedState { cancellation_token: self.cancellation_token.clone(), common_state: self.common_state.clone(), + access_control_shared_state: self.access_control_shared_state.clone(), + redirector_shared_state: self.redirector_shared_state.clone(), key_keeper_shared_state: self.key_keeper_shared_state.clone(), provision_shared_state: self.provision_shared_state.clone(), agent_status_shared_state: self.agent_status_shared_state.clone(), @@ -443,6 +447,10 @@ impl KeyKeeper { provision::key_latched(EventThreadsSharedState { cancellation_token: self.cancellation_token.clone(), common_state: self.common_state.clone(), + access_control_shared_state: self + .access_control_shared_state + .clone(), + redirector_shared_state: self.redirector_shared_state.clone(), key_keeper_shared_state: self.key_keeper_shared_state.clone(), provision_shared_state: self.provision_shared_state.clone(), agent_status_shared_state: self.agent_status_shared_state.clone(), @@ -526,6 +534,10 @@ impl KeyKeeper { provision::key_latched(EventThreadsSharedState { cancellation_token: self.cancellation_token.clone(), common_state: self.common_state.clone(), + access_control_shared_state: self + .access_control_shared_state + .clone(), + redirector_shared_state: self.redirector_shared_state.clone(), key_keeper_shared_state: self.key_keeper_shared_state.clone(), provision_shared_state: self.provision_shared_state.clone(), agent_status_shared_state: self @@ -571,6 +583,10 @@ impl KeyKeeper { provision::key_latched(EventThreadsSharedState { cancellation_token: self.cancellation_token.clone(), common_state: self.common_state.clone(), + access_control_shared_state: self + .access_control_shared_state + .clone(), + redirector_shared_state: self.redirector_shared_state.clone(), key_keeper_shared_state: 
self.key_keeper_shared_state.clone(), provision_shared_state: self.provision_shared_state.clone(), agent_status_shared_state: self.agent_status_shared_state.clone(), diff --git a/proxy_agent/src/provision.rs b/proxy_agent/src/provision.rs index 2400ac86..6c20e961 100644 --- a/proxy_agent/src/provision.rs +++ b/proxy_agent/src/provision.rs @@ -8,11 +8,14 @@ use crate::common::{config, logger}; use crate::key_keeper::{DISABLE_STATE, UNKNOWN_STATE}; -use crate::proxy_agent_status; +use crate::proxy::authorization_rules::AuthorizationMode; +use crate::shared_state::access_control_wrapper::AccessControlSharedState; use crate::shared_state::agent_status_wrapper::{AgentStatusModule, AgentStatusSharedState}; use crate::shared_state::key_keeper_wrapper::KeyKeeperSharedState; use crate::shared_state::provision_wrapper::ProvisionSharedState; +use crate::shared_state::redirector_wrapper::RedirectorSharedState; use crate::shared_state::EventThreadsSharedState; +use crate::{proxy_agent_status, redirector}; use proxy_agent_shared::logger::LoggerLevel; use proxy_agent_shared::telemetry::event_logger; use proxy_agent_shared::telemetry::event_reader::EventReader; @@ -122,6 +125,15 @@ async fn update_provision_state( .await { if provision_state.contains(ProvisionFlags::ALL_READY) { + // update redirector/eBPF policy based on access control status + update_redirector_policy( + event_threads_shared_state.redirector_shared_state.clone(), + event_threads_shared_state + .access_control_shared_state + .clone(), + ) + .await; + if let Err(e) = event_threads_shared_state .provision_shared_state .set_provision_finished(true) @@ -147,6 +159,51 @@ async fn update_provision_state( } } +/// update the redirector/eBPF policy based on access control status +/// it should be called when provlision finished +async fn update_redirector_policy( + redirector_shared_state: RedirectorSharedState, + access_control_shared_state: AccessControlSharedState, +) { + let wireserver_mode = + if let Ok(Some(rules)) = access_control_shared_state.get_wireserver_rules().await { + rules.mode + } else { + // default to disabled if the rules are not ready + AuthorizationMode::Disabled + }; + redirector::update_wire_server_redirect_policy( + wireserver_mode != AuthorizationMode::Disabled, + redirector_shared_state.clone(), + ) + .await; + + let imds_mode = if let Ok(Some(rules)) = access_control_shared_state.get_imds_rules().await { + rules.mode + } else { + // default to disabled if the rules are not ready + AuthorizationMode::Disabled + }; + redirector::update_imds_redirect_policy( + imds_mode != AuthorizationMode::Disabled, + redirector_shared_state.clone(), + ) + .await; + + let ga_plugin_mode = + if let Ok(Some(rules)) = access_control_shared_state.get_hostga_rules().await { + rules.mode + } else { + // default to disabled if the rules are not ready + AuthorizationMode::Disabled + }; + redirector::update_hostga_redirect_policy( + ga_plugin_mode != AuthorizationMode::Disabled, + redirector_shared_state.clone(), + ) + .await; +} + pub async fn key_latch_ready_state_reset(provision_shared_state: ProvisionSharedState) { reset_provision_state(ProvisionFlags::KEY_LATCH_READY, provision_shared_state).await; } diff --git a/proxy_agent/src/proxy/proxy_server.rs b/proxy_agent/src/proxy/proxy_server.rs index cc2ae302..cdea7b37 100644 --- a/proxy_agent/src/proxy/proxy_server.rs +++ b/proxy_agent/src/proxy/proxy_server.rs @@ -192,6 +192,8 @@ impl ProxyServer { provision::listener_started(EventThreadsSharedState { cancellation_token: 
self.cancellation_token.clone(), common_state: self.common_state.clone(), + access_control_shared_state: self.access_control_shared_state.clone(), + redirector_shared_state: self.redirector_shared_state.clone(), key_keeper_shared_state: self.key_keeper_shared_state.clone(), provision_shared_state: self.provision_shared_state.clone(), agent_status_shared_state: self.agent_status_shared_state.clone(), diff --git a/proxy_agent/src/redirector.rs b/proxy_agent/src/redirector.rs index eedd0aa0..2dc87a46 100644 --- a/proxy_agent/src/redirector.rs +++ b/proxy_agent/src/redirector.rs @@ -45,14 +45,12 @@ mod windows; #[cfg(not(windows))] mod linux; -use crate::common::constants; use crate::common::error::BpfErrorType; use crate::common::error::Error; use crate::common::helpers; use crate::common::result::Result; use crate::common::{config, logger}; use crate::provision; -use crate::proxy::authorization_rules::AuthorizationMode; use crate::shared_state::access_control_wrapper::AccessControlSharedState; use crate::shared_state::agent_status_wrapper::{AgentStatusModule, AgentStatusSharedState}; use crate::shared_state::connection_summary_wrapper::ConnectionSummarySharedState; @@ -194,50 +192,8 @@ impl Redirector { "Success updated bpf skip_process map with pid={pid}." )); - let wireserver_mode = if let Ok(Some(rules)) = self - .access_control_shared_state - .get_wireserver_rules() - .await - { - rules.mode - } else { - // default to disabled if the rules are not ready - AuthorizationMode::Disabled - }; - bpf_object.update_redirect_policy( - constants::WIRE_SERVER_IP_NETWORK_BYTE_ORDER, //0x10813FA8 - 168.63.129.16 - constants::WIRE_SERVER_PORT, - self.local_port, - wireserver_mode != AuthorizationMode::Disabled, - ); - let imds_mode = - if let Ok(Some(rules)) = self.access_control_shared_state.get_imds_rules().await { - rules.mode - } else { - // default to disabled if the rules are not ready - AuthorizationMode::Disabled - }; - bpf_object.update_redirect_policy( - constants::IMDS_IP_NETWORK_BYTE_ORDER, //0xFEA9FEA9, // 169.254.169.254 - constants::IMDS_PORT, - self.local_port, - imds_mode != AuthorizationMode::Disabled, - ); - - let ga_plugin_mode = - if let Ok(Some(rules)) = self.access_control_shared_state.get_hostga_rules().await { - rules.mode - } else { - // default to disabled if the rules are not ready - AuthorizationMode::Disabled - }; - - bpf_object.update_redirect_policy( - constants::GA_PLUGIN_IP_NETWORK_BYTE_ORDER, //0x10813FA8, // 168.63.129.16 - constants::GA_PLUGIN_PORT, - self.local_port, - ga_plugin_mode != AuthorizationMode::Disabled, - ); + // Do not update redirect policy map here, it will be updated by provision module + // When provision is finished, it will call update_xxx_redirect_policy functions to update the redirect policy maps. 
// programs self.attach_bpf_prog(&mut bpf_object)?; @@ -284,6 +240,8 @@ impl Redirector { provision::redirector_ready(EventThreadsSharedState { cancellation_token: self.cancellation_token.clone(), common_state: self.common_state.clone(), + access_control_shared_state: self.access_control_shared_state.clone(), + redirector_shared_state: self.redirector_shared_state.clone(), key_keeper_shared_state: self.key_keeper_shared_state.clone(), provision_shared_state: self.provision_shared_state.clone(), agent_status_shared_state: self.agent_status_shared_state.clone(), diff --git a/proxy_agent/src/shared_state.rs b/proxy_agent/src/shared_state.rs index e297841a..58553849 100644 --- a/proxy_agent/src/shared_state.rs +++ b/proxy_agent/src/shared_state.rs @@ -124,6 +124,8 @@ impl SharedState { pub struct EventThreadsSharedState { pub cancellation_token: CancellationToken, pub common_state: CommonState, + pub access_control_shared_state: access_control_wrapper::AccessControlSharedState, + pub redirector_shared_state: redirector_wrapper::RedirectorSharedState, pub key_keeper_shared_state: key_keeper_wrapper::KeyKeeperSharedState, pub provision_shared_state: provision_wrapper::ProvisionSharedState, pub agent_status_shared_state: agent_status_wrapper::AgentStatusSharedState, @@ -135,6 +137,8 @@ impl EventThreadsSharedState { EventThreadsSharedState { cancellation_token: shared_state.get_cancellation_token(), common_state: shared_state.get_common_state(), + access_control_shared_state: shared_state.get_access_control_shared_state(), + redirector_shared_state: shared_state.get_redirector_shared_state(), key_keeper_shared_state: shared_state.get_key_keeper_shared_state(), provision_shared_state: shared_state.get_provision_shared_state(), agent_status_shared_state: shared_state.get_agent_status_shared_state(), From 9b5765e37d418e269e77cc41bb4aeafcc005f39b Mon Sep 17 00:00:00 2001 From: "Zhidong Peng (HE/HIM)" Date: Mon, 5 Jan 2026 14:37:07 -0800 Subject: [PATCH 03/11] Break down KeyKeepr loop_poll function into smaller, more manageable sub-functions to improve readability and maintainability --- proxy_agent/src/key_keeper.rs | 723 ++++++++++++++++++---------------- 1 file changed, 377 insertions(+), 346 deletions(-) diff --git a/proxy_agent/src/key_keeper.rs b/proxy_agent/src/key_keeper.rs index 8ed359f1..b44b2f5d 100644 --- a/proxy_agent/src/key_keeper.rs +++ b/proxy_agent/src/key_keeper.rs @@ -204,8 +204,6 @@ impl KeyKeeper { let mut redirect_policy_updated = false; loop { if !first_iteration { - // skip the sleep for the first loop - let current_state = match self .key_keeper_shared_state .get_current_secure_channel_state() @@ -220,399 +218,432 @@ impl KeyKeeper { } }; - let sleep = if current_state == UNKNOWN_STATE - && helpers::get_elapsed_time_in_millisec() - < FREQUENT_PULL_TIMEOUT_IN_MILLISECONDS - { - // frequent poll the secure channel status every second for the first 5 minutes - // until the secure channel state is known - FREQUENT_PULL_INTERVAL - } else { - self.interval - }; + let sleep = self.calculate_sleep_duration(¤t_state); + let (continue_loop, reset_timer) = self + .handle_notification(¬ify, sleep, ¤t_state) + .await; - let time = Instant::now(); - tokio::select! { - // notify to query the secure channel status immediately when the secure channel state is unknown or disabled - // this is to handle quicker response to the secure channel state change during VM provisioning. 
- _ = notify.notified() => { - if current_state == DISABLE_STATE || current_state == UNKNOWN_STATE { - logger::write_warning(format!("poll_secure_channel_status task notified and secure channel state is '{current_state}', reset states and start poll status now.")); - provision::key_latch_ready_state_reset(self.provision_shared_state.clone()).await; - if let Err(e) = self.key_keeper_shared_state.update_current_secure_channel_state(UNKNOWN_STATE.to_string()).await{ - logger::write_warning(format!("Failed to update secure channel state to 'Unknown': {e}")); - } - - if start.elapsed().as_millis() > PROVISION_TIMEUP_IN_MILLISECONDS { - // already timeup, reset the start timer - start = Instant::now(); - provision_timeup = false; - } - } else { - // report key latched ready to try update the provision finished time_tick - provision::key_latched( EventThreadsSharedState{ - cancellation_token: self.cancellation_token.clone(), - common_state: self.common_state.clone(), - access_control_shared_state: self.access_control_shared_state.clone(), - redirector_shared_state: self.redirector_shared_state.clone(), - key_keeper_shared_state: self.key_keeper_shared_state.clone(), - provision_shared_state: self.provision_shared_state.clone(), - agent_status_shared_state: self.agent_status_shared_state.clone(), - connection_summary_shared_state: self.connection_summary_shared_state.clone(), - },).await; - let slept_time_in_millisec = time.elapsed().as_millis(); - let continue_sleep = sleep.as_millis() - slept_time_in_millisec; - if continue_sleep > 0 { - let continue_sleep = Duration::from_millis(continue_sleep as u64); - let message = format!("poll_secure_channel_status task notified but secure channel state is '{current_state}', continue with sleep wait for {continue_sleep:?}."); - logger::write_warning(message); - tokio::time::sleep(continue_sleep).await; - } - } - }, - _ = tokio::time::sleep(sleep) => {} + if reset_timer { + start = Instant::now(); + provision_timeup = false; } - } - first_iteration = false; - if !provision_timeup && start.elapsed().as_millis() > PROVISION_TIMEUP_IN_MILLISECONDS { - provision::provision_timeup( - None, - self.provision_shared_state.clone(), - self.agent_status_shared_state.clone(), - ) - .await; - provision_timeup = true; + if !continue_loop { + continue; + } } + first_iteration = false; - if !started_event_threads - && helpers::get_elapsed_time_in_millisec() - > DELAY_START_EVENT_THREADS_IN_MILLISECONDS - { - provision::start_event_threads(EventThreadsSharedState { - cancellation_token: self.cancellation_token.clone(), - common_state: self.common_state.clone(), - access_control_shared_state: self.access_control_shared_state.clone(), - redirector_shared_state: self.redirector_shared_state.clone(), - key_keeper_shared_state: self.key_keeper_shared_state.clone(), - provision_shared_state: self.provision_shared_state.clone(), - agent_status_shared_state: self.agent_status_shared_state.clone(), - connection_summary_shared_state: self.connection_summary_shared_state.clone(), - }) + provision_timeup = self + .handle_provision_timeup(&mut start, provision_timeup) .await; - started_event_threads = true; - } + started_event_threads = self.handle_event_threads_start(started_event_threads).await; let status = match key::get_status(&self.base_url).await { Ok(s) => s, Err(e) => { self.update_status_message(format!("Failed to get key status - {e}"), true) .await; + // failed to get status, skip to next iteration continue; } }; self.update_status_message(format!("Got key status 
successfully: {status}."), true) .await; - let mut access_control_rules_changed = false; - let wireserver_rule_id = status.get_wireserver_rule_id(); - let imds_rule_id: String = status.get_imds_rule_id(); - let hostga_rule_id: String = status.get_hostga_rule_id(); + self.update_access_control_rules(&status).await; - match self + let state = status.get_secure_channel_state(); + let secure_channel_state_updated = self .key_keeper_shared_state - .update_wireserver_rule_id(wireserver_rule_id.to_string()) + .update_current_secure_channel_state(state.to_string()) + .await; + + if !self.handle_key_acquisition(&status, &state).await { + // Handle key acquisition failed, skip to next iteration + continue; + } + + if self + .handle_secure_channel_state_change( + secure_channel_state_updated, + &state, + &status, + &mut redirect_policy_updated, + ) .await { - Ok((updated, old_wire_server_rule_id)) => { - if updated { - logger::write_warning(format!( - "Wireserver rule id changed from '{old_wire_server_rule_id}' to '{wireserver_rule_id}'." - )); - if let Err(e) = self - .access_control_shared_state - .set_wireserver_rules(status.get_wireserver_rules()) - .await - { - logger::write_error(format!("Failed to set wireserver rules: {e}")); - } - access_control_rules_changed = true; + // successfully handled secure channel state change, skip to next iteration + continue; + } + + // check and update the redirect policy if not updated successfully before, try again here + // this could happen when the eBPF/redirector module was not started yet before + if !redirect_policy_updated { + logger::write_warning( + "redirect policy was not update successfully before, retrying now".to_string(), + ); + redirect_policy_updated = self.update_redirector_policy(&status).await; + } + } + } + + /// Calculate sleep duration based on current secure channel state + fn calculate_sleep_duration(&self, current_state: &str) -> Duration { + if current_state == UNKNOWN_STATE + && helpers::get_elapsed_time_in_millisec() < FREQUENT_PULL_TIMEOUT_IN_MILLISECONDS + { + // frequent poll the secure channel status every second for the first 5 minutes + // until the secure channel state is known + FREQUENT_PULL_INTERVAL + } else { + self.interval + } + } + + /// Handle notification and sleep logic + /// Returns (continue_loop, reset_timer) + async fn handle_notification( + &self, + notify: &tokio::sync::Notify, + sleep: Duration, + current_state: &str, + ) -> (bool, bool) { + let time = Instant::now(); + tokio::select! { + // notify to query the secure channel status immediately when the secure channel state is unknown or disabled + // this is to handle quicker response to the secure channel state change during VM provisioning. 
+ _ = notify.notified() => { + if current_state == DISABLE_STATE || current_state == UNKNOWN_STATE { + logger::write_warning(format!("poll_secure_channel_status task notified and secure channel state is '{current_state}', reset states and start poll status now.")); + provision::key_latch_ready_state_reset(self.provision_shared_state.clone()).await; + if let Err(e) = self.key_keeper_shared_state.update_current_secure_channel_state(UNKNOWN_STATE.to_string()).await { + logger::write_warning(format!("Failed to update secure channel state to 'Unknown': {e}")); } + (true, true) + } else { + // report key latched ready to try update the provision finished time_tick + provision::key_latched(self.create_event_threads_shared_state()).await; + let slept_time_in_millisec = time.elapsed().as_millis(); + let continue_sleep = sleep.as_millis() - slept_time_in_millisec; + if continue_sleep > 0 { + let continue_sleep = Duration::from_millis(continue_sleep as u64); + let message = format!("poll_secure_channel_status task notified but secure channel state is '{current_state}', continue with sleep wait for {continue_sleep:?}."); + logger::write_warning(message); + tokio::time::sleep(continue_sleep).await; + } + (true, false) } - Err(e) => { - logger::write_warning(format!("Failed to update wireserver rule id: {e}")); - } + }, + _ = tokio::time::sleep(sleep) => { + (true, false) } + } + } - match self - .key_keeper_shared_state - .update_imds_rule_id(imds_rule_id.to_string()) - .await - { - Ok((updated, old_imds_rule_id)) => { - if updated { - logger::write_warning(format!( - "IMDS rule id changed from '{old_imds_rule_id}' to '{imds_rule_id}'." - )); - if let Err(e) = self - .access_control_shared_state - .set_imds_rules(status.get_imds_rules()) - .await - { - logger::write_error(format!("Failed to set imds rules: {e}")); - } - access_control_rules_changed = true; + /// Handle provision timeout logic + async fn handle_provision_timeup(&self, start: &mut Instant, provision_timeup: bool) -> bool { + if !provision_timeup && start.elapsed().as_millis() > PROVISION_TIMEUP_IN_MILLISECONDS { + provision::provision_timeup( + None, + self.provision_shared_state.clone(), + self.agent_status_shared_state.clone(), + ) + .await; + return true; + } + provision_timeup + } + + /// Handle starting event threads + async fn handle_event_threads_start(&self, started_event_threads: bool) -> bool { + if !started_event_threads + && helpers::get_elapsed_time_in_millisec() > DELAY_START_EVENT_THREADS_IN_MILLISECONDS + { + provision::start_event_threads(self.create_event_threads_shared_state()).await; + return true; + } + started_event_threads + } + + /// Update access control rules from the key status + /// Returns true if any rules changed + async fn update_access_control_rules(&self, status: &KeyStatus) -> bool { + let mut access_control_rules_changed = false; + let wireserver_rule_id = status.get_wireserver_rule_id(); + let imds_rule_id = status.get_imds_rule_id(); + let hostga_rule_id = status.get_hostga_rule_id(); + + // Update wireserver rules + match self + .key_keeper_shared_state + .update_wireserver_rule_id(wireserver_rule_id.to_string()) + .await + { + Ok((updated, old_wire_server_rule_id)) => { + if updated { + logger::write_warning(format!( + "Wireserver rule id changed from '{old_wire_server_rule_id}' to '{wireserver_rule_id}'." 
+ )); + if let Err(e) = self + .access_control_shared_state + .set_wireserver_rules(status.get_wireserver_rules()) + .await + { + logger::write_error(format!("Failed to set wireserver rules: {e}")); } + access_control_rules_changed = true; } - Err(e) => { - logger::write_warning(format!("Failed to update imds rule id: {e}")); - } } + Err(e) => { + logger::write_warning(format!("Failed to update wireserver rule id: {e}")); + } + } - match self - .key_keeper_shared_state - .update_hostga_rule_id(hostga_rule_id.to_string()) - .await - { - Ok((updated, old_hostga_rule_id)) => { - if updated { - logger::write_warning(format!( - "HostGA rule id changed from '{old_hostga_rule_id}' to '{hostga_rule_id}'." - )); - if let Err(e) = self - .access_control_shared_state - .set_hostga_rules(status.get_hostga_rules()) - .await - { - logger::write_error(format!("Failed to set HostGA rules: {e}")); - } - access_control_rules_changed = true; + // Update IMDS rules + match self + .key_keeper_shared_state + .update_imds_rule_id(imds_rule_id.to_string()) + .await + { + Ok((updated, old_imds_rule_id)) => { + if updated { + logger::write_warning(format!( + "IMDS rule id changed from '{old_imds_rule_id}' to '{imds_rule_id}'." + )); + if let Err(e) = self + .access_control_shared_state + .set_imds_rules(status.get_imds_rules()) + .await + { + logger::write_error(format!("Failed to set imds rules: {e}")); } + access_control_rules_changed = true; } - Err(e) => { - logger::write_warning(format!("Failed to update HostGA rule id: {e}")); - } } + Err(e) => { + logger::write_warning(format!("Failed to update imds rule id: {e}")); + } + } - if access_control_rules_changed { - if let (Ok(wireserver_rules), Ok(imds_rules), Ok(hostga_rules)) = ( - self.access_control_shared_state - .get_wireserver_rules() - .await, - self.access_control_shared_state.get_imds_rules().await, - self.access_control_shared_state.get_hostga_rules().await, - ) { - let rules = AuthorizationRulesForLogging::new( - status.authorizationRules.clone(), - ComputedAuthorizationRules { - wireserver: wireserver_rules, - imds: imds_rules, - hostga: hostga_rules, - }, - ); - rules.write_all(&self.status_dir, constants::MAX_LOG_FILE_COUNT); + // Update HostGA rules + match self + .key_keeper_shared_state + .update_hostga_rule_id(hostga_rule_id.to_string()) + .await + { + Ok((updated, old_hostga_rule_id)) => { + if updated { + logger::write_warning(format!( + "HostGA rule id changed from '{old_hostga_rule_id}' to '{hostga_rule_id}'." 
+ )); + if let Err(e) = self + .access_control_shared_state + .set_hostga_rules(status.get_hostga_rules()) + .await + { + logger::write_error(format!("Failed to set HostGA rules: {e}")); + } + access_control_rules_changed = true; } } + Err(e) => { + logger::write_warning(format!("Failed to update HostGA rule id: {e}")); + } + } - let state = status.get_secure_channel_state(); - let secure_channel_state_updated = self - .key_keeper_shared_state - .update_current_secure_channel_state(state.to_string()) - .await; - - // check if need fetch the key - if state != DISABLE_STATE - && (status.keyGuid.is_none() // key has not latched yet - || status.keyGuid != self.key_keeper_shared_state.get_current_key_guid().await.unwrap_or(None)) - // key changed - { - let mut key_found = false; - if let Some(guid) = &status.keyGuid { - // key latched before and search the key locally first - match Self::fetch_key(&self.key_dir, guid) { - Ok(key) => { - if let Err(e) = self.update_key_to_shared_state(key.clone()).await { - logger::write_warning(format!("Failed to update key: {e}")); - } - - let message = helpers::write_startup_event( - "Found key details from local and ready to use.", - "poll_secure_channel_status", - "key_keeper", - logger::AGENT_LOGGER_KEY, - ); - self.update_status_message(message, false).await; - key_found = true; - - provision::key_latched(EventThreadsSharedState { - cancellation_token: self.cancellation_token.clone(), - common_state: self.common_state.clone(), - access_control_shared_state: self - .access_control_shared_state - .clone(), - redirector_shared_state: self.redirector_shared_state.clone(), - key_keeper_shared_state: self.key_keeper_shared_state.clone(), - provision_shared_state: self.provision_shared_state.clone(), - agent_status_shared_state: self.agent_status_shared_state.clone(), - connection_summary_shared_state: self - .connection_summary_shared_state - .clone(), - }) - .await; - } - Err(e) => { - event_logger::write_event( - LoggerLevel::Info, - format!("Failed to fetch local key details with error: {e:?}. 
Will try acquire the key details from Server."), - "poll_secure_channel_status", - "key_keeper", - logger::AGENT_LOGGER_KEY, - ); - } - }; - } + // Write authorization rules to file if changed + if access_control_rules_changed { + if let (Ok(wireserver_rules), Ok(imds_rules), Ok(hostga_rules)) = ( + self.access_control_shared_state + .get_wireserver_rules() + .await, + self.access_control_shared_state.get_imds_rules().await, + self.access_control_shared_state.get_hostga_rules().await, + ) { + let rules = AuthorizationRulesForLogging::new( + status.authorizationRules.clone(), + ComputedAuthorizationRules { + wireserver: wireserver_rules, + imds: imds_rules, + hostga: hostga_rules, + }, + ); + rules.write_all(&self.status_dir, constants::MAX_LOG_FILE_COUNT); + } + } - // if key has not latched before, - // or not found - // or could not read locally, - // try fetch from server - if !key_found { - let key = match key::acquire_key(&self.base_url).await { - Ok(k) => k, - Err(e) => { - self.update_status_message( - format!("Failed to acquire key details: {e:?}"), - true, - ) - .await; - continue; - } - }; - - // persist the new key to local disk - let guid = key.guid.to_string(); - match Self::store_key(&self.key_dir, &key) { - Ok(()) => { - logger::write_information(format!( - "Successfully acquired the key '{guid}' details from server and saved locally.")); - } - Err(e) => { - self.update_status_message( - format!("Failed to save key details to file: {e:?}"), - true, - ) - .await; - continue; - } - } + access_control_rules_changed + } - // double check the key details saved correctly to local disk - if let Err(e) = Self::check_key(&self.key_dir, &key) { - self.update_status_message( - format!( - "Failed to check the key '{guid}' details saved locally: {e:?}." 
- ), - true, - ) - .await; - continue; - } else { - match key::attest_key(&self.base_url, &key).await { - Ok(()) => { - // update in memory - if let Err(e) = self.update_key_to_shared_state(key.clone()).await { - logger::write_warning(format!("Failed to update key: {e}")); - } - - let message = helpers::write_startup_event( - "Successfully attest the key and ready to use.", - "poll_secure_channel_status", - "key_keeper", - logger::AGENT_LOGGER_KEY, - ); - self.update_status_message(message, false).await; - provision::key_latched(EventThreadsSharedState { - cancellation_token: self.cancellation_token.clone(), - common_state: self.common_state.clone(), - access_control_shared_state: self - .access_control_shared_state - .clone(), - redirector_shared_state: self.redirector_shared_state.clone(), - key_keeper_shared_state: self.key_keeper_shared_state.clone(), - provision_shared_state: self.provision_shared_state.clone(), - agent_status_shared_state: self - .agent_status_shared_state - .clone(), - connection_summary_shared_state: self - .connection_summary_shared_state - .clone(), - }) - .await; - } - Err(e) => { - logger::write_warning(format!("Failed to attest the key: {e:?}")); - continue; - } - } - } - } + /// Handle key acquisition from local or server + /// Returns true if successful, false if should continue to next iteration + async fn handle_key_acquisition(&self, status: &KeyStatus, state: &str) -> bool { + // check if need fetch the key + if state != DISABLE_STATE + && (status.keyGuid.is_none() // key has not latched yet + || status.keyGuid != self.key_keeper_shared_state.get_current_key_guid().await.unwrap_or(None)) + // key changed + { + if self.try_fetch_local_key(&status.keyGuid).await { + return true; } - // update redirect policy if current secure channel state updated - match secure_channel_state_updated { - Ok(updated) => { - if updated { - // secure channel state changed, update the redirect policy - redirect_policy_updated = self.update_redirector_policy(status).await; - - // customer has not enforce the secure channel state - if state == DISABLE_STATE { - let message = helpers::write_startup_event( - "Customer has not enforce the secure channel state.", - "poll_secure_channel_status", - "key_keeper", - logger::AGENT_LOGGER_KEY, - ); - // Update the status message and let the provision to continue - self.update_status_message(message, false).await; - - // clear key in memory for disabled state - if let Err(e) = self.key_keeper_shared_state.clear_key().await { - logger::write_warning(format!("Failed to clear key: {e}")); - } - provision::key_latched(EventThreadsSharedState { - cancellation_token: self.cancellation_token.clone(), - common_state: self.common_state.clone(), - access_control_shared_state: self - .access_control_shared_state - .clone(), - redirector_shared_state: self.redirector_shared_state.clone(), - key_keeper_shared_state: self.key_keeper_shared_state.clone(), - provision_shared_state: self.provision_shared_state.clone(), - agent_status_shared_state: self.agent_status_shared_state.clone(), - connection_summary_shared_state: self - .connection_summary_shared_state - .clone(), - }) - .await; - } + // if key has not latched before, or not found, or could not read locally, + // try fetch from server + return self.acquire_key_from_server().await; + } + true + } - continue; + /// Try to fetch key from local storage + /// Returns true if key was found and loaded successfully + async fn try_fetch_local_key(&self, key_guid: &Option) -> bool { + if let Some(guid) = 
key_guid { + // key latched before and search the key locally first + match Self::fetch_key(&self.key_dir, guid) { + Ok(key) => { + if let Err(e) = self.update_key_to_shared_state(key.clone()).await { + logger::write_warning(format!("Failed to update key: {e}")); } + + let message = helpers::write_startup_event( + "Found key details from local and ready to use.", + "poll_secure_channel_status", + "key_keeper", + logger::AGENT_LOGGER_KEY, + ); + self.update_status_message(message, false).await; + + provision::key_latched(self.create_event_threads_shared_state()).await; + return true; } Err(e) => { - logger::write_warning(format!("Failed to update secure channel state: {e}")); + event_logger::write_event( + LoggerLevel::Info, + format!("Failed to fetch local key details with error: {e:?}. Will try acquire the key details from Server."), + "poll_secure_channel_status", + "key_keeper", + logger::AGENT_LOGGER_KEY, + ); } + }; + } + false + } + + /// Acquire key from server, persist it, and attest it + /// Returns true if successful, false if should continue to next iteration + async fn acquire_key_from_server(&self) -> bool { + let key = match key::acquire_key(&self.base_url).await { + Ok(k) => k, + Err(e) => { + self.update_status_message(format!("Failed to acquire key details: {e:?}"), true) + .await; + return false; } + }; - // check and update the redirect policy if not updated successfully before, try again here - // this could happen when the eBPF/redirector module was not started yet before - if !redirect_policy_updated { - logger::write_warning( - "redirect policy was not update successfully before, retrying now".to_string(), + // persist the new key to local disk + let guid = key.guid.to_string(); + if let Err(e) = Self::store_key(&self.key_dir, &key) { + self.update_status_message(format!("Failed to save key details to file: {e:?}"), true) + .await; + return false; + } + logger::write_information(format!( + "Successfully acquired the key '{guid}' details from server and saved locally." 
+ )); + + // double check the key details saved correctly to local disk + if let Err(e) = Self::check_key(&self.key_dir, &key) { + self.update_status_message( + format!("Failed to check the key '{guid}' details saved locally: {e:?}."), + true, + ) + .await; + return false; + } + + // attest the key + match key::attest_key(&self.base_url, &key).await { + Ok(()) => { + // update in memory + if let Err(e) = self.update_key_to_shared_state(key.clone()).await { + logger::write_warning(format!("Failed to update key: {e}")); + } + + let message = helpers::write_startup_event( + "Successfully attest the key and ready to use.", + "poll_secure_channel_status", + "key_keeper", + logger::AGENT_LOGGER_KEY, ); - redirect_policy_updated = self.update_redirector_policy(status).await; + self.update_status_message(message, false).await; + provision::key_latched(self.create_event_threads_shared_state()).await; + true } + Err(e) => { + logger::write_warning(format!("Failed to attest the key: {e:?}")); + false + } + } + } + + /// Handle secure channel state change + /// Returns true if should continue to next iteration + async fn handle_secure_channel_state_change( + &self, + secure_channel_state_updated: std::result::Result, + state: &str, + status: &KeyStatus, + redirect_policy_updated: &mut bool, + ) -> bool { + match secure_channel_state_updated { + Ok(updated) => { + if updated { + // secure channel state changed, update the redirect policy + *redirect_policy_updated = self.update_redirector_policy(status).await; + + // customer has not enforce the secure channel state + if state == DISABLE_STATE { + let message = helpers::write_startup_event( + "Customer has not enforce the secure channel state.", + "poll_secure_channel_status", + "key_keeper", + logger::AGENT_LOGGER_KEY, + ); + // Update the status message and let the provision to continue + self.update_status_message(message, false).await; + + // clear key in memory for disabled state + if let Err(e) = self.key_keeper_shared_state.clear_key().await { + logger::write_warning(format!("Failed to clear key: {e}")); + } + provision::key_latched(self.create_event_threads_shared_state()).await; + } + + return true; + } + } + Err(e) => { + logger::write_warning(format!("Failed to update secure channel state: {e}")); + } + } + false + } + + /// Create EventThreadsSharedState from current state + fn create_event_threads_shared_state(&self) -> EventThreadsSharedState { + EventThreadsSharedState { + cancellation_token: self.cancellation_token.clone(), + common_state: self.common_state.clone(), + access_control_shared_state: self.access_control_shared_state.clone(), + redirector_shared_state: self.redirector_shared_state.clone(), + key_keeper_shared_state: self.key_keeper_shared_state.clone(), + provision_shared_state: self.provision_shared_state.clone(), + agent_status_shared_state: self.agent_status_shared_state.clone(), + connection_summary_shared_state: self.connection_summary_shared_state.clone(), } } @@ -637,7 +668,7 @@ impl KeyKeeper { /// update the redirector/eBPF policy based on the secure channel status /// it should be called when the secure channel state is changed - async fn update_redirector_policy(&self, status: KeyStatus) -> bool { + async fn update_redirector_policy(&self, status: &KeyStatus) -> bool { // update the redirector policy map if !redirector::update_wire_server_redirect_policy( status.get_wire_server_mode() != DISABLE_STATE, From b9e69bd90099ad2e47bb421ad266346d2d047316 Mon Sep 17 00:00:00 2001 From: "Zhidong Peng (HE/HIM)" Date: Mon, 5 
Jan 2026 14:38:16 -0800 Subject: [PATCH 04/11] fix typo --- proxy_agent/src/provision.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy_agent/src/provision.rs b/proxy_agent/src/provision.rs index 6c20e961..94422070 100644 --- a/proxy_agent/src/provision.rs +++ b/proxy_agent/src/provision.rs @@ -160,7 +160,7 @@ async fn update_provision_state( } /// update the redirector/eBPF policy based on access control status -/// it should be called when provlision finished +/// it should be called when provision finished async fn update_redirector_policy( redirector_shared_state: RedirectorSharedState, access_control_shared_state: AccessControlSharedState, From e11aecdf972dd2ad0b13577275416de221852d44 Mon Sep 17 00:00:00 2001 From: Zhidong Peng Date: Mon, 5 Jan 2026 23:24:06 +0000 Subject: [PATCH 05/11] HostGAPlugin to use WS rules --- proxy_agent/src/key_keeper/key.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/proxy_agent/src/key_keeper/key.rs b/proxy_agent/src/key_keeper/key.rs index 7039b0e6..2a30b406 100644 --- a/proxy_agent/src/key_keeper/key.rs +++ b/proxy_agent/src/key_keeper/key.rs @@ -580,12 +580,14 @@ impl KeyStatus { } pub fn get_hostga_rules(&self) -> Option { - // short-term: HostGA has no rules + // match &self.authorizationRules { + // Some(rules) => rules.hostga.clone(), + // None => None, + // } + + // short-term: HostGA uses wireserver rules // long-term: TBD - match &self.authorizationRules { - Some(rules) => rules.hostga.clone(), - None => None, - } + self.get_wireserver_rules() } pub fn get_wire_server_mode(&self) -> String { From d900c1d65c7f8e43f9c4cfdfd9932d44aff206f9 Mon Sep 17 00:00:00 2001 From: Zhidong Peng Date: Mon, 5 Jan 2026 23:34:07 +0000 Subject: [PATCH 06/11] fix ut --- proxy_agent/src/key_keeper/key.rs | 172 ++++++++++++++++-------------- 1 file changed, 91 insertions(+), 81 deletions(-) diff --git a/proxy_agent/src/key_keeper/key.rs b/proxy_agent/src/key_keeper/key.rs index 2a30b406..667b0932 100644 --- a/proxy_agent/src/key_keeper/key.rs +++ b/proxy_agent/src/key_keeper/key.rs @@ -1263,95 +1263,105 @@ mod tests { "roleAssignment identities mismatch" ); - // Validate HostGA rules + // Validate HostGA rules to match WireServer rules let hostga_rules = status.get_hostga_rules().unwrap(); assert_eq!( - "allow", hostga_rules.defaultAccess, - "defaultAccess mismatch" + wireserver_rules.defaultAccess, hostga_rules.defaultAccess, + "hostga_rules defaultAccess mismatch" ); assert_eq!( - "sigid", + status.get_wireserver_rule_id(), status.get_hostga_rule_id(), "HostGA rule id mismatch" ); - assert_eq!("enforce", status.get_hostga_mode(), "HostGA mode mismatch"); - - // Validate HostGA rule details - // Retrieve and validate second privilege for HostGA - let privilege = &hostga_rules - .rules - .as_ref() - .unwrap() - .privileges - .as_ref() - .unwrap()[1]; - - assert_eq!("test2", privilege.name, "privilege name mismatch"); - assert_eq!("/test2", privilege.path, "privilege path mismatch"); - - assert_eq!( - "value3", - privilege.queryParameters.as_ref().unwrap()["key1"], - "privilege queryParameters mismatch" - ); - assert_eq!( - "value4", - privilege.queryParameters.as_ref().unwrap()["key2"], - "privilege queryParameters mismatch" - ); - - // Retrieve and validate second role for HostGA - let role = &hostga_rules.rules.as_ref().unwrap().roles.as_ref().unwrap()[1]; - assert_eq!("test6", role.name, "role name mismatch"); - assert_eq!("test4", role.privileges[0], "role privilege mismatch"); - assert_eq!("test5", 
role.privileges[1], "role privilege mismatch"); - - // Retrieve and validate first identity for HostGA - let identity = &hostga_rules - .rules - .as_ref() - .unwrap() - .identities - .as_ref() - .unwrap()[0]; - assert_eq!("test", identity.name, "identity name mismatch"); assert_eq!( - "test", - identity.userName.as_ref().unwrap(), - "identity userName mismatch" - ); - assert_eq!( - "test", - identity.groupName.as_ref().unwrap(), - "identity groupName mismatch" - ); - assert_eq!( - "test", - identity.exePath.as_ref().unwrap(), - "identity exePath mismatch" - ); - assert_eq!( - "test", - identity.processName.as_ref().unwrap(), - "identity processName mismatch" - ); - - // Retrieve and validate first role assignment for HostGA - let role_assignment = &hostga_rules - .rules - .as_ref() - .unwrap() - .roleAssignments - .as_ref() - .unwrap()[0]; - assert_eq!( - "test4", role_assignment.role, - "roleAssignment role mismatch" - ); - assert_eq!( - "test", role_assignment.identities[0], - "roleAssignment identities mismatch" - ); + status.get_wire_server_mode(), + status.get_hostga_mode(), + "HostGA mode mismatch" + ); + + /*** + * + * Validate HostGA rule details in future when HostGA has independent rules + + // Validate HostGA rule details + // Retrieve and validate second privilege for HostGA + let privilege = &hostga_rules + .rules + .as_ref() + .unwrap() + .privileges + .as_ref() + .unwrap()[1]; + + assert_eq!("test2", privilege.name, "privilege name mismatch"); + assert_eq!("/test2", privilege.path, "privilege path mismatch"); + + assert_eq!( + "value3", + privilege.queryParameters.as_ref().unwrap()["key1"], + "privilege queryParameters mismatch" + ); + assert_eq!( + "value4", + privilege.queryParameters.as_ref().unwrap()["key2"], + "privilege queryParameters mismatch" + ); + + // Retrieve and validate second role for HostGA + let role = &hostga_rules.rules.as_ref().unwrap().roles.as_ref().unwrap()[1]; + assert_eq!("test6", role.name, "role name mismatch"); + assert_eq!("test4", role.privileges[0], "role privilege mismatch"); + assert_eq!("test5", role.privileges[1], "role privilege mismatch"); + + // Retrieve and validate first identity for HostGA + let identity = &hostga_rules + .rules + .as_ref() + .unwrap() + .identities + .as_ref() + .unwrap()[0]; + assert_eq!("test", identity.name, "identity name mismatch"); + assert_eq!( + "test", + identity.userName.as_ref().unwrap(), + "identity userName mismatch" + ); + assert_eq!( + "test", + identity.groupName.as_ref().unwrap(), + "identity groupName mismatch" + ); + assert_eq!( + "test", + identity.exePath.as_ref().unwrap(), + "identity exePath mismatch" + ); + assert_eq!( + "test", + identity.processName.as_ref().unwrap(), + "identity processName mismatch" + ); + + // Retrieve and validate first role assignment for HostGA + let role_assignment = &hostga_rules + .rules + .as_ref() + .unwrap() + .roleAssignments + .as_ref() + .unwrap()[0]; + assert_eq!( + "test4", role_assignment.role, + "roleAssignment role mismatch" + ); + assert_eq!( + "test", role_assignment.identities[0], + "roleAssignment identities mismatch" + ); + * + */ } #[test] From c8beb0c2f9cccf718e7388bbcc4b8ba25d3c41aa Mon Sep 17 00:00:00 2001 From: "Zhidong Peng (HE/HIM)" Date: Wed, 7 Jan 2026 10:14:45 -0800 Subject: [PATCH 07/11] address review comments --- proxy_agent/src/key_keeper.rs | 167 +++++++++++++++++++++++++--------- proxy_agent/src/provision.rs | 6 +- 2 files changed, 129 insertions(+), 44 deletions(-) diff --git a/proxy_agent/src/key_keeper.rs 
b/proxy_agent/src/key_keeper.rs index b44b2f5d..c2e940a0 100644 --- a/proxy_agent/src/key_keeper.rs +++ b/proxy_agent/src/key_keeper.rs @@ -59,7 +59,7 @@ pub const MUST_SIG_WIRESERVER_IMDS: &str = "wireserverandimds"; pub const UNKNOWN_STATE: &str = "Unknown"; static FREQUENT_PULL_INTERVAL: Duration = Duration::from_secs(1); // 1 second const FREQUENT_PULL_TIMEOUT_IN_MILLISECONDS: u128 = 300000; // 5 minutes -const PROVISION_TIMEUP_IN_MILLISECONDS: u128 = 120000; // 2 minute +const PROVISION_TIMEOUT_IN_MILLISECONDS: u128 = 120000; // 2 minute const DELAY_START_EVENT_THREADS_IN_MILLISECONDS: u128 = 60000; // 1 minute #[derive(Clone)] @@ -90,6 +90,15 @@ pub struct KeyKeeper { connection_summary_shared_state: ConnectionSummarySharedState, } +/// Reason for waking up from sleep +/// Used in the pull_secure_channel_status loop +/// Notified: woke up due to notification +/// TimerElapsed: woke up due to sleep-timer elapsed +enum WakeReason { + Notified, + TimerElapsed, +} + impl KeyKeeper { pub fn new( base_url: Uri, @@ -180,7 +189,7 @@ impl KeyKeeper { async fn loop_poll(&self) { let mut first_iteration: bool = true; let mut started_event_threads: bool = false; - let mut provision_timeup: bool = false; + let mut provision_timeout: bool = false; let notify = match self.key_keeper_shared_state.get_notify().await { Ok(notify) => notify, Err(e) => { @@ -218,14 +227,14 @@ impl KeyKeeper { } }; - let sleep = self.calculate_sleep_duration(¤t_state); + let sleep_interval = self.calculate_sleep_duration(¤t_state); let (continue_loop, reset_timer) = self - .handle_notification(¬ify, sleep, ¤t_state) + .handle_notification(¬ify, sleep_interval, ¤t_state) .await; if reset_timer { start = Instant::now(); - provision_timeup = false; + provision_timeout = false; } if !continue_loop { @@ -234,8 +243,7 @@ impl KeyKeeper { } first_iteration = false; - provision_timeup = self - .handle_provision_timeup(&mut start, provision_timeup) + self.handle_provision_timeout(&mut start, &mut provision_timeout) .await; started_event_threads = self.handle_event_threads_start(started_event_threads).await; @@ -301,58 +309,135 @@ impl KeyKeeper { } } + async fn wait_for_wake(notify: &tokio::sync::Notify, sleep_interval: Duration) -> WakeReason { + tokio::select! { + // if a notification arrives + _ = notify.notified() => WakeReason::Notified, + // if the sleep duration elapses + _ = tokio::time::sleep(sleep_interval) => WakeReason::TimerElapsed, + } + } + /// Handle notification and sleep logic /// Returns (continue_loop, reset_timer) async fn handle_notification( &self, notify: &tokio::sync::Notify, - sleep: Duration, + sleep_interval: Duration, current_state: &str, ) -> (bool, bool) { - let time = Instant::now(); - tokio::select! { - // notify to query the secure channel status immediately when the secure channel state is unknown or disabled - // this is to handle quicker response to the secure channel state change during VM provisioning. 
- _ = notify.notified() => { - if current_state == DISABLE_STATE || current_state == UNKNOWN_STATE { - logger::write_warning(format!("poll_secure_channel_status task notified and secure channel state is '{current_state}', reset states and start poll status now.")); - provision::key_latch_ready_state_reset(self.provision_shared_state.clone()).await; - if let Err(e) = self.key_keeper_shared_state.update_current_secure_channel_state(UNKNOWN_STATE.to_string()).await { - logger::write_warning(format!("Failed to update secure channel state to 'Unknown': {e}")); - } - (true, true) - } else { - // report key latched ready to try update the provision finished time_tick - provision::key_latched(self.create_event_threads_shared_state()).await; - let slept_time_in_millisec = time.elapsed().as_millis(); - let continue_sleep = sleep.as_millis() - slept_time_in_millisec; - if continue_sleep > 0 { - let continue_sleep = Duration::from_millis(continue_sleep as u64); - let message = format!("poll_secure_channel_status task notified but secure channel state is '{current_state}', continue with sleep wait for {continue_sleep:?}."); - logger::write_warning(message); - tokio::time::sleep(continue_sleep).await; - } - (true, false) - } - }, - _ = tokio::time::sleep(sleep) => { - (true, false) + let start_time = Instant::now(); + + match Self::wait_for_wake(notify, sleep_interval).await { + WakeReason::Notified => { + self.handle_notified_state(current_state, sleep_interval, start_time) + .await } + WakeReason::TimerElapsed => (true, false), + } + } + + /// Handle the notified state - either reset or continue with key latched + /// Returns (continue_loop, reset_timer) + async fn handle_notified_state( + &self, + current_state: &str, + sleep_interval: Duration, + start_time: Instant, + ) -> (bool, bool) { + // notify to query the secure channel status immediately when the secure channel state is unknown or disabled + // this is to handle quicker response to the secure channel state change during VM provisioning. + + if self.should_reset_state(current_state) { + self.reset_state_on_notification(current_state).await + } else { + self.continue_with_key_latched(current_state, sleep_interval, start_time) + .await + } + } + + /// Check if the state should be reset based on current secure channel state + fn should_reset_state(&self, current_state: &str) -> bool { + current_state == DISABLE_STATE || current_state == UNKNOWN_STATE + } + + /// Reset state when notified in disabled or unknown state + /// Returns (continue_loop, reset_timer) + async fn reset_state_on_notification(&self, current_state: &str) -> (bool, bool) { + logger::write_warning(format!( + "poll_secure_channel_status task notified and secure channel state is '{current_state}', reset states and start poll status now." 
+ )); + + provision::key_latch_ready_state_reset(self.provision_shared_state.clone()).await; + + if let Err(e) = self + .key_keeper_shared_state + .update_current_secure_channel_state(UNKNOWN_STATE.to_string()) + .await + { + logger::write_warning(format!( + "Failed to update secure channel state to 'Unknown': {e}" + )); + } + + (true, true) + } + + /// Continue with key latched when notified in a stable state + /// Returns (continue_loop, reset_timer) + async fn continue_with_key_latched( + &self, + current_state: &str, + sleep_interval: Duration, + current_loop_iteration_start_time: Instant, + ) -> (bool, bool) { + // report key latched ready to try update the provision finished time_tick + provision::key_latched(self.create_event_threads_shared_state()).await; + + self.handle_remaining_sleep( + current_state, + sleep_interval, + current_loop_iteration_start_time, + ) + .await; + + (true, false) + } + + /// Handle the remaining sleep time after the notification + async fn handle_remaining_sleep( + &self, + current_state: &str, + sleep_interval: Duration, + current_loop_iteration_start_time: Instant, + ) { + let slept_time_in_millisec = current_loop_iteration_start_time.elapsed().as_millis(); + let continue_sleep = sleep_interval + .as_millis() + .saturating_sub(slept_time_in_millisec); + if continue_sleep > 0 { + // continue sleep with the remaining time for current loop iteration + // it is to avoid too frequent polling when the secure channel state is stable + let continue_sleep = Duration::from_millis(continue_sleep as u64); + let message = format!( + "poll_secure_channel_status task notified but secure channel state is '{current_state}', continue with sleep wait for {continue_sleep:?}." + ); + logger::write_warning(message); + tokio::time::sleep(continue_sleep).await; } } /// Handle provision timeout logic - async fn handle_provision_timeup(&self, start: &mut Instant, provision_timeup: bool) -> bool { - if !provision_timeup && start.elapsed().as_millis() > PROVISION_TIMEUP_IN_MILLISECONDS { - provision::provision_timeup( + async fn handle_provision_timeout(&self, start: &mut Instant, provision_timeout: &mut bool) { + if !*provision_timeout && start.elapsed().as_millis() > PROVISION_TIMEOUT_IN_MILLISECONDS { + provision::provision_timeout( None, self.provision_shared_state.clone(), self.agent_status_shared_state.clone(), ) .await; - return true; + *provision_timeout = true; } - provision_timeup } /// Handle starting event threads diff --git a/proxy_agent/src/provision.rs b/proxy_agent/src/provision.rs index 94422070..fd11fbea 100644 --- a/proxy_agent/src/provision.rs +++ b/proxy_agent/src/provision.rs @@ -237,9 +237,9 @@ async fn reset_provision_state( /// use std::sync::{Arc, Mutex}; /// /// let shared_state = Arc::new(Mutex::new(SharedState::new())); -/// provision::provision_timeup(None, shared_state.clone()); +/// provision::provision_timeout(None, shared_state.clone()); /// ``` -pub async fn provision_timeup( +pub async fn provision_timeout( provision_dir: Option, provision_shared_state: ProvisionSharedState, agent_status_shared_state: AgentStatusSharedState, @@ -944,7 +944,7 @@ mod tests { super::AgentStatusModule::KeyKeeper, ) .await; - super::provision_timeup( + super::provision_timeout( Some(temp_test_path.clone()), provision_shared_state.clone(), agent_status_shared_state.clone(), From 8ea354fb86c534568485419f0f8109ee7f254d7f Mon Sep 17 00:00:00 2001 From: Zhidong Peng Date: Wed, 7 Jan 2026 18:28:13 +0000 Subject: [PATCH 08/11] rename --- proxy_agent/src/key_keeper.rs | 10 
+++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/proxy_agent/src/key_keeper.rs b/proxy_agent/src/key_keeper.rs index c2e940a0..89667a87 100644 --- a/proxy_agent/src/key_keeper.rs +++ b/proxy_agent/src/key_keeper.rs @@ -209,7 +209,7 @@ impl KeyKeeper { )); } - let mut start = Instant::now(); + let mut provision_start_time = Instant::now(); let mut redirect_policy_updated = false; loop { if !first_iteration { @@ -233,7 +233,7 @@ impl KeyKeeper { .await; if reset_timer { - start = Instant::now(); + provision_start_time = Instant::now(); provision_timeout = false; } @@ -243,7 +243,7 @@ impl KeyKeeper { } first_iteration = false; - self.handle_provision_timeout(&mut start, &mut provision_timeout) + self.handle_provision_timeout(&mut provision_start_time, &mut provision_timeout) .await; started_event_threads = self.handle_event_threads_start(started_event_threads).await; @@ -428,8 +428,8 @@ impl KeyKeeper { } /// Handle provision timeout logic - async fn handle_provision_timeout(&self, start: &mut Instant, provision_timeout: &mut bool) { - if !*provision_timeout && start.elapsed().as_millis() > PROVISION_TIMEOUT_IN_MILLISECONDS { + async fn handle_provision_timeout(&self, provision_start_time: &mut Instant, provision_timeout: &mut bool) { + if !*provision_timeout && provision_start_time.elapsed().as_millis() > PROVISION_TIMEOUT_IN_MILLISECONDS { provision::provision_timeout( None, self.provision_shared_state.clone(), From 5c87b7f74a37c56d204a6e6de92b3b4071072c1f Mon Sep 17 00:00:00 2001 From: Zhidong Peng Date: Wed, 7 Jan 2026 18:45:56 +0000 Subject: [PATCH 09/11] fix formatting --- proxy_agent/src/key_keeper.rs | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/proxy_agent/src/key_keeper.rs b/proxy_agent/src/key_keeper.rs index 89667a87..fb527e9b 100644 --- a/proxy_agent/src/key_keeper.rs +++ b/proxy_agent/src/key_keeper.rs @@ -343,16 +343,19 @@ impl KeyKeeper { &self, current_state: &str, sleep_interval: Duration, - start_time: Instant, + current_loop_iteration_start_time: Instant, ) -> (bool, bool) { // notify to query the secure channel status immediately when the secure channel state is unknown or disabled // this is to handle quicker response to the secure channel state change during VM provisioning. 
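
// Illustrative sketch, not part of the patch: the fire-once guard used by
// handle_provision_timeout above. A latched boolean ensures the timeout
// action runs at most once per timer reset, however often the loop polls.
// The free-function shape and the names here are assumptions for illustration.
use std::time::{Duration, Instant};

fn check_timeout(start: &Instant, fired: &mut bool, limit: Duration) -> bool {
    if !*fired && start.elapsed() > limit {
        *fired = true; // latch so later polls skip the action
        return true; // caller performs the one-time timeout action now
    }
    false
}
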
- if self.should_reset_state(current_state) { self.reset_state_on_notification(current_state).await } else { - self.continue_with_key_latched(current_state, sleep_interval, start_time) - .await + self.continue_with_key_latched( + current_state, + sleep_interval, + current_loop_iteration_start_time, + ) + .await } } @@ -428,8 +431,14 @@ impl KeyKeeper { } /// Handle provision timeout logic - async fn handle_provision_timeout(&self, provision_start_time: &mut Instant, provision_timeout: &mut bool) { - if !*provision_timeout && provision_start_time.elapsed().as_millis() > PROVISION_TIMEOUT_IN_MILLISECONDS { + async fn handle_provision_timeout( + &self, + provision_start_time: &mut Instant, + provision_timeout: &mut bool, + ) { + if !*provision_timeout + && provision_start_time.elapsed().as_millis() > PROVISION_TIMEOUT_IN_MILLISECONDS + { provision::provision_timeout( None, self.provision_shared_state.clone(), From 9b3379fc3b58defcd88c8918d2663118b4d5a903 Mon Sep 17 00:00:00 2001 From: "Zhidong Peng (HE/HIM)" Date: Wed, 7 Jan 2026 12:54:52 -0800 Subject: [PATCH 10/11] Add common fn parse_date_time_string --- proxy_agent_extension/src/service_main.rs | 6 + proxy_agent_shared/Cargo.toml | 2 +- proxy_agent_shared/src/error.rs | 3 + proxy_agent_shared/src/misc_helpers.rs | 118 +++++++++++++++++- .../src/proxy_agent_aggregate_status.rs | 7 ++ 5 files changed, 134 insertions(+), 2 deletions(-) diff --git a/proxy_agent_extension/src/service_main.rs b/proxy_agent_extension/src/service_main.rs index 4ad85e32..120ca171 100644 --- a/proxy_agent_extension/src/service_main.rs +++ b/proxy_agent_extension/src/service_main.rs @@ -921,6 +921,12 @@ mod tests { proxyConnectionSummary: vec![proxy_connection_summary_obj], failedAuthenticateSummary: vec![proxy_failedAuthenticateSummary_obj], }; + let result = toplevel_status.get_status_timestamp(); + assert!( + result.is_ok(), + "Status timestamp parse expected Ok result, got Err: {:?}", + result.err() + ); let mut status = StatusObj { name: constants::PLUGIN_NAME.to_string(), diff --git a/proxy_agent_shared/Cargo.toml b/proxy_agent_shared/Cargo.toml index dcbc9b07..182100b3 100644 --- a/proxy_agent_shared/Cargo.toml +++ b/proxy_agent_shared/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" [dependencies] concurrent-queue = "2.1.0" # for event queue once_cell = "1.17.0" # use Lazy -time = { version = "0.3.30", features = ["formatting"] } +time = { version = "0.3.30", features = ["formatting", "parsing"] } thread-id = "4.0.0" serde = "1.0.152" serde_derive = "1.0.152" diff --git a/proxy_agent_shared/src/error.rs b/proxy_agent_shared/src/error.rs index 44cc2749..b933a999 100644 --- a/proxy_agent_shared/src/error.rs +++ b/proxy_agent_shared/src/error.rs @@ -53,6 +53,9 @@ pub enum Error { #[error("Failed to receive '{0}' action response with error {1}")] RecvError(String, tokio::sync::oneshot::error::RecvError), + + #[error("Parse datetime string error: {0}")] + ParseDateTimeStringError(String), } #[derive(Debug, thiserror::Error)] diff --git a/proxy_agent_shared/src/misc_helpers.rs b/proxy_agent_shared/src/misc_helpers.rs index e984734b..439bfb1b 100644 --- a/proxy_agent_shared/src/misc_helpers.rs +++ b/proxy_agent_shared/src/misc_helpers.rs @@ -13,7 +13,7 @@ use std::{ process::Command, }; use thread_id; -use time::{format_description, OffsetDateTime}; +use time::{format_description, OffsetDateTime, PrimitiveDateTime}; #[cfg(windows)] use super::windows; @@ -57,6 +57,50 @@ pub fn get_date_time_unix_nano() -> i128 { OffsetDateTime::now_utc().unix_timestamp_nanos() } +/// 
Parse a datetime string to OffsetDateTime (UTC) +/// Supports multiple formats: +/// - ISO 8601 with/without 'Z': "YYYY-MM-DDTHH:MM:SS" or "YYYY-MM-DDTHH:MM:SSZ" +/// - With milliseconds: "YYYY-MM-DDTHH:MM:SS.mmm" +/// # Arguments +/// * `datetime_str` - A datetime string to parse +/// # Returns +/// A Result containing the parsed OffsetDateTime (UTC) or an error if parsing fails +/// # Example +/// ```rust +/// use proxy_agent_shared::misc_helpers; +/// let datetime1 = misc_helpers::parse_date_time_string("2024-01-15T10:30:45Z").unwrap(); +/// let datetime2 = misc_helpers::parse_date_time_string("2024-01-15T10:30:45").unwrap(); +/// let datetime3 = misc_helpers::parse_date_time_string("2024-01-15T10:30:45.123").unwrap(); +/// ``` +pub fn parse_date_time_string(datetime_str: &str) -> Result { + // Remove the 'Z' suffix if present + let datetime_str_trimmed = datetime_str.trim_end_matches('Z'); + + // Try parsing with milliseconds first + let date_format_with_millis = + format_description::parse("[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond]") + .map_err(|e| Error::ParseDateTimeStringError(format!("Failed to parse date format: {e}")))?; + + if let Ok(primitive_datetime) = + PrimitiveDateTime::parse(datetime_str_trimmed, &date_format_with_millis) + { + return Ok(primitive_datetime.assume_utc()); + } + + // Fall back to parsing without milliseconds + let date_format = format_description::parse("[year]-[month]-[day]T[hour]:[minute]:[second]") + .map_err(|e| Error::ParseDateTimeStringError(format!("Failed to parse date format: {e}")))?; + + let primitive_datetime = + PrimitiveDateTime::parse(datetime_str_trimmed, &date_format).map_err(|e| { + Error::ParseDateTimeStringError(format!( + "Failed to parse datetime string '{datetime_str}': {e}" + )) + })?; + + Ok(primitive_datetime.assume_utc()) +} + pub fn try_create_folder(dir: &Path) -> Result<()> { match dir.try_exists() { Ok(exists) => { @@ -654,4 +698,76 @@ mod tests { ); } } + + #[test] + fn parse_date_time_string_test() { + // Test parsing with milliseconds + let datetime_str = "2024-01-15T10:30:45.123"; + let result = super::parse_date_time_string(datetime_str); + assert!( + result.is_ok(), + "Failed to parse datetime string with milliseconds" + ); + + let datetime = result.unwrap(); + assert_eq!(datetime.year(), 2024); + assert_eq!(datetime.month() as u8, 1); + assert_eq!(datetime.day(), 15); + assert_eq!(datetime.hour(), 10); + assert_eq!(datetime.minute(), 30); + assert_eq!(datetime.second(), 45); + assert_eq!(datetime.millisecond(), 123); + + // Test parsing with 'Z' suffix + let datetime_str = "2024-01-15T10:30:45Z"; + let result = super::parse_date_time_string(datetime_str); + assert!( + result.is_ok(), + "Failed to parse datetime string with Z suffix" + ); + + let datetime = result.unwrap(); + assert_eq!(datetime.year(), 2024); + assert_eq!(datetime.month() as u8, 1); + assert_eq!(datetime.day(), 15); + assert_eq!(datetime.hour(), 10); + assert_eq!(datetime.minute(), 30); + assert_eq!(datetime.second(), 45); + + // Test parsing without 'Z' suffix + let datetime_str_without_z = "2024-01-15T10:30:45"; + let result = super::parse_date_time_string(datetime_str_without_z); + assert!(result.is_ok(), "Should parse datetime string without 'Z'"); + + // Test round-trip with milliseconds format + let original_datetime_str = super::get_date_time_string_with_milliseconds(); + let result = super::parse_date_time_string(&original_datetime_str); + assert!( + result.is_ok(), + "Failed to parse datetime string with milliseconds" + ); + + 
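
// Illustrative checks, not part of the patch: parse_date_time_string trims a
// trailing 'Z' and assumes UTC, and neither format description carries an
// offset component, so a string with an explicit offset is expected to fail.
let datetime = super::parse_date_time_string("2024-01-15T10:30:45Z").unwrap();
assert_eq!(datetime.offset(), time::UtcOffset::UTC);
let with_offset = super::parse_date_time_string("2024-01-15T10:30:45+02:00");
assert!(with_offset.is_err(), "explicit UTC offsets are not supported");
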
// Test round-trip with standard format + let original_datetime_str = super::get_date_time_string(); + let result = super::parse_date_time_string(&original_datetime_str); + assert!( + result.is_ok(), + "Failed to parse datetime string without milliseconds" + ); + + // Test invalid format + let invalid_datetime_str = "2024-01-15 10:30:45"; // space instead of 'T' + let result = super::parse_date_time_string(invalid_datetime_str); + assert!( + result.is_err(), + "Should fail to parse invalid datetime string" + ); + + let invalid_datetime_str = "2024-01-15T10:30"; // without seconds + let result = super::parse_date_time_string(invalid_datetime_str); + assert!( + result.is_err(), + "Should fail to parse invalid datetime string" + ); + } } diff --git a/proxy_agent_shared/src/proxy_agent_aggregate_status.rs b/proxy_agent_shared/src/proxy_agent_aggregate_status.rs index 5a3f2e6a..6a6a2971 100644 --- a/proxy_agent_shared/src/proxy_agent_aggregate_status.rs +++ b/proxy_agent_shared/src/proxy_agent_aggregate_status.rs @@ -3,6 +3,7 @@ use crate::misc_helpers; use serde_derive::{Deserialize, Serialize}; use std::{collections::HashMap, path::PathBuf}; +use time::OffsetDateTime; #[cfg(windows)] const PROXY_AGENT_AGGREGATE_STATUS_FOLDER: &str = "%SYSTEMDRIVE%\\WindowsAzure\\ProxyAgent\\Logs\\"; @@ -88,3 +89,9 @@ pub struct GuestProxyAgentAggregateStatus { pub proxyConnectionSummary: Vec, pub failedAuthenticateSummary: Vec, } + +impl GuestProxyAgentAggregateStatus { + pub fn get_status_timestamp(&self) -> crate::result::Result { + misc_helpers::parse_date_time_string(&self.timestamp) + } +} From da95fd4788395818d9f711f41e109b309ab3aae2 Mon Sep 17 00:00:00 2001 From: "Zhidong Peng (HE/HIM)" Date: Wed, 7 Jan 2026 13:00:08 -0800 Subject: [PATCH 11/11] fix formmating --- .github/actions/spelling/expect.txt | 3 +++ proxy_agent_shared/src/misc_helpers.rs | 8 ++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/.github/actions/spelling/expect.txt b/.github/actions/spelling/expect.txt index 9e336d69..35213d2f 100644 --- a/.github/actions/spelling/expect.txt +++ b/.github/actions/spelling/expect.txt @@ -58,6 +58,7 @@ daddr datacenter davidanson DDCE +DDTHH debbuild DEBHELPER debian @@ -176,6 +177,7 @@ MFC microsoftcblmariner microsoftlosangeles microsoftwindowsdesktop +mmm mnt msasn msp @@ -271,6 +273,7 @@ spellright splitn SRPMS SSRF +SSZ stackoverflow stdbool stdint diff --git a/proxy_agent_shared/src/misc_helpers.rs b/proxy_agent_shared/src/misc_helpers.rs index 439bfb1b..d8631098 100644 --- a/proxy_agent_shared/src/misc_helpers.rs +++ b/proxy_agent_shared/src/misc_helpers.rs @@ -79,7 +79,9 @@ pub fn parse_date_time_string(datetime_str: &str) -> Result { // Try parsing with milliseconds first let date_format_with_millis = format_description::parse("[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond]") - .map_err(|e| Error::ParseDateTimeStringError(format!("Failed to parse date format: {e}")))?; + .map_err(|e| { + Error::ParseDateTimeStringError(format!("Failed to parse date format: {e}")) + })?; if let Ok(primitive_datetime) = PrimitiveDateTime::parse(datetime_str_trimmed, &date_format_with_millis) @@ -89,7 +91,9 @@ pub fn parse_date_time_string(datetime_str: &str) -> Result { // Fall back to parsing without milliseconds let date_format = format_description::parse("[year]-[month]-[day]T[hour]:[minute]:[second]") - .map_err(|e| Error::ParseDateTimeStringError(format!("Failed to parse date format: {e}")))?; + .map_err(|e| { + Error::ParseDateTimeStringError(format!("Failed to parse date 
format: {e}")) + })?; let primitive_datetime = PrimitiveDateTime::parse(datetime_str_trimmed, &date_format).map_err(|e| {