summaryrefslogtreecommitdiff
path: root/src/sys
diff options
context:
space:
mode:
authorbors[bot] <26634292+bors[bot]@users.noreply.github.com>2022-05-14 20:17:18 +0000
committerGitHub <noreply@github.com>2022-05-14 20:17:18 +0000
commit69738c0fd03af19053c5701a984f923ecbbfada6 (patch)
tree9a5f4ea493dffbeaffa362bb516732bdde2cb27c /src/sys
parent1c36d49c0b7ef41ab3539b22dbe86cf3863fc219 (diff)
parent0c07a9e4690fc4b2d8ceb90ac463c79e50d70947 (diff)
downloadnix-69738c0fd03af19053c5701a984f923ecbbfada6.zip
Merge #1713
1713: Rewrite the aio module r=rtzoeller a=asomers The existing AIO implementation has some problems: 1) The in_progress field is checked at runtime, not compile time. 2) The mutable field is checked at runtime, not compile time. 4) A downstream lio_listio user must store extra state to track whether the whole operation is partially, completely, or not at all submitted. 4) Nix does heap allocation itself, rather than allowing the caller to choose it. This can result in double (or triple, or quadruple) boxing. 5) There's no easy way to use lio_listio to submit multiple operations with a single syscall, but poll each individually. 6) The lio_listio usage is far from transparent and zero-cost. 7) No aio_readv or aio_writev support. 8) priority has type c_int; should be i32 9) aio_return should return a usize instead of an isize, since it only uses negative values to indicate errors, which Rust represents via the Result type. This rewrite solves several problems: 1) Unsolved. I don't think it can be solved without something like C++'s guaranteed type elision. It might require changing the signature of Future::poll too. 2) Solved. 3) Solved, by the new in_progress method and by removing the complicated lio_listio resubmit code. 4) Solved. 5) Solved. 6) Solved, by removing the lio_listo resubmit code. It can be reimplemented downstream if necessary. Or even in Nix, but it doesn't fit Nix's theme of zero-cost abstractions. 7) Solved. 8) Solved. 9) Solved. Co-authored-by: Alan Somers <asomers@gmail.com>
Diffstat (limited to 'src/sys')
-rw-r--r--src/sys/aio.rs1801
1 files changed, 958 insertions, 843 deletions
diff --git a/src/sys/aio.rs b/src/sys/aio.rs
index 4780cdee..6ff88469 100644
--- a/src/sys/aio.rs
+++ b/src/sys/aio.rs
@@ -2,9 +2,12 @@
//! POSIX Asynchronous I/O
//!
//! The POSIX AIO interface is used for asynchronous I/O on files and disk-like
-//! devices. It supports [`read`](struct.AioCb.html#method.read),
-//! [`write`](struct.AioCb.html#method.write), and
-//! [`fsync`](struct.AioCb.html#method.fsync) operations. Completion
+//! devices. It supports [`read`](struct.AioRead.html#method.new),
+//! [`write`](struct.AioWrite.html#method.new),
+//! [`fsync`](struct.AioFsync.html#method.new),
+//! [`readv`](struct.AioReadv.html#method.new), and
+//! [`writev`](struct.AioWritev.html#method.new), operations, subject to
+//! platform support. Completion
//! notifications can optionally be delivered via
//! [signals](../signal/enum.SigevNotify.html#variant.SigevSignal), via the
//! [`aio_suspend`](fn.aio_suspend.html) function, or via polling. Some
@@ -17,23 +20,30 @@
//! that they will be executed atomically.
//!
//! Outstanding operations may be cancelled with
-//! [`cancel`](struct.AioCb.html#method.cancel) or
+//! [`cancel`](trait.Aio.html#method.cancel) or
//! [`aio_cancel_all`](fn.aio_cancel_all.html), though the operating system may
//! not support this for all filesystems and devices.
+#[cfg(target_os = "freebsd")]
+use std::io::{IoSlice, IoSliceMut};
+use std::{
+ convert::TryFrom,
+ fmt::{self, Debug},
+ marker::{PhantomData, PhantomPinned},
+ mem,
+ os::unix::io::RawFd,
+ pin::Pin,
+ ptr,
+ thread,
+};
-use crate::Result;
-use crate::errno::Errno;
-use std::os::unix::io::RawFd;
-use libc::{c_void, off_t, size_t};
-use std::fmt;
-use std::fmt::Debug;
-use std::marker::PhantomData;
-use std::mem;
-use std::pin::Pin;
-use std::ptr::{null, null_mut};
-use crate::sys::signal::*;
-use std::thread;
-use crate::sys::time::TimeSpec;
+use libc::{c_void, off_t};
+use pin_utils::unsafe_pinned;
+
+use crate::{
+ errno::Errno,
+ sys::{signal::*, time::TimeSpec},
+ Result,
+};
libc_enum! {
/// Mode for `AioCb::fsync`. Controls whether only data or both data and
@@ -52,22 +62,7 @@ libc_enum! {
#[cfg_attr(docsrs, doc(cfg(all())))]
O_DSYNC
}
-}
-
-libc_enum! {
- /// When used with [`lio_listio`](fn.lio_listio.html), determines whether a
- /// given `aiocb` should be used for a read operation, a write operation, or
- /// ignored. Has no effect for any other aio functions.
- #[repr(i32)]
- #[non_exhaustive]
- pub enum LioOpcode {
- /// No operation
- LIO_NOP,
- /// Write data as if by a call to [`AioCb::write`]
- LIO_WRITE,
- /// Write data as if by a call to [`AioCb::read`]
- LIO_READ,
- }
+ impl TryFrom<i32>
}
libc_enum! {
@@ -103,354 +98,133 @@ struct LibcAiocb(libc::aiocb);
unsafe impl Send for LibcAiocb {}
unsafe impl Sync for LibcAiocb {}
-/// AIO Control Block.
-///
-/// The basic structure used by all aio functions. Each `AioCb` represents one
-/// I/O request.
-pub struct AioCb<'a> {
- aiocb: LibcAiocb,
- /// Tracks whether the buffer pointed to by `libc::aiocb.aio_buf` is mutable
- mutable: bool,
+/// Base class for all AIO operations. Should only be used directly when
+/// checking for completion.
+// We could create some kind of AsPinnedMut trait, and implement it for all aio
+// ops, allowing the crate's users to get pinned references to `AioCb`. That
+// could save some code for things like polling methods. But IMHO it would
+// provide polymorphism at the wrong level. Instead, the best place for
+// polymorphism is at the level of `Futures`.
+#[repr(C)]
+struct AioCb {
+ aiocb: LibcAiocb,
/// Could this `AioCb` potentially have any in-kernel state?
+ // It would be really nice to perform the in-progress check entirely at
+ // compile time. But I can't figure out how, because:
+ // * Future::poll takes a `Pin<&mut self>` rather than `self`, and
+ // * Rust's lack of an equivalent of C++'s Guaranteed Copy Elision means
+ // that there's no way to write an AioCb constructor that neither boxes
+ // the object itself, nor moves it during return.
in_progress: bool,
- _buffer: std::marker::PhantomData<&'a [u8]>,
- _pin: std::marker::PhantomPinned
}
-impl<'a> AioCb<'a> {
- /// Returns the underlying file descriptor associated with the `AioCb`
- pub fn fd(&self) -> RawFd {
- self.aiocb.0.aio_fildes
- }
+impl AioCb {
+ pin_utils::unsafe_unpinned!(aiocb: LibcAiocb);
- /// Constructs a new `AioCb` with no associated buffer.
- ///
- /// The resulting `AioCb` structure is suitable for use with `AioCb::fsync`.
- ///
- /// # Parameters
- ///
- /// * `fd`: File descriptor. Required for all aio functions.
- /// * `prio`: If POSIX Prioritized IO is supported, then the
- /// operation will be prioritized at the process's
- /// priority level minus `prio`.
- /// * `sigev_notify`: Determines how you will be notified of event
- /// completion.
- ///
- /// # Examples
- ///
- /// Create an `AioCb` from a raw file descriptor and use it for an
- /// [`fsync`](#method.fsync) operation.
- ///
- /// ```
- /// # use nix::errno::Errno;
- /// # use nix::Error;
- /// # use nix::sys::aio::*;
- /// # use nix::sys::signal::SigevNotify::SigevNone;
- /// # use std::{thread, time};
- /// # use std::os::unix::io::AsRawFd;
- /// # use tempfile::tempfile;
- /// let f = tempfile().unwrap();
- /// let mut aiocb = AioCb::from_fd( f.as_raw_fd(), 0, SigevNone);
- /// aiocb.fsync(AioFsyncMode::O_SYNC).expect("aio_fsync failed early");
- /// while (aiocb.error() == Err(Errno::EINPROGRESS)) {
- /// thread::sleep(time::Duration::from_millis(10));
- /// }
- /// aiocb.aio_return().expect("aio_fsync failed late");
- /// ```
- pub fn from_fd(fd: RawFd, prio: libc::c_int,
- sigev_notify: SigevNotify) -> Pin<Box<AioCb<'a>>> {
- let mut a = AioCb::common_init(fd, prio, sigev_notify);
- a.0.aio_offset = 0;
- a.0.aio_nbytes = 0;
- a.0.aio_buf = null_mut();
-
- Box::pin(AioCb {
- aiocb: a,
- mutable: false,
- in_progress: false,
- _buffer: PhantomData,
- _pin: std::marker::PhantomPinned
- })
+ fn aio_return(mut self: Pin<&mut Self>) -> Result<usize> {
+ self.in_progress = false;
+ unsafe {
+ let p: *mut libc::aiocb = &mut self.aiocb.0;
+ Errno::result(libc::aio_return(p))
+ }
+ .map(|r| r as usize)
}
- // Private helper
- #[cfg(not(any(target_os = "ios", target_os = "macos")))]
- fn from_mut_slice_unpinned(fd: RawFd, offs: off_t, buf: &'a mut [u8],
- prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> AioCb<'a>
- {
- let mut a = AioCb::common_init(fd, prio, sigev_notify);
- a.0.aio_offset = offs;
- a.0.aio_nbytes = buf.len() as size_t;
- a.0.aio_buf = buf.as_ptr() as *mut c_void;
- a.0.aio_lio_opcode = opcode as libc::c_int;
+ fn cancel(mut self: Pin<&mut Self>) -> Result<AioCancelStat> {
+ let r = unsafe {
+ libc::aio_cancel(self.aiocb.0.aio_fildes, &mut self.aiocb.0)
+ };
+ match r {
+ libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled),
+ libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled),
+ libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone),
+ -1 => Err(Errno::last()),
+ _ => panic!("unknown aio_cancel return value"),
+ }
+ }
+ fn common_init(fd: RawFd, prio: i32, sigev_notify: SigevNotify) -> Self {
+ // Use mem::zeroed instead of explicitly zeroing each field, because the
+ // number and name of reserved fields is OS-dependent. On some OSes,
+ // some reserved fields are used the kernel for state, and must be
+ // explicitly zeroed when allocated.
+ let mut a = unsafe { mem::zeroed::<libc::aiocb>() };
+ a.aio_fildes = fd;
+ a.aio_reqprio = prio;
+ a.aio_sigevent = SigEvent::new(sigev_notify).sigevent();
AioCb {
- aiocb: a,
- mutable: true,
+ aiocb: LibcAiocb(a),
in_progress: false,
- _buffer: PhantomData,
- _pin: std::marker::PhantomPinned
}
}
- /// Constructs a new `AioCb` from a mutable slice.
- ///
- /// The resulting `AioCb` will be suitable for both read and write
- /// operations, but only if the borrow checker can guarantee that the slice
- /// will outlive the `AioCb`. That will usually be the case if the `AioCb`
- /// is stack-allocated.
- ///
- /// # Parameters
- ///
- /// * `fd`: File descriptor. Required for all aio functions.
- /// * `offs`: File offset
- /// * `buf`: A memory buffer
- /// * `prio`: If POSIX Prioritized IO is supported, then the
- /// operation will be prioritized at the process's
- /// priority level minus `prio`
- /// * `sigev_notify`: Determines how you will be notified of event
- /// completion.
- /// * `opcode`: This field is only used for `lio_listio`. It
- /// determines which operation to use for this individual
- /// aiocb
- ///
- /// # Examples
- ///
- /// Create an `AioCb` from a mutable slice and read into it.
- ///
- /// ```
- /// # use nix::errno::Errno;
- /// # use nix::Error;
- /// # use nix::sys::aio::*;
- /// # use nix::sys::signal::SigevNotify;
- /// # use std::{thread, time};
- /// # use std::io::Write;
- /// # use std::os::unix::io::AsRawFd;
- /// # use tempfile::tempfile;
- /// const INITIAL: &[u8] = b"abcdef123456";
- /// const LEN: usize = 4;
- /// let mut rbuf = vec![0; LEN];
- /// let mut f = tempfile().unwrap();
- /// f.write_all(INITIAL).unwrap();
- /// {
- /// let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(),
- /// 2, //offset
- /// &mut rbuf,
- /// 0, //priority
- /// SigevNotify::SigevNone,
- /// LioOpcode::LIO_NOP);
- /// aiocb.read().unwrap();
- /// while (aiocb.error() == Err(Errno::EINPROGRESS)) {
- /// thread::sleep(time::Duration::from_millis(10));
- /// }
- /// assert_eq!(aiocb.aio_return().unwrap() as usize, LEN);
- /// }
- /// assert_eq!(rbuf, b"cdef");
- /// ```
- pub fn from_mut_slice(fd: RawFd, offs: off_t, buf: &'a mut [u8],
- prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> Pin<Box<AioCb<'a>>> {
- let mut a = AioCb::common_init(fd, prio, sigev_notify);
- a.0.aio_offset = offs;
- a.0.aio_nbytes = buf.len() as size_t;
- a.0.aio_buf = buf.as_ptr() as *mut c_void;
- a.0.aio_lio_opcode = opcode as libc::c_int;
-
- Box::pin(AioCb {
- aiocb: a,
- mutable: true,
- in_progress: false,
- _buffer: PhantomData,
- _pin: std::marker::PhantomPinned
- })
+ fn error(self: Pin<&mut Self>) -> Result<()> {
+ let r = unsafe { libc::aio_error(&self.aiocb().0) };
+ match r {
+ 0 => Ok(()),
+ num if num > 0 => Err(Errno::from_i32(num)),
+ -1 => Err(Errno::last()),
+ num => panic!("unknown aio_error return value {:?}", num),
+ }
}
- /// Constructs a new `AioCb` from a mutable raw pointer
- ///
- /// Unlike `from_mut_slice`, this method returns a structure suitable for
- /// placement on the heap. It may be used for both reads and writes. Due
- /// to its unsafety, this method is not recommended. It is most useful when
- /// heap allocation is required.
- ///
- /// # Parameters
- ///
- /// * `fd`: File descriptor. Required for all aio functions.
- /// * `offs`: File offset
- /// * `buf`: Pointer to the memory buffer
- /// * `len`: Length of the buffer pointed to by `buf`
- /// * `prio`: If POSIX Prioritized IO is supported, then the
- /// operation will be prioritized at the process's
- /// priority level minus `prio`
- /// * `sigev_notify`: Determines how you will be notified of event
- /// completion.
- /// * `opcode`: This field is only used for `lio_listio`. It
- /// determines which operation to use for this individual
- /// aiocb
- ///
- /// # Safety
- ///
- /// The caller must ensure that the storage pointed to by `buf` outlives the
- /// `AioCb`. The lifetime checker can't help here.
- pub unsafe fn from_mut_ptr(fd: RawFd, offs: off_t,
- buf: *mut c_void, len: usize,
- prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> Pin<Box<AioCb<'a>>> {
- let mut a = AioCb::common_init(fd, prio, sigev_notify);
- a.0.aio_offset = offs;
- a.0.aio_nbytes = len;
- a.0.aio_buf = buf;
- a.0.aio_lio_opcode = opcode as libc::c_int;
-
- Box::pin(AioCb {
- aiocb: a,
- mutable: true,
- in_progress: false,
- _buffer: PhantomData,
- _pin: std::marker::PhantomPinned,
- })
+ fn in_progress(&self) -> bool {
+ self.in_progress
}
- /// Constructs a new `AioCb` from a raw pointer.
- ///
- /// Unlike `from_slice`, this method returns a structure suitable for
- /// placement on the heap. Due to its unsafety, this method is not
- /// recommended. It is most useful when heap allocation is required.
- ///
- /// # Parameters
- ///
- /// * `fd`: File descriptor. Required for all aio functions.
- /// * `offs`: File offset
- /// * `buf`: Pointer to the memory buffer
- /// * `len`: Length of the buffer pointed to by `buf`
- /// * `prio`: If POSIX Prioritized IO is supported, then the
- /// operation will be prioritized at the process's
- /// priority level minus `prio`
- /// * `sigev_notify`: Determines how you will be notified of event
- /// completion.
- /// * `opcode`: This field is only used for `lio_listio`. It
- /// determines which operation to use for this individual
- /// aiocb
- ///
- /// # Safety
- ///
- /// The caller must ensure that the storage pointed to by `buf` outlives the
- /// `AioCb`. The lifetime checker can't help here.
- pub unsafe fn from_ptr(fd: RawFd, offs: off_t,
- buf: *const c_void, len: usize,
- prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> Pin<Box<AioCb<'a>>> {
- let mut a = AioCb::common_init(fd, prio, sigev_notify);
- a.0.aio_offset = offs;
- a.0.aio_nbytes = len;
- // casting a const ptr to a mutable ptr here is ok, because we set the
- // AioCb's mutable field to false
- a.0.aio_buf = buf as *mut c_void;
- a.0.aio_lio_opcode = opcode as libc::c_int;
-
- Box::pin(AioCb {
- aiocb: a,
- mutable: false,
- in_progress: false,
- _buffer: PhantomData,
- _pin: std::marker::PhantomPinned
- })
+ fn set_in_progress(mut self: Pin<&mut Self>) {
+ self.as_mut().in_progress = true;
}
- // Private helper
- fn from_slice_unpinned(fd: RawFd, offs: off_t, buf: &'a [u8],
- prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> AioCb
- {
- let mut a = AioCb::common_init(fd, prio, sigev_notify);
- a.0.aio_offset = offs;
- a.0.aio_nbytes = buf.len() as size_t;
- // casting an immutable buffer to a mutable pointer looks unsafe,
- // but technically its only unsafe to dereference it, not to create
- // it.
- a.0.aio_buf = buf.as_ptr() as *mut c_void;
- assert!(opcode != LioOpcode::LIO_READ, "Can't read into an immutable buffer");
- a.0.aio_lio_opcode = opcode as libc::c_int;
-
- AioCb {
- aiocb: a,
- mutable: false,
- in_progress: false,
- _buffer: PhantomData,
- _pin: std::marker::PhantomPinned
- }
+ /// Update the notification settings for an existing AIO operation that has
+ /// not yet been submitted.
+ // Takes a normal reference rather than a pinned one because this method is
+ // normally called before the object needs to be pinned, that is, before
+ // it's been submitted to the kernel.
+ fn set_sigev_notify(&mut self, sigev_notify: SigevNotify) {
+ assert!(
+ !self.in_progress,
+ "Can't change notification settings for an in-progress operation"
+ );
+ self.aiocb.0.aio_sigevent = SigEvent::new(sigev_notify).sigevent();
}
+}
- /// Like [`AioCb::from_mut_slice`], but works on constant slices rather than
- /// mutable slices.
- ///
- /// An `AioCb` created this way cannot be used with `read`, and its
- /// `LioOpcode` cannot be set to `LIO_READ`. This method is useful when
- /// writing a const buffer with `AioCb::write`, since `from_mut_slice` can't
- /// work with const buffers.
- ///
- /// # Examples
- ///
- /// Construct an `AioCb` from a slice and use it for writing.
- ///
- /// ```
- /// # use nix::errno::Errno;
- /// # use nix::Error;
- /// # use nix::sys::aio::*;
- /// # use nix::sys::signal::SigevNotify;
- /// # use std::{thread, time};
- /// # use std::os::unix::io::AsRawFd;
- /// # use tempfile::tempfile;
- /// const WBUF: &[u8] = b"abcdef123456";
- /// let mut f = tempfile().unwrap();
- /// let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
- /// 2, //offset
- /// WBUF,
- /// 0, //priority
- /// SigevNotify::SigevNone,
- /// LioOpcode::LIO_NOP);
- /// aiocb.write().unwrap();
- /// while (aiocb.error() == Err(Errno::EINPROGRESS)) {
- /// thread::sleep(time::Duration::from_millis(10));
- /// }
- /// assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len());
- /// ```
- // Note: another solution to the problem of writing const buffers would be
- // to genericize AioCb for both &mut [u8] and &[u8] buffers. AioCb::read
- // could take the former and AioCb::write could take the latter. However,
- // then lio_listio wouldn't work, because that function needs a slice of
- // AioCb, and they must all be of the same type.
- pub fn from_slice(fd: RawFd, offs: off_t, buf: &'a [u8],
- prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> Pin<Box<AioCb>>
- {
- Box::pin(AioCb::from_slice_unpinned(fd, offs, buf, prio, sigev_notify,
- opcode))
+impl Debug for AioCb {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("AioCb")
+ .field("aiocb", &self.aiocb.0)
+ .field("in_progress", &self.in_progress)
+ .finish()
}
+}
- fn common_init(fd: RawFd, prio: libc::c_int,
- sigev_notify: SigevNotify) -> LibcAiocb {
- // Use mem::zeroed instead of explicitly zeroing each field, because the
- // number and name of reserved fields is OS-dependent. On some OSes,
- // some reserved fields are used the kernel for state, and must be
- // explicitly zeroed when allocated.
- let mut a = unsafe { mem::zeroed::<libc::aiocb>()};
- a.aio_fildes = fd;
- a.aio_reqprio = prio;
- a.aio_sigevent = SigEvent::new(sigev_notify).sigevent();
- LibcAiocb(a)
+impl Drop for AioCb {
+ /// If the `AioCb` has no remaining state in the kernel, just drop it.
+ /// Otherwise, dropping constitutes a resource leak, which is an error
+ fn drop(&mut self) {
+ assert!(
+ thread::panicking() || !self.in_progress,
+ "Dropped an in-progress AioCb"
+ );
}
+}
- /// Update the notification settings for an existing `aiocb`
- pub fn set_sigev_notify(self: &mut Pin<Box<Self>>,
- sigev_notify: SigevNotify)
- {
- // Safe because we don't move any of the data
- let selfp = unsafe {
- self.as_mut().get_unchecked_mut()
- };
- selfp.aiocb.0.aio_sigevent = SigEvent::new(sigev_notify).sigevent();
- }
+/// Methods common to all AIO operations
+pub trait Aio {
+ /// The return type of [`Aio::aio_return`].
+ type Output;
+
+ /// Retrieve return status of an asynchronous operation.
+ ///
+ /// Should only be called once for each operation, after [`Aio::error`]
+ /// indicates that it has completed. The result is the same as for the
+ /// synchronous `read(2)`, `write(2)`, of `fsync(2)` functions.
+ ///
+ /// # References
+ ///
+ /// [aio_return](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_return.html)
+ fn aio_return(self: Pin<&mut Self>) -> Result<Self::Output>;
/// Cancels an outstanding AIO request.
///
@@ -477,51 +251,26 @@ impl<'a> AioCb<'a> {
/// # use tempfile::tempfile;
/// let wbuf = b"CDEF";
/// let mut f = tempfile().unwrap();
- /// let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
+ /// let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(),
/// 2, //offset
/// &wbuf[..],
/// 0, //priority
- /// SigevNotify::SigevNone,
- /// LioOpcode::LIO_NOP);
- /// aiocb.write().unwrap();
- /// let cs = aiocb.cancel().unwrap();
+ /// SigevNotify::SigevNone));
+ /// aiocb.as_mut().submit().unwrap();
+ /// let cs = aiocb.as_mut().cancel().unwrap();
/// if cs == AioCancelStat::AioNotCanceled {
- /// while (aiocb.error() == Err(Errno::EINPROGRESS)) {
+ /// while (aiocb.as_mut().error() == Err(Errno::EINPROGRESS)) {
/// thread::sleep(time::Duration::from_millis(10));
/// }
/// }
/// // Must call `aio_return`, but ignore the result
- /// let _ = aiocb.aio_return();
+ /// let _ = aiocb.as_mut().aio_return();
/// ```
///
/// # References
///
/// [aio_cancel](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_cancel.html)
- pub fn cancel(self: &mut Pin<Box<Self>>) -> Result<AioCancelStat> {
- let r = unsafe {
- let selfp = self.as_mut().get_unchecked_mut();
- libc::aio_cancel(selfp.aiocb.0.aio_fildes, &mut selfp.aiocb.0)
- };
- match r {
- libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled),
- libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled),
- libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone),
- -1 => Err(Errno::last()),
- _ => panic!("unknown aio_cancel return value")
- }
- }
-
- fn error_unpinned(&mut self) -> Result<()> {
- let r = unsafe {
- libc::aio_error(&mut self.aiocb.0 as *mut libc::aiocb)
- };
- match r {
- 0 => Ok(()),
- num if num > 0 => Err(Errno::from_i32(num)),
- -1 => Err(Errno::last()),
- num => panic!("unknown aio_error return value {:?}", num)
- }
- }
+ fn cancel(self: Pin<&mut Self>) -> Result<AioCancelStat>;
/// Retrieve error status of an asynchronous operation.
///
@@ -543,60 +292,265 @@ impl<'a> AioCb<'a> {
/// # use tempfile::tempfile;
/// const WBUF: &[u8] = b"abcdef123456";
/// let mut f = tempfile().unwrap();
- /// let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
+ /// let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(),
/// 2, //offset
/// WBUF,
/// 0, //priority
- /// SigevNotify::SigevNone,
- /// LioOpcode::LIO_NOP);
- /// aiocb.write().unwrap();
- /// while (aiocb.error() == Err(Errno::EINPROGRESS)) {
+ /// SigevNotify::SigevNone));
+ /// aiocb.as_mut().submit().unwrap();
+ /// while (aiocb.as_mut().error() == Err(Errno::EINPROGRESS)) {
/// thread::sleep(time::Duration::from_millis(10));
/// }
- /// assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len());
+ /// assert_eq!(aiocb.as_mut().aio_return().unwrap(), WBUF.len());
/// ```
///
/// # References
///
/// [aio_error](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_error.html)
- pub fn error(self: &mut Pin<Box<Self>>) -> Result<()> {
- // Safe because error_unpinned doesn't move the data
- let selfp = unsafe {
- self.as_mut().get_unchecked_mut()
- };
- selfp.error_unpinned()
- }
+ fn error(self: Pin<&mut Self>) -> Result<()>;
+
+ /// Returns the underlying file descriptor associated with the operation.
+ fn fd(&self) -> RawFd;
- /// An asynchronous version of `fsync(2)`.
+ /// Does this operation currently have any in-kernel state?
///
- /// # References
+ /// Dropping an operation that does have in-kernel state constitutes a
+ /// resource leak.
///
- /// [aio_fsync](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_fsync.html)
- pub fn fsync(self: &mut Pin<Box<Self>>, mode: AioFsyncMode) -> Result<()> {
- // Safe because we don't move the libc::aiocb
- unsafe {
- let selfp = self.as_mut().get_unchecked_mut();
- Errno::result({
- let p: *mut libc::aiocb = &mut selfp.aiocb.0;
- libc::aio_fsync(mode as libc::c_int, p)
- }).map(|_| {
- selfp.in_progress = true;
+ /// # Examples
+ ///
+ /// ```
+ /// # use nix::errno::Errno;
+ /// # use nix::Error;
+ /// # use nix::sys::aio::*;
+ /// # use nix::sys::signal::SigevNotify::SigevNone;
+ /// # use std::{thread, time};
+ /// # use std::os::unix::io::AsRawFd;
+ /// # use tempfile::tempfile;
+ /// let f = tempfile().unwrap();
+ /// let mut aiof = Box::pin(AioFsync::new(f.as_raw_fd(), AioFsyncMode::O_SYNC,
+ /// 0, SigevNone));
+ /// assert!(!aiof.as_mut().in_progress());
+ /// aiof.as_mut().submit().expect("aio_fsync failed early");
+ /// assert!(aiof.as_mut().in_progress());
+ /// while (aiof.as_mut().error() == Err(Errno::EINPROGRESS)) {
+ /// thread::sleep(time::Duration::from_millis(10));
+ /// }
+ /// aiof.as_mut().aio_return().expect("aio_fsync failed late");
+ /// assert!(!aiof.as_mut().in_progress());
+ /// ```
+ fn in_progress(&self) -> bool;
+
+ /// Returns the priority of the `AioCb`
+ fn priority(&self) -> i32;
+
+ /// Update the notification settings for an existing AIO operation that has
+ /// not yet been submitted.
+ fn set_sigev_notify(&mut self, sev: SigevNotify);
+
+ /// Returns the `SigEvent` that will be used for notification.
+ fn sigevent(&self) -> SigEvent;
+
+ /// Actually start the I/O operation.
+ ///
+ /// After calling this method and until [`Aio::aio_return`] returns `Ok`,
+ /// the structure may not be moved in memory.
+ fn submit(self: Pin<&mut Self>) -> Result<()>;
+}
+
+macro_rules! aio_methods {
+ () => {
+ fn cancel(self: Pin<&mut Self>) -> Result<AioCancelStat> {
+ self.aiocb().cancel()
+ }
+
+ fn error(self: Pin<&mut Self>) -> Result<()> {
+ self.aiocb().error()
+ }
+
+ fn fd(&self) -> RawFd {
+ self.aiocb.aiocb.0.aio_fildes
+ }
+
+ fn in_progress(&self) -> bool {
+ self.aiocb.in_progress()
+ }
+
+ fn priority(&self) -> i32 {
+ self.aiocb.aiocb.0.aio_reqprio
+ }
+
+ fn set_sigev_notify(&mut self, sev: SigevNotify) {
+ self.aiocb.set_sigev_notify(sev)
+ }
+
+ fn sigevent(&self) -> SigEvent {
+ SigEvent::from(&self.aiocb.aiocb.0.aio_sigevent)
+ }
+ };
+ ($func:ident) => {
+ aio_methods!();
+
+ fn aio_return(self: Pin<&mut Self>) -> Result<<Self as Aio>::Output> {
+ self.aiocb().aio_return()
+ }
+
+ fn submit(mut self: Pin<&mut Self>) -> Result<()> {
+ let p: *mut libc::aiocb = &mut self.as_mut().aiocb().aiocb.0;
+ Errno::result({ unsafe { libc::$func(p) } }).map(|_| {
+ self.aiocb().set_in_progress();
})
}
+ };
+}
+
+/// An asynchronous version of `fsync(2)`.
+///
+/// # References
+///
+/// [aio_fsync](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_fsync.html)
+/// # Examples
+///
+/// ```
+/// # use nix::errno::Errno;
+/// # use nix::Error;
+/// # use nix::sys::aio::*;
+/// # use nix::sys::signal::SigevNotify::SigevNone;
+/// # use std::{thread, time};
+/// # use std::os::unix::io::AsRawFd;
+/// # use tempfile::tempfile;
+/// let f = tempfile().unwrap();
+/// let mut aiof = Box::pin(AioFsync::new(f.as_raw_fd(), AioFsyncMode::O_SYNC,
+/// 0, SigevNone));
+/// aiof.as_mut().submit().expect("aio_fsync failed early");
+/// while (aiof.as_mut().error() == Err(Errno::EINPROGRESS)) {
+/// thread::sleep(time::Duration::from_millis(10));
+/// }
+/// aiof.as_mut().aio_return().expect("aio_fsync failed late");
+/// ```
+#[derive(Debug)]
+#[repr(transparent)]
+pub struct AioFsync {
+ aiocb: AioCb,
+ _pin: PhantomPinned,
+}
+
+impl AioFsync {
+ unsafe_pinned!(aiocb: AioCb);
+
+ /// Returns the operation's fsync mode: data and metadata or data only?
+ pub fn mode(&self) -> AioFsyncMode {
+ AioFsyncMode::try_from(self.aiocb.aiocb.0.aio_lio_opcode).unwrap()
}
- /// Returns the `aiocb`'s `LioOpcode` field
+ /// Create a new `AioFsync`.
///
- /// If the value cannot be represented as an `LioOpcode`, returns `None`
- /// instead.
- pub fn lio_opcode(&self) -> Option<LioOpcode> {
- match self.aiocb.0.aio_lio_opcode {
- libc::LIO_READ => Some(LioOpcode::LIO_READ),
- libc::LIO_WRITE => Some(LioOpcode::LIO_WRITE),
- libc::LIO_NOP => Some(LioOpcode::LIO_NOP),
- _ => None
+ /// # Arguments
+ ///
+ /// * `fd`: File descriptor to sync.
+ /// * `mode`: Whether to sync file metadata too, or just data.
+ /// * `prio`: If POSIX Prioritized IO is supported, then the
+ /// operation will be prioritized at the process's
+ /// priority level minus `prio`.
+ /// * `sigev_notify`: Determines how you will be notified of event
+ /// completion.
+ pub fn new(
+ fd: RawFd,
+ mode: AioFsyncMode,
+ prio: i32,
+ sigev_notify: SigevNotify,
+ ) -> Self {
+ let mut aiocb = AioCb::common_init(fd, prio, sigev_notify);
+ // To save some memory, store mode in an unused field of the AioCb.
+ // True it isn't very much memory, but downstream creates will likely
+ // create an enum containing this and other AioCb variants and pack
+ // those enums into data structures like Vec, so it adds up.
+ aiocb.aiocb.0.aio_lio_opcode = mode as libc::c_int;
+ AioFsync {
+ aiocb,
+ _pin: PhantomPinned,
}
}
+}
+
+impl Aio for AioFsync {
+ type Output = ();
+
+ aio_methods!();
+
+ fn aio_return(self: Pin<&mut Self>) -> Result<()> {
+ self.aiocb().aio_return().map(drop)
+ }
+
+ fn submit(mut self: Pin<&mut Self>) -> Result<()> {
+ let aiocb = &mut self.as_mut().aiocb().aiocb.0;
+ let mode = mem::replace(&mut aiocb.aio_lio_opcode, 0);
+ let p: *mut libc::aiocb = aiocb;
+ Errno::result(unsafe { libc::aio_fsync(mode, p) }).map(|_| {
+ self.aiocb().set_in_progress();
+ })
+ }
+}
+
+// AioFsync does not need AsMut, since it can't be used with lio_listio
+
+impl AsRef<libc::aiocb> for AioFsync {
+ fn as_ref(&self) -> &libc::aiocb {
+ &self.aiocb.aiocb.0
+ }
+}
+
+/// Asynchronously reads from a file descriptor into a buffer
+///
+/// # References
+///
+/// [aio_read](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_read.html)
+///
+/// # Examples
+///
+///
+/// ```
+/// # use nix::errno::Errno;
+/// # use nix::Error;
+/// # use nix::sys::aio::*;
+/// # use nix::sys::signal::SigevNotify;
+/// # use std::{thread, time};
+/// # use std::io::Write;
+/// # use std::os::unix::io::AsRawFd;
+/// # use tempfile::tempfile;
+/// const INITIAL: &[u8] = b"abcdef123456";
+/// const LEN: usize = 4;
+/// let mut rbuf = vec![0; LEN];
+/// let mut f = tempfile().unwrap();
+/// f.write_all(INITIAL).unwrap();
+/// {
+/// let mut aior = Box::pin(
+/// AioRead::new(
+/// f.as_raw_fd(),
+/// 2, //offset
+/// &mut rbuf,
+/// 0, //priority
+/// SigevNotify::SigevNone
+/// )
+/// );
+/// aior.as_mut().submit().unwrap();
+/// while (aior.as_mut().error() == Err(Errno::EINPROGRESS)) {
+/// thread::sleep(time::Duration::from_millis(10));
+/// }
+/// assert_eq!(aior.as_mut().aio_return().unwrap(), LEN);
+/// }
+/// assert_eq!(rbuf, b"cdef");
+/// ```
+#[derive(Debug)]
+#[repr(transparent)]
+pub struct AioRead<'a> {
+ aiocb: AioCb,
+ _data: PhantomData<&'a [u8]>,
+ _pin: PhantomPinned,
+}
+
+impl<'a> AioRead<'a> {
+ unsafe_pinned!(aiocb: AioCb);
/// Returns the requested length of the aio operation in bytes
///
@@ -604,85 +558,418 @@ impl<'a> AioCb<'a> {
/// number of bytes actually read or written by a completed operation, use
/// `aio_return` instead.
pub fn nbytes(&self) -> usize {
- self.aiocb.0.aio_nbytes
+ self.aiocb.aiocb.0.aio_nbytes
}
- /// Returns the file offset stored in the `AioCb`
+ /// Create a new `AioRead`, placing the data in a mutable slice.
+ ///
+ /// # Arguments
+ ///
+ /// * `fd`: File descriptor to read from
+ /// * `offs`: File offset
+ /// * `buf`: A memory buffer. It must outlive the `AioRead`.
+ /// * `prio`: If POSIX Prioritized IO is supported, then the
+ /// operation will be prioritized at the process's
+ /// priority level minus `prio`
+ /// * `sigev_notify`: Determines how you will be notified of event
+ /// completion.
+ pub fn new(
+ fd: RawFd,
+ offs: off_t,
+ buf: &'a mut [u8],
+ prio: i32,
+ sigev_notify: SigevNotify,
+ ) -> Self {
+ let mut aiocb = AioCb::common_init(fd, prio, sigev_notify);
+ aiocb.aiocb.0.aio_nbytes = buf.len();
+ aiocb.aiocb.0.aio_buf = buf.as_mut_ptr() as *mut c_void;
+ aiocb.aiocb.0.aio_lio_opcode = libc::LIO_READ;
+ aiocb.aiocb.0.aio_offset = offs;
+ AioRead {
+ aiocb,
+ _data: PhantomData,
+ _pin: PhantomPinned,
+ }
+ }
+
+ /// Returns the file offset of the operation.
pub fn offset(&self) -> off_t {
- self.aiocb.0.aio_offset
+ self.aiocb.aiocb.0.aio_offset
+ }
+}
+
+impl<'a> Aio for AioRead<'a> {
+ type Output = usize;
+
+ aio_methods!(aio_read);
+}
+
+impl<'a> AsMut<libc::aiocb> for AioRead<'a> {
+ fn as_mut(&mut self) -> &mut libc::aiocb {
+ &mut self.aiocb.aiocb.0
}
+}
- /// Returns the priority of the `AioCb`
- pub fn priority(&self) -> libc::c_int {
- self.aiocb.0.aio_reqprio
+impl<'a> AsRef<libc::aiocb> for AioRead<'a> {
+ fn as_ref(&self) -> &libc::aiocb {
+ &self.aiocb.aiocb.0
}
+}
- /// Asynchronously reads from a file descriptor into a buffer
+/// Asynchronously reads from a file descriptor into a scatter/gather list of buffers.
+///
+/// # References
+///
+/// [aio_readv](https://www.freebsd.org/cgi/man.cgi?query=aio_readv)
+///
+/// # Examples
+///
+///
+#[cfg_attr(fbsd14, doc = " ```")]
+#[cfg_attr(not(fbsd14), doc = " ```no_run")]
+/// # use nix::errno::Errno;
+/// # use nix::Error;
+/// # use nix::sys::aio::*;
+/// # use nix::sys::signal::SigevNotify;
+/// # use std::{thread, time};
+/// # use std::io::{IoSliceMut, Write};
+/// # use std::os::unix::io::AsRawFd;
+/// # use tempfile::tempfile;
+/// const INITIAL: &[u8] = b"abcdef123456";
+/// let mut rbuf0 = vec![0; 4];
+/// let mut rbuf1 = vec![0; 2];
+/// let expected_len = rbuf0.len() + rbuf1.len();
+/// let mut rbufs = [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)];
+/// let mut f = tempfile().unwrap();
+/// f.write_all(INITIAL).unwrap();
+/// {
+/// let mut aior = Box::pin(
+/// AioReadv::new(
+/// f.as_raw_fd(),
+/// 2, //offset
+/// &mut rbufs,
+/// 0, //priority
+/// SigevNotify::SigevNone
+/// )
+/// );
+/// aior.as_mut().submit().unwrap();
+/// while (aior.as_mut().error() == Err(Errno::EINPROGRESS)) {
+/// thread::sleep(time::Duration::from_millis(10));
+/// }
+/// assert_eq!(aior.as_mut().aio_return().unwrap(), expected_len);
+/// }
+/// assert_eq!(rbuf0, b"cdef");
+/// assert_eq!(rbuf1, b"12");
+/// ```
+#[cfg(target_os = "freebsd")]
+#[derive(Debug)]
+#[repr(transparent)]
+pub struct AioReadv<'a> {
+ aiocb: AioCb,
+ _data: PhantomData<&'a [&'a [u8]]>,
+ _pin: PhantomPinned,
+}
+
+#[cfg(target_os = "freebsd")]
+impl<'a> AioReadv<'a> {
+ unsafe_pinned!(aiocb: AioCb);
+
+ /// Returns the number of buffers the operation will read into.
+ pub fn iovlen(&self) -> usize {
+ self.aiocb.aiocb.0.aio_nbytes
+ }
+
+ /// Create a new `AioReadv`, placing the data in a list of mutable slices.
///
- /// # References
+ /// # Arguments
///
- /// [aio_read](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_read.html)
- pub fn read(self: &mut Pin<Box<Self>>) -> Result<()> {
- assert!(self.mutable, "Can't read into an immutable buffer");
- // Safe because we don't move anything
- let selfp = unsafe {
- self.as_mut().get_unchecked_mut()
- };
- Errno::result({
- let p: *mut libc::aiocb = &mut selfp.aiocb.0;
- unsafe { libc::aio_read(p) }
- }).map(|_| {
- selfp.in_progress = true;
- })
+ /// * `fd`: File descriptor to read from
+ /// * `offs`: File offset
+ /// * `bufs`: A scatter/gather list of memory buffers. They must
+ /// outlive the `AioReadv`.
+ /// * `prio`: If POSIX Prioritized IO is supported, then the
+ /// operation will be prioritized at the process's
+ /// priority level minus `prio`
+ /// * `sigev_notify`: Determines how you will be notified of event
+ /// completion.
+ pub fn new(
+ fd: RawFd,
+ offs: off_t,
+ bufs: &mut [IoSliceMut<'a>],
+ prio: i32,
+ sigev_notify: SigevNotify,
+ ) -> Self {
+ let mut aiocb = AioCb::common_init(fd, prio, sigev_notify);
+ // In vectored mode, aio_nbytes stores the length of the iovec array,
+ // not the byte count.
+ aiocb.aiocb.0.aio_nbytes = bufs.len();
+ aiocb.aiocb.0.aio_buf = bufs.as_mut_ptr() as *mut c_void;
+ aiocb.aiocb.0.aio_lio_opcode = libc::LIO_READV;
+ aiocb.aiocb.0.aio_offset = offs;
+ AioReadv {
+ aiocb,
+ _data: PhantomData,
+ _pin: PhantomPinned,
+ }
}
- /// Returns the `SigEvent` stored in the `AioCb`
- pub fn sigevent(&self) -> SigEvent {
- SigEvent::from(&self.aiocb.0.aio_sigevent)
+ /// Returns the file offset of the operation.
+ pub fn offset(&self) -> off_t {
+ self.aiocb.aiocb.0.aio_offset
}
+}
- fn aio_return_unpinned(&mut self) -> Result<isize> {
- unsafe {
- let p: *mut libc::aiocb = &mut self.aiocb.0;
- self.in_progress = false;
- Errno::result(libc::aio_return(p))
- }
+#[cfg(target_os = "freebsd")]
+impl<'a> Aio for AioReadv<'a> {
+ type Output = usize;
+
+ aio_methods!(aio_readv);
+}
+
+#[cfg(target_os = "freebsd")]
+impl<'a> AsMut<libc::aiocb> for AioReadv<'a> {
+ fn as_mut(&mut self) -> &mut libc::aiocb {
+ &mut self.aiocb.aiocb.0
+ }
+}
+
+#[cfg(target_os = "freebsd")]
+impl<'a> AsRef<libc::aiocb> for AioReadv<'a> {
+ fn as_ref(&self) -> &libc::aiocb {
+ &self.aiocb.aiocb.0
}
+}
- /// Retrieve return status of an asynchronous operation.
+/// Asynchronously writes from a buffer to a file descriptor
+///
+/// # References
+///
+/// [aio_write](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_write.html)
+///
+/// # Examples
+///
+/// ```
+/// # use nix::errno::Errno;
+/// # use nix::Error;
+/// # use nix::sys::aio::*;
+/// # use nix::sys::signal::SigevNotify;
+/// # use std::{thread, time};
+/// # use std::os::unix::io::AsRawFd;
+/// # use tempfile::tempfile;
+/// const WBUF: &[u8] = b"abcdef123456";
+/// let mut f = tempfile().unwrap();
+/// let mut aiow = Box::pin(
+/// AioWrite::new(
+/// f.as_raw_fd(),
+/// 2, //offset
+/// WBUF,
+/// 0, //priority
+/// SigevNotify::SigevNone
+/// )
+/// );
+/// aiow.as_mut().submit().unwrap();
+/// while (aiow.as_mut().error() == Err(Errno::EINPROGRESS)) {
+/// thread::sleep(time::Duration::from_millis(10));
+/// }
+/// assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len());
+/// ```
+#[derive(Debug)]
+#[repr(transparent)]
+pub struct AioWrite<'a> {
+ aiocb: AioCb,
+ _data: PhantomData<&'a [u8]>,
+ _pin: PhantomPinned,
+}
+
+impl<'a> AioWrite<'a> {
+ unsafe_pinned!(aiocb: AioCb);
+
+ /// Returns the requested length of the aio operation in bytes
///
- /// Should only be called once for each `AioCb`, after `AioCb::error`
- /// indicates that it has completed. The result is the same as for the
- /// synchronous `read(2)`, `write(2)`, of `fsync(2)` functions.
+ /// This method returns the *requested* length of the operation. To get the
+ /// number of bytes actually read or written by a completed operation, use
+ /// `aio_return` instead.
+ pub fn nbytes(&self) -> usize {
+ self.aiocb.aiocb.0.aio_nbytes
+ }
+
+ /// Construct a new `AioWrite`.
///
- /// # References
+ /// # Arguments
///
- /// [aio_return](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_return.html)
- // Note: this should be just `return`, but that's a reserved word
- pub fn aio_return(self: &mut Pin<Box<Self>>) -> Result<isize> {
- // Safe because aio_return_unpinned does not move the data
- let selfp = unsafe {
- self.as_mut().get_unchecked_mut()
- };
- selfp.aio_return_unpinned()
+ /// * `fd`: File descriptor to write to
+ /// * `offs`: File offset
+ /// * `buf`: A memory buffer. It must outlive the `AioWrite`.
+ /// * `prio`: If POSIX Prioritized IO is supported, then the
+ /// operation will be prioritized at the process's
+ /// priority level minus `prio`
+ /// * `sigev_notify`: Determines how you will be notified of event
+ /// completion.
+ pub fn new(
+ fd: RawFd,
+ offs: off_t,
+ buf: &'a [u8],
+ prio: i32,
+ sigev_notify: SigevNotify,
+ ) -> Self {
+ let mut aiocb = AioCb::common_init(fd, prio, sigev_notify);
+ aiocb.aiocb.0.aio_nbytes = buf.len();
+ // casting an immutable buffer to a mutable pointer looks unsafe,
+ // but technically its only unsafe to dereference it, not to create
+ // it. Type Safety guarantees that we'll never pass aiocb to
+ // aio_read or aio_readv.
+ aiocb.aiocb.0.aio_buf = buf.as_ptr() as *mut c_void;
+ aiocb.aiocb.0.aio_lio_opcode = libc::LIO_WRITE;
+ aiocb.aiocb.0.aio_offset = offs;
+ AioWrite {
+ aiocb,
+ _data: PhantomData,
+ _pin: PhantomPinned,
+ }
+ }
+
+ /// Returns the file offset of the operation.
+ pub fn offset(&self) -> off_t {
+ self.aiocb.aiocb.0.aio_offset
+ }
+}
+
+impl<'a> Aio for AioWrite<'a> {
+ type Output = usize;
+
+ aio_methods!(aio_write);
+}
+
+impl<'a> AsMut<libc::aiocb> for AioWrite<'a> {
+ fn as_mut(&mut self) -> &mut libc::aiocb {
+ &mut self.aiocb.aiocb.0
+ }
+}
+
+impl<'a> AsRef<libc::aiocb> for AioWrite<'a> {
+ fn as_ref(&self) -> &libc::aiocb {
+ &self.aiocb.aiocb.0
+ }
+}
+
+/// Asynchronously writes from a scatter/gather list of buffers to a file descriptor.
+///
+/// # References
+///
+/// [aio_writev](https://www.freebsd.org/cgi/man.cgi?query=aio_writev)
+///
+/// # Examples
+///
+#[cfg_attr(fbsd14, doc = " ```")]
+#[cfg_attr(not(fbsd14), doc = " ```no_run")]
+/// # use nix::errno::Errno;
+/// # use nix::Error;
+/// # use nix::sys::aio::*;
+/// # use nix::sys::signal::SigevNotify;
+/// # use std::{thread, time};
+/// # use std::io::IoSlice;
+/// # use std::os::unix::io::AsRawFd;
+/// # use tempfile::tempfile;
+/// const wbuf0: &[u8] = b"abcdef";
+/// const wbuf1: &[u8] = b"123456";
+/// let len = wbuf0.len() + wbuf1.len();
+/// let wbufs = [IoSlice::new(wbuf0), IoSlice::new(wbuf1)];
+/// let mut f = tempfile().unwrap();
+/// let mut aiow = Box::pin(
+/// AioWritev::new(
+/// f.as_raw_fd(),
+/// 2, //offset
+/// &wbufs,
+/// 0, //priority
+/// SigevNotify::SigevNone
+/// )
+/// );
+/// aiow.as_mut().submit().unwrap();
+/// while (aiow.as_mut().error() == Err(Errno::EINPROGRESS)) {
+/// thread::sleep(time::Duration::from_millis(10));
+/// }
+/// assert_eq!(aiow.as_mut().aio_return().unwrap(), len);
+/// ```
+#[cfg(target_os = "freebsd")]
+#[derive(Debug)]
+#[repr(transparent)]
+pub struct AioWritev<'a> {
+ aiocb: AioCb,
+ _data: PhantomData<&'a [&'a [u8]]>,
+ _pin: PhantomPinned,
+}
+
+#[cfg(target_os = "freebsd")]
+impl<'a> AioWritev<'a> {
+ unsafe_pinned!(aiocb: AioCb);
+
+ /// Returns the number of buffers the operation will read into.
+ pub fn iovlen(&self) -> usize {
+ self.aiocb.aiocb.0.aio_nbytes
}
- /// Asynchronously writes from a buffer to a file descriptor
+ /// Construct a new `AioWritev`.
///
- /// # References
+ /// # Arguments
///
- /// [aio_write](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_write.html)
- pub fn write(self: &mut Pin<Box<Self>>) -> Result<()> {
- // Safe because we don't move anything
- let selfp = unsafe {
- self.as_mut().get_unchecked_mut()
- };
- Errno::result({
- let p: *mut libc::aiocb = &mut selfp.aiocb.0;
- unsafe{ libc::aio_write(p) }
- }).map(|_| {
- selfp.in_progress = true;
- })
+ /// * `fd`: File descriptor to write to
+ /// * `offs`: File offset
+ /// * `bufs`: A scatter/gather list of memory buffers. They must
+ /// outlive the `AioWritev`.
+ /// * `prio`: If POSIX Prioritized IO is supported, then the
+ /// operation will be prioritized at the process's
+ /// priority level minus `prio`
+ /// * `sigev_notify`: Determines how you will be notified of event
+ /// completion.
+ pub fn new(
+ fd: RawFd,
+ offs: off_t,
+ bufs: &[IoSlice<'a>],
+ prio: i32,
+ sigev_notify: SigevNotify,
+ ) -> Self {
+ let mut aiocb = AioCb::common_init(fd, prio, sigev_notify);
+ // In vectored mode, aio_nbytes stores the length of the iovec array,
+ // not the byte count.
+ aiocb.aiocb.0.aio_nbytes = bufs.len();
+ // casting an immutable buffer to a mutable pointer looks unsafe,
+ // but technically its only unsafe to dereference it, not to create
+ // it. Type Safety guarantees that we'll never pass aiocb to
+ // aio_read or aio_readv.
+ aiocb.aiocb.0.aio_buf = bufs.as_ptr() as *mut c_void;
+ aiocb.aiocb.0.aio_lio_opcode = libc::LIO_WRITEV;
+ aiocb.aiocb.0.aio_offset = offs;
+ AioWritev {
+ aiocb,
+ _data: PhantomData,
+ _pin: PhantomPinned,
+ }
+ }
+
+ /// Returns the file offset of the operation.
+ pub fn offset(&self) -> off_t {
+ self.aiocb.aiocb.0.aio_offset
+ }
+}
+
+#[cfg(target_os = "freebsd")]
+impl<'a> Aio for AioWritev<'a> {
+ type Output = usize;
+
+ aio_methods!(aio_writev);
+}
+
+#[cfg(target_os = "freebsd")]
+impl<'a> AsMut<libc::aiocb> for AioWritev<'a> {
+ fn as_mut(&mut self) -> &mut libc::aiocb {
+ &mut self.aiocb.aiocb.0
+ }
+}
+
+#[cfg(target_os = "freebsd")]
+impl<'a> AsRef<libc::aiocb> for AioWritev<'a> {
+ fn as_ref(&self) -> &libc::aiocb {
+ &self.aiocb.aiocb.0
}
}
@@ -704,38 +991,37 @@ impl<'a> AioCb<'a> {
/// # use tempfile::tempfile;
/// let wbuf = b"CDEF";
/// let mut f = tempfile().unwrap();
-/// let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
+/// let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(),
/// 2, //offset
/// &wbuf[..],
/// 0, //priority
-/// SigevNotify::SigevNone,
-/// LioOpcode::LIO_NOP);
-/// aiocb.write().unwrap();
+/// SigevNotify::SigevNone));
+/// aiocb.as_mut().submit().unwrap();
/// let cs = aio_cancel_all(f.as_raw_fd()).unwrap();
/// if cs == AioCancelStat::AioNotCanceled {
-/// while (aiocb.error() == Err(Errno::EINPROGRESS)) {
+/// while (aiocb.as_mut().error() == Err(Errno::EINPROGRESS)) {
/// thread::sleep(time::Duration::from_millis(10));
/// }
/// }
/// // Must call `aio_return`, but ignore the result
-/// let _ = aiocb.aio_return();
+/// let _ = aiocb.as_mut().aio_return();
/// ```
///
/// # References
///
/// [`aio_cancel`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_cancel.html)
pub fn aio_cancel_all(fd: RawFd) -> Result<AioCancelStat> {
- match unsafe { libc::aio_cancel(fd, null_mut()) } {
+ match unsafe { libc::aio_cancel(fd, ptr::null_mut()) } {
libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled),
libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled),
libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone),
-1 => Err(Errno::last()),
- _ => panic!("unknown aio_cancel return value")
+ _ => panic!("unknown aio_cancel return value"),
}
}
-/// Suspends the calling process until at least one of the specified `AioCb`s
-/// has completed, a signal is delivered, or the timeout has passed.
+/// Suspends the calling process until at least one of the specified operations
+/// have completed, a signal is delivered, or the timeout has passed.
///
/// If `timeout` is `None`, `aio_suspend` will block indefinitely.
///
@@ -750,380 +1036,209 @@ pub fn aio_cancel_all(fd: RawFd) -> Result<AioCancelStat> {
/// # use tempfile::tempfile;
/// const WBUF: &[u8] = b"abcdef123456";
/// let mut f = tempfile().unwrap();
-/// let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
+/// let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(),
/// 2, //offset
/// WBUF,
/// 0, //priority
-/// SigevNotify::SigevNone,
-/// LioOpcode::LIO_NOP);
-/// aiocb.write().unwrap();
-/// aio_suspend(&[aiocb.as_ref()], None).expect("aio_suspend failed");
-/// assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len());
+/// SigevNotify::SigevNone));
+/// aiocb.as_mut().submit().unwrap();
+/// aio_suspend(&[&*aiocb], None).expect("aio_suspend failed");
+/// assert_eq!(aiocb.as_mut().aio_return().unwrap() as usize, WBUF.len());
/// ```
/// # References
///
/// [`aio_suspend`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_suspend.html)
-pub fn aio_suspend(list: &[Pin<&AioCb>], timeout: Option<TimeSpec>) -> Result<()> {
- let plist = list as *const [Pin<&AioCb>] as *const [*const libc::aiocb];
- let p = plist as *const *const libc::aiocb;
+pub fn aio_suspend(
+ list: &[&dyn AsRef<libc::aiocb>],
+ timeout: Option<TimeSpec>,
+) -> Result<()> {
+ let p = list as *const [&dyn AsRef<libc::aiocb>]
+ as *const [*const libc::aiocb]
+ as *const *const libc::aiocb;
let timep = match timeout {
- None => null::<libc::timespec>(),
- Some(x) => x.as_ref() as *const libc::timespec
+ None => ptr::null::<libc::timespec>(),
+ Some(x) => x.as_ref() as *const libc::timespec,
};
- Errno::result(unsafe {
- libc::aio_suspend(p, list.len() as i32, timep)
- }).map(drop)
-}
-
-impl<'a> Debug for AioCb<'a> {
- fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
- fmt.debug_struct("AioCb")
- .field("aiocb", &self.aiocb.0)
- .field("mutable", &self.mutable)
- .field("in_progress", &self.in_progress)
- .finish()
- }
+ Errno::result(unsafe { libc::aio_suspend(p, list.len() as i32, timep) })
+ .map(drop)
}
-impl<'a> Drop for AioCb<'a> {
- /// If the `AioCb` has no remaining state in the kernel, just drop it.
- /// Otherwise, dropping constitutes a resource leak, which is an error
- fn drop(&mut self) {
- assert!(thread::panicking() || !self.in_progress,
- "Dropped an in-progress AioCb");
- }
-}
-
-/// LIO Control Block.
+/// Submits multiple asynchronous I/O requests with a single system call.
///
-/// The basic structure used to issue multiple AIO operations simultaneously.
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-#[cfg_attr(docsrs, doc(cfg(all())))]
-pub struct LioCb<'a> {
- /// A collection of [`AioCb`]s. All of these will be issued simultaneously
- /// by the [`listio`] method.
- ///
- /// [`AioCb`]: struct.AioCb.html
- /// [`listio`]: #method.listio
- // Their locations in memory must be fixed once they are passed to the
- // kernel. So this field must be non-public so the user can't swap.
- aiocbs: Box<[AioCb<'a>]>,
-
- /// The actual list passed to `libc::lio_listio`.
- ///
- /// It must live for as long as any of the operations are still being
- /// processesed, because the aio subsystem uses its address as a unique
- /// identifier.
- list: Vec<*mut libc::aiocb>,
-
- /// A partial set of results. This field will get populated by
- /// `listio_resubmit` when an `LioCb` is resubmitted after an error
- results: Vec<Option<Result<isize>>>
-}
-
-/// LioCb can't automatically impl Send and Sync just because of the raw
-/// pointers in list. But that's stupid. There's no reason that raw pointers
-/// should automatically be non-Send
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-unsafe impl<'a> Send for LioCb<'a> {}
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-unsafe impl<'a> Sync for LioCb<'a> {}
-
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-#[cfg_attr(docsrs, doc(cfg(all())))]
-impl<'a> LioCb<'a> {
- /// Are no [`AioCb`]s contained?
- pub fn is_empty(&self) -> bool {
- self.aiocbs.is_empty()
- }
-
- /// Return the number of individual [`AioCb`]s contained.
- pub fn len(&self) -> usize {
- self.aiocbs.len()
- }
-
- /// Submits multiple asynchronous I/O requests with a single system call.
- ///
- /// They are not guaranteed to complete atomically, and the order in which
- /// the requests are carried out is not specified. Reads, writes, and
- /// fsyncs may be freely mixed.
- ///
- /// This function is useful for reducing the context-switch overhead of
- /// submitting many AIO operations. It can also be used with
- /// `LioMode::LIO_WAIT` to block on the result of several independent
- /// operations. Used that way, it is often useful in programs that
- /// otherwise make little use of AIO.
- ///
- /// # Examples
- ///
- /// Use `listio` to submit an aio operation and wait for its completion. In
- /// this case, there is no need to use [`aio_suspend`] to wait or
- /// [`AioCb::error`] to poll.
- ///
- /// ```
- /// # use nix::sys::aio::*;
- /// # use nix::sys::signal::SigevNotify;
- /// # use std::os::unix::io::AsRawFd;
- /// # use tempfile::tempfile;
- /// const WBUF: &[u8] = b"abcdef123456";
- /// let mut f = tempfile().unwrap();
- /// let mut liocb = LioCbBuilder::with_capacity(1)
- /// .emplace_slice(
- /// f.as_raw_fd(),
- /// 2, //offset
- /// WBUF,
- /// 0, //priority
- /// SigevNotify::SigevNone,
- /// LioOpcode::LIO_WRITE
- /// ).finish();
- /// liocb.listio(LioMode::LIO_WAIT,
- /// SigevNotify::SigevNone).unwrap();
- /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len());
- /// ```
- ///
- /// # References
- ///
- /// [`lio_listio`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html)
- ///
- /// [`aio_suspend`]: fn.aio_suspend.html
- /// [`AioCb::error`]: struct.AioCb.html#method.error
- pub fn listio(&mut self, mode: LioMode,
- sigev_notify: SigevNotify) -> Result<()> {
- let sigev = SigEvent::new(sigev_notify);
- let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
- self.list.clear();
- for a in &mut self.aiocbs.iter_mut() {
- a.in_progress = true;
- self.list.push(a as *mut AioCb<'a>
- as *mut libc::aiocb);
- }
- let p = self.list.as_ptr();
- Errno::result(unsafe {
- libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp)
- }).map(drop)
- }
-
- /// Resubmits any incomplete operations with [`lio_listio`].
- ///
- /// Sometimes, due to system resource limitations, an `lio_listio` call will
- /// return `EIO`, or `EAGAIN`. Or, if a signal is received, it may return
- /// `EINTR`. In any of these cases, only a subset of its constituent
- /// operations will actually have been initiated. `listio_resubmit` will
- /// resubmit any operations that are still uninitiated.
- ///
- /// After calling `listio_resubmit`, results should be collected by
- /// [`LioCb::aio_return`].
- ///
- /// # Examples
- /// ```no_run
- /// # use nix::Error;
- /// # use nix::errno::Errno;
- /// # use nix::sys::aio::*;
- /// # use nix::sys::signal::SigevNotify;
- /// # use std::os::unix::io::AsRawFd;
- /// # use std::{thread, time};
- /// # use tempfile::tempfile;
- /// const WBUF: &[u8] = b"abcdef123456";
- /// let mut f = tempfile().unwrap();
- /// let mut liocb = LioCbBuilder::with_capacity(1)
- /// .emplace_slice(
- /// f.as_raw_fd(),
- /// 2, //offset
- /// WBUF,
- /// 0, //priority
- /// SigevNotify::SigevNone,
- /// LioOpcode::LIO_WRITE
- /// ).finish();
- /// let mut err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone);
- /// while err == Err(Errno::EIO) ||
- /// err == Err(Errno::EAGAIN) {
- /// thread::sleep(time::Duration::from_millis(10));
- /// err = liocb.listio_resubmit(LioMode::LIO_WAIT, SigevNotify::SigevNone);
- /// }
- /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len());
- /// ```
- ///
- /// # References
- ///
- /// [`lio_listio`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html)
- ///
- /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
- /// [`LioCb::aio_return`]: struct.LioCb.html#method.aio_return
- // Note: the addresses of any EINPROGRESS or EOK aiocbs _must_ not be
- // changed by this method, because the kernel relies on their addresses
- // being stable.
- // Note: aiocbs that are Ok(()) must be finalized by aio_return, or else the
- // sigev_notify will immediately refire.
- pub fn listio_resubmit(&mut self, mode:LioMode,
- sigev_notify: SigevNotify) -> Result<()> {
- let sigev = SigEvent::new(sigev_notify);
- let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
- self.list.clear();
-
- while self.results.len() < self.aiocbs.len() {
- self.results.push(None);
- }
-
- for (i, a) in self.aiocbs.iter_mut().enumerate() {
- if self.results[i].is_some() {
- // Already collected final status for this operation
- continue;
- }
- match a.error_unpinned() {
- Ok(()) => {
- // aiocb is complete; collect its status and don't resubmit
- self.results[i] = Some(a.aio_return_unpinned());
- },
- Err(Errno::EAGAIN) => {
- self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb);
- },
- Err(Errno::EINPROGRESS) => {
- // aiocb is was successfully queued; no need to do anything
- },
- Err(Errno::EINVAL) => panic!(
- "AioCb was never submitted, or already finalized"),
- _ => unreachable!()
- }
- }
- let p = self.list.as_ptr();
- Errno::result(unsafe {
- libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp)
- }).map(drop)
- }
-
- /// Collect final status for an individual `AioCb` submitted as part of an
- /// `LioCb`.
- ///
- /// This is just like [`AioCb::aio_return`], except it takes into account
- /// operations that were restarted by [`LioCb::listio_resubmit`]
- ///
- /// [`AioCb::aio_return`]: struct.AioCb.html#method.aio_return
- /// [`LioCb::listio_resubmit`]: #method.listio_resubmit
- pub fn aio_return(&mut self, i: usize) -> Result<isize> {
- if i >= self.results.len() || self.results[i].is_none() {
- self.aiocbs[i].aio_return_unpinned()
- } else {
- self.results[i].unwrap()
- }
- }
-
- /// Retrieve error status of an individual `AioCb` submitted as part of an
- /// `LioCb`.
- ///
- /// This is just like [`AioCb::error`], except it takes into account
- /// operations that were restarted by [`LioCb::listio_resubmit`]
- ///
- /// [`AioCb::error`]: struct.AioCb.html#method.error
- /// [`LioCb::listio_resubmit`]: #method.listio_resubmit
- pub fn error(&mut self, i: usize) -> Result<()> {
- if i >= self.results.len() || self.results[i].is_none() {
- self.aiocbs[i].error_unpinned()
- } else {
- Ok(())
- }
- }
-}
-
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-impl<'a> Debug for LioCb<'a> {
- fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
- fmt.debug_struct("LioCb")
- .field("aiocbs", &self.aiocbs)
- .finish()
- }
-}
-
-/// Used to construct `LioCb`
-// This must be a separate class from LioCb due to pinning constraints. LioCb
-// must use a boxed slice of AioCbs so they will have stable storage, but
-// LioCbBuilder must use a Vec to make construction possible when the final size
-// is unknown.
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-#[cfg_attr(docsrs, doc(cfg(all())))]
-#[derive(Debug)]
-pub struct LioCbBuilder<'a> {
- /// A collection of [`AioCb`]s.
- ///
- /// [`AioCb`]: struct.AioCb.html
- pub aiocbs: Vec<AioCb<'a>>,
+/// They are not guaranteed to complete atomically, and the order in which the
+/// requests are carried out is not specified. Reads, and writes may be freely
+/// mixed.
+///
+/// # Examples
+///
+/// Use `lio_listio` to submit an aio operation and wait for its completion. In
+/// this case, there is no need to use aio_suspend to wait or `error` to poll.
+/// This mode is useful for otherwise-synchronous programs that want to execute
+/// a handful of I/O operations in parallel.
+/// ```
+/// # use std::os::unix::io::AsRawFd;
+/// # use nix::sys::aio::*;
+/// # use nix::sys::signal::SigevNotify;
+/// # use tempfile::tempfile;
+/// const WBUF: &[u8] = b"abcdef123456";
+/// let mut f = tempfile().unwrap();
+/// let mut aiow = Box::pin(AioWrite::new(
+/// f.as_raw_fd(),
+/// 2, // offset
+/// WBUF,
+/// 0, // priority
+/// SigevNotify::SigevNone
+/// ));
+/// lio_listio(LioMode::LIO_WAIT, &mut[aiow.as_mut()], SigevNotify::SigevNone)
+/// .unwrap();
+/// // At this point, we are guaranteed that aiow is complete.
+/// assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len());
+/// ```
+///
+/// Use `lio_listio` to submit multiple asynchronous operations with a single
+/// syscall, but receive notification individually. This is an efficient
+/// technique for reducing overall context-switch overhead, especially when
+/// combined with kqueue.
+/// ```
+/// # use std::os::unix::io::AsRawFd;
+/// # use std::thread;
+/// # use std::time;
+/// # use nix::errno::Errno;
+/// # use nix::sys::aio::*;
+/// # use nix::sys::signal::SigevNotify;
+/// # use tempfile::tempfile;
+/// const WBUF: &[u8] = b"abcdef123456";
+/// let mut f = tempfile().unwrap();
+/// let mut aiow = Box::pin(AioWrite::new(
+/// f.as_raw_fd(),
+/// 2, // offset
+/// WBUF,
+/// 0, // priority
+/// SigevNotify::SigevNone
+/// ));
+/// lio_listio(LioMode::LIO_NOWAIT, &mut[aiow.as_mut()], SigevNotify::SigevNone)
+/// .unwrap();
+/// // We must wait for the completion of each individual operation
+/// while (aiow.as_mut().error() == Err(Errno::EINPROGRESS)) {
+/// thread::sleep(time::Duration::from_millis(10));
+/// }
+/// assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len());
+/// ```
+///
+/// Use `lio_listio` to submit multiple operations, and receive notification
+/// only when all of them are complete. This can be useful when there is some
+/// logical relationship between the operations. But beware! Errors or system
+/// resource limitations may cause `lio_listio` to return `EIO`, `EAGAIN`, or
+/// `EINTR`, in which case some but not all operations may have been submitted.
+/// In that case, you must check the status of each individual operation, and
+/// possibly resubmit some.
+/// ```
+/// # use libc::c_int;
+/// # use std::os::unix::io::AsRawFd;
+/// # use std::sync::atomic::{AtomicBool, Ordering};
+/// # use std::thread;
+/// # use std::time;
+/// # use lazy_static::lazy_static;
+/// # use nix::errno::Errno;
+/// # use nix::sys::aio::*;
+/// # use nix::sys::signal::*;
+/// # use tempfile::tempfile;
+/// lazy_static! {
+/// pub static ref SIGNALED: AtomicBool = AtomicBool::new(false);
+/// }
+///
+/// extern fn sigfunc(_: c_int) {
+/// SIGNALED.store(true, Ordering::Relaxed);
+/// }
+/// let sa = SigAction::new(SigHandler::Handler(sigfunc),
+/// SaFlags::SA_RESETHAND,
+/// SigSet::empty());
+/// SIGNALED.store(false, Ordering::Relaxed);
+/// unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap();
+///
+/// const WBUF: &[u8] = b"abcdef123456";
+/// let mut f = tempfile().unwrap();
+/// let mut aiow = Box::pin(AioWrite::new(
+/// f.as_raw_fd(),
+/// 2, // offset
+/// WBUF,
+/// 0, // priority
+/// SigevNotify::SigevNone
+/// ));
+/// let sev = SigevNotify::SigevSignal { signal: Signal::SIGUSR2, si_value: 0 };
+/// lio_listio(LioMode::LIO_NOWAIT, &mut[aiow.as_mut()], sev).unwrap();
+/// while !SIGNALED.load(Ordering::Relaxed) {
+/// thread::sleep(time::Duration::from_millis(10));
+/// }
+/// // At this point, since `lio_listio` returned success and delivered its
+/// // notification, we know that all operations are complete.
+/// assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len());
+/// ```
+pub fn lio_listio(
+ mode: LioMode,
+ list: &mut [Pin<&mut dyn AsMut<libc::aiocb>>],
+ sigev_notify: SigevNotify,
+) -> Result<()> {
+ let p = list as *mut [Pin<&mut dyn AsMut<libc::aiocb>>]
+ as *mut [*mut libc::aiocb]
+ as *mut *mut libc::aiocb;
+ let sigev = SigEvent::new(sigev_notify);
+ let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
+ Errno::result(unsafe {
+ libc::lio_listio(mode as i32, p, list.len() as i32, sigevp)
+ })
+ .map(drop)
}
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-#[cfg_attr(docsrs, doc(cfg(all())))]
-impl<'a> LioCbBuilder<'a> {
- /// Initialize an empty `LioCb`
- pub fn with_capacity(capacity: usize) -> LioCbBuilder<'a> {
- LioCbBuilder {
- aiocbs: Vec::with_capacity(capacity),
- }
- }
+#[cfg(test)]
+mod t {
+ use super::*;
- /// Add a new operation on an immutable slice to the [`LioCb`] under
- /// construction.
- ///
- /// Arguments are the same as for [`AioCb::from_slice`]
- ///
- /// [`LioCb`]: struct.LioCb.html
- /// [`AioCb::from_slice`]: struct.AioCb.html#method.from_slice
- #[must_use]
- pub fn emplace_slice(mut self, fd: RawFd, offs: off_t, buf: &'a [u8],
- prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> Self
- {
- self.aiocbs.push(AioCb::from_slice_unpinned(fd, offs, buf, prio,
- sigev_notify, opcode));
- self
- }
+ /// aio_suspend relies on casting Rust Aio* struct pointers to libc::aiocb
+ /// pointers. This test ensures that such casts are valid.
+ #[test]
+ fn casting() {
+ let sev = SigevNotify::SigevNone;
+ let aiof = AioFsync::new(666, AioFsyncMode::O_SYNC, 0, sev);
+ assert_eq!(
+ aiof.as_ref() as *const libc::aiocb,
+ &aiof as *const AioFsync as *const libc::aiocb
+ );
- /// Add a new operation on a mutable slice to the [`LioCb`] under
- /// construction.
- ///
- /// Arguments are the same as for [`AioCb::from_mut_slice`]
- ///
- /// [`LioCb`]: struct.LioCb.html
- /// [`AioCb::from_mut_slice`]: struct.AioCb.html#method.from_mut_slice
- #[must_use]
- pub fn emplace_mut_slice(mut self, fd: RawFd, offs: off_t,
- buf: &'a mut [u8], prio: libc::c_int,
- sigev_notify: SigevNotify, opcode: LioOpcode)
- -> Self
- {
- self.aiocbs.push(AioCb::from_mut_slice_unpinned(fd, offs, buf, prio,
- sigev_notify, opcode));
- self
- }
+ let mut rbuf = [];
+ let aior = AioRead::new(666, 0, &mut rbuf, 0, sev);
+ assert_eq!(
+ aior.as_ref() as *const libc::aiocb,
+ &aior as *const AioRead as *const libc::aiocb
+ );
- /// Finalize this [`LioCb`].
- ///
- /// Afterwards it will be possible to issue the operations with
- /// [`LioCb::listio`]. Conversely, it will no longer be possible to add new
- /// operations with [`LioCbBuilder::emplace_slice`] or
- /// [`LioCbBuilder::emplace_mut_slice`].
- ///
- /// [`LioCb::listio`]: struct.LioCb.html#method.listio
- /// [`LioCb::from_mut_slice`]: struct.LioCb.html#method.from_mut_slice
- /// [`LioCb::from_slice`]: struct.LioCb.html#method.from_slice
- pub fn finish(self) -> LioCb<'a> {
- let len = self.aiocbs.len();
- LioCb {
- aiocbs: self.aiocbs.into(),
- list: Vec::with_capacity(len),
- results: Vec::with_capacity(len)
- }
+ let wbuf = [];
+ let aiow = AioWrite::new(666, 0, &wbuf, 0, sev);
+ assert_eq!(
+ aiow.as_ref() as *const libc::aiocb,
+ &aiow as *const AioWrite as *const libc::aiocb
+ );
}
-}
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-#[cfg(test)]
-mod t {
- use super::*;
-
- // It's important that `LioCb` be `UnPin`. The tokio-file crate relies on
- // it.
+ #[cfg(target_os = "freebsd")]
#[test]
- fn liocb_is_unpin() {
- use assert_impl::assert_impl;
+ fn casting_vectored() {
+ let sev = SigevNotify::SigevNone;
+
+ let mut rbuf = [];
+ let mut rbufs = [IoSliceMut::new(&mut rbuf)];
+ let aiorv = AioReadv::new(666, 0, &mut rbufs[..], 0, sev);
+ assert_eq!(
+ aiorv.as_ref() as *const libc::aiocb,
+ &aiorv as *const AioReadv as *const libc::aiocb
+ );
- assert_impl!(Unpin: LioCb);
+ let wbuf = [];
+ let wbufs = [IoSlice::new(&wbuf)];
+ let aiowv = AioWritev::new(666, 0, &wbufs, 0, sev);
+ assert_eq!(
+ aiowv.as_ref() as *const libc::aiocb,
+ &aiowv as *const AioWritev as *const libc::aiocb
+ );
}
}