summaryrefslogtreecommitdiff
path: root/embassy-rp
diff options
context:
space:
mode:
authorMathias <mk@blackbird.online>2022-09-21 06:00:35 +0200
committerDario Nieuwenhuis <dirbaio@dirbaio.net>2022-09-26 20:34:55 +0200
commitb3dfd06dd6da3369813cf469a7fcd87c22047e87 (patch)
treeeec3df7a6627ddddd8a12b4f52f074fa30a92e65 /embassy-rp
parent1db9e464ff13a05d1267bb33fde490bcce35af5a (diff)
downloadembassy-b3dfd06dd6da3369813cf469a7fcd87c22047e87.zip
Remove code-duplication in async bufferedUart implementations
Diffstat (limited to 'embassy-rp')
-rw-r--r--embassy-rp/src/uart/buffered.rs215
1 files changed, 89 insertions, 126 deletions
diff --git a/embassy-rp/src/uart/buffered.rs b/embassy-rp/src/uart/buffered.rs
index 3eb96e3d..6d395b6f 100644
--- a/embassy-rp/src/uart/buffered.rs
+++ b/embassy-rp/src/uart/buffered.rs
@@ -1,5 +1,5 @@
use core::future::Future;
-use core::task::Poll;
+use core::task::{Poll, Waker};
use atomic_polyfill::{compiler_fence, Ordering};
use embassy_cortex_m::peripheral::{PeripheralMutex, PeripheralState, StateStorage};
@@ -87,9 +87,9 @@ impl<'d, T: Instance> BufferedUart<'d, T> {
let r = T::regs();
unsafe {
r.uartimsc().modify(|w| {
- // TODO: Should and more or fewer interrupts be enabled?
w.set_rxim(true);
w.set_rtim(true);
+ w.set_txim(true);
});
}
@@ -122,7 +122,6 @@ impl<'d, T: Instance> RxBufferedUart<'d, T> {
let r = T::regs();
unsafe {
r.uartimsc().modify(|w| {
- // TODO: Should and more or fewer interrupts be enabled?
w.set_rxim(true);
w.set_rtim(true);
});
@@ -151,9 +150,7 @@ impl<'d, T: Instance> TxBufferedUart<'d, T> {
let r = T::regs();
unsafe {
r.uartimsc().modify(|w| {
- // TODO: Should and more or fewer interrupts be enabled?
- w.set_rxim(true);
- w.set_rtim(true);
+ w.set_txim(true);
});
}
@@ -179,6 +176,51 @@ where
}
}
+impl<'d, T: Instance> RxStateInner<'d, T>
+where
+ Self: 'd,
+{
+ fn read(&mut self, buf: &mut [u8], waker: &Waker) -> (Poll<Result<usize, Error>>, bool) {
+ // We have data ready in buffer? Return it.
+ let mut do_pend = false;
+ let data = self.buf.pop_buf();
+ if !data.is_empty() {
+ let len = data.len().min(buf.len());
+ buf[..len].copy_from_slice(&data[..len]);
+
+ if self.buf.is_full() {
+ do_pend = true;
+ }
+ self.buf.pop(len);
+
+ return (Poll::Ready(Ok(len)), do_pend);
+ }
+
+ self.waker.register(waker);
+ (Poll::Pending, do_pend)
+ }
+
+ fn fill_buf<'a>(&mut self, waker: &Waker) -> Poll<Result<&'a [u8], Error>> {
+ // We have data ready in buffer? Return it.
+ let buf = self.buf.pop_buf();
+ if !buf.is_empty() {
+ let buf: &[u8] = buf;
+ // Safety: buffer lives as long as uart
+ let buf: &[u8] = unsafe { core::mem::transmute(buf) };
+ return Poll::Ready(Ok(buf));
+ }
+
+ self.waker.register(waker);
+ Poll::Pending
+ }
+
+ fn consume(&mut self, amt: usize) -> bool {
+ let full = self.buf.is_full();
+ self.buf.pop(amt);
+ full
+ }
+}
+
impl<'d, T: Instance> PeripheralState for RxStateInner<'d, T>
where
Self: 'd,
@@ -240,6 +282,35 @@ where
}
}
+impl<'d, T: Instance> TxStateInner<'d, T>
+where
+ Self: 'd,
+{
+ fn write(&mut self, buf: &[u8], waker: &Waker) -> (Poll<Result<usize, Error>>, bool) {
+ let empty = self.buf.is_empty();
+ let tx_buf = self.buf.push_buf();
+ if tx_buf.is_empty() {
+ self.waker.register(waker);
+ return (Poll::Pending, empty);
+ }
+
+ let n = core::cmp::min(tx_buf.len(), buf.len());
+ tx_buf[..n].copy_from_slice(&buf[..n]);
+ self.buf.push(n);
+
+ (Poll::Ready(Ok(n)), empty)
+ }
+
+ fn flush(&mut self, waker: &Waker) -> Poll<Result<(), Error>> {
+ if !self.buf.is_empty() {
+ self.waker.register(waker);
+ return Poll::Pending;
+ }
+
+ Poll::Ready(Ok(()))
+ }
+}
+
impl<'d, T: Instance> PeripheralState for TxStateInner<'d, T>
where
Self: 'd,
@@ -299,26 +370,9 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::Read for BufferedUart<'d, T> {
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
poll_fn(move |cx| {
- let mut do_pend = false;
- let res = self.inner.with(|state| {
+ let (res, do_pend) = self.inner.with(|state| {
compiler_fence(Ordering::SeqCst);
-
- // We have data ready in buffer? Return it.
- let data = state.rx.buf.pop_buf();
- if !data.is_empty() {
- let len = data.len().min(buf.len());
- buf[..len].copy_from_slice(&data[..len]);
-
- if state.rx.buf.is_full() {
- do_pend = true;
- }
- state.rx.buf.pop(len);
-
- return Poll::Ready(Ok(len));
- }
-
- state.rx.waker.register(cx.waker());
- Poll::Pending
+ state.rx.read(buf, cx.waker())
});
if do_pend {
@@ -337,26 +391,9 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::Read for RxBufferedUart<'d, T> {
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
poll_fn(move |cx| {
- let mut do_pend = false;
- let res = self.inner.with(|state| {
+ let (res, do_pend) = self.inner.with(|state| {
compiler_fence(Ordering::SeqCst);
-
- // We have data ready in buffer? Return it.
- let data = state.buf.pop_buf();
- if !data.is_empty() {
- let len = data.len().min(buf.len());
- buf[..len].copy_from_slice(&data[..len]);
-
- if state.buf.is_full() {
- do_pend = true;
- }
- state.buf.pop(len);
-
- return Poll::Ready(Ok(len));
- }
-
- state.waker.register(cx.waker());
- Poll::Pending
+ state.read(buf, cx.waker())
});
if do_pend {
@@ -377,28 +414,13 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::BufRead for BufferedUart<'d, T>
poll_fn(move |cx| {
self.inner.with(|state| {
compiler_fence(Ordering::SeqCst);
-
- // We have data ready in buffer? Return it.
- let buf = state.rx.buf.pop_buf();
- if !buf.is_empty() {
- let buf: &[u8] = buf;
- // Safety: buffer lives as long as uart
- let buf: &[u8] = unsafe { core::mem::transmute(buf) };
- return Poll::Ready(Ok(buf));
- }
-
- state.rx.waker.register(cx.waker());
- Poll::<Result<&[u8], Self::Error>>::Pending
+ state.rx.fill_buf(cx.waker())
})
})
}
fn consume(&mut self, amt: usize) {
- let signal = self.inner.with(|state| {
- let full = state.rx.buf.is_full();
- state.rx.buf.pop(amt);
- full
- });
+ let signal = self.inner.with(|state| state.rx.consume(amt));
if signal {
self.inner.pend();
}
@@ -414,28 +436,13 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::BufRead for RxBufferedUart<'d, T
poll_fn(move |cx| {
self.inner.with(|state| {
compiler_fence(Ordering::SeqCst);
-
- // We have data ready in buffer? Return it.
- let buf = state.buf.pop_buf();
- if !buf.is_empty() {
- let buf: &[u8] = buf;
- // Safety: buffer lives as long as uart
- let buf: &[u8] = unsafe { core::mem::transmute(buf) };
- return Poll::Ready(Ok(buf));
- }
-
- state.waker.register(cx.waker());
- Poll::<Result<&[u8], Self::Error>>::Pending
+ state.fill_buf(cx.waker())
})
})
}
fn consume(&mut self, amt: usize) {
- let signal = self.inner.with(|state| {
- let full = state.buf.is_full();
- state.buf.pop(amt);
- full
- });
+ let signal = self.inner.with(|state| state.consume(amt));
if signal {
self.inner.pend();
}
@@ -449,20 +456,7 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::Write for BufferedUart<'d, T> {
fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
poll_fn(move |cx| {
- let (poll, empty) = self.inner.with(|state| {
- let empty = state.tx.buf.is_empty();
- let tx_buf = state.tx.buf.push_buf();
- if tx_buf.is_empty() {
- state.tx.waker.register(cx.waker());
- return (Poll::Pending, empty);
- }
-
- let n = core::cmp::min(tx_buf.len(), buf.len());
- tx_buf[..n].copy_from_slice(&buf[..n]);
- state.tx.buf.push(n);
-
- (Poll::Ready(Ok(n)), empty)
- });
+ let (poll, empty) = self.inner.with(|state| state.tx.write(buf, cx.waker()));
if empty {
self.inner.pend();
}
@@ -475,16 +469,7 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::Write for BufferedUart<'d, T> {
Self: 'a;
fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
- poll_fn(move |cx| {
- self.inner.with(|state| {
- if !state.tx.buf.is_empty() {
- state.tx.waker.register(cx.waker());
- return Poll::Pending;
- }
-
- Poll::Ready(Ok(()))
- })
- })
+ poll_fn(move |cx| self.inner.with(|state| state.tx.flush(cx.waker())))
}
}
@@ -495,20 +480,7 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::Write for TxBufferedUart<'d, T>
fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
poll_fn(move |cx| {
- let (poll, empty) = self.inner.with(|state| {
- let empty = state.buf.is_empty();
- let tx_buf = state.buf.push_buf();
- if tx_buf.is_empty() {
- state.waker.register(cx.waker());
- return (Poll::Pending, empty);
- }
-
- let n = core::cmp::min(tx_buf.len(), buf.len());
- tx_buf[..n].copy_from_slice(&buf[..n]);
- state.buf.push(n);
-
- (Poll::Ready(Ok(n)), empty)
- });
+ let (poll, empty) = self.inner.with(|state| state.write(buf, cx.waker()));
if empty {
self.inner.pend();
}
@@ -521,15 +493,6 @@ impl<'d, T: Instance + 'd> embedded_io::asynch::Write for TxBufferedUart<'d, T>
Self: 'a;
fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
- poll_fn(move |cx| {
- self.inner.with(|state| {
- if !state.buf.is_empty() {
- state.waker.register(cx.waker());
- return Poll::Pending;
- }
-
- Poll::Ready(Ok(()))
- })
- })
+ poll_fn(move |cx| self.inner.with(|state| state.flush(cx.waker())))
}
}