summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy/src/channel/channel.rs430
-rw-r--r--embassy/src/channel/mod.rs2
-rw-r--r--embassy/src/channel/mpsc.rs822
-rw-r--r--embassy/src/channel/signal.rs2
-rw-r--r--examples/nrf/src/bin/channel.rs45
-rw-r--r--examples/nrf/src/bin/channel_sender_receiver.rs52
-rw-r--r--examples/nrf/src/bin/mpsc.rs60
-rw-r--r--examples/nrf/src/bin/uart_split.rs23
-rw-r--r--examples/stm32f3/src/bin/button_events.rs59
-rw-r--r--examples/stm32h7/src/bin/usart_split.rs26
10 files changed, 571 insertions, 950 deletions
diff --git a/embassy/src/channel/channel.rs b/embassy/src/channel/channel.rs
new file mode 100644
index 00000000..9084cd57
--- /dev/null
+++ b/embassy/src/channel/channel.rs
@@ -0,0 +1,430 @@
+//! A queue for sending values between asynchronous tasks.
+//!
+//! It can be used concurrently by multiple producers (senders) and multiple
+//! consumers (receivers), i.e. it is an "MPMC channel".
+//!
+//! This queue takes a Mutex type so that various
+//! targets can be attained. For example, a ThreadModeMutex can be used
+//! for single-core Cortex-M targets where messages are only passed
+//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
+//! can also be used for single-core targets where messages are to be
+//! passed from exception mode e.g. out of an interrupt handler.
+//!
+//! This module provides a bounded channel that has a limit on the number of
+//! messages that it can store, and if this limit is reached, trying to send
+//! another message will result in an error being returned.
+//!
+
+use core::cell::RefCell;
+use core::pin::Pin;
+use core::task::Context;
+use core::task::Poll;
+
+use futures::Future;
+use heapless::Deque;
+
+use crate::blocking_mutex::raw::RawMutex;
+use crate::blocking_mutex::Mutex;
+use crate::waitqueue::WakerRegistration;
+
+/// Send-only access to a [`Channel`].
+#[derive(Copy, Clone)]
+pub struct Sender<'ch, M, T, const N: usize>
+where
+ M: RawMutex,
+{
+ channel: &'ch Channel<M, T, N>,
+}
+
+impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
+where
+ M: RawMutex,
+{
+ /// Sends a value.
+ ///
+ /// See [`Channel::send()`]
+ pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
+ self.channel.send(message)
+ }
+
+ /// Attempt to immediately send a message.
+ ///
+ /// See [`Channel::send()`]
+ pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
+ self.channel.try_send(message)
+ }
+}
+
+/// Receive-only access to a [`Channel`].
+#[derive(Copy, Clone)]
+pub struct Receiver<'ch, M, T, const N: usize>
+where
+ M: RawMutex,
+{
+ channel: &'ch Channel<M, T, N>,
+}
+
+impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
+where
+ M: RawMutex,
+{
+ /// Receive the next value.
+ ///
+ /// See [`Channel::recv()`].
+ pub fn recv(&self) -> RecvFuture<'_, M, T, N> {
+ self.channel.recv()
+ }
+
+ /// Attempt to immediately receive the next value.
+ ///
+ /// See [`Channel::try_recv()`]
+ pub fn try_recv(&self) -> Result<T, TryRecvError> {
+ self.channel.try_recv()
+ }
+}
+
+pub struct RecvFuture<'ch, M, T, const N: usize>
+where
+ M: RawMutex,
+{
+ channel: &'ch Channel<M, T, N>,
+}
+
+impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N>
+where
+ M: RawMutex,
+{
+ type Output = T;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
+ self.channel
+ .lock(|c| match c.try_recv_with_context(Some(cx)) {
+ Ok(v) => Poll::Ready(v),
+ Err(TryRecvError::Empty) => Poll::Pending,
+ })
+ }
+}
+
+pub struct SendFuture<'ch, M, T, const N: usize>
+where
+ M: RawMutex,
+{
+ channel: &'ch Channel<M, T, N>,
+ message: Option<T>,
+}
+
+impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
+where
+ M: RawMutex,
+{
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ match self.message.take() {
+ Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) {
+ Ok(..) => Poll::Ready(()),
+ Err(TrySendError::Full(m)) => {
+ self.message = Some(m);
+ Poll::Pending
+ }
+ },
+ None => panic!("Message cannot be None"),
+ }
+ }
+}
+
+impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
+
+/// Error returned by [`try_recv`](Channel::try_recv).
+#[derive(PartialEq, Eq, Clone, Copy, Debug)]
+#[cfg_attr(feature = "defmt", derive(defmt::Format))]
+pub enum TryRecvError {
+ /// A message could not be received because the channel is empty.
+ Empty,
+}
+
+/// Error returned by [`try_send`](Channel::try_send).
+#[derive(PartialEq, Eq, Clone, Copy, Debug)]
+#[cfg_attr(feature = "defmt", derive(defmt::Format))]
+pub enum TrySendError<T> {
+ /// The data could not be sent on the channel because the channel is
+ /// currently full and sending would require blocking.
+ Full(T),
+}
+
+struct ChannelState<T, const N: usize> {
+ queue: Deque<T, N>,
+ receiver_waker: WakerRegistration,
+ senders_waker: WakerRegistration,
+}
+
+impl<T, const N: usize> ChannelState<T, N> {
+ const fn new() -> Self {
+ ChannelState {
+ queue: Deque::new(),
+ receiver_waker: WakerRegistration::new(),
+ senders_waker: WakerRegistration::new(),
+ }
+ }
+
+ fn try_recv(&mut self) -> Result<T, TryRecvError> {
+ self.try_recv_with_context(None)
+ }
+
+ fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
+ if self.queue.is_full() {
+ self.senders_waker.wake();
+ }
+
+ if let Some(message) = self.queue.pop_front() {
+ Ok(message)
+ } else {
+ if let Some(cx) = cx {
+ self.receiver_waker.register(cx.waker());
+ }
+ Err(TryRecvError::Empty)
+ }
+ }
+
+ fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
+ self.try_send_with_context(message, None)
+ }
+
+ fn try_send_with_context(
+ &mut self,
+ message: T,
+ cx: Option<&mut Context<'_>>,
+ ) -> Result<(), TrySendError<T>> {
+ match self.queue.push_back(message) {
+ Ok(()) => {
+ self.receiver_waker.wake();
+ Ok(())
+ }
+ Err(message) => {
+ if let Some(cx) = cx {
+ self.senders_waker.register(cx.waker());
+ }
+ Err(TrySendError::Full(message))
+ }
+ }
+ }
+}
+
+/// A bounded channel for communicating between asynchronous tasks
+/// with backpressure.
+///
+/// The channel will buffer up to the provided number of messages. Once the
+/// buffer is full, attempts to `send` new messages will wait until a message is
+/// received from the channel.
+///
+/// All data sent will become available in the same order as it was sent.
+pub struct Channel<M, T, const N: usize>
+where
+ M: RawMutex,
+{
+ inner: Mutex<M, RefCell<ChannelState<T, N>>>,
+}
+
+impl<M, T, const N: usize> Channel<M, T, N>
+where
+ M: RawMutex,
+{
+ /// Establish a new bounded channel. For example, to create one with a NoopMutex:
+ ///
+ /// ```
+ /// use embassy::channel::channel::Channel;
+ /// use embassy::blocking_mutex::raw::NoopRawMutex;
+ ///
+ /// // Declare a bounded channel of 3 u32s.
+ /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new();
+ /// ```
+ #[cfg(feature = "nightly")]
+ pub const fn new() -> Self {
+ Self {
+ inner: Mutex::new(RefCell::new(ChannelState::new())),
+ }
+ }
+
+ /// Establish a new bounded channel. For example, to create one with a NoopMutex:
+ ///
+ /// ```
+ /// use embassy::channel::channel::Channel;
+ /// use embassy::blocking_mutex::raw::NoopRawMutex;
+ ///
+ /// // Declare a bounded channel of 3 u32s.
+ /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new();
+ /// ```
+ #[cfg(not(feature = "nightly"))]
+ pub fn new() -> Self {
+ Self {
+ inner: Mutex::new(RefCell::new(ChannelState::new())),
+ }
+ }
+
+ fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
+ self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
+ }
+
+ /// Get a sender for this channel.
+ pub fn sender(&self) -> Sender<'_, M, T, N> {
+ Sender { channel: self }
+ }
+
+ /// Get a receiver for this channel.
+ pub fn receiver(&self) -> Receiver<'_, M, T, N> {
+ Receiver { channel: self }
+ }
+
+ /// Send a value, waiting until there is capacity.
+ ///
+ /// Sending completes when the value has been pushed to the channel's queue.
+ /// This doesn't mean the value has been received yet.
+ pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> {
+ SendFuture {
+ channel: self,
+ message: Some(message),
+ }
+ }
+
+ /// Attempt to immediately send a message.
+ ///
+ /// This method differs from [`send`] by returning immediately if the channel's
+ /// buffer is full, instead of waiting.
+ ///
+ /// # Errors
+ ///
+ /// If the channel capacity has been reached, i.e., the channel has `n`
+ /// buffered values where `n` is the argument passed to [`Channel`], then an
+ /// error is returned.
+ pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
+ self.lock(|c| c.try_send(message))
+ }
+
+ /// Receive the next value.
+ ///
+ /// If there are no messages in the channel's buffer, this method will
+ /// wait until a message is sent.
+ pub fn recv(&self) -> RecvFuture<'_, M, T, N> {
+ RecvFuture { channel: self }
+ }
+
+ /// Attempt to immediately receive a message.
+ ///
+ /// This method will either receive a message from the channel immediately or return an error
+ /// if the channel is empty.
+ pub fn try_recv(&self) -> Result<T, TryRecvError> {
+ self.lock(|c| c.try_recv())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use core::time::Duration;
+
+ use futures::task::SpawnExt;
+ use futures_executor::ThreadPool;
+ use futures_timer::Delay;
+
+ use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
+ use crate::util::Forever;
+
+ use super::*;
+
+ fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
+ c.queue.capacity() - c.queue.len()
+ }
+
+ #[test]
+ fn sending_once() {
+ let mut c = ChannelState::<u32, 3>::new();
+ assert!(c.try_send(1).is_ok());
+ assert_eq!(capacity(&c), 2);
+ }
+
+ #[test]
+ fn sending_when_full() {
+ let mut c = ChannelState::<u32, 3>::new();
+ let _ = c.try_send(1);
+ let _ = c.try_send(1);
+ let _ = c.try_send(1);
+ match c.try_send(2) {
+ Err(TrySendError::Full(2)) => assert!(true),
+ _ => assert!(false),
+ }
+ assert_eq!(capacity(&c), 0);
+ }
+
+ #[test]
+ fn receiving_once_with_one_send() {
+ let mut c = ChannelState::<u32, 3>::new();
+ assert!(c.try_send(1).is_ok());
+ assert_eq!(c.try_recv().unwrap(), 1);
+ assert_eq!(capacity(&c), 3);
+ }
+
+ #[test]
+ fn receiving_when_empty() {
+ let mut c = ChannelState::<u32, 3>::new();
+ match c.try_recv() {
+ Err(TryRecvError::Empty) => assert!(true),
+ _ => assert!(false),
+ }
+ assert_eq!(capacity(&c), 3);
+ }
+
+ #[test]
+ fn simple_send_and_receive() {
+ let c = Channel::<NoopRawMutex, u32, 3>::new();
+ assert!(c.try_send(1).is_ok());
+ assert_eq!(c.try_recv().unwrap(), 1);
+ }
+
+ #[futures_test::test]
+ async fn receiver_receives_given_try_send_async() {
+ let executor = ThreadPool::new().unwrap();
+
+ static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 3>> = Forever::new();
+ let c = &*CHANNEL.put(Channel::new());
+ let c2 = c;
+ assert!(executor
+ .spawn(async move {
+ assert!(c2.try_send(1).is_ok());
+ })
+ .is_ok());
+ assert_eq!(c.recv().await, 1);
+ }
+
+ #[futures_test::test]
+ async fn sender_send_completes_if_capacity() {
+ let c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
+ c.send(1).await;
+ assert_eq!(c.recv().await, 1);
+ }
+
+ #[futures_test::test]
+ async fn senders_sends_wait_until_capacity() {
+ let executor = ThreadPool::new().unwrap();
+
+ static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 1>> = Forever::new();
+ let c = &*CHANNEL.put(Channel::new());
+ assert!(c.try_send(1).is_ok());
+
+ let c2 = c;
+ let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
+ let c2 = c;
+ let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
+ // Wish I could think of a means of determining that the async send is waiting instead.
+ // However, I've used the debugger to observe that the send does indeed wait.
+ Delay::new(Duration::from_millis(500)).await;
+ assert_eq!(c.recv().await, 1);
+ assert!(executor
+ .spawn(async move {
+ loop {
+ c.recv().await;
+ }
+ })
+ .is_ok());
+ send_task_1.unwrap().await;
+ send_task_2.unwrap().await;
+ }
+}
diff --git a/embassy/src/channel/mod.rs b/embassy/src/channel/mod.rs
index 9e8c67ee..e51a442d 100644
--- a/embassy/src/channel/mod.rs
+++ b/embassy/src/channel/mod.rs
@@ -1,4 +1,4 @@
//! Async channels
-pub mod mpsc;
+pub mod channel;
pub mod signal;
diff --git a/embassy/src/channel/mpsc.rs b/embassy/src/channel/mpsc.rs
deleted file mode 100644
index 32787d81..00000000
--- a/embassy/src/channel/mpsc.rs
+++ /dev/null
@@ -1,822 +0,0 @@
-//! A multi-producer, single-consumer queue for sending values between
-//! asynchronous tasks. This queue takes a Mutex type so that various
-//! targets can be attained. For example, a ThreadModeMutex can be used
-//! for single-core Cortex-M targets where messages are only passed
-//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
-//! can also be used for single-core targets where messages are to be
-//! passed from exception mode e.g. out of an interrupt handler.
-//!
-//! This module provides a bounded channel that has a limit on the number of
-//! messages that it can store, and if this limit is reached, trying to send
-//! another message will result in an error being returned.
-//!
-//! Similar to the `mpsc` channels provided by `std`, the channel constructor
-//! functions provide separate send and receive handles, [`Sender`] and
-//! [`Receiver`]. If there is no message to read, the current task will be
-//! notified when a new value is sent. [`Sender`] allows sending values into
-//! the channel. If the bounded channel is at capacity, the send is rejected.
-//!
-//! # Disconnection
-//!
-//! When all [`Sender`] handles have been dropped, it is no longer
-//! possible to send values into the channel. This is considered the termination
-//! event of the stream.
-//!
-//! If the [`Receiver`] handle is dropped, then messages can no longer
-//! be read out of the channel. In this case, all further attempts to send will
-//! result in an error.
-//!
-//! # Clean Shutdown
-//!
-//! When the [`Receiver`] is dropped, it is possible for unprocessed messages to
-//! remain in the channel. Instead, it is usually desirable to perform a "clean"
-//! shutdown. To do this, the receiver first calls `close`, which will prevent
-//! any further messages to be sent into the channel. Then, the receiver
-//! consumes the channel to completion, at which point the receiver can be
-//! dropped.
-//!
-//! This channel and its associated types were derived from <https://docs.rs/tokio/0.1.22/tokio/sync/mpsc/fn.channel.html>
-
-use core::cell::RefCell;
-use core::fmt;
-use core::pin::Pin;
-use core::task::Context;
-use core::task::Poll;
-use core::task::Waker;
-
-use futures::Future;
-use heapless::Deque;
-
-use crate::blocking_mutex::raw::RawMutex;
-use crate::blocking_mutex::Mutex;
-use crate::waitqueue::WakerRegistration;
-
-/// Send values to the associated `Receiver`.
-///
-/// Instances are created by the [`split`](split) function.
-pub struct Sender<'ch, M, T, const N: usize>
-where
- M: RawMutex,
-{
- channel: &'ch Channel<M, T, N>,
-}
-
-/// Receive values from the associated `Sender`.
-///
-/// Instances are created by the [`split`](split) function.
-pub struct Receiver<'ch, M, T, const N: usize>
-where
- M: RawMutex,
-{
- channel: &'ch Channel<M, T, N>,
-}
-
-/// Splits a bounded mpsc channel into a `Sender` and `Receiver`.
-///
-/// All data sent on `Sender` will become available on `Receiver` in the same
-/// order as it was sent.
-///
-/// The `Sender` can be cloned to `send` to the same channel from multiple code
-/// locations. Only one `Receiver` is valid.
-///
-/// If the `Receiver` is disconnected while trying to `send`, the `send` method
-/// will return a `SendError`. Similarly, if `Sender` is disconnected while
-/// trying to `recv`, the `recv` method will return a `RecvError`.
-///
-/// Note that when splitting the channel, the sender and receiver cannot outlive
-/// their channel. The following will therefore fail compilation:
-////
-/// ```compile_fail
-/// use embassy::channel::mpsc;
-/// use embassy::channel::mpsc::{Channel, WithThreadModeOnly};
-///
-/// let (sender, receiver) = {
-/// let mut channel = Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only();
-/// mpsc::split(&mut channel)
-/// };
-/// ```
-pub fn split<M, T, const N: usize>(
- channel: &mut Channel<M, T, N>,
-) -> (Sender<M, T, N>, Receiver<M, T, N>)
-where
- M: RawMutex,
-{
- let sender = Sender { channel };
- let receiver = Receiver { channel };
- channel.lock(|c| {
- c.register_receiver();
- c.register_sender();
- });
- (sender, receiver)
-}
-
-impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
-where
- M: RawMutex,
-{
- /// Receives the next value for this receiver.
- ///
- /// This method returns `None` if the channel has been closed and there are
- /// no remaining messages in the channel's buffer. This indicates that no
- /// further values can ever be received from this `Receiver`. The channel is
- /// closed when all senders have been dropped, or when [`close`] is called.
- ///
- /// If there are no messages in the channel's buffer, but the channel has
- /// not yet been closed, this method will sleep until a message is sent or
- /// the channel is closed.
- ///
- /// Note that if [`close`] is called, but there are still outstanding
- /// messages from before it was closed, the channel is not considered
- /// closed by `recv` until they are all consumed.
- ///
- /// [`close`]: Self::close
- pub fn recv(&mut self) -> RecvFuture<'_, M, T, N> {
- RecvFuture {
- channel: self.channel,
- }
- }
-
- /// Attempts to immediately receive a message on this `Receiver`
- ///
- /// This method will either receive a message from the channel immediately or return an error
- /// if the channel is empty.
- pub fn try_recv(&self) -> Result<T, TryRecvError> {
- self.channel.lock(|c| c.try_recv())
- }
-
- /// Closes the receiving half of a channel without dropping it.
- ///
- /// This prevents any further messages from being sent on the channel while
- /// still enabling the receiver to drain messages that are buffered.
- ///
- /// To guarantee that no messages are dropped, after calling `close()`,
- /// `recv()` must be called until `None` is returned. If there are
- /// outstanding messages, the `recv` method will not return `None`
- /// until those are released.
- ///
- pub fn close(&mut self) {
- self.channel.lock(|c| c.close())
- }
-}
-
-impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N>
-where
- M: RawMutex,
-{
- fn drop(&mut self) {
- self.channel.lock(|c| c.deregister_receiver())
- }
-}
-
-pub struct RecvFuture<'ch, M, T, const N: usize>
-where
- M: RawMutex,
-{
- channel: &'ch Channel<M, T, N>,
-}
-
-impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N>
-where
- M: RawMutex,
-{
- type Output = Option<T>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
- self.channel
- .lock(|c| match c.try_recv_with_context(Some(cx)) {
- Ok(v) => Poll::Ready(Some(v)),
- Err(TryRecvError::Closed) => Poll::Ready(None),
- Err(TryRecvError::Empty) => Poll::Pending,
- })
- }
-}
-
-impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
-where
- M: RawMutex,
-{
- /// Sends a value, waiting until there is capacity.
- ///
- /// A successful send occurs when it is determined that the other end of the
- /// channel has not hung up already. An unsuccessful send would be one where
- /// the corresponding receiver has already been closed. Note that a return
- /// value of `Err` means that the data will never be received, but a return
- /// value of `Ok` does not mean that the data will be received. It is
- /// possible for the corresponding receiver to hang up immediately after
- /// this function returns `Ok`.
- ///
- /// # Errors
- ///
- /// If the receive half of the channel is closed, either due to [`close`]
- /// being called or the [`Receiver`] handle dropping, the function returns
- /// an error. The error includes the value passed to `send`.
- ///
- /// [`close`]: Receiver::close
- /// [`Receiver`]: Receiver
- pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
- SendFuture {
- channel: self.channel,
- message: Some(message),
- }
- }
-
- /// Attempts to immediately send a message on this `Sender`
- ///
- /// This method differs from [`send`] by returning immediately if the channel's
- /// buffer is full or no receiver is waiting to acquire some data. Compared
- /// with [`send`], this function has two failure cases instead of one (one for
- /// disconnection, one for a full buffer).
- ///
- /// # Errors
- ///
- /// If the channel capacity has been reached, i.e., the channel has `n`
- /// buffered values where `n` is the argument passed to [`channel`], then an
- /// error is returned.
- ///
- /// If the receive half of the channel is closed, either due to [`close`]
- /// being called or the [`Receiver`] handle dropping, the function returns
- /// an error. The error includes the value passed to `send`.
- ///
- /// [`send`]: Sender::send
- /// [`channel`]: channel
- /// [`close`]: Receiver::close
- pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
- self.channel.lock(|c| c.try_send(message))
- }
-
- /// Completes when the receiver has dropped.
- ///
- /// This allows the producers to get notified when interest in the produced
- /// values is canceled and immediately stop doing work.
- pub async fn closed(&self) {
- CloseFuture {
- channel: self.channel,
- }
- .await
- }
-
- /// Checks if the channel has been closed. This happens when the
- /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
- /// called.
- ///
- /// [`Receiver`]: Receiver
- /// [`Receiver::close`]: Receiver::close
- pub fn is_closed(&self) -> bool {
- self.channel.lock(|c| c.is_closed())
- }
-}
-
-pub struct SendFuture<'ch, M, T, const N: usize>
-where
- M: RawMutex,
-{
- channel: &'ch Channel<M, T, N>,
- message: Option<T>,
-}
-
-impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
-where
- M: RawMutex,
-{
- type Output = Result<(), SendError<T>>;
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- match self.message.take() {
- Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) {
- Ok(..) => Poll::Ready(Ok(())),
- Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))),
- Err(TrySendError::Full(m)) => {
- self.message = Some(m);
- Poll::Pending
- }
- },
- None => panic!("Message cannot be None"),
- }
- }
-}
-
-impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
-
-struct CloseFuture<'ch, M, T, const N: usize>
-where
- M: RawMutex,
-{
- channel: &'ch Channel<M, T, N>,
-}
-
-impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N>
-where
- M: RawMutex,
-{
- type Output = ();
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- if self.channel.lock(|c| c.is_closed_with_context(Some(cx))) {
- Poll::Ready(())
- } else {
- Poll::Pending
- }
- }
-}
-
-impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N>
-where
- M: RawMutex,
-{
- fn drop(&mut self) {
- self.channel.lock(|c| c.deregister_sender())
- }
-}
-
-impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
-where
- M: RawMutex,
-{
- fn clone(&self) -> Self {
- self.channel.lock(|c| c.register_sender());
- Sender {
- channel: self.channel,
- }
- }
-}
-
-/// An error returned from the [`try_recv`] method.
-///
-/// [`try_recv`]: Receiver::try_recv
-#[derive(PartialEq, Eq, Clone, Copy, Debug)]
-#[cfg_attr(feature = "defmt", derive(defmt::Format))]
-pub enum TryRecvError {
- /// A message could not be received because the channel is empty.
- Empty,
-
- /// The message could not be received because the channel is empty and closed.
- Closed,
-}
-
-/// Error returned by the `Sender`.
-#[derive(Debug)]
-pub struct SendError<T>(pub T);
-
-impl<T> fmt::Display for SendError<T> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "channel closed")
- }
-}
-
-#[cfg(feature = "defmt")]
-impl<T> defmt::Format for SendError<T> {
- fn format(&self, fmt: defmt::Formatter<'_>) {
- defmt::write!(fmt, "channel closed")
- }
-}
-
-/// This enumeration is the list of the possible error outcomes for the
-/// [try_send](Sender::try_send) method.
-#[derive(Debug)]
-pub enum TrySendError<T> {
- /// The data could not be sent on the channel because the channel is
- /// currently full and sending would require blocking.
- Full(T),
-
- /// The receive half of the channel was explicitly closed or has been
- /// dropped.
- Closed(T),
-}
-
-impl<T> fmt::Display for TrySendError<T> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(
- fmt,
- "{}",
- match self {
- TrySendError::Full(..) => "no available capacity",
- TrySendError::Closed(..) => "channel closed",
- }
- )
- }
-}
-
-#[cfg(feature = "defmt")]
-impl<T> defmt::Format for TrySendError<T> {
- fn format(&self, fmt: defmt::Formatter<'_>) {
- match self {
- TrySendError::Full(..) => defmt::write!(fmt, "no available capacity"),
- TrySendError::Closed(..) => defmt::write!(fmt, "channel closed"),
- }
- }
-}
-
-struct ChannelState<T, const N: usize> {
- queue: Deque<T, N>,
- closed: bool,
- receiver_registered: bool,
- senders_registered: u32,
- receiver_waker: WakerRegistration,
- senders_waker: WakerRegistration,
-}
-
-impl<T, const N: usize> ChannelState<T, N> {
- const fn new() -> Self {
- ChannelState {
- queue: Deque::new(),
- closed: false,
- receiver_registered: false,
- senders_registered: 0,
- receiver_waker: WakerRegistration::new(),
- senders_waker: WakerRegistration::new(),
- }
- }
-
- fn try_recv(&mut self) -> Result<T, TryRecvError> {
- self.try_recv_with_context(None)
- }
-
- fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
- if self.queue.is_full() {
- self.senders_waker.wake();
- }
-
- if let Some(message) = self.queue.pop_front() {
- Ok(message)
- } else if !self.closed {
- if let Some(cx) = cx {
- self.set_receiver_waker(cx.waker());
- }
- Err(TryRecvError::Empty)
- } else {
- Err(TryRecvError::Closed)
- }
- }
-
- fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
- self.try_send_with_context(message, None)
- }
-
- fn try_send_with_context(
- &mut self,
- message: T,
- cx: Option<&mut Context<'_>>,
- ) -> Result<(), TrySendError<T>> {
- if self.closed {
- return Err(TrySendError::Closed(message));
- }
-
- match self.queue.push_back(message) {
- Ok(()) => {
- self.receiver_waker.wake();
-
- Ok(())
- }
- Err(message) => {
- cx.into_iter()
- .for_each(|cx| self.set_senders_waker(cx.waker()));
- Err(TrySendError::Full(message))
- }
- }
- }
-
- fn close(&mut self) {
- self.receiver_waker.wake();
- self.closed = true;
- }
-
- fn is_closed(&mut self) -> bool {
- self.is_closed_with_context(None)
- }
-
- fn is_closed_with_context(&mut self, cx: Option<&mut Context<'_>>) -> bool {
- if self.closed {
- cx.into_iter()
- .for_each(|cx| self.set_senders_waker(cx.waker()));
- true
- } else {
- false
- }
- }
-
- fn register_receiver(&mut self) {
- assert!(!self.receiver_registered);
- self.receiver_registered = true;
- }
-
- fn deregister_receiver(&mut self) {
- if self.receiver_registered {
- self.closed = true;
- self.senders_waker.wake();
- }
- self.receiver_registered = false;
- }
-
- fn register_sender(&mut self) {
- self.senders_registered += 1;
- }
-
- fn deregister_sender(&mut self) {
- assert!(self.senders_registered > 0);
- self.senders_registered -= 1;
- if self.senders_registered == 0 {
- self.receiver_waker.wake();
- self.closed = true;
- }
- }
-
- fn set_receiver_waker(&mut self, receiver_waker: &Waker) {
- self.receiver_waker.register(receiver_waker);
- }
-
- fn set_senders_waker(&mut self, senders_waker: &Waker) {
- // Dispose of any existing sender causing them to be polled again.
- // This could cause a spin given multiple concurrent senders, however given that
- // most sends only block waiting for the receiver to become active, this should
- // be a short-lived activity. The upside is a greatly simplified implementation
- // that avoids the need for intrusive linked-lists and unsafe operations on pinned
- // pointers.
- self.senders_waker.wake();
- self.senders_waker.register(senders_waker);
- }
-}
-
-/// A a bounded mpsc channel for communicating between asynchronous tasks
-/// with backpressure.
-///
-/// The channel will buffer up to the provided number of messages. Once the
-/// buffer is full, attempts to `send` new messages will wait until a message is
-/// received from the channel.
-///
-/// All data sent will become available in the same order as it was sent.
-pub struct Channel<M, T, const N: usize>
-where
- M: RawMutex,
-{
- inner: Mutex<M, RefCell<ChannelState<T, N>>>,
-}
-
-impl<M, T, const N: usize> Channel<M, T, N>
-where
- M: RawMutex,
-{
- /// Establish a new bounded channel. For example, to create one with a NoopMutex:
- ///
- /// ```
- /// use embassy::channel::mpsc;
- /// use embassy::blocking_mutex::raw::NoopRawMutex;
- /// use embassy::channel::mpsc::Channel;
- ///
- /// // Declare a bounded channel of 3 u32s.
- /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new();
- /// // once we have a channel, obtain its sender and receiver
- /// let (sender, receiver) = mpsc::split(&mut channel);
- /// ```
- #[cfg(feature = "nightly")]
- pub const fn new() -> Self {
- Self {
- inner: Mutex::new(RefCell::new(ChannelState::new())),
- }
- }
-
- /// Establish a new bounded channel. For example, to create one with a NoopMutex:
- ///
- /// ```
- /// use embassy::channel::mpsc;
- /// use embassy::blocking_mutex::raw::NoopRawMutex;
- /// use embassy::channel::mpsc::Channel;
- ///
- /// // Declare a bounded channel of 3 u32s.
- /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new();
- /// // once we have a channel, obtain its sender and receiver
- /// let (sender, receiver) = mpsc::split(&mut channel);
- /// ```
- #[cfg(not(feature = "nightly"))]
- pub fn new() -> Self {
- Self {
- inner: Mutex::new(RefCell::new(ChannelState::new())),
- }
- }
-
- fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
- self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
- }
-}
-
-#[cfg(test)]
-mod tests {
- use core::time::Duration;
-
- use futures::task::SpawnExt;
- use futures_executor::ThreadPool;
- use futures_timer::Delay;
-
- use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
- use crate::util::Forever;
-
- use super::*;
-
- fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
- c.queue.capacity() - c.queue.len()
- }
-
- #[test]
- fn sending_once() {
- let mut c = ChannelState::<u32, 3>::new();
- assert!(c.try_send(1).is_ok());
- assert_eq!(capacity(&c), 2);
- }
-
- #[test]
- fn sending_when_full() {
- let mut c = ChannelState::<u32, 3>::new();
- let _ = c.try_send(1);
- let _ = c.try_send(1);
- let _ = c.try_send(1);
- match c.try_send(2) {
- Err(TrySendError::Full(2)) => assert!(true),
- _ => assert!(false),
- }
- assert_eq!(capacity(&c), 0);
- }
-
- #[test]
- fn sending_when_closed() {
- let mut c = ChannelState::<u32, 3>::new();
- c.closed = true;
- match c.try_send(2) {
- Err(TrySendError::Closed(2)) => assert!(true),
- _ => assert!(false),
- }
- }
-
- #[test]
- fn receiving_once_with_one_send() {
- let mut c = ChannelState::<u32, 3>::new();
- assert!(c.try_send(1).is_ok());
- assert_eq!(c.try_recv().unwrap(), 1);
- assert_eq!(capacity(&c), 3);
- }
-
- #[test]
- fn receiving_when_empty() {
- let mut c = ChannelState::<u32, 3>::new();
- match c.try_recv() {
- Err(TryRecvError::Empty) => assert!(true),
- _ => assert!(false),
- }
- assert_eq!(capacity(&c), 3);
- }
-
- #[test]
- fn receiving_when_closed() {
- let mut c = ChannelState::<u32, 3>::new();
- c.closed = true;
- match c.try_recv() {
- Err(TryRecvError::Closed) => assert!(true),
- _ => assert!(false),
- }
- }
-
- #[test]
- fn simple_send_and_receive() {
- let mut c = Channel::<NoopRawMutex, u32, 3>::new();
- let (s, r) = split(&mut c);
- assert!(s.clone().try_send(1).is_ok());
- assert_eq!(r.try_recv().unwrap(), 1);
- }
-
- #[test]
- fn should_close_without_sender() {
- let mut c = Channel::<NoopRawMutex, u32, 3>::new();
- let (s, r) = split(&mut c);
- drop(s);
- match r.try_recv() {
- Err(TryRecvError::Closed) => assert!(true),
- _ => assert!(false),
- }
- }
-
- #[test]
- fn should_close_once_drained() {
- let mut c = Channel::<NoopRawMutex, u32, 3>::new();
- let (s, r) = split(&mut c);
- assert!(s.try_send(1).is_ok());
- drop(s);
- assert_eq!(r.try_recv().unwrap(), 1);
- match r.try_recv() {
- Err(TryRecvError::Closed) => assert!(true),
- _ => assert!(false),
- }
- }
-
- #[test]
- fn should_reject_send_when_receiver_dropped() {
- let mut c = Channel::<NoopRawMutex, u32, 3>::new();
- let (s, r) = split(&mut c);
- drop(r);
- match s.try_send(1) {
- Err(TrySendError::Closed(1)) => assert!(true),
- _ => assert!(false),
- }
- }
-
- #[test]
- fn should_reject_send_when_channel_closed() {
- let mut c = Channel::<NoopRawMutex, u32, 3>::new();
- let (s, mut r) = split(&mut c);
- assert!(s.try_send(1).is_ok());
- r.close();
- assert_eq!(r.try_recv().unwrap(), 1);
- match r.try_recv() {
- Err(TryRecvError::Closed) => assert!(true),
- _ => assert!(false),
- }
- assert!(s.is_closed());
- }
-
- #[futures_test::test]
- async fn receiver_closes_when_sender_dropped_async() {
- let executor = ThreadPool::new().unwrap();
-
- static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 3>> = Forever::new();
- let c = CHANNEL.put(Channel::new());
- let (s, mut r) = split(c);
- assert!(executor
- .spawn(async move {
- drop(s);
- })
- .is_ok());
- assert_eq!(r.recv().await, None);
- }
-
- #[futures_test::test]
- async fn receiver_receives_given_try_send_async() {
- let executor = ThreadPool::new().unwrap();
-
- static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 3>> = Forever::new();
- let c = CHANNEL.put(Channel::new());
- let (s, mut r) = split(c);
- assert!(executor
- .spawn(async move {
- assert!(s.try_send(1).is_ok());
- })
- .is_ok());
- assert_eq!(r.recv().await, Some(1));
- }
-
- #[futures_test::test]
- async fn sender_send_completes_if_capacity() {
- let mut c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
- let (s, mut r) = split(&mut c);
- assert!(s.send(1).await.is_ok());
- assert_eq!(r.recv().await, Some(1));
- }
-
- #[futures_test::test]
- async fn sender_send_completes_if_closed() {
- static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 1>> = Forever::new();
- let c = CHANNEL.put(Channel::new());
- let (s, r) = split(c);
- drop(r);
- match s.send(1).await {
- Err(SendError(1)) => assert!(true),
- _ => assert!(false),
- }
- }
-
- #[futures_test::test]
- async fn senders_sends_wait_until_capacity() {
- let executor = ThreadPool::new().unwrap();
-
- static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 1>> = Forever::new();
- let c = CHANNEL.put(Channel::new());
- let (s0, mut r) = split(c);
- assert!(s0.try_send(1).is_ok());
- let s1 = s0.clone();
- let send_task_1 = executor.spawn_with_handle(async move { s0.send(2).await });
- let send_task_2 = executor.spawn_with_handle(async move { s1.send(3).await });
- // Wish I could think of a means of determining that the async send is waiting instead.
- // However, I've used the debugger to observe that the send does indeed wait.
- Delay::new(Duration::from_millis(500)).await;
- assert_eq!(r.recv().await, Some(1));
- assert!(executor
- .spawn(async move { while let Some(_) = r.recv().await {} })
- .is_ok());
- assert!(send_task_1.unwrap().await.is_ok());
- assert!(send_task_2.unwrap().await.is_ok());
- }
-
- #[futures_test::test]
- async fn sender_close_completes_if_closing() {
- static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 1>> = Forever::new();
- let c = CHANNEL.put(Channel::new());
- let (s, mut r) = split(c);
- r.close();
- s.closed().await;
- }
-
- #[futures_test::test]
- async fn sender_close_completes_if_closed() {
- static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 1>> = Forever::new();
- let c = CHANNEL.put(Channel::new());
- let (s, r) = split(c);
- drop(r);
- s.closed().await;
- }
-}
diff --git a/embassy/src/channel/signal.rs b/embassy/src/channel/signal.rs
index 027f4f47..e1f6c4b1 100644
--- a/embassy/src/channel/signal.rs
+++ b/embassy/src/channel/signal.rs
@@ -5,7 +5,7 @@ use core::task::{Context, Poll, Waker};
/// Synchronization primitive. Allows creating awaitable signals that may be passed between tasks.
/// For a simple use-case where the receiver is only ever interested in the latest value of
-/// something, Signals work well. For more advanced use cases, please consider [crate::channel::mpsc].
+/// something, Signals work well. For more advanced use cases, you might want to use [`Channel`](crate::channel::channel::Channel) instead..
///
/// Signals are generally declared as being a static const and then borrowed as required.
///
diff --git a/examples/nrf/src/bin/channel.rs b/examples/nrf/src/bin/channel.rs
new file mode 100644
index 00000000..476ec09a
--- /dev/null
+++ b/examples/nrf/src/bin/channel.rs
@@ -0,0 +1,45 @@
+#![no_std]
+#![no_main]
+#![feature(type_alias_impl_trait)]
+
+use defmt::unwrap;
+use embassy::blocking_mutex::raw::ThreadModeRawMutex;
+use embassy::channel::channel::Channel;
+use embassy::executor::Spawner;
+use embassy::time::{Duration, Timer};
+use embassy_nrf::gpio::{Level, Output, OutputDrive};
+use embassy_nrf::Peripherals;
+
+use defmt_rtt as _; // global logger
+use panic_probe as _;
+
+enum LedState {
+ On,
+ Off,
+}
+
+static CHANNEL: Channel<ThreadModeRawMutex, LedState, 1> = Channel::new();
+
+#[embassy::task]
+async fn my_task() {
+ loop {
+ CHANNEL.send(LedState::On).await;
+ Timer::after(Duration::from_secs(1)).await;
+ CHANNEL.send(LedState::Off).await;
+ Timer::after(Duration::from_secs(1)).await;
+ }
+}
+
+#[embassy::main]
+async fn main(spawner: Spawner, p: Peripherals) {
+ let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard);
+
+ unwrap!(spawner.spawn(my_task()));
+
+ loop {
+ match CHANNEL.recv().await {
+ LedState::On => led.set_high(),
+ LedState::Off => led.set_low(),
+ }
+ }
+}
diff --git a/examples/nrf/src/bin/channel_sender_receiver.rs b/examples/nrf/src/bin/channel_sender_receiver.rs
new file mode 100644
index 00000000..c79f2fd6
--- /dev/null
+++ b/examples/nrf/src/bin/channel_sender_receiver.rs
@@ -0,0 +1,52 @@
+#![no_std]
+#![no_main]
+#![feature(type_alias_impl_trait)]
+
+use defmt::unwrap;
+use embassy::blocking_mutex::raw::NoopRawMutex;
+use embassy::channel::channel::{Channel, Receiver, Sender};
+use embassy::executor::Spawner;
+use embassy::time::{Duration, Timer};
+use embassy::util::Forever;
+use embassy_nrf::gpio::{AnyPin, Level, Output, OutputDrive, Pin};
+use embassy_nrf::Peripherals;
+
+use defmt_rtt as _; // global logger
+use panic_probe as _;
+
+enum LedState {
+ On,
+ Off,
+}
+
+static CHANNEL: Forever<Channel<NoopRawMutex, LedState, 1>> = Forever::new();
+
+#[embassy::task]
+async fn send_task(sender: Sender<'static, NoopRawMutex, LedState, 1>) {
+ loop {
+ sender.send(LedState::On).await;
+ Timer::after(Duration::from_secs(1)).await;
+ sender.send(LedState::Off).await;
+ Timer::after(Duration::from_secs(1)).await;
+ }
+}
+
+#[embassy::task]
+async fn recv_task(led: AnyPin, receiver: Receiver<'static, NoopRawMutex, LedState, 1>) {
+ let mut led = Output::new(led, Level::Low, OutputDrive::Standard);
+
+ loop {
+ match receiver.recv().await {
+ LedState::On => led.set_high(),
+ LedState::Off => led.set_low(),
+ }
+ }
+}
+
+#[embassy::main]
+async fn main(spawner: Spawner, p: Peripherals) {
+ let channel = CHANNEL.put(Channel::new());
+
+ unwrap!(spawner.spawn(send_task(channel.sender())));
+ unwrap!(spawner.spawn(recv_task(p.P0_13.degrade(), channel.receiver())));
+}
diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs
deleted file mode 100644
index 0cb18275..00000000
--- a/examples/nrf/src/bin/mpsc.rs
+++ /dev/null
@@ -1,60 +0,0 @@
-#![no_std]
-#![no_main]
-#![feature(type_alias_impl_trait)]
-
-use defmt::unwrap;
-use embassy::blocking_mutex::raw::NoopRawMutex;
-use embassy::channel::mpsc::{self, Channel, Sender, TryRecvError};
-use embassy::executor::Spawner;
-use embassy::time::{Duration, Timer};
-use embassy::util::Forever;
-use embassy_nrf::gpio::{Level, Output, OutputDrive};
-use embassy_nrf::Peripherals;
-
-use defmt_rtt as _; // global logger
-use panic_probe as _;
-
-enum LedState {
- On,
- Off,
-}
-
-static CHANNEL: Forever<Channel<NoopRawMutex, LedState, 1>> = Forever::new();
-
-#[embassy::task(pool_size = 1)]
-async fn my_task(sender: Sender<'static, NoopRawMutex, LedState, 1>) {
- loop {
- let _ = sender.send(LedState::On).await;
- Timer::after(Duration::from_secs(1)).await;
- let _ = sender.send(LedState::Off).await;
- Timer::after(Duration::from_secs(1)).await;
- }
-}
-
-#[embassy::main]
-async fn main(spawner: Spawner, p: Peripherals) {
- let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard);
-
- let channel = CHANNEL.put(Channel::new());
- let (sender, mut receiver) = mpsc::split(channel);
-
- unwrap!(spawner.spawn(my_task(sender)));
-
- // We could just loop on `receiver.recv()` for simplicity. The code below
- // is optimized to drain the queue as fast as possible in the spirit of
- // handling events as fast as possible. This optimization is benign when in
- // thread mode, but can be useful when interrupts are sending messages
- // with the channel having been created via with_critical_sections.
- loop {
- let maybe_message = match receiver.try_recv() {
- m @ Ok(..) => m.ok(),
- Err(TryRecvError::Empty) => receiver.recv().await,
- Err(TryRecvError::Closed) => break,
- };
- match maybe_message {
- Some(LedState::On) => led.set_high(),
- Some(LedState::Off) => led.set_low(),
- _ => (),
- }
- }
-}
diff --git a/examples/nrf/src/bin/uart_split.rs b/examples/nrf/src/bin/uart_split.rs
index 909429b1..3fde2f0d 100644
--- a/examples/nrf/src/bin/uart_split.rs
+++ b/examples/nrf/src/bin/uart_split.rs
@@ -3,10 +3,9 @@
#![feature(type_alias_impl_trait)]
use defmt::*;
-use embassy::blocking_mutex::raw::NoopRawMutex;
-use embassy::channel::mpsc::{self, Channel, Sender};
+use embassy::blocking_mutex::raw::ThreadModeRawMutex;
+use embassy::channel::channel::Channel;
use embassy::executor::Spawner;
-use embassy::util::Forever;
use embassy_nrf::peripherals::UARTE0;
use embassy_nrf::uarte::UarteRx;
use embassy_nrf::{interrupt, uarte, Peripherals};
@@ -14,7 +13,7 @@ use embassy_nrf::{interrupt, uarte, Peripherals};
use defmt_rtt as _; // global logger
use panic_probe as _;
-static CHANNEL: Forever<Channel<NoopRawMutex, [u8; 8], 1>> = Forever::new();
+static CHANNEL: Channel<ThreadModeRawMutex, [u8; 8], 1> = Channel::new();
#[embassy::main]
async fn main(spawner: Spawner, p: Peripherals) {
@@ -26,14 +25,11 @@ async fn main(spawner: Spawner, p: Peripherals) {
let uart = uarte::Uarte::new(p.UARTE0, irq, p.P0_08, p.P0_06, config);
let (mut tx, rx) = uart.split();
- let c = CHANNEL.put(Channel::new());
- let (s, mut r) = mpsc::split(c);
-
info!("uarte initialized!");
// Spawn a task responsible purely for reading
- unwrap!(spawner.spawn(reader(rx, s)));
+ unwrap!(spawner.spawn(reader(rx)));
// Message must be in SRAM
{
@@ -48,19 +44,18 @@ async fn main(spawner: Spawner, p: Peripherals) {
// back out the buffer we receive from the read
// task.
loop {
- if let Some(buf) = r.recv().await {
- info!("writing...");
- unwrap!(tx.write(&buf).await);
- }
+ let buf = CHANNEL.recv().await;
+ info!("writing...");
+ unwrap!(tx.write(&buf).await);
}
}
#[embassy::task]
-async fn reader(mut rx: UarteRx<'static, UARTE0>, s: Sender<'static, NoopRawMutex, [u8; 8], 1>) {
+async fn reader(mut rx: UarteRx<'static, UARTE0>) {
let mut buf = [0; 8];
loop {
info!("reading...");
unwrap!(rx.read(&mut buf).await);
- unwrap!(s.send(buf).await);
+ CHANNEL.send(buf).await;
}
}
diff --git a/examples/stm32f3/src/bin/button_events.rs b/examples/stm32f3/src/bin/button_events.rs
index 99aab302..06e8eec1 100644
--- a/examples/stm32f3/src/bin/button_events.rs
+++ b/examples/stm32f3/src/bin/button_events.rs
@@ -11,11 +11,10 @@
#![feature(type_alias_impl_trait)]
use defmt::*;
-use embassy::blocking_mutex::raw::NoopRawMutex;
-use embassy::channel::mpsc::{self, Channel, Receiver, Sender};
+use embassy::blocking_mutex::raw::ThreadModeRawMutex;
+use embassy::channel::channel::Channel;
use embassy::executor::Spawner;
use embassy::time::{with_timeout, Duration, Timer};
-use embassy::util::Forever;
use embassy_stm32::exti::ExtiInput;
use embassy_stm32::gpio::{AnyPin, Input, Level, Output, Pin, Pull, Speed};
use embassy_stm32::peripherals::PA0;
@@ -51,14 +50,15 @@ impl<'a> Leds<'a> {
}
}
- async fn show(&mut self, queue: &mut Receiver<'static, NoopRawMutex, ButtonEvent, 4>) {
+ async fn show(&mut self) {
self.leds[self.current_led].set_high();
- if let Ok(new_message) = with_timeout(Duration::from_millis(500), queue.recv()).await {
+ if let Ok(new_message) = with_timeout(Duration::from_millis(500), CHANNEL.recv()).await {
self.leds[self.current_led].set_low();
self.process_event(new_message).await;
} else {
self.leds[self.current_led].set_low();
- if let Ok(new_message) = with_timeout(Duration::from_millis(200), queue.recv()).await {
+ if let Ok(new_message) = with_timeout(Duration::from_millis(200), CHANNEL.recv()).await
+ {
self.process_event(new_message).await;
}
}
@@ -77,15 +77,18 @@ impl<'a> Leds<'a> {
}
}
- async fn process_event(&mut self, event: Option<ButtonEvent>) {
+ async fn process_event(&mut self, event: ButtonEvent) {
match event {
- Some(ButtonEvent::SingleClick) => self.move_next(),
- Some(ButtonEvent::DoubleClick) => {
+ ButtonEvent::SingleClick => {
+ self.move_next();
+ }
+ ButtonEvent::DoubleClick => {
self.change_direction();
- self.move_next()
+ self.move_next();
+ }
+ ButtonEvent::Hold => {
+ self.flash().await;
}
- Some(ButtonEvent::Hold) => self.flash().await,
- _ => {}
}
}
}
@@ -97,7 +100,7 @@ enum ButtonEvent {
Hold,
}
-static BUTTON_EVENTS_QUEUE: Forever<Channel<NoopRawMutex, ButtonEvent, 4>> = Forever::new();
+static CHANNEL: Channel<ThreadModeRawMutex, ButtonEvent, 4> = Channel::new();
#[embassy::main]
async fn main(spawner: Spawner, p: Peripherals) {
@@ -116,27 +119,19 @@ async fn main(spawner: Spawner, p: Peripherals) {
];
let leds = Leds::new(leds);
- let buttons_queue = BUTTON_EVENTS_QUEUE.put(Channel::new());
- let (sender, receiver) = mpsc::split(buttons_queue);
- spawner.spawn(button_waiter(button, sender)).unwrap();
- spawner.spawn(led_blinker(leds, receiver)).unwrap();
+ spawner.spawn(button_waiter(button)).unwrap();
+ spawner.spawn(led_blinker(leds)).unwrap();
}
#[embassy::task]
-async fn led_blinker(
- mut leds: Leds<'static>,
- mut queue: Receiver<'static, NoopRawMutex, ButtonEvent, 4>,
-) {
+async fn led_blinker(mut leds: Leds<'static>) {
loop {
- leds.show(&mut queue).await;
+ leds.show().await;
}
}
#[embassy::task]
-async fn button_waiter(
- mut button: ExtiInput<'static, PA0>,
- queue: Sender<'static, NoopRawMutex, ButtonEvent, 4>,
-) {
+async fn button_waiter(mut button: ExtiInput<'static, PA0>) {
const DOUBLE_CLICK_DELAY: u64 = 250;
const HOLD_DELAY: u64 = 1000;
@@ -150,9 +145,7 @@ async fn button_waiter(
.is_err()
{
info!("Hold");
- if queue.send(ButtonEvent::Hold).await.is_err() {
- break;
- }
+ CHANNEL.send(ButtonEvent::Hold).await;
button.wait_for_falling_edge().await;
} else if with_timeout(
Duration::from_millis(DOUBLE_CLICK_DELAY),
@@ -161,15 +154,11 @@ async fn button_waiter(
.await
.is_err()
{
- if queue.send(ButtonEvent::SingleClick).await.is_err() {
- break;
- }
info!("Single click");
+ CHANNEL.send(ButtonEvent::SingleClick).await;
} else {
info!("Double click");
- if queue.send(ButtonEvent::DoubleClick).await.is_err() {
- break;
- }
+ CHANNEL.send(ButtonEvent::DoubleClick).await;
button.wait_for_falling_edge().await;
}
button.wait_for_rising_edge().await;
diff --git a/examples/stm32h7/src/bin/usart_split.rs b/examples/stm32h7/src/bin/usart_split.rs
index ee1763aa..40a7c3e4 100644
--- a/examples/stm32h7/src/bin/usart_split.rs
+++ b/examples/stm32h7/src/bin/usart_split.rs
@@ -4,10 +4,9 @@
use defmt::*;
use defmt_rtt as _; // global logger
-use embassy::blocking_mutex::raw::NoopRawMutex;
-use embassy::channel::mpsc::{self, Channel, Sender};
+use embassy::blocking_mutex::raw::ThreadModeRawMutex;
+use embassy::channel::channel::Channel;
use embassy::executor::Spawner;
-use embassy::util::Forever;
use embassy_stm32::dma::NoDma;
use embassy_stm32::{
peripherals::{DMA1_CH1, UART7},
@@ -28,7 +27,7 @@ async fn writer(mut usart: Uart<'static, UART7, NoDma, NoDma>) {
}
}
-static CHANNEL: Forever<Channel<NoopRawMutex, [u8; 8], 1>> = Forever::new();
+static CHANNEL: Channel<ThreadModeRawMutex, [u8; 8], 1> = Channel::new();
#[embassy::main]
async fn main(spawner: Spawner, p: Peripherals) -> ! {
@@ -40,28 +39,21 @@ async fn main(spawner: Spawner, p: Peripherals) -> ! {
let (mut tx, rx) = usart.split();
- let c = CHANNEL.put(Channel::new());
- let (s, mut r) = mpsc::split(c);
-
- unwrap!(spawner.spawn(reader(rx, s)));
+ unwrap!(spawner.spawn(reader(rx)));
loop {
- if let Some(buf) = r.recv().await {
- info!("writing...");
- unwrap!(tx.write(&buf).await);
- }
+ let buf = CHANNEL.recv().await;
+ info!("writing...");
+ unwrap!(tx.write(&buf).await);
}
}
#[embassy::task]
-async fn reader(
- mut rx: UartRx<'static, UART7, DMA1_CH1>,
- s: Sender<'static, NoopRawMutex, [u8; 8], 1>,
-) {
+async fn reader(mut rx: UartRx<'static, UART7, DMA1_CH1>) {
let mut buf = [0; 8];
loop {
info!("reading...");
unwrap!(rx.read(&mut buf).await);
- unwrap!(s.send(buf).await);
+ CHANNEL.send(buf).await;
}
}