From 21072bee48ff6ec19b79e0d9527ad8cc34a4e9e0 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 22 Aug 2022 21:46:09 +0200 Subject: split `embassy-util` into `embassy-futures`, `embassy-sync`. --- .github/workflows/rust.yml | 2 +- .vscode/settings.json | 2 +- embassy-boot/boot/Cargo.toml | 2 +- embassy-boot/nrf/Cargo.toml | 2 +- embassy-boot/stm32/Cargo.toml | 2 +- embassy-cortex-m/Cargo.toml | 2 +- embassy-embedded-hal/Cargo.toml | 2 +- embassy-embedded-hal/src/shared_bus/asynch/i2c.rs | 8 +- embassy-embedded-hal/src/shared_bus/asynch/spi.rs | 8 +- .../src/shared_bus/blocking/i2c.rs | 6 +- .../src/shared_bus/blocking/spi.rs | 6 +- embassy-futures/Cargo.toml | 14 + embassy-futures/src/fmt.rs | 228 ++++++++ embassy-futures/src/lib.rs | 12 + embassy-futures/src/select.rs | 230 ++++++++ embassy-futures/src/yield_now.rs | 25 + embassy-lora/Cargo.toml | 2 +- embassy-lora/src/stm32wl/mod.rs | 2 +- embassy-net/Cargo.toml | 2 +- embassy-net/src/stack.rs | 2 +- embassy-nrf/Cargo.toml | 4 +- embassy-nrf/src/buffered_uarte.rs | 2 +- embassy-nrf/src/gpiote.rs | 2 +- embassy-nrf/src/qdec.rs | 2 +- embassy-nrf/src/qspi.rs | 2 +- embassy-nrf/src/rng.rs | 2 +- embassy-nrf/src/saadc.rs | 2 +- embassy-nrf/src/spim.rs | 2 +- embassy-nrf/src/temp.rs | 2 +- embassy-nrf/src/time_driver.rs | 4 +- embassy-nrf/src/timer.rs | 6 +- embassy-nrf/src/twim.rs | 2 +- embassy-nrf/src/uarte.rs | 2 +- embassy-nrf/src/usb.rs | 2 +- embassy-rp/Cargo.toml | 2 +- embassy-rp/src/gpio.rs | 2 +- embassy-rp/src/timer.rs | 4 +- embassy-stm32/Cargo.toml | 4 +- embassy-stm32/src/dcmi.rs | 2 +- embassy-stm32/src/dma/bdma.rs | 2 +- embassy-stm32/src/dma/dma.rs | 2 +- embassy-stm32/src/dma/gpdma.rs | 2 +- embassy-stm32/src/eth/v1/mod.rs | 2 +- embassy-stm32/src/eth/v2/mod.rs | 2 +- embassy-stm32/src/exti.rs | 2 +- embassy-stm32/src/i2c/v2.rs | 2 +- embassy-stm32/src/rng.rs | 2 +- embassy-stm32/src/sdmmc/mod.rs | 6 +- embassy-stm32/src/time_driver.rs | 4 +- embassy-stm32/src/usart/buffered.rs | 2 +- embassy-stm32/src/usb/usb.rs | 2 +- embassy-sync/Cargo.toml | 34 ++ embassy-sync/build.rs | 29 + embassy-sync/src/blocking_mutex/mod.rs | 189 +++++++ embassy-sync/src/blocking_mutex/raw.rs | 149 ++++++ embassy-sync/src/channel/mod.rs | 5 + embassy-sync/src/channel/mpmc.rs | 596 +++++++++++++++++++++ embassy-sync/src/channel/pubsub/mod.rs | 542 +++++++++++++++++++ embassy-sync/src/channel/pubsub/publisher.rs | 182 +++++++ embassy-sync/src/channel/pubsub/subscriber.rs | 152 ++++++ embassy-sync/src/channel/signal.rs | 100 ++++ embassy-sync/src/fmt.rs | 228 ++++++++ embassy-sync/src/lib.rs | 17 + embassy-sync/src/mutex.rs | 167 ++++++ embassy-sync/src/pipe.rs | 551 +++++++++++++++++++ embassy-sync/src/ring_buffer.rs | 146 +++++ embassy-sync/src/waitqueue/mod.rs | 7 + embassy-sync/src/waitqueue/multi_waker.rs | 33 ++ embassy-sync/src/waitqueue/waker.rs | 92 ++++ embassy-usb-hid/Cargo.toml | 2 +- embassy-usb-ncm/Cargo.toml | 2 +- embassy-usb-serial/Cargo.toml | 2 +- embassy-usb-serial/src/lib.rs | 2 +- embassy-usb/Cargo.toml | 2 +- embassy-usb/src/lib.rs | 2 +- embassy-util/Cargo.toml | 34 -- embassy-util/build.rs | 29 - embassy-util/src/blocking_mutex/mod.rs | 189 ------- embassy-util/src/blocking_mutex/raw.rs | 149 ------ embassy-util/src/channel/mod.rs | 5 - embassy-util/src/channel/mpmc.rs | 596 --------------------- embassy-util/src/channel/pubsub/mod.rs | 542 ------------------- embassy-util/src/channel/pubsub/publisher.rs | 182 ------- embassy-util/src/channel/pubsub/subscriber.rs | 152 ------ embassy-util/src/channel/signal.rs | 100 ---- embassy-util/src/fmt.rs | 228 -------- embassy-util/src/lib.rs | 23 - embassy-util/src/mutex.rs | 167 ------ embassy-util/src/pipe.rs | 551 ------------------- embassy-util/src/ring_buffer.rs | 146 ----- embassy-util/src/select.rs | 230 -------- embassy-util/src/waitqueue/mod.rs | 7 - embassy-util/src/waitqueue/multi_waker.rs | 33 -- embassy-util/src/waitqueue/waker.rs | 92 ---- embassy-util/src/yield_now.rs | 25 - examples/boot/application/nrf/Cargo.toml | 2 +- examples/boot/application/stm32f3/Cargo.toml | 2 +- examples/boot/application/stm32f7/Cargo.toml | 2 +- examples/boot/application/stm32h7/Cargo.toml | 2 +- examples/boot/application/stm32l0/Cargo.toml | 2 +- examples/boot/application/stm32l1/Cargo.toml | 2 +- examples/boot/application/stm32l4/Cargo.toml | 2 +- examples/boot/application/stm32wl/Cargo.toml | 2 +- examples/nrf-rtos-trace/Cargo.toml | 4 +- examples/nrf/Cargo.toml | 3 +- examples/nrf/src/bin/channel.rs | 4 +- examples/nrf/src/bin/channel_sender_receiver.rs | 4 +- examples/nrf/src/bin/mutex.rs | 4 +- examples/nrf/src/bin/pubsub.rs | 4 +- examples/nrf/src/bin/uart_split.rs | 4 +- examples/nrf/src/bin/usb_ethernet.rs | 4 +- examples/nrf/src/bin/usb_hid_keyboard.rs | 4 +- examples/rp/Cargo.toml | 2 +- examples/std/Cargo.toml | 2 +- examples/stm32f0/Cargo.toml | 2 +- examples/stm32f1/Cargo.toml | 2 +- examples/stm32f2/Cargo.toml | 2 +- examples/stm32f3/Cargo.toml | 2 +- examples/stm32f3/src/bin/button_events.rs | 4 +- examples/stm32f4/Cargo.toml | 2 +- examples/stm32f7/Cargo.toml | 2 +- examples/stm32g0/Cargo.toml | 2 +- examples/stm32g4/Cargo.toml | 2 +- examples/stm32h7/Cargo.toml | 2 +- examples/stm32h7/src/bin/signal.rs | 2 +- examples/stm32h7/src/bin/usart_split.rs | 4 +- examples/stm32l0/Cargo.toml | 2 +- examples/stm32l1/Cargo.toml | 2 +- examples/stm32l4/Cargo.toml | 2 +- examples/stm32l5/Cargo.toml | 2 +- examples/stm32l5/src/bin/usb_ethernet.rs | 4 +- examples/stm32u5/Cargo.toml | 2 +- examples/stm32wb/Cargo.toml | 2 +- examples/stm32wl/Cargo.toml | 2 +- examples/stm32wl/src/bin/subghz.rs | 2 +- examples/wasm/Cargo.toml | 2 +- tests/rp/Cargo.toml | 2 +- tests/stm32/Cargo.toml | 2 +- 138 files changed, 3854 insertions(+), 3605 deletions(-) create mode 100644 embassy-futures/Cargo.toml create mode 100644 embassy-futures/src/fmt.rs create mode 100644 embassy-futures/src/lib.rs create mode 100644 embassy-futures/src/select.rs create mode 100644 embassy-futures/src/yield_now.rs create mode 100644 embassy-sync/Cargo.toml create mode 100644 embassy-sync/build.rs create mode 100644 embassy-sync/src/blocking_mutex/mod.rs create mode 100644 embassy-sync/src/blocking_mutex/raw.rs create mode 100644 embassy-sync/src/channel/mod.rs create mode 100644 embassy-sync/src/channel/mpmc.rs create mode 100644 embassy-sync/src/channel/pubsub/mod.rs create mode 100644 embassy-sync/src/channel/pubsub/publisher.rs create mode 100644 embassy-sync/src/channel/pubsub/subscriber.rs create mode 100644 embassy-sync/src/channel/signal.rs create mode 100644 embassy-sync/src/fmt.rs create mode 100644 embassy-sync/src/lib.rs create mode 100644 embassy-sync/src/mutex.rs create mode 100644 embassy-sync/src/pipe.rs create mode 100644 embassy-sync/src/ring_buffer.rs create mode 100644 embassy-sync/src/waitqueue/mod.rs create mode 100644 embassy-sync/src/waitqueue/multi_waker.rs create mode 100644 embassy-sync/src/waitqueue/waker.rs delete mode 100644 embassy-util/Cargo.toml delete mode 100644 embassy-util/build.rs delete mode 100644 embassy-util/src/blocking_mutex/mod.rs delete mode 100644 embassy-util/src/blocking_mutex/raw.rs delete mode 100644 embassy-util/src/channel/mod.rs delete mode 100644 embassy-util/src/channel/mpmc.rs delete mode 100644 embassy-util/src/channel/pubsub/mod.rs delete mode 100644 embassy-util/src/channel/pubsub/publisher.rs delete mode 100644 embassy-util/src/channel/pubsub/subscriber.rs delete mode 100644 embassy-util/src/channel/signal.rs delete mode 100644 embassy-util/src/fmt.rs delete mode 100644 embassy-util/src/lib.rs delete mode 100644 embassy-util/src/mutex.rs delete mode 100644 embassy-util/src/pipe.rs delete mode 100644 embassy-util/src/ring_buffer.rs delete mode 100644 embassy-util/src/select.rs delete mode 100644 embassy-util/src/waitqueue/mod.rs delete mode 100644 embassy-util/src/waitqueue/multi_waker.rs delete mode 100644 embassy-util/src/waitqueue/waker.rs delete mode 100644 embassy-util/src/yield_now.rs diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index d76e5ced..d2e8e316 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -69,4 +69,4 @@ jobs: steps: - uses: actions/checkout@v2 - name: Test - run: cd embassy-util && cargo test + run: cd embassy-sync && cargo test diff --git a/.vscode/settings.json b/.vscode/settings.json index d6ce75c9..5e9e5179 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -18,7 +18,7 @@ "rust-analyzer.linkedProjects": [ // Declare for the target you wish to develop //"embassy-executor/Cargo.toml", - //"embassy-util/Cargo.toml", + //"embassy-sync/Cargo.toml", "examples/nrf/Cargo.toml", // "examples/rp/Cargo.toml", // "examples/std/Cargo.toml", diff --git a/embassy-boot/boot/Cargo.toml b/embassy-boot/boot/Cargo.toml index 9c2e72be..a42f8868 100644 --- a/embassy-boot/boot/Cargo.toml +++ b/embassy-boot/boot/Cargo.toml @@ -14,7 +14,7 @@ target = "thumbv7em-none-eabi" [dependencies] defmt = { version = "0.3", optional = true } log = { version = "0.4", optional = true } -embassy-util = { version = "0.1.0", path = "../../embassy-util" } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync" } embedded-storage = "0.3.0" embedded-storage-async = "0.3.0" diff --git a/embassy-boot/nrf/Cargo.toml b/embassy-boot/nrf/Cargo.toml index b06e8102..234393e7 100644 --- a/embassy-boot/nrf/Cargo.toml +++ b/embassy-boot/nrf/Cargo.toml @@ -15,7 +15,7 @@ target = "thumbv7em-none-eabi" [dependencies] defmt = { version = "0.3", optional = true } -embassy-util = { path = "../../embassy-util" } +embassy-sync = { path = "../../embassy-sync" } embassy-nrf = { path = "../../embassy-nrf", default-features = false, features = ["nightly"] } embassy-boot = { path = "../boot", default-features = false } cortex-m = { version = "0.7.6" } diff --git a/embassy-boot/stm32/Cargo.toml b/embassy-boot/stm32/Cargo.toml index d8f49253..ad4657e0 100644 --- a/embassy-boot/stm32/Cargo.toml +++ b/embassy-boot/stm32/Cargo.toml @@ -17,7 +17,7 @@ defmt = { version = "0.3", optional = true } defmt-rtt = { version = "0.3", optional = true } log = { version = "0.4", optional = true } -embassy-util = { path = "../../embassy-util" } +embassy-sync = { path = "../../embassy-sync" } embassy-stm32 = { path = "../../embassy-stm32", default-features = false, features = ["nightly"] } embassy-boot = { path = "../boot", default-features = false } cortex-m = { version = "0.7.6" } diff --git a/embassy-cortex-m/Cargo.toml b/embassy-cortex-m/Cargo.toml index 1f16da31..7efced66 100644 --- a/embassy-cortex-m/Cargo.toml +++ b/embassy-cortex-m/Cargo.toml @@ -35,7 +35,7 @@ prio-bits-8 = [] defmt = { version = "0.3", optional = true } log = { version = "0.4.14", optional = true } -embassy-util = { version = "0.1.0", path = "../embassy-util" } +embassy-sync = { version = "0.1.0", path = "../embassy-sync" } embassy-executor = { version = "0.1.0", path = "../embassy-executor"} embassy-macros = { version = "0.1.0", path = "../embassy-macros"} embassy-hal-common = { version = "0.1.0", path = "../embassy-hal-common"} diff --git a/embassy-embedded-hal/Cargo.toml b/embassy-embedded-hal/Cargo.toml index 86666687..46268072 100644 --- a/embassy-embedded-hal/Cargo.toml +++ b/embassy-embedded-hal/Cargo.toml @@ -16,7 +16,7 @@ std = [] nightly = ["embedded-hal-async", "embedded-storage-async"] [dependencies] -embassy-util = { version = "0.1.0", path = "../embassy-util" } +embassy-sync = { version = "0.1.0", path = "../embassy-sync" } embedded-hal-02 = { package = "embedded-hal", version = "0.2.6", features = ["unproven"] } embedded-hal-1 = { package = "embedded-hal", version = "1.0.0-alpha.8" } embedded-hal-async = { version = "0.1.0-alpha.1", optional = true } diff --git a/embassy-embedded-hal/src/shared_bus/asynch/i2c.rs b/embassy-embedded-hal/src/shared_bus/asynch/i2c.rs index dc483b82..0bc6afd9 100644 --- a/embassy-embedded-hal/src/shared_bus/asynch/i2c.rs +++ b/embassy-embedded-hal/src/shared_bus/asynch/i2c.rs @@ -4,8 +4,8 @@ //! //! ```rust //! use embassy_embedded_hal::shared_bus::i2c::I2cDevice; -//! use embassy_util::mutex::Mutex; -//! use embassy_util::blocking_mutex::raw::ThreadModeRawMutex; +//! use embassy_sync::mutex::Mutex; +//! use embassy_sync::blocking_mutex::raw::ThreadModeRawMutex; //! //! static I2C_BUS: StaticCell>> = StaticCell::new(); //! let config = twim::Config::default(); @@ -24,8 +24,8 @@ //! ``` use core::future::Future; -use embassy_util::blocking_mutex::raw::RawMutex; -use embassy_util::mutex::Mutex; +use embassy_sync::blocking_mutex::raw::RawMutex; +use embassy_sync::mutex::Mutex; use embedded_hal_async::i2c; use crate::shared_bus::I2cDeviceError; diff --git a/embassy-embedded-hal/src/shared_bus/asynch/spi.rs b/embassy-embedded-hal/src/shared_bus/asynch/spi.rs index bb419d6a..c95b59ef 100644 --- a/embassy-embedded-hal/src/shared_bus/asynch/spi.rs +++ b/embassy-embedded-hal/src/shared_bus/asynch/spi.rs @@ -4,8 +4,8 @@ //! //! ```rust //! use embassy_embedded_hal::shared_bus::spi::SpiDevice; -//! use embassy_util::mutex::Mutex; -//! use embassy_util::blocking_mutex::raw::ThreadModeRawMutex; +//! use embassy_sync::mutex::Mutex; +//! use embassy_sync::blocking_mutex::raw::ThreadModeRawMutex; //! //! static SPI_BUS: StaticCell>> = StaticCell::new(); //! let mut config = spim::Config::default(); @@ -27,8 +27,8 @@ //! ``` use core::future::Future; -use embassy_util::blocking_mutex::raw::RawMutex; -use embassy_util::mutex::Mutex; +use embassy_sync::blocking_mutex::raw::RawMutex; +use embassy_sync::mutex::Mutex; use embedded_hal_1::digital::blocking::OutputPin; use embedded_hal_1::spi::ErrorType; use embedded_hal_async::spi; diff --git a/embassy-embedded-hal/src/shared_bus/blocking/i2c.rs b/embassy-embedded-hal/src/shared_bus/blocking/i2c.rs index 837312e8..a611e2d2 100644 --- a/embassy-embedded-hal/src/shared_bus/blocking/i2c.rs +++ b/embassy-embedded-hal/src/shared_bus/blocking/i2c.rs @@ -4,7 +4,7 @@ //! //! ```rust //! use embassy_embedded_hal::shared_bus::blocking::i2c::I2cDevice; -//! use embassy_util::blocking_mutex::{NoopMutex, raw::NoopRawMutex}; +//! use embassy_sync::blocking_mutex::{NoopMutex, raw::NoopRawMutex}; //! //! static I2C_BUS: StaticCell>>> = StaticCell::new(); //! let irq = interrupt::take!(SPIM0_SPIS0_TWIM0_TWIS0_SPI0_TWI0); @@ -18,8 +18,8 @@ use core::cell::RefCell; -use embassy_util::blocking_mutex::raw::RawMutex; -use embassy_util::blocking_mutex::Mutex; +use embassy_sync::blocking_mutex::raw::RawMutex; +use embassy_sync::blocking_mutex::Mutex; use embedded_hal_1::i2c::blocking::{I2c, Operation}; use embedded_hal_1::i2c::ErrorType; diff --git a/embassy-embedded-hal/src/shared_bus/blocking/spi.rs b/embassy-embedded-hal/src/shared_bus/blocking/spi.rs index a48d9183..23845d88 100644 --- a/embassy-embedded-hal/src/shared_bus/blocking/spi.rs +++ b/embassy-embedded-hal/src/shared_bus/blocking/spi.rs @@ -4,7 +4,7 @@ //! //! ```rust //! use embassy_embedded_hal::shared_bus::blocking::spi::SpiDevice; -//! use embassy_util::blocking_mutex::{NoopMutex, raw::NoopRawMutex}; +//! use embassy_sync::blocking_mutex::{NoopMutex, raw::NoopRawMutex}; //! //! static SPI_BUS: StaticCell>>> = StaticCell::new(); //! let irq = interrupt::take!(SPIM3); @@ -20,8 +20,8 @@ use core::cell::RefCell; -use embassy_util::blocking_mutex::raw::RawMutex; -use embassy_util::blocking_mutex::Mutex; +use embassy_sync::blocking_mutex::raw::RawMutex; +use embassy_sync::blocking_mutex::Mutex; use embedded_hal_1::digital::blocking::OutputPin; use embedded_hal_1::spi; use embedded_hal_1::spi::blocking::SpiBusFlush; diff --git a/embassy-futures/Cargo.toml b/embassy-futures/Cargo.toml new file mode 100644 index 00000000..e564f5a9 --- /dev/null +++ b/embassy-futures/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "embassy-futures" +version = "0.1.0" +edition = "2021" + +[package.metadata.embassy_docs] +src_base = "https://github.com/embassy-rs/embassy/blob/embassy-futures-v$VERSION/embassy-futures/src/" +src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-futures/src/" +features = ["nightly"] +target = "thumbv7em-none-eabi" + +[dependencies] +defmt = { version = "0.3", optional = true } +log = { version = "0.4.14", optional = true } diff --git a/embassy-futures/src/fmt.rs b/embassy-futures/src/fmt.rs new file mode 100644 index 00000000..f8bb0a03 --- /dev/null +++ b/embassy-futures/src/fmt.rs @@ -0,0 +1,228 @@ +#![macro_use] +#![allow(unused_macros)] + +#[cfg(all(feature = "defmt", feature = "log"))] +compile_error!("You may not enable both `defmt` and `log` features."); + +macro_rules! assert { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::assert!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::assert!($($x)*); + } + }; +} + +macro_rules! assert_eq { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::assert_eq!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::assert_eq!($($x)*); + } + }; +} + +macro_rules! assert_ne { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::assert_ne!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::assert_ne!($($x)*); + } + }; +} + +macro_rules! debug_assert { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::debug_assert!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::debug_assert!($($x)*); + } + }; +} + +macro_rules! debug_assert_eq { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::debug_assert_eq!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::debug_assert_eq!($($x)*); + } + }; +} + +macro_rules! debug_assert_ne { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::debug_assert_ne!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::debug_assert_ne!($($x)*); + } + }; +} + +macro_rules! todo { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::todo!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::todo!($($x)*); + } + }; +} + +macro_rules! unreachable { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::unreachable!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::unreachable!($($x)*); + } + }; +} + +macro_rules! panic { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::panic!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::panic!($($x)*); + } + }; +} + +macro_rules! trace { + ($s:literal $(, $x:expr)* $(,)?) => { + { + #[cfg(feature = "log")] + ::log::trace!($s $(, $x)*); + #[cfg(feature = "defmt")] + ::defmt::trace!($s $(, $x)*); + #[cfg(not(any(feature = "log", feature="defmt")))] + let _ = ($( & $x ),*); + } + }; +} + +macro_rules! debug { + ($s:literal $(, $x:expr)* $(,)?) => { + { + #[cfg(feature = "log")] + ::log::debug!($s $(, $x)*); + #[cfg(feature = "defmt")] + ::defmt::debug!($s $(, $x)*); + #[cfg(not(any(feature = "log", feature="defmt")))] + let _ = ($( & $x ),*); + } + }; +} + +macro_rules! info { + ($s:literal $(, $x:expr)* $(,)?) => { + { + #[cfg(feature = "log")] + ::log::info!($s $(, $x)*); + #[cfg(feature = "defmt")] + ::defmt::info!($s $(, $x)*); + #[cfg(not(any(feature = "log", feature="defmt")))] + let _ = ($( & $x ),*); + } + }; +} + +macro_rules! warn { + ($s:literal $(, $x:expr)* $(,)?) => { + { + #[cfg(feature = "log")] + ::log::warn!($s $(, $x)*); + #[cfg(feature = "defmt")] + ::defmt::warn!($s $(, $x)*); + #[cfg(not(any(feature = "log", feature="defmt")))] + let _ = ($( & $x ),*); + } + }; +} + +macro_rules! error { + ($s:literal $(, $x:expr)* $(,)?) => { + { + #[cfg(feature = "log")] + ::log::error!($s $(, $x)*); + #[cfg(feature = "defmt")] + ::defmt::error!($s $(, $x)*); + #[cfg(not(any(feature = "log", feature="defmt")))] + let _ = ($( & $x ),*); + } + }; +} + +#[cfg(feature = "defmt")] +macro_rules! unwrap { + ($($x:tt)*) => { + ::defmt::unwrap!($($x)*) + }; +} + +#[cfg(not(feature = "defmt"))] +macro_rules! unwrap { + ($arg:expr) => { + match $crate::fmt::Try::into_result($arg) { + ::core::result::Result::Ok(t) => t, + ::core::result::Result::Err(e) => { + ::core::panic!("unwrap of `{}` failed: {:?}", ::core::stringify!($arg), e); + } + } + }; + ($arg:expr, $($msg:expr),+ $(,)? ) => { + match $crate::fmt::Try::into_result($arg) { + ::core::result::Result::Ok(t) => t, + ::core::result::Result::Err(e) => { + ::core::panic!("unwrap of `{}` failed: {}: {:?}", ::core::stringify!($arg), ::core::format_args!($($msg,)*), e); + } + } + } +} + +#[cfg(feature = "defmt-timestamp-uptime")] +defmt::timestamp! {"{=u64:us}", crate::time::Instant::now().as_micros() } + +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub struct NoneError; + +pub trait Try { + type Ok; + type Error; + fn into_result(self) -> Result; +} + +impl Try for Option { + type Ok = T; + type Error = NoneError; + + #[inline] + fn into_result(self) -> Result { + self.ok_or(NoneError) + } +} + +impl Try for Result { + type Ok = T; + type Error = E; + + #[inline] + fn into_result(self) -> Self { + self + } +} diff --git a/embassy-futures/src/lib.rs b/embassy-futures/src/lib.rs new file mode 100644 index 00000000..48c9c857 --- /dev/null +++ b/embassy-futures/src/lib.rs @@ -0,0 +1,12 @@ +#![no_std] +#![doc = include_str!("../../README.md")] +#![warn(missing_docs)] + +// This mod MUST go first, so that the others see its macros. +pub(crate) mod fmt; + +mod select; +mod yield_now; + +pub use select::*; +pub use yield_now::*; diff --git a/embassy-futures/src/select.rs b/embassy-futures/src/select.rs new file mode 100644 index 00000000..8cecb7fa --- /dev/null +++ b/embassy-futures/src/select.rs @@ -0,0 +1,230 @@ +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Result for [`select`]. +#[derive(Debug, Clone)] +pub enum Either { + /// First future finished first. + First(A), + /// Second future finished first. + Second(B), +} + +/// Wait for one of two futures to complete. +/// +/// This function returns a new future which polls all the futures. +/// When one of them completes, it will complete with its result value. +/// +/// The other future is dropped. +pub fn select(a: A, b: B) -> Select +where + A: Future, + B: Future, +{ + Select { a, b } +} + +/// Future for the [`select`] function. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Select { + a: A, + b: B, +} + +impl Unpin for Select {} + +impl Future for Select +where + A: Future, + B: Future, +{ + type Output = Either; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = unsafe { self.get_unchecked_mut() }; + let a = unsafe { Pin::new_unchecked(&mut this.a) }; + let b = unsafe { Pin::new_unchecked(&mut this.b) }; + if let Poll::Ready(x) = a.poll(cx) { + return Poll::Ready(Either::First(x)); + } + if let Poll::Ready(x) = b.poll(cx) { + return Poll::Ready(Either::Second(x)); + } + Poll::Pending + } +} + +// ==================================================================== + +/// Result for [`select3`]. +#[derive(Debug, Clone)] +pub enum Either3 { + /// First future finished first. + First(A), + /// Second future finished first. + Second(B), + /// Third future finished first. + Third(C), +} + +/// Same as [`select`], but with more futures. +pub fn select3(a: A, b: B, c: C) -> Select3 +where + A: Future, + B: Future, + C: Future, +{ + Select3 { a, b, c } +} + +/// Future for the [`select3`] function. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Select3 { + a: A, + b: B, + c: C, +} + +impl Future for Select3 +where + A: Future, + B: Future, + C: Future, +{ + type Output = Either3; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = unsafe { self.get_unchecked_mut() }; + let a = unsafe { Pin::new_unchecked(&mut this.a) }; + let b = unsafe { Pin::new_unchecked(&mut this.b) }; + let c = unsafe { Pin::new_unchecked(&mut this.c) }; + if let Poll::Ready(x) = a.poll(cx) { + return Poll::Ready(Either3::First(x)); + } + if let Poll::Ready(x) = b.poll(cx) { + return Poll::Ready(Either3::Second(x)); + } + if let Poll::Ready(x) = c.poll(cx) { + return Poll::Ready(Either3::Third(x)); + } + Poll::Pending + } +} + +// ==================================================================== + +/// Result for [`select4`]. +#[derive(Debug, Clone)] +pub enum Either4 { + /// First future finished first. + First(A), + /// Second future finished first. + Second(B), + /// Third future finished first. + Third(C), + /// Fourth future finished first. + Fourth(D), +} + +/// Same as [`select`], but with more futures. +pub fn select4(a: A, b: B, c: C, d: D) -> Select4 +where + A: Future, + B: Future, + C: Future, + D: Future, +{ + Select4 { a, b, c, d } +} + +/// Future for the [`select4`] function. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Select4 { + a: A, + b: B, + c: C, + d: D, +} + +impl Future for Select4 +where + A: Future, + B: Future, + C: Future, + D: Future, +{ + type Output = Either4; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = unsafe { self.get_unchecked_mut() }; + let a = unsafe { Pin::new_unchecked(&mut this.a) }; + let b = unsafe { Pin::new_unchecked(&mut this.b) }; + let c = unsafe { Pin::new_unchecked(&mut this.c) }; + let d = unsafe { Pin::new_unchecked(&mut this.d) }; + if let Poll::Ready(x) = a.poll(cx) { + return Poll::Ready(Either4::First(x)); + } + if let Poll::Ready(x) = b.poll(cx) { + return Poll::Ready(Either4::Second(x)); + } + if let Poll::Ready(x) = c.poll(cx) { + return Poll::Ready(Either4::Third(x)); + } + if let Poll::Ready(x) = d.poll(cx) { + return Poll::Ready(Either4::Fourth(x)); + } + Poll::Pending + } +} + +// ==================================================================== + +/// Future for the [`select_all`] function. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct SelectAll { + inner: [Fut; N], +} + +/// Creates a new future which will select over a list of futures. +/// +/// The returned future will wait for any future within `iter` to be ready. Upon +/// completion the item resolved will be returned, along with the index of the +/// future that was ready. +/// +/// # Panics +/// +/// This function will panic if the array specified contains no items. +pub fn select_all(arr: [Fut; N]) -> SelectAll { + assert!(N > 0); + SelectAll { inner: arr } +} + +impl Future for SelectAll { + type Output = (Fut::Output, usize); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Safety: Since `self` is pinned, `inner` cannot move. Since `inner` cannot move, + // its elements also cannot move. Therefore it is safe to access `inner` and pin + // references to the contained futures. + let item = unsafe { + self.get_unchecked_mut() + .inner + .iter_mut() + .enumerate() + .find_map(|(i, f)| match Pin::new_unchecked(f).poll(cx) { + Poll::Pending => None, + Poll::Ready(e) => Some((i, e)), + }) + }; + + match item { + Some((idx, res)) => Poll::Ready((res, idx)), + None => Poll::Pending, + } + } +} diff --git a/embassy-futures/src/yield_now.rs b/embassy-futures/src/yield_now.rs new file mode 100644 index 00000000..1ebecb91 --- /dev/null +++ b/embassy-futures/src/yield_now.rs @@ -0,0 +1,25 @@ +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Yield from the current task once, allowing other tasks to run. +pub fn yield_now() -> impl Future { + YieldNowFuture { yielded: false } +} + +struct YieldNowFuture { + yielded: bool, +} + +impl Future for YieldNowFuture { + type Output = (); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.yielded { + Poll::Ready(()) + } else { + self.yielded = true; + cx.waker().wake_by_ref(); + Poll::Pending + } + } +} diff --git a/embassy-lora/Cargo.toml b/embassy-lora/Cargo.toml index c7435ab3..9d5e7aed 100644 --- a/embassy-lora/Cargo.toml +++ b/embassy-lora/Cargo.toml @@ -25,7 +25,7 @@ defmt = { version = "0.3", optional = true } log = { version = "0.4.14", optional = true } embassy-time = { version = "0.1.0", path = "../embassy-time" } -embassy-util = { version = "0.1.0", path = "../embassy-util" } +embassy-sync = { version = "0.1.0", path = "../embassy-sync" } embassy-stm32 = { version = "0.1.0", path = "../embassy-stm32", default-features = false, optional = true } embedded-hal-1 = { package = "embedded-hal", version = "1.0.0-alpha.8" } embedded-hal-async = { version = "0.1.0-alpha.1" } diff --git a/embassy-lora/src/stm32wl/mod.rs b/embassy-lora/src/stm32wl/mod.rs index 4a4c5cfb..374c8532 100644 --- a/embassy-lora/src/stm32wl/mod.rs +++ b/embassy-lora/src/stm32wl/mod.rs @@ -12,7 +12,7 @@ use embassy_stm32::subghz::{ Status, SubGhz, TcxoMode, TcxoTrim, Timeout, TxParams, }; use embassy_stm32::Peripheral; -use embassy_util::channel::signal::Signal; +use embassy_sync::channel::signal::Signal; use lorawan_device::async_device::radio::{Bandwidth, PhyRxTx, RfConfig, RxQuality, SpreadingFactor, TxConfig}; use lorawan_device::async_device::Timings; diff --git a/embassy-net/Cargo.toml b/embassy-net/Cargo.toml index 9f9bb226..2143f36d 100644 --- a/embassy-net/Cargo.toml +++ b/embassy-net/Cargo.toml @@ -38,7 +38,7 @@ defmt = { version = "0.3", optional = true } log = { version = "0.4.14", optional = true } embassy-time = { version = "0.1.0", path = "../embassy-time" } -embassy-util = { version = "0.1.0", path = "../embassy-util" } +embassy-sync = { version = "0.1.0", path = "../embassy-sync" } embedded-io = { version = "0.3.0", features = [ "async" ] } managed = { version = "0.8.0", default-features = false, features = [ "map" ] } diff --git a/embassy-net/src/stack.rs b/embassy-net/src/stack.rs index 4b6a7ae2..8d2dd4bc 100644 --- a/embassy-net/src/stack.rs +++ b/embassy-net/src/stack.rs @@ -2,8 +2,8 @@ use core::cell::UnsafeCell; use core::future::Future; use core::task::{Context, Poll}; +use embassy_sync::waitqueue::WakerRegistration; use embassy_time::{Instant, Timer}; -use embassy_util::waitqueue::WakerRegistration; use futures::future::poll_fn; use futures::pin_mut; use heapless::Vec; diff --git a/embassy-nrf/Cargo.toml b/embassy-nrf/Cargo.toml index 0ef7f5bb..186c73a5 100644 --- a/embassy-nrf/Cargo.toml +++ b/embassy-nrf/Cargo.toml @@ -18,7 +18,7 @@ flavors = [ time = ["dep:embassy-time"] -defmt = ["dep:defmt", "embassy-executor/defmt", "embassy-util/defmt", "embassy-usb?/defmt", "embedded-io?/defmt", "embassy-embedded-hal/defmt"] +defmt = ["dep:defmt", "embassy-executor/defmt", "embassy-sync/defmt", "embassy-usb?/defmt", "embedded-io?/defmt", "embassy-embedded-hal/defmt"] # Enable nightly-only features nightly = ["embedded-hal-1", "embedded-hal-async", "embassy-usb", "embedded-storage-async", "dep:embedded-io", "embassy-embedded-hal/nightly"] @@ -66,7 +66,7 @@ _gpio-p1 = [] [dependencies] embassy-executor = { version = "0.1.0", path = "../embassy-executor", optional = true } embassy-time = { version = "0.1.0", path = "../embassy-time", optional = true } -embassy-util = { version = "0.1.0", path = "../embassy-util" } +embassy-sync = { version = "0.1.0", path = "../embassy-sync" } embassy-cortex-m = { version = "0.1.0", path = "../embassy-cortex-m", features = ["prio-bits-3"]} embassy-hal-common = {version = "0.1.0", path = "../embassy-hal-common" } embassy-embedded-hal = {version = "0.1.0", path = "../embassy-embedded-hal" } diff --git a/embassy-nrf/src/buffered_uarte.rs b/embassy-nrf/src/buffered_uarte.rs index 08dfcbcf..62af544a 100644 --- a/embassy-nrf/src/buffered_uarte.rs +++ b/embassy-nrf/src/buffered_uarte.rs @@ -21,7 +21,7 @@ use core::task::Poll; use embassy_cortex_m::peripheral::{PeripheralMutex, PeripheralState, StateStorage}; use embassy_hal_common::ring_buffer::RingBuffer; use embassy_hal_common::{into_ref, PeripheralRef}; -use embassy_util::waitqueue::WakerRegistration; +use embassy_sync::waitqueue::WakerRegistration; use futures::future::poll_fn; // Re-export SVD variants to allow user to directly set values pub use pac::uarte0::{baudrate::BAUDRATE_A as Baudrate, config::PARITY_A as Parity}; diff --git a/embassy-nrf/src/gpiote.rs b/embassy-nrf/src/gpiote.rs index cf49b0db..b5203570 100644 --- a/embassy-nrf/src/gpiote.rs +++ b/embassy-nrf/src/gpiote.rs @@ -3,7 +3,7 @@ use core::future::Future; use core::task::{Context, Poll}; use embassy_hal_common::{impl_peripheral, Peripheral, PeripheralRef}; -use embassy_util::waitqueue::AtomicWaker; +use embassy_sync::waitqueue::AtomicWaker; use futures::future::poll_fn; use crate::gpio::sealed::Pin as _; diff --git a/embassy-nrf/src/qdec.rs b/embassy-nrf/src/qdec.rs index 83f2916b..762e0971 100644 --- a/embassy-nrf/src/qdec.rs +++ b/embassy-nrf/src/qdec.rs @@ -3,7 +3,7 @@ use core::task::Poll; use embassy_hal_common::{into_ref, PeripheralRef}; -use embassy_util::waitqueue::AtomicWaker; +use embassy_sync::waitqueue::AtomicWaker; use futures::future::poll_fn; use crate::gpio::sealed::Pin as _; diff --git a/embassy-nrf/src/qspi.rs b/embassy-nrf/src/qspi.rs index 6d7ebb4b..c97cb165 100644 --- a/embassy-nrf/src/qspi.rs +++ b/embassy-nrf/src/qspi.rs @@ -526,7 +526,7 @@ cfg_if::cfg_if! { } pub(crate) mod sealed { - use embassy_util::waitqueue::AtomicWaker; + use embassy_sync::waitqueue::AtomicWaker; use super::*; diff --git a/embassy-nrf/src/rng.rs b/embassy-nrf/src/rng.rs index 7aad561b..42da51d0 100644 --- a/embassy-nrf/src/rng.rs +++ b/embassy-nrf/src/rng.rs @@ -4,7 +4,7 @@ use core::task::Poll; use embassy_hal_common::drop::OnDrop; use embassy_hal_common::{into_ref, PeripheralRef}; -use embassy_util::waitqueue::AtomicWaker; +use embassy_sync::waitqueue::AtomicWaker; use futures::future::poll_fn; use crate::interrupt::InterruptExt; diff --git a/embassy-nrf/src/saadc.rs b/embassy-nrf/src/saadc.rs index f2ef46d8..7dc66349 100644 --- a/embassy-nrf/src/saadc.rs +++ b/embassy-nrf/src/saadc.rs @@ -4,7 +4,7 @@ use core::sync::atomic::{compiler_fence, Ordering}; use core::task::Poll; use embassy_hal_common::{impl_peripheral, into_ref, PeripheralRef}; -use embassy_util::waitqueue::AtomicWaker; +use embassy_sync::waitqueue::AtomicWaker; use futures::future::poll_fn; use pac::{saadc, SAADC}; use saadc::ch::config::{GAIN_A, REFSEL_A, RESP_A, TACQ_A}; diff --git a/embassy-nrf/src/spim.rs b/embassy-nrf/src/spim.rs index 57c0c14c..be2fc02f 100644 --- a/embassy-nrf/src/spim.rs +++ b/embassy-nrf/src/spim.rs @@ -363,7 +363,7 @@ impl<'d, T: Instance> Drop for Spim<'d, T> { } pub(crate) mod sealed { - use embassy_util::waitqueue::AtomicWaker; + use embassy_sync::waitqueue::AtomicWaker; use super::*; diff --git a/embassy-nrf/src/temp.rs b/embassy-nrf/src/temp.rs index 1491e426..d520fd68 100644 --- a/embassy-nrf/src/temp.rs +++ b/embassy-nrf/src/temp.rs @@ -4,7 +4,7 @@ use core::task::Poll; use embassy_hal_common::drop::OnDrop; use embassy_hal_common::{into_ref, PeripheralRef}; -use embassy_util::waitqueue::AtomicWaker; +use embassy_sync::waitqueue::AtomicWaker; use fixed::types::I30F2; use futures::future::poll_fn; diff --git a/embassy-nrf/src/time_driver.rs b/embassy-nrf/src/time_driver.rs index b961d65a..c32a4463 100644 --- a/embassy-nrf/src/time_driver.rs +++ b/embassy-nrf/src/time_driver.rs @@ -3,9 +3,9 @@ use core::sync::atomic::{compiler_fence, AtomicU32, AtomicU8, Ordering}; use core::{mem, ptr}; use critical_section::CriticalSection; +use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; +use embassy_sync::blocking_mutex::CriticalSectionMutex as Mutex; use embassy_time::driver::{AlarmHandle, Driver}; -use embassy_util::blocking_mutex::raw::CriticalSectionRawMutex; -use embassy_util::blocking_mutex::CriticalSectionMutex as Mutex; use crate::interrupt::{Interrupt, InterruptExt}; use crate::{interrupt, pac}; diff --git a/embassy-nrf/src/timer.rs b/embassy-nrf/src/timer.rs index b3b613db..3de5a896 100644 --- a/embassy-nrf/src/timer.rs +++ b/embassy-nrf/src/timer.rs @@ -5,7 +5,7 @@ use core::task::Poll; use embassy_hal_common::drop::OnDrop; use embassy_hal_common::{into_ref, PeripheralRef}; -use embassy_util::waitqueue::AtomicWaker; +use embassy_sync::waitqueue::AtomicWaker; use futures::future::poll_fn; use crate::interrupt::{Interrupt, InterruptExt}; @@ -40,8 +40,8 @@ macro_rules! impl_timer { fn regs() -> &'static pac::timer0::RegisterBlock { unsafe { &*(pac::$pac_type::ptr() as *const pac::timer0::RegisterBlock) } } - fn waker(n: usize) -> &'static ::embassy_util::waitqueue::AtomicWaker { - use ::embassy_util::waitqueue::AtomicWaker; + fn waker(n: usize) -> &'static ::embassy_sync::waitqueue::AtomicWaker { + use ::embassy_sync::waitqueue::AtomicWaker; const NEW_AW: AtomicWaker = AtomicWaker::new(); static WAKERS: [AtomicWaker; $ccs] = [NEW_AW; $ccs]; &WAKERS[n] diff --git a/embassy-nrf/src/twim.rs b/embassy-nrf/src/twim.rs index 9587d1f4..850f6d0f 100644 --- a/embassy-nrf/src/twim.rs +++ b/embassy-nrf/src/twim.rs @@ -13,9 +13,9 @@ use core::task::Poll; use embassy_embedded_hal::SetConfig; use embassy_hal_common::{into_ref, PeripheralRef}; +use embassy_sync::waitqueue::AtomicWaker; #[cfg(feature = "time")] use embassy_time::{Duration, Instant}; -use embassy_util::waitqueue::AtomicWaker; use futures::future::poll_fn; use crate::chip::{EASY_DMA_SIZE, FORCE_COPY_BUFFER_SIZE}; diff --git a/embassy-nrf/src/uarte.rs b/embassy-nrf/src/uarte.rs index 0d24cf65..4347ea55 100644 --- a/embassy-nrf/src/uarte.rs +++ b/embassy-nrf/src/uarte.rs @@ -932,7 +932,7 @@ impl<'d, U: Instance, T: TimerInstance> UarteRxWithIdle<'d, U, T> { pub(crate) mod sealed { use core::sync::atomic::AtomicU8; - use embassy_util::waitqueue::AtomicWaker; + use embassy_sync::waitqueue::AtomicWaker; use super::*; diff --git a/embassy-nrf/src/usb.rs b/embassy-nrf/src/usb.rs index 509ee313..688326e9 100644 --- a/embassy-nrf/src/usb.rs +++ b/embassy-nrf/src/usb.rs @@ -7,10 +7,10 @@ use core::task::Poll; use cortex_m::peripheral::NVIC; use embassy_hal_common::{into_ref, PeripheralRef}; +use embassy_sync::waitqueue::AtomicWaker; pub use embassy_usb; use embassy_usb::driver::{self, EndpointError, Event, Unsupported}; use embassy_usb::types::{EndpointAddress, EndpointInfo, EndpointType, UsbDirection}; -use embassy_util::waitqueue::AtomicWaker; use futures::future::poll_fn; use futures::Future; use pac::usbd::RegisterBlock; diff --git a/embassy-rp/Cargo.toml b/embassy-rp/Cargo.toml index 0e53d3a3..cfd95b7b 100644 --- a/embassy-rp/Cargo.toml +++ b/embassy-rp/Cargo.toml @@ -27,7 +27,7 @@ nightly = ["embassy-executor/nightly", "embedded-hal-1", "embedded-hal-async", " unstable-traits = ["embedded-hal-1"] [dependencies] -embassy-util = { version = "0.1.0", path = "../embassy-util" } +embassy-sync = { version = "0.1.0", path = "../embassy-sync" } embassy-executor = { version = "0.1.0", path = "../embassy-executor" } embassy-time = { version = "0.1.0", path = "../embassy-time", features = [ "tick-1mhz" ] } embassy-cortex-m = { version = "0.1.0", path = "../embassy-cortex-m", features = ["prio-bits-2"]} diff --git a/embassy-rp/src/gpio.rs b/embassy-rp/src/gpio.rs index 90862fa3..428855c7 100644 --- a/embassy-rp/src/gpio.rs +++ b/embassy-rp/src/gpio.rs @@ -5,7 +5,7 @@ use core::task::{Context, Poll}; use embassy_cortex_m::interrupt::{Interrupt, InterruptExt}; use embassy_hal_common::{impl_peripheral, into_ref, PeripheralRef}; -use embassy_util::waitqueue::AtomicWaker; +use embassy_sync::waitqueue::AtomicWaker; use crate::pac::common::{Reg, RW}; use crate::pac::SIO; diff --git a/embassy-rp/src/timer.rs b/embassy-rp/src/timer.rs index 5bc1f66c..5215c0c0 100644 --- a/embassy-rp/src/timer.rs +++ b/embassy-rp/src/timer.rs @@ -2,9 +2,9 @@ use core::cell::Cell; use atomic_polyfill::{AtomicU8, Ordering}; use critical_section::CriticalSection; +use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; +use embassy_sync::blocking_mutex::Mutex; use embassy_time::driver::{AlarmHandle, Driver}; -use embassy_util::blocking_mutex::raw::CriticalSectionRawMutex; -use embassy_util::blocking_mutex::Mutex; use crate::interrupt::{Interrupt, InterruptExt}; use crate::{interrupt, pac}; diff --git a/embassy-stm32/Cargo.toml b/embassy-stm32/Cargo.toml index c47ea0bc..7a8e5c59 100644 --- a/embassy-stm32/Cargo.toml +++ b/embassy-stm32/Cargo.toml @@ -31,7 +31,7 @@ flavors = [ ] [dependencies] -embassy-util = { version = "0.1.0", path = "../embassy-util" } +embassy-sync = { version = "0.1.0", path = "../embassy-sync" } embassy-executor = { version = "0.1.0", path = "../embassy-executor" } embassy-time = { version = "0.1.0", path = "../embassy-time", optional = true } embassy-cortex-m = { version = "0.1.0", path = "../embassy-cortex-m", features = ["prio-bits-4"]} @@ -72,7 +72,7 @@ quote = "1.0.15" stm32-metapac = { version = "0.1.0", path = "../stm32-metapac", default-features = false, features = ["metadata"]} [features] -defmt = ["dep:defmt", "bxcan/unstable-defmt", "embassy-util/defmt", "embassy-executor/defmt", "embassy-embedded-hal/defmt", "embedded-io?/defmt", "embassy-usb?/defmt"] +defmt = ["dep:defmt", "bxcan/unstable-defmt", "embassy-sync/defmt", "embassy-executor/defmt", "embassy-embedded-hal/defmt", "embedded-io?/defmt", "embassy-usb?/defmt"] sdmmc-rs = ["embedded-sdmmc"] net = ["embassy-net" ] memory-x = ["stm32-metapac/memory-x"] diff --git a/embassy-stm32/src/dcmi.rs b/embassy-stm32/src/dcmi.rs index bbb9a12c..fb9dc9d0 100644 --- a/embassy-stm32/src/dcmi.rs +++ b/embassy-stm32/src/dcmi.rs @@ -1,7 +1,7 @@ use core::task::Poll; use embassy_hal_common::{into_ref, PeripheralRef}; -use embassy_util::waitqueue::AtomicWaker; +use embassy_sync::waitqueue::AtomicWaker; use futures::future::poll_fn; use crate::gpio::sealed::AFType; diff --git a/embassy-stm32/src/dma/bdma.rs b/embassy-stm32/src/dma/bdma.rs index bd2cd5b5..674255dd 100644 --- a/embassy-stm32/src/dma/bdma.rs +++ b/embassy-stm32/src/dma/bdma.rs @@ -3,7 +3,7 @@ use core::sync::atomic::{fence, Ordering}; use core::task::Waker; -use embassy_util::waitqueue::AtomicWaker; +use embassy_sync::waitqueue::AtomicWaker; use super::{TransferOptions, Word, WordSize}; use crate::_generated::BDMA_CHANNEL_COUNT; diff --git a/embassy-stm32/src/dma/dma.rs b/embassy-stm32/src/dma/dma.rs index 0c66005c..a45b8780 100644 --- a/embassy-stm32/src/dma/dma.rs +++ b/embassy-stm32/src/dma/dma.rs @@ -1,7 +1,7 @@ use core::sync::atomic::{fence, Ordering}; use core::task::Waker; -use embassy_util::waitqueue::AtomicWaker; +use embassy_sync::waitqueue::AtomicWaker; use super::{Burst, FlowControl, Request, TransferOptions, Word, WordSize}; use crate::_generated::DMA_CHANNEL_COUNT; diff --git a/embassy-stm32/src/dma/gpdma.rs b/embassy-stm32/src/dma/gpdma.rs index 1aea6c65..bde8c3ef 100644 --- a/embassy-stm32/src/dma/gpdma.rs +++ b/embassy-stm32/src/dma/gpdma.rs @@ -1,7 +1,7 @@ use core::sync::atomic::{fence, Ordering}; use core::task::Waker; -use embassy_util::waitqueue::AtomicWaker; +use embassy_sync::waitqueue::AtomicWaker; use super::{Request, TransferOptions, Word, WordSize}; use crate::_generated::GPDMA_CHANNEL_COUNT; diff --git a/embassy-stm32/src/eth/v1/mod.rs b/embassy-stm32/src/eth/v1/mod.rs index 37593914..1ab0438a 100644 --- a/embassy-stm32/src/eth/v1/mod.rs +++ b/embassy-stm32/src/eth/v1/mod.rs @@ -7,7 +7,7 @@ use core::task::Waker; use embassy_cortex_m::peripheral::{PeripheralMutex, PeripheralState, StateStorage}; use embassy_hal_common::{into_ref, PeripheralRef}; use embassy_net::{Device, DeviceCapabilities, LinkState, PacketBuf, MTU}; -use embassy_util::waitqueue::AtomicWaker; +use embassy_sync::waitqueue::AtomicWaker; use crate::gpio::sealed::{AFType, Pin as __GpioPin}; use crate::gpio::{AnyPin, Speed}; diff --git a/embassy-stm32/src/eth/v2/mod.rs b/embassy-stm32/src/eth/v2/mod.rs index 1bc1fb72..d67c3c5e 100644 --- a/embassy-stm32/src/eth/v2/mod.rs +++ b/embassy-stm32/src/eth/v2/mod.rs @@ -5,7 +5,7 @@ use core::task::Waker; use embassy_cortex_m::peripheral::{PeripheralMutex, PeripheralState, StateStorage}; use embassy_hal_common::{into_ref, PeripheralRef}; use embassy_net::{Device, DeviceCapabilities, LinkState, PacketBuf, MTU}; -use embassy_util::waitqueue::AtomicWaker; +use embassy_sync::waitqueue::AtomicWaker; use crate::gpio::sealed::{AFType, Pin as _}; use crate::gpio::{AnyPin, Speed}; diff --git a/embassy-stm32/src/exti.rs b/embassy-stm32/src/exti.rs index ecb180bb..935149b1 100644 --- a/embassy-stm32/src/exti.rs +++ b/embassy-stm32/src/exti.rs @@ -4,7 +4,7 @@ use core::pin::Pin; use core::task::{Context, Poll}; use embassy_hal_common::impl_peripheral; -use embassy_util::waitqueue::AtomicWaker; +use embassy_sync::waitqueue::AtomicWaker; use crate::gpio::{AnyPin, Input, Pin as GpioPin}; use crate::pac::exti::regs::Lines; diff --git a/embassy-stm32/src/i2c/v2.rs b/embassy-stm32/src/i2c/v2.rs index b4303d3d..07a3105d 100644 --- a/embassy-stm32/src/i2c/v2.rs +++ b/embassy-stm32/src/i2c/v2.rs @@ -5,7 +5,7 @@ use atomic_polyfill::{AtomicUsize, Ordering}; use embassy_embedded_hal::SetConfig; use embassy_hal_common::drop::OnDrop; use embassy_hal_common::{into_ref, PeripheralRef}; -use embassy_util::waitqueue::AtomicWaker; +use embassy_sync::waitqueue::AtomicWaker; use futures::future::poll_fn; use crate::dma::NoDma; diff --git a/embassy-stm32/src/rng.rs b/embassy-stm32/src/rng.rs index 81e28f35..520f2ab9 100644 --- a/embassy-stm32/src/rng.rs +++ b/embassy-stm32/src/rng.rs @@ -3,7 +3,7 @@ use core::task::Poll; use embassy_hal_common::{into_ref, PeripheralRef}; -use embassy_util::waitqueue::AtomicWaker; +use embassy_sync::waitqueue::AtomicWaker; use futures::future::poll_fn; use rand_core::{CryptoRng, RngCore}; diff --git a/embassy-stm32/src/sdmmc/mod.rs b/embassy-stm32/src/sdmmc/mod.rs index 3ad31ec8..67758c49 100644 --- a/embassy-stm32/src/sdmmc/mod.rs +++ b/embassy-stm32/src/sdmmc/mod.rs @@ -5,7 +5,7 @@ use core::task::Poll; use embassy_hal_common::drop::OnDrop; use embassy_hal_common::{into_ref, PeripheralRef}; -use embassy_util::waitqueue::AtomicWaker; +use embassy_sync::waitqueue::AtomicWaker; use futures::future::poll_fn; use sdio_host::{BusWidth, CardCapacity, CardStatus, CurrentState, SDStatus, CID, CSD, OCR, SCR}; @@ -1514,8 +1514,8 @@ foreach_peripheral!( INNER } - fn state() -> &'static ::embassy_util::waitqueue::AtomicWaker { - static WAKER: ::embassy_util::waitqueue::AtomicWaker = ::embassy_util::waitqueue::AtomicWaker::new(); + fn state() -> &'static ::embassy_sync::waitqueue::AtomicWaker { + static WAKER: ::embassy_sync::waitqueue::AtomicWaker = ::embassy_sync::waitqueue::AtomicWaker::new(); &WAKER } } diff --git a/embassy-stm32/src/time_driver.rs b/embassy-stm32/src/time_driver.rs index 7f472316..6989a43d 100644 --- a/embassy-stm32/src/time_driver.rs +++ b/embassy-stm32/src/time_driver.rs @@ -4,10 +4,10 @@ use core::sync::atomic::{compiler_fence, Ordering}; use core::{mem, ptr}; use atomic_polyfill::{AtomicU32, AtomicU8}; +use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; +use embassy_sync::blocking_mutex::Mutex; use embassy_time::driver::{AlarmHandle, Driver}; use embassy_time::TICKS_PER_SECOND; -use embassy_util::blocking_mutex::raw::CriticalSectionRawMutex; -use embassy_util::blocking_mutex::Mutex; use stm32_metapac::timer::regs; use crate::interrupt::{CriticalSection, InterruptExt}; diff --git a/embassy-stm32/src/usart/buffered.rs b/embassy-stm32/src/usart/buffered.rs index ec2231e4..a7fa4389 100644 --- a/embassy-stm32/src/usart/buffered.rs +++ b/embassy-stm32/src/usart/buffered.rs @@ -4,7 +4,7 @@ use core::task::Poll; use atomic_polyfill::{compiler_fence, Ordering}; use embassy_cortex_m::peripheral::{PeripheralMutex, PeripheralState, StateStorage}; use embassy_hal_common::ring_buffer::RingBuffer; -use embassy_util::waitqueue::WakerRegistration; +use embassy_sync::waitqueue::WakerRegistration; use futures::future::poll_fn; use super::*; diff --git a/embassy-stm32/src/usb/usb.rs b/embassy-stm32/src/usb/usb.rs index 3861e42d..db965824 100644 --- a/embassy-stm32/src/usb/usb.rs +++ b/embassy-stm32/src/usb/usb.rs @@ -6,10 +6,10 @@ use core::task::Poll; use atomic_polyfill::{AtomicBool, AtomicU8}; use embassy_hal_common::into_ref; +use embassy_sync::waitqueue::AtomicWaker; use embassy_time::{block_for, Duration}; use embassy_usb::driver::{self, EndpointAllocError, EndpointError, Event, Unsupported}; use embassy_usb::types::{EndpointAddress, EndpointInfo, EndpointType, UsbDirection}; -use embassy_util::waitqueue::AtomicWaker; use futures::future::poll_fn; use futures::Future; use pac::common::{Reg, RW}; diff --git a/embassy-sync/Cargo.toml b/embassy-sync/Cargo.toml new file mode 100644 index 00000000..0d14bba5 --- /dev/null +++ b/embassy-sync/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "embassy-sync" +version = "0.1.0" +edition = "2021" + +[package.metadata.embassy_docs] +src_base = "https://github.com/embassy-rs/embassy/blob/embassy-sync-v$VERSION/embassy-sync/src/" +src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-sync/src/" +features = ["nightly"] +target = "thumbv7em-none-eabi" + +[features] +nightly = ["embedded-io/async"] + +[dependencies] +defmt = { version = "0.3", optional = true } +log = { version = "0.4.14", optional = true } + +futures-util = { version = "0.3.17", default-features = false } +atomic-polyfill = "1.0.1" +critical-section = "1.1" +heapless = "0.7.5" +cfg-if = "1.0.0" +embedded-io = "0.3.0" + +[dev-dependencies] +futures-executor = { version = "0.3.17", features = [ "thread-pool" ] } +futures-test = "0.3.17" +futures-timer = "3.0.2" +futures-util = { version = "0.3.17", features = [ "channel" ] } + +# Enable critical-section implementation for std, for tests +critical-section = { version = "1.1", features = ["std"] } +static_cell = "1.0" diff --git a/embassy-sync/build.rs b/embassy-sync/build.rs new file mode 100644 index 00000000..6fe82b44 --- /dev/null +++ b/embassy-sync/build.rs @@ -0,0 +1,29 @@ +use std::env; + +fn main() { + let target = env::var("TARGET").unwrap(); + + if target.starts_with("thumbv6m-") { + println!("cargo:rustc-cfg=cortex_m"); + println!("cargo:rustc-cfg=armv6m"); + } else if target.starts_with("thumbv7m-") { + println!("cargo:rustc-cfg=cortex_m"); + println!("cargo:rustc-cfg=armv7m"); + } else if target.starts_with("thumbv7em-") { + println!("cargo:rustc-cfg=cortex_m"); + println!("cargo:rustc-cfg=armv7m"); + println!("cargo:rustc-cfg=armv7em"); // (not currently used) + } else if target.starts_with("thumbv8m.base") { + println!("cargo:rustc-cfg=cortex_m"); + println!("cargo:rustc-cfg=armv8m"); + println!("cargo:rustc-cfg=armv8m_base"); + } else if target.starts_with("thumbv8m.main") { + println!("cargo:rustc-cfg=cortex_m"); + println!("cargo:rustc-cfg=armv8m"); + println!("cargo:rustc-cfg=armv8m_main"); + } + + if target.ends_with("-eabihf") { + println!("cargo:rustc-cfg=has_fpu"); + } +} diff --git a/embassy-sync/src/blocking_mutex/mod.rs b/embassy-sync/src/blocking_mutex/mod.rs new file mode 100644 index 00000000..8a4a4c64 --- /dev/null +++ b/embassy-sync/src/blocking_mutex/mod.rs @@ -0,0 +1,189 @@ +//! Blocking mutex. +//! +//! This module provides a blocking mutex that can be used to synchronize data. +pub mod raw; + +use core::cell::UnsafeCell; + +use self::raw::RawMutex; + +/// Blocking mutex (not async) +/// +/// Provides a blocking mutual exclusion primitive backed by an implementation of [`raw::RawMutex`]. +/// +/// Which implementation you select depends on the context in which you're using the mutex, and you can choose which kind +/// of interior mutability fits your use case. +/// +/// Use [`CriticalSectionMutex`] when data can be shared between threads and interrupts. +/// +/// Use [`NoopMutex`] when data is only shared between tasks running on the same executor. +/// +/// Use [`ThreadModeMutex`] when data is shared between tasks running on the same executor but you want a global singleton. +/// +/// In all cases, the blocking mutex is intended to be short lived and not held across await points. +/// Use the async [`Mutex`](crate::mutex::Mutex) if you need a lock that is held across await points. +pub struct Mutex { + // NOTE: `raw` must be FIRST, so when using ThreadModeMutex the "can't drop in non-thread-mode" gets + // to run BEFORE dropping `data`. + raw: R, + data: UnsafeCell, +} + +unsafe impl Send for Mutex {} +unsafe impl Sync for Mutex {} + +impl Mutex { + /// Creates a new mutex in an unlocked state ready for use. + #[inline] + pub const fn new(val: T) -> Mutex { + Mutex { + raw: R::INIT, + data: UnsafeCell::new(val), + } + } + + /// Creates a critical section and grants temporary access to the protected data. + pub fn lock(&self, f: impl FnOnce(&T) -> U) -> U { + self.raw.lock(|| { + let ptr = self.data.get() as *const T; + let inner = unsafe { &*ptr }; + f(inner) + }) + } +} + +impl Mutex { + /// Creates a new mutex based on a pre-existing raw mutex. + /// + /// This allows creating a mutex in a constant context on stable Rust. + #[inline] + pub const fn const_new(raw_mutex: R, val: T) -> Mutex { + Mutex { + raw: raw_mutex, + data: UnsafeCell::new(val), + } + } + + /// Consumes this mutex, returning the underlying data. + #[inline] + pub fn into_inner(self) -> T { + self.data.into_inner() + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the `Mutex` mutably, no actual locking needs to + /// take place---the mutable borrow statically guarantees no locks exist. + #[inline] + pub fn get_mut(&mut self) -> &mut T { + unsafe { &mut *self.data.get() } + } +} + +/// A mutex that allows borrowing data across executors and interrupts. +/// +/// # Safety +/// +/// This mutex is safe to share between different executors and interrupts. +pub type CriticalSectionMutex = Mutex; + +/// A mutex that allows borrowing data in the context of a single executor. +/// +/// # Safety +/// +/// **This Mutex is only safe within a single executor.** +pub type NoopMutex = Mutex; + +impl Mutex { + /// Borrows the data for the duration of the critical section + pub fn borrow<'cs>(&'cs self, _cs: critical_section::CriticalSection<'cs>) -> &'cs T { + let ptr = self.data.get() as *const T; + unsafe { &*ptr } + } +} + +impl Mutex { + /// Borrows the data + pub fn borrow(&self) -> &T { + let ptr = self.data.get() as *const T; + unsafe { &*ptr } + } +} + +// ThreadModeMutex does NOT use the generic mutex from above because it's special: +// it's Send+Sync even if T: !Send. There's no way to do that without specialization (I think?). +// +// There's still a ThreadModeRawMutex for use with the generic Mutex (handy with Channel, for example), +// but that will require T: Send even though it shouldn't be needed. + +#[cfg(any(cortex_m, feature = "std"))] +pub use thread_mode_mutex::*; +#[cfg(any(cortex_m, feature = "std"))] +mod thread_mode_mutex { + use super::*; + + /// A "mutex" that only allows borrowing from thread mode. + /// + /// # Safety + /// + /// **This Mutex is only safe on single-core systems.** + /// + /// On multi-core systems, a `ThreadModeMutex` **is not sufficient** to ensure exclusive access. + pub struct ThreadModeMutex { + inner: UnsafeCell, + } + + // NOTE: ThreadModeMutex only allows borrowing from one execution context ever: thread mode. + // Therefore it cannot be used to send non-sendable stuff between execution contexts, so it can + // be Send+Sync even if T is not Send (unlike CriticalSectionMutex) + unsafe impl Sync for ThreadModeMutex {} + unsafe impl Send for ThreadModeMutex {} + + impl ThreadModeMutex { + /// Creates a new mutex + pub const fn new(value: T) -> Self { + ThreadModeMutex { + inner: UnsafeCell::new(value), + } + } + } + + impl ThreadModeMutex { + /// Lock the `ThreadModeMutex`, granting access to the data. + /// + /// # Panics + /// + /// This will panic if not currently running in thread mode. + pub fn lock(&self, f: impl FnOnce(&T) -> R) -> R { + f(self.borrow()) + } + + /// Borrows the data + /// + /// # Panics + /// + /// This will panic if not currently running in thread mode. + pub fn borrow(&self) -> &T { + assert!( + raw::in_thread_mode(), + "ThreadModeMutex can only be borrowed from thread mode." + ); + unsafe { &*self.inner.get() } + } + } + + impl Drop for ThreadModeMutex { + fn drop(&mut self) { + // Only allow dropping from thread mode. Dropping calls drop on the inner `T`, so + // `drop` needs the same guarantees as `lock`. `ThreadModeMutex` is Send even if + // T isn't, so without this check a user could create a ThreadModeMutex in thread mode, + // send it to interrupt context and drop it there, which would "send" a T even if T is not Send. + assert!( + raw::in_thread_mode(), + "ThreadModeMutex can only be dropped from thread mode." + ); + + // Drop of the inner `T` happens after this. + } + } +} diff --git a/embassy-sync/src/blocking_mutex/raw.rs b/embassy-sync/src/blocking_mutex/raw.rs new file mode 100644 index 00000000..15796f1b --- /dev/null +++ b/embassy-sync/src/blocking_mutex/raw.rs @@ -0,0 +1,149 @@ +//! Mutex primitives. +//! +//! This module provides a trait for mutexes that can be used in different contexts. +use core::marker::PhantomData; + +/// Raw mutex trait. +/// +/// This mutex is "raw", which means it does not actually contain the protected data, it +/// just implements the mutex mechanism. For most uses you should use [`super::Mutex`] instead, +/// which is generic over a RawMutex and contains the protected data. +/// +/// Note that, unlike other mutexes, implementations only guarantee no +/// concurrent access from other threads: concurrent access from the current +/// thread is allwed. For example, it's possible to lock the same mutex multiple times reentrantly. +/// +/// Therefore, locking a `RawMutex` is only enough to guarantee safe shared (`&`) access +/// to the data, it is not enough to guarantee exclusive (`&mut`) access. +/// +/// # Safety +/// +/// RawMutex implementations must ensure that, while locked, no other thread can lock +/// the RawMutex concurrently. +/// +/// Unsafe code is allowed to rely on this fact, so incorrect implementations will cause undefined behavior. +pub unsafe trait RawMutex { + /// Create a new `RawMutex` instance. + /// + /// This is a const instead of a method to allow creating instances in const context. + const INIT: Self; + + /// Lock this `RawMutex`. + fn lock(&self, f: impl FnOnce() -> R) -> R; +} + +/// A mutex that allows borrowing data across executors and interrupts. +/// +/// # Safety +/// +/// This mutex is safe to share between different executors and interrupts. +pub struct CriticalSectionRawMutex { + _phantom: PhantomData<()>, +} +unsafe impl Send for CriticalSectionRawMutex {} +unsafe impl Sync for CriticalSectionRawMutex {} + +impl CriticalSectionRawMutex { + /// Create a new `CriticalSectionRawMutex`. + pub const fn new() -> Self { + Self { _phantom: PhantomData } + } +} + +unsafe impl RawMutex for CriticalSectionRawMutex { + const INIT: Self = Self::new(); + + fn lock(&self, f: impl FnOnce() -> R) -> R { + critical_section::with(|_| f()) + } +} + +// ================ + +/// A mutex that allows borrowing data in the context of a single executor. +/// +/// # Safety +/// +/// **This Mutex is only safe within a single executor.** +pub struct NoopRawMutex { + _phantom: PhantomData<*mut ()>, +} + +unsafe impl Send for NoopRawMutex {} + +impl NoopRawMutex { + /// Create a new `NoopRawMutex`. + pub const fn new() -> Self { + Self { _phantom: PhantomData } + } +} + +unsafe impl RawMutex for NoopRawMutex { + const INIT: Self = Self::new(); + fn lock(&self, f: impl FnOnce() -> R) -> R { + f() + } +} + +// ================ + +#[cfg(any(cortex_m, feature = "std"))] +mod thread_mode { + use super::*; + + /// A "mutex" that only allows borrowing from thread mode. + /// + /// # Safety + /// + /// **This Mutex is only safe on single-core systems.** + /// + /// On multi-core systems, a `ThreadModeRawMutex` **is not sufficient** to ensure exclusive access. + pub struct ThreadModeRawMutex { + _phantom: PhantomData<()>, + } + + unsafe impl Send for ThreadModeRawMutex {} + unsafe impl Sync for ThreadModeRawMutex {} + + impl ThreadModeRawMutex { + /// Create a new `ThreadModeRawMutex`. + pub const fn new() -> Self { + Self { _phantom: PhantomData } + } + } + + unsafe impl RawMutex for ThreadModeRawMutex { + const INIT: Self = Self::new(); + fn lock(&self, f: impl FnOnce() -> R) -> R { + assert!(in_thread_mode(), "ThreadModeMutex can only be locked from thread mode."); + + f() + } + } + + impl Drop for ThreadModeRawMutex { + fn drop(&mut self) { + // Only allow dropping from thread mode. Dropping calls drop on the inner `T`, so + // `drop` needs the same guarantees as `lock`. `ThreadModeMutex` is Send even if + // T isn't, so without this check a user could create a ThreadModeMutex in thread mode, + // send it to interrupt context and drop it there, which would "send" a T even if T is not Send. + assert!( + in_thread_mode(), + "ThreadModeMutex can only be dropped from thread mode." + ); + + // Drop of the inner `T` happens after this. + } + } + + pub(crate) fn in_thread_mode() -> bool { + #[cfg(feature = "std")] + return Some("main") == std::thread::current().name(); + + #[cfg(not(feature = "std"))] + // ICSR.VECTACTIVE == 0 + return unsafe { (0xE000ED04 as *const u32).read_volatile() } & 0x1FF == 0; + } +} +#[cfg(any(cortex_m, feature = "std"))] +pub use thread_mode::*; diff --git a/embassy-sync/src/channel/mod.rs b/embassy-sync/src/channel/mod.rs new file mode 100644 index 00000000..5df1f5c5 --- /dev/null +++ b/embassy-sync/src/channel/mod.rs @@ -0,0 +1,5 @@ +//! Async channels + +pub mod mpmc; +pub mod pubsub; +pub mod signal; diff --git a/embassy-sync/src/channel/mpmc.rs b/embassy-sync/src/channel/mpmc.rs new file mode 100644 index 00000000..7bebd341 --- /dev/null +++ b/embassy-sync/src/channel/mpmc.rs @@ -0,0 +1,596 @@ +//! A queue for sending values between asynchronous tasks. +//! +//! It can be used concurrently by multiple producers (senders) and multiple +//! consumers (receivers), i.e. it is an "MPMC channel". +//! +//! Receivers are competing for messages. So a message that is received by +//! one receiver is not received by any other. +//! +//! This queue takes a Mutex type so that various +//! targets can be attained. For example, a ThreadModeMutex can be used +//! for single-core Cortex-M targets where messages are only passed +//! between tasks running in thread mode. Similarly, a CriticalSectionMutex +//! can also be used for single-core targets where messages are to be +//! passed from exception mode e.g. out of an interrupt handler. +//! +//! This module provides a bounded channel that has a limit on the number of +//! messages that it can store, and if this limit is reached, trying to send +//! another message will result in an error being returned. +//! + +use core::cell::RefCell; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use heapless::Deque; + +use crate::blocking_mutex::raw::RawMutex; +use crate::blocking_mutex::Mutex; +use crate::waitqueue::WakerRegistration; + +/// Send-only access to a [`Channel`]. +#[derive(Copy)] +pub struct Sender<'ch, M, T, const N: usize> +where + M: RawMutex, +{ + channel: &'ch Channel, +} + +impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> +where + M: RawMutex, +{ + fn clone(&self) -> Self { + Sender { channel: self.channel } + } +} + +impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> +where + M: RawMutex, +{ + /// Sends a value. + /// + /// See [`Channel::send()`] + pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { + self.channel.send(message) + } + + /// Attempt to immediately send a message. + /// + /// See [`Channel::send()`] + pub fn try_send(&self, message: T) -> Result<(), TrySendError> { + self.channel.try_send(message) + } +} + +/// Send-only access to a [`Channel`] without knowing channel size. +#[derive(Copy)] +pub struct DynamicSender<'ch, T> { + channel: &'ch dyn DynamicChannel, +} + +impl<'ch, T> Clone for DynamicSender<'ch, T> { + fn clone(&self) -> Self { + DynamicSender { channel: self.channel } + } +} + +impl<'ch, M, T, const N: usize> From> for DynamicSender<'ch, T> +where + M: RawMutex, +{ + fn from(s: Sender<'ch, M, T, N>) -> Self { + Self { channel: s.channel } + } +} + +impl<'ch, T> DynamicSender<'ch, T> { + /// Sends a value. + /// + /// See [`Channel::send()`] + pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> { + DynamicSendFuture { + channel: self.channel, + message: Some(message), + } + } + + /// Attempt to immediately send a message. + /// + /// See [`Channel::send()`] + pub fn try_send(&self, message: T) -> Result<(), TrySendError> { + self.channel.try_send_with_context(message, None) + } +} + +/// Receive-only access to a [`Channel`]. +#[derive(Copy)] +pub struct Receiver<'ch, M, T, const N: usize> +where + M: RawMutex, +{ + channel: &'ch Channel, +} + +impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N> +where + M: RawMutex, +{ + fn clone(&self) -> Self { + Receiver { channel: self.channel } + } +} + +impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> +where + M: RawMutex, +{ + /// Receive the next value. + /// + /// See [`Channel::recv()`]. + pub fn recv(&self) -> RecvFuture<'_, M, T, N> { + self.channel.recv() + } + + /// Attempt to immediately receive the next value. + /// + /// See [`Channel::try_recv()`] + pub fn try_recv(&self) -> Result { + self.channel.try_recv() + } +} + +/// Receive-only access to a [`Channel`] without knowing channel size. +#[derive(Copy)] +pub struct DynamicReceiver<'ch, T> { + channel: &'ch dyn DynamicChannel, +} + +impl<'ch, T> Clone for DynamicReceiver<'ch, T> { + fn clone(&self) -> Self { + DynamicReceiver { channel: self.channel } + } +} + +impl<'ch, T> DynamicReceiver<'ch, T> { + /// Receive the next value. + /// + /// See [`Channel::recv()`]. + pub fn recv(&self) -> DynamicRecvFuture<'_, T> { + DynamicRecvFuture { channel: self.channel } + } + + /// Attempt to immediately receive the next value. + /// + /// See [`Channel::try_recv()`] + pub fn try_recv(&self) -> Result { + self.channel.try_recv_with_context(None) + } +} + +impl<'ch, M, T, const N: usize> From> for DynamicReceiver<'ch, T> +where + M: RawMutex, +{ + fn from(s: Receiver<'ch, M, T, N>) -> Self { + Self { channel: s.channel } + } +} + +/// Future returned by [`Channel::recv`] and [`Receiver::recv`]. +pub struct RecvFuture<'ch, M, T, const N: usize> +where + M: RawMutex, +{ + channel: &'ch Channel, +} + +impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> +where + M: RawMutex, +{ + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.channel.try_recv_with_context(Some(cx)) { + Ok(v) => Poll::Ready(v), + Err(TryRecvError::Empty) => Poll::Pending, + } + } +} + +/// Future returned by [`DynamicReceiver::recv`]. +pub struct DynamicRecvFuture<'ch, T> { + channel: &'ch dyn DynamicChannel, +} + +impl<'ch, T> Future for DynamicRecvFuture<'ch, T> { + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.channel.try_recv_with_context(Some(cx)) { + Ok(v) => Poll::Ready(v), + Err(TryRecvError::Empty) => Poll::Pending, + } + } +} + +/// Future returned by [`Channel::send`] and [`Sender::send`]. +pub struct SendFuture<'ch, M, T, const N: usize> +where + M: RawMutex, +{ + channel: &'ch Channel, + message: Option, +} + +impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> +where + M: RawMutex, +{ + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.message.take() { + Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { + Ok(..) => Poll::Ready(()), + Err(TrySendError::Full(m)) => { + self.message = Some(m); + Poll::Pending + } + }, + None => panic!("Message cannot be None"), + } + } +} + +impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} + +/// Future returned by [`DynamicSender::send`]. +pub struct DynamicSendFuture<'ch, T> { + channel: &'ch dyn DynamicChannel, + message: Option, +} + +impl<'ch, T> Future for DynamicSendFuture<'ch, T> { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.message.take() { + Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { + Ok(..) => Poll::Ready(()), + Err(TrySendError::Full(m)) => { + self.message = Some(m); + Poll::Pending + } + }, + None => panic!("Message cannot be None"), + } + } +} + +impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} + +trait DynamicChannel { + fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError>; + + fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result; +} + +/// Error returned by [`try_recv`](Channel::try_recv). +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum TryRecvError { + /// A message could not be received because the channel is empty. + Empty, +} + +/// Error returned by [`try_send`](Channel::try_send). +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum TrySendError { + /// The data could not be sent on the channel because the channel is + /// currently full and sending would require blocking. + Full(T), +} + +struct ChannelState { + queue: Deque, + receiver_waker: WakerRegistration, + senders_waker: WakerRegistration, +} + +impl ChannelState { + const fn new() -> Self { + ChannelState { + queue: Deque::new(), + receiver_waker: WakerRegistration::new(), + senders_waker: WakerRegistration::new(), + } + } + + fn try_recv(&mut self) -> Result { + self.try_recv_with_context(None) + } + + fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result { + if self.queue.is_full() { + self.senders_waker.wake(); + } + + if let Some(message) = self.queue.pop_front() { + Ok(message) + } else { + if let Some(cx) = cx { + self.receiver_waker.register(cx.waker()); + } + Err(TryRecvError::Empty) + } + } + + fn try_send(&mut self, message: T) -> Result<(), TrySendError> { + self.try_send_with_context(message, None) + } + + fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { + match self.queue.push_back(message) { + Ok(()) => { + self.receiver_waker.wake(); + Ok(()) + } + Err(message) => { + if let Some(cx) = cx { + self.senders_waker.register(cx.waker()); + } + Err(TrySendError::Full(message)) + } + } + } +} + +/// A bounded channel for communicating between asynchronous tasks +/// with backpressure. +/// +/// The channel will buffer up to the provided number of messages. Once the +/// buffer is full, attempts to `send` new messages will wait until a message is +/// received from the channel. +/// +/// All data sent will become available in the same order as it was sent. +pub struct Channel +where + M: RawMutex, +{ + inner: Mutex>>, +} + +impl Channel +where + M: RawMutex, +{ + /// Establish a new bounded channel. For example, to create one with a NoopMutex: + /// + /// ``` + /// use embassy_sync::channel::mpmc::Channel; + /// use embassy_sync::blocking_mutex::raw::NoopRawMutex; + /// + /// // Declare a bounded channel of 3 u32s. + /// let mut channel = Channel::::new(); + /// ``` + pub const fn new() -> Self { + Self { + inner: Mutex::new(RefCell::new(ChannelState::new())), + } + } + + fn lock(&self, f: impl FnOnce(&mut ChannelState) -> R) -> R { + self.inner.lock(|rc| f(&mut *rc.borrow_mut())) + } + + fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result { + self.lock(|c| c.try_recv_with_context(cx)) + } + + fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { + self.lock(|c| c.try_send_with_context(m, cx)) + } + + /// Get a sender for this channel. + pub fn sender(&self) -> Sender<'_, M, T, N> { + Sender { channel: self } + } + + /// Get a receiver for this channel. + pub fn receiver(&self) -> Receiver<'_, M, T, N> { + Receiver { channel: self } + } + + /// Send a value, waiting until there is capacity. + /// + /// Sending completes when the value has been pushed to the channel's queue. + /// This doesn't mean the value has been received yet. + pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> { + SendFuture { + channel: self, + message: Some(message), + } + } + + /// Attempt to immediately send a message. + /// + /// This method differs from [`send`](Channel::send) by returning immediately if the channel's + /// buffer is full, instead of waiting. + /// + /// # Errors + /// + /// If the channel capacity has been reached, i.e., the channel has `n` + /// buffered values where `n` is the argument passed to [`Channel`], then an + /// error is returned. + pub fn try_send(&self, message: T) -> Result<(), TrySendError> { + self.lock(|c| c.try_send(message)) + } + + /// Receive the next value. + /// + /// If there are no messages in the channel's buffer, this method will + /// wait until a message is sent. + pub fn recv(&self) -> RecvFuture<'_, M, T, N> { + RecvFuture { channel: self } + } + + /// Attempt to immediately receive a message. + /// + /// This method will either receive a message from the channel immediately or return an error + /// if the channel is empty. + pub fn try_recv(&self) -> Result { + self.lock(|c| c.try_recv()) + } +} + +/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the +/// tradeoff cost of dynamic dispatch. +impl DynamicChannel for Channel +where + M: RawMutex, +{ + fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { + Channel::try_send_with_context(self, m, cx) + } + + fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result { + Channel::try_recv_with_context(self, cx) + } +} + +#[cfg(test)] +mod tests { + use core::time::Duration; + + use futures_executor::ThreadPool; + use futures_timer::Delay; + use futures_util::task::SpawnExt; + use static_cell::StaticCell; + + use super::*; + use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; + + fn capacity(c: &ChannelState) -> usize { + c.queue.capacity() - c.queue.len() + } + + #[test] + fn sending_once() { + let mut c = ChannelState::::new(); + assert!(c.try_send(1).is_ok()); + assert_eq!(capacity(&c), 2); + } + + #[test] + fn sending_when_full() { + let mut c = ChannelState::::new(); + let _ = c.try_send(1); + let _ = c.try_send(1); + let _ = c.try_send(1); + match c.try_send(2) { + Err(TrySendError::Full(2)) => assert!(true), + _ => assert!(false), + } + assert_eq!(capacity(&c), 0); + } + + #[test] + fn receiving_once_with_one_send() { + let mut c = ChannelState::::new(); + assert!(c.try_send(1).is_ok()); + assert_eq!(c.try_recv().unwrap(), 1); + assert_eq!(capacity(&c), 3); + } + + #[test] + fn receiving_when_empty() { + let mut c = ChannelState::::new(); + match c.try_recv() { + Err(TryRecvError::Empty) => assert!(true), + _ => assert!(false), + } + assert_eq!(capacity(&c), 3); + } + + #[test] + fn simple_send_and_receive() { + let c = Channel::::new(); + assert!(c.try_send(1).is_ok()); + assert_eq!(c.try_recv().unwrap(), 1); + } + + #[test] + fn cloning() { + let c = Channel::::new(); + let r1 = c.receiver(); + let s1 = c.sender(); + + let _ = r1.clone(); + let _ = s1.clone(); + } + + #[test] + fn dynamic_dispatch() { + let c = Channel::::new(); + let s: DynamicSender<'_, u32> = c.sender().into(); + let r: DynamicReceiver<'_, u32> = c.receiver().into(); + + assert!(s.try_send(1).is_ok()); + assert_eq!(r.try_recv().unwrap(), 1); + } + + #[futures_test::test] + async fn receiver_receives_given_try_send_async() { + let executor = ThreadPool::new().unwrap(); + + static CHANNEL: StaticCell> = StaticCell::new(); + let c = &*CHANNEL.init(Channel::new()); + let c2 = c; + assert!(executor + .spawn(async move { + assert!(c2.try_send(1).is_ok()); + }) + .is_ok()); + assert_eq!(c.recv().await, 1); + } + + #[futures_test::test] + async fn sender_send_completes_if_capacity() { + let c = Channel::::new(); + c.send(1).await; + assert_eq!(c.recv().await, 1); + } + + #[futures_test::test] + async fn senders_sends_wait_until_capacity() { + let executor = ThreadPool::new().unwrap(); + + static CHANNEL: StaticCell> = StaticCell::new(); + let c = &*CHANNEL.init(Channel::new()); + assert!(c.try_send(1).is_ok()); + + let c2 = c; + let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await }); + let c2 = c; + let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await }); + // Wish I could think of a means of determining that the async send is waiting instead. + // However, I've used the debugger to observe that the send does indeed wait. + Delay::new(Duration::from_millis(500)).await; + assert_eq!(c.recv().await, 1); + assert!(executor + .spawn(async move { + loop { + c.recv().await; + } + }) + .is_ok()); + send_task_1.unwrap().await; + send_task_2.unwrap().await; + } +} diff --git a/embassy-sync/src/channel/pubsub/mod.rs b/embassy-sync/src/channel/pubsub/mod.rs new file mode 100644 index 00000000..f62b4d11 --- /dev/null +++ b/embassy-sync/src/channel/pubsub/mod.rs @@ -0,0 +1,542 @@ +//! Implementation of [PubSubChannel], a queue where published messages get received by all subscribers. + +#![deny(missing_docs)] + +use core::cell::RefCell; +use core::fmt::Debug; +use core::task::{Context, Poll, Waker}; + +use heapless::Deque; + +use self::publisher::{ImmediatePub, Pub}; +use self::subscriber::Sub; +use crate::blocking_mutex::raw::RawMutex; +use crate::blocking_mutex::Mutex; +use crate::waitqueue::MultiWakerRegistration; + +pub mod publisher; +pub mod subscriber; + +pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher}; +pub use subscriber::{DynSubscriber, Subscriber}; + +/// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers +/// +/// Any published message can be read by all subscribers. +/// A publisher can choose how it sends its message. +/// +/// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue. +/// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message +/// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive +/// an error to indicate that it has lagged. +/// +/// ## Example +/// +/// ``` +/// # use embassy_sync::blocking_mutex::raw::NoopRawMutex; +/// # use embassy_sync::channel::pubsub::WaitResult; +/// # use embassy_sync::channel::pubsub::PubSubChannel; +/// # use futures_executor::block_on; +/// # let test = async { +/// // Create the channel. This can be static as well +/// let channel = PubSubChannel::::new(); +/// +/// // This is a generic subscriber with a direct reference to the channel +/// let mut sub0 = channel.subscriber().unwrap(); +/// // This is a dynamic subscriber with a dynamic (trait object) reference to the channel +/// let mut sub1 = channel.dyn_subscriber().unwrap(); +/// +/// let pub0 = channel.publisher().unwrap(); +/// +/// // Publish a message, but wait if the queue is full +/// pub0.publish(42).await; +/// +/// // Publish a message, but if the queue is full, just kick out the oldest message. +/// // This may cause some subscribers to miss a message +/// pub0.publish_immediate(43); +/// +/// // Wait for a new message. If the subscriber missed a message, the WaitResult will be a Lag result +/// assert_eq!(sub0.next_message().await, WaitResult::Message(42)); +/// assert_eq!(sub1.next_message().await, WaitResult::Message(42)); +/// +/// // Wait again, but this time ignore any Lag results +/// assert_eq!(sub0.next_message_pure().await, 43); +/// assert_eq!(sub1.next_message_pure().await, 43); +/// +/// // There's also a polling interface +/// assert_eq!(sub0.try_next_message(), None); +/// assert_eq!(sub1.try_next_message(), None); +/// # }; +/// # +/// # block_on(test); +/// ``` +/// +pub struct PubSubChannel { + inner: Mutex>>, +} + +impl + PubSubChannel +{ + /// Create a new channel + pub const fn new() -> Self { + Self { + inner: Mutex::const_new(M::INIT, RefCell::new(PubSubState::new())), + } + } + + /// Create a new subscriber. It will only receive messages that are published after its creation. + /// + /// If there are no subscriber slots left, an error will be returned. + pub fn subscriber(&self) -> Result, Error> { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + + if s.subscriber_count >= SUBS { + Err(Error::MaximumSubscribersReached) + } else { + s.subscriber_count += 1; + Ok(Subscriber(Sub::new(s.next_message_id, self))) + } + }) + } + + /// Create a new subscriber. It will only receive messages that are published after its creation. + /// + /// If there are no subscriber slots left, an error will be returned. + pub fn dyn_subscriber(&self) -> Result, Error> { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + + if s.subscriber_count >= SUBS { + Err(Error::MaximumSubscribersReached) + } else { + s.subscriber_count += 1; + Ok(DynSubscriber(Sub::new(s.next_message_id, self))) + } + }) + } + + /// Create a new publisher + /// + /// If there are no publisher slots left, an error will be returned. + pub fn publisher(&self) -> Result, Error> { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + + if s.publisher_count >= PUBS { + Err(Error::MaximumPublishersReached) + } else { + s.publisher_count += 1; + Ok(Publisher(Pub::new(self))) + } + }) + } + + /// Create a new publisher + /// + /// If there are no publisher slots left, an error will be returned. + pub fn dyn_publisher(&self) -> Result, Error> { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + + if s.publisher_count >= PUBS { + Err(Error::MaximumPublishersReached) + } else { + s.publisher_count += 1; + Ok(DynPublisher(Pub::new(self))) + } + }) + } + + /// Create a new publisher that can only send immediate messages. + /// This kind of publisher does not take up a publisher slot. + pub fn immediate_publisher(&self) -> ImmediatePublisher { + ImmediatePublisher(ImmediatePub::new(self)) + } + + /// Create a new publisher that can only send immediate messages. + /// This kind of publisher does not take up a publisher slot. + pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher { + DynImmediatePublisher(ImmediatePub::new(self)) + } +} + +impl PubSubBehavior + for PubSubChannel +{ + fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll> { + self.inner.lock(|s| { + let mut s = s.borrow_mut(); + + // Check if we can read a message + match s.get_message(*next_message_id) { + // Yes, so we are done polling + Some(WaitResult::Message(message)) => { + *next_message_id += 1; + Poll::Ready(WaitResult::Message(message)) + } + // No, so we need to reregister our waker and sleep again + None => { + if let Some(cx) = cx { + s.register_subscriber_waker(cx.waker()); + } + Poll::Pending + } + // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged + Some(WaitResult::Lagged(amount)) => { + *next_message_id += amount; + Poll::Ready(WaitResult::Lagged(amount)) + } + } + }) + } + + fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { + self.inner.lock(|s| { + let mut s = s.borrow_mut(); + // Try to publish the message + match s.try_publish(message) { + // We did it, we are ready + Ok(()) => Ok(()), + // The queue is full, so we need to reregister our waker and go to sleep + Err(message) => { + if let Some(cx) = cx { + s.register_publisher_waker(cx.waker()); + } + Err(message) + } + } + }) + } + + fn publish_immediate(&self, message: T) { + self.inner.lock(|s| { + let mut s = s.borrow_mut(); + s.publish_immediate(message) + }) + } + + fn unregister_subscriber(&self, subscriber_next_message_id: u64) { + self.inner.lock(|s| { + let mut s = s.borrow_mut(); + s.unregister_subscriber(subscriber_next_message_id) + }) + } + + fn unregister_publisher(&self) { + self.inner.lock(|s| { + let mut s = s.borrow_mut(); + s.unregister_publisher() + }) + } +} + +/// Internal state for the PubSub channel +struct PubSubState { + /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it + queue: Deque<(T, usize), CAP>, + /// Every message has an id. + /// Don't worry, we won't run out. + /// If a million messages were published every second, then the ID's would run out in about 584942 years. + next_message_id: u64, + /// Collection of wakers for Subscribers that are waiting. + subscriber_wakers: MultiWakerRegistration, + /// Collection of wakers for Publishers that are waiting. + publisher_wakers: MultiWakerRegistration, + /// The amount of subscribers that are active + subscriber_count: usize, + /// The amount of publishers that are active + publisher_count: usize, +} + +impl PubSubState { + /// Create a new internal channel state + const fn new() -> Self { + Self { + queue: Deque::new(), + next_message_id: 0, + subscriber_wakers: MultiWakerRegistration::new(), + publisher_wakers: MultiWakerRegistration::new(), + subscriber_count: 0, + publisher_count: 0, + } + } + + fn try_publish(&mut self, message: T) -> Result<(), T> { + if self.subscriber_count == 0 { + // We don't need to publish anything because there is no one to receive it + return Ok(()); + } + + if self.queue.is_full() { + return Err(message); + } + // We just did a check for this + self.queue.push_back((message, self.subscriber_count)).ok().unwrap(); + + self.next_message_id += 1; + + // Wake all of the subscribers + self.subscriber_wakers.wake(); + + Ok(()) + } + + fn publish_immediate(&mut self, message: T) { + // Make space in the queue if required + if self.queue.is_full() { + self.queue.pop_front(); + } + + // This will succeed because we made sure there is space + self.try_publish(message).ok().unwrap(); + } + + fn get_message(&mut self, message_id: u64) -> Option> { + let start_id = self.next_message_id - self.queue.len() as u64; + + if message_id < start_id { + return Some(WaitResult::Lagged(start_id - message_id)); + } + + let current_message_index = (message_id - start_id) as usize; + + if current_message_index >= self.queue.len() { + return None; + } + + // We've checked that the index is valid + let queue_item = self.queue.iter_mut().nth(current_message_index).unwrap(); + + // We're reading this item, so decrement the counter + queue_item.1 -= 1; + let message = queue_item.0.clone(); + + if current_message_index == 0 && queue_item.1 == 0 { + self.queue.pop_front(); + self.publisher_wakers.wake(); + } + + Some(WaitResult::Message(message)) + } + + fn register_subscriber_waker(&mut self, waker: &Waker) { + match self.subscriber_wakers.register(waker) { + Ok(()) => {} + Err(_) => { + // All waker slots were full. This can only happen when there was a subscriber that now has dropped. + // We need to throw it away. It's a bit inefficient, but we can wake everything. + // Any future that is still active will simply reregister. + // This won't happen a lot, so it's ok. + self.subscriber_wakers.wake(); + self.subscriber_wakers.register(waker).unwrap(); + } + } + } + + fn register_publisher_waker(&mut self, waker: &Waker) { + match self.publisher_wakers.register(waker) { + Ok(()) => {} + Err(_) => { + // All waker slots were full. This can only happen when there was a publisher that now has dropped. + // We need to throw it away. It's a bit inefficient, but we can wake everything. + // Any future that is still active will simply reregister. + // This won't happen a lot, so it's ok. + self.publisher_wakers.wake(); + self.publisher_wakers.register(waker).unwrap(); + } + } + } + + fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) { + self.subscriber_count -= 1; + + // All messages that haven't been read yet by this subscriber must have their counter decremented + let start_id = self.next_message_id - self.queue.len() as u64; + if subscriber_next_message_id >= start_id { + let current_message_index = (subscriber_next_message_id - start_id) as usize; + self.queue + .iter_mut() + .skip(current_message_index) + .for_each(|(_, counter)| *counter -= 1); + } + } + + fn unregister_publisher(&mut self) { + self.publisher_count -= 1; + } +} + +/// Error type for the [PubSubChannel] +#[derive(Debug, PartialEq, Eq, Clone)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum Error { + /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or + /// the capacity of the channels must be increased. + MaximumSubscribersReached, + /// All publisher slots are used. To add another publisher, first another publisher must be dropped or + /// the capacity of the channels must be increased. + MaximumPublishersReached, +} + +/// 'Middle level' behaviour of the pubsub channel. +/// This trait is used so that Sub and Pub can be generic over the channel. +pub trait PubSubBehavior { + /// Try to get a message from the queue with the given message id. + /// + /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. + fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll>; + + /// Try to publish a message to the queue. + /// + /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. + fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; + + /// Publish a message immediately + fn publish_immediate(&self, message: T); + + /// Let the channel know that a subscriber has dropped + fn unregister_subscriber(&self, subscriber_next_message_id: u64); + + /// Let the channel know that a publisher has dropped + fn unregister_publisher(&self); +} + +/// The result of the subscriber wait procedure +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum WaitResult { + /// The subscriber did not receive all messages and lagged by the given amount of messages. + /// (This is the amount of messages that were missed) + Lagged(u64), + /// A message was received + Message(T), +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::blocking_mutex::raw::NoopRawMutex; + + #[futures_test::test] + async fn dyn_pub_sub_works() { + let channel = PubSubChannel::::new(); + + let mut sub0 = channel.dyn_subscriber().unwrap(); + let mut sub1 = channel.dyn_subscriber().unwrap(); + let pub0 = channel.dyn_publisher().unwrap(); + + pub0.publish(42).await; + + assert_eq!(sub0.next_message().await, WaitResult::Message(42)); + assert_eq!(sub1.next_message().await, WaitResult::Message(42)); + + assert_eq!(sub0.try_next_message(), None); + assert_eq!(sub1.try_next_message(), None); + } + + #[futures_test::test] + async fn all_subscribers_receive() { + let channel = PubSubChannel::::new(); + + let mut sub0 = channel.subscriber().unwrap(); + let mut sub1 = channel.subscriber().unwrap(); + let pub0 = channel.publisher().unwrap(); + + pub0.publish(42).await; + + assert_eq!(sub0.next_message().await, WaitResult::Message(42)); + assert_eq!(sub1.next_message().await, WaitResult::Message(42)); + + assert_eq!(sub0.try_next_message(), None); + assert_eq!(sub1.try_next_message(), None); + } + + #[futures_test::test] + async fn lag_when_queue_full_on_immediate_publish() { + let channel = PubSubChannel::::new(); + + let mut sub0 = channel.subscriber().unwrap(); + let pub0 = channel.publisher().unwrap(); + + pub0.publish_immediate(42); + pub0.publish_immediate(43); + pub0.publish_immediate(44); + pub0.publish_immediate(45); + pub0.publish_immediate(46); + pub0.publish_immediate(47); + + assert_eq!(sub0.try_next_message(), Some(WaitResult::Lagged(2))); + assert_eq!(sub0.next_message().await, WaitResult::Message(44)); + assert_eq!(sub0.next_message().await, WaitResult::Message(45)); + assert_eq!(sub0.next_message().await, WaitResult::Message(46)); + assert_eq!(sub0.next_message().await, WaitResult::Message(47)); + assert_eq!(sub0.try_next_message(), None); + } + + #[test] + fn limited_subs_and_pubs() { + let channel = PubSubChannel::::new(); + + let sub0 = channel.subscriber(); + let sub1 = channel.subscriber(); + let sub2 = channel.subscriber(); + let sub3 = channel.subscriber(); + let sub4 = channel.subscriber(); + + assert!(sub0.is_ok()); + assert!(sub1.is_ok()); + assert!(sub2.is_ok()); + assert!(sub3.is_ok()); + assert_eq!(sub4.err().unwrap(), Error::MaximumSubscribersReached); + + drop(sub0); + + let sub5 = channel.subscriber(); + assert!(sub5.is_ok()); + + // publishers + + let pub0 = channel.publisher(); + let pub1 = channel.publisher(); + let pub2 = channel.publisher(); + let pub3 = channel.publisher(); + let pub4 = channel.publisher(); + + assert!(pub0.is_ok()); + assert!(pub1.is_ok()); + assert!(pub2.is_ok()); + assert!(pub3.is_ok()); + assert_eq!(pub4.err().unwrap(), Error::MaximumPublishersReached); + + drop(pub0); + + let pub5 = channel.publisher(); + assert!(pub5.is_ok()); + } + + #[test] + fn publisher_wait_on_full_queue() { + let channel = PubSubChannel::::new(); + + let pub0 = channel.publisher().unwrap(); + + // There are no subscribers, so the queue will never be full + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + + let sub0 = channel.subscriber().unwrap(); + + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Err(0)); + + drop(sub0); + } +} diff --git a/embassy-sync/src/channel/pubsub/publisher.rs b/embassy-sync/src/channel/pubsub/publisher.rs new file mode 100644 index 00000000..705797f6 --- /dev/null +++ b/embassy-sync/src/channel/pubsub/publisher.rs @@ -0,0 +1,182 @@ +//! Implementation of anything directly publisher related + +use core::future::Future; +use core::marker::PhantomData; +use core::ops::{Deref, DerefMut}; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use super::{PubSubBehavior, PubSubChannel}; +use crate::blocking_mutex::raw::RawMutex; + +/// A publisher to a channel +pub struct Pub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { + /// The channel we are a publisher for + channel: &'a PSB, + _phantom: PhantomData, +} + +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Pub<'a, PSB, T> { + pub(super) fn new(channel: &'a PSB) -> Self { + Self { + channel, + _phantom: Default::default(), + } + } + + /// Publish a message right now even when the queue is full. + /// This may cause a subscriber to miss an older message. + pub fn publish_immediate(&self, message: T) { + self.channel.publish_immediate(message) + } + + /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message + pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> { + PublisherWaitFuture { + message: Some(message), + publisher: self, + } + } + + /// Publish a message if there is space in the message queue + pub fn try_publish(&self, message: T) -> Result<(), T> { + self.channel.publish_with_context(message, None) + } +} + +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { + fn drop(&mut self) { + self.channel.unregister_publisher() + } +} + +/// A publisher that holds a dynamic reference to the channel +pub struct DynPublisher<'a, T: Clone>(pub(super) Pub<'a, dyn PubSubBehavior + 'a, T>); + +impl<'a, T: Clone> Deref for DynPublisher<'a, T> { + type Target = Pub<'a, dyn PubSubBehavior + 'a, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// A publisher that holds a generic reference to the channel +pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( + pub(super) Pub<'a, PubSubChannel, T>, +); + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref + for Publisher<'a, M, T, CAP, SUBS, PUBS> +{ + type Target = Pub<'a, PubSubChannel, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut + for Publisher<'a, M, T, CAP, SUBS, PUBS> +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. +/// (So an infinite amount is possible) +pub struct ImmediatePub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { + /// The channel we are a publisher for + channel: &'a PSB, + _phantom: PhantomData, +} + +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { + pub(super) fn new(channel: &'a PSB) -> Self { + Self { + channel, + _phantom: Default::default(), + } + } + /// Publish the message right now even when the queue is full. + /// This may cause a subscriber to miss an older message. + pub fn publish_immediate(&self, message: T) { + self.channel.publish_immediate(message) + } + + /// Publish a message if there is space in the message queue + pub fn try_publish(&self, message: T) -> Result<(), T> { + self.channel.publish_with_context(message, None) + } +} + +/// An immediate publisher that holds a dynamic reference to the channel +pub struct DynImmediatePublisher<'a, T: Clone>(pub(super) ImmediatePub<'a, dyn PubSubBehavior + 'a, T>); + +impl<'a, T: Clone> Deref for DynImmediatePublisher<'a, T> { + type Target = ImmediatePub<'a, dyn PubSubBehavior + 'a, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// An immediate publisher that holds a generic reference to the channel +pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( + pub(super) ImmediatePub<'a, PubSubChannel, T>, +); + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref + for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> +{ + type Target = ImmediatePub<'a, PubSubChannel, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut + for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// Future for the publisher wait action +pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { + /// The message we need to publish + message: Option, + publisher: &'s Pub<'a, PSB, T>, +} + +impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Future for PublisherWaitFuture<'s, 'a, PSB, T> { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let message = self.message.take().unwrap(); + match self.publisher.channel.publish_with_context(message, Some(cx)) { + Ok(()) => Poll::Ready(()), + Err(message) => { + self.message = Some(message); + Poll::Pending + } + } + } +} + +impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, PSB, T> {} diff --git a/embassy-sync/src/channel/pubsub/subscriber.rs b/embassy-sync/src/channel/pubsub/subscriber.rs new file mode 100644 index 00000000..b9a2cbe1 --- /dev/null +++ b/embassy-sync/src/channel/pubsub/subscriber.rs @@ -0,0 +1,152 @@ +//! Implementation of anything directly subscriber related + +use core::future::Future; +use core::marker::PhantomData; +use core::ops::{Deref, DerefMut}; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use super::{PubSubBehavior, PubSubChannel, WaitResult}; +use crate::blocking_mutex::raw::RawMutex; + +/// A subscriber to a channel +pub struct Sub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { + /// The message id of the next message we are yet to receive + next_message_id: u64, + /// The channel we are a subscriber to + channel: &'a PSB, + _phantom: PhantomData, +} + +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Sub<'a, PSB, T> { + pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self { + Self { + next_message_id, + channel, + _phantom: Default::default(), + } + } + + /// Wait for a published message + pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> { + SubscriberWaitFuture { subscriber: self } + } + + /// Wait for a published message (ignoring lag results) + pub async fn next_message_pure(&mut self) -> T { + loop { + match self.next_message().await { + WaitResult::Lagged(_) => continue, + WaitResult::Message(message) => break message, + } + } + } + + /// Try to see if there's a published message we haven't received yet. + /// + /// This function does not peek. The message is received if there is one. + pub fn try_next_message(&mut self) -> Option> { + match self.channel.get_message_with_context(&mut self.next_message_id, None) { + Poll::Ready(result) => Some(result), + Poll::Pending => None, + } + } + + /// Try to see if there's a published message we haven't received yet (ignoring lag results). + /// + /// This function does not peek. The message is received if there is one. + pub fn try_next_message_pure(&mut self) -> Option { + loop { + match self.try_next_message() { + Some(WaitResult::Lagged(_)) => continue, + Some(WaitResult::Message(message)) => break Some(message), + None => break None, + } + } + } +} + +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { + fn drop(&mut self) { + self.channel.unregister_subscriber(self.next_message_id) + } +} + +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {} + +/// Warning: The stream implementation ignores lag results and returns all messages. +/// This might miss some messages without you knowing it. +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self + .channel + .get_message_with_context(&mut self.next_message_id, Some(cx)) + { + Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)), + Poll::Ready(WaitResult::Lagged(_)) => { + cx.waker().wake_by_ref(); + Poll::Pending + } + Poll::Pending => Poll::Pending, + } + } +} + +/// A subscriber that holds a dynamic reference to the channel +pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior + 'a, T>); + +impl<'a, T: Clone> Deref for DynSubscriber<'a, T> { + type Target = Sub<'a, dyn PubSubBehavior + 'a, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// A subscriber that holds a generic reference to the channel +pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( + pub(super) Sub<'a, PubSubChannel, T>, +); + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref + for Subscriber<'a, M, T, CAP, SUBS, PUBS> +{ + type Target = Sub<'a, PubSubChannel, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut + for Subscriber<'a, M, T, CAP, SUBS, PUBS> +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// Future for the subscriber wait action +pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { + subscriber: &'s mut Sub<'a, PSB, T>, +} + +impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> { + type Output = WaitResult; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.subscriber + .channel + .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx)) + } +} + +impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {} diff --git a/embassy-sync/src/channel/signal.rs b/embassy-sync/src/channel/signal.rs new file mode 100644 index 00000000..9279266c --- /dev/null +++ b/embassy-sync/src/channel/signal.rs @@ -0,0 +1,100 @@ +//! A synchronization primitive for passing the latest value to a task. +use core::cell::UnsafeCell; +use core::future::Future; +use core::mem; +use core::task::{Context, Poll, Waker}; + +/// Single-slot signaling primitive. +/// +/// This is similar to a [`Channel`](crate::channel::mpmc::Channel) with a buffer size of 1, except +/// "sending" to it (calling [`Signal::signal`]) when full will overwrite the previous value instead +/// of waiting for the receiver to pop the previous value. +/// +/// It is useful for sending data between tasks when the receiver only cares about +/// the latest data, and therefore it's fine to "lose" messages. This is often the case for "state" +/// updates. +/// +/// For more advanced use cases, you might want to use [`Channel`](crate::channel::mpmc::Channel) instead. +/// +/// Signals are generally declared as `static`s and then borrowed as required. +/// +/// ``` +/// use embassy_sync::channel::signal::Signal; +/// +/// enum SomeCommand { +/// On, +/// Off, +/// } +/// +/// static SOME_SIGNAL: Signal = Signal::new(); +/// ``` +pub struct Signal { + state: UnsafeCell>, +} + +enum State { + None, + Waiting(Waker), + Signaled(T), +} + +unsafe impl Send for Signal {} +unsafe impl Sync for Signal {} + +impl Signal { + /// Create a new `Signal`. + pub const fn new() -> Self { + Self { + state: UnsafeCell::new(State::None), + } + } +} + +impl Signal { + /// Mark this Signal as signaled. + pub fn signal(&self, val: T) { + critical_section::with(|_| unsafe { + let state = &mut *self.state.get(); + if let State::Waiting(waker) = mem::replace(state, State::Signaled(val)) { + waker.wake(); + } + }) + } + + /// Remove the queued value in this `Signal`, if any. + pub fn reset(&self) { + critical_section::with(|_| unsafe { + let state = &mut *self.state.get(); + *state = State::None + }) + } + + /// Manually poll the Signal future. + pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll { + critical_section::with(|_| unsafe { + let state = &mut *self.state.get(); + match state { + State::None => { + *state = State::Waiting(cx.waker().clone()); + Poll::Pending + } + State::Waiting(w) if w.will_wake(cx.waker()) => Poll::Pending, + State::Waiting(_) => panic!("waker overflow"), + State::Signaled(_) => match mem::replace(state, State::None) { + State::Signaled(res) => Poll::Ready(res), + _ => unreachable!(), + }, + } + }) + } + + /// Future that completes when this Signal has been signaled. + pub fn wait(&self) -> impl Future + '_ { + futures_util::future::poll_fn(move |cx| self.poll_wait(cx)) + } + + /// non-blocking method to check whether this signal has been signaled. + pub fn signaled(&self) -> bool { + critical_section::with(|_| matches!(unsafe { &*self.state.get() }, State::Signaled(_))) + } +} diff --git a/embassy-sync/src/fmt.rs b/embassy-sync/src/fmt.rs new file mode 100644 index 00000000..f8bb0a03 --- /dev/null +++ b/embassy-sync/src/fmt.rs @@ -0,0 +1,228 @@ +#![macro_use] +#![allow(unused_macros)] + +#[cfg(all(feature = "defmt", feature = "log"))] +compile_error!("You may not enable both `defmt` and `log` features."); + +macro_rules! assert { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::assert!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::assert!($($x)*); + } + }; +} + +macro_rules! assert_eq { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::assert_eq!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::assert_eq!($($x)*); + } + }; +} + +macro_rules! assert_ne { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::assert_ne!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::assert_ne!($($x)*); + } + }; +} + +macro_rules! debug_assert { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::debug_assert!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::debug_assert!($($x)*); + } + }; +} + +macro_rules! debug_assert_eq { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::debug_assert_eq!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::debug_assert_eq!($($x)*); + } + }; +} + +macro_rules! debug_assert_ne { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::debug_assert_ne!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::debug_assert_ne!($($x)*); + } + }; +} + +macro_rules! todo { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::todo!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::todo!($($x)*); + } + }; +} + +macro_rules! unreachable { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::unreachable!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::unreachable!($($x)*); + } + }; +} + +macro_rules! panic { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::panic!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::panic!($($x)*); + } + }; +} + +macro_rules! trace { + ($s:literal $(, $x:expr)* $(,)?) => { + { + #[cfg(feature = "log")] + ::log::trace!($s $(, $x)*); + #[cfg(feature = "defmt")] + ::defmt::trace!($s $(, $x)*); + #[cfg(not(any(feature = "log", feature="defmt")))] + let _ = ($( & $x ),*); + } + }; +} + +macro_rules! debug { + ($s:literal $(, $x:expr)* $(,)?) => { + { + #[cfg(feature = "log")] + ::log::debug!($s $(, $x)*); + #[cfg(feature = "defmt")] + ::defmt::debug!($s $(, $x)*); + #[cfg(not(any(feature = "log", feature="defmt")))] + let _ = ($( & $x ),*); + } + }; +} + +macro_rules! info { + ($s:literal $(, $x:expr)* $(,)?) => { + { + #[cfg(feature = "log")] + ::log::info!($s $(, $x)*); + #[cfg(feature = "defmt")] + ::defmt::info!($s $(, $x)*); + #[cfg(not(any(feature = "log", feature="defmt")))] + let _ = ($( & $x ),*); + } + }; +} + +macro_rules! warn { + ($s:literal $(, $x:expr)* $(,)?) => { + { + #[cfg(feature = "log")] + ::log::warn!($s $(, $x)*); + #[cfg(feature = "defmt")] + ::defmt::warn!($s $(, $x)*); + #[cfg(not(any(feature = "log", feature="defmt")))] + let _ = ($( & $x ),*); + } + }; +} + +macro_rules! error { + ($s:literal $(, $x:expr)* $(,)?) => { + { + #[cfg(feature = "log")] + ::log::error!($s $(, $x)*); + #[cfg(feature = "defmt")] + ::defmt::error!($s $(, $x)*); + #[cfg(not(any(feature = "log", feature="defmt")))] + let _ = ($( & $x ),*); + } + }; +} + +#[cfg(feature = "defmt")] +macro_rules! unwrap { + ($($x:tt)*) => { + ::defmt::unwrap!($($x)*) + }; +} + +#[cfg(not(feature = "defmt"))] +macro_rules! unwrap { + ($arg:expr) => { + match $crate::fmt::Try::into_result($arg) { + ::core::result::Result::Ok(t) => t, + ::core::result::Result::Err(e) => { + ::core::panic!("unwrap of `{}` failed: {:?}", ::core::stringify!($arg), e); + } + } + }; + ($arg:expr, $($msg:expr),+ $(,)? ) => { + match $crate::fmt::Try::into_result($arg) { + ::core::result::Result::Ok(t) => t, + ::core::result::Result::Err(e) => { + ::core::panic!("unwrap of `{}` failed: {}: {:?}", ::core::stringify!($arg), ::core::format_args!($($msg,)*), e); + } + } + } +} + +#[cfg(feature = "defmt-timestamp-uptime")] +defmt::timestamp! {"{=u64:us}", crate::time::Instant::now().as_micros() } + +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub struct NoneError; + +pub trait Try { + type Ok; + type Error; + fn into_result(self) -> Result; +} + +impl Try for Option { + type Ok = T; + type Error = NoneError; + + #[inline] + fn into_result(self) -> Result { + self.ok_or(NoneError) + } +} + +impl Try for Result { + type Ok = T; + type Error = E; + + #[inline] + fn into_result(self) -> Self { + self + } +} diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs new file mode 100644 index 00000000..7d881590 --- /dev/null +++ b/embassy-sync/src/lib.rs @@ -0,0 +1,17 @@ +#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] +#![cfg_attr(feature = "nightly", feature(generic_associated_types, type_alias_impl_trait))] +#![allow(clippy::new_without_default)] +#![doc = include_str!("../../README.md")] +#![warn(missing_docs)] + +// This mod MUST go first, so that the others see its macros. +pub(crate) mod fmt; + +// internal use +mod ring_buffer; + +pub mod blocking_mutex; +pub mod channel; +pub mod mutex; +pub mod pipe; +pub mod waitqueue; diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs new file mode 100644 index 00000000..75a6e8dd --- /dev/null +++ b/embassy-sync/src/mutex.rs @@ -0,0 +1,167 @@ +//! Async mutex. +//! +//! This module provides a mutex that can be used to synchronize data between asynchronous tasks. +use core::cell::{RefCell, UnsafeCell}; +use core::ops::{Deref, DerefMut}; +use core::task::Poll; + +use futures_util::future::poll_fn; + +use crate::blocking_mutex::raw::RawMutex; +use crate::blocking_mutex::Mutex as BlockingMutex; +use crate::waitqueue::WakerRegistration; + +/// Error returned by [`Mutex::try_lock`] +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub struct TryLockError; + +struct State { + locked: bool, + waker: WakerRegistration, +} + +/// Async mutex. +/// +/// The mutex is generic over a blocking [`RawMutex`](crate::blocking_mutex::raw::RawMutex). +/// The raw mutex is used to guard access to the internal "is locked" flag. It +/// is held for very short periods only, while locking and unlocking. It is *not* held +/// for the entire time the async Mutex is locked. +/// +/// Which implementation you select depends on the context in which you're using the mutex. +/// +/// Use [`CriticalSectionRawMutex`](crate::blocking_mutex::raw::CriticalSectionRawMutex) when data can be shared between threads and interrupts. +/// +/// Use [`NoopRawMutex`](crate::blocking_mutex::raw::NoopRawMutex) when data is only shared between tasks running on the same executor. +/// +/// Use [`ThreadModeRawMutex`](crate::blocking_mutex::raw::ThreadModeRawMutex) when data is shared between tasks running on the same executor but you want a singleton. +/// +pub struct Mutex +where + M: RawMutex, + T: ?Sized, +{ + state: BlockingMutex>, + inner: UnsafeCell, +} + +unsafe impl Send for Mutex {} +unsafe impl Sync for Mutex {} + +/// Async mutex. +impl Mutex +where + M: RawMutex, +{ + /// Create a new mutex with the given value. + pub const fn new(value: T) -> Self { + Self { + inner: UnsafeCell::new(value), + state: BlockingMutex::new(RefCell::new(State { + locked: false, + waker: WakerRegistration::new(), + })), + } + } +} + +impl Mutex +where + M: RawMutex, + T: ?Sized, +{ + /// Lock the mutex. + /// + /// This will wait for the mutex to be unlocked if it's already locked. + pub async fn lock(&self) -> MutexGuard<'_, M, T> { + poll_fn(|cx| { + let ready = self.state.lock(|s| { + let mut s = s.borrow_mut(); + if s.locked { + s.waker.register(cx.waker()); + false + } else { + s.locked = true; + true + } + }); + + if ready { + Poll::Ready(MutexGuard { mutex: self }) + } else { + Poll::Pending + } + }) + .await + } + + /// Attempt to immediately lock the mutex. + /// + /// If the mutex is already locked, this will return an error instead of waiting. + pub fn try_lock(&self) -> Result, TryLockError> { + self.state.lock(|s| { + let mut s = s.borrow_mut(); + if s.locked { + Err(TryLockError) + } else { + s.locked = true; + Ok(()) + } + })?; + + Ok(MutexGuard { mutex: self }) + } +} + +/// Async mutex guard. +/// +/// Owning an instance of this type indicates having +/// successfully locked the mutex, and grants access to the contents. +/// +/// Dropping it unlocks the mutex. +pub struct MutexGuard<'a, M, T> +where + M: RawMutex, + T: ?Sized, +{ + mutex: &'a Mutex, +} + +impl<'a, M, T> Drop for MutexGuard<'a, M, T> +where + M: RawMutex, + T: ?Sized, +{ + fn drop(&mut self) { + self.mutex.state.lock(|s| { + let mut s = s.borrow_mut(); + s.locked = false; + s.waker.wake(); + }) + } +} + +impl<'a, M, T> Deref for MutexGuard<'a, M, T> +where + M: RawMutex, + T: ?Sized, +{ + type Target = T; + fn deref(&self) -> &Self::Target { + // Safety: the MutexGuard represents exclusive access to the contents + // of the mutex, so it's OK to get it. + unsafe { &*(self.mutex.inner.get() as *const T) } + } +} + +impl<'a, M, T> DerefMut for MutexGuard<'a, M, T> +where + M: RawMutex, + T: ?Sized, +{ + fn deref_mut(&mut self) -> &mut Self::Target { + // Safety: the MutexGuard represents exclusive access to the contents + // of the mutex, so it's OK to get it. + unsafe { &mut *(self.mutex.inner.get()) } + } +} diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs new file mode 100644 index 00000000..7d64b648 --- /dev/null +++ b/embassy-sync/src/pipe.rs @@ -0,0 +1,551 @@ +//! Async byte stream pipe. + +use core::cell::RefCell; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use crate::blocking_mutex::raw::RawMutex; +use crate::blocking_mutex::Mutex; +use crate::ring_buffer::RingBuffer; +use crate::waitqueue::WakerRegistration; + +/// Write-only access to a [`Pipe`]. +#[derive(Copy)] +pub struct Writer<'p, M, const N: usize> +where + M: RawMutex, +{ + pipe: &'p Pipe, +} + +impl<'p, M, const N: usize> Clone for Writer<'p, M, N> +where + M: RawMutex, +{ + fn clone(&self) -> Self { + Writer { pipe: self.pipe } + } +} + +impl<'p, M, const N: usize> Writer<'p, M, N> +where + M: RawMutex, +{ + /// Writes a value. + /// + /// See [`Pipe::write()`] + pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { + self.pipe.write(buf) + } + + /// Attempt to immediately write a message. + /// + /// See [`Pipe::write()`] + pub fn try_write(&self, buf: &[u8]) -> Result { + self.pipe.try_write(buf) + } +} + +/// Future returned by [`Pipe::write`] and [`Writer::write`]. +pub struct WriteFuture<'p, M, const N: usize> +where + M: RawMutex, +{ + pipe: &'p Pipe, + buf: &'p [u8], +} + +impl<'p, M, const N: usize> Future for WriteFuture<'p, M, N> +where + M: RawMutex, +{ + type Output = usize; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.pipe.try_write_with_context(Some(cx), self.buf) { + Ok(n) => Poll::Ready(n), + Err(TryWriteError::Full) => Poll::Pending, + } + } +} + +impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {} + +/// Read-only access to a [`Pipe`]. +#[derive(Copy)] +pub struct Reader<'p, M, const N: usize> +where + M: RawMutex, +{ + pipe: &'p Pipe, +} + +impl<'p, M, const N: usize> Clone for Reader<'p, M, N> +where + M: RawMutex, +{ + fn clone(&self) -> Self { + Reader { pipe: self.pipe } + } +} + +impl<'p, M, const N: usize> Reader<'p, M, N> +where + M: RawMutex, +{ + /// Reads a value. + /// + /// See [`Pipe::read()`] + pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { + self.pipe.read(buf) + } + + /// Attempt to immediately read a message. + /// + /// See [`Pipe::read()`] + pub fn try_read(&self, buf: &mut [u8]) -> Result { + self.pipe.try_read(buf) + } +} + +/// Future returned by [`Pipe::read`] and [`Reader::read`]. +pub struct ReadFuture<'p, M, const N: usize> +where + M: RawMutex, +{ + pipe: &'p Pipe, + buf: &'p mut [u8], +} + +impl<'p, M, const N: usize> Future for ReadFuture<'p, M, N> +where + M: RawMutex, +{ + type Output = usize; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.pipe.try_read_with_context(Some(cx), self.buf) { + Ok(n) => Poll::Ready(n), + Err(TryReadError::Empty) => Poll::Pending, + } + } +} + +impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {} + +/// Error returned by [`try_read`](Pipe::try_read). +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum TryReadError { + /// No data could be read from the pipe because it is currently + /// empty, and reading would require blocking. + Empty, +} + +/// Error returned by [`try_write`](Pipe::try_write). +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum TryWriteError { + /// No data could be written to the pipe because it is + /// currently full, and writing would require blocking. + Full, +} + +struct PipeState { + buffer: RingBuffer, + read_waker: WakerRegistration, + write_waker: WakerRegistration, +} + +impl PipeState { + const fn new() -> Self { + PipeState { + buffer: RingBuffer::new(), + read_waker: WakerRegistration::new(), + write_waker: WakerRegistration::new(), + } + } + + fn clear(&mut self) { + self.buffer.clear(); + self.write_waker.wake(); + } + + fn try_read(&mut self, buf: &mut [u8]) -> Result { + self.try_read_with_context(None, buf) + } + + fn try_read_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result { + if self.buffer.is_full() { + self.write_waker.wake(); + } + + let available = self.buffer.pop_buf(); + if available.is_empty() { + if let Some(cx) = cx { + self.read_waker.register(cx.waker()); + } + return Err(TryReadError::Empty); + } + + let n = available.len().min(buf.len()); + buf[..n].copy_from_slice(&available[..n]); + self.buffer.pop(n); + Ok(n) + } + + fn try_write(&mut self, buf: &[u8]) -> Result { + self.try_write_with_context(None, buf) + } + + fn try_write_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result { + if self.buffer.is_empty() { + self.read_waker.wake(); + } + + let available = self.buffer.push_buf(); + if available.is_empty() { + if let Some(cx) = cx { + self.write_waker.register(cx.waker()); + } + return Err(TryWriteError::Full); + } + + let n = available.len().min(buf.len()); + available[..n].copy_from_slice(&buf[..n]); + self.buffer.push(n); + Ok(n) + } +} + +/// A bounded pipe for communicating between asynchronous tasks +/// with backpressure. +/// +/// The pipe will buffer up to the provided number of messages. Once the +/// buffer is full, attempts to `write` new messages will wait until a message is +/// read from the pipe. +/// +/// All data written will become available in the same order as it was written. +pub struct Pipe +where + M: RawMutex, +{ + inner: Mutex>>, +} + +impl Pipe +where + M: RawMutex, +{ + /// Establish a new bounded pipe. For example, to create one with a NoopMutex: + /// + /// ``` + /// use embassy_sync::pipe::Pipe; + /// use embassy_sync::blocking_mutex::raw::NoopRawMutex; + /// + /// // Declare a bounded pipe, with a buffer of 256 bytes. + /// let mut pipe = Pipe::::new(); + /// ``` + pub const fn new() -> Self { + Self { + inner: Mutex::new(RefCell::new(PipeState::new())), + } + } + + fn lock(&self, f: impl FnOnce(&mut PipeState) -> R) -> R { + self.inner.lock(|rc| f(&mut *rc.borrow_mut())) + } + + fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result { + self.lock(|c| c.try_read_with_context(cx, buf)) + } + + fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result { + self.lock(|c| c.try_write_with_context(cx, buf)) + } + + /// Get a writer for this pipe. + pub fn writer(&self) -> Writer<'_, M, N> { + Writer { pipe: self } + } + + /// Get a reader for this pipe. + pub fn reader(&self) -> Reader<'_, M, N> { + Reader { pipe: self } + } + + /// Write a value, waiting until there is capacity. + /// + /// Writeing completes when the value has been pushed to the pipe's queue. + /// This doesn't mean the value has been read yet. + pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { + WriteFuture { pipe: self, buf } + } + + /// Attempt to immediately write a message. + /// + /// This method differs from [`write`](Pipe::write) by returning immediately if the pipe's + /// buffer is full, instead of waiting. + /// + /// # Errors + /// + /// If the pipe capacity has been reached, i.e., the pipe has `n` + /// buffered values where `n` is the argument passed to [`Pipe`], then an + /// error is returned. + pub fn try_write(&self, buf: &[u8]) -> Result { + self.lock(|c| c.try_write(buf)) + } + + /// Receive the next value. + /// + /// If there are no messages in the pipe's buffer, this method will + /// wait until a message is written. + pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { + ReadFuture { pipe: self, buf } + } + + /// Attempt to immediately read a message. + /// + /// This method will either read a message from the pipe immediately or return an error + /// if the pipe is empty. + pub fn try_read(&self, buf: &mut [u8]) -> Result { + self.lock(|c| c.try_read(buf)) + } + + /// Clear the data in the pipe's buffer. + pub fn clear(&self) { + self.lock(|c| c.clear()) + } + + /// Return whether the pipe is full (no free space in the buffer) + pub fn is_full(&self) -> bool { + self.len() == N + } + + /// Return whether the pipe is empty (no data buffered) + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Total byte capacity. + /// + /// This is the same as the `N` generic param. + pub fn capacity(&self) -> usize { + N + } + + /// Used byte capacity. + pub fn len(&self) -> usize { + self.lock(|c| c.buffer.len()) + } + + /// Free byte capacity. + /// + /// This is equivalent to `capacity() - len()` + pub fn free_capacity(&self) -> usize { + N - self.len() + } +} + +#[cfg(feature = "nightly")] +mod io_impls { + use core::convert::Infallible; + + use futures_util::FutureExt; + + use super::*; + + impl embedded_io::Io for Pipe { + type Error = Infallible; + } + + impl embedded_io::asynch::Read for Pipe { + type ReadFuture<'a> = impl Future> + where + Self: 'a; + + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { + Pipe::read(self, buf).map(Ok) + } + } + + impl embedded_io::asynch::Write for Pipe { + type WriteFuture<'a> = impl Future> + where + Self: 'a; + + fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { + Pipe::write(self, buf).map(Ok) + } + + type FlushFuture<'a> = impl Future> + where + Self: 'a; + + fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { + futures_util::future::ready(Ok(())) + } + } + + impl embedded_io::Io for &Pipe { + type Error = Infallible; + } + + impl embedded_io::asynch::Read for &Pipe { + type ReadFuture<'a> = impl Future> + where + Self: 'a; + + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { + Pipe::read(self, buf).map(Ok) + } + } + + impl embedded_io::asynch::Write for &Pipe { + type WriteFuture<'a> = impl Future> + where + Self: 'a; + + fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { + Pipe::write(self, buf).map(Ok) + } + + type FlushFuture<'a> = impl Future> + where + Self: 'a; + + fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { + futures_util::future::ready(Ok(())) + } + } + + impl embedded_io::Io for Reader<'_, M, N> { + type Error = Infallible; + } + + impl embedded_io::asynch::Read for Reader<'_, M, N> { + type ReadFuture<'a> = impl Future> + where + Self: 'a; + + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { + Reader::read(self, buf).map(Ok) + } + } + + impl embedded_io::Io for Writer<'_, M, N> { + type Error = Infallible; + } + + impl embedded_io::asynch::Write for Writer<'_, M, N> { + type WriteFuture<'a> = impl Future> + where + Self: 'a; + + fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { + Writer::write(self, buf).map(Ok) + } + + type FlushFuture<'a> = impl Future> + where + Self: 'a; + + fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { + futures_util::future::ready(Ok(())) + } + } +} + +#[cfg(test)] +mod tests { + use futures_executor::ThreadPool; + use futures_util::task::SpawnExt; + use static_cell::StaticCell; + + use super::*; + use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; + + fn capacity(c: &PipeState) -> usize { + N - c.buffer.len() + } + + #[test] + fn writing_once() { + let mut c = PipeState::<3>::new(); + assert!(c.try_write(&[1]).is_ok()); + assert_eq!(capacity(&c), 2); + } + + #[test] + fn writing_when_full() { + let mut c = PipeState::<3>::new(); + assert_eq!(c.try_write(&[42]), Ok(1)); + assert_eq!(c.try_write(&[43]), Ok(1)); + assert_eq!(c.try_write(&[44]), Ok(1)); + assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full)); + assert_eq!(capacity(&c), 0); + } + + #[test] + fn receiving_once_with_one_send() { + let mut c = PipeState::<3>::new(); + assert!(c.try_write(&[42]).is_ok()); + let mut buf = [0; 16]; + assert_eq!(c.try_read(&mut buf), Ok(1)); + assert_eq!(buf[0], 42); + assert_eq!(capacity(&c), 3); + } + + #[test] + fn receiving_when_empty() { + let mut c = PipeState::<3>::new(); + let mut buf = [0; 16]; + assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty)); + assert_eq!(capacity(&c), 3); + } + + #[test] + fn simple_send_and_receive() { + let c = Pipe::::new(); + assert!(c.try_write(&[42]).is_ok()); + let mut buf = [0; 16]; + assert_eq!(c.try_read(&mut buf), Ok(1)); + assert_eq!(buf[0], 42); + } + + #[test] + fn cloning() { + let c = Pipe::::new(); + let r1 = c.reader(); + let w1 = c.writer(); + + let _ = r1.clone(); + let _ = w1.clone(); + } + + #[futures_test::test] + async fn receiver_receives_given_try_write_async() { + let executor = ThreadPool::new().unwrap(); + + static CHANNEL: StaticCell> = StaticCell::new(); + let c = &*CHANNEL.init(Pipe::new()); + let c2 = c; + let f = async move { + assert_eq!(c2.try_write(&[42]), Ok(1)); + }; + executor.spawn(f).unwrap(); + let mut buf = [0; 16]; + assert_eq!(c.read(&mut buf).await, 1); + assert_eq!(buf[0], 42); + } + + #[futures_test::test] + async fn sender_send_completes_if_capacity() { + let c = Pipe::::new(); + c.write(&[42]).await; + let mut buf = [0; 16]; + assert_eq!(c.read(&mut buf).await, 1); + assert_eq!(buf[0], 42); + } +} diff --git a/embassy-sync/src/ring_buffer.rs b/embassy-sync/src/ring_buffer.rs new file mode 100644 index 00000000..52108402 --- /dev/null +++ b/embassy-sync/src/ring_buffer.rs @@ -0,0 +1,146 @@ +pub struct RingBuffer { + buf: [u8; N], + start: usize, + end: usize, + empty: bool, +} + +impl RingBuffer { + pub const fn new() -> Self { + Self { + buf: [0; N], + start: 0, + end: 0, + empty: true, + } + } + + pub fn push_buf(&mut self) -> &mut [u8] { + if self.start == self.end && !self.empty { + trace!(" ringbuf: push_buf empty"); + return &mut self.buf[..0]; + } + + let n = if self.start <= self.end { + self.buf.len() - self.end + } else { + self.start - self.end + }; + + trace!(" ringbuf: push_buf {:?}..{:?}", self.end, self.end + n); + &mut self.buf[self.end..self.end + n] + } + + pub fn push(&mut self, n: usize) { + trace!(" ringbuf: push {:?}", n); + if n == 0 { + return; + } + + self.end = self.wrap(self.end + n); + self.empty = false; + } + + pub fn pop_buf(&mut self) -> &mut [u8] { + if self.empty { + trace!(" ringbuf: pop_buf empty"); + return &mut self.buf[..0]; + } + + let n = if self.end <= self.start { + self.buf.len() - self.start + } else { + self.end - self.start + }; + + trace!(" ringbuf: pop_buf {:?}..{:?}", self.start, self.start + n); + &mut self.buf[self.start..self.start + n] + } + + pub fn pop(&mut self, n: usize) { + trace!(" ringbuf: pop {:?}", n); + if n == 0 { + return; + } + + self.start = self.wrap(self.start + n); + self.empty = self.start == self.end; + } + + pub fn is_full(&self) -> bool { + self.start == self.end && !self.empty + } + + pub fn is_empty(&self) -> bool { + self.empty + } + + #[allow(unused)] + pub fn len(&self) -> usize { + if self.empty { + 0 + } else if self.start < self.end { + self.end - self.start + } else { + N + self.end - self.start + } + } + + pub fn clear(&mut self) { + self.start = 0; + self.end = 0; + self.empty = true; + } + + fn wrap(&self, n: usize) -> usize { + assert!(n <= self.buf.len()); + if n == self.buf.len() { + 0 + } else { + n + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn push_pop() { + let mut rb: RingBuffer<4> = RingBuffer::new(); + let buf = rb.push_buf(); + assert_eq!(4, buf.len()); + buf[0] = 1; + buf[1] = 2; + buf[2] = 3; + buf[3] = 4; + rb.push(4); + + let buf = rb.pop_buf(); + assert_eq!(4, buf.len()); + assert_eq!(1, buf[0]); + rb.pop(1); + + let buf = rb.pop_buf(); + assert_eq!(3, buf.len()); + assert_eq!(2, buf[0]); + rb.pop(1); + + let buf = rb.pop_buf(); + assert_eq!(2, buf.len()); + assert_eq!(3, buf[0]); + rb.pop(1); + + let buf = rb.pop_buf(); + assert_eq!(1, buf.len()); + assert_eq!(4, buf[0]); + rb.pop(1); + + let buf = rb.pop_buf(); + assert_eq!(0, buf.len()); + + let buf = rb.push_buf(); + assert_eq!(4, buf.len()); + } +} diff --git a/embassy-sync/src/waitqueue/mod.rs b/embassy-sync/src/waitqueue/mod.rs new file mode 100644 index 00000000..6661a6b6 --- /dev/null +++ b/embassy-sync/src/waitqueue/mod.rs @@ -0,0 +1,7 @@ +//! Async low-level wait queues + +mod waker; +pub use waker::*; + +mod multi_waker; +pub use multi_waker::*; diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs new file mode 100644 index 00000000..325d2cb3 --- /dev/null +++ b/embassy-sync/src/waitqueue/multi_waker.rs @@ -0,0 +1,33 @@ +use core::task::Waker; + +use super::WakerRegistration; + +/// Utility struct to register and wake multiple wakers. +pub struct MultiWakerRegistration { + wakers: [WakerRegistration; N], +} + +impl MultiWakerRegistration { + /// Create a new empty instance + pub const fn new() -> Self { + const WAKER: WakerRegistration = WakerRegistration::new(); + Self { wakers: [WAKER; N] } + } + + /// Register a waker. If the buffer is full the function returns it in the error + pub fn register<'a>(&mut self, w: &'a Waker) -> Result<(), &'a Waker> { + if let Some(waker_slot) = self.wakers.iter_mut().find(|waker_slot| !waker_slot.occupied()) { + waker_slot.register(w); + Ok(()) + } else { + Err(w) + } + } + + /// Wake all registered wakers. This clears the buffer + pub fn wake(&mut self) { + for waker_slot in self.wakers.iter_mut() { + waker_slot.wake() + } + } +} diff --git a/embassy-sync/src/waitqueue/waker.rs b/embassy-sync/src/waitqueue/waker.rs new file mode 100644 index 00000000..64e300eb --- /dev/null +++ b/embassy-sync/src/waitqueue/waker.rs @@ -0,0 +1,92 @@ +use core::cell::Cell; +use core::mem; +use core::task::Waker; + +use crate::blocking_mutex::raw::CriticalSectionRawMutex; +use crate::blocking_mutex::Mutex; + +/// Utility struct to register and wake a waker. +#[derive(Debug)] +pub struct WakerRegistration { + waker: Option, +} + +impl WakerRegistration { + /// Create a new `WakerRegistration`. + pub const fn new() -> Self { + Self { waker: None } + } + + /// Register a waker. Overwrites the previous waker, if any. + pub fn register(&mut self, w: &Waker) { + match self.waker { + // Optimization: If both the old and new Wakers wake the same task, we can simply + // keep the old waker, skipping the clone. (In most executor implementations, + // cloning a waker is somewhat expensive, comparable to cloning an Arc). + Some(ref w2) if (w2.will_wake(w)) => {} + _ => { + // clone the new waker and store it + if let Some(old_waker) = mem::replace(&mut self.waker, Some(w.clone())) { + // We had a waker registered for another task. Wake it, so the other task can + // reregister itself if it's still interested. + // + // If two tasks are waiting on the same thing concurrently, this will cause them + // to wake each other in a loop fighting over this WakerRegistration. This wastes + // CPU but things will still work. + // + // If the user wants to have two tasks waiting on the same thing they should use + // a more appropriate primitive that can store multiple wakers. + old_waker.wake() + } + } + } + } + + /// Wake the registered waker, if any. + pub fn wake(&mut self) { + if let Some(w) = self.waker.take() { + w.wake() + } + } + + /// Returns true if a waker is currently registered + pub fn occupied(&self) -> bool { + self.waker.is_some() + } +} + +/// Utility struct to register and wake a waker. +pub struct AtomicWaker { + waker: Mutex>>, +} + +impl AtomicWaker { + /// Create a new `AtomicWaker`. + pub const fn new() -> Self { + Self { + waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)), + } + } + + /// Register a waker. Overwrites the previous waker, if any. + pub fn register(&self, w: &Waker) { + critical_section::with(|cs| { + let cell = self.waker.borrow(cs); + cell.set(match cell.replace(None) { + Some(w2) if (w2.will_wake(w)) => Some(w2), + _ => Some(w.clone()), + }) + }) + } + + /// Wake the registered waker, if any. + pub fn wake(&self) { + critical_section::with(|cs| { + let cell = self.waker.borrow(cs); + if let Some(w) = cell.replace(None) { + w.wake_by_ref(); + cell.set(Some(w)); + } + }) + } +} diff --git a/embassy-usb-hid/Cargo.toml b/embassy-usb-hid/Cargo.toml index 5e9cfebf..73035148 100644 --- a/embassy-usb-hid/Cargo.toml +++ b/embassy-usb-hid/Cargo.toml @@ -14,7 +14,7 @@ default = ["usbd-hid"] usbd-hid = ["dep:usbd-hid", "ssmarshal"] [dependencies] -embassy-util = { version = "0.1.0", path = "../embassy-util" } +embassy-sync = { version = "0.1.0", path = "../embassy-sync" } embassy-usb = { version = "0.1.0", path = "../embassy-usb" } defmt = { version = "0.3", optional = true } diff --git a/embassy-usb-ncm/Cargo.toml b/embassy-usb-ncm/Cargo.toml index 47c1f36b..15d3db96 100644 --- a/embassy-usb-ncm/Cargo.toml +++ b/embassy-usb-ncm/Cargo.toml @@ -10,7 +10,7 @@ features = ["defmt"] target = "thumbv7em-none-eabi" [dependencies] -embassy-util = { version = "0.1.0", path = "../embassy-util" } +embassy-sync = { version = "0.1.0", path = "../embassy-sync" } embassy-usb = { version = "0.1.0", path = "../embassy-usb" } defmt = { version = "0.3", optional = true } diff --git a/embassy-usb-serial/Cargo.toml b/embassy-usb-serial/Cargo.toml index 63361047..9788588e 100644 --- a/embassy-usb-serial/Cargo.toml +++ b/embassy-usb-serial/Cargo.toml @@ -10,7 +10,7 @@ features = ["defmt"] target = "thumbv7em-none-eabi" [dependencies] -embassy-util = { version = "0.1.0", path = "../embassy-util" } +embassy-sync = { version = "0.1.0", path = "../embassy-sync" } embassy-usb = { version = "0.1.0", path = "../embassy-usb" } defmt = { version = "0.3", optional = true } diff --git a/embassy-usb-serial/src/lib.rs b/embassy-usb-serial/src/lib.rs index e561be9d..f3de2ec1 100644 --- a/embassy-usb-serial/src/lib.rs +++ b/embassy-usb-serial/src/lib.rs @@ -9,11 +9,11 @@ use core::cell::Cell; use core::mem::{self, MaybeUninit}; use core::sync::atomic::{AtomicBool, Ordering}; +use embassy_sync::blocking_mutex::CriticalSectionMutex; use embassy_usb::control::{self, ControlHandler, InResponse, OutResponse, Request}; use embassy_usb::driver::{Driver, Endpoint, EndpointError, EndpointIn, EndpointOut}; use embassy_usb::types::*; use embassy_usb::Builder; -use embassy_util::blocking_mutex::CriticalSectionMutex; /// This should be used as `device_class` when building the `UsbDevice`. pub const USB_CLASS_CDC: u8 = 0x02; diff --git a/embassy-usb/Cargo.toml b/embassy-usb/Cargo.toml index 6adbd399..8cad4d31 100644 --- a/embassy-usb/Cargo.toml +++ b/embassy-usb/Cargo.toml @@ -10,7 +10,7 @@ features = ["defmt"] target = "thumbv7em-none-eabi" [dependencies] -embassy-util = { version = "0.1.0", path = "../embassy-util" } +embassy-futures = { version = "0.1.0", path = "../embassy-futures" } defmt = { version = "0.3", optional = true } log = { version = "0.4.14", optional = true } diff --git a/embassy-usb/src/lib.rs b/embassy-usb/src/lib.rs index 3f6e1347..5a3f8ba8 100644 --- a/embassy-usb/src/lib.rs +++ b/embassy-usb/src/lib.rs @@ -12,7 +12,7 @@ mod descriptor_reader; pub mod driver; pub mod types; -use embassy_util::{select, Either}; +use embassy_futures::{select, Either}; use heapless::Vec; pub use self::builder::{Builder, Config}; diff --git a/embassy-util/Cargo.toml b/embassy-util/Cargo.toml deleted file mode 100644 index b54a58b4..00000000 --- a/embassy-util/Cargo.toml +++ /dev/null @@ -1,34 +0,0 @@ -[package] -name = "embassy-util" -version = "0.1.0" -edition = "2021" - -[package.metadata.embassy_docs] -src_base = "https://github.com/embassy-rs/embassy/blob/embassy-util-v$VERSION/embassy-util/src/" -src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-util/src/" -features = ["nightly"] -target = "thumbv7em-none-eabi" - -[features] -nightly = ["embedded-io/async"] - -[dependencies] -defmt = { version = "0.3", optional = true } -log = { version = "0.4.14", optional = true } - -futures-util = { version = "0.3.17", default-features = false } -atomic-polyfill = "1.0.1" -critical-section = "1.1" -heapless = "0.7.5" -cfg-if = "1.0.0" -embedded-io = "0.3.0" - -[dev-dependencies] -futures-executor = { version = "0.3.17", features = [ "thread-pool" ] } -futures-test = "0.3.17" -futures-timer = "3.0.2" -futures-util = { version = "0.3.17", features = [ "channel" ] } - -# Enable critical-section implementation for std, for tests -critical-section = { version = "1.1", features = ["std"] } -static_cell = "1.0" diff --git a/embassy-util/build.rs b/embassy-util/build.rs deleted file mode 100644 index 6fe82b44..00000000 --- a/embassy-util/build.rs +++ /dev/null @@ -1,29 +0,0 @@ -use std::env; - -fn main() { - let target = env::var("TARGET").unwrap(); - - if target.starts_with("thumbv6m-") { - println!("cargo:rustc-cfg=cortex_m"); - println!("cargo:rustc-cfg=armv6m"); - } else if target.starts_with("thumbv7m-") { - println!("cargo:rustc-cfg=cortex_m"); - println!("cargo:rustc-cfg=armv7m"); - } else if target.starts_with("thumbv7em-") { - println!("cargo:rustc-cfg=cortex_m"); - println!("cargo:rustc-cfg=armv7m"); - println!("cargo:rustc-cfg=armv7em"); // (not currently used) - } else if target.starts_with("thumbv8m.base") { - println!("cargo:rustc-cfg=cortex_m"); - println!("cargo:rustc-cfg=armv8m"); - println!("cargo:rustc-cfg=armv8m_base"); - } else if target.starts_with("thumbv8m.main") { - println!("cargo:rustc-cfg=cortex_m"); - println!("cargo:rustc-cfg=armv8m"); - println!("cargo:rustc-cfg=armv8m_main"); - } - - if target.ends_with("-eabihf") { - println!("cargo:rustc-cfg=has_fpu"); - } -} diff --git a/embassy-util/src/blocking_mutex/mod.rs b/embassy-util/src/blocking_mutex/mod.rs deleted file mode 100644 index 8a4a4c64..00000000 --- a/embassy-util/src/blocking_mutex/mod.rs +++ /dev/null @@ -1,189 +0,0 @@ -//! Blocking mutex. -//! -//! This module provides a blocking mutex that can be used to synchronize data. -pub mod raw; - -use core::cell::UnsafeCell; - -use self::raw::RawMutex; - -/// Blocking mutex (not async) -/// -/// Provides a blocking mutual exclusion primitive backed by an implementation of [`raw::RawMutex`]. -/// -/// Which implementation you select depends on the context in which you're using the mutex, and you can choose which kind -/// of interior mutability fits your use case. -/// -/// Use [`CriticalSectionMutex`] when data can be shared between threads and interrupts. -/// -/// Use [`NoopMutex`] when data is only shared between tasks running on the same executor. -/// -/// Use [`ThreadModeMutex`] when data is shared between tasks running on the same executor but you want a global singleton. -/// -/// In all cases, the blocking mutex is intended to be short lived and not held across await points. -/// Use the async [`Mutex`](crate::mutex::Mutex) if you need a lock that is held across await points. -pub struct Mutex { - // NOTE: `raw` must be FIRST, so when using ThreadModeMutex the "can't drop in non-thread-mode" gets - // to run BEFORE dropping `data`. - raw: R, - data: UnsafeCell, -} - -unsafe impl Send for Mutex {} -unsafe impl Sync for Mutex {} - -impl Mutex { - /// Creates a new mutex in an unlocked state ready for use. - #[inline] - pub const fn new(val: T) -> Mutex { - Mutex { - raw: R::INIT, - data: UnsafeCell::new(val), - } - } - - /// Creates a critical section and grants temporary access to the protected data. - pub fn lock(&self, f: impl FnOnce(&T) -> U) -> U { - self.raw.lock(|| { - let ptr = self.data.get() as *const T; - let inner = unsafe { &*ptr }; - f(inner) - }) - } -} - -impl Mutex { - /// Creates a new mutex based on a pre-existing raw mutex. - /// - /// This allows creating a mutex in a constant context on stable Rust. - #[inline] - pub const fn const_new(raw_mutex: R, val: T) -> Mutex { - Mutex { - raw: raw_mutex, - data: UnsafeCell::new(val), - } - } - - /// Consumes this mutex, returning the underlying data. - #[inline] - pub fn into_inner(self) -> T { - self.data.into_inner() - } - - /// Returns a mutable reference to the underlying data. - /// - /// Since this call borrows the `Mutex` mutably, no actual locking needs to - /// take place---the mutable borrow statically guarantees no locks exist. - #[inline] - pub fn get_mut(&mut self) -> &mut T { - unsafe { &mut *self.data.get() } - } -} - -/// A mutex that allows borrowing data across executors and interrupts. -/// -/// # Safety -/// -/// This mutex is safe to share between different executors and interrupts. -pub type CriticalSectionMutex = Mutex; - -/// A mutex that allows borrowing data in the context of a single executor. -/// -/// # Safety -/// -/// **This Mutex is only safe within a single executor.** -pub type NoopMutex = Mutex; - -impl Mutex { - /// Borrows the data for the duration of the critical section - pub fn borrow<'cs>(&'cs self, _cs: critical_section::CriticalSection<'cs>) -> &'cs T { - let ptr = self.data.get() as *const T; - unsafe { &*ptr } - } -} - -impl Mutex { - /// Borrows the data - pub fn borrow(&self) -> &T { - let ptr = self.data.get() as *const T; - unsafe { &*ptr } - } -} - -// ThreadModeMutex does NOT use the generic mutex from above because it's special: -// it's Send+Sync even if T: !Send. There's no way to do that without specialization (I think?). -// -// There's still a ThreadModeRawMutex for use with the generic Mutex (handy with Channel, for example), -// but that will require T: Send even though it shouldn't be needed. - -#[cfg(any(cortex_m, feature = "std"))] -pub use thread_mode_mutex::*; -#[cfg(any(cortex_m, feature = "std"))] -mod thread_mode_mutex { - use super::*; - - /// A "mutex" that only allows borrowing from thread mode. - /// - /// # Safety - /// - /// **This Mutex is only safe on single-core systems.** - /// - /// On multi-core systems, a `ThreadModeMutex` **is not sufficient** to ensure exclusive access. - pub struct ThreadModeMutex { - inner: UnsafeCell, - } - - // NOTE: ThreadModeMutex only allows borrowing from one execution context ever: thread mode. - // Therefore it cannot be used to send non-sendable stuff between execution contexts, so it can - // be Send+Sync even if T is not Send (unlike CriticalSectionMutex) - unsafe impl Sync for ThreadModeMutex {} - unsafe impl Send for ThreadModeMutex {} - - impl ThreadModeMutex { - /// Creates a new mutex - pub const fn new(value: T) -> Self { - ThreadModeMutex { - inner: UnsafeCell::new(value), - } - } - } - - impl ThreadModeMutex { - /// Lock the `ThreadModeMutex`, granting access to the data. - /// - /// # Panics - /// - /// This will panic if not currently running in thread mode. - pub fn lock(&self, f: impl FnOnce(&T) -> R) -> R { - f(self.borrow()) - } - - /// Borrows the data - /// - /// # Panics - /// - /// This will panic if not currently running in thread mode. - pub fn borrow(&self) -> &T { - assert!( - raw::in_thread_mode(), - "ThreadModeMutex can only be borrowed from thread mode." - ); - unsafe { &*self.inner.get() } - } - } - - impl Drop for ThreadModeMutex { - fn drop(&mut self) { - // Only allow dropping from thread mode. Dropping calls drop on the inner `T`, so - // `drop` needs the same guarantees as `lock`. `ThreadModeMutex` is Send even if - // T isn't, so without this check a user could create a ThreadModeMutex in thread mode, - // send it to interrupt context and drop it there, which would "send" a T even if T is not Send. - assert!( - raw::in_thread_mode(), - "ThreadModeMutex can only be dropped from thread mode." - ); - - // Drop of the inner `T` happens after this. - } - } -} diff --git a/embassy-util/src/blocking_mutex/raw.rs b/embassy-util/src/blocking_mutex/raw.rs deleted file mode 100644 index 15796f1b..00000000 --- a/embassy-util/src/blocking_mutex/raw.rs +++ /dev/null @@ -1,149 +0,0 @@ -//! Mutex primitives. -//! -//! This module provides a trait for mutexes that can be used in different contexts. -use core::marker::PhantomData; - -/// Raw mutex trait. -/// -/// This mutex is "raw", which means it does not actually contain the protected data, it -/// just implements the mutex mechanism. For most uses you should use [`super::Mutex`] instead, -/// which is generic over a RawMutex and contains the protected data. -/// -/// Note that, unlike other mutexes, implementations only guarantee no -/// concurrent access from other threads: concurrent access from the current -/// thread is allwed. For example, it's possible to lock the same mutex multiple times reentrantly. -/// -/// Therefore, locking a `RawMutex` is only enough to guarantee safe shared (`&`) access -/// to the data, it is not enough to guarantee exclusive (`&mut`) access. -/// -/// # Safety -/// -/// RawMutex implementations must ensure that, while locked, no other thread can lock -/// the RawMutex concurrently. -/// -/// Unsafe code is allowed to rely on this fact, so incorrect implementations will cause undefined behavior. -pub unsafe trait RawMutex { - /// Create a new `RawMutex` instance. - /// - /// This is a const instead of a method to allow creating instances in const context. - const INIT: Self; - - /// Lock this `RawMutex`. - fn lock(&self, f: impl FnOnce() -> R) -> R; -} - -/// A mutex that allows borrowing data across executors and interrupts. -/// -/// # Safety -/// -/// This mutex is safe to share between different executors and interrupts. -pub struct CriticalSectionRawMutex { - _phantom: PhantomData<()>, -} -unsafe impl Send for CriticalSectionRawMutex {} -unsafe impl Sync for CriticalSectionRawMutex {} - -impl CriticalSectionRawMutex { - /// Create a new `CriticalSectionRawMutex`. - pub const fn new() -> Self { - Self { _phantom: PhantomData } - } -} - -unsafe impl RawMutex for CriticalSectionRawMutex { - const INIT: Self = Self::new(); - - fn lock(&self, f: impl FnOnce() -> R) -> R { - critical_section::with(|_| f()) - } -} - -// ================ - -/// A mutex that allows borrowing data in the context of a single executor. -/// -/// # Safety -/// -/// **This Mutex is only safe within a single executor.** -pub struct NoopRawMutex { - _phantom: PhantomData<*mut ()>, -} - -unsafe impl Send for NoopRawMutex {} - -impl NoopRawMutex { - /// Create a new `NoopRawMutex`. - pub const fn new() -> Self { - Self { _phantom: PhantomData } - } -} - -unsafe impl RawMutex for NoopRawMutex { - const INIT: Self = Self::new(); - fn lock(&self, f: impl FnOnce() -> R) -> R { - f() - } -} - -// ================ - -#[cfg(any(cortex_m, feature = "std"))] -mod thread_mode { - use super::*; - - /// A "mutex" that only allows borrowing from thread mode. - /// - /// # Safety - /// - /// **This Mutex is only safe on single-core systems.** - /// - /// On multi-core systems, a `ThreadModeRawMutex` **is not sufficient** to ensure exclusive access. - pub struct ThreadModeRawMutex { - _phantom: PhantomData<()>, - } - - unsafe impl Send for ThreadModeRawMutex {} - unsafe impl Sync for ThreadModeRawMutex {} - - impl ThreadModeRawMutex { - /// Create a new `ThreadModeRawMutex`. - pub const fn new() -> Self { - Self { _phantom: PhantomData } - } - } - - unsafe impl RawMutex for ThreadModeRawMutex { - const INIT: Self = Self::new(); - fn lock(&self, f: impl FnOnce() -> R) -> R { - assert!(in_thread_mode(), "ThreadModeMutex can only be locked from thread mode."); - - f() - } - } - - impl Drop for ThreadModeRawMutex { - fn drop(&mut self) { - // Only allow dropping from thread mode. Dropping calls drop on the inner `T`, so - // `drop` needs the same guarantees as `lock`. `ThreadModeMutex` is Send even if - // T isn't, so without this check a user could create a ThreadModeMutex in thread mode, - // send it to interrupt context and drop it there, which would "send" a T even if T is not Send. - assert!( - in_thread_mode(), - "ThreadModeMutex can only be dropped from thread mode." - ); - - // Drop of the inner `T` happens after this. - } - } - - pub(crate) fn in_thread_mode() -> bool { - #[cfg(feature = "std")] - return Some("main") == std::thread::current().name(); - - #[cfg(not(feature = "std"))] - // ICSR.VECTACTIVE == 0 - return unsafe { (0xE000ED04 as *const u32).read_volatile() } & 0x1FF == 0; - } -} -#[cfg(any(cortex_m, feature = "std"))] -pub use thread_mode::*; diff --git a/embassy-util/src/channel/mod.rs b/embassy-util/src/channel/mod.rs deleted file mode 100644 index 5df1f5c5..00000000 --- a/embassy-util/src/channel/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -//! Async channels - -pub mod mpmc; -pub mod pubsub; -pub mod signal; diff --git a/embassy-util/src/channel/mpmc.rs b/embassy-util/src/channel/mpmc.rs deleted file mode 100644 index 263f29bf..00000000 --- a/embassy-util/src/channel/mpmc.rs +++ /dev/null @@ -1,596 +0,0 @@ -//! A queue for sending values between asynchronous tasks. -//! -//! It can be used concurrently by multiple producers (senders) and multiple -//! consumers (receivers), i.e. it is an "MPMC channel". -//! -//! Receivers are competing for messages. So a message that is received by -//! one receiver is not received by any other. -//! -//! This queue takes a Mutex type so that various -//! targets can be attained. For example, a ThreadModeMutex can be used -//! for single-core Cortex-M targets where messages are only passed -//! between tasks running in thread mode. Similarly, a CriticalSectionMutex -//! can also be used for single-core targets where messages are to be -//! passed from exception mode e.g. out of an interrupt handler. -//! -//! This module provides a bounded channel that has a limit on the number of -//! messages that it can store, and if this limit is reached, trying to send -//! another message will result in an error being returned. -//! - -use core::cell::RefCell; -use core::future::Future; -use core::pin::Pin; -use core::task::{Context, Poll}; - -use heapless::Deque; - -use crate::blocking_mutex::raw::RawMutex; -use crate::blocking_mutex::Mutex; -use crate::waitqueue::WakerRegistration; - -/// Send-only access to a [`Channel`]. -#[derive(Copy)] -pub struct Sender<'ch, M, T, const N: usize> -where - M: RawMutex, -{ - channel: &'ch Channel, -} - -impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> -where - M: RawMutex, -{ - fn clone(&self) -> Self { - Sender { channel: self.channel } - } -} - -impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> -where - M: RawMutex, -{ - /// Sends a value. - /// - /// See [`Channel::send()`] - pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { - self.channel.send(message) - } - - /// Attempt to immediately send a message. - /// - /// See [`Channel::send()`] - pub fn try_send(&self, message: T) -> Result<(), TrySendError> { - self.channel.try_send(message) - } -} - -/// Send-only access to a [`Channel`] without knowing channel size. -#[derive(Copy)] -pub struct DynamicSender<'ch, T> { - channel: &'ch dyn DynamicChannel, -} - -impl<'ch, T> Clone for DynamicSender<'ch, T> { - fn clone(&self) -> Self { - DynamicSender { channel: self.channel } - } -} - -impl<'ch, M, T, const N: usize> From> for DynamicSender<'ch, T> -where - M: RawMutex, -{ - fn from(s: Sender<'ch, M, T, N>) -> Self { - Self { channel: s.channel } - } -} - -impl<'ch, T> DynamicSender<'ch, T> { - /// Sends a value. - /// - /// See [`Channel::send()`] - pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> { - DynamicSendFuture { - channel: self.channel, - message: Some(message), - } - } - - /// Attempt to immediately send a message. - /// - /// See [`Channel::send()`] - pub fn try_send(&self, message: T) -> Result<(), TrySendError> { - self.channel.try_send_with_context(message, None) - } -} - -/// Receive-only access to a [`Channel`]. -#[derive(Copy)] -pub struct Receiver<'ch, M, T, const N: usize> -where - M: RawMutex, -{ - channel: &'ch Channel, -} - -impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N> -where - M: RawMutex, -{ - fn clone(&self) -> Self { - Receiver { channel: self.channel } - } -} - -impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> -where - M: RawMutex, -{ - /// Receive the next value. - /// - /// See [`Channel::recv()`]. - pub fn recv(&self) -> RecvFuture<'_, M, T, N> { - self.channel.recv() - } - - /// Attempt to immediately receive the next value. - /// - /// See [`Channel::try_recv()`] - pub fn try_recv(&self) -> Result { - self.channel.try_recv() - } -} - -/// Receive-only access to a [`Channel`] without knowing channel size. -#[derive(Copy)] -pub struct DynamicReceiver<'ch, T> { - channel: &'ch dyn DynamicChannel, -} - -impl<'ch, T> Clone for DynamicReceiver<'ch, T> { - fn clone(&self) -> Self { - DynamicReceiver { channel: self.channel } - } -} - -impl<'ch, T> DynamicReceiver<'ch, T> { - /// Receive the next value. - /// - /// See [`Channel::recv()`]. - pub fn recv(&self) -> DynamicRecvFuture<'_, T> { - DynamicRecvFuture { channel: self.channel } - } - - /// Attempt to immediately receive the next value. - /// - /// See [`Channel::try_recv()`] - pub fn try_recv(&self) -> Result { - self.channel.try_recv_with_context(None) - } -} - -impl<'ch, M, T, const N: usize> From> for DynamicReceiver<'ch, T> -where - M: RawMutex, -{ - fn from(s: Receiver<'ch, M, T, N>) -> Self { - Self { channel: s.channel } - } -} - -/// Future returned by [`Channel::recv`] and [`Receiver::recv`]. -pub struct RecvFuture<'ch, M, T, const N: usize> -where - M: RawMutex, -{ - channel: &'ch Channel, -} - -impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> -where - M: RawMutex, -{ - type Output = T; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.channel.try_recv_with_context(Some(cx)) { - Ok(v) => Poll::Ready(v), - Err(TryRecvError::Empty) => Poll::Pending, - } - } -} - -/// Future returned by [`DynamicReceiver::recv`]. -pub struct DynamicRecvFuture<'ch, T> { - channel: &'ch dyn DynamicChannel, -} - -impl<'ch, T> Future for DynamicRecvFuture<'ch, T> { - type Output = T; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.channel.try_recv_with_context(Some(cx)) { - Ok(v) => Poll::Ready(v), - Err(TryRecvError::Empty) => Poll::Pending, - } - } -} - -/// Future returned by [`Channel::send`] and [`Sender::send`]. -pub struct SendFuture<'ch, M, T, const N: usize> -where - M: RawMutex, -{ - channel: &'ch Channel, - message: Option, -} - -impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> -where - M: RawMutex, -{ - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.message.take() { - Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { - Ok(..) => Poll::Ready(()), - Err(TrySendError::Full(m)) => { - self.message = Some(m); - Poll::Pending - } - }, - None => panic!("Message cannot be None"), - } - } -} - -impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} - -/// Future returned by [`DynamicSender::send`]. -pub struct DynamicSendFuture<'ch, T> { - channel: &'ch dyn DynamicChannel, - message: Option, -} - -impl<'ch, T> Future for DynamicSendFuture<'ch, T> { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.message.take() { - Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { - Ok(..) => Poll::Ready(()), - Err(TrySendError::Full(m)) => { - self.message = Some(m); - Poll::Pending - } - }, - None => panic!("Message cannot be None"), - } - } -} - -impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} - -trait DynamicChannel { - fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError>; - - fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result; -} - -/// Error returned by [`try_recv`](Channel::try_recv). -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum TryRecvError { - /// A message could not be received because the channel is empty. - Empty, -} - -/// Error returned by [`try_send`](Channel::try_send). -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum TrySendError { - /// The data could not be sent on the channel because the channel is - /// currently full and sending would require blocking. - Full(T), -} - -struct ChannelState { - queue: Deque, - receiver_waker: WakerRegistration, - senders_waker: WakerRegistration, -} - -impl ChannelState { - const fn new() -> Self { - ChannelState { - queue: Deque::new(), - receiver_waker: WakerRegistration::new(), - senders_waker: WakerRegistration::new(), - } - } - - fn try_recv(&mut self) -> Result { - self.try_recv_with_context(None) - } - - fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result { - if self.queue.is_full() { - self.senders_waker.wake(); - } - - if let Some(message) = self.queue.pop_front() { - Ok(message) - } else { - if let Some(cx) = cx { - self.receiver_waker.register(cx.waker()); - } - Err(TryRecvError::Empty) - } - } - - fn try_send(&mut self, message: T) -> Result<(), TrySendError> { - self.try_send_with_context(message, None) - } - - fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { - match self.queue.push_back(message) { - Ok(()) => { - self.receiver_waker.wake(); - Ok(()) - } - Err(message) => { - if let Some(cx) = cx { - self.senders_waker.register(cx.waker()); - } - Err(TrySendError::Full(message)) - } - } - } -} - -/// A bounded channel for communicating between asynchronous tasks -/// with backpressure. -/// -/// The channel will buffer up to the provided number of messages. Once the -/// buffer is full, attempts to `send` new messages will wait until a message is -/// received from the channel. -/// -/// All data sent will become available in the same order as it was sent. -pub struct Channel -where - M: RawMutex, -{ - inner: Mutex>>, -} - -impl Channel -where - M: RawMutex, -{ - /// Establish a new bounded channel. For example, to create one with a NoopMutex: - /// - /// ``` - /// use embassy_util::channel::mpmc::Channel; - /// use embassy_util::blocking_mutex::raw::NoopRawMutex; - /// - /// // Declare a bounded channel of 3 u32s. - /// let mut channel = Channel::::new(); - /// ``` - pub const fn new() -> Self { - Self { - inner: Mutex::new(RefCell::new(ChannelState::new())), - } - } - - fn lock(&self, f: impl FnOnce(&mut ChannelState) -> R) -> R { - self.inner.lock(|rc| f(&mut *rc.borrow_mut())) - } - - fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result { - self.lock(|c| c.try_recv_with_context(cx)) - } - - fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { - self.lock(|c| c.try_send_with_context(m, cx)) - } - - /// Get a sender for this channel. - pub fn sender(&self) -> Sender<'_, M, T, N> { - Sender { channel: self } - } - - /// Get a receiver for this channel. - pub fn receiver(&self) -> Receiver<'_, M, T, N> { - Receiver { channel: self } - } - - /// Send a value, waiting until there is capacity. - /// - /// Sending completes when the value has been pushed to the channel's queue. - /// This doesn't mean the value has been received yet. - pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> { - SendFuture { - channel: self, - message: Some(message), - } - } - - /// Attempt to immediately send a message. - /// - /// This method differs from [`send`](Channel::send) by returning immediately if the channel's - /// buffer is full, instead of waiting. - /// - /// # Errors - /// - /// If the channel capacity has been reached, i.e., the channel has `n` - /// buffered values where `n` is the argument passed to [`Channel`], then an - /// error is returned. - pub fn try_send(&self, message: T) -> Result<(), TrySendError> { - self.lock(|c| c.try_send(message)) - } - - /// Receive the next value. - /// - /// If there are no messages in the channel's buffer, this method will - /// wait until a message is sent. - pub fn recv(&self) -> RecvFuture<'_, M, T, N> { - RecvFuture { channel: self } - } - - /// Attempt to immediately receive a message. - /// - /// This method will either receive a message from the channel immediately or return an error - /// if the channel is empty. - pub fn try_recv(&self) -> Result { - self.lock(|c| c.try_recv()) - } -} - -/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the -/// tradeoff cost of dynamic dispatch. -impl DynamicChannel for Channel -where - M: RawMutex, -{ - fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { - Channel::try_send_with_context(self, m, cx) - } - - fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result { - Channel::try_recv_with_context(self, cx) - } -} - -#[cfg(test)] -mod tests { - use core::time::Duration; - - use futures_executor::ThreadPool; - use futures_timer::Delay; - use futures_util::task::SpawnExt; - use static_cell::StaticCell; - - use super::*; - use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; - - fn capacity(c: &ChannelState) -> usize { - c.queue.capacity() - c.queue.len() - } - - #[test] - fn sending_once() { - let mut c = ChannelState::::new(); - assert!(c.try_send(1).is_ok()); - assert_eq!(capacity(&c), 2); - } - - #[test] - fn sending_when_full() { - let mut c = ChannelState::::new(); - let _ = c.try_send(1); - let _ = c.try_send(1); - let _ = c.try_send(1); - match c.try_send(2) { - Err(TrySendError::Full(2)) => assert!(true), - _ => assert!(false), - } - assert_eq!(capacity(&c), 0); - } - - #[test] - fn receiving_once_with_one_send() { - let mut c = ChannelState::::new(); - assert!(c.try_send(1).is_ok()); - assert_eq!(c.try_recv().unwrap(), 1); - assert_eq!(capacity(&c), 3); - } - - #[test] - fn receiving_when_empty() { - let mut c = ChannelState::::new(); - match c.try_recv() { - Err(TryRecvError::Empty) => assert!(true), - _ => assert!(false), - } - assert_eq!(capacity(&c), 3); - } - - #[test] - fn simple_send_and_receive() { - let c = Channel::::new(); - assert!(c.try_send(1).is_ok()); - assert_eq!(c.try_recv().unwrap(), 1); - } - - #[test] - fn cloning() { - let c = Channel::::new(); - let r1 = c.receiver(); - let s1 = c.sender(); - - let _ = r1.clone(); - let _ = s1.clone(); - } - - #[test] - fn dynamic_dispatch() { - let c = Channel::::new(); - let s: DynamicSender<'_, u32> = c.sender().into(); - let r: DynamicReceiver<'_, u32> = c.receiver().into(); - - assert!(s.try_send(1).is_ok()); - assert_eq!(r.try_recv().unwrap(), 1); - } - - #[futures_test::test] - async fn receiver_receives_given_try_send_async() { - let executor = ThreadPool::new().unwrap(); - - static CHANNEL: StaticCell> = StaticCell::new(); - let c = &*CHANNEL.init(Channel::new()); - let c2 = c; - assert!(executor - .spawn(async move { - assert!(c2.try_send(1).is_ok()); - }) - .is_ok()); - assert_eq!(c.recv().await, 1); - } - - #[futures_test::test] - async fn sender_send_completes_if_capacity() { - let c = Channel::::new(); - c.send(1).await; - assert_eq!(c.recv().await, 1); - } - - #[futures_test::test] - async fn senders_sends_wait_until_capacity() { - let executor = ThreadPool::new().unwrap(); - - static CHANNEL: StaticCell> = StaticCell::new(); - let c = &*CHANNEL.init(Channel::new()); - assert!(c.try_send(1).is_ok()); - - let c2 = c; - let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await }); - let c2 = c; - let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await }); - // Wish I could think of a means of determining that the async send is waiting instead. - // However, I've used the debugger to observe that the send does indeed wait. - Delay::new(Duration::from_millis(500)).await; - assert_eq!(c.recv().await, 1); - assert!(executor - .spawn(async move { - loop { - c.recv().await; - } - }) - .is_ok()); - send_task_1.unwrap().await; - send_task_2.unwrap().await; - } -} diff --git a/embassy-util/src/channel/pubsub/mod.rs b/embassy-util/src/channel/pubsub/mod.rs deleted file mode 100644 index ecc8fbd8..00000000 --- a/embassy-util/src/channel/pubsub/mod.rs +++ /dev/null @@ -1,542 +0,0 @@ -//! Implementation of [PubSubChannel], a queue where published messages get received by all subscribers. - -#![deny(missing_docs)] - -use core::cell::RefCell; -use core::fmt::Debug; -use core::task::{Context, Poll, Waker}; - -use heapless::Deque; - -use self::publisher::{ImmediatePub, Pub}; -use self::subscriber::Sub; -use crate::blocking_mutex::raw::RawMutex; -use crate::blocking_mutex::Mutex; -use crate::waitqueue::MultiWakerRegistration; - -pub mod publisher; -pub mod subscriber; - -pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher}; -pub use subscriber::{DynSubscriber, Subscriber}; - -/// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers -/// -/// Any published message can be read by all subscribers. -/// A publisher can choose how it sends its message. -/// -/// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue. -/// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message -/// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive -/// an error to indicate that it has lagged. -/// -/// ## Example -/// -/// ``` -/// # use embassy_util::blocking_mutex::raw::NoopRawMutex; -/// # use embassy_util::channel::pubsub::WaitResult; -/// # use embassy_util::channel::pubsub::PubSubChannel; -/// # use futures_executor::block_on; -/// # let test = async { -/// // Create the channel. This can be static as well -/// let channel = PubSubChannel::::new(); -/// -/// // This is a generic subscriber with a direct reference to the channel -/// let mut sub0 = channel.subscriber().unwrap(); -/// // This is a dynamic subscriber with a dynamic (trait object) reference to the channel -/// let mut sub1 = channel.dyn_subscriber().unwrap(); -/// -/// let pub0 = channel.publisher().unwrap(); -/// -/// // Publish a message, but wait if the queue is full -/// pub0.publish(42).await; -/// -/// // Publish a message, but if the queue is full, just kick out the oldest message. -/// // This may cause some subscribers to miss a message -/// pub0.publish_immediate(43); -/// -/// // Wait for a new message. If the subscriber missed a message, the WaitResult will be a Lag result -/// assert_eq!(sub0.next_message().await, WaitResult::Message(42)); -/// assert_eq!(sub1.next_message().await, WaitResult::Message(42)); -/// -/// // Wait again, but this time ignore any Lag results -/// assert_eq!(sub0.next_message_pure().await, 43); -/// assert_eq!(sub1.next_message_pure().await, 43); -/// -/// // There's also a polling interface -/// assert_eq!(sub0.try_next_message(), None); -/// assert_eq!(sub1.try_next_message(), None); -/// # }; -/// # -/// # block_on(test); -/// ``` -/// -pub struct PubSubChannel { - inner: Mutex>>, -} - -impl - PubSubChannel -{ - /// Create a new channel - pub const fn new() -> Self { - Self { - inner: Mutex::const_new(M::INIT, RefCell::new(PubSubState::new())), - } - } - - /// Create a new subscriber. It will only receive messages that are published after its creation. - /// - /// If there are no subscriber slots left, an error will be returned. - pub fn subscriber(&self) -> Result, Error> { - self.inner.lock(|inner| { - let mut s = inner.borrow_mut(); - - if s.subscriber_count >= SUBS { - Err(Error::MaximumSubscribersReached) - } else { - s.subscriber_count += 1; - Ok(Subscriber(Sub::new(s.next_message_id, self))) - } - }) - } - - /// Create a new subscriber. It will only receive messages that are published after its creation. - /// - /// If there are no subscriber slots left, an error will be returned. - pub fn dyn_subscriber(&self) -> Result, Error> { - self.inner.lock(|inner| { - let mut s = inner.borrow_mut(); - - if s.subscriber_count >= SUBS { - Err(Error::MaximumSubscribersReached) - } else { - s.subscriber_count += 1; - Ok(DynSubscriber(Sub::new(s.next_message_id, self))) - } - }) - } - - /// Create a new publisher - /// - /// If there are no publisher slots left, an error will be returned. - pub fn publisher(&self) -> Result, Error> { - self.inner.lock(|inner| { - let mut s = inner.borrow_mut(); - - if s.publisher_count >= PUBS { - Err(Error::MaximumPublishersReached) - } else { - s.publisher_count += 1; - Ok(Publisher(Pub::new(self))) - } - }) - } - - /// Create a new publisher - /// - /// If there are no publisher slots left, an error will be returned. - pub fn dyn_publisher(&self) -> Result, Error> { - self.inner.lock(|inner| { - let mut s = inner.borrow_mut(); - - if s.publisher_count >= PUBS { - Err(Error::MaximumPublishersReached) - } else { - s.publisher_count += 1; - Ok(DynPublisher(Pub::new(self))) - } - }) - } - - /// Create a new publisher that can only send immediate messages. - /// This kind of publisher does not take up a publisher slot. - pub fn immediate_publisher(&self) -> ImmediatePublisher { - ImmediatePublisher(ImmediatePub::new(self)) - } - - /// Create a new publisher that can only send immediate messages. - /// This kind of publisher does not take up a publisher slot. - pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher { - DynImmediatePublisher(ImmediatePub::new(self)) - } -} - -impl PubSubBehavior - for PubSubChannel -{ - fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll> { - self.inner.lock(|s| { - let mut s = s.borrow_mut(); - - // Check if we can read a message - match s.get_message(*next_message_id) { - // Yes, so we are done polling - Some(WaitResult::Message(message)) => { - *next_message_id += 1; - Poll::Ready(WaitResult::Message(message)) - } - // No, so we need to reregister our waker and sleep again - None => { - if let Some(cx) = cx { - s.register_subscriber_waker(cx.waker()); - } - Poll::Pending - } - // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged - Some(WaitResult::Lagged(amount)) => { - *next_message_id += amount; - Poll::Ready(WaitResult::Lagged(amount)) - } - } - }) - } - - fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { - self.inner.lock(|s| { - let mut s = s.borrow_mut(); - // Try to publish the message - match s.try_publish(message) { - // We did it, we are ready - Ok(()) => Ok(()), - // The queue is full, so we need to reregister our waker and go to sleep - Err(message) => { - if let Some(cx) = cx { - s.register_publisher_waker(cx.waker()); - } - Err(message) - } - } - }) - } - - fn publish_immediate(&self, message: T) { - self.inner.lock(|s| { - let mut s = s.borrow_mut(); - s.publish_immediate(message) - }) - } - - fn unregister_subscriber(&self, subscriber_next_message_id: u64) { - self.inner.lock(|s| { - let mut s = s.borrow_mut(); - s.unregister_subscriber(subscriber_next_message_id) - }) - } - - fn unregister_publisher(&self) { - self.inner.lock(|s| { - let mut s = s.borrow_mut(); - s.unregister_publisher() - }) - } -} - -/// Internal state for the PubSub channel -struct PubSubState { - /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it - queue: Deque<(T, usize), CAP>, - /// Every message has an id. - /// Don't worry, we won't run out. - /// If a million messages were published every second, then the ID's would run out in about 584942 years. - next_message_id: u64, - /// Collection of wakers for Subscribers that are waiting. - subscriber_wakers: MultiWakerRegistration, - /// Collection of wakers for Publishers that are waiting. - publisher_wakers: MultiWakerRegistration, - /// The amount of subscribers that are active - subscriber_count: usize, - /// The amount of publishers that are active - publisher_count: usize, -} - -impl PubSubState { - /// Create a new internal channel state - const fn new() -> Self { - Self { - queue: Deque::new(), - next_message_id: 0, - subscriber_wakers: MultiWakerRegistration::new(), - publisher_wakers: MultiWakerRegistration::new(), - subscriber_count: 0, - publisher_count: 0, - } - } - - fn try_publish(&mut self, message: T) -> Result<(), T> { - if self.subscriber_count == 0 { - // We don't need to publish anything because there is no one to receive it - return Ok(()); - } - - if self.queue.is_full() { - return Err(message); - } - // We just did a check for this - self.queue.push_back((message, self.subscriber_count)).ok().unwrap(); - - self.next_message_id += 1; - - // Wake all of the subscribers - self.subscriber_wakers.wake(); - - Ok(()) - } - - fn publish_immediate(&mut self, message: T) { - // Make space in the queue if required - if self.queue.is_full() { - self.queue.pop_front(); - } - - // This will succeed because we made sure there is space - self.try_publish(message).ok().unwrap(); - } - - fn get_message(&mut self, message_id: u64) -> Option> { - let start_id = self.next_message_id - self.queue.len() as u64; - - if message_id < start_id { - return Some(WaitResult::Lagged(start_id - message_id)); - } - - let current_message_index = (message_id - start_id) as usize; - - if current_message_index >= self.queue.len() { - return None; - } - - // We've checked that the index is valid - let queue_item = self.queue.iter_mut().nth(current_message_index).unwrap(); - - // We're reading this item, so decrement the counter - queue_item.1 -= 1; - let message = queue_item.0.clone(); - - if current_message_index == 0 && queue_item.1 == 0 { - self.queue.pop_front(); - self.publisher_wakers.wake(); - } - - Some(WaitResult::Message(message)) - } - - fn register_subscriber_waker(&mut self, waker: &Waker) { - match self.subscriber_wakers.register(waker) { - Ok(()) => {} - Err(_) => { - // All waker slots were full. This can only happen when there was a subscriber that now has dropped. - // We need to throw it away. It's a bit inefficient, but we can wake everything. - // Any future that is still active will simply reregister. - // This won't happen a lot, so it's ok. - self.subscriber_wakers.wake(); - self.subscriber_wakers.register(waker).unwrap(); - } - } - } - - fn register_publisher_waker(&mut self, waker: &Waker) { - match self.publisher_wakers.register(waker) { - Ok(()) => {} - Err(_) => { - // All waker slots were full. This can only happen when there was a publisher that now has dropped. - // We need to throw it away. It's a bit inefficient, but we can wake everything. - // Any future that is still active will simply reregister. - // This won't happen a lot, so it's ok. - self.publisher_wakers.wake(); - self.publisher_wakers.register(waker).unwrap(); - } - } - } - - fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) { - self.subscriber_count -= 1; - - // All messages that haven't been read yet by this subscriber must have their counter decremented - let start_id = self.next_message_id - self.queue.len() as u64; - if subscriber_next_message_id >= start_id { - let current_message_index = (subscriber_next_message_id - start_id) as usize; - self.queue - .iter_mut() - .skip(current_message_index) - .for_each(|(_, counter)| *counter -= 1); - } - } - - fn unregister_publisher(&mut self) { - self.publisher_count -= 1; - } -} - -/// Error type for the [PubSubChannel] -#[derive(Debug, PartialEq, Eq, Clone)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum Error { - /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or - /// the capacity of the channels must be increased. - MaximumSubscribersReached, - /// All publisher slots are used. To add another publisher, first another publisher must be dropped or - /// the capacity of the channels must be increased. - MaximumPublishersReached, -} - -/// 'Middle level' behaviour of the pubsub channel. -/// This trait is used so that Sub and Pub can be generic over the channel. -pub trait PubSubBehavior { - /// Try to get a message from the queue with the given message id. - /// - /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. - fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll>; - - /// Try to publish a message to the queue. - /// - /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. - fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; - - /// Publish a message immediately - fn publish_immediate(&self, message: T); - - /// Let the channel know that a subscriber has dropped - fn unregister_subscriber(&self, subscriber_next_message_id: u64); - - /// Let the channel know that a publisher has dropped - fn unregister_publisher(&self); -} - -/// The result of the subscriber wait procedure -#[derive(Debug, Clone, PartialEq, Eq)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum WaitResult { - /// The subscriber did not receive all messages and lagged by the given amount of messages. - /// (This is the amount of messages that were missed) - Lagged(u64), - /// A message was received - Message(T), -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::blocking_mutex::raw::NoopRawMutex; - - #[futures_test::test] - async fn dyn_pub_sub_works() { - let channel = PubSubChannel::::new(); - - let mut sub0 = channel.dyn_subscriber().unwrap(); - let mut sub1 = channel.dyn_subscriber().unwrap(); - let pub0 = channel.dyn_publisher().unwrap(); - - pub0.publish(42).await; - - assert_eq!(sub0.next_message().await, WaitResult::Message(42)); - assert_eq!(sub1.next_message().await, WaitResult::Message(42)); - - assert_eq!(sub0.try_next_message(), None); - assert_eq!(sub1.try_next_message(), None); - } - - #[futures_test::test] - async fn all_subscribers_receive() { - let channel = PubSubChannel::::new(); - - let mut sub0 = channel.subscriber().unwrap(); - let mut sub1 = channel.subscriber().unwrap(); - let pub0 = channel.publisher().unwrap(); - - pub0.publish(42).await; - - assert_eq!(sub0.next_message().await, WaitResult::Message(42)); - assert_eq!(sub1.next_message().await, WaitResult::Message(42)); - - assert_eq!(sub0.try_next_message(), None); - assert_eq!(sub1.try_next_message(), None); - } - - #[futures_test::test] - async fn lag_when_queue_full_on_immediate_publish() { - let channel = PubSubChannel::::new(); - - let mut sub0 = channel.subscriber().unwrap(); - let pub0 = channel.publisher().unwrap(); - - pub0.publish_immediate(42); - pub0.publish_immediate(43); - pub0.publish_immediate(44); - pub0.publish_immediate(45); - pub0.publish_immediate(46); - pub0.publish_immediate(47); - - assert_eq!(sub0.try_next_message(), Some(WaitResult::Lagged(2))); - assert_eq!(sub0.next_message().await, WaitResult::Message(44)); - assert_eq!(sub0.next_message().await, WaitResult::Message(45)); - assert_eq!(sub0.next_message().await, WaitResult::Message(46)); - assert_eq!(sub0.next_message().await, WaitResult::Message(47)); - assert_eq!(sub0.try_next_message(), None); - } - - #[test] - fn limited_subs_and_pubs() { - let channel = PubSubChannel::::new(); - - let sub0 = channel.subscriber(); - let sub1 = channel.subscriber(); - let sub2 = channel.subscriber(); - let sub3 = channel.subscriber(); - let sub4 = channel.subscriber(); - - assert!(sub0.is_ok()); - assert!(sub1.is_ok()); - assert!(sub2.is_ok()); - assert!(sub3.is_ok()); - assert_eq!(sub4.err().unwrap(), Error::MaximumSubscribersReached); - - drop(sub0); - - let sub5 = channel.subscriber(); - assert!(sub5.is_ok()); - - // publishers - - let pub0 = channel.publisher(); - let pub1 = channel.publisher(); - let pub2 = channel.publisher(); - let pub3 = channel.publisher(); - let pub4 = channel.publisher(); - - assert!(pub0.is_ok()); - assert!(pub1.is_ok()); - assert!(pub2.is_ok()); - assert!(pub3.is_ok()); - assert_eq!(pub4.err().unwrap(), Error::MaximumPublishersReached); - - drop(pub0); - - let pub5 = channel.publisher(); - assert!(pub5.is_ok()); - } - - #[test] - fn publisher_wait_on_full_queue() { - let channel = PubSubChannel::::new(); - - let pub0 = channel.publisher().unwrap(); - - // There are no subscribers, so the queue will never be full - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - - let sub0 = channel.subscriber().unwrap(); - - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Err(0)); - - drop(sub0); - } -} diff --git a/embassy-util/src/channel/pubsub/publisher.rs b/embassy-util/src/channel/pubsub/publisher.rs deleted file mode 100644 index 705797f6..00000000 --- a/embassy-util/src/channel/pubsub/publisher.rs +++ /dev/null @@ -1,182 +0,0 @@ -//! Implementation of anything directly publisher related - -use core::future::Future; -use core::marker::PhantomData; -use core::ops::{Deref, DerefMut}; -use core::pin::Pin; -use core::task::{Context, Poll}; - -use super::{PubSubBehavior, PubSubChannel}; -use crate::blocking_mutex::raw::RawMutex; - -/// A publisher to a channel -pub struct Pub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { - /// The channel we are a publisher for - channel: &'a PSB, - _phantom: PhantomData, -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Pub<'a, PSB, T> { - pub(super) fn new(channel: &'a PSB) -> Self { - Self { - channel, - _phantom: Default::default(), - } - } - - /// Publish a message right now even when the queue is full. - /// This may cause a subscriber to miss an older message. - pub fn publish_immediate(&self, message: T) { - self.channel.publish_immediate(message) - } - - /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message - pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> { - PublisherWaitFuture { - message: Some(message), - publisher: self, - } - } - - /// Publish a message if there is space in the message queue - pub fn try_publish(&self, message: T) -> Result<(), T> { - self.channel.publish_with_context(message, None) - } -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { - fn drop(&mut self) { - self.channel.unregister_publisher() - } -} - -/// A publisher that holds a dynamic reference to the channel -pub struct DynPublisher<'a, T: Clone>(pub(super) Pub<'a, dyn PubSubBehavior + 'a, T>); - -impl<'a, T: Clone> Deref for DynPublisher<'a, T> { - type Target = Pub<'a, dyn PubSubBehavior + 'a, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// A publisher that holds a generic reference to the channel -pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( - pub(super) Pub<'a, PubSubChannel, T>, -); - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref - for Publisher<'a, M, T, CAP, SUBS, PUBS> -{ - type Target = Pub<'a, PubSubChannel, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut - for Publisher<'a, M, T, CAP, SUBS, PUBS> -{ - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. -/// (So an infinite amount is possible) -pub struct ImmediatePub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { - /// The channel we are a publisher for - channel: &'a PSB, - _phantom: PhantomData, -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { - pub(super) fn new(channel: &'a PSB) -> Self { - Self { - channel, - _phantom: Default::default(), - } - } - /// Publish the message right now even when the queue is full. - /// This may cause a subscriber to miss an older message. - pub fn publish_immediate(&self, message: T) { - self.channel.publish_immediate(message) - } - - /// Publish a message if there is space in the message queue - pub fn try_publish(&self, message: T) -> Result<(), T> { - self.channel.publish_with_context(message, None) - } -} - -/// An immediate publisher that holds a dynamic reference to the channel -pub struct DynImmediatePublisher<'a, T: Clone>(pub(super) ImmediatePub<'a, dyn PubSubBehavior + 'a, T>); - -impl<'a, T: Clone> Deref for DynImmediatePublisher<'a, T> { - type Target = ImmediatePub<'a, dyn PubSubBehavior + 'a, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// An immediate publisher that holds a generic reference to the channel -pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( - pub(super) ImmediatePub<'a, PubSubChannel, T>, -); - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref - for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> -{ - type Target = ImmediatePub<'a, PubSubChannel, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut - for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> -{ - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// Future for the publisher wait action -pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { - /// The message we need to publish - message: Option, - publisher: &'s Pub<'a, PSB, T>, -} - -impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Future for PublisherWaitFuture<'s, 'a, PSB, T> { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let message = self.message.take().unwrap(); - match self.publisher.channel.publish_with_context(message, Some(cx)) { - Ok(()) => Poll::Ready(()), - Err(message) => { - self.message = Some(message); - Poll::Pending - } - } - } -} - -impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, PSB, T> {} diff --git a/embassy-util/src/channel/pubsub/subscriber.rs b/embassy-util/src/channel/pubsub/subscriber.rs deleted file mode 100644 index b9a2cbe1..00000000 --- a/embassy-util/src/channel/pubsub/subscriber.rs +++ /dev/null @@ -1,152 +0,0 @@ -//! Implementation of anything directly subscriber related - -use core::future::Future; -use core::marker::PhantomData; -use core::ops::{Deref, DerefMut}; -use core::pin::Pin; -use core::task::{Context, Poll}; - -use super::{PubSubBehavior, PubSubChannel, WaitResult}; -use crate::blocking_mutex::raw::RawMutex; - -/// A subscriber to a channel -pub struct Sub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { - /// The message id of the next message we are yet to receive - next_message_id: u64, - /// The channel we are a subscriber to - channel: &'a PSB, - _phantom: PhantomData, -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Sub<'a, PSB, T> { - pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self { - Self { - next_message_id, - channel, - _phantom: Default::default(), - } - } - - /// Wait for a published message - pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> { - SubscriberWaitFuture { subscriber: self } - } - - /// Wait for a published message (ignoring lag results) - pub async fn next_message_pure(&mut self) -> T { - loop { - match self.next_message().await { - WaitResult::Lagged(_) => continue, - WaitResult::Message(message) => break message, - } - } - } - - /// Try to see if there's a published message we haven't received yet. - /// - /// This function does not peek. The message is received if there is one. - pub fn try_next_message(&mut self) -> Option> { - match self.channel.get_message_with_context(&mut self.next_message_id, None) { - Poll::Ready(result) => Some(result), - Poll::Pending => None, - } - } - - /// Try to see if there's a published message we haven't received yet (ignoring lag results). - /// - /// This function does not peek. The message is received if there is one. - pub fn try_next_message_pure(&mut self) -> Option { - loop { - match self.try_next_message() { - Some(WaitResult::Lagged(_)) => continue, - Some(WaitResult::Message(message)) => break Some(message), - None => break None, - } - } - } -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { - fn drop(&mut self) { - self.channel.unregister_subscriber(self.next_message_id) - } -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {} - -/// Warning: The stream implementation ignores lag results and returns all messages. -/// This might miss some messages without you knowing it. -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> { - type Item = T; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self - .channel - .get_message_with_context(&mut self.next_message_id, Some(cx)) - { - Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)), - Poll::Ready(WaitResult::Lagged(_)) => { - cx.waker().wake_by_ref(); - Poll::Pending - } - Poll::Pending => Poll::Pending, - } - } -} - -/// A subscriber that holds a dynamic reference to the channel -pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior + 'a, T>); - -impl<'a, T: Clone> Deref for DynSubscriber<'a, T> { - type Target = Sub<'a, dyn PubSubBehavior + 'a, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// A subscriber that holds a generic reference to the channel -pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( - pub(super) Sub<'a, PubSubChannel, T>, -); - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref - for Subscriber<'a, M, T, CAP, SUBS, PUBS> -{ - type Target = Sub<'a, PubSubChannel, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut - for Subscriber<'a, M, T, CAP, SUBS, PUBS> -{ - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// Future for the subscriber wait action -pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { - subscriber: &'s mut Sub<'a, PSB, T>, -} - -impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> { - type Output = WaitResult; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.subscriber - .channel - .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx)) - } -} - -impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {} diff --git a/embassy-util/src/channel/signal.rs b/embassy-util/src/channel/signal.rs deleted file mode 100644 index 05889f5a..00000000 --- a/embassy-util/src/channel/signal.rs +++ /dev/null @@ -1,100 +0,0 @@ -//! A synchronization primitive for passing the latest value to a task. -use core::cell::UnsafeCell; -use core::future::Future; -use core::mem; -use core::task::{Context, Poll, Waker}; - -/// Single-slot signaling primitive. -/// -/// This is similar to a [`Channel`](crate::channel::mpmc::Channel) with a buffer size of 1, except -/// "sending" to it (calling [`Signal::signal`]) when full will overwrite the previous value instead -/// of waiting for the receiver to pop the previous value. -/// -/// It is useful for sending data between tasks when the receiver only cares about -/// the latest data, and therefore it's fine to "lose" messages. This is often the case for "state" -/// updates. -/// -/// For more advanced use cases, you might want to use [`Channel`](crate::channel::mpmc::Channel) instead. -/// -/// Signals are generally declared as `static`s and then borrowed as required. -/// -/// ``` -/// use embassy_util::channel::signal::Signal; -/// -/// enum SomeCommand { -/// On, -/// Off, -/// } -/// -/// static SOME_SIGNAL: Signal = Signal::new(); -/// ``` -pub struct Signal { - state: UnsafeCell>, -} - -enum State { - None, - Waiting(Waker), - Signaled(T), -} - -unsafe impl Send for Signal {} -unsafe impl Sync for Signal {} - -impl Signal { - /// Create a new `Signal`. - pub const fn new() -> Self { - Self { - state: UnsafeCell::new(State::None), - } - } -} - -impl Signal { - /// Mark this Signal as signaled. - pub fn signal(&self, val: T) { - critical_section::with(|_| unsafe { - let state = &mut *self.state.get(); - if let State::Waiting(waker) = mem::replace(state, State::Signaled(val)) { - waker.wake(); - } - }) - } - - /// Remove the queued value in this `Signal`, if any. - pub fn reset(&self) { - critical_section::with(|_| unsafe { - let state = &mut *self.state.get(); - *state = State::None - }) - } - - /// Manually poll the Signal future. - pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll { - critical_section::with(|_| unsafe { - let state = &mut *self.state.get(); - match state { - State::None => { - *state = State::Waiting(cx.waker().clone()); - Poll::Pending - } - State::Waiting(w) if w.will_wake(cx.waker()) => Poll::Pending, - State::Waiting(_) => panic!("waker overflow"), - State::Signaled(_) => match mem::replace(state, State::None) { - State::Signaled(res) => Poll::Ready(res), - _ => unreachable!(), - }, - } - }) - } - - /// Future that completes when this Signal has been signaled. - pub fn wait(&self) -> impl Future + '_ { - futures_util::future::poll_fn(move |cx| self.poll_wait(cx)) - } - - /// non-blocking method to check whether this signal has been signaled. - pub fn signaled(&self) -> bool { - critical_section::with(|_| matches!(unsafe { &*self.state.get() }, State::Signaled(_))) - } -} diff --git a/embassy-util/src/fmt.rs b/embassy-util/src/fmt.rs deleted file mode 100644 index f8bb0a03..00000000 --- a/embassy-util/src/fmt.rs +++ /dev/null @@ -1,228 +0,0 @@ -#![macro_use] -#![allow(unused_macros)] - -#[cfg(all(feature = "defmt", feature = "log"))] -compile_error!("You may not enable both `defmt` and `log` features."); - -macro_rules! assert { - ($($x:tt)*) => { - { - #[cfg(not(feature = "defmt"))] - ::core::assert!($($x)*); - #[cfg(feature = "defmt")] - ::defmt::assert!($($x)*); - } - }; -} - -macro_rules! assert_eq { - ($($x:tt)*) => { - { - #[cfg(not(feature = "defmt"))] - ::core::assert_eq!($($x)*); - #[cfg(feature = "defmt")] - ::defmt::assert_eq!($($x)*); - } - }; -} - -macro_rules! assert_ne { - ($($x:tt)*) => { - { - #[cfg(not(feature = "defmt"))] - ::core::assert_ne!($($x)*); - #[cfg(feature = "defmt")] - ::defmt::assert_ne!($($x)*); - } - }; -} - -macro_rules! debug_assert { - ($($x:tt)*) => { - { - #[cfg(not(feature = "defmt"))] - ::core::debug_assert!($($x)*); - #[cfg(feature = "defmt")] - ::defmt::debug_assert!($($x)*); - } - }; -} - -macro_rules! debug_assert_eq { - ($($x:tt)*) => { - { - #[cfg(not(feature = "defmt"))] - ::core::debug_assert_eq!($($x)*); - #[cfg(feature = "defmt")] - ::defmt::debug_assert_eq!($($x)*); - } - }; -} - -macro_rules! debug_assert_ne { - ($($x:tt)*) => { - { - #[cfg(not(feature = "defmt"))] - ::core::debug_assert_ne!($($x)*); - #[cfg(feature = "defmt")] - ::defmt::debug_assert_ne!($($x)*); - } - }; -} - -macro_rules! todo { - ($($x:tt)*) => { - { - #[cfg(not(feature = "defmt"))] - ::core::todo!($($x)*); - #[cfg(feature = "defmt")] - ::defmt::todo!($($x)*); - } - }; -} - -macro_rules! unreachable { - ($($x:tt)*) => { - { - #[cfg(not(feature = "defmt"))] - ::core::unreachable!($($x)*); - #[cfg(feature = "defmt")] - ::defmt::unreachable!($($x)*); - } - }; -} - -macro_rules! panic { - ($($x:tt)*) => { - { - #[cfg(not(feature = "defmt"))] - ::core::panic!($($x)*); - #[cfg(feature = "defmt")] - ::defmt::panic!($($x)*); - } - }; -} - -macro_rules! trace { - ($s:literal $(, $x:expr)* $(,)?) => { - { - #[cfg(feature = "log")] - ::log::trace!($s $(, $x)*); - #[cfg(feature = "defmt")] - ::defmt::trace!($s $(, $x)*); - #[cfg(not(any(feature = "log", feature="defmt")))] - let _ = ($( & $x ),*); - } - }; -} - -macro_rules! debug { - ($s:literal $(, $x:expr)* $(,)?) => { - { - #[cfg(feature = "log")] - ::log::debug!($s $(, $x)*); - #[cfg(feature = "defmt")] - ::defmt::debug!($s $(, $x)*); - #[cfg(not(any(feature = "log", feature="defmt")))] - let _ = ($( & $x ),*); - } - }; -} - -macro_rules! info { - ($s:literal $(, $x:expr)* $(,)?) => { - { - #[cfg(feature = "log")] - ::log::info!($s $(, $x)*); - #[cfg(feature = "defmt")] - ::defmt::info!($s $(, $x)*); - #[cfg(not(any(feature = "log", feature="defmt")))] - let _ = ($( & $x ),*); - } - }; -} - -macro_rules! warn { - ($s:literal $(, $x:expr)* $(,)?) => { - { - #[cfg(feature = "log")] - ::log::warn!($s $(, $x)*); - #[cfg(feature = "defmt")] - ::defmt::warn!($s $(, $x)*); - #[cfg(not(any(feature = "log", feature="defmt")))] - let _ = ($( & $x ),*); - } - }; -} - -macro_rules! error { - ($s:literal $(, $x:expr)* $(,)?) => { - { - #[cfg(feature = "log")] - ::log::error!($s $(, $x)*); - #[cfg(feature = "defmt")] - ::defmt::error!($s $(, $x)*); - #[cfg(not(any(feature = "log", feature="defmt")))] - let _ = ($( & $x ),*); - } - }; -} - -#[cfg(feature = "defmt")] -macro_rules! unwrap { - ($($x:tt)*) => { - ::defmt::unwrap!($($x)*) - }; -} - -#[cfg(not(feature = "defmt"))] -macro_rules! unwrap { - ($arg:expr) => { - match $crate::fmt::Try::into_result($arg) { - ::core::result::Result::Ok(t) => t, - ::core::result::Result::Err(e) => { - ::core::panic!("unwrap of `{}` failed: {:?}", ::core::stringify!($arg), e); - } - } - }; - ($arg:expr, $($msg:expr),+ $(,)? ) => { - match $crate::fmt::Try::into_result($arg) { - ::core::result::Result::Ok(t) => t, - ::core::result::Result::Err(e) => { - ::core::panic!("unwrap of `{}` failed: {}: {:?}", ::core::stringify!($arg), ::core::format_args!($($msg,)*), e); - } - } - } -} - -#[cfg(feature = "defmt-timestamp-uptime")] -defmt::timestamp! {"{=u64:us}", crate::time::Instant::now().as_micros() } - -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -pub struct NoneError; - -pub trait Try { - type Ok; - type Error; - fn into_result(self) -> Result; -} - -impl Try for Option { - type Ok = T; - type Error = NoneError; - - #[inline] - fn into_result(self) -> Result { - self.ok_or(NoneError) - } -} - -impl Try for Result { - type Ok = T; - type Error = E; - - #[inline] - fn into_result(self) -> Self { - self - } -} diff --git a/embassy-util/src/lib.rs b/embassy-util/src/lib.rs deleted file mode 100644 index 8ec3300d..00000000 --- a/embassy-util/src/lib.rs +++ /dev/null @@ -1,23 +0,0 @@ -#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] -#![cfg_attr(feature = "nightly", feature(generic_associated_types, type_alias_impl_trait))] -#![allow(clippy::new_without_default)] -#![doc = include_str!("../../README.md")] -#![warn(missing_docs)] - -// This mod MUST go first, so that the others see its macros. -pub(crate) mod fmt; - -// internal use -mod ring_buffer; - -pub mod blocking_mutex; -pub mod channel; -pub mod mutex; -pub mod pipe; -pub mod waitqueue; - -mod select; -mod yield_now; - -pub use select::*; -pub use yield_now::*; diff --git a/embassy-util/src/mutex.rs b/embassy-util/src/mutex.rs deleted file mode 100644 index 75a6e8dd..00000000 --- a/embassy-util/src/mutex.rs +++ /dev/null @@ -1,167 +0,0 @@ -//! Async mutex. -//! -//! This module provides a mutex that can be used to synchronize data between asynchronous tasks. -use core::cell::{RefCell, UnsafeCell}; -use core::ops::{Deref, DerefMut}; -use core::task::Poll; - -use futures_util::future::poll_fn; - -use crate::blocking_mutex::raw::RawMutex; -use crate::blocking_mutex::Mutex as BlockingMutex; -use crate::waitqueue::WakerRegistration; - -/// Error returned by [`Mutex::try_lock`] -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub struct TryLockError; - -struct State { - locked: bool, - waker: WakerRegistration, -} - -/// Async mutex. -/// -/// The mutex is generic over a blocking [`RawMutex`](crate::blocking_mutex::raw::RawMutex). -/// The raw mutex is used to guard access to the internal "is locked" flag. It -/// is held for very short periods only, while locking and unlocking. It is *not* held -/// for the entire time the async Mutex is locked. -/// -/// Which implementation you select depends on the context in which you're using the mutex. -/// -/// Use [`CriticalSectionRawMutex`](crate::blocking_mutex::raw::CriticalSectionRawMutex) when data can be shared between threads and interrupts. -/// -/// Use [`NoopRawMutex`](crate::blocking_mutex::raw::NoopRawMutex) when data is only shared between tasks running on the same executor. -/// -/// Use [`ThreadModeRawMutex`](crate::blocking_mutex::raw::ThreadModeRawMutex) when data is shared between tasks running on the same executor but you want a singleton. -/// -pub struct Mutex -where - M: RawMutex, - T: ?Sized, -{ - state: BlockingMutex>, - inner: UnsafeCell, -} - -unsafe impl Send for Mutex {} -unsafe impl Sync for Mutex {} - -/// Async mutex. -impl Mutex -where - M: RawMutex, -{ - /// Create a new mutex with the given value. - pub const fn new(value: T) -> Self { - Self { - inner: UnsafeCell::new(value), - state: BlockingMutex::new(RefCell::new(State { - locked: false, - waker: WakerRegistration::new(), - })), - } - } -} - -impl Mutex -where - M: RawMutex, - T: ?Sized, -{ - /// Lock the mutex. - /// - /// This will wait for the mutex to be unlocked if it's already locked. - pub async fn lock(&self) -> MutexGuard<'_, M, T> { - poll_fn(|cx| { - let ready = self.state.lock(|s| { - let mut s = s.borrow_mut(); - if s.locked { - s.waker.register(cx.waker()); - false - } else { - s.locked = true; - true - } - }); - - if ready { - Poll::Ready(MutexGuard { mutex: self }) - } else { - Poll::Pending - } - }) - .await - } - - /// Attempt to immediately lock the mutex. - /// - /// If the mutex is already locked, this will return an error instead of waiting. - pub fn try_lock(&self) -> Result, TryLockError> { - self.state.lock(|s| { - let mut s = s.borrow_mut(); - if s.locked { - Err(TryLockError) - } else { - s.locked = true; - Ok(()) - } - })?; - - Ok(MutexGuard { mutex: self }) - } -} - -/// Async mutex guard. -/// -/// Owning an instance of this type indicates having -/// successfully locked the mutex, and grants access to the contents. -/// -/// Dropping it unlocks the mutex. -pub struct MutexGuard<'a, M, T> -where - M: RawMutex, - T: ?Sized, -{ - mutex: &'a Mutex, -} - -impl<'a, M, T> Drop for MutexGuard<'a, M, T> -where - M: RawMutex, - T: ?Sized, -{ - fn drop(&mut self) { - self.mutex.state.lock(|s| { - let mut s = s.borrow_mut(); - s.locked = false; - s.waker.wake(); - }) - } -} - -impl<'a, M, T> Deref for MutexGuard<'a, M, T> -where - M: RawMutex, - T: ?Sized, -{ - type Target = T; - fn deref(&self) -> &Self::Target { - // Safety: the MutexGuard represents exclusive access to the contents - // of the mutex, so it's OK to get it. - unsafe { &*(self.mutex.inner.get() as *const T) } - } -} - -impl<'a, M, T> DerefMut for MutexGuard<'a, M, T> -where - M: RawMutex, - T: ?Sized, -{ - fn deref_mut(&mut self) -> &mut Self::Target { - // Safety: the MutexGuard represents exclusive access to the contents - // of the mutex, so it's OK to get it. - unsafe { &mut *(self.mutex.inner.get()) } - } -} diff --git a/embassy-util/src/pipe.rs b/embassy-util/src/pipe.rs deleted file mode 100644 index d85b843e..00000000 --- a/embassy-util/src/pipe.rs +++ /dev/null @@ -1,551 +0,0 @@ -//! Async byte stream pipe. - -use core::cell::RefCell; -use core::future::Future; -use core::pin::Pin; -use core::task::{Context, Poll}; - -use crate::blocking_mutex::raw::RawMutex; -use crate::blocking_mutex::Mutex; -use crate::ring_buffer::RingBuffer; -use crate::waitqueue::WakerRegistration; - -/// Write-only access to a [`Pipe`]. -#[derive(Copy)] -pub struct Writer<'p, M, const N: usize> -where - M: RawMutex, -{ - pipe: &'p Pipe, -} - -impl<'p, M, const N: usize> Clone for Writer<'p, M, N> -where - M: RawMutex, -{ - fn clone(&self) -> Self { - Writer { pipe: self.pipe } - } -} - -impl<'p, M, const N: usize> Writer<'p, M, N> -where - M: RawMutex, -{ - /// Writes a value. - /// - /// See [`Pipe::write()`] - pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { - self.pipe.write(buf) - } - - /// Attempt to immediately write a message. - /// - /// See [`Pipe::write()`] - pub fn try_write(&self, buf: &[u8]) -> Result { - self.pipe.try_write(buf) - } -} - -/// Future returned by [`Pipe::write`] and [`Writer::write`]. -pub struct WriteFuture<'p, M, const N: usize> -where - M: RawMutex, -{ - pipe: &'p Pipe, - buf: &'p [u8], -} - -impl<'p, M, const N: usize> Future for WriteFuture<'p, M, N> -where - M: RawMutex, -{ - type Output = usize; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.pipe.try_write_with_context(Some(cx), self.buf) { - Ok(n) => Poll::Ready(n), - Err(TryWriteError::Full) => Poll::Pending, - } - } -} - -impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {} - -/// Read-only access to a [`Pipe`]. -#[derive(Copy)] -pub struct Reader<'p, M, const N: usize> -where - M: RawMutex, -{ - pipe: &'p Pipe, -} - -impl<'p, M, const N: usize> Clone for Reader<'p, M, N> -where - M: RawMutex, -{ - fn clone(&self) -> Self { - Reader { pipe: self.pipe } - } -} - -impl<'p, M, const N: usize> Reader<'p, M, N> -where - M: RawMutex, -{ - /// Reads a value. - /// - /// See [`Pipe::read()`] - pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { - self.pipe.read(buf) - } - - /// Attempt to immediately read a message. - /// - /// See [`Pipe::read()`] - pub fn try_read(&self, buf: &mut [u8]) -> Result { - self.pipe.try_read(buf) - } -} - -/// Future returned by [`Pipe::read`] and [`Reader::read`]. -pub struct ReadFuture<'p, M, const N: usize> -where - M: RawMutex, -{ - pipe: &'p Pipe, - buf: &'p mut [u8], -} - -impl<'p, M, const N: usize> Future for ReadFuture<'p, M, N> -where - M: RawMutex, -{ - type Output = usize; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.pipe.try_read_with_context(Some(cx), self.buf) { - Ok(n) => Poll::Ready(n), - Err(TryReadError::Empty) => Poll::Pending, - } - } -} - -impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {} - -/// Error returned by [`try_read`](Pipe::try_read). -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum TryReadError { - /// No data could be read from the pipe because it is currently - /// empty, and reading would require blocking. - Empty, -} - -/// Error returned by [`try_write`](Pipe::try_write). -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum TryWriteError { - /// No data could be written to the pipe because it is - /// currently full, and writing would require blocking. - Full, -} - -struct PipeState { - buffer: RingBuffer, - read_waker: WakerRegistration, - write_waker: WakerRegistration, -} - -impl PipeState { - const fn new() -> Self { - PipeState { - buffer: RingBuffer::new(), - read_waker: WakerRegistration::new(), - write_waker: WakerRegistration::new(), - } - } - - fn clear(&mut self) { - self.buffer.clear(); - self.write_waker.wake(); - } - - fn try_read(&mut self, buf: &mut [u8]) -> Result { - self.try_read_with_context(None, buf) - } - - fn try_read_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result { - if self.buffer.is_full() { - self.write_waker.wake(); - } - - let available = self.buffer.pop_buf(); - if available.is_empty() { - if let Some(cx) = cx { - self.read_waker.register(cx.waker()); - } - return Err(TryReadError::Empty); - } - - let n = available.len().min(buf.len()); - buf[..n].copy_from_slice(&available[..n]); - self.buffer.pop(n); - Ok(n) - } - - fn try_write(&mut self, buf: &[u8]) -> Result { - self.try_write_with_context(None, buf) - } - - fn try_write_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result { - if self.buffer.is_empty() { - self.read_waker.wake(); - } - - let available = self.buffer.push_buf(); - if available.is_empty() { - if let Some(cx) = cx { - self.write_waker.register(cx.waker()); - } - return Err(TryWriteError::Full); - } - - let n = available.len().min(buf.len()); - available[..n].copy_from_slice(&buf[..n]); - self.buffer.push(n); - Ok(n) - } -} - -/// A bounded pipe for communicating between asynchronous tasks -/// with backpressure. -/// -/// The pipe will buffer up to the provided number of messages. Once the -/// buffer is full, attempts to `write` new messages will wait until a message is -/// read from the pipe. -/// -/// All data written will become available in the same order as it was written. -pub struct Pipe -where - M: RawMutex, -{ - inner: Mutex>>, -} - -impl Pipe -where - M: RawMutex, -{ - /// Establish a new bounded pipe. For example, to create one with a NoopMutex: - /// - /// ``` - /// use embassy_util::pipe::Pipe; - /// use embassy_util::blocking_mutex::raw::NoopRawMutex; - /// - /// // Declare a bounded pipe, with a buffer of 256 bytes. - /// let mut pipe = Pipe::::new(); - /// ``` - pub const fn new() -> Self { - Self { - inner: Mutex::new(RefCell::new(PipeState::new())), - } - } - - fn lock(&self, f: impl FnOnce(&mut PipeState) -> R) -> R { - self.inner.lock(|rc| f(&mut *rc.borrow_mut())) - } - - fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result { - self.lock(|c| c.try_read_with_context(cx, buf)) - } - - fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result { - self.lock(|c| c.try_write_with_context(cx, buf)) - } - - /// Get a writer for this pipe. - pub fn writer(&self) -> Writer<'_, M, N> { - Writer { pipe: self } - } - - /// Get a reader for this pipe. - pub fn reader(&self) -> Reader<'_, M, N> { - Reader { pipe: self } - } - - /// Write a value, waiting until there is capacity. - /// - /// Writeing completes when the value has been pushed to the pipe's queue. - /// This doesn't mean the value has been read yet. - pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { - WriteFuture { pipe: self, buf } - } - - /// Attempt to immediately write a message. - /// - /// This method differs from [`write`](Pipe::write) by returning immediately if the pipe's - /// buffer is full, instead of waiting. - /// - /// # Errors - /// - /// If the pipe capacity has been reached, i.e., the pipe has `n` - /// buffered values where `n` is the argument passed to [`Pipe`], then an - /// error is returned. - pub fn try_write(&self, buf: &[u8]) -> Result { - self.lock(|c| c.try_write(buf)) - } - - /// Receive the next value. - /// - /// If there are no messages in the pipe's buffer, this method will - /// wait until a message is written. - pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { - ReadFuture { pipe: self, buf } - } - - /// Attempt to immediately read a message. - /// - /// This method will either read a message from the pipe immediately or return an error - /// if the pipe is empty. - pub fn try_read(&self, buf: &mut [u8]) -> Result { - self.lock(|c| c.try_read(buf)) - } - - /// Clear the data in the pipe's buffer. - pub fn clear(&self) { - self.lock(|c| c.clear()) - } - - /// Return whether the pipe is full (no free space in the buffer) - pub fn is_full(&self) -> bool { - self.len() == N - } - - /// Return whether the pipe is empty (no data buffered) - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Total byte capacity. - /// - /// This is the same as the `N` generic param. - pub fn capacity(&self) -> usize { - N - } - - /// Used byte capacity. - pub fn len(&self) -> usize { - self.lock(|c| c.buffer.len()) - } - - /// Free byte capacity. - /// - /// This is equivalent to `capacity() - len()` - pub fn free_capacity(&self) -> usize { - N - self.len() - } -} - -#[cfg(feature = "nightly")] -mod io_impls { - use core::convert::Infallible; - - use futures_util::FutureExt; - - use super::*; - - impl embedded_io::Io for Pipe { - type Error = Infallible; - } - - impl embedded_io::asynch::Read for Pipe { - type ReadFuture<'a> = impl Future> - where - Self: 'a; - - fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { - Pipe::read(self, buf).map(Ok) - } - } - - impl embedded_io::asynch::Write for Pipe { - type WriteFuture<'a> = impl Future> - where - Self: 'a; - - fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { - Pipe::write(self, buf).map(Ok) - } - - type FlushFuture<'a> = impl Future> - where - Self: 'a; - - fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { - futures_util::future::ready(Ok(())) - } - } - - impl embedded_io::Io for &Pipe { - type Error = Infallible; - } - - impl embedded_io::asynch::Read for &Pipe { - type ReadFuture<'a> = impl Future> - where - Self: 'a; - - fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { - Pipe::read(self, buf).map(Ok) - } - } - - impl embedded_io::asynch::Write for &Pipe { - type WriteFuture<'a> = impl Future> - where - Self: 'a; - - fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { - Pipe::write(self, buf).map(Ok) - } - - type FlushFuture<'a> = impl Future> - where - Self: 'a; - - fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { - futures_util::future::ready(Ok(())) - } - } - - impl embedded_io::Io for Reader<'_, M, N> { - type Error = Infallible; - } - - impl embedded_io::asynch::Read for Reader<'_, M, N> { - type ReadFuture<'a> = impl Future> - where - Self: 'a; - - fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { - Reader::read(self, buf).map(Ok) - } - } - - impl embedded_io::Io for Writer<'_, M, N> { - type Error = Infallible; - } - - impl embedded_io::asynch::Write for Writer<'_, M, N> { - type WriteFuture<'a> = impl Future> - where - Self: 'a; - - fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { - Writer::write(self, buf).map(Ok) - } - - type FlushFuture<'a> = impl Future> - where - Self: 'a; - - fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { - futures_util::future::ready(Ok(())) - } - } -} - -#[cfg(test)] -mod tests { - use futures_executor::ThreadPool; - use futures_util::task::SpawnExt; - use static_cell::StaticCell; - - use super::*; - use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; - - fn capacity(c: &PipeState) -> usize { - N - c.buffer.len() - } - - #[test] - fn writing_once() { - let mut c = PipeState::<3>::new(); - assert!(c.try_write(&[1]).is_ok()); - assert_eq!(capacity(&c), 2); - } - - #[test] - fn writing_when_full() { - let mut c = PipeState::<3>::new(); - assert_eq!(c.try_write(&[42]), Ok(1)); - assert_eq!(c.try_write(&[43]), Ok(1)); - assert_eq!(c.try_write(&[44]), Ok(1)); - assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full)); - assert_eq!(capacity(&c), 0); - } - - #[test] - fn receiving_once_with_one_send() { - let mut c = PipeState::<3>::new(); - assert!(c.try_write(&[42]).is_ok()); - let mut buf = [0; 16]; - assert_eq!(c.try_read(&mut buf), Ok(1)); - assert_eq!(buf[0], 42); - assert_eq!(capacity(&c), 3); - } - - #[test] - fn receiving_when_empty() { - let mut c = PipeState::<3>::new(); - let mut buf = [0; 16]; - assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty)); - assert_eq!(capacity(&c), 3); - } - - #[test] - fn simple_send_and_receive() { - let c = Pipe::::new(); - assert!(c.try_write(&[42]).is_ok()); - let mut buf = [0; 16]; - assert_eq!(c.try_read(&mut buf), Ok(1)); - assert_eq!(buf[0], 42); - } - - #[test] - fn cloning() { - let c = Pipe::::new(); - let r1 = c.reader(); - let w1 = c.writer(); - - let _ = r1.clone(); - let _ = w1.clone(); - } - - #[futures_test::test] - async fn receiver_receives_given_try_write_async() { - let executor = ThreadPool::new().unwrap(); - - static CHANNEL: StaticCell> = StaticCell::new(); - let c = &*CHANNEL.init(Pipe::new()); - let c2 = c; - let f = async move { - assert_eq!(c2.try_write(&[42]), Ok(1)); - }; - executor.spawn(f).unwrap(); - let mut buf = [0; 16]; - assert_eq!(c.read(&mut buf).await, 1); - assert_eq!(buf[0], 42); - } - - #[futures_test::test] - async fn sender_send_completes_if_capacity() { - let c = Pipe::::new(); - c.write(&[42]).await; - let mut buf = [0; 16]; - assert_eq!(c.read(&mut buf).await, 1); - assert_eq!(buf[0], 42); - } -} diff --git a/embassy-util/src/ring_buffer.rs b/embassy-util/src/ring_buffer.rs deleted file mode 100644 index 52108402..00000000 --- a/embassy-util/src/ring_buffer.rs +++ /dev/null @@ -1,146 +0,0 @@ -pub struct RingBuffer { - buf: [u8; N], - start: usize, - end: usize, - empty: bool, -} - -impl RingBuffer { - pub const fn new() -> Self { - Self { - buf: [0; N], - start: 0, - end: 0, - empty: true, - } - } - - pub fn push_buf(&mut self) -> &mut [u8] { - if self.start == self.end && !self.empty { - trace!(" ringbuf: push_buf empty"); - return &mut self.buf[..0]; - } - - let n = if self.start <= self.end { - self.buf.len() - self.end - } else { - self.start - self.end - }; - - trace!(" ringbuf: push_buf {:?}..{:?}", self.end, self.end + n); - &mut self.buf[self.end..self.end + n] - } - - pub fn push(&mut self, n: usize) { - trace!(" ringbuf: push {:?}", n); - if n == 0 { - return; - } - - self.end = self.wrap(self.end + n); - self.empty = false; - } - - pub fn pop_buf(&mut self) -> &mut [u8] { - if self.empty { - trace!(" ringbuf: pop_buf empty"); - return &mut self.buf[..0]; - } - - let n = if self.end <= self.start { - self.buf.len() - self.start - } else { - self.end - self.start - }; - - trace!(" ringbuf: pop_buf {:?}..{:?}", self.start, self.start + n); - &mut self.buf[self.start..self.start + n] - } - - pub fn pop(&mut self, n: usize) { - trace!(" ringbuf: pop {:?}", n); - if n == 0 { - return; - } - - self.start = self.wrap(self.start + n); - self.empty = self.start == self.end; - } - - pub fn is_full(&self) -> bool { - self.start == self.end && !self.empty - } - - pub fn is_empty(&self) -> bool { - self.empty - } - - #[allow(unused)] - pub fn len(&self) -> usize { - if self.empty { - 0 - } else if self.start < self.end { - self.end - self.start - } else { - N + self.end - self.start - } - } - - pub fn clear(&mut self) { - self.start = 0; - self.end = 0; - self.empty = true; - } - - fn wrap(&self, n: usize) -> usize { - assert!(n <= self.buf.len()); - if n == self.buf.len() { - 0 - } else { - n - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn push_pop() { - let mut rb: RingBuffer<4> = RingBuffer::new(); - let buf = rb.push_buf(); - assert_eq!(4, buf.len()); - buf[0] = 1; - buf[1] = 2; - buf[2] = 3; - buf[3] = 4; - rb.push(4); - - let buf = rb.pop_buf(); - assert_eq!(4, buf.len()); - assert_eq!(1, buf[0]); - rb.pop(1); - - let buf = rb.pop_buf(); - assert_eq!(3, buf.len()); - assert_eq!(2, buf[0]); - rb.pop(1); - - let buf = rb.pop_buf(); - assert_eq!(2, buf.len()); - assert_eq!(3, buf[0]); - rb.pop(1); - - let buf = rb.pop_buf(); - assert_eq!(1, buf.len()); - assert_eq!(4, buf[0]); - rb.pop(1); - - let buf = rb.pop_buf(); - assert_eq!(0, buf.len()); - - let buf = rb.push_buf(); - assert_eq!(4, buf.len()); - } -} diff --git a/embassy-util/src/select.rs b/embassy-util/src/select.rs deleted file mode 100644 index 8cecb7fa..00000000 --- a/embassy-util/src/select.rs +++ /dev/null @@ -1,230 +0,0 @@ -use core::future::Future; -use core::pin::Pin; -use core::task::{Context, Poll}; - -/// Result for [`select`]. -#[derive(Debug, Clone)] -pub enum Either { - /// First future finished first. - First(A), - /// Second future finished first. - Second(B), -} - -/// Wait for one of two futures to complete. -/// -/// This function returns a new future which polls all the futures. -/// When one of them completes, it will complete with its result value. -/// -/// The other future is dropped. -pub fn select(a: A, b: B) -> Select -where - A: Future, - B: Future, -{ - Select { a, b } -} - -/// Future for the [`select`] function. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Select { - a: A, - b: B, -} - -impl Unpin for Select {} - -impl Future for Select -where - A: Future, - B: Future, -{ - type Output = Either; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = unsafe { self.get_unchecked_mut() }; - let a = unsafe { Pin::new_unchecked(&mut this.a) }; - let b = unsafe { Pin::new_unchecked(&mut this.b) }; - if let Poll::Ready(x) = a.poll(cx) { - return Poll::Ready(Either::First(x)); - } - if let Poll::Ready(x) = b.poll(cx) { - return Poll::Ready(Either::Second(x)); - } - Poll::Pending - } -} - -// ==================================================================== - -/// Result for [`select3`]. -#[derive(Debug, Clone)] -pub enum Either3 { - /// First future finished first. - First(A), - /// Second future finished first. - Second(B), - /// Third future finished first. - Third(C), -} - -/// Same as [`select`], but with more futures. -pub fn select3(a: A, b: B, c: C) -> Select3 -where - A: Future, - B: Future, - C: Future, -{ - Select3 { a, b, c } -} - -/// Future for the [`select3`] function. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Select3 { - a: A, - b: B, - c: C, -} - -impl Future for Select3 -where - A: Future, - B: Future, - C: Future, -{ - type Output = Either3; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = unsafe { self.get_unchecked_mut() }; - let a = unsafe { Pin::new_unchecked(&mut this.a) }; - let b = unsafe { Pin::new_unchecked(&mut this.b) }; - let c = unsafe { Pin::new_unchecked(&mut this.c) }; - if let Poll::Ready(x) = a.poll(cx) { - return Poll::Ready(Either3::First(x)); - } - if let Poll::Ready(x) = b.poll(cx) { - return Poll::Ready(Either3::Second(x)); - } - if let Poll::Ready(x) = c.poll(cx) { - return Poll::Ready(Either3::Third(x)); - } - Poll::Pending - } -} - -// ==================================================================== - -/// Result for [`select4`]. -#[derive(Debug, Clone)] -pub enum Either4 { - /// First future finished first. - First(A), - /// Second future finished first. - Second(B), - /// Third future finished first. - Third(C), - /// Fourth future finished first. - Fourth(D), -} - -/// Same as [`select`], but with more futures. -pub fn select4(a: A, b: B, c: C, d: D) -> Select4 -where - A: Future, - B: Future, - C: Future, - D: Future, -{ - Select4 { a, b, c, d } -} - -/// Future for the [`select4`] function. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Select4 { - a: A, - b: B, - c: C, - d: D, -} - -impl Future for Select4 -where - A: Future, - B: Future, - C: Future, - D: Future, -{ - type Output = Either4; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = unsafe { self.get_unchecked_mut() }; - let a = unsafe { Pin::new_unchecked(&mut this.a) }; - let b = unsafe { Pin::new_unchecked(&mut this.b) }; - let c = unsafe { Pin::new_unchecked(&mut this.c) }; - let d = unsafe { Pin::new_unchecked(&mut this.d) }; - if let Poll::Ready(x) = a.poll(cx) { - return Poll::Ready(Either4::First(x)); - } - if let Poll::Ready(x) = b.poll(cx) { - return Poll::Ready(Either4::Second(x)); - } - if let Poll::Ready(x) = c.poll(cx) { - return Poll::Ready(Either4::Third(x)); - } - if let Poll::Ready(x) = d.poll(cx) { - return Poll::Ready(Either4::Fourth(x)); - } - Poll::Pending - } -} - -// ==================================================================== - -/// Future for the [`select_all`] function. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct SelectAll { - inner: [Fut; N], -} - -/// Creates a new future which will select over a list of futures. -/// -/// The returned future will wait for any future within `iter` to be ready. Upon -/// completion the item resolved will be returned, along with the index of the -/// future that was ready. -/// -/// # Panics -/// -/// This function will panic if the array specified contains no items. -pub fn select_all(arr: [Fut; N]) -> SelectAll { - assert!(N > 0); - SelectAll { inner: arr } -} - -impl Future for SelectAll { - type Output = (Fut::Output, usize); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // Safety: Since `self` is pinned, `inner` cannot move. Since `inner` cannot move, - // its elements also cannot move. Therefore it is safe to access `inner` and pin - // references to the contained futures. - let item = unsafe { - self.get_unchecked_mut() - .inner - .iter_mut() - .enumerate() - .find_map(|(i, f)| match Pin::new_unchecked(f).poll(cx) { - Poll::Pending => None, - Poll::Ready(e) => Some((i, e)), - }) - }; - - match item { - Some((idx, res)) => Poll::Ready((res, idx)), - None => Poll::Pending, - } - } -} diff --git a/embassy-util/src/waitqueue/mod.rs b/embassy-util/src/waitqueue/mod.rs deleted file mode 100644 index 6661a6b6..00000000 --- a/embassy-util/src/waitqueue/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -//! Async low-level wait queues - -mod waker; -pub use waker::*; - -mod multi_waker; -pub use multi_waker::*; diff --git a/embassy-util/src/waitqueue/multi_waker.rs b/embassy-util/src/waitqueue/multi_waker.rs deleted file mode 100644 index 325d2cb3..00000000 --- a/embassy-util/src/waitqueue/multi_waker.rs +++ /dev/null @@ -1,33 +0,0 @@ -use core::task::Waker; - -use super::WakerRegistration; - -/// Utility struct to register and wake multiple wakers. -pub struct MultiWakerRegistration { - wakers: [WakerRegistration; N], -} - -impl MultiWakerRegistration { - /// Create a new empty instance - pub const fn new() -> Self { - const WAKER: WakerRegistration = WakerRegistration::new(); - Self { wakers: [WAKER; N] } - } - - /// Register a waker. If the buffer is full the function returns it in the error - pub fn register<'a>(&mut self, w: &'a Waker) -> Result<(), &'a Waker> { - if let Some(waker_slot) = self.wakers.iter_mut().find(|waker_slot| !waker_slot.occupied()) { - waker_slot.register(w); - Ok(()) - } else { - Err(w) - } - } - - /// Wake all registered wakers. This clears the buffer - pub fn wake(&mut self) { - for waker_slot in self.wakers.iter_mut() { - waker_slot.wake() - } - } -} diff --git a/embassy-util/src/waitqueue/waker.rs b/embassy-util/src/waitqueue/waker.rs deleted file mode 100644 index 64e300eb..00000000 --- a/embassy-util/src/waitqueue/waker.rs +++ /dev/null @@ -1,92 +0,0 @@ -use core::cell::Cell; -use core::mem; -use core::task::Waker; - -use crate::blocking_mutex::raw::CriticalSectionRawMutex; -use crate::blocking_mutex::Mutex; - -/// Utility struct to register and wake a waker. -#[derive(Debug)] -pub struct WakerRegistration { - waker: Option, -} - -impl WakerRegistration { - /// Create a new `WakerRegistration`. - pub const fn new() -> Self { - Self { waker: None } - } - - /// Register a waker. Overwrites the previous waker, if any. - pub fn register(&mut self, w: &Waker) { - match self.waker { - // Optimization: If both the old and new Wakers wake the same task, we can simply - // keep the old waker, skipping the clone. (In most executor implementations, - // cloning a waker is somewhat expensive, comparable to cloning an Arc). - Some(ref w2) if (w2.will_wake(w)) => {} - _ => { - // clone the new waker and store it - if let Some(old_waker) = mem::replace(&mut self.waker, Some(w.clone())) { - // We had a waker registered for another task. Wake it, so the other task can - // reregister itself if it's still interested. - // - // If two tasks are waiting on the same thing concurrently, this will cause them - // to wake each other in a loop fighting over this WakerRegistration. This wastes - // CPU but things will still work. - // - // If the user wants to have two tasks waiting on the same thing they should use - // a more appropriate primitive that can store multiple wakers. - old_waker.wake() - } - } - } - } - - /// Wake the registered waker, if any. - pub fn wake(&mut self) { - if let Some(w) = self.waker.take() { - w.wake() - } - } - - /// Returns true if a waker is currently registered - pub fn occupied(&self) -> bool { - self.waker.is_some() - } -} - -/// Utility struct to register and wake a waker. -pub struct AtomicWaker { - waker: Mutex>>, -} - -impl AtomicWaker { - /// Create a new `AtomicWaker`. - pub const fn new() -> Self { - Self { - waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)), - } - } - - /// Register a waker. Overwrites the previous waker, if any. - pub fn register(&self, w: &Waker) { - critical_section::with(|cs| { - let cell = self.waker.borrow(cs); - cell.set(match cell.replace(None) { - Some(w2) if (w2.will_wake(w)) => Some(w2), - _ => Some(w.clone()), - }) - }) - } - - /// Wake the registered waker, if any. - pub fn wake(&self) { - critical_section::with(|cs| { - let cell = self.waker.borrow(cs); - if let Some(w) = cell.replace(None) { - w.wake_by_ref(); - cell.set(Some(w)); - } - }) - } -} diff --git a/embassy-util/src/yield_now.rs b/embassy-util/src/yield_now.rs deleted file mode 100644 index 1ebecb91..00000000 --- a/embassy-util/src/yield_now.rs +++ /dev/null @@ -1,25 +0,0 @@ -use core::future::Future; -use core::pin::Pin; -use core::task::{Context, Poll}; - -/// Yield from the current task once, allowing other tasks to run. -pub fn yield_now() -> impl Future { - YieldNowFuture { yielded: false } -} - -struct YieldNowFuture { - yielded: bool, -} - -impl Future for YieldNowFuture { - type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if self.yielded { - Poll::Ready(()) - } else { - self.yielded = true; - cx.waker().wake_by_ref(); - Poll::Pending - } - } -} diff --git a/examples/boot/application/nrf/Cargo.toml b/examples/boot/application/nrf/Cargo.toml index ef934663..b9ff9257 100644 --- a/examples/boot/application/nrf/Cargo.toml +++ b/examples/boot/application/nrf/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-boot-nrf-examples" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../../../embassy-util" } +embassy-sync = { version = "0.1.0", path = "../../../../embassy-sync" } embassy-executor = { version = "0.1.0", path = "../../../../embassy-executor", features = ["nightly", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../../../embassy-time", features = ["nightly"] } embassy-nrf = { version = "0.1.0", path = "../../../../embassy-nrf", features = ["time-driver-rtc1", "gpiote", "nightly", "nrf52840"] } diff --git a/examples/boot/application/stm32f3/Cargo.toml b/examples/boot/application/stm32f3/Cargo.toml index 27eafa65..f143d1e8 100644 --- a/examples/boot/application/stm32f3/Cargo.toml +++ b/examples/boot/application/stm32f3/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-boot-stm32f3-examples" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../../../embassy-executor", features = ["nightly", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../../../embassy-time", features = ["nightly", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../../../embassy-stm32", features = ["unstable-traits", "nightly", "stm32f303re", "time-driver-any", "exti"] } diff --git a/examples/boot/application/stm32f7/Cargo.toml b/examples/boot/application/stm32f7/Cargo.toml index 7de0b82d..29c87eee 100644 --- a/examples/boot/application/stm32f7/Cargo.toml +++ b/examples/boot/application/stm32f7/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-boot-stm32f7-examples" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../../../embassy-executor", features = ["nightly", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../../../embassy-time", features = ["nightly", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../../../embassy-stm32", features = ["unstable-traits", "nightly", "stm32f767zi", "time-driver-any", "exti"] } diff --git a/examples/boot/application/stm32h7/Cargo.toml b/examples/boot/application/stm32h7/Cargo.toml index 65d34c70..5669527f 100644 --- a/examples/boot/application/stm32h7/Cargo.toml +++ b/examples/boot/application/stm32h7/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-boot-stm32h7-examples" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../../../embassy-executor", features = ["nightly", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../../../embassy-time", features = ["nightly", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../../../embassy-stm32", features = ["unstable-traits", "nightly", "stm32h743zi", "time-driver-any", "exti"] } diff --git a/examples/boot/application/stm32l0/Cargo.toml b/examples/boot/application/stm32l0/Cargo.toml index 8f37869e..48624d5e 100644 --- a/examples/boot/application/stm32l0/Cargo.toml +++ b/examples/boot/application/stm32l0/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-boot-stm32l0-examples" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../../../embassy-executor", features = ["nightly", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../../../embassy-time", features = ["nightly", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../../../embassy-stm32", features = ["unstable-traits", "nightly", "stm32l072cz", "time-driver-any", "exti", "memory-x"] } diff --git a/examples/boot/application/stm32l1/Cargo.toml b/examples/boot/application/stm32l1/Cargo.toml index 6abf1986..00b638ca 100644 --- a/examples/boot/application/stm32l1/Cargo.toml +++ b/examples/boot/application/stm32l1/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-boot-stm32l1-examples" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../../../embassy-executor", features = ["nightly", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../../../embassy-time", features = ["nightly", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../../../embassy-stm32", features = ["unstable-traits", "nightly", "stm32l151cb-a", "time-driver-any", "exti"] } diff --git a/examples/boot/application/stm32l4/Cargo.toml b/examples/boot/application/stm32l4/Cargo.toml index 6f2d12ff..51ba730d 100644 --- a/examples/boot/application/stm32l4/Cargo.toml +++ b/examples/boot/application/stm32l4/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-boot-stm32l4-examples" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../../../embassy-executor", features = ["nightly", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../../../embassy-time", features = ["nightly", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../../../embassy-stm32", features = ["unstable-traits", "nightly", "stm32l475vg", "time-driver-any", "exti"] } diff --git a/examples/boot/application/stm32wl/Cargo.toml b/examples/boot/application/stm32wl/Cargo.toml index be97d4eb..182acf69 100644 --- a/examples/boot/application/stm32wl/Cargo.toml +++ b/examples/boot/application/stm32wl/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-boot-stm32wl-examples" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../../../embassy-executor", features = ["nightly", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../../../embassy-time", features = ["nightly", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../../../embassy-stm32", features = ["unstable-traits", "nightly", "stm32wl55jc-cm4", "time-driver-any", "exti"] } diff --git a/examples/nrf-rtos-trace/Cargo.toml b/examples/nrf-rtos-trace/Cargo.toml index b0907f92..87c9f33f 100644 --- a/examples/nrf-rtos-trace/Cargo.toml +++ b/examples/nrf-rtos-trace/Cargo.toml @@ -8,14 +8,14 @@ default = ["log", "nightly"] nightly = ["embassy-executor/nightly", "embassy-nrf/nightly", "embassy-nrf/unstable-traits"] log = [ "dep:log", - "embassy-util/log", + "embassy-sync/log", "embassy-executor/log", "embassy-time/log", "embassy-nrf/log", ] [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util" } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync" } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features=["rtos-trace", "rtos-trace-interrupt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time" } embassy-nrf = { version = "0.1.0", path = "../../embassy-nrf", features = ["nrf52840", "time-driver-rtc1", "gpiote", "unstable-pac"] } diff --git a/examples/nrf/Cargo.toml b/examples/nrf/Cargo.toml index 17f29b8f..b0af0c86 100644 --- a/examples/nrf/Cargo.toml +++ b/examples/nrf/Cargo.toml @@ -8,7 +8,8 @@ default = ["nightly"] nightly = ["embassy-executor/nightly", "embassy-nrf/nightly", "embassy-nrf/unstable-traits", "embassy-usb", "embassy-usb-serial", "embassy-usb-hid", "embassy-usb-ncm", "embedded-io/async", "embassy-net"] [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } +embassy-futures = { version = "0.1.0", path = "../../embassy-futures" } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime"] } embassy-nrf = { version = "0.1.0", path = "../../embassy-nrf", features = ["defmt", "nrf52840", "time-driver-rtc1", "gpiote", "unstable-pac"] } diff --git a/examples/nrf/src/bin/channel.rs b/examples/nrf/src/bin/channel.rs index 19520098..8371fd0a 100644 --- a/examples/nrf/src/bin/channel.rs +++ b/examples/nrf/src/bin/channel.rs @@ -5,9 +5,9 @@ use defmt::unwrap; use embassy_executor::Spawner; use embassy_nrf::gpio::{Level, Output, OutputDrive}; +use embassy_sync::blocking_mutex::raw::ThreadModeRawMutex; +use embassy_sync::channel::mpmc::Channel; use embassy_time::{Duration, Timer}; -use embassy_util::blocking_mutex::raw::ThreadModeRawMutex; -use embassy_util::channel::mpmc::Channel; use {defmt_rtt as _, panic_probe as _}; enum LedState { diff --git a/examples/nrf/src/bin/channel_sender_receiver.rs b/examples/nrf/src/bin/channel_sender_receiver.rs index d250b6a5..55d1fccb 100644 --- a/examples/nrf/src/bin/channel_sender_receiver.rs +++ b/examples/nrf/src/bin/channel_sender_receiver.rs @@ -5,9 +5,9 @@ use defmt::unwrap; use embassy_executor::Spawner; use embassy_nrf::gpio::{AnyPin, Level, Output, OutputDrive, Pin}; +use embassy_sync::blocking_mutex::raw::NoopRawMutex; +use embassy_sync::channel::mpmc::{Channel, Receiver, Sender}; use embassy_time::{Duration, Timer}; -use embassy_util::blocking_mutex::raw::NoopRawMutex; -use embassy_util::channel::mpmc::{Channel, Receiver, Sender}; use static_cell::StaticCell; use {defmt_rtt as _, panic_probe as _}; diff --git a/examples/nrf/src/bin/mutex.rs b/examples/nrf/src/bin/mutex.rs index 87629788..c402c6ba 100644 --- a/examples/nrf/src/bin/mutex.rs +++ b/examples/nrf/src/bin/mutex.rs @@ -4,9 +4,9 @@ use defmt::{info, unwrap}; use embassy_executor::Spawner; +use embassy_sync::blocking_mutex::raw::ThreadModeRawMutex; +use embassy_sync::mutex::Mutex; use embassy_time::{Duration, Timer}; -use embassy_util::blocking_mutex::raw::ThreadModeRawMutex; -use embassy_util::mutex::Mutex; use {defmt_rtt as _, panic_probe as _}; static MUTEX: Mutex = Mutex::new(0); diff --git a/examples/nrf/src/bin/pubsub.rs b/examples/nrf/src/bin/pubsub.rs index 1d90217f..64fed641 100644 --- a/examples/nrf/src/bin/pubsub.rs +++ b/examples/nrf/src/bin/pubsub.rs @@ -4,9 +4,9 @@ use defmt::unwrap; use embassy_executor::Spawner; +use embassy_sync::blocking_mutex::raw::ThreadModeRawMutex; +use embassy_sync::channel::pubsub::{DynSubscriber, PubSubChannel, Subscriber}; use embassy_time::{Duration, Timer}; -use embassy_util::blocking_mutex::raw::ThreadModeRawMutex; -use embassy_util::channel::pubsub::{DynSubscriber, PubSubChannel, Subscriber}; use {defmt_rtt as _, panic_probe as _}; /// Create the message bus. It has a queue of 4, supports 3 subscribers and 1 publisher diff --git a/examples/nrf/src/bin/uart_split.rs b/examples/nrf/src/bin/uart_split.rs index dab8e475..88b9c0a8 100644 --- a/examples/nrf/src/bin/uart_split.rs +++ b/examples/nrf/src/bin/uart_split.rs @@ -7,8 +7,8 @@ use embassy_executor::Spawner; use embassy_nrf::peripherals::UARTE0; use embassy_nrf::uarte::UarteRx; use embassy_nrf::{interrupt, uarte}; -use embassy_util::blocking_mutex::raw::ThreadModeRawMutex; -use embassy_util::channel::mpmc::Channel; +use embassy_sync::blocking_mutex::raw::ThreadModeRawMutex; +use embassy_sync::channel::mpmc::Channel; use {defmt_rtt as _, panic_probe as _}; static CHANNEL: Channel = Channel::new(); diff --git a/examples/nrf/src/bin/usb_ethernet.rs b/examples/nrf/src/bin/usb_ethernet.rs index d427f756..0200d880 100644 --- a/examples/nrf/src/bin/usb_ethernet.rs +++ b/examples/nrf/src/bin/usb_ethernet.rs @@ -14,10 +14,10 @@ use embassy_net::{PacketBox, PacketBoxExt, PacketBuf, Stack, StackResources}; use embassy_nrf::rng::Rng; use embassy_nrf::usb::{Driver, PowerUsb}; use embassy_nrf::{interrupt, pac, peripherals}; +use embassy_sync::blocking_mutex::raw::ThreadModeRawMutex; +use embassy_sync::channel::mpmc::Channel; use embassy_usb::{Builder, Config, UsbDevice}; use embassy_usb_ncm::{CdcNcmClass, Receiver, Sender, State}; -use embassy_util::blocking_mutex::raw::ThreadModeRawMutex; -use embassy_util::channel::mpmc::Channel; use embedded_io::asynch::{Read, Write}; use static_cell::StaticCell; use {defmt_rtt as _, panic_probe as _}; diff --git a/examples/nrf/src/bin/usb_hid_keyboard.rs b/examples/nrf/src/bin/usb_hid_keyboard.rs index cf0078ee..d7c6dafd 100644 --- a/examples/nrf/src/bin/usb_hid_keyboard.rs +++ b/examples/nrf/src/bin/usb_hid_keyboard.rs @@ -8,14 +8,14 @@ use core::sync::atomic::{AtomicBool, Ordering}; use defmt::*; use embassy_executor::Spawner; +use embassy_futures::{select, Either}; use embassy_nrf::gpio::{Input, Pin, Pull}; use embassy_nrf::usb::{Driver, PowerUsb}; use embassy_nrf::{interrupt, pac}; +use embassy_sync::channel::signal::Signal; use embassy_usb::control::OutResponse; use embassy_usb::{Builder, Config, DeviceStateHandler}; use embassy_usb_hid::{HidReaderWriter, ReportId, RequestHandler, State}; -use embassy_util::channel::signal::Signal; -use embassy_util::{select, Either}; use futures::future::join; use usbd_hid::descriptor::{KeyboardReport, SerializedDescriptor}; use {defmt_rtt as _, panic_probe as _}; diff --git a/examples/rp/Cargo.toml b/examples/rp/Cargo.toml index c2dcf429..d804a660 100644 --- a/examples/rp/Cargo.toml +++ b/examples/rp/Cargo.toml @@ -5,7 +5,7 @@ version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime"] } embassy-rp = { version = "0.1.0", path = "../../embassy-rp", features = ["defmt", "unstable-traits", "nightly", "unstable-pac"] } diff --git a/examples/std/Cargo.toml b/examples/std/Cargo.toml index 164a2b42..c7cec6b1 100644 --- a/examples/std/Cargo.toml +++ b/examples/std/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-std-examples" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["log"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["log"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["log", "std", "nightly", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["log", "std", "nightly"] } embassy-net = { version = "0.1.0", path = "../../embassy-net", features=[ "std", "log", "medium-ethernet", "tcp", "udp", "dhcpv4", "pool-16"] } diff --git a/examples/stm32f0/Cargo.toml b/examples/stm32f0/Cargo.toml index 8476200d..cd2995d2 100644 --- a/examples/stm32f0/Cargo.toml +++ b/examples/stm32f0/Cargo.toml @@ -11,7 +11,7 @@ cortex-m-rt = "0.7.0" defmt = "0.3" defmt-rtt = "0.3" panic-probe = "0.3" -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../embassy-stm32", features = ["nightly", "defmt", "memory-x", "stm32f030f4", "time-driver-any"] } diff --git a/examples/stm32f1/Cargo.toml b/examples/stm32f1/Cargo.toml index fbc96400..8660e743 100644 --- a/examples/stm32f1/Cargo.toml +++ b/examples/stm32f1/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-stm32f1-examples" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../embassy-stm32", features = ["nightly", "defmt", "stm32f103c8", "unstable-pac", "memory-x", "time-driver-any"] } diff --git a/examples/stm32f2/Cargo.toml b/examples/stm32f2/Cargo.toml index 27894df5..b4bff4d8 100644 --- a/examples/stm32f2/Cargo.toml +++ b/examples/stm32f2/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-stm32f2-examples" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../embassy-stm32", features = ["nightly", "defmt", "stm32f207zg", "unstable-pac", "memory-x", "time-driver-any", "exti"] } diff --git a/examples/stm32f3/Cargo.toml b/examples/stm32f3/Cargo.toml index 4e6b0ea1..d152b145 100644 --- a/examples/stm32f3/Cargo.toml +++ b/examples/stm32f3/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-stm32f3-examples" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../embassy-stm32", features = ["nightly", "defmt", "stm32f303ze", "unstable-pac", "memory-x", "time-driver-any", "exti"] } diff --git a/examples/stm32f3/src/bin/button_events.rs b/examples/stm32f3/src/bin/button_events.rs index 61fc6dca..edf34b4d 100644 --- a/examples/stm32f3/src/bin/button_events.rs +++ b/examples/stm32f3/src/bin/button_events.rs @@ -15,9 +15,9 @@ use embassy_executor::Spawner; use embassy_stm32::exti::ExtiInput; use embassy_stm32::gpio::{AnyPin, Input, Level, Output, Pin, Pull, Speed}; use embassy_stm32::peripherals::PA0; +use embassy_sync::blocking_mutex::raw::ThreadModeRawMutex; +use embassy_sync::channel::mpmc::Channel; use embassy_time::{with_timeout, Duration, Timer}; -use embassy_util::blocking_mutex::raw::ThreadModeRawMutex; -use embassy_util::channel::mpmc::Channel; use {defmt_rtt as _, panic_probe as _}; struct Leds<'a> { diff --git a/examples/stm32f4/Cargo.toml b/examples/stm32f4/Cargo.toml index f93a1d0f..9bfdda92 100644 --- a/examples/stm32f4/Cargo.toml +++ b/examples/stm32f4/Cargo.toml @@ -5,7 +5,7 @@ version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime", "unstable-traits", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../embassy-stm32", features = ["nightly", "unstable-traits", "defmt", "stm32f429zi", "unstable-pac", "memory-x", "time-driver-any", "exti"] } diff --git a/examples/stm32f7/Cargo.toml b/examples/stm32f7/Cargo.toml index e286d231..a446fe3f 100644 --- a/examples/stm32f7/Cargo.toml +++ b/examples/stm32f7/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-stm32f7-examples" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../embassy-stm32", features = ["nightly", "defmt", "net", "stm32f767zi", "unstable-pac", "time-driver-any", "exti"] } diff --git a/examples/stm32g0/Cargo.toml b/examples/stm32g0/Cargo.toml index 5c80d43e..30f2b86f 100644 --- a/examples/stm32g0/Cargo.toml +++ b/examples/stm32g0/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-stm32g0-examples" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../embassy-stm32", features = ["nightly", "defmt", "time-driver-any", "stm32g071rb", "memory-x", "unstable-pac", "exti"] } diff --git a/examples/stm32g4/Cargo.toml b/examples/stm32g4/Cargo.toml index 74c645cf..f81df0b7 100644 --- a/examples/stm32g4/Cargo.toml +++ b/examples/stm32g4/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-stm32g4-examples" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../embassy-stm32", features = ["nightly", "defmt", "time-driver-any", "stm32g491re", "memory-x", "unstable-pac", "exti"] } diff --git a/examples/stm32h7/Cargo.toml b/examples/stm32h7/Cargo.toml index fc5f74f9..0f76f322 100644 --- a/examples/stm32h7/Cargo.toml +++ b/examples/stm32h7/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-stm32h7-examples" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime", "unstable-traits", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../embassy-stm32", features = ["nightly", "defmt", "stm32h743bi", "net", "time-driver-any", "exti", "unstable-pac", "unstable-traits"] } diff --git a/examples/stm32h7/src/bin/signal.rs b/examples/stm32h7/src/bin/signal.rs index be2ac268..ae41b07a 100644 --- a/examples/stm32h7/src/bin/signal.rs +++ b/examples/stm32h7/src/bin/signal.rs @@ -4,8 +4,8 @@ use defmt::{info, unwrap}; use embassy_executor::Spawner; +use embassy_sync::channel::signal::Signal; use embassy_time::{Duration, Timer}; -use embassy_util::channel::signal::Signal; use {defmt_rtt as _, panic_probe as _}; static SIGNAL: Signal = Signal::new(); diff --git a/examples/stm32h7/src/bin/usart_split.rs b/examples/stm32h7/src/bin/usart_split.rs index 64080ec4..55630dd3 100644 --- a/examples/stm32h7/src/bin/usart_split.rs +++ b/examples/stm32h7/src/bin/usart_split.rs @@ -7,8 +7,8 @@ use embassy_executor::Spawner; use embassy_stm32::dma::NoDma; use embassy_stm32::peripherals::{DMA1_CH1, UART7}; use embassy_stm32::usart::{Config, Uart, UartRx}; -use embassy_util::blocking_mutex::raw::ThreadModeRawMutex; -use embassy_util::channel::mpmc::Channel; +use embassy_sync::blocking_mutex::raw::ThreadModeRawMutex; +use embassy_sync::channel::mpmc::Channel; use {defmt_rtt as _, panic_probe as _}; #[embassy_executor::task] diff --git a/examples/stm32l0/Cargo.toml b/examples/stm32l0/Cargo.toml index 72365a64..11751a21 100644 --- a/examples/stm32l0/Cargo.toml +++ b/examples/stm32l0/Cargo.toml @@ -8,7 +8,7 @@ default = ["nightly"] nightly = ["embassy-stm32/nightly", "embassy-lora", "lorawan-device", "lorawan", "embedded-io/async"] [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../embassy-stm32", features = ["defmt", "stm32l072cz", "time-driver-any", "exti", "unstable-traits", "memory-x"] } diff --git a/examples/stm32l1/Cargo.toml b/examples/stm32l1/Cargo.toml index 43f844b6..18b35b30 100644 --- a/examples/stm32l1/Cargo.toml +++ b/examples/stm32l1/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-stm32l1-examples" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../embassy-stm32", features = ["nightly", "defmt", "stm32l151cb-a", "time-driver-any", "memory-x"] } diff --git a/examples/stm32l4/Cargo.toml b/examples/stm32l4/Cargo.toml index eaffa253..cb7238e4 100644 --- a/examples/stm32l4/Cargo.toml +++ b/examples/stm32l4/Cargo.toml @@ -6,7 +6,7 @@ version = "0.1.0" [features] [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime", "tick-32768hz"] } embassy-embedded-hal = { version = "0.1.0", path = "../../embassy-embedded-hal" } diff --git a/examples/stm32l5/Cargo.toml b/examples/stm32l5/Cargo.toml index d8e78088..624c73c2 100644 --- a/examples/stm32l5/Cargo.toml +++ b/examples/stm32l5/Cargo.toml @@ -6,7 +6,7 @@ version = "0.1.0" [features] [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../embassy-stm32", features = ["nightly", "defmt", "unstable-pac", "stm32l552ze", "time-driver-any", "exti", "unstable-traits", "memory-x"] } diff --git a/examples/stm32l5/src/bin/usb_ethernet.rs b/examples/stm32l5/src/bin/usb_ethernet.rs index 95919551..b21d8629 100644 --- a/examples/stm32l5/src/bin/usb_ethernet.rs +++ b/examples/stm32l5/src/bin/usb_ethernet.rs @@ -15,10 +15,10 @@ use embassy_stm32::rng::Rng; use embassy_stm32::time::Hertz; use embassy_stm32::usb::Driver; use embassy_stm32::{interrupt, Config}; +use embassy_sync::blocking_mutex::raw::ThreadModeRawMutex; +use embassy_sync::channel::mpmc::Channel; use embassy_usb::{Builder, UsbDevice}; use embassy_usb_ncm::{CdcNcmClass, Receiver, Sender, State}; -use embassy_util::blocking_mutex::raw::ThreadModeRawMutex; -use embassy_util::channel::mpmc::Channel; use embedded_io::asynch::{Read, Write}; use rand_core::RngCore; use static_cell::StaticCell; diff --git a/examples/stm32u5/Cargo.toml b/examples/stm32u5/Cargo.toml index 48833664..ff0ec9f4 100644 --- a/examples/stm32u5/Cargo.toml +++ b/examples/stm32u5/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-stm32u5-examples" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../embassy-stm32", features = ["nightly", "defmt", "unstable-pac", "stm32u585ai", "time-driver-any", "memory-x" ] } diff --git a/examples/stm32wb/Cargo.toml b/examples/stm32wb/Cargo.toml index b4630076..3b10da0a 100644 --- a/examples/stm32wb/Cargo.toml +++ b/examples/stm32wb/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-stm32wb-examples" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../embassy-stm32", features = ["nightly", "defmt", "stm32wb55cc", "time-driver-any", "exti"] } diff --git a/examples/stm32wl/Cargo.toml b/examples/stm32wl/Cargo.toml index ae33478a..5f6679f4 100644 --- a/examples/stm32wl/Cargo.toml +++ b/examples/stm32wl/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-stm32wl-examples" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../embassy-stm32", features = ["nightly", "defmt", "stm32wl55jc-cm4", "time-driver-any", "memory-x", "subghz", "unstable-pac", "exti"] } diff --git a/examples/stm32wl/src/bin/subghz.rs b/examples/stm32wl/src/bin/subghz.rs index d16e3f5e..6d54e850 100644 --- a/examples/stm32wl/src/bin/subghz.rs +++ b/examples/stm32wl/src/bin/subghz.rs @@ -13,7 +13,7 @@ use embassy_stm32::gpio::{Input, Level, Output, Pull, Speed}; use embassy_stm32::interrupt; use embassy_stm32::interrupt::{Interrupt, InterruptExt}; use embassy_stm32::subghz::*; -use embassy_util::channel::signal::Signal; +use embassy_sync::channel::signal::Signal; use {defmt_rtt as _, panic_probe as _}; const PING_DATA: &str = "PING"; diff --git a/examples/wasm/Cargo.toml b/examples/wasm/Cargo.toml index c7f98036..194e8f4b 100644 --- a/examples/wasm/Cargo.toml +++ b/examples/wasm/Cargo.toml @@ -7,7 +7,7 @@ version = "0.1.0" crate-type = ["cdylib"] [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["log"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["log"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["log", "wasm", "nightly", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["log", "wasm", "nightly"] } diff --git a/tests/rp/Cargo.toml b/tests/rp/Cargo.toml index 8740cc48..4d6877cc 100644 --- a/tests/rp/Cargo.toml +++ b/tests/rp/Cargo.toml @@ -4,7 +4,7 @@ name = "embassy-rp-tests" version = "0.1.0" [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["defmt"] } embassy-rp = { version = "0.1.0", path = "../../embassy-rp", features = ["nightly", "defmt", "unstable-pac", "unstable-traits"] } diff --git a/tests/stm32/Cargo.toml b/tests/stm32/Cargo.toml index 1d12995a..f1441d00 100644 --- a/tests/stm32/Cargo.toml +++ b/tests/stm32/Cargo.toml @@ -13,7 +13,7 @@ stm32wb55rg = ["embassy-stm32/stm32wb55rg"] # Nucleo stm32u585ai = ["embassy-stm32/stm32u585ai"] # IoT board [dependencies] -embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] } +embassy-sync = { version = "0.1.0", path = "../../embassy-sync", features = ["defmt"] } embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "integrated-timers"] } embassy-time = { version = "0.1.0", path = "../../embassy-time", features = ["defmt", "tick-32768hz"] } embassy-stm32 = { version = "0.1.0", path = "../../embassy-stm32", features = ["nightly", "defmt", "unstable-pac", "memory-x", "time-driver-tim2"] } -- cgit v1.2.3