diff options
author | Alan Somers <asomers@gmail.com> | 2021-05-30 10:47:05 -0600 |
---|---|---|
committer | Alan Somers <asomers@gmail.com> | 2021-05-30 17:41:47 -0600 |
commit | 5ac876e17df37315585f178ec3b762786e05a092 (patch) | |
tree | 79ff8b8b8854c40d77eb83165b73de7ed56a5487 /src/sys/aio.rs | |
parent | 9f1b35b5d4a176ed7c09b576b461620c632c0182 (diff) | |
download | nix-5ac876e17df37315585f178ec3b762786e05a092.zip |
Adapt aio to the world of async/await, and fix some potential unsoundness.
* libc::aiocb must not be moved while the kernel has a pointer to it.
This change enforces that requirement by using std::pin.
* Split LioCbBuilder out of LioCb. struct LioCb relied on the
(incorrect) assumption that a Vec's elements have a stable location in
memory. That's not true; they can be moved during Vec::push. The
solution is to use a Vec in the new Builder struct, but finalize it to
a boxed slice (which doesn't support push) before allowing it to be
submitted to the kernel.
* Eliminate owned buffer types. mio-aio no longer uses owned buffers
with nix::aio. There's little need for it in the world of
async/await. I'm not aware of any other consumers. This
substantially simplifies the code.
Diffstat (limited to 'src/sys/aio.rs')
-rw-r--r-- | src/sys/aio.rs | 720 |
1 files changed, 302 insertions, 418 deletions
diff --git a/src/sys/aio.rs b/src/sys/aio.rs index 6c8e8924..7868a294 100644 --- a/src/sys/aio.rs +++ b/src/sys/aio.rs @@ -25,11 +25,11 @@ use crate::{Error, Result}; use crate::errno::Errno; use std::os::unix::io::RawFd; use libc::{c_void, off_t, size_t}; -use std::borrow::{Borrow, BorrowMut}; use std::fmt; use std::fmt::Debug; use std::marker::PhantomData; use std::mem; +use std::pin::Pin; use std::ptr::{null, null_mut}; use crate::sys::signal::*; use std::thread; @@ -90,120 +90,31 @@ pub enum AioCancelStat { AioAllDone = libc::AIO_ALLDONE, } -/// Owns (uniquely or shared) a memory buffer to keep it from `Drop`ing while -/// the kernel has a pointer to it. -pub enum Buffer<'a> { - /// No buffer to own. - /// - /// Used for operations like `aio_fsync` that have no data, or for unsafe - /// operations that work with raw pointers. - None, - /// Keeps a reference to a slice - Phantom(PhantomData<&'a mut [u8]>), - /// Generic thing that keeps a buffer from dropping - BoxedSlice(Box<dyn Borrow<[u8]>>), - /// Generic thing that keeps a mutable buffer from dropping - BoxedMutSlice(Box<dyn BorrowMut<[u8]>>), -} +/// Newtype that adds Send and Sync to libc::aiocb, which contains raw pointers +#[repr(transparent)] +struct LibcAiocb(libc::aiocb); -impl<'a> Debug for Buffer<'a> { - // Note: someday it may be possible to Derive Debug for a trait object, but - // not today. - // https://github.com/rust-lang/rust/issues/1563 - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - match *self { - Buffer::None => write!(fmt, "None"), - Buffer::Phantom(p) => p.fmt(fmt), - Buffer::BoxedSlice(ref bs) => { - let borrowed : &dyn Borrow<[u8]> = bs.borrow(); - write!(fmt, "BoxedSlice({:?})", - borrowed as *const dyn Borrow<[u8]>) - }, - Buffer::BoxedMutSlice(ref bms) => { - let borrowed : &dyn BorrowMut<[u8]> = bms.borrow(); - write!(fmt, "BoxedMutSlice({:?})", - borrowed as *const dyn BorrowMut<[u8]>) - } - } - } -} +unsafe impl Send for LibcAiocb {} +unsafe impl Sync for LibcAiocb {} /// AIO Control Block. /// /// The basic structure used by all aio functions. Each `AioCb` represents one /// I/O request. pub struct AioCb<'a> { - aiocb: libc::aiocb, + aiocb: LibcAiocb, /// Tracks whether the buffer pointed to by `libc::aiocb.aio_buf` is mutable mutable: bool, /// Could this `AioCb` potentially have any in-kernel state? in_progress: bool, - /// Optionally keeps a reference to the data. - /// - /// Used to keep buffers from `Drop`'ing, and may be returned once the - /// `AioCb` is completed by [`buffer`](#method.buffer). - buffer: Buffer<'a> + _buffer: std::marker::PhantomData<&'a [u8]>, + _pin: std::marker::PhantomPinned } impl<'a> AioCb<'a> { - /// Remove the inner `Buffer` and return it - /// - /// It is an error to call this method while the `AioCb` is still in - /// progress. - pub fn buffer(&mut self) -> Buffer<'a> { - assert!(!self.in_progress); - let mut x = Buffer::None; - mem::swap(&mut self.buffer, &mut x); - x - } - - /// Remove the inner boxed slice, if any, and return it. - /// - /// The returned value will be the argument that was passed to - /// `from_boxed_slice` when this `AioCb` was created. - /// - /// It is an error to call this method while the `AioCb` is still in - /// progress. - pub fn boxed_slice(&mut self) -> Option<Box<dyn Borrow<[u8]>>> { - assert!(!self.in_progress, "Can't remove the buffer from an AioCb that's still in-progress. Did you forget to call aio_return?"); - if let Buffer::BoxedSlice(_) = self.buffer { - let mut oldbuffer = Buffer::None; - mem::swap(&mut self.buffer, &mut oldbuffer); - if let Buffer::BoxedSlice(inner) = oldbuffer { - Some(inner) - } else { - unreachable!(); - } - } else { - None - } - } - - /// Remove the inner boxed mutable slice, if any, and return it. - /// - /// The returned value will be the argument that was passed to - /// `from_boxed_mut_slice` when this `AioCb` was created. - /// - /// It is an error to call this method while the `AioCb` is still in - /// progress. - pub fn boxed_mut_slice(&mut self) -> Option<Box<dyn BorrowMut<[u8]>>> { - assert!(!self.in_progress, "Can't remove the buffer from an AioCb that's still in-progress. Did you forget to call aio_return?"); - if let Buffer::BoxedMutSlice(_) = self.buffer { - let mut oldbuffer = Buffer::None; - mem::swap(&mut self.buffer, &mut oldbuffer); - if let Buffer::BoxedMutSlice(inner) = oldbuffer { - Some(inner) - } else { - unreachable!(); - } - } else { - None - } - } - /// Returns the underlying file descriptor associated with the `AioCb` pub fn fd(&self) -> RawFd { - self.aiocb.aio_fildes + self.aiocb.0.aio_fildes } /// Constructs a new `AioCb` with no associated buffer. @@ -243,17 +154,39 @@ impl<'a> AioCb<'a> { /// # } /// ``` pub fn from_fd(fd: RawFd, prio: libc::c_int, - sigev_notify: SigevNotify) -> AioCb<'a> { + sigev_notify: SigevNotify) -> Pin<Box<AioCb<'a>>> { let mut a = AioCb::common_init(fd, prio, sigev_notify); - a.aio_offset = 0; - a.aio_nbytes = 0; - a.aio_buf = null_mut(); + a.0.aio_offset = 0; + a.0.aio_nbytes = 0; + a.0.aio_buf = null_mut(); - AioCb { + Box::pin(AioCb { aiocb: a, mutable: false, in_progress: false, - buffer: Buffer::None + _buffer: PhantomData, + _pin: std::marker::PhantomPinned + }) + } + + // Private helper + #[cfg(not(any(target_os = "ios", target_os = "macos")))] + fn from_mut_slice_unpinned(fd: RawFd, offs: off_t, buf: &'a mut [u8], + prio: libc::c_int, sigev_notify: SigevNotify, + opcode: LioOpcode) -> AioCb<'a> + { + let mut a = AioCb::common_init(fd, prio, sigev_notify); + a.0.aio_offset = offs; + a.0.aio_nbytes = buf.len() as size_t; + a.0.aio_buf = buf.as_ptr() as *mut c_void; + a.0.aio_lio_opcode = opcode as libc::c_int; + + AioCb { + aiocb: a, + mutable: true, + in_progress: false, + _buffer: PhantomData, + _pin: std::marker::PhantomPinned } } @@ -262,8 +195,7 @@ impl<'a> AioCb<'a> { /// The resulting `AioCb` will be suitable for both read and write /// operations, but only if the borrow checker can guarantee that the slice /// will outlive the `AioCb`. That will usually be the case if the `AioCb` - /// is stack-allocated. If the borrow checker gives you trouble, try using - /// [`from_boxed_mut_slice`](#method.from_boxed_mut_slice) instead. + /// is stack-allocated. /// /// # Parameters /// @@ -316,210 +248,20 @@ impl<'a> AioCb<'a> { /// ``` pub fn from_mut_slice(fd: RawFd, offs: off_t, buf: &'a mut [u8], prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> AioCb<'a> { + opcode: LioOpcode) -> Pin<Box<AioCb<'a>>> { let mut a = AioCb::common_init(fd, prio, sigev_notify); - a.aio_offset = offs; - a.aio_nbytes = buf.len() as size_t; - a.aio_buf = buf.as_ptr() as *mut c_void; - a.aio_lio_opcode = opcode as libc::c_int; + a.0.aio_offset = offs; + a.0.aio_nbytes = buf.len() as size_t; + a.0.aio_buf = buf.as_ptr() as *mut c_void; + a.0.aio_lio_opcode = opcode as libc::c_int; - AioCb { + Box::pin(AioCb { aiocb: a, mutable: true, in_progress: false, - buffer: Buffer::Phantom(PhantomData), - } - } - - /// The safest and most flexible way to create an `AioCb`. - /// - /// Unlike [`from_slice`], this method returns a structure suitable for - /// placement on the heap. It may be used for write operations, but not - /// read operations. Unlike `from_ptr`, this method will ensure that the - /// buffer doesn't `drop` while the kernel is still processing it. Any - /// object that can be borrowed as a boxed slice will work. - /// - /// # Parameters - /// - /// * `fd`: File descriptor. Required for all aio functions. - /// * `offs`: File offset - /// * `buf`: A boxed slice-like object - /// * `prio`: If POSIX Prioritized IO is supported, then the - /// operation will be prioritized at the process's - /// priority level minus `prio` - /// * `sigev_notify`: Determines how you will be notified of event - /// completion. - /// * `opcode`: This field is only used for `lio_listio`. It - /// determines which operation to use for this individual - /// aiocb - /// - /// # Examples - /// - /// Create an `AioCb` from a Vector and use it for writing - /// - /// ``` - /// # use nix::errno::Errno; - /// # use nix::Error; - /// # use nix::sys::aio::*; - /// # use nix::sys::signal::SigevNotify; - /// # use std::{thread, time}; - /// # use std::io::Write; - /// # use std::os::unix::io::AsRawFd; - /// # use tempfile::tempfile; - /// # fn main() { - /// let wbuf = Box::new(Vec::from("CDEF")); - /// let expected_len = wbuf.len(); - /// let mut f = tempfile().unwrap(); - /// let mut aiocb = AioCb::from_boxed_slice( f.as_raw_fd(), - /// 2, //offset - /// wbuf, - /// 0, //priority - /// SigevNotify::SigevNone, - /// LioOpcode::LIO_NOP); - /// aiocb.write().unwrap(); - /// while (aiocb.error() == Err(Error::from(Errno::EINPROGRESS))) { - /// thread::sleep(time::Duration::from_millis(10)); - /// } - /// assert_eq!(aiocb.aio_return().unwrap() as usize, expected_len); - /// # } - /// ``` - /// - /// Create an `AioCb` from a `Bytes` object - /// - /// ``` - /// # use bytes::Bytes; - /// # use nix::sys::aio::*; - /// # use nix::sys::signal::SigevNotify; - /// # use std::os::unix::io::AsRawFd; - /// # use tempfile::tempfile; - /// # fn main() { - /// let wbuf = Box::new(Bytes::from(&b"CDEF"[..])); - /// let mut f = tempfile().unwrap(); - /// let mut aiocb = AioCb::from_boxed_slice( f.as_raw_fd(), - /// 2, //offset - /// wbuf, - /// 0, //priority - /// SigevNotify::SigevNone, - /// LioOpcode::LIO_NOP); - /// # } - /// ``` - /// - /// If a library needs to work with buffers that aren't `Box`ed, it can - /// create a `Box`ed container for use with this method. Here's an example - /// using an un`Box`ed `Bytes` object. - /// - /// ``` - /// # use bytes::Bytes; - /// # use nix::sys::aio::*; - /// # use nix::sys::signal::SigevNotify; - /// # use std::borrow::Borrow; - /// # use std::os::unix::io::AsRawFd; - /// # use tempfile::tempfile; - /// struct BytesContainer(Bytes); - /// impl Borrow<[u8]> for BytesContainer { - /// fn borrow(&self) -> &[u8] { - /// self.0.as_ref() - /// } - /// } - /// fn main() { - /// let wbuf = Bytes::from(&b"CDEF"[..]); - /// let boxed_wbuf = Box::new(BytesContainer(wbuf)); - /// let mut f = tempfile().unwrap(); - /// let mut aiocb = AioCb::from_boxed_slice( f.as_raw_fd(), - /// 2, //offset - /// boxed_wbuf, - /// 0, //priority - /// SigevNotify::SigevNone, - /// LioOpcode::LIO_NOP); - /// } - /// ``` - /// - /// [`from_slice`]: #method.from_slice - pub fn from_boxed_slice(fd: RawFd, offs: off_t, buf: Box<dyn Borrow<[u8]>>, - prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> AioCb<'a> { - let mut a = AioCb::common_init(fd, prio, sigev_notify); - { - let borrowed : &dyn Borrow<[u8]> = buf.borrow(); - let slice : &[u8] = borrowed.borrow(); - a.aio_nbytes = slice.len() as size_t; - a.aio_buf = slice.as_ptr() as *mut c_void; - } - a.aio_offset = offs; - a.aio_lio_opcode = opcode as libc::c_int; - - AioCb { - aiocb: a, - mutable: false, - in_progress: false, - buffer: Buffer::BoxedSlice(buf), - } - } - - /// The safest and most flexible way to create an `AioCb` for reading. - /// - /// Like [`from_boxed_slice`], but the slice is a mutable one. More - /// flexible than [`from_mut_slice`], because a wide range of objects can be - /// used. - /// - /// # Examples - /// - /// Create an `AioCb` from a Vector and use it for reading - /// - /// ``` - /// # use nix::errno::Errno; - /// # use nix::Error; - /// # use nix::sys::aio::*; - /// # use nix::sys::signal::SigevNotify; - /// # use std::{thread, time}; - /// # use std::io::Write; - /// # use std::os::unix::io::AsRawFd; - /// # use tempfile::tempfile; - /// # fn main() { - /// const INITIAL: &[u8] = b"abcdef123456"; - /// const LEN: usize = 4; - /// let rbuf = Box::new(vec![0; LEN]); - /// let mut f = tempfile().unwrap(); - /// f.write_all(INITIAL).unwrap(); - /// let mut aiocb = AioCb::from_boxed_mut_slice( f.as_raw_fd(), - /// 2, //offset - /// rbuf, - /// 0, //priority - /// SigevNotify::SigevNone, - /// LioOpcode::LIO_NOP); - /// aiocb.read().unwrap(); - /// while (aiocb.error() == Err(Error::from(Errno::EINPROGRESS))) { - /// thread::sleep(time::Duration::from_millis(10)); - /// } - /// assert_eq!(aiocb.aio_return().unwrap() as usize, LEN); - /// let mut buffer = aiocb.boxed_mut_slice().unwrap(); - /// const EXPECT: &[u8] = b"cdef"; - /// assert_eq!(buffer.borrow_mut(), EXPECT); - /// # } - /// ``` - /// - /// [`from_boxed_slice`]: #method.from_boxed_slice - /// [`from_mut_slice`]: #method.from_mut_slice - pub fn from_boxed_mut_slice(fd: RawFd, offs: off_t, - mut buf: Box<dyn BorrowMut<[u8]>>, - prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> AioCb<'a> { - let mut a = AioCb::common_init(fd, prio, sigev_notify); - { - let borrowed : &mut dyn BorrowMut<[u8]> = buf.borrow_mut(); - let slice : &mut [u8] = borrowed.borrow_mut(); - a.aio_nbytes = slice.len() as size_t; - a.aio_buf = slice.as_mut_ptr() as *mut c_void; - } - a.aio_offset = offs; - a.aio_lio_opcode = opcode as libc::c_int; - - AioCb { - aiocb: a, - mutable: true, - in_progress: false, - buffer: Buffer::BoxedMutSlice(buf), - } + _buffer: PhantomData, + _pin: std::marker::PhantomPinned + }) } /// Constructs a new `AioCb` from a mutable raw pointer @@ -527,8 +269,7 @@ impl<'a> AioCb<'a> { /// Unlike `from_mut_slice`, this method returns a structure suitable for /// placement on the heap. It may be used for both reads and writes. Due /// to its unsafety, this method is not recommended. It is most useful when - /// heap allocation is required but for some reason the data cannot be - /// wrapped in a `struct` that implements `BorrowMut<[u8]>` + /// heap allocation is required. /// /// # Parameters /// @@ -552,28 +293,27 @@ impl<'a> AioCb<'a> { pub unsafe fn from_mut_ptr(fd: RawFd, offs: off_t, buf: *mut c_void, len: usize, prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> AioCb<'a> { + opcode: LioOpcode) -> Pin<Box<AioCb<'a>>> { let mut a = AioCb::common_init(fd, prio, sigev_notify); - a.aio_offset = offs; - a.aio_nbytes = len; - a.aio_buf = buf; - a.aio_lio_opcode = opcode as libc::c_int; + a.0.aio_offset = offs; + a.0.aio_nbytes = len; + a.0.aio_buf = buf; + a.0.aio_lio_opcode = opcode as libc::c_int; - AioCb { + Box::pin(AioCb { aiocb: a, mutable: true, in_progress: false, - buffer: Buffer::None - } + _buffer: PhantomData, + _pin: std::marker::PhantomPinned, + }) } /// Constructs a new `AioCb` from a raw pointer. /// /// Unlike `from_slice`, this method returns a structure suitable for /// placement on the heap. Due to its unsafety, this method is not - /// recommended. It is most useful when heap allocation is required but for - /// some reason the data cannot be wrapped in a `struct` that implements - /// `Borrow<[u8]>` + /// recommended. It is most useful when heap allocation is required. /// /// # Parameters /// @@ -597,24 +337,49 @@ impl<'a> AioCb<'a> { pub unsafe fn from_ptr(fd: RawFd, offs: off_t, buf: *const c_void, len: usize, prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> AioCb<'a> { + opcode: LioOpcode) -> Pin<Box<AioCb<'a>>> { let mut a = AioCb::common_init(fd, prio, sigev_notify); - a.aio_offset = offs; - a.aio_nbytes = len; + a.0.aio_offset = offs; + a.0.aio_nbytes = len; // casting a const ptr to a mutable ptr here is ok, because we set the // AioCb's mutable field to false - a.aio_buf = buf as *mut c_void; - a.aio_lio_opcode = opcode as libc::c_int; + a.0.aio_buf = buf as *mut c_void; + a.0.aio_lio_opcode = opcode as libc::c_int; + + Box::pin(AioCb { + aiocb: a, + mutable: false, + in_progress: false, + _buffer: PhantomData, + _pin: std::marker::PhantomPinned + }) + } + + // Private helper + fn from_slice_unpinned(fd: RawFd, offs: off_t, buf: &'a [u8], + prio: libc::c_int, sigev_notify: SigevNotify, + opcode: LioOpcode) -> AioCb + { + let mut a = AioCb::common_init(fd, prio, sigev_notify); + a.0.aio_offset = offs; + a.0.aio_nbytes = buf.len() as size_t; + // casting an immutable buffer to a mutable pointer looks unsafe, + // but technically its only unsafe to dereference it, not to create + // it. + a.0.aio_buf = buf.as_ptr() as *mut c_void; + assert!(opcode != LioOpcode::LIO_READ, "Can't read into an immutable buffer"); + a.0.aio_lio_opcode = opcode as libc::c_int; AioCb { aiocb: a, mutable: false, in_progress: false, - buffer: Buffer::None + _buffer: PhantomData, + _pin: std::marker::PhantomPinned } } - /// Like `from_mut_slice`, but works on constant slices rather than + /// Like [`from_mut_slice`], but works on constant slices rather than /// mutable slices. /// /// An `AioCb` created this way cannot be used with `read`, and its @@ -657,27 +422,14 @@ impl<'a> AioCb<'a> { // AioCb, and they must all be of the same type. pub fn from_slice(fd: RawFd, offs: off_t, buf: &'a [u8], prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> AioCb { - let mut a = AioCb::common_init(fd, prio, sigev_notify); - a.aio_offset = offs; - a.aio_nbytes = buf.len() as size_t; - // casting an immutable buffer to a mutable pointer looks unsafe, - // but technically its only unsafe to dereference it, not to create - // it. - a.aio_buf = buf.as_ptr() as *mut c_void; - assert!(opcode != LioOpcode::LIO_READ, "Can't read into an immutable buffer"); - a.aio_lio_opcode = opcode as libc::c_int; - - AioCb { - aiocb: a, - mutable: false, - in_progress: false, - buffer: Buffer::None, - } + opcode: LioOpcode) -> Pin<Box<AioCb>> + { + Box::pin(AioCb::from_slice_unpinned(fd, offs, buf, prio, sigev_notify, + opcode)) } fn common_init(fd: RawFd, prio: libc::c_int, - sigev_notify: SigevNotify) -> libc::aiocb { + sigev_notify: SigevNotify) -> LibcAiocb { // Use mem::zeroed instead of explicitly zeroing each field, because the // number and name of reserved fields is OS-dependent. On some OSes, // some reserved fields are used the kernel for state, and must be @@ -686,12 +438,18 @@ impl<'a> AioCb<'a> { a.aio_fildes = fd; a.aio_reqprio = prio; a.aio_sigevent = SigEvent::new(sigev_notify).sigevent(); - a + LibcAiocb(a) } /// Update the notification settings for an existing `aiocb` - pub fn set_sigev_notify(&mut self, sigev_notify: SigevNotify) { - self.aiocb.aio_sigevent = SigEvent::new(sigev_notify).sigevent(); + pub fn set_sigev_notify(self: &mut Pin<Box<Self>>, + sigev_notify: SigevNotify) + { + // Safe because we don't move any of the data + let selfp = unsafe { + self.as_mut().get_unchecked_mut() + }; + selfp.aiocb.0.aio_sigevent = SigEvent::new(sigev_notify).sigevent(); } /// Cancels an outstanding AIO request. @@ -741,8 +499,12 @@ impl<'a> AioCb<'a> { /// # References /// /// [aio_cancel](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_cancel.html) - pub fn cancel(&mut self) -> Result<AioCancelStat> { - match unsafe { libc::aio_cancel(self.aiocb.aio_fildes, &mut self.aiocb) } { + pub fn cancel(self: &mut Pin<Box<Self>>) -> Result<AioCancelStat> { + let r = unsafe { + let selfp = self.as_mut().get_unchecked_mut(); + libc::aio_cancel(selfp.aiocb.0.aio_fildes, &mut selfp.aiocb.0) + }; + match r { libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled), libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled), libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone), @@ -751,6 +513,18 @@ impl<'a> AioCb<'a> { } } + fn error_unpinned(self: &mut Self) -> Result<()> { + let r = unsafe { + libc::aio_error(&mut self.aiocb.0 as *mut libc::aiocb) + }; + match r { + 0 => Ok(()), + num if num > 0 => Err(Error::from_errno(Errno::from_i32(num))), + -1 => Err(Error::last()), + num => panic!("unknown aio_error return value {:?}", num) + } + } + /// Retrieve error status of an asynchronous operation. /// /// If the request has not yet completed, returns `EINPROGRESS`. Otherwise, @@ -789,13 +563,12 @@ impl<'a> AioCb<'a> { /// # References /// /// [aio_error](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_error.html) - pub fn error(&mut self) -> Result<()> { - match unsafe { libc::aio_error(&mut self.aiocb as *mut libc::aiocb) } { - 0 => Ok(()), - num if num > 0 => Err(Error::from_errno(Errno::from_i32(num))), - -1 => Err(Error::last()), - num => panic!("unknown aio_error return value {:?}", num) - } + pub fn error(self: &mut Pin<Box<Self>>) -> Result<()> { + // Safe because error_unpinned doesn't move the data + let selfp = unsafe { + self.as_mut().get_unchecked_mut() + }; + selfp.error_unpinned() } /// An asynchronous version of `fsync(2)`. @@ -803,13 +576,17 @@ impl<'a> AioCb<'a> { /// # References /// /// [aio_fsync](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_fsync.html) - pub fn fsync(&mut self, mode: AioFsyncMode) -> Result<()> { - let p: *mut libc::aiocb = &mut self.aiocb; - Errno::result(unsafe { + pub fn fsync(self: &mut Pin<Box<Self>>, mode: AioFsyncMode) -> Result<()> { + // Safe because we don't move the libc::aiocb + unsafe { + let selfp = self.as_mut().get_unchecked_mut(); + Errno::result({ + let p: *mut libc::aiocb = &mut selfp.aiocb.0; libc::aio_fsync(mode as libc::c_int, p) - }).map(|_| { - self.in_progress = true; - }) + }).map(|_| { + selfp.in_progress = true; + }) + } } /// Returns the `aiocb`'s `LioOpcode` field @@ -817,7 +594,7 @@ impl<'a> AioCb<'a> { /// If the value cannot be represented as an `LioOpcode`, returns `None` /// instead. pub fn lio_opcode(&self) -> Option<LioOpcode> { - match self.aiocb.aio_lio_opcode { + match self.aiocb.0.aio_lio_opcode { libc::LIO_READ => Some(LioOpcode::LIO_READ), libc::LIO_WRITE => Some(LioOpcode::LIO_WRITE), libc::LIO_NOP => Some(LioOpcode::LIO_NOP), @@ -831,17 +608,17 @@ impl<'a> AioCb<'a> { /// number of bytes actually read or written by a completed operation, use /// `aio_return` instead. pub fn nbytes(&self) -> usize { - self.aiocb.aio_nbytes + self.aiocb.0.aio_nbytes } /// Returns the file offset stored in the `AioCb` pub fn offset(&self) -> off_t { - self.aiocb.aio_offset + self.aiocb.0.aio_offset } /// Returns the priority of the `AioCb` pub fn priority(&self) -> libc::c_int { - self.aiocb.aio_reqprio + self.aiocb.0.aio_reqprio } /// Asynchronously reads from a file descriptor into a buffer @@ -849,19 +626,31 @@ impl<'a> AioCb<'a> { /// # References /// /// [aio_read](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_read.html) - pub fn read(&mut self) -> Result<()> { + pub fn read(self: &mut Pin<Box<Self>>) -> Result<()> { assert!(self.mutable, "Can't read into an immutable buffer"); - let p: *mut libc::aiocb = &mut self.aiocb; - Errno::result(unsafe { - libc::aio_read(p) + // Safe because we don't move anything + let selfp = unsafe { + self.as_mut().get_unchecked_mut() + }; + Errno::result({ + let p: *mut libc::aiocb = &mut selfp.aiocb.0; + unsafe { libc::aio_read(p) } }).map(|_| { - self.in_progress = true; + selfp.in_progress = true; }) } /// Returns the `SigEvent` stored in the `AioCb` pub fn sigevent(&self) -> SigEvent { - SigEvent::from(&self.aiocb.aio_sigevent) + SigEvent::from(&self.aiocb.0.aio_sigevent) + } + + fn aio_return_unpinned(self: &mut Self) -> Result<isize> { + unsafe { + let p: *mut libc::aiocb = &mut self.aiocb.0; + self.in_progress = false; + Errno::result(libc::aio_return(p)) + } } /// Retrieve return status of an asynchronous operation. @@ -874,10 +663,12 @@ impl<'a> AioCb<'a> { /// /// [aio_return](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_return.html) // Note: this should be just `return`, but that's a reserved word - pub fn aio_return(&mut self) -> Result<isize> { - let p: *mut libc::aiocb = &mut self.aiocb; - self.in_progress = false; - Errno::result(unsafe { libc::aio_return(p) }) + pub fn aio_return(self: &mut Pin<Box<Self>>) -> Result<isize> { + // Safe because aio_return_unpinned does not move the data + let selfp = unsafe { + self.as_mut().get_unchecked_mut() + }; + selfp.aio_return_unpinned() } /// Asynchronously writes from a buffer to a file descriptor @@ -885,15 +676,18 @@ impl<'a> AioCb<'a> { /// # References /// /// [aio_write](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_write.html) - pub fn write(&mut self) -> Result<()> { - let p: *mut libc::aiocb = &mut self.aiocb; - Errno::result(unsafe { - libc::aio_write(p) + pub fn write(self: &mut Pin<Box<Self>>) -> Result<()> { + // Safe because we don't move anything + let selfp = unsafe { + self.as_mut().get_unchecked_mut() + }; + Errno::result({ + let p: *mut libc::aiocb = &mut selfp.aiocb.0; + unsafe{ libc::aio_write(p) } }).map(|_| { - self.in_progress = true; + selfp.in_progress = true; }) } - } /// Cancels outstanding AIO requests for a given file descriptor. @@ -970,15 +764,15 @@ pub fn aio_cancel_all(fd: RawFd) -> Result<AioCancelStat> { /// SigevNotify::SigevNone, /// LioOpcode::LIO_NOP); /// aiocb.write().unwrap(); -/// aio_suspend(&[&aiocb], None).expect("aio_suspend failed"); +/// aio_suspend(&[aiocb.as_ref()], None).expect("aio_suspend failed"); /// assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len()); /// # } /// ``` /// # References /// /// [`aio_suspend`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_suspend.html) -pub fn aio_suspend(list: &[&AioCb], timeout: Option<TimeSpec>) -> Result<()> { - let plist = list as *const [&AioCb] as *const [*const libc::aiocb]; +pub fn aio_suspend(list: &[Pin<&AioCb>], timeout: Option<TimeSpec>) -> Result<()> { + let plist = list as *const [Pin<&AioCb>] as *const [*const libc::aiocb]; let p = plist as *const *const libc::aiocb; let timep = match timeout { None => null::<libc::timespec>(), @@ -992,7 +786,7 @@ pub fn aio_suspend(list: &[&AioCb], timeout: Option<TimeSpec>) -> Result<()> { impl<'a> Debug for AioCb<'a> { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("AioCb") - .field("aiocb", &self.aiocb) + .field("aiocb", &self.aiocb.0) .field("mutable", &self.mutable) .field("in_progress", &self.in_progress) .finish() @@ -1018,7 +812,9 @@ pub struct LioCb<'a> { /// /// [`AioCb`]: struct.AioCb.html /// [`listio`]: #method.listio - pub aiocbs: Vec<AioCb<'a>>, + // Their locations in memory must be fixed once they are passed to the + // kernel. So this field must be non-public so the user can't swap. + aiocbs: Box<[AioCb<'a>]>, /// The actual list passed to `libc::lio_listio`. /// @@ -1032,15 +828,19 @@ pub struct LioCb<'a> { results: Vec<Option<Result<isize>>> } +/// LioCb can't automatically impl Send and Sync just because of the raw +/// pointers in list. But that's stupid. There's no reason that raw pointers +/// should automatically be non-Send +#[cfg(not(any(target_os = "ios", target_os = "macos")))] +unsafe impl<'a> Send for LioCb<'a> {} +#[cfg(not(any(target_os = "ios", target_os = "macos")))] +unsafe impl<'a> Sync for LioCb<'a> {} + #[cfg(not(any(target_os = "ios", target_os = "macos")))] impl<'a> LioCb<'a> { - /// Initialize an empty `LioCb` - pub fn with_capacity(capacity: usize) -> LioCb<'a> { - LioCb { - aiocbs: Vec::with_capacity(capacity), - list: Vec::with_capacity(capacity), - results: Vec::with_capacity(capacity) - } + /// Return the number of individual [`AioCb`]s contained. + pub fn len(&self) -> usize { + self.aiocbs.len() } /// Submits multiple asynchronous I/O requests with a single system call. @@ -1069,13 +869,15 @@ impl<'a> LioCb<'a> { /// # fn main() { /// const WBUF: &[u8] = b"abcdef123456"; /// let mut f = tempfile().unwrap(); - /// let mut liocb = LioCb::with_capacity(1); - /// liocb.aiocbs.push(AioCb::from_slice( f.as_raw_fd(), - /// 2, //offset - /// WBUF, - /// 0, //priority - /// SigevNotify::SigevNone, - /// LioOpcode::LIO_WRITE)); + /// let mut liocb = LioCbBuilder::with_capacity(1) + /// .emplace_slice( + /// f.as_raw_fd(), + /// 2, //offset + /// WBUF, + /// 0, //priority + /// SigevNotify::SigevNone, + /// LioOpcode::LIO_WRITE + /// ).finish(); /// liocb.listio(LioMode::LIO_WAIT, /// SigevNotify::SigevNone).unwrap(); /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len()); @@ -1093,7 +895,7 @@ impl<'a> LioCb<'a> { let sigev = SigEvent::new(sigev_notify); let sigevp = &mut sigev.sigevent() as *mut libc::sigevent; self.list.clear(); - for a in &mut self.aiocbs { + for a in &mut self.aiocbs.iter_mut() { a.in_progress = true; self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb); @@ -1127,13 +929,15 @@ impl<'a> LioCb<'a> { /// # fn main() { /// const WBUF: &[u8] = b"abcdef123456"; /// let mut f = tempfile().unwrap(); - /// let mut liocb = LioCb::with_capacity(1); - /// liocb.aiocbs.push(AioCb::from_slice( f.as_raw_fd(), - /// 2, //offset - /// WBUF, - /// 0, //priority - /// SigevNotify::SigevNone, - /// LioOpcode::LIO_WRITE)); + /// let mut liocb = LioCbBuilder::with_capacity(1) + /// .emplace_slice( + /// f.as_raw_fd(), + /// 2, //offset + /// WBUF, + /// 0, //priority + /// SigevNotify::SigevNone, + /// LioOpcode::LIO_WRITE + /// ).finish(); /// let mut err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone); /// while err == Err(Error::Sys(Errno::EIO)) || /// err == Err(Error::Sys(Errno::EAGAIN)) { @@ -1170,10 +974,10 @@ impl<'a> LioCb<'a> { // Already collected final status for this operation continue; } - match a.error() { + match a.error_unpinned() { Ok(()) => { // aiocb is complete; collect its status and don't resubmit - self.results[i] = Some(a.aio_return()); + self.results[i] = Some(a.aio_return_unpinned()); }, Err(Error::Sys(Errno::EAGAIN)) => { self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb); @@ -1202,7 +1006,7 @@ impl<'a> LioCb<'a> { /// [`LioCb::listio_resubmit`]: #method.listio_resubmit pub fn aio_return(&mut self, i: usize) -> Result<isize> { if i >= self.results.len() || self.results[i].is_none() { - self.aiocbs[i].aio_return() + self.aiocbs[i].aio_return_unpinned() } else { self.results[i].unwrap() } @@ -1218,7 +1022,7 @@ impl<'a> LioCb<'a> { /// [`LioCb::listio_resubmit`]: #method.listio_resubmit pub fn error(&mut self, i: usize) -> Result<()> { if i >= self.results.len() || self.results[i].is_none() { - self.aiocbs[i].error() + self.aiocbs[i].error_unpinned() } else { Ok(()) } @@ -1234,13 +1038,93 @@ impl<'a> Debug for LioCb<'a> { } } +/// Used to construct `LioCb` +// This must be a separate class from LioCb due to pinning constraints. LioCb +// must use a boxed slice of AioCbs so they will have stable storage, but +// LioCbBuilder must use a Vec to make construction possible when the final size +// is unknown. +#[cfg(not(any(target_os = "ios", target_os = "macos")))] +#[derive(Debug)] +pub struct LioCbBuilder<'a> { + /// A collection of [`AioCb`]s. + /// + /// [`AioCb`]: struct.AioCb.html + pub aiocbs: Vec<AioCb<'a>>, +} + #[cfg(not(any(target_os = "ios", target_os = "macos")))] -impl<'a> From<Vec<AioCb<'a>>> for LioCb<'a> { - fn from(src: Vec<AioCb<'a>>) -> LioCb<'a> { +impl<'a> LioCbBuilder<'a> { + /// Initialize an empty `LioCb` + pub fn with_capacity(capacity: usize) -> LioCbBuilder<'a> { + LioCbBuilder { + aiocbs: Vec::with_capacity(capacity), + } + } + + /// Add a new operation on an immutable slice to the [`LioCb`] under + /// construction. + /// + /// Arguments are the same as for [`AioCb::from_slice`] + /// + /// [`LioCb`]: struct.LioCb.html + /// [`AioCb::from_slice`]: struct.AioCb.html#method.from_slice + pub fn emplace_slice(mut self, fd: RawFd, offs: off_t, buf: &'a [u8], + prio: libc::c_int, sigev_notify: SigevNotify, + opcode: LioOpcode) -> Self + { + self.aiocbs.push(AioCb::from_slice_unpinned(fd, offs, buf, prio, + sigev_notify, opcode)); + self + } + + /// Add a new operation on a mutable slice to the [`LioCb`] under + /// construction. + /// + /// Arguments are the same as for [`AioCb::from_mut_slice`] + /// + /// [`LioCb`]: struct.LioCb.html + /// [`AioCb::from_mut_slice`]: struct.AioCb.html#method.from_mut_slice + pub fn emplace_mut_slice(mut self, fd: RawFd, offs: off_t, + buf: &'a mut [u8], prio: libc::c_int, + sigev_notify: SigevNotify, opcode: LioOpcode) + -> Self + { + self.aiocbs.push(AioCb::from_mut_slice_unpinned(fd, offs, buf, prio, + sigev_notify, opcode)); + self + } + + /// Finalize this [`LioCb`]. + /// + /// Afterwards it will be possible to issue the operations with + /// [`LioCb::listio`]. Conversely, it will no longer be possible to add new + /// operations with [`LioCb::emplace_slice`] or + /// [`LioCb::emplace_mut_slice`]. + /// + /// [`LioCb::listio`]: struct.LioCb.html#method.listio + /// [`LioCb::from_mut_slice`]: struct.LioCb.html#method.from_mut_slice + /// [`LioCb::from_slice`]: struct.LioCb.html#method.from_slice + pub fn finish(self) -> LioCb<'a> { + let len = self.aiocbs.len(); LioCb { - list: Vec::with_capacity(src.capacity()), - results: Vec::with_capacity(src.capacity()), - aiocbs: src, + aiocbs: self.aiocbs.into(), + list: Vec::with_capacity(len), + results: Vec::with_capacity(len) } } } + +#[cfg(not(any(target_os = "ios", target_os = "macos")))] +#[cfg(test)] +mod t { + use super::*; + + // It's important that `LioCb` be `UnPin`. The tokio-file crate relies on + // it. + #[test] + fn liocb_is_unpin() { + use assert_impl::assert_impl; + + assert_impl!(Unpin: LioCb); + } +} |