summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDario Nieuwenhuis <dirbaio@dirbaio.net>2022-08-03 13:55:46 +0200
committerDario Nieuwenhuis <dirbaio@dirbaio.net>2022-08-03 13:55:46 +0200
commit3967c4194b1f28d2179fd30ec7fa688bcf23ab1b (patch)
tree015391fb93a3e96eeb0b09ce259506684638b5a9
parent1924f2d67d32a4466e71ef0aabc84305a9e8e165 (diff)
downloadembassy-3967c4194b1f28d2179fd30ec7fa688bcf23ab1b.zip
util: add pipe
-rw-r--r--embassy-util/src/lib.rs4
-rw-r--r--embassy-util/src/pipe.rs413
-rw-r--r--embassy-util/src/ring_buffer.rs146
3 files changed, 563 insertions, 0 deletions
diff --git a/embassy-util/src/lib.rs b/embassy-util/src/lib.rs
index 07b1633e..a65ebd51 100644
--- a/embassy-util/src/lib.rs
+++ b/embassy-util/src/lib.rs
@@ -8,9 +8,13 @@
// This mod MUST go first, so that the others see its macros.
pub(crate) mod fmt;
+// internal use
+mod ring_buffer;
+
pub mod blocking_mutex;
pub mod channel;
pub mod mutex;
+pub mod pipe;
pub mod waitqueue;
mod forever;
diff --git a/embassy-util/src/pipe.rs b/embassy-util/src/pipe.rs
new file mode 100644
index 00000000..e4f21732
--- /dev/null
+++ b/embassy-util/src/pipe.rs
@@ -0,0 +1,413 @@
+//! Async byte stream pipe.
+
+use core::cell::RefCell;
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+use crate::blocking_mutex::raw::RawMutex;
+use crate::blocking_mutex::Mutex;
+use crate::ring_buffer::RingBuffer;
+use crate::waitqueue::WakerRegistration;
+
+/// Write-only access to a [`Pipe`].
+#[derive(Copy)]
+pub struct Writer<'p, M, const N: usize>
+where
+ M: RawMutex,
+{
+ pipe: &'p Pipe<M, N>,
+}
+
+impl<'p, M, const N: usize> Clone for Writer<'p, M, N>
+where
+ M: RawMutex,
+{
+ fn clone(&self) -> Self {
+ Writer { pipe: self.pipe }
+ }
+}
+
+impl<'p, M, const N: usize> Writer<'p, M, N>
+where
+ M: RawMutex,
+{
+ /// Writes a value.
+ ///
+ /// See [`Pipe::write()`]
+ pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
+ self.pipe.write(buf)
+ }
+
+ /// Attempt to immediately write a message.
+ ///
+ /// See [`Pipe::write()`]
+ pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
+ self.pipe.try_write(buf)
+ }
+}
+
+/// Future returned by [`Pipe::write`] and [`Writer::write`].
+pub struct WriteFuture<'p, M, const N: usize>
+where
+ M: RawMutex,
+{
+ pipe: &'p Pipe<M, N>,
+ buf: &'p [u8],
+}
+
+impl<'p, M, const N: usize> Future for WriteFuture<'p, M, N>
+where
+ M: RawMutex,
+{
+ type Output = usize;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ match self.pipe.try_write_with_context(Some(cx), self.buf) {
+ Ok(n) => Poll::Ready(n),
+ Err(TryWriteError::Full) => Poll::Pending,
+ }
+ }
+}
+
+impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {}
+
+/// Read-only access to a [`Pipe`].
+#[derive(Copy)]
+pub struct Reader<'p, M, const N: usize>
+where
+ M: RawMutex,
+{
+ pipe: &'p Pipe<M, N>,
+}
+
+impl<'p, M, const N: usize> Clone for Reader<'p, M, N>
+where
+ M: RawMutex,
+{
+ fn clone(&self) -> Self {
+ Reader { pipe: self.pipe }
+ }
+}
+
+impl<'p, M, const N: usize> Reader<'p, M, N>
+where
+ M: RawMutex,
+{
+ /// Reads a value.
+ ///
+ /// See [`Pipe::read()`]
+ pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
+ self.pipe.read(buf)
+ }
+
+ /// Attempt to immediately read a message.
+ ///
+ /// See [`Pipe::read()`]
+ pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
+ self.pipe.try_read(buf)
+ }
+}
+
+/// Future returned by [`Pipe::read`] and [`Reader::read`].
+pub struct ReadFuture<'p, M, const N: usize>
+where
+ M: RawMutex,
+{
+ pipe: &'p Pipe<M, N>,
+ buf: &'p mut [u8],
+}
+
+impl<'p, M, const N: usize> Future for ReadFuture<'p, M, N>
+where
+ M: RawMutex,
+{
+ type Output = usize;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ match self.pipe.try_read_with_context(Some(cx), self.buf) {
+ Ok(n) => Poll::Ready(n),
+ Err(TryReadError::Empty) => Poll::Pending,
+ }
+ }
+}
+
+impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {}
+
+/// Error returned by [`try_read`](Pipe::try_read).
+#[derive(PartialEq, Eq, Clone, Copy, Debug)]
+#[cfg_attr(feature = "defmt", derive(defmt::Format))]
+pub enum TryReadError {
+ /// No data could be read from the pipe because it is currently
+ /// empty, and reading would require blocking.
+ Empty,
+}
+
+/// Error returned by [`try_write`](Pipe::try_write).
+#[derive(PartialEq, Eq, Clone, Copy, Debug)]
+#[cfg_attr(feature = "defmt", derive(defmt::Format))]
+pub enum TryWriteError {
+ /// No data could be written to the pipe because it is
+ /// currently full, and writing would require blocking.
+ Full,
+}
+
+struct PipeState<const N: usize> {
+ buffer: RingBuffer<N>,
+ read_waker: WakerRegistration,
+ write_waker: WakerRegistration,
+}
+
+impl<const N: usize> PipeState<N> {
+ const fn new() -> Self {
+ PipeState {
+ buffer: RingBuffer::new(),
+ read_waker: WakerRegistration::new(),
+ write_waker: WakerRegistration::new(),
+ }
+ }
+
+ fn clear(&mut self) {
+ self.buffer.clear();
+ self.write_waker.wake();
+ }
+
+ fn try_read(&mut self, buf: &mut [u8]) -> Result<usize, TryReadError> {
+ self.try_read_with_context(None, buf)
+ }
+
+ fn try_read_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
+ if self.buffer.is_full() {
+ self.write_waker.wake();
+ }
+
+ let available = self.buffer.pop_buf();
+ if available.is_empty() {
+ if let Some(cx) = cx {
+ self.read_waker.register(cx.waker());
+ }
+ return Err(TryReadError::Empty);
+ }
+
+ let n = available.len().min(buf.len());
+ buf[..n].copy_from_slice(&available[..n]);
+ self.buffer.pop(n);
+ Ok(n)
+ }
+
+ fn try_write(&mut self, buf: &[u8]) -> Result<usize, TryWriteError> {
+ self.try_write_with_context(None, buf)
+ }
+
+ fn try_write_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
+ if self.buffer.is_empty() {
+ self.read_waker.wake();
+ }
+
+ let available = self.buffer.push_buf();
+ if available.is_empty() {
+ if let Some(cx) = cx {
+ self.write_waker.register(cx.waker());
+ }
+ return Err(TryWriteError::Full);
+ }
+
+ let n = available.len().min(buf.len());
+ available[..n].copy_from_slice(&buf[..n]);
+ self.buffer.push(n);
+ Ok(n)
+ }
+}
+
+/// A bounded pipe for communicating between asynchronous tasks
+/// with backpressure.
+///
+/// The pipe will buffer up to the provided number of messages. Once the
+/// buffer is full, attempts to `write` new messages will wait until a message is
+/// read from the pipe.
+///
+/// All data written will become available in the same order as it was written.
+pub struct Pipe<M, const N: usize>
+where
+ M: RawMutex,
+{
+ inner: Mutex<M, RefCell<PipeState<N>>>,
+}
+
+impl<M, const N: usize> Pipe<M, N>
+where
+ M: RawMutex,
+{
+ /// Establish a new bounded pipe. For example, to create one with a NoopMutex:
+ ///
+ /// ```
+ /// use embassy_util::pipe::Pipe;
+ /// use embassy_util::blocking_mutex::raw::NoopRawMutex;
+ ///
+ /// // Declare a bounded pipe, with a buffer of 256 bytes.
+ /// let mut pipe = Pipe::<NoopRawMutex, 256>::new();
+ /// ```
+ pub const fn new() -> Self {
+ Self {
+ inner: Mutex::new(RefCell::new(PipeState::new())),
+ }
+ }
+
+ fn lock<R>(&self, f: impl FnOnce(&mut PipeState<N>) -> R) -> R {
+ self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
+ }
+
+ fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
+ self.lock(|c| c.try_read_with_context(cx, buf))
+ }
+
+ fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
+ self.lock(|c| c.try_write_with_context(cx, buf))
+ }
+
+ /// Get a writer for this pipe.
+ pub fn writer(&self) -> Writer<'_, M, N> {
+ Writer { pipe: self }
+ }
+
+ /// Get a reader for this pipe.
+ pub fn reader(&self) -> Reader<'_, M, N> {
+ Reader { pipe: self }
+ }
+
+ /// Write a value, waiting until there is capacity.
+ ///
+ /// Writeing completes when the value has been pushed to the pipe's queue.
+ /// This doesn't mean the value has been read yet.
+ pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
+ WriteFuture { pipe: self, buf }
+ }
+
+ /// Attempt to immediately write a message.
+ ///
+ /// This method differs from [`write`](Pipe::write) by returning immediately if the pipe's
+ /// buffer is full, instead of waiting.
+ ///
+ /// # Errors
+ ///
+ /// If the pipe capacity has been reached, i.e., the pipe has `n`
+ /// buffered values where `n` is the argument passed to [`Pipe`], then an
+ /// error is returned.
+ pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
+ self.lock(|c| c.try_write(buf))
+ }
+
+ /// Receive the next value.
+ ///
+ /// If there are no messages in the pipe's buffer, this method will
+ /// wait until a message is written.
+ pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
+ ReadFuture { pipe: self, buf }
+ }
+
+ /// Attempt to immediately read a message.
+ ///
+ /// This method will either read a message from the pipe immediately or return an error
+ /// if the pipe is empty.
+ pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
+ self.lock(|c| c.try_read(buf))
+ }
+
+ /// Clear the data in the pipe's buffer.
+ pub fn clear(&self) {
+ self.lock(|c| c.clear())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use futures_executor::ThreadPool;
+ use futures_util::task::SpawnExt;
+
+ use super::*;
+ use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
+ use crate::Forever;
+
+ fn capacity<const N: usize>(c: &PipeState<N>) -> usize {
+ N - c.buffer.len()
+ }
+
+ #[test]
+ fn writing_once() {
+ let mut c = PipeState::<3>::new();
+ assert!(c.try_write(&[1]).is_ok());
+ assert_eq!(capacity(&c), 2);
+ }
+
+ #[test]
+ fn writing_when_full() {
+ let mut c = PipeState::<3>::new();
+ assert_eq!(c.try_write(&[42]), Ok(1));
+ assert_eq!(c.try_write(&[43]), Ok(1));
+ assert_eq!(c.try_write(&[44]), Ok(1));
+ assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full));
+ assert_eq!(capacity(&c), 0);
+ }
+
+ #[test]
+ fn receiving_once_with_one_send() {
+ let mut c = PipeState::<3>::new();
+ assert!(c.try_write(&[42]).is_ok());
+ let mut buf = [0; 16];
+ assert_eq!(c.try_read(&mut buf), Ok(1));
+ assert_eq!(buf[0], 42);
+ assert_eq!(capacity(&c), 3);
+ }
+
+ #[test]
+ fn receiving_when_empty() {
+ let mut c = PipeState::<3>::new();
+ let mut buf = [0; 16];
+ assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty));
+ assert_eq!(capacity(&c), 3);
+ }
+
+ #[test]
+ fn simple_send_and_receive() {
+ let c = Pipe::<NoopRawMutex, 3>::new();
+ assert!(c.try_write(&[42]).is_ok());
+ let mut buf = [0; 16];
+ assert_eq!(c.try_read(&mut buf), Ok(1));
+ assert_eq!(buf[0], 42);
+ }
+
+ #[test]
+ fn cloning() {
+ let c = Pipe::<NoopRawMutex, 3>::new();
+ let r1 = c.reader();
+ let w1 = c.writer();
+
+ let _ = r1.clone();
+ let _ = w1.clone();
+ }
+
+ #[futures_test::test]
+ async fn receiver_receives_given_try_write_async() {
+ let executor = ThreadPool::new().unwrap();
+
+ static CHANNEL: Forever<Pipe<CriticalSectionRawMutex, 3>> = Forever::new();
+ let c = &*CHANNEL.put(Pipe::new());
+ let c2 = c;
+ let f = async move {
+ assert_eq!(c2.try_write(&[42]), Ok(1));
+ };
+ executor.spawn(f).unwrap();
+ let mut buf = [0; 16];
+ assert_eq!(c.read(&mut buf).await, 1);
+ assert_eq!(buf[0], 42);
+ }
+
+ #[futures_test::test]
+ async fn sender_send_completes_if_capacity() {
+ let c = Pipe::<CriticalSectionRawMutex, 1>::new();
+ c.write(&[42]).await;
+ let mut buf = [0; 16];
+ assert_eq!(c.read(&mut buf).await, 1);
+ assert_eq!(buf[0], 42);
+ }
+}
diff --git a/embassy-util/src/ring_buffer.rs b/embassy-util/src/ring_buffer.rs
new file mode 100644
index 00000000..52108402
--- /dev/null
+++ b/embassy-util/src/ring_buffer.rs
@@ -0,0 +1,146 @@
+pub struct RingBuffer<const N: usize> {
+ buf: [u8; N],
+ start: usize,
+ end: usize,
+ empty: bool,
+}
+
+impl<const N: usize> RingBuffer<N> {
+ pub const fn new() -> Self {
+ Self {
+ buf: [0; N],
+ start: 0,
+ end: 0,
+ empty: true,
+ }
+ }
+
+ pub fn push_buf(&mut self) -> &mut [u8] {
+ if self.start == self.end && !self.empty {
+ trace!(" ringbuf: push_buf empty");
+ return &mut self.buf[..0];
+ }
+
+ let n = if self.start <= self.end {
+ self.buf.len() - self.end
+ } else {
+ self.start - self.end
+ };
+
+ trace!(" ringbuf: push_buf {:?}..{:?}", self.end, self.end + n);
+ &mut self.buf[self.end..self.end + n]
+ }
+
+ pub fn push(&mut self, n: usize) {
+ trace!(" ringbuf: push {:?}", n);
+ if n == 0 {
+ return;
+ }
+
+ self.end = self.wrap(self.end + n);
+ self.empty = false;
+ }
+
+ pub fn pop_buf(&mut self) -> &mut [u8] {
+ if self.empty {
+ trace!(" ringbuf: pop_buf empty");
+ return &mut self.buf[..0];
+ }
+
+ let n = if self.end <= self.start {
+ self.buf.len() - self.start
+ } else {
+ self.end - self.start
+ };
+
+ trace!(" ringbuf: pop_buf {:?}..{:?}", self.start, self.start + n);
+ &mut self.buf[self.start..self.start + n]
+ }
+
+ pub fn pop(&mut self, n: usize) {
+ trace!(" ringbuf: pop {:?}", n);
+ if n == 0 {
+ return;
+ }
+
+ self.start = self.wrap(self.start + n);
+ self.empty = self.start == self.end;
+ }
+
+ pub fn is_full(&self) -> bool {
+ self.start == self.end && !self.empty
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.empty
+ }
+
+ #[allow(unused)]
+ pub fn len(&self) -> usize {
+ if self.empty {
+ 0
+ } else if self.start < self.end {
+ self.end - self.start
+ } else {
+ N + self.end - self.start
+ }
+ }
+
+ pub fn clear(&mut self) {
+ self.start = 0;
+ self.end = 0;
+ self.empty = true;
+ }
+
+ fn wrap(&self, n: usize) -> usize {
+ assert!(n <= self.buf.len());
+ if n == self.buf.len() {
+ 0
+ } else {
+ n
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn push_pop() {
+ let mut rb: RingBuffer<4> = RingBuffer::new();
+ let buf = rb.push_buf();
+ assert_eq!(4, buf.len());
+ buf[0] = 1;
+ buf[1] = 2;
+ buf[2] = 3;
+ buf[3] = 4;
+ rb.push(4);
+
+ let buf = rb.pop_buf();
+ assert_eq!(4, buf.len());
+ assert_eq!(1, buf[0]);
+ rb.pop(1);
+
+ let buf = rb.pop_buf();
+ assert_eq!(3, buf.len());
+ assert_eq!(2, buf[0]);
+ rb.pop(1);
+
+ let buf = rb.pop_buf();
+ assert_eq!(2, buf.len());
+ assert_eq!(3, buf[0]);
+ rb.pop(1);
+
+ let buf = rb.pop_buf();
+ assert_eq!(1, buf.len());
+ assert_eq!(4, buf[0]);
+ rb.pop(1);
+
+ let buf = rb.pop_buf();
+ assert_eq!(0, buf.len());
+
+ let buf = rb.push_buf();
+ assert_eq!(4, buf.len());
+ }
+}