diff options
Diffstat (limited to 'embassy-sync')
-rw-r--r-- | embassy-sync/Cargo.toml | 10 | ||||
-rw-r--r-- | embassy-sync/README.md | 24 | ||||
-rw-r--r-- | embassy-sync/src/pubsub/mod.rs | 73 | ||||
-rw-r--r-- | embassy-sync/src/pubsub/publisher.rs | 17 | ||||
-rw-r--r-- | embassy-sync/src/pubsub/subscriber.rs | 6 |
5 files changed, 128 insertions, 2 deletions
diff --git a/embassy-sync/Cargo.toml b/embassy-sync/Cargo.toml index 14ab1d00..584d5ba9 100644 --- a/embassy-sync/Cargo.toml +++ b/embassy-sync/Cargo.toml @@ -2,6 +2,16 @@ name = "embassy-sync" version = "0.1.0" edition = "2021" +description = "no-std, no-alloc synchronization primitives with async support" +repository = "https://github.com/embassy-rs/embassy" +readme = "README.md" +license = "MIT OR Apache-2.0" +categories = [ + "embedded", + "no-std", + "concurrency", + "asynchronous", +] [package.metadata.embassy_docs] src_base = "https://github.com/embassy-rs/embassy/blob/embassy-sync-v$VERSION/embassy-sync/src/" diff --git a/embassy-sync/README.md b/embassy-sync/README.md index 106295c0..cc65cf6e 100644 --- a/embassy-sync/README.md +++ b/embassy-sync/README.md @@ -1,12 +1,32 @@ # embassy-sync -Synchronization primitives and data structures with an async API: +An [Embassy](https://embassy.dev) project. + +Synchronization primitives and data structures with async support: - [`Channel`](channel::Channel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. - [`PubSubChannel`](pubsub::PubSubChannel) - A broadcast channel (publish-subscribe) channel. Each message is received by all consumers. - [`Signal`](signal::Signal) - Signalling latest value to a single consumer. -- [`Mutex`](mutex::Mutex) - A Mutex for synchronizing state between asynchronous tasks. +- [`Mutex`](mutex::Mutex) - Mutex for synchronizing state between asynchronous tasks. - [`Pipe`](pipe::Pipe) - Byte stream implementing `embedded_io` traits. - [`WakerRegistration`](waitqueue::WakerRegistration) - Utility to register and wake a `Waker`. - [`AtomicWaker`](waitqueue::AtomicWaker) - A variant of `WakerRegistration` accessible using a non-mut API. - [`MultiWakerRegistration`](waitqueue::MultiWakerRegistration) - Utility registering and waking multiple `Waker`'s. + +## Interoperability + +Futures from this crate can run on any executor. + +## Minimum supported Rust version (MSRV) + +Embassy is guaranteed to compile on the latest stable Rust version at the time of release. It might compile with older versions but that may change in any new patch release. + +## License + +This work is licensed under either of + +- Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or + <http://www.apache.org/licenses/LICENSE-2.0>) +- MIT license ([LICENSE-MIT](LICENSE-MIT) or <http://opensource.org/licenses/MIT>) + +at your option. diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index 62a9e476..faaf99dc 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs @@ -192,6 +192,10 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi }) } + fn available(&self, next_message_id: u64) -> u64 { + self.inner.lock(|s| s.borrow().next_message_id - next_message_id) + } + fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { self.inner.lock(|s| { let mut s = s.borrow_mut(); @@ -217,6 +221,13 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi }) } + fn space(&self) -> usize { + self.inner.lock(|s| { + let s = s.borrow(); + s.queue.capacity() - s.queue.len() + }) + } + fn unregister_subscriber(&self, subscriber_next_message_id: u64) { self.inner.lock(|s| { let mut s = s.borrow_mut(); @@ -388,6 +399,10 @@ pub trait PubSubBehavior<T> { /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; + /// Get the amount of messages that are between the given the next_message_id and the most recent message. + /// This is not necessarily the amount of messages a subscriber can still received as it may have lagged. + fn available(&self, next_message_id: u64) -> u64; + /// Try to publish a message to the queue. /// /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. @@ -396,6 +411,9 @@ pub trait PubSubBehavior<T> { /// Publish a message immediately fn publish_immediate(&self, message: T); + /// The amount of messages that can still be published without having to wait or without having to lag the subscribers + fn space(&self) -> usize; + /// Let the channel know that a subscriber has dropped fn unregister_subscriber(&self, subscriber_next_message_id: u64); @@ -539,4 +557,59 @@ mod tests { drop(sub0); } + + #[futures_test::test] + async fn correct_available() { + let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); + + let sub0 = channel.subscriber().unwrap(); + let mut sub1 = channel.subscriber().unwrap(); + let pub0 = channel.publisher().unwrap(); + + assert_eq!(sub0.available(), 0); + assert_eq!(sub1.available(), 0); + + pub0.publish(42).await; + + assert_eq!(sub0.available(), 1); + assert_eq!(sub1.available(), 1); + + sub1.next_message().await; + + assert_eq!(sub1.available(), 0); + + pub0.publish(42).await; + + assert_eq!(sub0.available(), 2); + assert_eq!(sub1.available(), 1); + } + + #[futures_test::test] + async fn correct_space() { + let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); + + let mut sub0 = channel.subscriber().unwrap(); + let mut sub1 = channel.subscriber().unwrap(); + let pub0 = channel.publisher().unwrap(); + + assert_eq!(pub0.space(), 4); + + pub0.publish(42).await; + + assert_eq!(pub0.space(), 3); + + pub0.publish(42).await; + + assert_eq!(pub0.space(), 2); + + sub0.next_message().await; + sub0.next_message().await; + + assert_eq!(pub0.space(), 2); + + sub1.next_message().await; + assert_eq!(pub0.space(), 3); + sub1.next_message().await; + assert_eq!(pub0.space(), 4); + } } diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index 705797f6..e1edc9eb 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs @@ -42,6 +42,14 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> { pub fn try_publish(&self, message: T) -> Result<(), T> { self.channel.publish_with_context(message, None) } + + /// The amount of messages that can still be published without having to wait or without having to lag the subscribers + /// + /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. + /// So checking doesn't give any guarantees.* + pub fn space(&self) -> usize { + self.channel.space() + } } impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { @@ -115,6 +123,14 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { pub fn try_publish(&self, message: T) -> Result<(), T> { self.channel.publish_with_context(message, None) } + + /// The amount of messages that can still be published without having to wait or without having to lag the subscribers + /// + /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. + /// So checking doesn't give any guarantees.* + pub fn space(&self) -> usize { + self.channel.space() + } } /// An immediate publisher that holds a dynamic reference to the channel @@ -158,6 +174,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: } /// Future for the publisher wait action +#[must_use = "futures do nothing unless you `.await` or poll them"] pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { /// The message we need to publish message: Option<T>, diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs index b9a2cbe1..f420a75f 100644 --- a/embassy-sync/src/pubsub/subscriber.rs +++ b/embassy-sync/src/pubsub/subscriber.rs @@ -64,6 +64,11 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> { } } } + + /// The amount of messages this subscriber hasn't received yet + pub fn available(&self) -> u64 { + self.channel.available(self.next_message_id) + } } impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { @@ -135,6 +140,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: } /// Future for the subscriber wait action +#[must_use = "futures do nothing unless you `.await` or poll them"] pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { subscriber: &'s mut Sub<'a, PSB, T>, } |