Skip to content

Make futures-util and futures-sink optional #4197

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions embassy-sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@ src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-sync/
target = "thumbv7em-none-eabi"

[features]
default = ["futures-util", "futures-sink"]
std = []
turbowakers = []
futures-sink = ["dep:futures-sink"]
futures-util = ["dep:futures-util"]

[dependencies]
defmt = { version = "0.3", optional = true }
log = { version = "0.4.14", optional = true }

futures-sink = { version = "0.3", default-features = false, features = [] }
futures-util = { version = "0.3.17", default-features = false }
futures-sink = { version = "0.3", default-features = false, features = [], optional = true }
futures-util = { version = "0.3.17", default-features = false, optional = true }
critical-section = "1.1"
heapless = "0.8"
cfg-if = "1.0.0"
Expand Down
2 changes: 2 additions & 0 deletions embassy-sync/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ where
}
}

#[cfg(feature = "futures-util")]
impl<'ch, M, T, const N: usize> futures_util::Stream for Receiver<'ch, M, T, N>
where
M: RawMutex,
Expand Down Expand Up @@ -782,6 +783,7 @@ where
}
}

#[cfg(feature = "futures-util")]
impl<M, T, const N: usize> futures_util::Stream for Channel<M, T, N>
where
M: RawMutex,
Expand Down
5 changes: 3 additions & 2 deletions embassy-sync/src/pubsub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,9 @@ struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: us
/// Don't worry, we won't run out.
/// If a million messages were published every second, then the ID's would run out in about 584942 years.
next_message_id: u64,
/// Collection of wakers for Subscribers that are waiting.
/// Collection of wakers for Subscribers that are waiting.
subscriber_wakers: MultiWakerRegistration<SUBS>,
/// Collection of wakers for Publishers that are waiting.
/// Collection of wakers for Publishers that are waiting.
publisher_wakers: MultiWakerRegistration<PUBS>,
/// The amount of subscribers that are active
subscriber_count: usize,
Expand Down Expand Up @@ -760,6 +760,7 @@ mod tests {
}

#[futures_test::test]
#[cfg(feature = "futures-sink")]
async fn publisher_sink() {
use futures_util::{SinkExt, StreamExt};

Expand Down
4 changes: 4 additions & 0 deletions embassy-sync/src/pubsub/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
}

/// Create a [`futures::Sink`] adapter for this publisher.
#[cfg(feature = "futures-sink")]
#[inline]
pub const fn sink(&self) -> PubSink<'a, '_, PSB, T> {
PubSink { publ: self, fut: None }
Expand Down Expand Up @@ -227,6 +228,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS:
}
}

#[cfg(feature = "futures-sink")]
#[must_use = "Sinks do nothing unless polled"]
/// [`futures_sink::Sink`] adapter for [`Pub`].
pub struct PubSink<'a, 'p, PSB, T>
Expand All @@ -238,6 +240,7 @@ where
fut: Option<PublisherWaitFuture<'p, 'a, PSB, T>>,
}

#[cfg(feature = "futures-sink")]
impl<'a, 'p, PSB, T> PubSink<'a, 'p, PSB, T>
where
PSB: PubSubBehavior<T> + ?Sized,
Expand All @@ -258,6 +261,7 @@ where
}
}

#[cfg(feature = "futures-sink")]
impl<'a, 'p, PSB, T> futures_sink::Sink<T> for PubSink<'a, 'p, PSB, T>
where
PSB: PubSubBehavior<T> + ?Sized,
Expand Down
1 change: 1 addition & 0 deletions embassy-sync/src/pubsub/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {}

/// Warning: The stream implementation ignores lag results and returns all messages.
/// This might miss some messages without you knowing it.
#[cfg(feature = "futures-util")]
impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> {
type Item = T;

Expand Down
11 changes: 8 additions & 3 deletions embassy-time/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ target = "x86_64-unknown-linux-gnu"
features = ["defmt", "std"]

[features]
default = ["futures-util"]

## Display the time since startup next to defmt log messages.
## At most 1 `defmt-timestamp-uptime-*` feature can be used.
## `defmt-timestamp-uptime` is provided for backwards compatibility (provides the same format as `uptime-us`).
Expand All @@ -36,6 +38,9 @@ defmt-timestamp-uptime-ts = ["defmt"]
defmt-timestamp-uptime-tms = ["defmt"]
defmt-timestamp-uptime-tus = ["defmt"]

## Disable `futures-util` support
futures-util = ["dep:futures-util"]

#! ### Time Drivers

#! Usually, time drivers are defined by a HAL, or a companion crate to the HAL. For `std` and WASM
Expand All @@ -53,10 +58,10 @@ wasm = ["dep:wasm-bindgen", "dep:js-sys", "dep:wasm-timer", "tick-hz-1_000_000",

#! By default embassy-time uses a timer queue implementation that is faster but depends on `embassy-executor`.
#! It will panic if you try to await any timer when using another executor.
#!
#!
#! Alternatively, you can choose to use a "generic" timer queue implementation that works on any executor.
#! To enable it, enable any of the features below.
#!
#!
#! The features also set how many timers are used for the generic queue. At most one
#! `generic-queue-*` feature can be enabled. If none is enabled, `queue_integrated` is used.
#!
Expand Down Expand Up @@ -427,7 +432,7 @@ embedded-hal-02 = { package = "embedded-hal", version = "0.2.6" }
embedded-hal-1 = { package = "embedded-hal", version = "1.0" }
embedded-hal-async = { version = "1.0" }

futures-util = { version = "0.3.17", default-features = false }
futures-util = { version = "0.3.17", default-features = false, optional = true }
critical-section = "1.1"
cfg-if = "1.0.0"

Expand Down
6 changes: 4 additions & 2 deletions embassy-time/src/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use core::future::{poll_fn, Future};
use core::pin::Pin;
use core::task::{Context, Poll};

use futures_util::stream::FusedStream;
use futures_util::Stream;
#[cfg(feature = "futures-util")]
use futures_util::{stream::FusedStream, Stream};

use crate::{Duration, Instant};

Expand Down Expand Up @@ -277,6 +277,7 @@ impl Ticker {

impl Unpin for Ticker {}

#[cfg(feature = "futures-util")]
impl Stream for Ticker {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand All @@ -291,6 +292,7 @@ impl Stream for Ticker {
}
}

#[cfg(feature = "futures-util")]
impl FusedStream for Ticker {
fn is_terminated(&self) -> bool {
// `Ticker` keeps yielding values until dropped, it never terminates.
Expand Down