diff --git a/embedded-service/src/bus_error.rs b/embedded-service/src/bus_error.rs new file mode 100644 index 00000000..878a6228 --- /dev/null +++ b/embedded-service/src/bus_error.rs @@ -0,0 +1,7 @@ +//! Module for traits related to bus errors + +/// Super trait for traits that need a bus-specific error type. +pub trait BusError { + /// Type of error returned by the bus + type BusError; +} diff --git a/embedded-service/src/event.rs b/embedded-service/src/event.rs index dfabd3d8..ec02e7c5 100644 --- a/embedded-service/src/event.rs +++ b/embedded-service/src/event.rs @@ -1,8 +1,12 @@ //! Common traits for event senders and receivers +use core::{future::ready, marker::PhantomData}; -use core::marker::PhantomData; +use crate::error; -use embassy_sync::channel::{DynamicReceiver, DynamicSender}; +use embassy_sync::{ + channel::{DynamicReceiver, DynamicSender}, + pubsub::{DynImmediatePublisher, DynSubscriber, WaitResult}, +}; /// Common event sender trait pub trait Sender { @@ -44,6 +48,42 @@ impl Receiver for DynamicReceiver<'_, E> { } } +impl Sender for DynImmediatePublisher<'_, E> { + fn try_send(&mut self, event: E) -> Option<()> { + self.try_publish(event).ok() + } + + fn send(&mut self, event: E) -> impl Future { + self.publish_immediate(event); + ready(()) + } +} + +impl Receiver for DynSubscriber<'_, E> { + fn try_next(&mut self) -> Option { + match self.try_next_message() { + Some(WaitResult::Message(e)) => Some(e), + Some(WaitResult::Lagged(e)) => { + error!("Subscriber lagged, skipping {} events", e); + None + } + _ => None, + } + } + + async fn wait_next(&mut self) -> E { + loop { + match self.next_message().await { + WaitResult::Message(e) => return e, + WaitResult::Lagged(e) => { + error!("Subscriber lagged, skipping {} events", e); + continue; + } + } + } + } +} + /// A sender that discards all events sent to it. pub struct NoopSender; diff --git a/embedded-service/src/lib.rs b/embedded-service/src/lib.rs index e62562e3..8e23e24b 100644 --- a/embedded-service/src/lib.rs +++ b/embedded-service/src/lib.rs @@ -14,6 +14,7 @@ pub mod thread_mode_cell; pub mod activity; pub mod broadcaster; pub mod buffer; +pub mod bus_error; pub mod comms; pub mod event; pub mod fmt; diff --git a/examples/rt685s-evk/src/bin/type_c.rs b/examples/rt685s-evk/src/bin/type_c.rs index 629230a7..3b94eec7 100644 --- a/examples/rt685s-evk/src/bin/type_c.rs +++ b/examples/rt685s-evk/src/bin/type_c.rs @@ -12,11 +12,11 @@ use embassy_imxrt::{bind_interrupts, peripherals}; use embassy_sync::channel::{Channel, DynamicReceiver, DynamicSender}; use embassy_sync::mutex::Mutex; use embassy_sync::once_lock::OnceLock; -use embassy_sync::pubsub::PubSubChannel; +use embassy_sync::pubsub::{DynImmediatePublisher, DynSubscriber, PubSubChannel}; use embassy_time::{self as _, Delay}; use embedded_cfu_protocol::protocol_definitions::{FwUpdateOffer, FwUpdateOfferResponse, FwVersion, HostToken}; use embedded_services::GlobalRawMutex; -use embedded_services::event::NoopSender; +use embedded_services::event::MapSender; use embedded_services::{error, info}; use embedded_usb_pd::GlobalPortId; use power_policy_interface::psu; @@ -26,7 +26,7 @@ use static_cell::StaticCell; use tps6699x::asynchronous::embassy as tps6699x; use type_c_interface::port::ControllerId; use type_c_service::driver::tps6699x::{self as tps6699x_drv}; -use type_c_service::service::Service; +use type_c_service::service::{EventReceiver, Service}; use type_c_service::wrapper::ControllerWrapper; use type_c_service::wrapper::backing::{IntermediateStorage, ReferencedStorage, Storage}; use type_c_service::wrapper::proxy::PowerProxyDevice; @@ -66,11 +66,27 @@ type Wrapper<'a> = ControllerWrapper< type Controller<'a> = tps6699x::controller::Controller>; type Interrupt<'a> = tps6699x::Interrupt<'a, GlobalRawMutex, BusDevice<'a>>; +type PowerPolicySenderType = MapSender< + power_policy_interface::service::event::Event<'static, DeviceType>, + power_policy_interface::service::event::EventData, + DynImmediatePublisher<'static, power_policy_interface::service::event::EventData>, + fn( + power_policy_interface::service::event::Event<'static, DeviceType>, + ) -> power_policy_interface::service::event::EventData, +>; + +type PowerPolicyReceiverType = DynSubscriber<'static, power_policy_interface::service::event::EventData>; + type PowerPolicyServiceType = Mutex< GlobalRawMutex, - power_policy_service::service::Service<'static, ArrayRegistration<'static, DeviceType, 2, NoopSender, 1>>, + power_policy_service::service::Service< + 'static, + ArrayRegistration<'static, DeviceType, 2, PowerPolicySenderType, 1>, + >, >; +type ServiceType = Service<'static>; + #[embassy_executor::task] async fn pd_controller_task(controller: &'static Wrapper<'static>) { loop { @@ -95,12 +111,13 @@ async fn power_policy_task( #[embassy_executor::task] async fn type_c_service_task( - service: &'static Service<'static, DeviceType>, + service: &'static Mutex, + event_receiver: EventReceiver<'static, PowerPolicyReceiverType>, wrappers: [&'static Wrapper<'static>; NUM_PD_CONTROLLERS], cfu_client: &'static CfuClient, ) { info!("Starting type-c task"); - type_c_service::task::task(service, wrappers, cfu_client).await; + type_c_service::task::task(service, event_receiver, wrappers, cfu_client).await; } #[embassy_executor::main] @@ -196,11 +213,12 @@ async fn main(spawner: Spawner) { // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot static POWER_POLICY_CHANNEL: StaticCell< - PubSubChannel, 4, 1, 0>, + PubSubChannel, > = StaticCell::new(); let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new()); - let power_policy_publisher = power_policy_channel.dyn_immediate_publisher(); + let power_policy_sender: PowerPolicySenderType = + MapSender::new(power_policy_channel.dyn_immediate_publisher(), |e| e.into()); // Guaranteed to not panic since we initialized the channel above let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap(); @@ -210,7 +228,7 @@ async fn main(spawner: Spawner) { let power_policy_registration = ArrayRegistration { psus: [&wrapper.ports[0].proxy, &wrapper.ports[1].proxy], - service_senders: [NoopSender], + service_senders: [power_policy_sender], }; static POWER_SERVICE: StaticCell = StaticCell::new(); @@ -220,20 +238,20 @@ async fn main(spawner: Spawner) { power_policy_service::service::config::Config::default(), ))); - static TYPE_C_SERVICE: StaticCell> = StaticCell::new(); - let type_c_service = TYPE_C_SERVICE.init(Service::create( - Default::default(), - controller_context, - power_policy_publisher, - power_policy_subscriber, - )); + static TYPE_C_SERVICE: StaticCell> = StaticCell::new(); + let type_c_service = TYPE_C_SERVICE.init(Mutex::new(Service::create(Default::default(), controller_context))); // Spin up CFU service static CFU_CLIENT: OnceLock = OnceLock::new(); let cfu_client = CfuClient::new(&CFU_CLIENT).await; info!("Spawining type-c service task"); - spawner.must_spawn(type_c_service_task(type_c_service, [wrapper], cfu_client)); + spawner.must_spawn(type_c_service_task( + type_c_service, + EventReceiver::new(controller_context, power_policy_subscriber), + [wrapper], + cfu_client, + )); info!("Spawining power policy task"); spawner.must_spawn(power_policy_task( diff --git a/examples/rt685s-evk/src/bin/type_c_cfu.rs b/examples/rt685s-evk/src/bin/type_c_cfu.rs index 7defa355..9e4c9da2 100644 --- a/examples/rt685s-evk/src/bin/type_c_cfu.rs +++ b/examples/rt685s-evk/src/bin/type_c_cfu.rs @@ -13,13 +13,13 @@ use embassy_imxrt::{bind_interrupts, peripherals}; use embassy_sync::channel::{Channel, DynamicReceiver, DynamicSender}; use embassy_sync::mutex::Mutex; use embassy_sync::once_lock::OnceLock; -use embassy_sync::pubsub::PubSubChannel; +use embassy_sync::pubsub::{DynImmediatePublisher, DynSubscriber, PubSubChannel}; use embassy_time::Timer; use embassy_time::{self as _, Delay}; use embedded_cfu_protocol::protocol_definitions::*; use embedded_cfu_protocol::protocol_definitions::{FwUpdateOffer, FwUpdateOfferResponse, FwVersion}; use embedded_services::GlobalRawMutex; -use embedded_services::event::NoopSender; +use embedded_services::event::MapSender; use embedded_services::{error, info}; use embedded_usb_pd::GlobalPortId; use power_policy_interface::psu; @@ -29,7 +29,7 @@ use static_cell::StaticCell; use tps6699x::asynchronous::embassy as tps6699x; use type_c_interface::port::ControllerId; use type_c_service::driver::tps6699x::{self as tps6699x_drv}; -use type_c_service::service::Service; +use type_c_service::service::{EventReceiver, Service}; use type_c_service::wrapper::ControllerWrapper; use type_c_service::wrapper::backing::{IntermediateStorage, ReferencedStorage, Storage}; use type_c_service::wrapper::proxy::PowerProxyDevice; @@ -64,11 +64,27 @@ type Wrapper<'a> = ControllerWrapper< type Controller<'a> = tps6699x::controller::Controller>; type Interrupt<'a> = tps6699x::Interrupt<'a, GlobalRawMutex, BusDevice<'a>>; +type PowerPolicySenderType = MapSender< + power_policy_interface::service::event::Event<'static, DeviceType>, + power_policy_interface::service::event::EventData, + DynImmediatePublisher<'static, power_policy_interface::service::event::EventData>, + fn( + power_policy_interface::service::event::Event<'static, DeviceType>, + ) -> power_policy_interface::service::event::EventData, +>; + +type PowerPolicyReceiverType = DynSubscriber<'static, power_policy_interface::service::event::EventData>; + type PowerPolicyServiceType = Mutex< GlobalRawMutex, - power_policy_service::service::Service<'static, ArrayRegistration<'static, DeviceType, 2, NoopSender, 1>>, + power_policy_service::service::Service< + 'static, + ArrayRegistration<'static, DeviceType, 2, PowerPolicySenderType, 1>, + >, >; +type ServiceType = Service<'static>; + const NUM_PD_CONTROLLERS: usize = 1; const CONTROLLER0_ID: ControllerId = ControllerId(0); const CONTROLLER0_CFU_ID: ComponentId = 0x12; @@ -179,12 +195,13 @@ async fn power_policy_task( #[embassy_executor::task] async fn type_c_service_task( - service: &'static Service<'static, DeviceType>, + service: &'static Mutex, + event_receiver: EventReceiver<'static, PowerPolicyReceiverType>, wrappers: [&'static Wrapper<'static>; NUM_PD_CONTROLLERS], cfu_client: &'static CfuClient, ) { info!("Starting type-c task"); - type_c_service::task::task(service, wrappers, cfu_client).await; + type_c_service::task::task(service, event_receiver, wrappers, cfu_client).await; } #[embassy_executor::main] @@ -286,9 +303,20 @@ async fn main(spawner: Spawner) { static POWER_SERVICE_CONTEXT: StaticCell = StaticCell::new(); let power_service_context = POWER_SERVICE_CONTEXT.init(power_policy_service::service::context::Context::new()); + // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot + static POWER_POLICY_CHANNEL: StaticCell< + PubSubChannel, + > = StaticCell::new(); + + let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new()); + let power_policy_sender: PowerPolicySenderType = + MapSender::new(power_policy_channel.dyn_immediate_publisher(), |e| e.into()); + // Guaranteed to not panic since we initialized the channel above + let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap(); + let power_policy_registration = ArrayRegistration { psus: [&wrapper.ports[0].proxy, &wrapper.ports[1].proxy], - service_senders: [NoopSender], + service_senders: [power_policy_sender], }; static POWER_SERVICE: StaticCell = StaticCell::new(); @@ -298,30 +326,20 @@ async fn main(spawner: Spawner) { power_policy_service::service::config::Config::default(), ))); - // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot - static POWER_POLICY_CHANNEL: StaticCell< - PubSubChannel, 4, 1, 0>, - > = StaticCell::new(); - - let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new()); - let power_policy_publisher = power_policy_channel.dyn_immediate_publisher(); - // Guaranteed to not panic since we initialized the channel above - let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap(); - - static TYPE_C_SERVICE: StaticCell> = StaticCell::new(); - let type_c_service = TYPE_C_SERVICE.init(Service::create( - Default::default(), - controller_context, - power_policy_publisher, - power_policy_subscriber, - )); + static TYPE_C_SERVICE: StaticCell> = StaticCell::new(); + let type_c_service = TYPE_C_SERVICE.init(Mutex::new(Service::create(Default::default(), controller_context))); // Spin up CFU service static CFU_CLIENT: OnceLock = OnceLock::new(); let cfu_client = CfuClient::new(&CFU_CLIENT).await; info!("Spawining type-c service task"); - spawner.must_spawn(type_c_service_task(type_c_service, [wrapper], cfu_client)); + spawner.must_spawn(type_c_service_task( + type_c_service, + EventReceiver::new(controller_context, power_policy_subscriber), + [wrapper], + cfu_client, + )); info!("Spawining power policy task"); spawner.must_spawn(power_policy_task( diff --git a/examples/std/src/bin/type_c/service.rs b/examples/std/src/bin/type_c/service.rs index bb6f1d7b..346ee26a 100644 --- a/examples/std/src/bin/type_c/service.rs +++ b/examples/std/src/bin/type_c/service.rs @@ -3,10 +3,10 @@ use embassy_executor::{Executor, Spawner}; use embassy_sync::channel::{Channel, DynamicReceiver, DynamicSender}; use embassy_sync::mutex::Mutex; use embassy_sync::once_lock::OnceLock; -use embassy_sync::pubsub::PubSubChannel; +use embassy_sync::pubsub::{DynImmediatePublisher, DynSubscriber, PubSubChannel}; use embassy_time::Timer; use embedded_services::GlobalRawMutex; -use embedded_services::event::NoopSender; +use embedded_services::event::MapSender; use embedded_usb_pd::GlobalPortId; use embedded_usb_pd::ado::Ado; use embedded_usb_pd::type_c::Current; @@ -19,8 +19,8 @@ use std_examples::type_c::mock_controller; use std_examples::type_c::mock_controller::Wrapper; use type_c_interface::port::ControllerId; use type_c_interface::service::context::Context; -use type_c_service::service::Service; use type_c_service::service::config::Config; +use type_c_service::service::{EventReceiver, Service}; use type_c_service::util::power_capability_from_current; use type_c_service::wrapper::backing::Storage; use type_c_service::wrapper::message::*; @@ -33,11 +33,27 @@ const DELAY_MS: u64 = 1000; type DeviceType = Mutex>; +type PowerPolicySenderType = MapSender< + power_policy_interface::service::event::Event<'static, DeviceType>, + power_policy_interface::service::event::EventData, + DynImmediatePublisher<'static, power_policy_interface::service::event::EventData>, + fn( + power_policy_interface::service::event::Event<'static, DeviceType>, + ) -> power_policy_interface::service::event::EventData, +>; + +type PowerPolicyReceiverType = DynSubscriber<'static, power_policy_interface::service::event::EventData>; + type PowerPolicyServiceType = Mutex< GlobalRawMutex, - power_policy_service::service::Service<'static, ArrayRegistration<'static, DeviceType, 1, NoopSender, 1>>, + power_policy_service::service::Service< + 'static, + ArrayRegistration<'static, DeviceType, 1, PowerPolicySenderType, 1>, + >, >; +type ServiceType = Service<'static>; + #[embassy_executor::task] async fn controller_task( wrapper: &'static Wrapper<'static>, @@ -80,9 +96,21 @@ async fn task(spawner: Spawner) { let (wrapper, policy_receiver, controller, state) = create_wrapper(controller_context); + // Create type-c service + // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot + static POWER_POLICY_CHANNEL: StaticCell< + PubSubChannel, + > = StaticCell::new(); + + let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new()); + let power_policy_sender: PowerPolicySenderType = + MapSender::new(power_policy_channel.dyn_immediate_publisher(), |e| e.into()); + // Guaranteed to not panic since we initialized the channel above + let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap(); + let power_policy_registration = ArrayRegistration { psus: [&wrapper.ports[0].proxy], - service_senders: [NoopSender], + service_senders: [power_policy_sender], }; static POWER_SERVICE: StaticCell = StaticCell::new(); @@ -92,24 +120,8 @@ async fn task(spawner: Spawner) { power_policy_service::service::config::Config::default(), ))); - // Create type-c service - // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot - static POWER_POLICY_CHANNEL: StaticCell< - PubSubChannel, 4, 1, 0>, - > = StaticCell::new(); - - let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new()); - let power_policy_publisher = power_policy_channel.dyn_immediate_publisher(); - // Guaranteed to not panic since we initialized the channel above - let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap(); - - static TYPE_C_SERVICE: StaticCell> = StaticCell::new(); - let type_c_service = TYPE_C_SERVICE.init(Service::create( - Config::default(), - controller_context, - power_policy_publisher, - power_policy_subscriber, - )); + static TYPE_C_SERVICE: StaticCell> = StaticCell::new(); + let type_c_service = TYPE_C_SERVICE.init(Mutex::new(Service::create(Config::default(), controller_context))); // Spin up CFU service static CFU_CLIENT: OnceLock = OnceLock::new(); @@ -119,7 +131,12 @@ async fn task(spawner: Spawner) { ArrayEventReceivers::new([&wrapper.ports[0].proxy], [policy_receiver]), power_service, )); - spawner.must_spawn(type_c_service_task(type_c_service, [wrapper], cfu_client)); + spawner.must_spawn(type_c_service_task( + type_c_service, + EventReceiver::new(controller_context, power_policy_subscriber), + [wrapper], + cfu_client, + )); spawner.must_spawn(controller_task(wrapper, controller)); Timer::after_millis(1000).await; @@ -156,12 +173,13 @@ async fn power_policy_task( #[embassy_executor::task] async fn type_c_service_task( - service: &'static Service<'static, DeviceType>, + service: &'static Mutex, + event_receiver: EventReceiver<'static, PowerPolicyReceiverType>, wrappers: [&'static Wrapper<'static>; NUM_PD_CONTROLLERS], cfu_client: &'static CfuClient, ) { info!("Starting type-c task"); - type_c_service::task::task(service, wrappers, cfu_client).await; + type_c_service::task::task(service, event_receiver, wrappers, cfu_client).await; } fn create_wrapper( diff --git a/examples/std/src/bin/type_c/ucsi.rs b/examples/std/src/bin/type_c/ucsi.rs index ce9e7d3e..42c16603 100644 --- a/examples/std/src/bin/type_c/ucsi.rs +++ b/examples/std/src/bin/type_c/ucsi.rs @@ -5,10 +5,10 @@ use embassy_executor::{Executor, Spawner}; use embassy_sync::channel::{Channel, DynamicReceiver, DynamicSender}; use embassy_sync::mutex::Mutex; use embassy_sync::once_lock::OnceLock; -use embassy_sync::pubsub::PubSubChannel; +use embassy_sync::pubsub::{DynImmediatePublisher, DynSubscriber, PubSubChannel}; use embedded_services::GlobalRawMutex; use embedded_services::IntrusiveList; -use embedded_services::event::NoopSender; +use embedded_services::event::MapSender; use embedded_usb_pd::GlobalPortId; use embedded_usb_pd::ucsi::lpm::get_connector_capability::OperationModeFlags; use embedded_usb_pd::ucsi::ppm::ack_cc_ci::Ack; @@ -22,10 +22,10 @@ use power_policy_service::psu::ArrayEventReceivers; use power_policy_service::service::registration::ArrayRegistration; use static_cell::StaticCell; use std_examples::type_c::mock_controller; +use type_c_interface::port::ControllerId; use type_c_interface::service::context::Context; -use type_c_service::service::Service; use type_c_service::service::config::Config; -use type_c_interface::port::ControllerId; +use type_c_service::service::{EventReceiver, Service}; use type_c_service::wrapper::backing::Storage; use type_c_service::wrapper::proxy::PowerProxyDevice; @@ -39,13 +39,30 @@ const CFU1_ID: u8 = 0x01; type DeviceType = Mutex>; +type PowerPolicySenderType = MapSender< + power_policy_interface::service::event::Event<'static, DeviceType>, + power_policy_interface::service::event::EventData, + DynImmediatePublisher<'static, power_policy_interface::service::event::EventData>, + fn( + power_policy_interface::service::event::Event<'static, DeviceType>, + ) -> power_policy_interface::service::event::EventData, +>; + +type PowerPolicyReceiverType = DynSubscriber<'static, power_policy_interface::service::event::EventData>; + type PowerPolicyServiceType = Mutex< GlobalRawMutex, - power_policy_service::service::Service<'static, ArrayRegistration<'static, DeviceType, 2, NoopSender, 1>>, + power_policy_service::service::Service< + 'static, + ArrayRegistration<'static, DeviceType, 2, PowerPolicySenderType, 1>, + >, >; +type ServiceType = Service<'static>; + #[embassy_executor::task] async fn opm_task(_context: &'static Context, _state: [&'static mock_controller::ControllerState; NUM_PD_CONTROLLERS]) { + // TODO: migrate this logic to an integration test when we move away from 'static lifetimes. /*const CAPABILITY: PowerCapability = PowerCapability { voltage_mv: 20000, current_ma: 5000, @@ -194,12 +211,13 @@ async fn power_policy_task( #[embassy_executor::task] async fn type_c_service_task( - service: &'static Service<'static, DeviceType>, + service: &'static Mutex, + event_receiver: EventReceiver<'static, PowerPolicyReceiverType>, wrappers: [&'static Wrapper<'static>; NUM_PD_CONTROLLERS], cfu_client: &'static CfuClient, ) { info!("Starting type-c task"); - type_c_service::task::task(service, wrappers, cfu_client).await; + type_c_service::task::task(service, event_receiver, wrappers, cfu_client).await; } #[embassy_executor::task] @@ -306,9 +324,20 @@ async fn task(spawner: Spawner) { static POWER_SERVICE_CONTEXT: StaticCell = StaticCell::new(); let power_service_context = POWER_SERVICE_CONTEXT.init(power_policy_service::service::context::Context::new()); + // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot + static POWER_POLICY_CHANNEL: StaticCell< + PubSubChannel, + > = StaticCell::new(); + + let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new()); + let power_policy_sender: PowerPolicySenderType = + MapSender::new(power_policy_channel.dyn_immediate_publisher(), |e| e.into()); + // Guaranteed to not panic since we initialized the channel above + let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap(); + let power_policy_registration = ArrayRegistration { psus: [&wrapper0.ports[0].proxy, &wrapper1.ports[0].proxy], - service_senders: [NoopSender], + service_senders: [power_policy_sender], }; static POWER_SERVICE: StaticCell = StaticCell::new(); @@ -319,18 +348,8 @@ async fn task(spawner: Spawner) { ))); // Create type-c service - // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot - static POWER_POLICY_CHANNEL: StaticCell< - PubSubChannel, 4, 1, 0>, - > = StaticCell::new(); - - let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new()); - let power_policy_publisher = power_policy_channel.dyn_immediate_publisher(); - // Guaranteed to not panic since we initialized the channel above - let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap(); - - static TYPE_C_SERVICE: StaticCell> = StaticCell::new(); - let type_c_service = TYPE_C_SERVICE.init(Service::create( + static TYPE_C_SERVICE: StaticCell> = StaticCell::new(); + let type_c_service = TYPE_C_SERVICE.init(Mutex::new(Service::create( Config { ucsi_capabilities: UcsiCapabilities { num_connectors: 2, @@ -356,9 +375,7 @@ async fn task(spawner: Spawner) { ..Default::default() }, controller_context, - power_policy_publisher, - power_policy_subscriber, - )); + ))); // Spin up CFU service static CFU_CLIENT: OnceLock = OnceLock::new(); @@ -372,7 +389,12 @@ async fn task(spawner: Spawner) { power_service, )); - spawner.must_spawn(type_c_service_task(type_c_service, [wrapper0, wrapper1], cfu_client)); + spawner.must_spawn(type_c_service_task( + type_c_service, + EventReceiver::new(controller_context, power_policy_subscriber), + [wrapper0, wrapper1], + cfu_client, + )); spawner.must_spawn(wrapper_task(wrapper0)); spawner.must_spawn(wrapper_task(wrapper1)); spawner.must_spawn(opm_task(controller_context, [state0, state1])); diff --git a/examples/std/src/bin/type_c/unconstrained.rs b/examples/std/src/bin/type_c/unconstrained.rs index d207fcf5..4ce1ddb6 100644 --- a/examples/std/src/bin/type_c/unconstrained.rs +++ b/examples/std/src/bin/type_c/unconstrained.rs @@ -6,10 +6,10 @@ use embassy_sync::channel::DynamicReceiver; use embassy_sync::channel::DynamicSender; use embassy_sync::mutex::Mutex; use embassy_sync::once_lock::OnceLock; -use embassy_sync::pubsub::PubSubChannel; +use embassy_sync::pubsub::{DynImmediatePublisher, DynSubscriber, PubSubChannel}; use embassy_time::Timer; use embedded_services::GlobalRawMutex; -use embedded_services::event::NoopSender; +use embedded_services::event::MapSender; use embedded_usb_pd::GlobalPortId; use log::*; use power_policy_interface::capability::PowerCapability; @@ -19,12 +19,12 @@ use power_policy_service::service::registration::ArrayRegistration; use static_cell::StaticCell; use std_examples::type_c::mock_controller; use type_c_interface::port::ControllerId; -use type_c_service::service::Service; - -const NUM_PD_CONTROLLERS: usize = 3; +use type_c_service::service::{EventReceiver, Service}; use type_c_service::wrapper::backing::{IntermediateStorage, ReferencedStorage, Storage}; use type_c_service::wrapper::proxy::PowerProxyDevice; +const NUM_PD_CONTROLLERS: usize = 3; + const CONTROLLER0_ID: ControllerId = ControllerId(0); const PORT0_ID: GlobalPortId = GlobalPortId(0); const CFU0_ID: u8 = 0x00; @@ -41,11 +41,27 @@ const DELAY_MS: u64 = 1000; type DeviceType = Mutex>; +type PowerPolicySenderType = MapSender< + power_policy_interface::service::event::Event<'static, DeviceType>, + power_policy_interface::service::event::EventData, + DynImmediatePublisher<'static, power_policy_interface::service::event::EventData>, + fn( + power_policy_interface::service::event::Event<'static, DeviceType>, + ) -> power_policy_interface::service::event::EventData, +>; + +type PowerPolicyReceiverType = DynSubscriber<'static, power_policy_interface::service::event::EventData>; + type PowerPolicyServiceType = Mutex< GlobalRawMutex, - power_policy_service::service::Service<'static, ArrayRegistration<'static, DeviceType, 3, NoopSender, 1>>, + power_policy_service::service::Service< + 'static, + ArrayRegistration<'static, DeviceType, 3, PowerPolicySenderType, 1>, + >, >; +type ServiceType = Service<'static>; + #[embassy_executor::task(pool_size = 3)] async fn controller_task(wrapper: &'static mock_controller::Wrapper<'static>) { loop { @@ -174,13 +190,24 @@ async fn task(spawner: Spawner) { crate::mock_controller::Validator, )); + // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot + static POWER_POLICY_CHANNEL: StaticCell< + PubSubChannel, + > = StaticCell::new(); + + let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new()); + let power_policy_sender: PowerPolicySenderType = + MapSender::new(power_policy_channel.dyn_immediate_publisher(), |e| e.into()); + // Guaranteed to not panic since we initialized the channel above + let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap(); + let power_policy_registration = ArrayRegistration { psus: [ &wrapper0.ports[0].proxy, &wrapper1.ports[0].proxy, &wrapper2.ports[0].proxy, ], - service_senders: [NoopSender], + service_senders: [power_policy_sender], }; static POWER_SERVICE: StaticCell = StaticCell::new(); @@ -191,23 +218,8 @@ async fn task(spawner: Spawner) { ))); // Create type-c service - // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot - static POWER_POLICY_CHANNEL: StaticCell< - PubSubChannel, 4, 1, 0>, - > = StaticCell::new(); - - let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new()); - let power_policy_publisher = power_policy_channel.dyn_immediate_publisher(); - // Guaranteed to not panic since we initialized the channel above - let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap(); - - static TYPE_C_SERVICE: StaticCell> = StaticCell::new(); - let type_c_service = TYPE_C_SERVICE.init(Service::create( - Default::default(), - controller_context, - power_policy_publisher, - power_policy_subscriber, - )); + static TYPE_C_SERVICE: StaticCell> = StaticCell::new(); + let type_c_service = TYPE_C_SERVICE.init(Mutex::new(Service::create(Default::default(), controller_context))); // Spin up CFU service static CFU_CLIENT: OnceLock = OnceLock::new(); @@ -226,6 +238,7 @@ async fn task(spawner: Spawner) { )); spawner.must_spawn(type_c_service_task( type_c_service, + EventReceiver::new(controller_context, power_policy_subscriber), [wrapper0, wrapper1, wrapper2], cfu_client, )); @@ -293,12 +306,13 @@ async fn power_policy_task( #[embassy_executor::task] async fn type_c_service_task( - service: &'static Service<'static, DeviceType>, + service: &'static Mutex, + event_receiver: EventReceiver<'static, PowerPolicyReceiverType>, wrappers: [&'static Wrapper<'static>; NUM_PD_CONTROLLERS], cfu_client: &'static CfuClient, ) { info!("Starting type-c task"); - type_c_service::task::task(service, wrappers, cfu_client).await; + type_c_service::task::task(service, event_receiver, wrappers, cfu_client).await; } fn main() { diff --git a/power-policy-interface/src/service/event.rs b/power-policy-interface/src/service/event.rs index 40874119..41d7b319 100644 --- a/power-policy-interface/src/service/event.rs +++ b/power-policy-interface/src/service/event.rs @@ -11,7 +11,7 @@ use crate::{ /// This enum doesn't contain a reference to the device and is suitable /// for receivers that don't need to know which device triggered the event /// and allows for receivers that don't need to be generic over the device type. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum EventData { /// Consumer disconnected diff --git a/type-c-interface/src/port/displayport.rs b/type-c-interface/src/port/displayport.rs new file mode 100644 index 00000000..8f70ee76 --- /dev/null +++ b/type-c-interface/src/port/displayport.rs @@ -0,0 +1,15 @@ +//! Traits and types related to DisplayPort alternate mode + +use embedded_usb_pd::Error; + +use crate::port::{DpConfig, DpStatus, pd::Pd}; + +/// Trait for ports that support DisplayPort alternate mode operations +pub trait DisplayPort: Pd { + /// Get DisplayPort status + fn get_dp_status(&mut self) -> impl Future>>; + /// Set DisplayPort configuration + fn set_dp_config(&mut self, config: DpConfig) -> impl Future>>; + /// Get DisplayPort configuration + fn get_dp_config(&mut self) -> impl Future>>; +} diff --git a/type-c-interface/src/port/mod.rs b/type-c-interface/src/port/mod.rs index f52836b6..ca341682 100644 --- a/type-c-interface/src/port/mod.rs +++ b/type-c-interface/src/port/mod.rs @@ -1,7 +1,7 @@ //! PD controller related code use core::future::Future; -use embedded_usb_pd::ucsi::{self, lpm}; +use embedded_usb_pd::ucsi::{GlobalResponse as UcsiGlobalResponse, lpm}; use embedded_usb_pd::{ DataRole, Error, GlobalPortId, LocalPortId, PdError, PlugOrientation, PowerRole, ado::Ado, @@ -12,7 +12,14 @@ use embedded_usb_pd::{ use embedded_services::ipc::deferred; use embedded_services::{GlobalRawMutex, intrusive_list}; +pub mod displayport; pub mod event; +pub mod pd; +pub mod retimer; +pub mod state_machine; +pub mod thunderbolt; +pub mod ucsi; +pub mod usb; use event::{PortEvent, PortPending}; @@ -405,24 +412,24 @@ pub enum Command { /// Controller-specific response data #[derive(Copy, Clone, Debug)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum InternalResponseData<'a> { +pub enum InternalResponseData { /// Command complete Complete, /// Controller status - Status(ControllerStatus<'a>), + Status(ControllerStatus), } /// Response for controller-specific commands -pub type InternalResponse<'a> = Result, PdError>; +pub type InternalResponse = Result; /// PD controller command response #[derive(Copy, Clone, Debug)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum Response<'a> { +pub enum Response { /// Controller response - Controller(InternalResponse<'a>), + Controller(InternalResponse), /// UCSI response passthrough - Ucsi(ucsi::GlobalResponse), + Ucsi(UcsiGlobalResponse), /// Port response Port(PortResponse), } @@ -430,9 +437,9 @@ pub enum Response<'a> { /// Controller status #[derive(Copy, Clone, Debug)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub struct ControllerStatus<'a> { +pub struct ControllerStatus { /// Current controller mode - pub mode: &'a str, + pub mode: &'static str, /// True if we did not have to boot from a backup FW bank pub valid_fw_bank: bool, /// FW version 0 @@ -447,7 +454,7 @@ pub struct Device<'a> { id: ControllerId, ports: &'a [GlobalPortId], num_ports: usize, - command: deferred::Channel>, + command: deferred::Channel, } impl intrusive_list::NodeContainer for Device<'static> { @@ -474,7 +481,7 @@ impl<'a> Device<'a> { } /// Send a command to this controller - pub async fn execute_command(&self, command: Command) -> Response<'_> { + pub async fn execute_command(&self, command: Command) -> Response { self.command.execute(command).await } @@ -500,7 +507,7 @@ impl<'a> Device<'a> { /// Create a command handler for this controller /// /// DROP SAFETY: Direct call to deferred channel primitive - pub async fn receive(&self) -> deferred::Request<'_, GlobalRawMutex, Command, Response<'static>> { + pub async fn receive(&self) -> deferred::Request<'_, GlobalRawMutex, Command, Response> { self.command.receive().await } @@ -574,9 +581,7 @@ pub trait Controller { enable: bool, ) -> impl Future>>; /// Get current controller status - fn get_controller_status( - &mut self, - ) -> impl Future, Error>>; + fn get_controller_status(&mut self) -> impl Future>>; /// Get current PD alert fn get_pd_alert(&mut self, port: LocalPortId) -> impl Future, Error>>; /// Set the maximum sink voltage for the given port diff --git a/type-c-interface/src/port/pd.rs b/type-c-interface/src/port/pd.rs new file mode 100644 index 00000000..1c7048c3 --- /dev/null +++ b/type-c-interface/src/port/pd.rs @@ -0,0 +1,37 @@ +//! Module for core PD trait + +use embedded_services::bus_error::BusError; +use embedded_usb_pd::{Error, ado::Ado}; + +use crate::port::{AttnVdm, OtherVdm, PortStatus, SendVdm}; + +/// Core PD trait containing base functionality from the PD spec. +pub trait Pd: BusError { + /// Returns the port status + fn get_port_status(&mut self) -> impl Future>>; + + /// Clear the dead battery flag + fn clear_dead_battery_flag(&mut self) -> impl Future>>; + + /// Enable or disable sink path + fn enable_sink_path(&mut self, enable: bool) -> impl Future>>; + + /// Get current PD alert + fn get_pd_alert(&mut self) -> impl Future, Error>>; + + /// Set port unconstrained status + fn set_unconstrained_power( + &mut self, + unconstrained: bool, + ) -> impl Future>>; + + /// Get the Rx Other VDM data + fn get_other_vdm(&mut self) -> impl Future>>; + /// Get the Rx Attention VDM data + fn get_attn_vdm(&mut self) -> impl Future>>; + /// Send a VDM + fn send_vdm(&mut self, tx_vdm: SendVdm) -> impl Future>>; + + /// Execute PD Data Reset + fn execute_drst(&mut self) -> impl Future>>; +} diff --git a/type-c-interface/src/port/retimer.rs b/type-c-interface/src/port/retimer.rs new file mode 100644 index 00000000..4f137bba --- /dev/null +++ b/type-c-interface/src/port/retimer.rs @@ -0,0 +1,21 @@ +//! Module for retimer related traits and types + +use embedded_services::bus_error::BusError; +use embedded_usb_pd::Error; + +use crate::port::RetimerFwUpdateState; + +/// Retimer trait +pub trait Retimer: BusError { + /// Returns the retimer fw update state + fn get_rt_fw_update_status(&mut self) -> impl Future>>; + /// Set retimer fw update state + fn set_rt_fw_update_state(&mut self) -> impl Future>>; + /// Clear retimer fw update state + fn clear_rt_fw_update_state(&mut self) -> impl Future>>; + /// Set retimer compliance + fn set_rt_compliance(&mut self) -> impl Future>>; + + /// Reconfigure the retimer for the given port. + fn reconfigure_retimer(&mut self) -> impl Future>>; +} diff --git a/type-c-interface/src/port/state_machine.rs b/type-c-interface/src/port/state_machine.rs new file mode 100644 index 00000000..fc8de044 --- /dev/null +++ b/type-c-interface/src/port/state_machine.rs @@ -0,0 +1,33 @@ +//! Traits and types that deal with port state machines + +use embedded_usb_pd::Error; + +use crate::port::{PdStateMachineConfig, TypeCStateMachineState, pd::Pd}; + +/// Trait for ports that support Type-C state machine operations +pub trait TypeCStateMachine: Pd { + /// Set Type-C state-machine configuration + fn set_type_c_state_machine_config( + &mut self, + state: TypeCStateMachineState, + ) -> impl Future>>; + + /// Set Type-C state-machine configuration + fn get_type_c_state_machine_config( + &mut self, + ) -> impl Future>>; +} + +/// Trait for ports that support PD state machine operations +pub trait PdStateMachine: Pd { + /// Set PD state-machine configuration + fn set_pd_state_machine_config( + &mut self, + config: PdStateMachineConfig, + ) -> impl Future>>; + + /// Get PD state-machine configuration + fn get_pd_state_machine_config( + &mut self, + ) -> impl Future>>; +} diff --git a/type-c-interface/src/port/thunderbolt.rs b/type-c-interface/src/port/thunderbolt.rs new file mode 100644 index 00000000..6bdf5cdf --- /dev/null +++ b/type-c-interface/src/port/thunderbolt.rs @@ -0,0 +1,13 @@ +//! Traits and types related to Thunderbolt alternate mode + +use embedded_usb_pd::Error; + +use crate::port::{TbtConfig, pd::Pd}; + +/// Trait for ports that support Thunderbolt alternate mode operations +pub trait Thunderbolt: Pd { + /// Set Thunderbolt configuration + fn set_tbt_config(&mut self, config: TbtConfig) -> impl Future>>; + /// Get Thunderbolt configuration + fn get_tbt_config(&mut self) -> impl Future>>; +} diff --git a/type-c-interface/src/port/ucsi.rs b/type-c-interface/src/port/ucsi.rs new file mode 100644 index 00000000..1461970b --- /dev/null +++ b/type-c-interface/src/port/ucsi.rs @@ -0,0 +1,16 @@ +//! Traits and types related to UCSI operation + +use embedded_services::bus_error::BusError; +use embedded_usb_pd::{Error, ucsi::lpm}; + +/// Trait for ports that support UCSI operations +/// +/// This isn't a super type of [`super::Pd`] because it's possible to implement UCSI functionality +/// without directly exposing PD functionality. +pub trait Ucsi: BusError { + /// Execute the given UCSI command + fn execute_ucsi_command( + &mut self, + command: lpm::LocalCommand, + ) -> impl Future, Error>>; +} diff --git a/type-c-interface/src/port/usb.rs b/type-c-interface/src/port/usb.rs new file mode 100644 index 00000000..f30f08e6 --- /dev/null +++ b/type-c-interface/src/port/usb.rs @@ -0,0 +1,14 @@ +//! Traits and types related to USB operation and alt-modes + +use embedded_usb_pd::Error; + +use crate::port::{UsbControlConfig, pd::Pd}; + +/// Trait for controlling USB operation and alt-modes on a port +pub trait Usb: Pd { + /// Set USB control configuration + fn set_usb_control(&mut self, config: UsbControlConfig) -> impl Future>>; + + /// Get USB control configuration + fn get_usb_control(&mut self) -> impl Future>>; +} diff --git a/type-c-interface/src/service/context.rs b/type-c-interface/src/service/context.rs index 5efa9865..fd1df5b3 100644 --- a/type-c-interface/src/service/context.rs +++ b/type-c-interface/src/service/context.rs @@ -82,7 +82,7 @@ impl Context { &self, controller_id: ControllerId, command: InternalCommandData, - ) -> Result, PdError> { + ) -> Result { let node = self .controllers .into_iter() @@ -114,7 +114,7 @@ impl Context { &self, controller_id: ControllerId, command: InternalCommandData, - ) -> Result, PdError> { + ) -> Result { match with_timeout( DEFAULT_TIMEOUT, self.send_controller_command_no_timeout(controller_id, command), @@ -343,10 +343,7 @@ impl Context { } /// Get current controller status - pub async fn get_controller_status( - &self, - controller_id: ControllerId, - ) -> Result, PdError> { + pub async fn get_controller_status(&self, controller_id: ControllerId) -> Result { match self .send_controller_command(controller_id, InternalCommandData::Status) .await? diff --git a/type-c-service/src/driver/tps6699x.rs b/type-c-service/src/driver/tps6699x.rs index f801a6e8..26dd5b6d 100644 --- a/type-c-service/src/driver/tps6699x.rs +++ b/type-c-service/src/driver/tps6699x.rs @@ -466,7 +466,7 @@ impl Controller for Tps6699x<'_, M, B> { self.tps6699x.get_rx_ado(port).await.map_err(Error::from) } - async fn get_controller_status(&mut self) -> Result, Error> { + async fn get_controller_status(&mut self) -> Result> { let boot_flags = self.tps6699x.get_boot_flags().await?; let customer_use = CustomerUse(self.tps6699x.get_customer_use().await?); diff --git a/type-c-service/src/service/mod.rs b/type-c-service/src/service/mod.rs index 4e209f88..f63043c9 100644 --- a/type-c-service/src/service/mod.rs +++ b/type-c-service/src/service/mod.rs @@ -1,12 +1,11 @@ +use core::cell::RefCell; +use core::future::pending; + use embassy_futures::select::{Either, select}; -use embassy_sync::{ - mutex::Mutex, - pubsub::{DynImmediatePublisher, DynSubscriber}, -}; -use embedded_services::{GlobalRawMutex, debug, error, info, sync::Lockable, trace}; +use embedded_services::{debug, error, event::Receiver, info, trace}; use embedded_usb_pd::GlobalPortId; use embedded_usb_pd::PdError as Error; -use power_policy_interface::psu; +use power_policy_interface::service::event::EventData as PowerPolicyEventData; use crate::{PortEventStreamer, PortEventVariant}; use type_c_interface::port::event::{PortNotificationSingle, PortStatusChanged}; @@ -15,8 +14,8 @@ use type_c_interface::service::event; pub mod config; pub mod pd; -mod port; mod power; +pub mod registration; mod ucsi; pub mod vdm; @@ -27,8 +26,6 @@ const MAX_SUPPORTED_PORTS: usize = 4; struct State { /// Current port status port_status: [PortStatus; MAX_SUPPORTED_PORTS], - /// Next port to check, this is used to round-robin through ports - port_event_streaming_state: Option, /// UCSI state ucsi: ucsi::State, } @@ -37,28 +34,13 @@ struct State { /// /// Constructing a Service is the first step in using the Type-C service. /// Arguments should be an initialized context -pub struct Service<'a, PSU: Lockable> -where - PSU::Inner: psu::Psu, -{ +pub struct Service<'a> { /// Type-C context pub(crate) context: &'a type_c_interface::service::context::Context, /// Current state - state: Mutex, + state: State, /// Config config: config::Config, - /// Power policy event receiver - /// - /// This is the corresponding publisher to [`Self::power_policy_event_subscriber`], power policy events - /// will be buffered in the channel until they are brought into the event loop with the subscriber. - _power_policy_event_publisher: - embedded_services::broadcaster::immediate::Receiver<'a, power_policy_interface::service::event::Event<'a, PSU>>, - /// Power policy event subscriber - /// - /// This is the corresponding subscriber to [`Self::power_policy_event_publisher`], needs to be a mutex because getting a message - /// from the channel requires mutable access. - power_policy_event_subscriber: - Mutex>>, } /// Power policy events @@ -86,36 +68,29 @@ pub enum Event { PowerPolicy(PowerPolicyEvent), } -impl<'a, PSU: Lockable> Service<'a, PSU> -where - PSU::Inner: psu::Psu, -{ +impl<'a> Service<'a> { /// Create a new service the given configuration - pub fn create( - config: config::Config, - context: &'a type_c_interface::service::context::Context, - power_policy_publisher: DynImmediatePublisher<'a, power_policy_interface::service::event::Event<'a, PSU>>, - power_policy_subscriber: DynSubscriber<'a, power_policy_interface::service::event::Event<'a, PSU>>, - ) -> Self { + pub fn create(config: config::Config, context: &'a type_c_interface::service::context::Context) -> Self { Self { context, - state: Mutex::new(State::default()), + state: State::default(), config, - _power_policy_event_publisher: power_policy_publisher.into(), - power_policy_event_subscriber: Mutex::new(power_policy_subscriber), } } /// Get the cached port status - pub async fn get_cached_port_status(&self, port_id: GlobalPortId) -> Result { - let state = self.state.lock().await; - Ok(*state.port_status.get(port_id.0 as usize).ok_or(Error::InvalidPort)?) + pub fn get_cached_port_status(&self, port_id: GlobalPortId) -> Result { + Ok(*self + .state + .port_status + .get(port_id.0 as usize) + .ok_or(Error::InvalidPort)?) } /// Set the cached port status - async fn set_cached_port_status(&self, port_id: GlobalPortId, status: PortStatus) -> Result<(), Error> { - let mut state = self.state.lock().await; - *state + fn set_cached_port_status(&mut self, port_id: GlobalPortId, status: PortStatus) -> Result<(), Error> { + *self + .state .port_status .get_mut(port_id.0 as usize) .ok_or(Error::InvalidPort)? = status; @@ -124,12 +99,12 @@ where /// Process events for a specific port async fn process_port_event( - &self, + &mut self, port_id: GlobalPortId, event: PortStatusChanged, status: PortStatus, ) -> Result<(), Error> { - let old_status = self.get_cached_port_status(port_id).await?; + let old_status = self.get_cached_port_status(port_id)?; debug!("Port{}: Event: {:#?}", port_id.0, event); debug!("Port{} Previous status: {:#?}", port_id.0, old_status); @@ -152,14 +127,59 @@ where .await; } - self.set_cached_port_status(port_id, status).await?; + self.set_cached_port_status(port_id, status)?; self.handle_ucsi_port_event(port_id, event, &status).await; Ok(()) } + /// Process the given event + pub async fn process_event(&mut self, event: Event) -> Result<(), Error> { + match event { + Event::PortStatusChanged(port, event_kind, status) => { + trace!("Port{}: Processing port status changed", port.0); + self.process_port_event(port, event_kind, status).await + } + Event::PortNotification(port, notification) => { + // Other port notifications + info!("Port{}: Got port notification: {:?}", port.0, notification); + Ok(()) + } + Event::PowerPolicy(event) => { + trace!("Processing power policy event"); + self.process_power_policy_event(&event).await + } + } + } +} + +/// Event receiver for the Type-C service +pub struct EventReceiver<'a, PowerReceiver: Receiver> { + /// Type-C context + pub(crate) context: &'a type_c_interface::service::context::Context, + /// Next port to check, this is used to round-robin through ports + port_event_streaming_state: Option, + /// Power policy event subscriber + /// + /// Used to allow partial borrows of Self for the call to select + power_policy_event_subscriber: RefCell, +} + +impl<'a, PowerReceiver: Receiver> EventReceiver<'a, PowerReceiver> { + /// Create a new event receiver + pub fn new( + context: &'a type_c_interface::service::context::Context, + power_policy_event_subscriber: PowerReceiver, + ) -> Self { + Self { + context, + port_event_streaming_state: None, + power_policy_event_subscriber: RefCell::new(power_policy_event_subscriber), + } + } + /// Wait for the next event - pub async fn wait_next(&self) -> Result { + pub async fn wait_next(&mut self) -> Result { loop { match select(self.wait_port_flags(), self.wait_power_policy_event()).await { Either::First(mut stream) => { @@ -168,7 +188,7 @@ where .await? { let port_id = GlobalPortId(port_id as u8); - self.state.lock().await.port_event_streaming_state = Some(stream); + self.port_event_streaming_state = Some(stream); match event { PortEventVariant::StatusChanged(status_event) => { // Return a port status changed event @@ -182,7 +202,7 @@ where } } } else { - self.state.lock().await.port_event_streaming_state = None; + self.port_event_streaming_state = None; } } Either::Second(event) => return Ok(event), @@ -190,28 +210,43 @@ where } } - /// Process the given event - pub async fn process_event(&self, event: Event) -> Result<(), Error> { - match event { - Event::PortStatusChanged(port, event_kind, status) => { - trace!("Port{}: Processing port status changed", port.0); - self.process_port_event(port, event_kind, status).await - } - Event::PortNotification(port, notification) => { - // Other port notifications - info!("Port{}: Got port notification: {:?}", port.0, notification); - Ok(()) - } - Event::PowerPolicy(event) => { - trace!("Processing power policy event"); - self.process_power_policy_event(&event).await - } + /// Wait for port flags + async fn wait_port_flags(&self) -> PortEventStreamer { + if let Some(ref streamer) = self.port_event_streaming_state { + // If we have an existing iterator, return it + // Yield first to prevent starving other tasks + embassy_futures::yield_now().await; + *streamer + } else { + // Wait for the next port event and create a streamer + PortEventStreamer::new(self.context.get_unhandled_events().await.into_iter()) } } - /// Combined processing function - pub async fn process_next_event(&self) -> Result<(), Error> { - let event = self.wait_next().await?; - self.process_event(event).await + /// Wait for a power policy event + #[allow(clippy::await_holding_refcell_ref)] + async fn wait_power_policy_event(&self) -> Event { + let Ok(mut subscriber) = self.power_policy_event_subscriber.try_borrow_mut() else { + // This should never happen because this function is not public and is only called from wait_next, which takes &mut self + error!("Attempt to call `wait_power_policy_event` simultaneously"); + return pending().await; + }; + + loop { + match subscriber.wait_next().await { + power_policy_interface::service::event::EventData::Unconstrained(state) => { + return Event::PowerPolicy(PowerPolicyEvent::Unconstrained(state)); + } + power_policy_interface::service::event::EventData::ConsumerDisconnected => { + return Event::PowerPolicy(PowerPolicyEvent::ConsumerDisconnected); + } + power_policy_interface::service::event::EventData::ConsumerConnected(_) => { + return Event::PowerPolicy(PowerPolicyEvent::ConsumerConnected); + } + _ => { + // No other events currently implemented + } + } + } } } diff --git a/type-c-service/src/service/pd.rs b/type-c-service/src/service/pd.rs index b0d940ee..21934fa7 100644 --- a/type-c-service/src/service/pd.rs +++ b/type-c-service/src/service/pd.rs @@ -1,15 +1,10 @@ //! Power Delivery (PD) related functionality. -use embedded_services::sync::Lockable; use embedded_usb_pd::{GlobalPortId, PdError, ado::Ado}; -use power_policy_interface::psu; use super::Service; -impl<'a, PSU: Lockable> Service<'a, PSU> -where - PSU::Inner: psu::Psu, -{ +impl Service<'_> { /// Get the oldest unhandled PD alert for the given port. /// /// Returns [`None`] if no alerts are pending. diff --git a/type-c-service/src/service/port.rs b/type-c-service/src/service/port.rs deleted file mode 100644 index b1aa047e..00000000 --- a/type-c-service/src/service/port.rs +++ /dev/null @@ -1,20 +0,0 @@ -use super::*; -use crate::PortEventStreamer; - -impl<'a, PSU: Lockable> Service<'a, PSU> -where - PSU::Inner: psu::Psu, -{ - /// Wait for port flags - pub(super) async fn wait_port_flags(&self) -> PortEventStreamer { - if let Some(ref streamer) = self.state.lock().await.port_event_streaming_state { - // If we have an existing iterator, return it - // Yield first to prevent starving other tasks - embassy_futures::yield_now().await; - *streamer - } else { - // Wait for the next port event and create a streamer - PortEventStreamer::new(self.context.get_unhandled_events().await.into_iter()) - } - } -} diff --git a/type-c-service/src/service/power.rs b/type-c-service/src/service/power.rs index 68e0ec5c..4ec9e380 100644 --- a/type-c-service/src/service/power.rs +++ b/type-c-service/src/service/power.rs @@ -1,40 +1,10 @@ -use embassy_sync::pubsub::WaitResult; use power_policy_interface::service as power_policy; use super::*; -impl<'a, PSU: Lockable> Service<'a, PSU> -where - PSU::Inner: psu::Psu, -{ - /// Wait for a power policy event - pub(super) async fn wait_power_policy_event(&self) -> Event { - loop { - match self.power_policy_event_subscriber.lock().await.next_message().await { - WaitResult::Lagged(lagged) => { - // Missed some messages, all we can do is log an error - error!("Power policy {} event(s) lagged", lagged); - } - WaitResult::Message(message) => match message { - power_policy_interface::service::event::Event::Unconstrained(state) => { - return Event::PowerPolicy(PowerPolicyEvent::Unconstrained(state)); - } - power_policy_interface::service::event::Event::ConsumerDisconnected(_) => { - return Event::PowerPolicy(PowerPolicyEvent::ConsumerDisconnected); - } - power_policy_interface::service::event::Event::ConsumerConnected(_, _) => { - return Event::PowerPolicy(PowerPolicyEvent::ConsumerConnected); - } - _ => { - // No other events currently implemented - } - }, - } - } - } - +impl Service<'_> { /// Set the unconstrained state for all ports - pub(super) async fn set_unconstrained_all(&self, unconstrained: bool) -> Result<(), Error> { + pub(super) async fn set_unconstrained_all(&mut self, unconstrained: bool) -> Result<(), Error> { for port_index in 0..self.context.get_num_ports() { self.context .set_unconstrained_power(GlobalPortId(port_index as u8), unconstrained) @@ -45,12 +15,10 @@ where /// Processed unconstrained state change pub(super) async fn process_unconstrained_state_change( - &self, + &mut self, unconstrained_state: &power_policy::UnconstrainedState, ) -> Result<(), Error> { if unconstrained_state.unconstrained { - let state = self.state.lock().await; - if unconstrained_state.available > 1 { // There are multiple available unconstrained consumers, set all ports to unconstrained // TODO: determine if we need to consider if we need to consider @@ -61,7 +29,8 @@ where } else { // Only one unconstrained device is present, see if that's one of our ports let num_ports = self.context.get_num_ports(); - let unconstrained_port = state + let unconstrained_port = self + .state .port_status .iter() .take(num_ports) @@ -97,21 +66,19 @@ where } /// Process power policy events - pub(super) async fn process_power_policy_event(&self, message: &PowerPolicyEvent) -> Result<(), Error> { + pub(super) async fn process_power_policy_event(&mut self, message: &PowerPolicyEvent) -> Result<(), Error> { match message { PowerPolicyEvent::Unconstrained(state) => self.process_unconstrained_state_change(state).await, PowerPolicyEvent::ConsumerDisconnected => { - let mut state = self.state.lock().await; - state.ucsi.psu_connected = false; + self.state.ucsi.psu_connected = false; // Notify OPM because this can affect battery charging capability status - self.pend_ucsi_connected_ports(&mut state).await; + self.pend_ucsi_connected_ports().await; Ok(()) } PowerPolicyEvent::ConsumerConnected => { - let mut state = self.state.lock().await; - state.ucsi.psu_connected = true; + self.state.ucsi.psu_connected = true; // Notify OPM because this can affect battery charging capability status - self.pend_ucsi_connected_ports(&mut state).await; + self.pend_ucsi_connected_ports().await; Ok(()) } } diff --git a/type-c-service/src/service/registration.rs b/type-c-service/src/service/registration.rs new file mode 100644 index 00000000..e62bd4c2 --- /dev/null +++ b/type-c-service/src/service/registration.rs @@ -0,0 +1,27 @@ +//! Types and traits related to registration for the type-C service. + +use embedded_services::sync::Lockable; +use type_c_interface::port::pd::Pd; + +/// Trait for type-C service registration. +pub trait Registration<'device> { + type Port: Lockable; + + fn ports(&self) -> &[&'device Self::Port]; +} + +/// A registration implementation based around arrays +pub struct ArrayRegistration<'device, Port: Lockable + 'device, const PORT_COUNT: usize> { + /// Array of registered ports + pub ports: [&'device Port; PORT_COUNT], +} + +impl<'device, Port: Lockable + 'device, const PORT_COUNT: usize> Registration<'device> + for ArrayRegistration<'device, Port, PORT_COUNT> +{ + type Port = Port; + + fn ports(&self) -> &[&'device Self::Port] { + &self.ports + } +} diff --git a/type-c-service/src/service/ucsi.rs b/type-c-service/src/service/ucsi.rs index ed3d2108..62749cc8 100644 --- a/type-c-service/src/service/ucsi.rs +++ b/type-c-service/src/service/ucsi.rs @@ -41,22 +41,19 @@ pub(super) struct State { pub(super) psu_connected: bool, } -impl<'a, PSU: Lockable> Service<'a, PSU> -where - PSU::Inner: psu::Psu, -{ +impl Service<'_> { /// PPM reset implementation - fn process_ppm_reset(&self, state: &mut State) { + fn process_ppm_reset(&mut self) { debug!("Resetting PPM"); - state.notifications_enabled = NotificationEnable::default(); - state.pending_ports.clear(); - state.valid_battery_charging_capability.clear(); + self.state.ucsi.notifications_enabled = NotificationEnable::default(); + self.state.ucsi.pending_ports.clear(); + self.state.ucsi.valid_battery_charging_capability.clear(); } /// Set notification enable implementation - fn process_set_notification_enable(&self, state: &mut State, enable: NotificationEnable) { + fn process_set_notification_enable(&mut self, enable: NotificationEnable) { debug!("Set Notification Enable: {:?}", enable); - state.notifications_enabled = enable; + self.state.ucsi.notifications_enabled = enable; } /// PPM get capabilities implementation @@ -67,14 +64,10 @@ where ppm::ResponseData::GetCapability(capabilities) } - fn process_ppm_command( - &self, - state: &mut State, - command: &ucsi::ppm::Command, - ) -> Result, PdError> { + fn process_ppm_command(&mut self, command: &ucsi::ppm::Command) -> Result, PdError> { match command { ppm::Command::SetNotificationEnable(enable) => { - self.process_set_notification_enable(state, enable.notification_enable); + self.process_set_notification_enable(enable.notification_enable); Ok(None) } ppm::Command::GetCapability => Ok(Some(self.process_get_capabilities())), @@ -85,12 +78,11 @@ where /// Determine the battery charging capability status for the given port fn determine_battery_charging_capability_status( &self, - state: &mut State, port_id: GlobalPortId, port_status: &PortStatus, ) -> Option { if port_status.power_role == PowerRole::Sink { - if state.valid_battery_charging_capability.contains(&port_id) && !state.psu_connected { + if self.state.ucsi.valid_battery_charging_capability.contains(&port_id) && !self.state.ucsi.psu_connected { // Only run this logic when no PSU is attached to prevent excessive notifications // when new type-C PSUs are attached let power_mw = port_status @@ -110,8 +102,7 @@ where } async fn process_lpm_command( - &self, - state: &mut super::State, + &mut self, command: &ucsi::lpm::GlobalCommand, ) -> Result, PdError> { debug!("Processing LPM command: {:?}", command); @@ -137,9 +128,9 @@ where }))) = response { let raw_port = command.port().0 as usize; - let port_status = state.port_status.get(raw_port).ok_or(PdError::InvalidPort)?; + let port_status = self.state.port_status.get(raw_port).ok_or(PdError::InvalidPort)?; *battery_charging_status = - self.determine_battery_charging_capability_status(&mut state.ucsi, command.port(), port_status); + self.determine_battery_charging_capability_status(command.port(), port_status); states_change.set_battery_charging_status_change(battery_charging_status.is_some()); } @@ -149,9 +140,9 @@ where } } - /// Upate the CCI connector change field based on the current pending port - fn set_cci_connector_change(&self, state: &mut State, cci: &mut GlobalCci) { - if let Some(current_port) = state.pending_ports.front() { + /// Update the CCI connector change field based on the current pending port + fn set_cci_connector_change(&self, cci: &mut GlobalCci) { + if let Some(current_port) = self.state.ucsi.pending_ports.front() { // UCSI connector numbers are 1-based cci.set_connector_change(GlobalPortId(current_port.0 + 1)); } else { @@ -161,10 +152,10 @@ where } /// Acknowledge the current connector change and move to the next if present - async fn ack_connector_change(&self, state: &mut State, cci: &mut GlobalCci) { + async fn ack_connector_change(&mut self, cci: &mut GlobalCci) { // Pop the just acknowledged port and move to the next if present - if let Some(_current_port) = state.pending_ports.pop_front() { - if let Some(next_port) = state.pending_ports.front() { + if let Some(_current_port) = self.state.ucsi.pending_ports.pop_front() { + if let Some(next_port) = self.state.ucsi.pending_ports.front() { debug!("ACK_CCI processed, next pending port: {:?}", next_port); self.context .broadcast_message(Event::UcsiCci(UsciChangeIndicator { @@ -180,12 +171,11 @@ where warn!("Received ACK_CCI with no pending connector changes"); } - self.set_cci_connector_change(state, cci); + self.set_cci_connector_change(cci); } /// Process a UCSI command - pub async fn process_ucsi_command(&self, command: &GlobalCommand) -> UcsiResponse { - let state = &mut self.state.lock().await; + pub async fn process_ucsi_command(&mut self, command: &GlobalCommand) -> UcsiResponse { let mut next_input = Some(PpmInput::Command(command)); let mut response = UcsiResponse { notify_opm: false, @@ -198,7 +188,7 @@ where // Using a loop allows all logic to be centralized loop { let output = if let Some(next_input) = next_input.take() { - state.ucsi.ppm_state_machine.consume(next_input) + self.state.ucsi.ppm_state_machine.consume(next_input) } else { error!("Unexpected end of state machine processing"); return UcsiResponse { @@ -228,12 +218,12 @@ where match command { ucsi::GlobalCommand::PpmCommand(ppm_command) => { response.data = self - .process_ppm_command(&mut state.ucsi, ppm_command) + .process_ppm_command(ppm_command) .map(|inner| inner.map(ResponseData::Ppm)); } ucsi::GlobalCommand::LpmCommand(lpm_command) => { response.data = self - .process_lpm_command(state, lpm_command) + .process_lpm_command(lpm_command) .await .map(|inner| inner.map(ResponseData::Lpm)); } @@ -242,20 +232,20 @@ where // Don't return yet, need to inform state machine that command is complete } PpmOutput::OpmNotifyCommandComplete => { - response.notify_opm = state.ucsi.notifications_enabled.cmd_complete(); + response.notify_opm = self.state.ucsi.notifications_enabled.cmd_complete(); response.cci.set_cmd_complete(true); response.cci.set_error(response.data.is_err()); - self.set_cci_connector_change(&mut state.ucsi, &mut response.cci); + self.set_cci_connector_change(&mut response.cci); return response; } PpmOutput::AckComplete(ack) => { - response.notify_opm = state.ucsi.notifications_enabled.cmd_complete(); + response.notify_opm = self.state.ucsi.notifications_enabled.cmd_complete(); if ack.command_complete() { response.cci.set_ack_command(true); } if ack.connector_change() { - self.ack_connector_change(&mut state.ucsi, &mut response.cci).await; + self.ack_connector_change(&mut response.cci).await; } return response; @@ -263,18 +253,18 @@ where PpmOutput::ResetComplete => { // Resets don't follow the normal command execution flow // So do any reset processing here - self.process_ppm_reset(&mut state.ucsi); + self.process_ppm_reset(); // Don't notify OPM because it'll poll response.notify_opm = false; response.cci = Cci::new_reset_complete(); - self.set_cci_connector_change(&mut state.ucsi, &mut response.cci); + self.set_cci_connector_change(&mut response.cci); return response; } PpmOutput::OpmNotifyBusy => { // Notify if notifications are enabled in general - response.notify_opm = !state.ucsi.notifications_enabled.is_empty(); + response.notify_opm = !self.state.ucsi.notifications_enabled.is_empty(); response.cci.set_busy(true); - self.set_cci_connector_change(&mut state.ucsi, &mut response.cci); + self.set_cci_connector_change(&mut response.cci); return response; } }, @@ -283,7 +273,7 @@ where response.notify_opm = false; response.cci = Cci::default(); response.data = Ok(None); - self.set_cci_connector_change(&mut state.ucsi, &mut response.cci); + self.set_cci_connector_change(&mut response.cci); return response; } } @@ -292,12 +282,11 @@ where /// Handle PD port events, update UCSI state, and generate corresponding UCSI notifications pub(super) async fn handle_ucsi_port_event( - &self, + &mut self, port_id: GlobalPortId, port_event: PortStatusChanged, port_status: &PortStatus, ) { - let state = &mut self.state.lock().await.ucsi; let mut ucsi_event = ConnectorStatusChange::default(); ucsi_event.set_connect_change(port_event.plug_inserted_or_removed()); @@ -319,36 +308,48 @@ where ucsi_event.set_battery_charging_status_change(true); // Power negotiation completed, battery charging capability status is now valid - if state.valid_battery_charging_capability.insert(port_id).is_err() { + if self + .state + .ucsi + .valid_battery_charging_capability + .insert(port_id) + .is_err() + { error!("Valid battery charging capability overflow for port {:?}", port_id); } } if !port_status.is_connected() { // Reset battery charging capability status when disconnected - let _ = state.valid_battery_charging_capability.remove(&port_id); + let _ = self.state.ucsi.valid_battery_charging_capability.remove(&port_id); } - if ucsi_event.filter_enabled(state.notifications_enabled).is_empty() { + if ucsi_event + .filter_enabled(self.state.ucsi.notifications_enabled) + .is_empty() + { trace!("{:?}: event received, but no UCSI notifications enabled", port_id); return; } - self.pend_ucsi_port(state, port_id).await; + self.pend_ucsi_port(port_id).await; } /// Pend UCSI events for all connected ports - pub(super) async fn pend_ucsi_connected_ports(&self, state: &mut super::State) { - for (port_id, port_status) in state.port_status.iter().enumerate() { - if port_status.is_connected() { - self.pend_ucsi_port(&mut state.ucsi, GlobalPortId(port_id as u8)).await; + pub(super) async fn pend_ucsi_connected_ports(&mut self) { + // Panic Safety: i is limited by the length of port_status + #[allow(clippy::indexing_slicing)] + for i in 0..self.state.port_status.len() { + let port_id = GlobalPortId(i as u8); + if self.state.port_status[i].is_connected() { + self.pend_ucsi_port(port_id).await; } } } /// Pend a UCSI event for the given port - async fn pend_ucsi_port(&self, state: &mut State, port_id: GlobalPortId) { - if state.pending_ports.iter().any(|pending| *pending == port_id) { + async fn pend_ucsi_port(&mut self, port_id: GlobalPortId) { + if self.state.ucsi.pending_ports.iter().any(|pending| *pending == port_id) { // Already have a pending event for this port, don't need to process it twice return; } @@ -356,8 +357,8 @@ where // Only notifiy the OPM if we don't have any pending events // Once the OPM starts processing events, the next pending port will be sent as part // of the CCI response to the ACK_CC_CI command. See [`Self::set_cci_connector_change`] - let notify_opm = state.pending_ports.is_empty(); - if state.pending_ports.push_back(port_id).is_ok() { + let notify_opm = self.state.ucsi.pending_ports.is_empty(); + if self.state.ucsi.pending_ports.push_back(port_id).is_ok() { self.context .broadcast_message(Event::UcsiCci(UsciChangeIndicator { port: port_id, diff --git a/type-c-service/src/service/vdm.rs b/type-c-service/src/service/vdm.rs index ed2cfc15..d9259070 100644 --- a/type-c-service/src/service/vdm.rs +++ b/type-c-service/src/service/vdm.rs @@ -1,16 +1,11 @@ //! VDM (Vendor Defined Messages) related functionality. -use embedded_services::sync::Lockable; use embedded_usb_pd::{GlobalPortId, PdError}; -use power_policy_interface::psu; use type_c_interface::port::{AttnVdm, OtherVdm}; use super::Service; -impl Service<'_, PSU> -where - PSU::Inner: psu::Psu, -{ +impl Service<'_> { /// Get the other vdm for the given port pub async fn get_other_vdm(&self, port_id: GlobalPortId) -> Result { self.context.get_other_vdm(port_id).await diff --git a/type-c-service/src/task.rs b/type-c-service/src/task.rs index cdc999e2..ce446aaa 100644 --- a/type-c-service/src/task.rs +++ b/type-c-service/src/task.rs @@ -1,39 +1,31 @@ -use core::future::Future; -use embedded_services::{error, event, info, sync::Lockable}; +use embedded_services::{ + error, + event::{self, Receiver}, + info, + sync::Lockable, +}; +use power_policy_interface::service::event::EventData as PowerPolicyEventData; -use power_policy_interface::psu; +use crate::{ + service::{EventReceiver, Service}, + wrapper::ControllerWrapper, +}; -use crate::{service::Service, wrapper::ControllerWrapper}; - -/// Task to run the Type-C service, takes a closure to customize the event loop -pub async fn task_closure< - 'a, - M, - D, - PSU: Lockable, - S, - V, - Fut: Future, - F: Fn(&'a Service<'a, PSU>) -> Fut, - const N: usize, ->( - service: &'static Service<'a, PSU>, - wrappers: [&'a ControllerWrapper<'a, M, D, S, V>; N], - cfu_client: &'a cfu_service::CfuClient, - f: F, +/// Task to run the Type-C service, running the default event loop +pub async fn task, const N: usize>( + service: &'static impl Lockable>, + mut event_receiver: EventReceiver<'static, PowerReceiver>, + wrappers: [&'static ControllerWrapper<'static, M, D, S, V>; N], + cfu_client: &'static cfu_service::CfuClient, ) where M: embassy_sync::blocking_mutex::raw::RawMutex, - D: Lockable, - PSU::Inner: psu::Psu, + D: embedded_services::sync::Lockable, S: event::Sender, V: crate::wrapper::FwOfferValidator, - D::Inner: type_c_interface::port::Controller, + ::Inner: type_c_interface::port::Controller, { info!("Starting type-c task"); - // TODO: move this service to use the new power policy event subscribers and receivers - // See https://github.com/OpenDevicePartnership/embedded-services/issues/742 - for controller_wrapper in wrappers { if controller_wrapper.register(cfu_client).is_err() { error!("Failed to register a controller"); @@ -42,27 +34,16 @@ pub async fn task_closure< } loop { - f(service).await; - } -} + let event = match event_receiver.wait_next().await { + Ok(event) => event, + Err(e) => { + error!("Error waiting for event: {:#?}", e); + continue; + } + }; -/// Task to run the Type-C service, running the default event loop -pub async fn task<'a, M, D, PSU: Lockable, S, V, const N: usize>( - service: &'static Service<'a, PSU>, - wrappers: [&'a ControllerWrapper<'a, M, D, S, V>; N], - cfu_client: &'a cfu_service::CfuClient, -) where - M: embassy_sync::blocking_mutex::raw::RawMutex, - D: embedded_services::sync::Lockable, - PSU::Inner: psu::Psu, - S: event::Sender, - V: crate::wrapper::FwOfferValidator, - ::Inner: type_c_interface::port::Controller, -{ - task_closure(service, wrappers, cfu_client, |service: &Service<'_, PSU>| async { - if let Err(e) = service.process_next_event().await { + if let Err(e) = service.lock().await.process_event(event).await { error!("Type-C service processing error: {:#?}", e); } - }) - .await; + } } diff --git a/type-c-service/src/wrapper/message.rs b/type-c-service/src/wrapper/message.rs index aa4c5c03..663af709 100644 --- a/type-c-service/src/wrapper/message.rs +++ b/type-c-service/src/wrapper/message.rs @@ -1,10 +1,9 @@ //! [`crate::wrapper::ControllerWrapper`] message types -use embedded_services::{GlobalRawMutex, ipc::deferred}; -use embedded_usb_pd::{LocalPortId, ado::Ado}; +use embedded_usb_pd::{LocalPortId, PdError, ado::Ado}; -use type_c_interface::{ - port::event::{PortNotificationSingle, PortStatusChanged}, - port::{self, DpStatus, PortStatus}, +use type_c_interface::port::{ + DpStatus, PortResponseData, PortStatus, + event::{PortNotificationSingle, PortStatusChanged}, }; /// Port status changed event data @@ -35,6 +34,14 @@ pub struct EventPowerPolicyCommand { pub request: power_policy_interface::psu::CommandData, } +/// Port command event data +pub struct EventPortCommand { + /// Port ID + pub port: LocalPortId, + /// Deferred request + pub request: type_c_interface::port::PortCommandData, +} + /// CFU events #[derive(Copy, Clone, Debug, PartialEq, Eq)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] @@ -48,7 +55,7 @@ pub enum EventCfu { } /// Wrapper events -pub enum Event<'a> { +pub enum Event { /// Port status changed PortStatusChanged(EventPortStatusChanged), /// Port notification @@ -56,7 +63,7 @@ pub enum Event<'a> { /// Power policy command received PowerPolicyCommand(EventPowerPolicyCommand), /// Command from TCPM - ControllerCommand(deferred::Request<'a, GlobalRawMutex, port::Command, port::Response<'static>>), + PortCommand(EventPortCommand), /// Cfu event CfuEvent(EventCfu), } @@ -92,11 +99,11 @@ pub struct OutputPowerPolicyCommand { } /// Controller command output data -pub struct OutputControllerCommand<'a> { - /// Controller request - pub request: deferred::Request<'a, GlobalRawMutex, port::Command, port::Response<'static>>, +pub struct OutputControllerCommand { + /// Port ID + pub port: LocalPortId, /// Response - pub response: port::Response<'static>, + pub response: Result, } pub mod vdm { @@ -141,7 +148,7 @@ pub struct OutputDpStatusChanged { } /// [`crate::wrapper::ControllerWrapper`] output -pub enum Output<'a> { +pub enum Output { /// No-op when nothing specific is needed Nop, /// Port status changed @@ -153,7 +160,7 @@ pub enum Output<'a> { /// Power policy command received PowerPolicyCommand(OutputPowerPolicyCommand), /// TPCM command response - ControllerCommand(OutputControllerCommand<'a>), + ControllerCommand(OutputControllerCommand), /// CFU recovery tick CfuRecovery, /// CFU response diff --git a/type-c-service/src/wrapper/mod.rs b/type-c-service/src/wrapper/mod.rs index 91506384..655babbc 100644 --- a/type-c-service/src/wrapper/mod.rs +++ b/type-c-service/src/wrapper/mod.rs @@ -21,7 +21,7 @@ use core::ops::DerefMut; use crate::wrapper::backing::{ControllerState, PortState}; use cfu_service::CfuClient; -use embassy_futures::select::{Either, Either5, select, select_array, select5}; +use embassy_futures::select::{Either, Either4, select, select_array, select4}; use embassy_sync::blocking_mutex::raw::RawMutex; use embassy_sync::mutex::Mutex; use embassy_sync::signal::Signal; @@ -34,7 +34,7 @@ use embedded_usb_pd::ado::Ado; use embedded_usb_pd::{Error, LocalPortId, PdError}; use crate::wrapper::message::*; -use crate::wrapper::proxy::PowerProxyReceiver; +use crate::wrapper::proxy::{PortProxyCommandData, PortProxyResponseData, PowerProxyReceiver}; use crate::{PortEventStreamer, PortEventVariant}; pub mod backing; @@ -88,8 +88,8 @@ pub struct ControllerWrapper< sw_status_event: Signal, /// General config config: config::Config, - /// Power proxy receivers - power_proxy_receivers: &'device [Mutex>], + /// Port proxy receivers + port_proxy_receivers: &'device [Mutex>], /// Port proxies pub ports: &'device [backing::Port<'device, M, S>], /// Controller state @@ -127,7 +127,7 @@ where ))), registration: backing.registration, sw_status_event: Signal::new(), - power_proxy_receivers: backing.power_receivers, + port_proxy_receivers: backing.power_receivers, ports: backing.ports, controller_state: Mutex::new(backing::ControllerState::default()), } @@ -213,7 +213,7 @@ where controller: &mut D::Inner, local_port_id: LocalPortId, status_event: PortStatusChanged, - ) -> Result, Error<::BusError>> { + ) -> Result::BusError>> { let global_port_id = self .registration .pd_controller @@ -374,24 +374,23 @@ where } /// Wait for the next event - pub async fn wait_next(&self) -> Result, Error<::BusError>> { + pub async fn wait_next(&self) -> Result::BusError>> { // This loop is to ensure that if we finish streaming events we go back to waiting for the next port event loop { let event = { let controller_state = self.controller_state.lock().await; let mut controller = self.controller.lock().await; // DROP SAFETY: Select over drop safe functions - select5( + select4( self.wait_port_pending(&controller_state, &mut controller), - self.wait_power_command(), - self.registration.pd_controller.receive(), + self.wait_port_command(), self.wait_cfu_command(), self.wait_sink_ready_timeout(), ) .await }; match event { - Either5::First(stream) => { + Either4::First(stream) => { let mut stream = stream?; if let Some((port_index, event)) = stream .next::::BusError>, _, _>(async |port_index| { @@ -435,12 +434,16 @@ where self.controller_state.lock().await.port_event_streaming_state = None; } } - Either5::Second((port, request)) => { - return Ok(Event::PowerPolicyCommand(EventPowerPolicyCommand { port, request })); - } - Either5::Third(request) => return Ok(Event::ControllerCommand(request)), - Either5::Fourth(event) => return Ok(Event::CfuEvent(event)), - Either5::Fifth(port) => { + Either4::Second((port, request)) => match request { + PortProxyCommandData::Power(request) => { + return Ok(Event::PowerPolicyCommand(EventPowerPolicyCommand { port, request })); + } + PortProxyCommandData::Port(request) => { + return Ok(Event::PortCommand(EventPortCommand { port, request })); + } + }, + Either4::Third(event) => return Ok(Event::CfuEvent(event)), + Either4::Fourth(port) => { // Sink ready timeout event debug!("Port{0}: Sink ready timeout", port.0); self.ports @@ -459,12 +462,12 @@ where } /// Process a port notification - async fn process_port_notification<'b>( + async fn process_port_notification( &self, controller: &mut D::Inner, port: LocalPortId, notification: PortNotificationSingle, - ) -> Result, Error<::BusError>> { + ) -> Result::BusError>> { match notification { PortNotificationSingle::Alert => { let ado = controller.get_pd_alert(port).await?; @@ -493,10 +496,7 @@ where /// Top-level processing function /// Only call this fn from one place in a loop. Otherwise a deadlock could occur. - pub async fn process_event<'b>( - &self, - event: Event<'b>, - ) -> Result, Error<::BusError>> { + pub async fn process_event(&self, event: Event) -> Result::BusError>> { let mut controller = self.controller.lock().await; let mut controller_state = self.controller_state.lock().await; match event { @@ -510,11 +510,14 @@ where .await; Ok(Output::PowerPolicyCommand(OutputPowerPolicyCommand { port, response })) } - Event::ControllerCommand(request) => { + Event::PortCommand(request) => { let response = self - .process_pd_command(&mut controller_state, &mut controller, &request.command) + .process_port_command(&mut controller_state, &mut controller, &request) .await; - Ok(Output::ControllerCommand(OutputControllerCommand { request, response })) + Ok(Output::ControllerCommand(OutputControllerCommand { + port: request.port, + response, + })) } Event::CfuEvent(event) => match event { EventCfu::Request(request) => { @@ -537,7 +540,7 @@ where } /// Event loop finalize - pub async fn finalize<'b>(&self, output: Output<'b>) -> Result<(), Error<::BusError>> { + pub async fn finalize(&self, output: Output) -> Result<(), Error<::BusError>> { match output { Output::Nop => Ok(()), Output::PortStatusChanged(OutputPortStatusChanged { @@ -548,17 +551,23 @@ where Output::PdAlert(OutputPdAlert { port, ado }) => self.finalize_pd_alert(port, ado).await, Output::Vdm(vdm) => self.finalize_vdm(vdm).await.map_err(Error::Pd), Output::PowerPolicyCommand(OutputPowerPolicyCommand { port, response }) => { - self.power_proxy_receivers + self.port_proxy_receivers .get(port.0 as usize) .ok_or(Error::Pd(PdError::InvalidPort))? .lock() .await - .send(response) + .send(PortProxyResponseData::Power(response)) .await; Ok(()) } - Output::ControllerCommand(OutputControllerCommand { request, response }) => { - request.respond(response); + Output::ControllerCommand(OutputControllerCommand { port, response }) => { + self.port_proxy_receivers + .get(port.0 as usize) + .ok_or(Error::Pd(PdError::InvalidPort))? + .lock() + .await + .send(PortProxyResponseData::Port(response)) + .await; Ok(()) } Output::CfuRecovery => { @@ -577,9 +586,9 @@ where } /// Combined processing and finialization function - pub async fn process_and_finalize_event<'b>( + pub async fn process_and_finalize_event( &self, - event: Event<'b>, + event: Event, ) -> Result<(), Error<::BusError>> { let output = self.process_event(event).await?; self.finalize(output).await diff --git a/type-c-service/src/wrapper/pd.rs b/type-c-service/src/wrapper/pd.rs index a789f3b3..37794b88 100644 --- a/type-c-service/src/wrapper/pd.rs +++ b/type-c-service/src/wrapper/pd.rs @@ -4,11 +4,10 @@ use embassy_sync::pubsub::WaitResult; use embassy_time::{Duration, Timer}; use embedded_services::debug; use embedded_usb_pd::constants::{T_PS_TRANSITION_EPR_MS, T_PS_TRANSITION_SPR_MS}; -use embedded_usb_pd::ucsi::{self, lpm}; +use embedded_usb_pd::ucsi::lpm; use power_policy_interface::psu::{self, PsuState}; -use type_c_interface::port; use type_c_interface::port::Cached; -use type_c_interface::port::{InternalResponseData, Response}; +use type_c_interface::port::{self, PortResponseData}; use super::*; @@ -160,33 +159,26 @@ where } /// Handle a port command - async fn process_port_command( + pub(super) async fn process_port_command( &self, controller_state: &mut ControllerState, controller: &mut D::Inner, - command: &port::PortCommand, - ) -> Response<'static> { + command: &EventPortCommand, + ) -> Result { if controller_state.fw_update_state.in_progress() { debug!("FW update in progress, ignoring port command"); - return port::Response::Port(Err(PdError::Busy)); + return Err(PdError::Busy); } - let local_port = if let Ok(port) = self.registration.pd_controller.lookup_local_port(command.port) { - port - } else { + let Some(port) = self.ports.get(command.port.0 as usize) else { debug!("Invalid port: {:?}", command.port); - return port::Response::Port(Err(PdError::InvalidPort)); - }; - - let Some(port) = self.ports.get(local_port.0 as usize) else { - debug!("Invalid port: {:?}", command.port); - return port::Response::Port(Err(PdError::InvalidPort)); + return Err(PdError::InvalidPort); }; let mut port_state = port.state.lock().await; - port::Response::Port(match command.data { + match command.request { port::PortCommandData::PortStatus(cached) => { - self.process_get_port_status(controller, &mut port_state, local_port, cached) + self.process_get_port_status(controller, &mut port_state, command.port, cached) .await } port::PortCommandData::ClearEvents => { @@ -194,7 +186,7 @@ where Ok(port::PortResponseData::ClearEvents(event)) } port::PortCommandData::RetimerFwUpdateGetState => { - match controller.get_rt_fw_update_status(local_port).await { + match controller.get_rt_fw_update_status(command.port).await { Ok(status) => Ok(port::PortResponseData::RtFwUpdateStatus(status)), Err(e) => match e { Error::Bus(_) => Err(PdError::Failed), @@ -203,7 +195,7 @@ where } } port::PortCommandData::RetimerFwUpdateSetState => { - match controller.set_rt_fw_update_state(local_port).await { + match controller.set_rt_fw_update_state(command.port).await { Ok(()) => Ok(port::PortResponseData::Complete), Err(e) => match e { Error::Bus(_) => Err(PdError::Failed), @@ -212,7 +204,7 @@ where } } port::PortCommandData::RetimerFwUpdateClearState => { - match controller.clear_rt_fw_update_state(local_port).await { + match controller.clear_rt_fw_update_state(command.port).await { Ok(()) => Ok(port::PortResponseData::Complete), Err(e) => match e { Error::Bus(_) => Err(PdError::Failed), @@ -220,42 +212,31 @@ where }, } } - port::PortCommandData::SetRetimerCompliance => match controller.set_rt_compliance(local_port).await { + port::PortCommandData::SetRetimerCompliance => match controller.set_rt_compliance(command.port).await { Ok(()) => Ok(port::PortResponseData::Complete), Err(e) => match e { Error::Bus(_) => Err(PdError::Failed), Error::Pd(e) => Err(e), }, }, - port::PortCommandData::ReconfigureRetimer => match controller.reconfigure_retimer(local_port).await { + port::PortCommandData::ReconfigureRetimer => match controller.reconfigure_retimer(command.port).await { Ok(()) => Ok(port::PortResponseData::Complete), Err(e) => match e { Error::Bus(_) => Err(PdError::Failed), Error::Pd(e) => Err(e), }, }, - port::PortCommandData::GetPdAlert => match self.process_get_pd_alert(&mut port_state, local_port).await { + port::PortCommandData::GetPdAlert => match self.process_get_pd_alert(&mut port_state, command.port).await { Ok(alert) => Ok(port::PortResponseData::PdAlert(alert)), Err(e) => Err(e), }, port::PortCommandData::SetMaxSinkVoltage(voltage_mv) => { - match self.registration.pd_controller.lookup_local_port(command.port) { - Ok(local_port) => { - let psu_state = port.proxy.lock().await.psu_state; - self.process_set_max_sink_voltage( - controller, - &mut port_state, - &psu_state, - local_port, - voltage_mv, - ) - .await - } - Err(e) => Err(e), - } + let psu_state = port.proxy.lock().await.psu_state; + self.process_set_max_sink_voltage(controller, &mut port_state, &psu_state, command.port, voltage_mv) + .await } port::PortCommandData::SetUnconstrainedPower(unconstrained) => { - match controller.set_unconstrained_power(local_port, unconstrained).await { + match controller.set_unconstrained_power(command.port, unconstrained).await { Ok(()) => Ok(port::PortResponseData::Complete), Err(e) => match e { Error::Bus(_) => Err(PdError::Failed), @@ -263,16 +244,17 @@ where }, } } - port::PortCommandData::ClearDeadBatteryFlag => match controller.clear_dead_battery_flag(local_port).await { + port::PortCommandData::ClearDeadBatteryFlag => match controller.clear_dead_battery_flag(command.port).await + { Ok(()) => Ok(port::PortResponseData::Complete), Err(e) => match e { Error::Bus(_) => Err(PdError::Failed), Error::Pd(e) => Err(e), }, }, - port::PortCommandData::GetOtherVdm => match controller.get_other_vdm(local_port).await { + port::PortCommandData::GetOtherVdm => match controller.get_other_vdm(command.port).await { Ok(vdm) => { - debug!("Port{}: Other VDM: {:?}", local_port.0, vdm); + debug!("Port{}: Other VDM: {:?}", command.port.0, vdm); Ok(port::PortResponseData::OtherVdm(vdm)) } Err(e) => match e { @@ -280,9 +262,9 @@ where Error::Pd(e) => Err(e), }, }, - port::PortCommandData::GetAttnVdm => match controller.get_attn_vdm(local_port).await { + port::PortCommandData::GetAttnVdm => match controller.get_attn_vdm(command.port).await { Ok(vdm) => { - debug!("Port{}: Attention VDM: {:?}", local_port.0, vdm); + debug!("Port{}: Attention VDM: {:?}", command.port.0, vdm); Ok(port::PortResponseData::AttnVdm(vdm)) } Err(e) => match e { @@ -290,7 +272,7 @@ where Error::Pd(e) => Err(e), }, }, - port::PortCommandData::SendVdm(tx_vdm) => match controller.send_vdm(local_port, tx_vdm).await { + port::PortCommandData::SendVdm(tx_vdm) => match controller.send_vdm(command.port, tx_vdm).await { Ok(()) => Ok(port::PortResponseData::Complete), Err(e) => match e { Error::Bus(_) => Err(PdError::Failed), @@ -298,7 +280,7 @@ where }, }, port::PortCommandData::SetUsbControl(config) => { - match controller.set_usb_control(local_port, config).await { + match controller.set_usb_control(command.port, config).await { Ok(()) => Ok(port::PortResponseData::Complete), Err(e) => match e { Error::Bus(_) => Err(PdError::Failed), @@ -306,9 +288,9 @@ where }, } } - port::PortCommandData::GetDpStatus => match controller.get_dp_status(local_port).await { + port::PortCommandData::GetDpStatus => match controller.get_dp_status(command.port).await { Ok(status) => { - debug!("Port{}: DP Status: {:?}", local_port.0, status); + debug!("Port{}: DP Status: {:?}", command.port.0, status); Ok(port::PortResponseData::DpStatus(status)) } Err(e) => match e { @@ -316,29 +298,31 @@ where Error::Pd(e) => Err(e), }, }, - port::PortCommandData::SetDpConfig(config) => match controller.set_dp_config(local_port, config).await { + port::PortCommandData::SetDpConfig(config) => match controller.set_dp_config(command.port, config).await { Ok(()) => Ok(port::PortResponseData::Complete), Err(e) => match e { Error::Bus(_) => Err(PdError::Failed), Error::Pd(e) => Err(e), }, }, - port::PortCommandData::ExecuteDrst => match controller.execute_drst(local_port).await { - Ok(()) => Ok(port::PortResponseData::Complete), - Err(e) => match e { - Error::Bus(_) => Err(PdError::Failed), - Error::Pd(e) => Err(e), - }, - }, - port::PortCommandData::SetTbtConfig(config) => match controller.set_tbt_config(local_port, config).await { + port::PortCommandData::ExecuteDrst => match controller.execute_drst(command.port).await { Ok(()) => Ok(port::PortResponseData::Complete), Err(e) => match e { Error::Bus(_) => Err(PdError::Failed), Error::Pd(e) => Err(e), }, }, + port::PortCommandData::SetTbtConfig(config) => { + match controller.set_tbt_config(command.port, config).await { + Ok(()) => Ok(port::PortResponseData::Complete), + Err(e) => match e { + Error::Bus(_) => Err(PdError::Failed), + Error::Pd(e) => Err(e), + }, + } + } port::PortCommandData::SetPdStateMachineConfig(config) => { - match controller.set_pd_state_machine_config(local_port, config).await { + match controller.set_pd_state_machine_config(command.port, config).await { Ok(()) => Ok(port::PortResponseData::Complete), Err(e) => match e { Error::Bus(_) => Err(PdError::Failed), @@ -347,7 +331,7 @@ where } } port::PortCommandData::SetTypeCStateMachineConfig(state) => { - match controller.set_type_c_state_machine_config(local_port, state).await { + match controller.set_type_c_state_machine_config(command.port, state).await { Ok(()) => Ok(port::PortResponseData::Complete), Err(e) => match e { Error::Bus(_) => Err(PdError::Failed), @@ -357,68 +341,13 @@ where } port::PortCommandData::ExecuteUcsiCommand(command_data) => Ok(port::PortResponseData::UcsiResponse( controller - .execute_ucsi_command(lpm::Command::new(local_port, command_data)) + .execute_ucsi_command(lpm::Command::new(command.port, command_data)) .await .map_err(|e| match e { Error::Bus(_) => PdError::Failed, Error::Pd(e) => e, }), )), - }) - } - - async fn process_controller_command( - &self, - controller_state: &mut ControllerState, - controller: &mut D::Inner, - command: &port::InternalCommandData, - ) -> Response<'static> { - if controller_state.fw_update_state.in_progress() { - debug!("FW update in progress, ignoring controller command"); - return port::Response::Controller(Err(PdError::Busy)); - } - - match command { - port::InternalCommandData::Status => { - let status = controller.get_controller_status().await; - port::Response::Controller(status.map(InternalResponseData::Status).map_err(|_| PdError::Failed)) - } - port::InternalCommandData::SyncState => { - let result = self.sync_state_internal(controller).await; - port::Response::Controller( - result - .map(|_| InternalResponseData::Complete) - .map_err(|_| PdError::Failed), - ) - } - port::InternalCommandData::Reset => { - let result = controller.reset_controller().await; - port::Response::Controller( - result - .map(|_| InternalResponseData::Complete) - .map_err(|_| PdError::Failed), - ) - } - } - } - - /// Handle a PD controller command - pub(super) async fn process_pd_command( - &self, - controller_state: &mut ControllerState, - controller: &mut D::Inner, - command: &port::Command, - ) -> Response<'static> { - match command { - port::Command::Port(command) => self.process_port_command(controller_state, controller, command).await, - port::Command::Controller(command) => { - self.process_controller_command(controller_state, controller, command) - .await - } - port::Command::Lpm(_) => port::Response::Ucsi(ucsi::Response { - cci: ucsi::cci::Cci::new_error(), - data: None, - }), } } } diff --git a/type-c-service/src/wrapper/power.rs b/type-c-service/src/wrapper/power.rs index fa9c33d3..1c7a0541 100644 --- a/type-c-service/src/wrapper/power.rs +++ b/type-c-service/src/wrapper/power.rs @@ -11,6 +11,7 @@ use power_policy_interface::psu::{CommandData, InternalResponseData, ResponseDat use crate::wrapper::backing::ControllerState; use crate::wrapper::config::UnconstrainedSink; +use crate::wrapper::proxy::PortProxyCommandData; use super::*; @@ -97,13 +98,13 @@ where Ok(()) } - /// Wait for a power command + /// Wait for a port command /// /// Returns (local port ID, deferred request) /// DROP SAFETY: Call to a select over drop safe futures - pub(super) async fn wait_power_command(&self) -> (LocalPortId, CommandData) { + pub(super) async fn wait_port_command(&self) -> (LocalPortId, PortProxyCommandData) { let mut futures = heapless::Vec::<_, MAX_SUPPORTED_PORTS>::new(); - for receiver in self.power_proxy_receivers { + for receiver in self.port_proxy_receivers { // TODO: check this at compile time if futures .push(async { diff --git a/type-c-service/src/wrapper/proxy.rs b/type-c-service/src/wrapper/proxy.rs index 170fd55d..c10f99d0 100644 --- a/type-c-service/src/wrapper/proxy.rs +++ b/type-c-service/src/wrapper/proxy.rs @@ -1,11 +1,34 @@ use embassy_sync::blocking_mutex::raw::RawMutex; use embassy_sync::channel::{Channel, DynamicReceiver, DynamicSender}; use embedded_services::named::Named; +use embedded_usb_pd::PdError; use power_policy_interface::psu::{CommandData as PolicyCommandData, InternalResponseData as PolicyResponseData, Psu}; +use type_c_interface::port::{PortCommandData, PortResponseData}; + +#[derive(Debug, Clone, Copy)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum PortProxyCommandData { + Power(PolicyCommandData), + Port(PortCommandData), +} + +pub enum PortProxyResponseData { + Power(PolicyResponseData), + Port(Result), +} + +impl From for Result<(), power_policy_interface::psu::Error> { + fn from(value: PortProxyResponseData) -> Self { + match value { + PortProxyResponseData::Power(response) => response?.complete_or_err(), + PortProxyResponseData::Port(_) => Err(power_policy_interface::psu::Error::InvalidResponse), + } + } +} pub struct PowerProxyChannel { - command_channel: Channel, - response_channel: Channel, + command_channel: Channel, + response_channel: Channel, } impl PowerProxyChannel { @@ -19,8 +42,8 @@ impl PowerProxyChannel { pub fn get_device_components( &self, ) -> ( - DynamicSender<'_, PolicyCommandData>, - DynamicReceiver<'_, PolicyResponseData>, + DynamicSender<'_, PortProxyCommandData>, + DynamicReceiver<'_, PortProxyResponseData>, ) { (self.command_channel.dyn_sender(), self.response_channel.dyn_receiver()) } @@ -34,30 +57,30 @@ impl PowerProxyChannel { } pub struct PowerProxyReceiver<'a> { - sender: DynamicSender<'a, PolicyResponseData>, - receiver: DynamicReceiver<'a, PolicyCommandData>, + sender: DynamicSender<'a, PortProxyResponseData>, + receiver: DynamicReceiver<'a, PortProxyCommandData>, } impl<'a> PowerProxyReceiver<'a> { pub fn new( - receiver: DynamicReceiver<'a, PolicyCommandData>, - sender: DynamicSender<'a, PolicyResponseData>, + receiver: DynamicReceiver<'a, PortProxyCommandData>, + sender: DynamicSender<'a, PortProxyResponseData>, ) -> Self { Self { receiver, sender } } - pub async fn receive(&mut self) -> PolicyCommandData { + pub async fn receive(&mut self) -> PortProxyCommandData { self.receiver.receive().await } - pub async fn send(&mut self, response: PolicyResponseData) { + pub async fn send(&mut self, response: PortProxyResponseData) { self.sender.send(response).await; } } pub struct PowerProxyDevice<'a> { - sender: DynamicSender<'a, PolicyCommandData>, - receiver: DynamicReceiver<'a, PolicyResponseData>, + sender: DynamicSender<'a, PortProxyCommandData>, + receiver: DynamicReceiver<'a, PortProxyResponseData>, /// Per-port PSU state pub(crate) psu_state: power_policy_interface::psu::State, name: &'static str, @@ -66,8 +89,8 @@ pub struct PowerProxyDevice<'a> { impl<'a> PowerProxyDevice<'a> { pub fn new( name: &'static str, - sender: DynamicSender<'a, PolicyCommandData>, - receiver: DynamicReceiver<'a, PolicyResponseData>, + sender: DynamicSender<'a, PortProxyCommandData>, + receiver: DynamicReceiver<'a, PortProxyResponseData>, ) -> Self { Self { name, @@ -77,7 +100,7 @@ impl<'a> PowerProxyDevice<'a> { } } - async fn execute(&mut self, command: PolicyCommandData) -> PolicyResponseData { + async fn execute(&mut self, command: PortProxyCommandData) -> PortProxyResponseData { self.sender.send(command).await; self.receiver.receive().await } @@ -85,25 +108,31 @@ impl<'a> PowerProxyDevice<'a> { impl<'a> Psu for PowerProxyDevice<'a> { async fn disconnect(&mut self) -> Result<(), power_policy_interface::psu::Error> { - self.execute(PolicyCommandData::Disconnect).await?.complete_or_err() + self.execute(PortProxyCommandData::Power(PolicyCommandData::Disconnect)) + .await + .into() } async fn connect_provider( &mut self, capability: power_policy_interface::capability::ProviderPowerCapability, ) -> Result<(), power_policy_interface::psu::Error> { - self.execute(PolicyCommandData::ConnectAsProvider(capability)) - .await? - .complete_or_err() + self.execute(PortProxyCommandData::Power(PolicyCommandData::ConnectAsProvider( + capability, + ))) + .await + .into() } async fn connect_consumer( &mut self, capability: power_policy_interface::capability::ConsumerPowerCapability, ) -> Result<(), power_policy_interface::psu::Error> { - self.execute(PolicyCommandData::ConnectAsConsumer(capability)) - .await? - .complete_or_err() + self.execute(PortProxyCommandData::Power(PolicyCommandData::ConnectAsConsumer( + capability, + ))) + .await + .into() } fn state(&self) -> &power_policy_interface::psu::State {