summaryrefslogtreecommitdiff
path: root/src/sys/aio.rs
diff options
context:
space:
mode:
authorAlan Somers <asomers@gmail.com>2021-05-30 10:47:05 -0600
committerAlan Somers <asomers@gmail.com>2021-05-30 17:41:47 -0600
commit5ac876e17df37315585f178ec3b762786e05a092 (patch)
tree79ff8b8b8854c40d77eb83165b73de7ed56a5487 /src/sys/aio.rs
parent9f1b35b5d4a176ed7c09b576b461620c632c0182 (diff)
downloadnix-5ac876e17df37315585f178ec3b762786e05a092.zip
Adapt aio to the world of async/await, and fix some potential unsoundness.
* libc::aiocb must not be moved while the kernel has a pointer to it. This change enforces that requirement by using std::pin. * Split LioCbBuilder out of LioCb. struct LioCb relied on the (incorrect) assumption that a Vec's elements have a stable location in memory. That's not true; they can be moved during Vec::push. The solution is to use a Vec in the new Builder struct, but finalize it to a boxed slice (which doesn't support push) before allowing it to be submitted to the kernel. * Eliminate owned buffer types. mio-aio no longer uses owned buffers with nix::aio. There's little need for it in the world of async/await. I'm not aware of any other consumers. This substantially simplifies the code.
Diffstat (limited to 'src/sys/aio.rs')
-rw-r--r--src/sys/aio.rs720
1 files changed, 302 insertions, 418 deletions
diff --git a/src/sys/aio.rs b/src/sys/aio.rs
index 6c8e8924..7868a294 100644
--- a/src/sys/aio.rs
+++ b/src/sys/aio.rs
@@ -25,11 +25,11 @@ use crate::{Error, Result};
use crate::errno::Errno;
use std::os::unix::io::RawFd;
use libc::{c_void, off_t, size_t};
-use std::borrow::{Borrow, BorrowMut};
use std::fmt;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
+use std::pin::Pin;
use std::ptr::{null, null_mut};
use crate::sys::signal::*;
use std::thread;
@@ -90,120 +90,31 @@ pub enum AioCancelStat {
AioAllDone = libc::AIO_ALLDONE,
}
-/// Owns (uniquely or shared) a memory buffer to keep it from `Drop`ing while
-/// the kernel has a pointer to it.
-pub enum Buffer<'a> {
- /// No buffer to own.
- ///
- /// Used for operations like `aio_fsync` that have no data, or for unsafe
- /// operations that work with raw pointers.
- None,
- /// Keeps a reference to a slice
- Phantom(PhantomData<&'a mut [u8]>),
- /// Generic thing that keeps a buffer from dropping
- BoxedSlice(Box<dyn Borrow<[u8]>>),
- /// Generic thing that keeps a mutable buffer from dropping
- BoxedMutSlice(Box<dyn BorrowMut<[u8]>>),
-}
+/// Newtype that adds Send and Sync to libc::aiocb, which contains raw pointers
+#[repr(transparent)]
+struct LibcAiocb(libc::aiocb);
-impl<'a> Debug for Buffer<'a> {
- // Note: someday it may be possible to Derive Debug for a trait object, but
- // not today.
- // https://github.com/rust-lang/rust/issues/1563
- fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
- match *self {
- Buffer::None => write!(fmt, "None"),
- Buffer::Phantom(p) => p.fmt(fmt),
- Buffer::BoxedSlice(ref bs) => {
- let borrowed : &dyn Borrow<[u8]> = bs.borrow();
- write!(fmt, "BoxedSlice({:?})",
- borrowed as *const dyn Borrow<[u8]>)
- },
- Buffer::BoxedMutSlice(ref bms) => {
- let borrowed : &dyn BorrowMut<[u8]> = bms.borrow();
- write!(fmt, "BoxedMutSlice({:?})",
- borrowed as *const dyn BorrowMut<[u8]>)
- }
- }
- }
-}
+unsafe impl Send for LibcAiocb {}
+unsafe impl Sync for LibcAiocb {}
/// AIO Control Block.
///
/// The basic structure used by all aio functions. Each `AioCb` represents one
/// I/O request.
pub struct AioCb<'a> {
- aiocb: libc::aiocb,
+ aiocb: LibcAiocb,
/// Tracks whether the buffer pointed to by `libc::aiocb.aio_buf` is mutable
mutable: bool,
/// Could this `AioCb` potentially have any in-kernel state?
in_progress: bool,
- /// Optionally keeps a reference to the data.
- ///
- /// Used to keep buffers from `Drop`'ing, and may be returned once the
- /// `AioCb` is completed by [`buffer`](#method.buffer).
- buffer: Buffer<'a>
+ _buffer: std::marker::PhantomData<&'a [u8]>,
+ _pin: std::marker::PhantomPinned
}
impl<'a> AioCb<'a> {
- /// Remove the inner `Buffer` and return it
- ///
- /// It is an error to call this method while the `AioCb` is still in
- /// progress.
- pub fn buffer(&mut self) -> Buffer<'a> {
- assert!(!self.in_progress);
- let mut x = Buffer::None;
- mem::swap(&mut self.buffer, &mut x);
- x
- }
-
- /// Remove the inner boxed slice, if any, and return it.
- ///
- /// The returned value will be the argument that was passed to
- /// `from_boxed_slice` when this `AioCb` was created.
- ///
- /// It is an error to call this method while the `AioCb` is still in
- /// progress.
- pub fn boxed_slice(&mut self) -> Option<Box<dyn Borrow<[u8]>>> {
- assert!(!self.in_progress, "Can't remove the buffer from an AioCb that's still in-progress. Did you forget to call aio_return?");
- if let Buffer::BoxedSlice(_) = self.buffer {
- let mut oldbuffer = Buffer::None;
- mem::swap(&mut self.buffer, &mut oldbuffer);
- if let Buffer::BoxedSlice(inner) = oldbuffer {
- Some(inner)
- } else {
- unreachable!();
- }
- } else {
- None
- }
- }
-
- /// Remove the inner boxed mutable slice, if any, and return it.
- ///
- /// The returned value will be the argument that was passed to
- /// `from_boxed_mut_slice` when this `AioCb` was created.
- ///
- /// It is an error to call this method while the `AioCb` is still in
- /// progress.
- pub fn boxed_mut_slice(&mut self) -> Option<Box<dyn BorrowMut<[u8]>>> {
- assert!(!self.in_progress, "Can't remove the buffer from an AioCb that's still in-progress. Did you forget to call aio_return?");
- if let Buffer::BoxedMutSlice(_) = self.buffer {
- let mut oldbuffer = Buffer::None;
- mem::swap(&mut self.buffer, &mut oldbuffer);
- if let Buffer::BoxedMutSlice(inner) = oldbuffer {
- Some(inner)
- } else {
- unreachable!();
- }
- } else {
- None
- }
- }
-
/// Returns the underlying file descriptor associated with the `AioCb`
pub fn fd(&self) -> RawFd {
- self.aiocb.aio_fildes
+ self.aiocb.0.aio_fildes
}
/// Constructs a new `AioCb` with no associated buffer.
@@ -243,17 +154,39 @@ impl<'a> AioCb<'a> {
/// # }
/// ```
pub fn from_fd(fd: RawFd, prio: libc::c_int,
- sigev_notify: SigevNotify) -> AioCb<'a> {
+ sigev_notify: SigevNotify) -> Pin<Box<AioCb<'a>>> {
let mut a = AioCb::common_init(fd, prio, sigev_notify);
- a.aio_offset = 0;
- a.aio_nbytes = 0;
- a.aio_buf = null_mut();
+ a.0.aio_offset = 0;
+ a.0.aio_nbytes = 0;
+ a.0.aio_buf = null_mut();
- AioCb {
+ Box::pin(AioCb {
aiocb: a,
mutable: false,
in_progress: false,
- buffer: Buffer::None
+ _buffer: PhantomData,
+ _pin: std::marker::PhantomPinned
+ })
+ }
+
+ // Private helper
+ #[cfg(not(any(target_os = "ios", target_os = "macos")))]
+ fn from_mut_slice_unpinned(fd: RawFd, offs: off_t, buf: &'a mut [u8],
+ prio: libc::c_int, sigev_notify: SigevNotify,
+ opcode: LioOpcode) -> AioCb<'a>
+ {
+ let mut a = AioCb::common_init(fd, prio, sigev_notify);
+ a.0.aio_offset = offs;
+ a.0.aio_nbytes = buf.len() as size_t;
+ a.0.aio_buf = buf.as_ptr() as *mut c_void;
+ a.0.aio_lio_opcode = opcode as libc::c_int;
+
+ AioCb {
+ aiocb: a,
+ mutable: true,
+ in_progress: false,
+ _buffer: PhantomData,
+ _pin: std::marker::PhantomPinned
}
}
@@ -262,8 +195,7 @@ impl<'a> AioCb<'a> {
/// The resulting `AioCb` will be suitable for both read and write
/// operations, but only if the borrow checker can guarantee that the slice
/// will outlive the `AioCb`. That will usually be the case if the `AioCb`
- /// is stack-allocated. If the borrow checker gives you trouble, try using
- /// [`from_boxed_mut_slice`](#method.from_boxed_mut_slice) instead.
+ /// is stack-allocated.
///
/// # Parameters
///
@@ -316,210 +248,20 @@ impl<'a> AioCb<'a> {
/// ```
pub fn from_mut_slice(fd: RawFd, offs: off_t, buf: &'a mut [u8],
prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> AioCb<'a> {
+ opcode: LioOpcode) -> Pin<Box<AioCb<'a>>> {
let mut a = AioCb::common_init(fd, prio, sigev_notify);
- a.aio_offset = offs;
- a.aio_nbytes = buf.len() as size_t;
- a.aio_buf = buf.as_ptr() as *mut c_void;
- a.aio_lio_opcode = opcode as libc::c_int;
+ a.0.aio_offset = offs;
+ a.0.aio_nbytes = buf.len() as size_t;
+ a.0.aio_buf = buf.as_ptr() as *mut c_void;
+ a.0.aio_lio_opcode = opcode as libc::c_int;
- AioCb {
+ Box::pin(AioCb {
aiocb: a,
mutable: true,
in_progress: false,
- buffer: Buffer::Phantom(PhantomData),
- }
- }
-
- /// The safest and most flexible way to create an `AioCb`.
- ///
- /// Unlike [`from_slice`], this method returns a structure suitable for
- /// placement on the heap. It may be used for write operations, but not
- /// read operations. Unlike `from_ptr`, this method will ensure that the
- /// buffer doesn't `drop` while the kernel is still processing it. Any
- /// object that can be borrowed as a boxed slice will work.
- ///
- /// # Parameters
- ///
- /// * `fd`: File descriptor. Required for all aio functions.
- /// * `offs`: File offset
- /// * `buf`: A boxed slice-like object
- /// * `prio`: If POSIX Prioritized IO is supported, then the
- /// operation will be prioritized at the process's
- /// priority level minus `prio`
- /// * `sigev_notify`: Determines how you will be notified of event
- /// completion.
- /// * `opcode`: This field is only used for `lio_listio`. It
- /// determines which operation to use for this individual
- /// aiocb
- ///
- /// # Examples
- ///
- /// Create an `AioCb` from a Vector and use it for writing
- ///
- /// ```
- /// # use nix::errno::Errno;
- /// # use nix::Error;
- /// # use nix::sys::aio::*;
- /// # use nix::sys::signal::SigevNotify;
- /// # use std::{thread, time};
- /// # use std::io::Write;
- /// # use std::os::unix::io::AsRawFd;
- /// # use tempfile::tempfile;
- /// # fn main() {
- /// let wbuf = Box::new(Vec::from("CDEF"));
- /// let expected_len = wbuf.len();
- /// let mut f = tempfile().unwrap();
- /// let mut aiocb = AioCb::from_boxed_slice( f.as_raw_fd(),
- /// 2, //offset
- /// wbuf,
- /// 0, //priority
- /// SigevNotify::SigevNone,
- /// LioOpcode::LIO_NOP);
- /// aiocb.write().unwrap();
- /// while (aiocb.error() == Err(Error::from(Errno::EINPROGRESS))) {
- /// thread::sleep(time::Duration::from_millis(10));
- /// }
- /// assert_eq!(aiocb.aio_return().unwrap() as usize, expected_len);
- /// # }
- /// ```
- ///
- /// Create an `AioCb` from a `Bytes` object
- ///
- /// ```
- /// # use bytes::Bytes;
- /// # use nix::sys::aio::*;
- /// # use nix::sys::signal::SigevNotify;
- /// # use std::os::unix::io::AsRawFd;
- /// # use tempfile::tempfile;
- /// # fn main() {
- /// let wbuf = Box::new(Bytes::from(&b"CDEF"[..]));
- /// let mut f = tempfile().unwrap();
- /// let mut aiocb = AioCb::from_boxed_slice( f.as_raw_fd(),
- /// 2, //offset
- /// wbuf,
- /// 0, //priority
- /// SigevNotify::SigevNone,
- /// LioOpcode::LIO_NOP);
- /// # }
- /// ```
- ///
- /// If a library needs to work with buffers that aren't `Box`ed, it can
- /// create a `Box`ed container for use with this method. Here's an example
- /// using an un`Box`ed `Bytes` object.
- ///
- /// ```
- /// # use bytes::Bytes;
- /// # use nix::sys::aio::*;
- /// # use nix::sys::signal::SigevNotify;
- /// # use std::borrow::Borrow;
- /// # use std::os::unix::io::AsRawFd;
- /// # use tempfile::tempfile;
- /// struct BytesContainer(Bytes);
- /// impl Borrow<[u8]> for BytesContainer {
- /// fn borrow(&self) -> &[u8] {
- /// self.0.as_ref()
- /// }
- /// }
- /// fn main() {
- /// let wbuf = Bytes::from(&b"CDEF"[..]);
- /// let boxed_wbuf = Box::new(BytesContainer(wbuf));
- /// let mut f = tempfile().unwrap();
- /// let mut aiocb = AioCb::from_boxed_slice( f.as_raw_fd(),
- /// 2, //offset
- /// boxed_wbuf,
- /// 0, //priority
- /// SigevNotify::SigevNone,
- /// LioOpcode::LIO_NOP);
- /// }
- /// ```
- ///
- /// [`from_slice`]: #method.from_slice
- pub fn from_boxed_slice(fd: RawFd, offs: off_t, buf: Box<dyn Borrow<[u8]>>,
- prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> AioCb<'a> {
- let mut a = AioCb::common_init(fd, prio, sigev_notify);
- {
- let borrowed : &dyn Borrow<[u8]> = buf.borrow();
- let slice : &[u8] = borrowed.borrow();
- a.aio_nbytes = slice.len() as size_t;
- a.aio_buf = slice.as_ptr() as *mut c_void;
- }
- a.aio_offset = offs;
- a.aio_lio_opcode = opcode as libc::c_int;
-
- AioCb {
- aiocb: a,
- mutable: false,
- in_progress: false,
- buffer: Buffer::BoxedSlice(buf),
- }
- }
-
- /// The safest and most flexible way to create an `AioCb` for reading.
- ///
- /// Like [`from_boxed_slice`], but the slice is a mutable one. More
- /// flexible than [`from_mut_slice`], because a wide range of objects can be
- /// used.
- ///
- /// # Examples
- ///
- /// Create an `AioCb` from a Vector and use it for reading
- ///
- /// ```
- /// # use nix::errno::Errno;
- /// # use nix::Error;
- /// # use nix::sys::aio::*;
- /// # use nix::sys::signal::SigevNotify;
- /// # use std::{thread, time};
- /// # use std::io::Write;
- /// # use std::os::unix::io::AsRawFd;
- /// # use tempfile::tempfile;
- /// # fn main() {
- /// const INITIAL: &[u8] = b"abcdef123456";
- /// const LEN: usize = 4;
- /// let rbuf = Box::new(vec![0; LEN]);
- /// let mut f = tempfile().unwrap();
- /// f.write_all(INITIAL).unwrap();
- /// let mut aiocb = AioCb::from_boxed_mut_slice( f.as_raw_fd(),
- /// 2, //offset
- /// rbuf,
- /// 0, //priority
- /// SigevNotify::SigevNone,
- /// LioOpcode::LIO_NOP);
- /// aiocb.read().unwrap();
- /// while (aiocb.error() == Err(Error::from(Errno::EINPROGRESS))) {
- /// thread::sleep(time::Duration::from_millis(10));
- /// }
- /// assert_eq!(aiocb.aio_return().unwrap() as usize, LEN);
- /// let mut buffer = aiocb.boxed_mut_slice().unwrap();
- /// const EXPECT: &[u8] = b"cdef";
- /// assert_eq!(buffer.borrow_mut(), EXPECT);
- /// # }
- /// ```
- ///
- /// [`from_boxed_slice`]: #method.from_boxed_slice
- /// [`from_mut_slice`]: #method.from_mut_slice
- pub fn from_boxed_mut_slice(fd: RawFd, offs: off_t,
- mut buf: Box<dyn BorrowMut<[u8]>>,
- prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> AioCb<'a> {
- let mut a = AioCb::common_init(fd, prio, sigev_notify);
- {
- let borrowed : &mut dyn BorrowMut<[u8]> = buf.borrow_mut();
- let slice : &mut [u8] = borrowed.borrow_mut();
- a.aio_nbytes = slice.len() as size_t;
- a.aio_buf = slice.as_mut_ptr() as *mut c_void;
- }
- a.aio_offset = offs;
- a.aio_lio_opcode = opcode as libc::c_int;
-
- AioCb {
- aiocb: a,
- mutable: true,
- in_progress: false,
- buffer: Buffer::BoxedMutSlice(buf),
- }
+ _buffer: PhantomData,
+ _pin: std::marker::PhantomPinned
+ })
}
/// Constructs a new `AioCb` from a mutable raw pointer
@@ -527,8 +269,7 @@ impl<'a> AioCb<'a> {
/// Unlike `from_mut_slice`, this method returns a structure suitable for
/// placement on the heap. It may be used for both reads and writes. Due
/// to its unsafety, this method is not recommended. It is most useful when
- /// heap allocation is required but for some reason the data cannot be
- /// wrapped in a `struct` that implements `BorrowMut<[u8]>`
+ /// heap allocation is required.
///
/// # Parameters
///
@@ -552,28 +293,27 @@ impl<'a> AioCb<'a> {
pub unsafe fn from_mut_ptr(fd: RawFd, offs: off_t,
buf: *mut c_void, len: usize,
prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> AioCb<'a> {
+ opcode: LioOpcode) -> Pin<Box<AioCb<'a>>> {
let mut a = AioCb::common_init(fd, prio, sigev_notify);
- a.aio_offset = offs;
- a.aio_nbytes = len;
- a.aio_buf = buf;
- a.aio_lio_opcode = opcode as libc::c_int;
+ a.0.aio_offset = offs;
+ a.0.aio_nbytes = len;
+ a.0.aio_buf = buf;
+ a.0.aio_lio_opcode = opcode as libc::c_int;
- AioCb {
+ Box::pin(AioCb {
aiocb: a,
mutable: true,
in_progress: false,
- buffer: Buffer::None
- }
+ _buffer: PhantomData,
+ _pin: std::marker::PhantomPinned,
+ })
}
/// Constructs a new `AioCb` from a raw pointer.
///
/// Unlike `from_slice`, this method returns a structure suitable for
/// placement on the heap. Due to its unsafety, this method is not
- /// recommended. It is most useful when heap allocation is required but for
- /// some reason the data cannot be wrapped in a `struct` that implements
- /// `Borrow<[u8]>`
+ /// recommended. It is most useful when heap allocation is required.
///
/// # Parameters
///
@@ -597,24 +337,49 @@ impl<'a> AioCb<'a> {
pub unsafe fn from_ptr(fd: RawFd, offs: off_t,
buf: *const c_void, len: usize,
prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> AioCb<'a> {
+ opcode: LioOpcode) -> Pin<Box<AioCb<'a>>> {
let mut a = AioCb::common_init(fd, prio, sigev_notify);
- a.aio_offset = offs;
- a.aio_nbytes = len;
+ a.0.aio_offset = offs;
+ a.0.aio_nbytes = len;
// casting a const ptr to a mutable ptr here is ok, because we set the
// AioCb's mutable field to false
- a.aio_buf = buf as *mut c_void;
- a.aio_lio_opcode = opcode as libc::c_int;
+ a.0.aio_buf = buf as *mut c_void;
+ a.0.aio_lio_opcode = opcode as libc::c_int;
+
+ Box::pin(AioCb {
+ aiocb: a,
+ mutable: false,
+ in_progress: false,
+ _buffer: PhantomData,
+ _pin: std::marker::PhantomPinned
+ })
+ }
+
+ // Private helper
+ fn from_slice_unpinned(fd: RawFd, offs: off_t, buf: &'a [u8],
+ prio: libc::c_int, sigev_notify: SigevNotify,
+ opcode: LioOpcode) -> AioCb
+ {
+ let mut a = AioCb::common_init(fd, prio, sigev_notify);
+ a.0.aio_offset = offs;
+ a.0.aio_nbytes = buf.len() as size_t;
+ // casting an immutable buffer to a mutable pointer looks unsafe,
+ // but technically its only unsafe to dereference it, not to create
+ // it.
+ a.0.aio_buf = buf.as_ptr() as *mut c_void;
+ assert!(opcode != LioOpcode::LIO_READ, "Can't read into an immutable buffer");
+ a.0.aio_lio_opcode = opcode as libc::c_int;
AioCb {
aiocb: a,
mutable: false,
in_progress: false,
- buffer: Buffer::None
+ _buffer: PhantomData,
+ _pin: std::marker::PhantomPinned
}
}
- /// Like `from_mut_slice`, but works on constant slices rather than
+ /// Like [`from_mut_slice`], but works on constant slices rather than
/// mutable slices.
///
/// An `AioCb` created this way cannot be used with `read`, and its
@@ -657,27 +422,14 @@ impl<'a> AioCb<'a> {
// AioCb, and they must all be of the same type.
pub fn from_slice(fd: RawFd, offs: off_t, buf: &'a [u8],
prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> AioCb {
- let mut a = AioCb::common_init(fd, prio, sigev_notify);
- a.aio_offset = offs;
- a.aio_nbytes = buf.len() as size_t;
- // casting an immutable buffer to a mutable pointer looks unsafe,
- // but technically its only unsafe to dereference it, not to create
- // it.
- a.aio_buf = buf.as_ptr() as *mut c_void;
- assert!(opcode != LioOpcode::LIO_READ, "Can't read into an immutable buffer");
- a.aio_lio_opcode = opcode as libc::c_int;
-
- AioCb {
- aiocb: a,
- mutable: false,
- in_progress: false,
- buffer: Buffer::None,
- }
+ opcode: LioOpcode) -> Pin<Box<AioCb>>
+ {
+ Box::pin(AioCb::from_slice_unpinned(fd, offs, buf, prio, sigev_notify,
+ opcode))
}
fn common_init(fd: RawFd, prio: libc::c_int,
- sigev_notify: SigevNotify) -> libc::aiocb {
+ sigev_notify: SigevNotify) -> LibcAiocb {
// Use mem::zeroed instead of explicitly zeroing each field, because the
// number and name of reserved fields is OS-dependent. On some OSes,
// some reserved fields are used the kernel for state, and must be
@@ -686,12 +438,18 @@ impl<'a> AioCb<'a> {
a.aio_fildes = fd;
a.aio_reqprio = prio;
a.aio_sigevent = SigEvent::new(sigev_notify).sigevent();
- a
+ LibcAiocb(a)
}
/// Update the notification settings for an existing `aiocb`
- pub fn set_sigev_notify(&mut self, sigev_notify: SigevNotify) {
- self.aiocb.aio_sigevent = SigEvent::new(sigev_notify).sigevent();
+ pub fn set_sigev_notify(self: &mut Pin<Box<Self>>,
+ sigev_notify: SigevNotify)
+ {
+ // Safe because we don't move any of the data
+ let selfp = unsafe {
+ self.as_mut().get_unchecked_mut()
+ };
+ selfp.aiocb.0.aio_sigevent = SigEvent::new(sigev_notify).sigevent();
}
/// Cancels an outstanding AIO request.
@@ -741,8 +499,12 @@ impl<'a> AioCb<'a> {
/// # References
///
/// [aio_cancel](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_cancel.html)
- pub fn cancel(&mut self) -> Result<AioCancelStat> {
- match unsafe { libc::aio_cancel(self.aiocb.aio_fildes, &mut self.aiocb) } {
+ pub fn cancel(self: &mut Pin<Box<Self>>) -> Result<AioCancelStat> {
+ let r = unsafe {
+ let selfp = self.as_mut().get_unchecked_mut();
+ libc::aio_cancel(selfp.aiocb.0.aio_fildes, &mut selfp.aiocb.0)
+ };
+ match r {
libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled),
libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled),
libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone),
@@ -751,6 +513,18 @@ impl<'a> AioCb<'a> {
}
}
+ fn error_unpinned(self: &mut Self) -> Result<()> {
+ let r = unsafe {
+ libc::aio_error(&mut self.aiocb.0 as *mut libc::aiocb)
+ };
+ match r {
+ 0 => Ok(()),
+ num if num > 0 => Err(Error::from_errno(Errno::from_i32(num))),
+ -1 => Err(Error::last()),
+ num => panic!("unknown aio_error return value {:?}", num)
+ }
+ }
+
/// Retrieve error status of an asynchronous operation.
///
/// If the request has not yet completed, returns `EINPROGRESS`. Otherwise,
@@ -789,13 +563,12 @@ impl<'a> AioCb<'a> {
/// # References
///
/// [aio_error](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_error.html)
- pub fn error(&mut self) -> Result<()> {
- match unsafe { libc::aio_error(&mut self.aiocb as *mut libc::aiocb) } {
- 0 => Ok(()),
- num if num > 0 => Err(Error::from_errno(Errno::from_i32(num))),
- -1 => Err(Error::last()),
- num => panic!("unknown aio_error return value {:?}", num)
- }
+ pub fn error(self: &mut Pin<Box<Self>>) -> Result<()> {
+ // Safe because error_unpinned doesn't move the data
+ let selfp = unsafe {
+ self.as_mut().get_unchecked_mut()
+ };
+ selfp.error_unpinned()
}
/// An asynchronous version of `fsync(2)`.
@@ -803,13 +576,17 @@ impl<'a> AioCb<'a> {
/// # References
///
/// [aio_fsync](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_fsync.html)
- pub fn fsync(&mut self, mode: AioFsyncMode) -> Result<()> {
- let p: *mut libc::aiocb = &mut self.aiocb;
- Errno::result(unsafe {
+ pub fn fsync(self: &mut Pin<Box<Self>>, mode: AioFsyncMode) -> Result<()> {
+ // Safe because we don't move the libc::aiocb
+ unsafe {
+ let selfp = self.as_mut().get_unchecked_mut();
+ Errno::result({
+ let p: *mut libc::aiocb = &mut selfp.aiocb.0;
libc::aio_fsync(mode as libc::c_int, p)
- }).map(|_| {
- self.in_progress = true;
- })
+ }).map(|_| {
+ selfp.in_progress = true;
+ })
+ }
}
/// Returns the `aiocb`'s `LioOpcode` field
@@ -817,7 +594,7 @@ impl<'a> AioCb<'a> {
/// If the value cannot be represented as an `LioOpcode`, returns `None`
/// instead.
pub fn lio_opcode(&self) -> Option<LioOpcode> {
- match self.aiocb.aio_lio_opcode {
+ match self.aiocb.0.aio_lio_opcode {
libc::LIO_READ => Some(LioOpcode::LIO_READ),
libc::LIO_WRITE => Some(LioOpcode::LIO_WRITE),
libc::LIO_NOP => Some(LioOpcode::LIO_NOP),
@@ -831,17 +608,17 @@ impl<'a> AioCb<'a> {
/// number of bytes actually read or written by a completed operation, use
/// `aio_return` instead.
pub fn nbytes(&self) -> usize {
- self.aiocb.aio_nbytes
+ self.aiocb.0.aio_nbytes
}
/// Returns the file offset stored in the `AioCb`
pub fn offset(&self) -> off_t {
- self.aiocb.aio_offset
+ self.aiocb.0.aio_offset
}
/// Returns the priority of the `AioCb`
pub fn priority(&self) -> libc::c_int {
- self.aiocb.aio_reqprio
+ self.aiocb.0.aio_reqprio
}
/// Asynchronously reads from a file descriptor into a buffer
@@ -849,19 +626,31 @@ impl<'a> AioCb<'a> {
/// # References
///
/// [aio_read](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_read.html)
- pub fn read(&mut self) -> Result<()> {
+ pub fn read(self: &mut Pin<Box<Self>>) -> Result<()> {
assert!(self.mutable, "Can't read into an immutable buffer");
- let p: *mut libc::aiocb = &mut self.aiocb;
- Errno::result(unsafe {
- libc::aio_read(p)
+ // Safe because we don't move anything
+ let selfp = unsafe {
+ self.as_mut().get_unchecked_mut()
+ };
+ Errno::result({
+ let p: *mut libc::aiocb = &mut selfp.aiocb.0;
+ unsafe { libc::aio_read(p) }
}).map(|_| {
- self.in_progress = true;
+ selfp.in_progress = true;
})
}
/// Returns the `SigEvent` stored in the `AioCb`
pub fn sigevent(&self) -> SigEvent {
- SigEvent::from(&self.aiocb.aio_sigevent)
+ SigEvent::from(&self.aiocb.0.aio_sigevent)
+ }
+
+ fn aio_return_unpinned(self: &mut Self) -> Result<isize> {
+ unsafe {
+ let p: *mut libc::aiocb = &mut self.aiocb.0;
+ self.in_progress = false;
+ Errno::result(libc::aio_return(p))
+ }
}
/// Retrieve return status of an asynchronous operation.
@@ -874,10 +663,12 @@ impl<'a> AioCb<'a> {
///
/// [aio_return](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_return.html)
// Note: this should be just `return`, but that's a reserved word
- pub fn aio_return(&mut self) -> Result<isize> {
- let p: *mut libc::aiocb = &mut self.aiocb;
- self.in_progress = false;
- Errno::result(unsafe { libc::aio_return(p) })
+ pub fn aio_return(self: &mut Pin<Box<Self>>) -> Result<isize> {
+ // Safe because aio_return_unpinned does not move the data
+ let selfp = unsafe {
+ self.as_mut().get_unchecked_mut()
+ };
+ selfp.aio_return_unpinned()
}
/// Asynchronously writes from a buffer to a file descriptor
@@ -885,15 +676,18 @@ impl<'a> AioCb<'a> {
/// # References
///
/// [aio_write](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_write.html)
- pub fn write(&mut self) -> Result<()> {
- let p: *mut libc::aiocb = &mut self.aiocb;
- Errno::result(unsafe {
- libc::aio_write(p)
+ pub fn write(self: &mut Pin<Box<Self>>) -> Result<()> {
+ // Safe because we don't move anything
+ let selfp = unsafe {
+ self.as_mut().get_unchecked_mut()
+ };
+ Errno::result({
+ let p: *mut libc::aiocb = &mut selfp.aiocb.0;
+ unsafe{ libc::aio_write(p) }
}).map(|_| {
- self.in_progress = true;
+ selfp.in_progress = true;
})
}
-
}
/// Cancels outstanding AIO requests for a given file descriptor.
@@ -970,15 +764,15 @@ pub fn aio_cancel_all(fd: RawFd) -> Result<AioCancelStat> {
/// SigevNotify::SigevNone,
/// LioOpcode::LIO_NOP);
/// aiocb.write().unwrap();
-/// aio_suspend(&[&aiocb], None).expect("aio_suspend failed");
+/// aio_suspend(&[aiocb.as_ref()], None).expect("aio_suspend failed");
/// assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len());
/// # }
/// ```
/// # References
///
/// [`aio_suspend`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_suspend.html)
-pub fn aio_suspend(list: &[&AioCb], timeout: Option<TimeSpec>) -> Result<()> {
- let plist = list as *const [&AioCb] as *const [*const libc::aiocb];
+pub fn aio_suspend(list: &[Pin<&AioCb>], timeout: Option<TimeSpec>) -> Result<()> {
+ let plist = list as *const [Pin<&AioCb>] as *const [*const libc::aiocb];
let p = plist as *const *const libc::aiocb;
let timep = match timeout {
None => null::<libc::timespec>(),
@@ -992,7 +786,7 @@ pub fn aio_suspend(list: &[&AioCb], timeout: Option<TimeSpec>) -> Result<()> {
impl<'a> Debug for AioCb<'a> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("AioCb")
- .field("aiocb", &self.aiocb)
+ .field("aiocb", &self.aiocb.0)
.field("mutable", &self.mutable)
.field("in_progress", &self.in_progress)
.finish()
@@ -1018,7 +812,9 @@ pub struct LioCb<'a> {
///
/// [`AioCb`]: struct.AioCb.html
/// [`listio`]: #method.listio
- pub aiocbs: Vec<AioCb<'a>>,
+ // Their locations in memory must be fixed once they are passed to the
+ // kernel. So this field must be non-public so the user can't swap.
+ aiocbs: Box<[AioCb<'a>]>,
/// The actual list passed to `libc::lio_listio`.
///
@@ -1032,15 +828,19 @@ pub struct LioCb<'a> {
results: Vec<Option<Result<isize>>>
}
+/// LioCb can't automatically impl Send and Sync just because of the raw
+/// pointers in list. But that's stupid. There's no reason that raw pointers
+/// should automatically be non-Send
+#[cfg(not(any(target_os = "ios", target_os = "macos")))]
+unsafe impl<'a> Send for LioCb<'a> {}
+#[cfg(not(any(target_os = "ios", target_os = "macos")))]
+unsafe impl<'a> Sync for LioCb<'a> {}
+
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
impl<'a> LioCb<'a> {
- /// Initialize an empty `LioCb`
- pub fn with_capacity(capacity: usize) -> LioCb<'a> {
- LioCb {
- aiocbs: Vec::with_capacity(capacity),
- list: Vec::with_capacity(capacity),
- results: Vec::with_capacity(capacity)
- }
+ /// Return the number of individual [`AioCb`]s contained.
+ pub fn len(&self) -> usize {
+ self.aiocbs.len()
}
/// Submits multiple asynchronous I/O requests with a single system call.
@@ -1069,13 +869,15 @@ impl<'a> LioCb<'a> {
/// # fn main() {
/// const WBUF: &[u8] = b"abcdef123456";
/// let mut f = tempfile().unwrap();
- /// let mut liocb = LioCb::with_capacity(1);
- /// liocb.aiocbs.push(AioCb::from_slice( f.as_raw_fd(),
- /// 2, //offset
- /// WBUF,
- /// 0, //priority
- /// SigevNotify::SigevNone,
- /// LioOpcode::LIO_WRITE));
+ /// let mut liocb = LioCbBuilder::with_capacity(1)
+ /// .emplace_slice(
+ /// f.as_raw_fd(),
+ /// 2, //offset
+ /// WBUF,
+ /// 0, //priority
+ /// SigevNotify::SigevNone,
+ /// LioOpcode::LIO_WRITE
+ /// ).finish();
/// liocb.listio(LioMode::LIO_WAIT,
/// SigevNotify::SigevNone).unwrap();
/// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len());
@@ -1093,7 +895,7 @@ impl<'a> LioCb<'a> {
let sigev = SigEvent::new(sigev_notify);
let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
self.list.clear();
- for a in &mut self.aiocbs {
+ for a in &mut self.aiocbs.iter_mut() {
a.in_progress = true;
self.list.push(a as *mut AioCb<'a>
as *mut libc::aiocb);
@@ -1127,13 +929,15 @@ impl<'a> LioCb<'a> {
/// # fn main() {
/// const WBUF: &[u8] = b"abcdef123456";
/// let mut f = tempfile().unwrap();
- /// let mut liocb = LioCb::with_capacity(1);
- /// liocb.aiocbs.push(AioCb::from_slice( f.as_raw_fd(),
- /// 2, //offset
- /// WBUF,
- /// 0, //priority
- /// SigevNotify::SigevNone,
- /// LioOpcode::LIO_WRITE));
+ /// let mut liocb = LioCbBuilder::with_capacity(1)
+ /// .emplace_slice(
+ /// f.as_raw_fd(),
+ /// 2, //offset
+ /// WBUF,
+ /// 0, //priority
+ /// SigevNotify::SigevNone,
+ /// LioOpcode::LIO_WRITE
+ /// ).finish();
/// let mut err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone);
/// while err == Err(Error::Sys(Errno::EIO)) ||
/// err == Err(Error::Sys(Errno::EAGAIN)) {
@@ -1170,10 +974,10 @@ impl<'a> LioCb<'a> {
// Already collected final status for this operation
continue;
}
- match a.error() {
+ match a.error_unpinned() {
Ok(()) => {
// aiocb is complete; collect its status and don't resubmit
- self.results[i] = Some(a.aio_return());
+ self.results[i] = Some(a.aio_return_unpinned());
},
Err(Error::Sys(Errno::EAGAIN)) => {
self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb);
@@ -1202,7 +1006,7 @@ impl<'a> LioCb<'a> {
/// [`LioCb::listio_resubmit`]: #method.listio_resubmit
pub fn aio_return(&mut self, i: usize) -> Result<isize> {
if i >= self.results.len() || self.results[i].is_none() {
- self.aiocbs[i].aio_return()
+ self.aiocbs[i].aio_return_unpinned()
} else {
self.results[i].unwrap()
}
@@ -1218,7 +1022,7 @@ impl<'a> LioCb<'a> {
/// [`LioCb::listio_resubmit`]: #method.listio_resubmit
pub fn error(&mut self, i: usize) -> Result<()> {
if i >= self.results.len() || self.results[i].is_none() {
- self.aiocbs[i].error()
+ self.aiocbs[i].error_unpinned()
} else {
Ok(())
}
@@ -1234,13 +1038,93 @@ impl<'a> Debug for LioCb<'a> {
}
}
+/// Used to construct `LioCb`
+// This must be a separate class from LioCb due to pinning constraints. LioCb
+// must use a boxed slice of AioCbs so they will have stable storage, but
+// LioCbBuilder must use a Vec to make construction possible when the final size
+// is unknown.
+#[cfg(not(any(target_os = "ios", target_os = "macos")))]
+#[derive(Debug)]
+pub struct LioCbBuilder<'a> {
+ /// A collection of [`AioCb`]s.
+ ///
+ /// [`AioCb`]: struct.AioCb.html
+ pub aiocbs: Vec<AioCb<'a>>,
+}
+
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-impl<'a> From<Vec<AioCb<'a>>> for LioCb<'a> {
- fn from(src: Vec<AioCb<'a>>) -> LioCb<'a> {
+impl<'a> LioCbBuilder<'a> {
+ /// Initialize an empty `LioCb`
+ pub fn with_capacity(capacity: usize) -> LioCbBuilder<'a> {
+ LioCbBuilder {
+ aiocbs: Vec::with_capacity(capacity),
+ }
+ }
+
+ /// Add a new operation on an immutable slice to the [`LioCb`] under
+ /// construction.
+ ///
+ /// Arguments are the same as for [`AioCb::from_slice`]
+ ///
+ /// [`LioCb`]: struct.LioCb.html
+ /// [`AioCb::from_slice`]: struct.AioCb.html#method.from_slice
+ pub fn emplace_slice(mut self, fd: RawFd, offs: off_t, buf: &'a [u8],
+ prio: libc::c_int, sigev_notify: SigevNotify,
+ opcode: LioOpcode) -> Self
+ {
+ self.aiocbs.push(AioCb::from_slice_unpinned(fd, offs, buf, prio,
+ sigev_notify, opcode));
+ self
+ }
+
+ /// Add a new operation on a mutable slice to the [`LioCb`] under
+ /// construction.
+ ///
+ /// Arguments are the same as for [`AioCb::from_mut_slice`]
+ ///
+ /// [`LioCb`]: struct.LioCb.html
+ /// [`AioCb::from_mut_slice`]: struct.AioCb.html#method.from_mut_slice
+ pub fn emplace_mut_slice(mut self, fd: RawFd, offs: off_t,
+ buf: &'a mut [u8], prio: libc::c_int,
+ sigev_notify: SigevNotify, opcode: LioOpcode)
+ -> Self
+ {
+ self.aiocbs.push(AioCb::from_mut_slice_unpinned(fd, offs, buf, prio,
+ sigev_notify, opcode));
+ self
+ }
+
+ /// Finalize this [`LioCb`].
+ ///
+ /// Afterwards it will be possible to issue the operations with
+ /// [`LioCb::listio`]. Conversely, it will no longer be possible to add new
+ /// operations with [`LioCb::emplace_slice`] or
+ /// [`LioCb::emplace_mut_slice`].
+ ///
+ /// [`LioCb::listio`]: struct.LioCb.html#method.listio
+ /// [`LioCb::from_mut_slice`]: struct.LioCb.html#method.from_mut_slice
+ /// [`LioCb::from_slice`]: struct.LioCb.html#method.from_slice
+ pub fn finish(self) -> LioCb<'a> {
+ let len = self.aiocbs.len();
LioCb {
- list: Vec::with_capacity(src.capacity()),
- results: Vec::with_capacity(src.capacity()),
- aiocbs: src,
+ aiocbs: self.aiocbs.into(),
+ list: Vec::with_capacity(len),
+ results: Vec::with_capacity(len)
}
}
}
+
+#[cfg(not(any(target_os = "ios", target_os = "macos")))]
+#[cfg(test)]
+mod t {
+ use super::*;
+
+ // It's important that `LioCb` be `UnPin`. The tokio-file crate relies on
+ // it.
+ #[test]
+ fn liocb_is_unpin() {
+ use assert_impl::assert_impl;
+
+ assert_impl!(Unpin: LioCb);
+ }
+}