diff options
author | Mathias <mk@blackbird.online> | 2022-09-09 10:36:27 +0200 |
---|---|---|
committer | Dario Nieuwenhuis <dirbaio@dirbaio.net> | 2022-09-26 20:34:55 +0200 |
commit | f2239d34cc26bb147d136d312b8b6af1020d4e0f (patch) | |
tree | 26a271e379520926e4acb2f14ebf4f4fc103b880 | |
parent | ee76831f93e792757bf43136be712c343c4d5336 (diff) | |
download | embassy-f2239d34cc26bb147d136d312b8b6af1020d4e0f.zip |
Add bufferedUart, including a split version for only Rx or Tx
-rw-r--r-- | embassy-rp/src/uart/buffered.rs | 369 | ||||
-rw-r--r-- | embassy-rp/src/uart/mod.rs | 2 |
2 files changed, 310 insertions, 61 deletions
diff --git a/embassy-rp/src/uart/buffered.rs b/embassy-rp/src/uart/buffered.rs index c31af801..3eb96e3d 100644 --- a/embassy-rp/src/uart/buffered.rs +++ b/embassy-rp/src/uart/buffered.rs @@ -9,31 +9,70 @@ use futures::future::poll_fn; use super::*; -pub struct State<'d, T: Instance>(StateStorage<StateInner<'d, T>>); +pub struct State<'d, T: Instance>(StateStorage<FullStateInner<'d, T>>); impl<'d, T: Instance> State<'d, T> { - pub fn new() -> Self { + pub const fn new() -> Self { Self(StateStorage::new()) } } -struct StateInner<'d, T: Instance> { +pub struct RxState<'d, T: Instance>(StateStorage<RxStateInner<'d, T>>); +impl<'d, T: Instance> RxState<'d, T> { + pub const fn new() -> Self { + Self(StateStorage::new()) + } +} + +pub struct TxState<'d, T: Instance>(StateStorage<TxStateInner<'d, T>>); +impl<'d, T: Instance> TxState<'d, T> { + pub const fn new() -> Self { + Self(StateStorage::new()) + } +} + +struct RxStateInner<'d, T: Instance> { + phantom: PhantomData<&'d mut T>, + + waker: WakerRegistration, + buf: RingBuffer<'d>, +} + +struct TxStateInner<'d, T: Instance> { phantom: PhantomData<&'d mut T>, - rx_waker: WakerRegistration, - rx: RingBuffer<'d>, + waker: WakerRegistration, + buf: RingBuffer<'d>, +} - tx_waker: WakerRegistration, - tx: RingBuffer<'d>, +struct FullStateInner<'d, T: Instance> { + rx: RxStateInner<'d, T>, + tx: TxStateInner<'d, T>, } -unsafe impl<'d, T: Instance> Send for StateInner<'d, T> {} -unsafe impl<'d, T: Instance> Sync for StateInner<'d, T> {} +unsafe impl<'d, T: Instance> Send for RxStateInner<'d, T> {} +unsafe impl<'d, T: Instance> Sync for RxStateInner<'d, T> {} + +unsafe impl<'d, T: Instance> Send for TxStateInner<'d, T> {} +unsafe impl<'d, T: Instance> Sync for TxStateInner<'d, T> {} + +unsafe impl<'d, T: Instance> Send for FullStateInner<'d, T> {} +unsafe impl<'d, T: Instance> Sync for FullStateInner<'d, T> {} pub struct BufferedUart<'d, T: Instance> { - inner: PeripheralMutex<'d, StateInner<'d, T>>, + inner: PeripheralMutex<'d, FullStateInner<'d, T>>, +} + +pub struct RxBufferedUart<'d, T: Instance> { + inner: PeripheralMutex<'d, RxStateInner<'d, T>>, +} + +pub struct TxBufferedUart<'d, T: Instance> { + inner: PeripheralMutex<'d, TxStateInner<'d, T>>, } impl<'d, T: Instance> Unpin for BufferedUart<'d, T> {} +impl<'d, T: Instance> Unpin for RxBufferedUart<'d, T> {} +impl<'d, T: Instance> Unpin for TxBufferedUart<'d, T> {} impl<'d, T: Instance> BufferedUart<'d, T> { pub fn new<M: Mode>( @@ -55,66 +94,158 @@ impl<'d, T: Instance> BufferedUart<'d, T> { } Self { - inner: PeripheralMutex::new(irq, &mut state.0, move || StateInner { + inner: PeripheralMutex::new(irq, &mut state.0, move || FullStateInner { + tx: TxStateInner { + phantom: PhantomData, + waker: WakerRegistration::new(), + buf: RingBuffer::new(tx_buffer), + }, + rx: RxStateInner { + phantom: PhantomData, + waker: WakerRegistration::new(), + buf: RingBuffer::new(rx_buffer), + }, + }), + } + } +} + +impl<'d, T: Instance> RxBufferedUart<'d, T> { + pub fn new<M: Mode>( + state: &'d mut RxState<'d, T>, + _uart: UartRx<'d, T, M>, + irq: impl Peripheral<P = T::Interrupt> + 'd, + rx_buffer: &'d mut [u8], + ) -> RxBufferedUart<'d, T> { + into_ref!(irq); + + let r = T::regs(); + unsafe { + r.uartimsc().modify(|w| { + // TODO: Should and more or fewer interrupts be enabled? + w.set_rxim(true); + w.set_rtim(true); + }); + } + + Self { + inner: PeripheralMutex::new(irq, &mut state.0, move || RxStateInner { + phantom: PhantomData, + + buf: RingBuffer::new(rx_buffer), + waker: WakerRegistration::new(), + }), + } + } +} + +impl<'d, T: Instance> TxBufferedUart<'d, T> { + pub fn new<M: Mode>( + state: &'d mut TxState<'d, T>, + _uart: UartTx<'d, T, M>, + irq: impl Peripheral<P = T::Interrupt> + 'd, + tx_buffer: &'d mut [u8], + ) -> TxBufferedUart<'d, T> { + into_ref!(irq); + + let r = T::regs(); + unsafe { + r.uartimsc().modify(|w| { + // TODO: Should and more or fewer interrupts be enabled? + w.set_rxim(true); + w.set_rtim(true); + }); + } + + Self { + inner: PeripheralMutex::new(irq, &mut state.0, move || TxStateInner { phantom: PhantomData, - tx: RingBuffer::new(tx_buffer), - tx_waker: WakerRegistration::new(), - rx: RingBuffer::new(rx_buffer), - rx_waker: WakerRegistration::new(), + buf: RingBuffer::new(tx_buffer), + waker: WakerRegistration::new(), }), } } } -impl<'d, T: Instance> StateInner<'d, T> +impl<'d, T: Instance> PeripheralState for FullStateInner<'d, T> where Self: 'd, { - fn on_rx(&mut self) { + type Interrupt = T::Interrupt; + fn on_interrupt(&mut self) { + self.rx.on_interrupt(); + self.tx.on_interrupt(); + } +} + +impl<'d, T: Instance> PeripheralState for RxStateInner<'d, T> +where + Self: 'd, +{ + type Interrupt = T::Interrupt; + fn on_interrupt(&mut self) { let r = T::regs(); unsafe { - let ris = r.uartris().read(); + let ris = r.uartmis().read(); // Clear interrupt flags - r.uarticr().write(|w| { + r.uarticr().modify(|w| { w.set_rxic(true); w.set_rtic(true); }); - if ris.rxris() { - if ris.peris() { + if ris.rxmis() { + if ris.pemis() { warn!("Parity error"); + r.uarticr().modify(|w| { + w.set_peic(true); + }); } - if ris.feris() { + if ris.femis() { warn!("Framing error"); + r.uarticr().modify(|w| { + w.set_feic(true); + }); } - if ris.beris() { + if ris.bemis() { warn!("Break error"); + r.uarticr().modify(|w| { + w.set_beic(true); + }); } - if ris.oeris() { + if ris.oemis() { warn!("Overrun error"); + r.uarticr().modify(|w| { + w.set_oeic(true); + }); } - let buf = self.rx.push_buf(); + let buf = self.buf.push_buf(); if !buf.is_empty() { buf[0] = r.uartdr().read().data(); - self.rx.push(1); + self.buf.push(1); } else { warn!("RX buffer full, discard received byte"); } - if self.rx.is_full() { - self.rx_waker.wake(); + if self.buf.is_full() { + self.waker.wake(); } } - if ris.rtris() { - self.rx_waker.wake(); + if ris.rtmis() { + self.waker.wake(); }; } } +} - fn on_tx(&mut self) { +impl<'d, T: Instance> PeripheralState for TxStateInner<'d, T> +where + Self: 'd, +{ + type Interrupt = T::Interrupt; + fn on_interrupt(&mut self) { let r = T::regs(); unsafe { let ris = r.uartris().read(); @@ -124,14 +255,14 @@ where }); if ris.txris() { - let buf = self.tx.pop_buf(); + let buf = self.buf.pop_buf(); if !buf.is_empty() { r.uartimsc().modify(|w| { w.set_txim(true); }); r.uartdr().write(|w| w.set_data(buf[0].into())); - self.tx.pop(1); - self.tx_waker.wake(); + self.buf.pop(1); + self.waker.wake(); } else { // Disable interrupt until we have something to transmit again r.uartimsc().modify(|w| { @@ -143,17 +274,6 @@ where } } -impl<'d, T: Instance> PeripheralState for StateInner<'d, T> -where - Self: 'd, -{ - type Interrupt = T::Interrupt; - fn on_interrupt(&mut self) { - self.on_rx(); - self.on_tx(); - } -} - impl embedded_io::Error for Error { fn kind(&self) -> embedded_io::ErrorKind { embedded_io::ErrorKind::Other @@ -164,8 +284,54 @@ impl<'d, T: Instance> embedded_io::Io for BufferedUart<'d, T> { type Error = Error; } +impl<'d, T: Instance> embedded_io::Io for RxBufferedUart<'d, T> { + type Error = Error; +} + +impl<'d, T: Instance> embedded_io::Io for TxBufferedUart<'d, T> { + type Error = Error; +} + impl<'d, T: Instance + 'd> embedded_io::asynch::Read for BufferedUart<'d, T> { - type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> + type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> + where + Self: 'a; + + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { + poll_fn(move |cx| { + let mut do_pend = false; + let res = self.inner.with(|state| { + compiler_fence(Ordering::SeqCst); + + // We have data ready in buffer? Return it. + let data = state.rx.buf.pop_buf(); + if !data.is_empty() { + let len = data.len().min(buf.len()); + buf[..len].copy_from_slice(&data[..len]); + + if state.rx.buf.is_full() { + do_pend = true; + } + state.rx.buf.pop(len); + + return Poll::Ready(Ok(len)); + } + + state.rx.waker.register(cx.waker()); + Poll::Pending + }); + + if do_pend { + self.inner.pend(); + } + + res + }) + } +} + +impl<'d, T: Instance + 'd> embedded_io::asynch::Read for RxBufferedUart<'d, T> { + type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> where Self: 'a; @@ -176,20 +342,20 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::Read for BufferedUart<'d, T> { compiler_fence(Ordering::SeqCst); // We have data ready in buffer? Return it. - let data = state.rx.pop_buf(); + let data = state.buf.pop_buf(); if !data.is_empty() { let len = data.len().min(buf.len()); buf[..len].copy_from_slice(&data[..len]); - if state.rx.is_full() { + if state.buf.is_full() { do_pend = true; } - state.rx.pop(len); + state.buf.pop(len); return Poll::Ready(Ok(len)); } - state.rx_waker.register(cx.waker()); + state.waker.register(cx.waker()); Poll::Pending }); @@ -213,7 +379,44 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::BufRead for BufferedUart<'d, T> compiler_fence(Ordering::SeqCst); // We have data ready in buffer? Return it. - let buf = state.rx.pop_buf(); + let buf = state.rx.buf.pop_buf(); + if !buf.is_empty() { + let buf: &[u8] = buf; + // Safety: buffer lives as long as uart + let buf: &[u8] = unsafe { core::mem::transmute(buf) }; + return Poll::Ready(Ok(buf)); + } + + state.rx.waker.register(cx.waker()); + Poll::<Result<&[u8], Self::Error>>::Pending + }) + }) + } + + fn consume(&mut self, amt: usize) { + let signal = self.inner.with(|state| { + let full = state.rx.buf.is_full(); + state.rx.buf.pop(amt); + full + }); + if signal { + self.inner.pend(); + } + } +} + +impl<'d, T: Instance + 'd> embedded_io::asynch::BufRead for RxBufferedUart<'d, T> { + type FillBufFuture<'a> = impl Future<Output = Result<&'a [u8], Self::Error>> + where + Self: 'a; + + fn fill_buf<'a>(&'a mut self) -> Self::FillBufFuture<'a> { + poll_fn(move |cx| { + self.inner.with(|state| { + compiler_fence(Ordering::SeqCst); + + // We have data ready in buffer? Return it. + let buf = state.buf.pop_buf(); if !buf.is_empty() { let buf: &[u8] = buf; // Safety: buffer lives as long as uart @@ -221,7 +424,7 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::BufRead for BufferedUart<'d, T> return Poll::Ready(Ok(buf)); } - state.rx_waker.register(cx.waker()); + state.waker.register(cx.waker()); Poll::<Result<&[u8], Self::Error>>::Pending }) }) @@ -229,8 +432,8 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::BufRead for BufferedUart<'d, T> fn consume(&mut self, amt: usize) { let signal = self.inner.with(|state| { - let full = state.rx.is_full(); - state.rx.pop(amt); + let full = state.buf.is_full(); + state.buf.pop(amt); full }); if signal { @@ -247,16 +450,62 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::Write for BufferedUart<'d, T> { fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { poll_fn(move |cx| { let (poll, empty) = self.inner.with(|state| { - let empty = state.tx.is_empty(); - let tx_buf = state.tx.push_buf(); + let empty = state.tx.buf.is_empty(); + let tx_buf = state.tx.buf.push_buf(); + if tx_buf.is_empty() { + state.tx.waker.register(cx.waker()); + return (Poll::Pending, empty); + } + + let n = core::cmp::min(tx_buf.len(), buf.len()); + tx_buf[..n].copy_from_slice(&buf[..n]); + state.tx.buf.push(n); + + (Poll::Ready(Ok(n)), empty) + }); + if empty { + self.inner.pend(); + } + poll + }) + } + + type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>> + where + Self: 'a; + + fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { + poll_fn(move |cx| { + self.inner.with(|state| { + if !state.tx.buf.is_empty() { + state.tx.waker.register(cx.waker()); + return Poll::Pending; + } + + Poll::Ready(Ok(())) + }) + }) + } +} + +impl<'d, T: Instance + 'd> embedded_io::asynch::Write for TxBufferedUart<'d, T> { + type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>> + where + Self: 'a; + + fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { + poll_fn(move |cx| { + let (poll, empty) = self.inner.with(|state| { + let empty = state.buf.is_empty(); + let tx_buf = state.buf.push_buf(); if tx_buf.is_empty() { - state.tx_waker.register(cx.waker()); + state.waker.register(cx.waker()); return (Poll::Pending, empty); } let n = core::cmp::min(tx_buf.len(), buf.len()); tx_buf[..n].copy_from_slice(&buf[..n]); - state.tx.push(n); + state.buf.push(n); (Poll::Ready(Ok(n)), empty) }); @@ -274,8 +523,8 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::Write for BufferedUart<'d, T> { fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { poll_fn(move |cx| { self.inner.with(|state| { - if !state.tx.is_empty() { - state.tx_waker.register(cx.waker()); + if !state.buf.is_empty() { + state.waker.register(cx.waker()); return Poll::Pending; } diff --git a/embassy-rp/src/uart/mod.rs b/embassy-rp/src/uart/mod.rs index 3b71d87b..67e24b60 100644 --- a/embassy-rp/src/uart/mod.rs +++ b/embassy-rp/src/uart/mod.rs @@ -343,7 +343,7 @@ impl<'d, T: Instance, M: Mode> Uart<'d, T, M> { w.set_stp2(config.stop_bits == StopBits::STOP2); w.set_pen(pen); w.set_eps(eps); - w.set_fen(true); + w.set_fen(false); }); r.uartcr().write(|w| { |