summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.cirrus.yml13
-rw-r--r--CHANGELOG.md16
-rw-r--r--Cargo.toml9
-rw-r--r--bors.toml3
-rw-r--r--src/fcntl.rs8
-rw-r--r--src/sys/aio.rs1801
-rw-r--r--test/sys/test_aio.rs1086
-rw-r--r--test/sys/test_aio_drop.rs13
-rw-r--r--test/sys/test_lio_listio_resubmit.rs106
9 files changed, 1550 insertions, 1505 deletions
diff --git a/.cirrus.yml b/.cirrus.yml
index 1045f4f5..231cc38f 100644
--- a/.cirrus.yml
+++ b/.cirrus.yml
@@ -36,11 +36,18 @@ test: &TEST
# 64-bit kernel and in a 64-bit environment. Our tests don't execute any of
# the system's binaries, so the environment shouldn't matter.
task:
- name: FreeBSD amd64 & i686
env:
TARGET: x86_64-unknown-freebsd
- freebsd_instance:
- image: freebsd-12-3-release-amd64
+ matrix:
+ - name: FreeBSD 12 amd64 & i686
+ freebsd_instance:
+ image: freebsd-12-3-release-amd64
+ - name: FreeBSD 14 amd64 & i686
+ freebsd_instance:
+ image_family: freebsd-14-0-snap
+ # Enable tests that would fail on FreeBSD 12
+ RUSTFLAGS: --cfg fbsd14 -D warnings
+ RUSTDOCFLAGS: --cfg fbsd14
setup_script:
- kldload mqueuefs
- fetch https://sh.rustup.rs -o rustup.sh
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8e9baf90..de4d22e2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,14 +6,30 @@ This project adheres to [Semantic Versioning](https://semver.org/).
## [Unreleased] - ReleaseDate
### Added
+- Added `aio_writev` and `aio_readv`.
+ (#[1713](https://github.com/nix-rust/nix/pull/1713))
+
- impl From<SockaddrIn> for std::net::SocketAddrV4 and
impl From<SockaddrIn6> for std::net::SocketAddrV6.
(#[1711](https://github.com/nix-rust/nix/pull/1711))
### Changed
+
+- Rewrote the aio module. The new module:
+ * Does more type checking at compile time rather than runtime.
+ * Gives the caller control over whether and when to `Box` an aio operation.
+ * Changes the type of the `priority` arguments to `i32`.
+ * Changes the return type of `aio_return` to `usize`.
+ (#[1713](https://github.com/nix-rust/nix/pull/1713))
+
### Fixed
### Removed
+- Removed support for resubmitting partially complete `lio_listio` operations.
+ It was too complicated, and didn't fit Nix's theme of zero-cost abstractions.
+ Instead, it can be reimplemented downstream.
+ (#[1713](https://github.com/nix-rust/nix/pull/1713))
+
## [0.24.1] - 2022-04-22
### Added
### Changed
diff --git a/Cargo.toml b/Cargo.toml
index 628759ea..ebd9182a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,10 @@ targets = [
]
[dependencies]
-libc = { version = "0.2.124", features = [ "extra_traits" ] }
+libc = { git = "http://github.com/rust-lang/libc.git", rev = "cd99f681181c310abfba742aef11115d2eff03dc", features = [ "extra_traits" ] }
bitflags = "1.1"
cfg-if = "1.0"
+pin-utils = { version = "0.1.0", optional = true }
[target.'cfg(not(target_os = "redox"))'.dependencies]
memoffset = { version = "0.6.3", optional = true }
@@ -44,7 +45,7 @@ default = [
]
acct = []
-aio = []
+aio = ["pin-utils"]
dir = ["fs"]
env = []
event = []
@@ -103,10 +104,6 @@ name = "test-clearenv"
path = "test/test_clearenv.rs"
[[test]]
-name = "test-lio-listio-resubmit"
-path = "test/sys/test_lio_listio_resubmit.rs"
-
-[[test]]
name = "test-mount"
path = "test/test_mount.rs"
harness = false
diff --git a/bors.toml b/bors.toml
index b22877a7..b020ca38 100644
--- a/bors.toml
+++ b/bors.toml
@@ -5,7 +5,8 @@ status = [
"Android i686",
"Android x86_64",
"DragonFly BSD x86_64",
- "FreeBSD amd64 & i686",
+ "FreeBSD 12 amd64 & i686",
+ "FreeBSD 14 amd64 & i686",
"Fuchsia x86_64",
"Linux MIPS",
"Linux MIPS64 el",
diff --git a/src/fcntl.rs b/src/fcntl.rs
index fa64c8ea..5272c809 100644
--- a/src/fcntl.rs
+++ b/src/fcntl.rs
@@ -742,8 +742,8 @@ impl SpacectlRange {
///
/// # Example
///
-// no_run because it fails to link until FreeBSD 14.0
-/// ```no_run
+#[cfg_attr(fbsd14, doc = " ```")]
+#[cfg_attr(not(fbsd14), doc = " ```no_run")]
/// # use std::io::Write;
/// # use std::os::unix::fs::FileExt;
/// # use std::os::unix::io::AsRawFd;
@@ -788,8 +788,8 @@ pub fn fspacectl(fd: RawFd, range: SpacectlRange) -> Result<SpacectlRange> {
///
/// # Example
///
-// no_run because it fails to link until FreeBSD 14.0
-/// ```no_run
+#[cfg_attr(fbsd14, doc = " ```")]
+#[cfg_attr(not(fbsd14), doc = " ```no_run")]
/// # use std::io::Write;
/// # use std::os::unix::fs::FileExt;
/// # use std::os::unix::io::AsRawFd;
diff --git a/src/sys/aio.rs b/src/sys/aio.rs
index 4780cdee..6ff88469 100644
--- a/src/sys/aio.rs
+++ b/src/sys/aio.rs
@@ -2,9 +2,12 @@
//! POSIX Asynchronous I/O
//!
//! The POSIX AIO interface is used for asynchronous I/O on files and disk-like
-//! devices. It supports [`read`](struct.AioCb.html#method.read),
-//! [`write`](struct.AioCb.html#method.write), and
-//! [`fsync`](struct.AioCb.html#method.fsync) operations. Completion
+//! devices. It supports [`read`](struct.AioRead.html#method.new),
+//! [`write`](struct.AioWrite.html#method.new),
+//! [`fsync`](struct.AioFsync.html#method.new),
+//! [`readv`](struct.AioReadv.html#method.new), and
+//! [`writev`](struct.AioWritev.html#method.new), operations, subject to
+//! platform support. Completion
//! notifications can optionally be delivered via
//! [signals](../signal/enum.SigevNotify.html#variant.SigevSignal), via the
//! [`aio_suspend`](fn.aio_suspend.html) function, or via polling. Some
@@ -17,23 +20,30 @@
//! that they will be executed atomically.
//!
//! Outstanding operations may be cancelled with
-//! [`cancel`](struct.AioCb.html#method.cancel) or
+//! [`cancel`](trait.Aio.html#method.cancel) or
//! [`aio_cancel_all`](fn.aio_cancel_all.html), though the operating system may
//! not support this for all filesystems and devices.
+#[cfg(target_os = "freebsd")]
+use std::io::{IoSlice, IoSliceMut};
+use std::{
+ convert::TryFrom,
+ fmt::{self, Debug},
+ marker::{PhantomData, PhantomPinned},
+ mem,
+ os::unix::io::RawFd,
+ pin::Pin,
+ ptr,
+ thread,
+};
-use crate::Result;
-use crate::errno::Errno;
-use std::os::unix::io::RawFd;
-use libc::{c_void, off_t, size_t};
-use std::fmt;
-use std::fmt::Debug;
-use std::marker::PhantomData;
-use std::mem;
-use std::pin::Pin;
-use std::ptr::{null, null_mut};
-use crate::sys::signal::*;
-use std::thread;
-use crate::sys::time::TimeSpec;
+use libc::{c_void, off_t};
+use pin_utils::unsafe_pinned;
+
+use crate::{
+ errno::Errno,
+ sys::{signal::*, time::TimeSpec},
+ Result,
+};
libc_enum! {
/// Mode for `AioCb::fsync`. Controls whether only data or both data and
@@ -52,22 +62,7 @@ libc_enum! {
#[cfg_attr(docsrs, doc(cfg(all())))]
O_DSYNC
}
-}
-
-libc_enum! {
- /// When used with [`lio_listio`](fn.lio_listio.html), determines whether a
- /// given `aiocb` should be used for a read operation, a write operation, or
- /// ignored. Has no effect for any other aio functions.
- #[repr(i32)]
- #[non_exhaustive]
- pub enum LioOpcode {
- /// No operation
- LIO_NOP,
- /// Write data as if by a call to [`AioCb::write`]
- LIO_WRITE,
- /// Write data as if by a call to [`AioCb::read`]
- LIO_READ,
- }
+ impl TryFrom<i32>
}
libc_enum! {
@@ -103,354 +98,133 @@ struct LibcAiocb(libc::aiocb);
unsafe impl Send for LibcAiocb {}
unsafe impl Sync for LibcAiocb {}
-/// AIO Control Block.
-///
-/// The basic structure used by all aio functions. Each `AioCb` represents one
-/// I/O request.
-pub struct AioCb<'a> {
- aiocb: LibcAiocb,
- /// Tracks whether the buffer pointed to by `libc::aiocb.aio_buf` is mutable
- mutable: bool,
+/// Base class for all AIO operations. Should only be used directly when
+/// checking for completion.
+// We could create some kind of AsPinnedMut trait, and implement it for all aio
+// ops, allowing the crate's users to get pinned references to `AioCb`. That
+// could save some code for things like polling methods. But IMHO it would
+// provide polymorphism at the wrong level. Instead, the best place for
+// polymorphism is at the level of `Futures`.
+#[repr(C)]
+struct AioCb {
+ aiocb: LibcAiocb,
/// Could this `AioCb` potentially have any in-kernel state?
+ // It would be really nice to perform the in-progress check entirely at
+ // compile time. But I can't figure out how, because:
+ // * Future::poll takes a `Pin<&mut self>` rather than `self`, and
+ // * Rust's lack of an equivalent of C++'s Guaranteed Copy Elision means
+ // that there's no way to write an AioCb constructor that neither boxes
+ // the object itself, nor moves it during return.
in_progress: bool,
- _buffer: std::marker::PhantomData<&'a [u8]>,
- _pin: std::marker::PhantomPinned
}
-impl<'a> AioCb<'a> {
- /// Returns the underlying file descriptor associated with the `AioCb`
- pub fn fd(&self) -> RawFd {
- self.aiocb.0.aio_fildes
- }
+impl AioCb {
+ pin_utils::unsafe_unpinned!(aiocb: LibcAiocb);
- /// Constructs a new `AioCb` with no associated buffer.
- ///
- /// The resulting `AioCb` structure is suitable for use with `AioCb::fsync`.
- ///
- /// # Parameters
- ///
- /// * `fd`: File descriptor. Required for all aio functions.
- /// * `prio`: If POSIX Prioritized IO is supported, then the
- /// operation will be prioritized at the process's
- /// priority level minus `prio`.
- /// * `sigev_notify`: Determines how you will be notified of event
- /// completion.
- ///
- /// # Examples
- ///
- /// Create an `AioCb` from a raw file descriptor and use it for an
- /// [`fsync`](#method.fsync) operation.
- ///
- /// ```
- /// # use nix::errno::Errno;
- /// # use nix::Error;
- /// # use nix::sys::aio::*;
- /// # use nix::sys::signal::SigevNotify::SigevNone;
- /// # use std::{thread, time};
- /// # use std::os::unix::io::AsRawFd;
- /// # use tempfile::tempfile;
- /// let f = tempfile().unwrap();
- /// let mut aiocb = AioCb::from_fd( f.as_raw_fd(), 0, SigevNone);
- /// aiocb.fsync(AioFsyncMode::O_SYNC).expect("aio_fsync failed early");
- /// while (aiocb.error() == Err(Errno::EINPROGRESS)) {
- /// thread::sleep(time::Duration::from_millis(10));
- /// }
- /// aiocb.aio_return().expect("aio_fsync failed late");
- /// ```
- pub fn from_fd(fd: RawFd, prio: libc::c_int,
- sigev_notify: SigevNotify) -> Pin<Box<AioCb<'a>>> {
- let mut a = AioCb::common_init(fd, prio, sigev_notify);
- a.0.aio_offset = 0;
- a.0.aio_nbytes = 0;
- a.0.aio_buf = null_mut();
-
- Box::pin(AioCb {
- aiocb: a,
- mutable: false,
- in_progress: false,
- _buffer: PhantomData,
- _pin: std::marker::PhantomPinned
- })
+ fn aio_return(mut self: Pin<&mut Self>) -> Result<usize> {
+ self.in_progress = false;
+ unsafe {
+ let p: *mut libc::aiocb = &mut self.aiocb.0;
+ Errno::result(libc::aio_return(p))
+ }
+ .map(|r| r as usize)
}
- // Private helper
- #[cfg(not(any(target_os = "ios", target_os = "macos")))]
- fn from_mut_slice_unpinned(fd: RawFd, offs: off_t, buf: &'a mut [u8],
- prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> AioCb<'a>
- {
- let mut a = AioCb::common_init(fd, prio, sigev_notify);
- a.0.aio_offset = offs;
- a.0.aio_nbytes = buf.len() as size_t;
- a.0.aio_buf = buf.as_ptr() as *mut c_void;
- a.0.aio_lio_opcode = opcode as libc::c_int;
+ fn cancel(mut self: Pin<&mut Self>) -> Result<AioCancelStat> {
+ let r = unsafe {
+ libc::aio_cancel(self.aiocb.0.aio_fildes, &mut self.aiocb.0)
+ };
+ match r {
+ libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled),
+ libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled),
+ libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone),
+ -1 => Err(Errno::last()),
+ _ => panic!("unknown aio_cancel return value"),
+ }
+ }
+ fn common_init(fd: RawFd, prio: i32, sigev_notify: SigevNotify) -> Self {
+ // Use mem::zeroed instead of explicitly zeroing each field, because the
+ // number and name of reserved fields is OS-dependent. On some OSes,
+ // some reserved fields are used the kernel for state, and must be
+ // explicitly zeroed when allocated.
+ let mut a = unsafe { mem::zeroed::<libc::aiocb>() };
+ a.aio_fildes = fd;
+ a.aio_reqprio = prio;
+ a.aio_sigevent = SigEvent::new(sigev_notify).sigevent();
AioCb {
- aiocb: a,
- mutable: true,
+ aiocb: LibcAiocb(a),
in_progress: false,
- _buffer: PhantomData,
- _pin: std::marker::PhantomPinned
}
}
- /// Constructs a new `AioCb` from a mutable slice.
- ///
- /// The resulting `AioCb` will be suitable for both read and write
- /// operations, but only if the borrow checker can guarantee that the slice
- /// will outlive the `AioCb`. That will usually be the case if the `AioCb`
- /// is stack-allocated.
- ///
- /// # Parameters
- ///
- /// * `fd`: File descriptor. Required for all aio functions.
- /// * `offs`: File offset
- /// * `buf`: A memory buffer
- /// * `prio`: If POSIX Prioritized IO is supported, then the
- /// operation will be prioritized at the process's
- /// priority level minus `prio`
- /// * `sigev_notify`: Determines how you will be notified of event
- /// completion.
- /// * `opcode`: This field is only used for `lio_listio`. It
- /// determines which operation to use for this individual
- /// aiocb
- ///
- /// # Examples
- ///
- /// Create an `AioCb` from a mutable slice and read into it.
- ///
- /// ```
- /// # use nix::errno::Errno;
- /// # use nix::Error;
- /// # use nix::sys::aio::*;
- /// # use nix::sys::signal::SigevNotify;
- /// # use std::{thread, time};
- /// # use std::io::Write;
- /// # use std::os::unix::io::AsRawFd;
- /// # use tempfile::tempfile;
- /// const INITIAL: &[u8] = b"abcdef123456";
- /// const LEN: usize = 4;
- /// let mut rbuf = vec![0; LEN];
- /// let mut f = tempfile().unwrap();
- /// f.write_all(INITIAL).unwrap();
- /// {
- /// let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(),
- /// 2, //offset
- /// &mut rbuf,
- /// 0, //priority
- /// SigevNotify::SigevNone,
- /// LioOpcode::LIO_NOP);
- /// aiocb.read().unwrap();
- /// while (aiocb.error() == Err(Errno::EINPROGRESS)) {
- /// thread::sleep(time::Duration::from_millis(10));
- /// }
- /// assert_eq!(aiocb.aio_return().unwrap() as usize, LEN);
- /// }
- /// assert_eq!(rbuf, b"cdef");
- /// ```
- pub fn from_mut_slice(fd: RawFd, offs: off_t, buf: &'a mut [u8],
- prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> Pin<Box<AioCb<'a>>> {
- let mut a = AioCb::common_init(fd, prio, sigev_notify);
- a.0.aio_offset = offs;
- a.0.aio_nbytes = buf.len() as size_t;
- a.0.aio_buf = buf.as_ptr() as *mut c_void;
- a.0.aio_lio_opcode = opcode as libc::c_int;
-
- Box::pin(AioCb {
- aiocb: a,
- mutable: true,
- in_progress: false,
- _buffer: PhantomData,
- _pin: std::marker::PhantomPinned
- })
+ fn error(self: Pin<&mut Self>) -> Result<()> {
+ let r = unsafe { libc::aio_error(&self.aiocb().0) };
+ match r {
+ 0 => Ok(()),
+ num if num > 0 => Err(Errno::from_i32(num)),
+ -1 => Err(Errno::last()),
+ num => panic!("unknown aio_error return value {:?}", num),
+ }
}
- /// Constructs a new `AioCb` from a mutable raw pointer
- ///
- /// Unlike `from_mut_slice`, this method returns a structure suitable for
- /// placement on the heap. It may be used for both reads and writes. Due
- /// to its unsafety, this method is not recommended. It is most useful when
- /// heap allocation is required.
- ///
- /// # Parameters
- ///
- /// * `fd`: File descriptor. Required for all aio functions.
- /// * `offs`: File offset
- /// * `buf`: Pointer to the memory buffer
- /// * `len`: Length of the buffer pointed to by `buf`
- /// * `prio`: If POSIX Prioritized IO is supported, then the
- /// operation will be prioritized at the process's
- /// priority level minus `prio`
- /// * `sigev_notify`: Determines how you will be notified of event
- /// completion.
- /// * `opcode`: This field is only used for `lio_listio`. It
- /// determines which operation to use for this individual
- /// aiocb
- ///
- /// # Safety
- ///
- /// The caller must ensure that the storage pointed to by `buf` outlives the
- /// `AioCb`. The lifetime checker can't help here.
- pub unsafe fn from_mut_ptr(fd: RawFd, offs: off_t,
- buf: *mut c_void, len: usize,
- prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> Pin<Box<AioCb<'a>>> {
- let mut a = AioCb::common_init(fd, prio, sigev_notify);
- a.0.aio_offset = offs;
- a.0.aio_nbytes = len;
- a.0.aio_buf = buf;
- a.0.aio_lio_opcode = opcode as libc::c_int;
-
- Box::pin(AioCb {
- aiocb: a,
- mutable: true,
- in_progress: false,
- _buffer: PhantomData,
- _pin: std::marker::PhantomPinned,
- })
+ fn in_progress(&self) -> bool {
+ self.in_progress
}
- /// Constructs a new `AioCb` from a raw pointer.
- ///
- /// Unlike `from_slice`, this method returns a structure suitable for
- /// placement on the heap. Due to its unsafety, this method is not
- /// recommended. It is most useful when heap allocation is required.
- ///
- /// # Parameters
- ///
- /// * `fd`: File descriptor. Required for all aio functions.
- /// * `offs`: File offset
- /// * `buf`: Pointer to the memory buffer
- /// * `len`: Length of the buffer pointed to by `buf`
- /// * `prio`: If POSIX Prioritized IO is supported, then the
- /// operation will be prioritized at the process's
- /// priority level minus `prio`
- /// * `sigev_notify`: Determines how you will be notified of event
- /// completion.
- /// * `opcode`: This field is only used for `lio_listio`. It
- /// determines which operation to use for this individual
- /// aiocb
- ///
- /// # Safety
- ///
- /// The caller must ensure that the storage pointed to by `buf` outlives the
- /// `AioCb`. The lifetime checker can't help here.
- pub unsafe fn from_ptr(fd: RawFd, offs: off_t,
- buf: *const c_void, len: usize,
- prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> Pin<Box<AioCb<'a>>> {
- let mut a = AioCb::common_init(fd, prio, sigev_notify);
- a.0.aio_offset = offs;
- a.0.aio_nbytes = len;
- // casting a const ptr to a mutable ptr here is ok, because we set the
- // AioCb's mutable field to false
- a.0.aio_buf = buf as *mut c_void;
- a.0.aio_lio_opcode = opcode as libc::c_int;
-
- Box::pin(AioCb {
- aiocb: a,
- mutable: false,
- in_progress: false,
- _buffer: PhantomData,
- _pin: std::marker::PhantomPinned
- })
+ fn set_in_progress(mut self: Pin<&mut Self>) {
+ self.as_mut().in_progress = true;
}
- // Private helper
- fn from_slice_unpinned(fd: RawFd, offs: off_t, buf: &'a [u8],
- prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> AioCb
- {
- let mut a = AioCb::common_init(fd, prio, sigev_notify);
- a.0.aio_offset = offs;
- a.0.aio_nbytes = buf.len() as size_t;
- // casting an immutable buffer to a mutable pointer looks unsafe,
- // but technically its only unsafe to dereference it, not to create
- // it.
- a.0.aio_buf = buf.as_ptr() as *mut c_void;
- assert!(opcode != LioOpcode::LIO_READ, "Can't read into an immutable buffer");
- a.0.aio_lio_opcode = opcode as libc::c_int;
-
- AioCb {
- aiocb: a,
- mutable: false,
- in_progress: false,
- _buffer: PhantomData,
- _pin: std::marker::PhantomPinned
- }
+ /// Update the notification settings for an existing AIO operation that has
+ /// not yet been submitted.
+ // Takes a normal reference rather than a pinned one because this method is
+ // normally called before the object needs to be pinned, that is, before
+ // it's been submitted to the kernel.
+ fn set_sigev_notify(&mut self, sigev_notify: SigevNotify) {
+ assert!(
+ !self.in_progress,
+ "Can't change notification settings for an in-progress operation"
+ );
+ self.aiocb.0.aio_sigevent = SigEvent::new(sigev_notify).sigevent();
}
+}
- /// Like [`AioCb::from_mut_slice`], but works on constant slices rather than
- /// mutable slices.
- ///
- /// An `AioCb` created this way cannot be used with `read`, and its
- /// `LioOpcode` cannot be set to `LIO_READ`. This method is useful when
- /// writing a const buffer with `AioCb::write`, since `from_mut_slice` can't
- /// work with const buffers.
- ///
- /// # Examples
- ///
- /// Construct an `AioCb` from a slice and use it for writing.
- ///
- /// ```
- /// # use nix::errno::Errno;
- /// # use nix::Error;
- /// # use nix::sys::aio::*;
- /// # use nix::sys::signal::SigevNotify;
- /// # use std::{thread, time};
- /// # use std::os::unix::io::AsRawFd;
- /// # use tempfile::tempfile;
- /// const WBUF: &[u8] = b"abcdef123456";
- /// let mut f = tempfile().unwrap();
- /// let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
- /// 2, //offset
- /// WBUF,
- /// 0, //priority
- /// SigevNotify::SigevNone,
- /// LioOpcode::LIO_NOP);
- /// aiocb.write().unwrap();
- /// while (aiocb.error() == Err(Errno::EINPROGRESS)) {
- /// thread::sleep(time::Duration::from_millis(10));
- /// }
- /// assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len());
- /// ```
- // Note: another solution to the problem of writing const buffers would be
- // to genericize AioCb for both &mut [u8] and &[u8] buffers. AioCb::read
- // could take the former and AioCb::write could take the latter. However,
- // then lio_listio wouldn't work, because that function needs a slice of
- // AioCb, and they must all be of the same type.
- pub fn from_slice(fd: RawFd, offs: off_t, buf: &'a [u8],
- prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> Pin<Box<AioCb>>
- {
- Box::pin(AioCb::from_slice_unpinned(fd, offs, buf, prio, sigev_notify,
- opcode))
+impl Debug for AioCb {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("AioCb")
+ .field("aiocb", &self.aiocb.0)
+ .field("in_progress", &self.in_progress)
+ .finish()
}
+}
- fn common_init(fd: RawFd, prio: libc::c_int,
- sigev_notify: SigevNotify) -> LibcAiocb {
- // Use mem::zeroed instead of explicitly zeroing each field, because the
- // number and name of reserved fields is OS-dependent. On some OSes,
- // some reserved fields are used the kernel for state, and must be
- // explicitly zeroed when allocated.
- let mut a = unsafe { mem::zeroed::<libc::aiocb>()};
- a.aio_fildes = fd;
- a.aio_reqprio = prio;
- a.aio_sigevent = SigEvent::new(sigev_notify).sigevent();
- LibcAiocb(a)
+impl Drop for AioCb {
+ /// If the `AioCb` has no remaining state in the kernel, just drop it.
+ /// Otherwise, dropping constitutes a resource leak, which is an error
+ fn drop(&mut self) {
+ assert!(
+ thread::panicking() || !self.in_progress,
+ "Dropped an in-progress AioCb"
+ );
}
+}
- /// Update the notification settings for an existing `aiocb`
- pub fn set_sigev_notify(self: &mut Pin<Box<Self>>,
- sigev_notify: SigevNotify)
- {
- // Safe because we don't move any of the data
- let selfp = unsafe {
- self.as_mut().get_unchecked_mut()
- };
- selfp.aiocb.0.aio_sigevent = SigEvent::new(sigev_notify).sigevent();
- }
+/// Methods common to all AIO operations
+pub trait Aio {
+ /// The return type of [`Aio::aio_return`].
+ type Output;
+
+ /// Retrieve return status of an asynchronous operation.
+ ///
+ /// Should only be called once for each operation, after [`Aio::error`]
+ /// indicates that it has completed. The result is the same as for the
+ /// synchronous `read(2)`, `write(2)`, of `fsync(2)` functions.
+ ///
+ /// # References
+ ///
+ /// [aio_return](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_return.html)
+ fn aio_return(self: Pin<&mut Self>) -> Result<Self::Output>;
/// Cancels an outstanding AIO request.
///
@@ -477,51 +251,26 @@ impl<'a> AioCb<'a> {
/// # use tempfile::tempfile;
/// let wbuf = b"CDEF";
/// let mut f = tempfile().unwrap();
- /// let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
+ /// let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(),
/// 2, //offset
/// &wbuf[..],
/// 0, //priority
- /// SigevNotify::SigevNone,
- /// LioOpcode::LIO_NOP);
- /// aiocb.write().unwrap();
- /// let cs = aiocb.cancel().unwrap();
+ /// SigevNotify::SigevNone));
+ /// aiocb.as_mut().submit().unwrap();
+ /// let cs = aiocb.as_mut().cancel().unwrap();
/// if cs == AioCancelStat::AioNotCanceled {
- /// while (aiocb.error() == Err(Errno::EINPROGRESS)) {
+ /// while (aiocb.as_mut().error() == Err(Errno::EINPROGRESS)) {
/// thread::sleep(time::Duration::from_millis(10));
/// }
/// }
/// // Must call `aio_return`, but ignore the result
- /// let _ = aiocb.aio_return();
+ /// let _ = aiocb.as_mut().aio_return();
/// ```
///
/// # References
///
/// [aio_cancel](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_cancel.html)
- pub fn cancel(self: &mut Pin<Box<Self>>) -> Result<AioCancelStat> {
- let r = unsafe {
- let selfp = self.as_mut().get_unchecked_mut();
- libc::aio_cancel(selfp.aiocb.0.aio_fildes, &mut selfp.aiocb.0)
- };
- match r {
- libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled),
- libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled),
- libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone),
- -1 => Err(Errno::last()),
- _ => panic!("unknown aio_cancel return value")
- }
- }
-
- fn error_unpinned(&mut self) -> Result<()> {
- let r = unsafe {
- libc::aio_error(&mut self.aiocb.0 as *mut libc::aiocb)
- };
- match r {
- 0 => Ok(()),
- num if num > 0 => Err(Errno::from_i32(num)),
- -1 => Err(Errno::last()),
- num => panic!("unknown aio_error return value {:?}", num)
- }
- }
+ fn cancel(self: Pin<&mut Self>) -> Result<AioCancelStat>;
/// Retrieve error status of an asynchronous operation.
///
@@ -543,60 +292,265 @@ impl<'a> AioCb<'a> {
/// # use tempfile::tempfile;
/// const WBUF: &[u8] = b"abcdef123456";
/// let mut f = tempfile().unwrap();
- /// let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
+ /// let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(),
/// 2, //offset
/// WBUF,
/// 0, //priority
- /// SigevNotify::SigevNone,
- /// LioOpcode::LIO_NOP);
- /// aiocb.write().unwrap();
- /// while (aiocb.error() == Err(Errno::EINPROGRESS)) {
+ /// SigevNotify::SigevNone));
+ /// aiocb.as_mut().submit().unwrap();
+ /// while (aiocb.as_mut().error() == Err(Errno::EINPROGRESS)) {
/// thread::sleep(time::Duration::from_millis(10));
/// }
- /// assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len());
+ /// assert_eq!(aiocb.as_mut().aio_return().unwrap(), WBUF.len());
/// ```
///
/// # References
///
/// [aio_error](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_error.html)
- pub fn error(self: &mut Pin<Box<Self>>) -> Result<()> {
- // Safe because error_unpinned doesn't move the data
- let selfp = unsafe {
- self.as_mut().get_unchecked_mut()
- };
- selfp.error_unpinned()
- }
+ fn error(self: Pin<&mut Self>) -> Result<()>;
+
+ /// Returns the underlying file descriptor associated with the operation.
+ fn fd(&self) -> RawFd;
- /// An asynchronous version of `fsync(2)`.
+ /// Does this operation currently have any in-kernel state?
///
- /// # References
+ /// Dropping an operation that does have in-kernel state constitutes a
+ /// resource leak.
///
- /// [aio_fsync](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_fsync.html)
- pub fn fsync(self: &mut Pin<Box<Self>>, mode: AioFsyncMode) -> Result<()> {
- // Safe because we don't move the libc::aiocb
- unsafe {
- let selfp = self.as_mut().get_unchecked_mut();
- Errno::result({
- let p: *mut libc::aiocb = &mut selfp.aiocb.0;
- libc::aio_fsync(mode as libc::c_int, p)
- }).map(|_| {
- selfp.in_progress = true;
+ /// # Examples
+ ///
+ /// ```
+ /// # use nix::errno::Errno;
+ /// # use nix::Error;
+ /// # use nix::sys::aio::*;
+ /// # use nix::sys::signal::SigevNotify::SigevNone;
+ /// # use std::{thread, time};
+ /// # use std::os::unix::io::AsRawFd;
+ /// # use tempfile::tempfile;
+ /// let f = tempfile().unwrap();
+ /// let mut aiof = Box::pin(AioFsync::new(f.as_raw_fd(), AioFsyncMode::O_SYNC,
+ /// 0, SigevNone));
+ /// assert!(!aiof.as_mut().in_progress());
+ /// aiof.as_mut().submit().expect("aio_fsync failed early");
+ /// assert!(aiof.as_mut().in_progress());
+ /// while (aiof.as_mut().error() == Err(Errno::EINPROGRESS)) {
+ /// thread::sleep(time::Duration::from_millis(10));
+ /// }
+ /// aiof.as_mut().aio_return().expect("aio_fsync failed late");
+ /// assert!(!aiof.as_mut().in_progress());
+ /// ```
+ fn in_progress(&self) -> bool;
+
+ /// Returns the priority of the `AioCb`
+ fn priority(&self) -> i32;
+
+ /// Update the notification settings for an existing AIO operation that has
+ /// not yet been submitted.
+ fn set_sigev_notify(&mut self, sev: SigevNotify);
+
+ /// Returns the `SigEvent` that will be used for notification.
+ fn sigevent(&self) -> SigEvent;
+
+ /// Actually start the I/O operation.
+ ///
+ /// After calling this method and until [`Aio::aio_return`] returns `Ok`,
+ /// the structure may not be moved in memory.
+ fn submit(self: Pin<&mut Self>) -> Result<()>;
+}
+
+macro_rules! aio_methods {
+ () => {
+ fn cancel(self: Pin<&mut Self>) -> Result<AioCancelStat> {
+ self.aiocb().cancel()
+ }
+
+ fn error(self: Pin<&mut Self>) -> Result<()> {
+ self.aiocb().error()
+ }
+
+ fn fd(&self) -> RawFd {
+ self.aiocb.aiocb.0.aio_fildes
+ }
+
+ fn in_progress(&self) -> bool {
+ self.aiocb.in_progress()
+ }
+
+ fn priority(&self) -> i32 {
+ self.aiocb.aiocb.0.aio_reqprio
+ }
+
+ fn set_sigev_notify(&mut self, sev: SigevNotify) {
+ self.aiocb.set_sigev_notify(sev)
+ }
+
+ fn sigevent(&self) -> SigEvent {
+ SigEvent::from(&self.aiocb.aiocb.0.aio_sigevent)
+ }
+ };
+ ($func:ident) => {
+ aio_methods!();
+
+ fn aio_return(self: Pin<&mut Self>) -> Result<<Self as Aio>::Output> {
+ self.aiocb().aio_return()
+ }
+
+ fn submit(mut self: Pin<&mut Self>) -> Result<()> {
+ let p: *mut libc::aiocb = &mut self.as_mut().aiocb().aiocb.0;
+ Errno::result({ unsafe { libc::$func(p) } }).map(|_| {
+ self.aiocb().set_in_progress();
})
}
+ };
+}
+
+/// An asynchronous version of `fsync(2)`.
+///
+/// # References
+///
+/// [aio_fsync](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_fsync.html)
+/// # Examples
+///
+/// ```
+/// # use nix::errno::Errno;
+/// # use nix::Error;
+/// # use nix::sys::aio::*;
+/// # use nix::sys::signal::SigevNotify::SigevNone;
+/// # use std::{thread, time};
+/// # use std::os::unix::io::AsRawFd;
+/// # use tempfile::tempfile;
+/// let f = tempfile().unwrap();
+/// let mut aiof = Box::pin(AioFsync::new(f.as_raw_fd(), AioFsyncMode::O_SYNC,
+/// 0, SigevNone));
+/// aiof.as_mut().submit().expect("aio_fsync failed early");
+/// while (aiof.as_mut().error() == Err(Errno::EINPROGRESS)) {
+/// thread::sleep(time::Duration::from_millis(10));
+/// }
+/// aiof.as_mut().aio_return().expect("aio_fsync failed late");
+/// ```
+#[derive(Debug)]
+#[repr(transparent)]
+pub struct AioFsync {
+ aiocb: AioCb,
+ _pin: PhantomPinned,
+}
+
+impl AioFsync {
+ unsafe_pinned!(aiocb: AioCb);
+
+ /// Returns the operation's fsync mode: data and metadata or data only?
+ pub fn mode(&self) -> AioFsyncMode {
+ AioFsyncMode::try_from(self.aiocb.aiocb.0.aio_lio_opcode).unwrap()
}
- /// Returns the `aiocb`'s `LioOpcode` field
+ /// Create a new `AioFsync`.
///
- /// If the value cannot be represented as an `LioOpcode`, returns `None`
- /// instead.
- pub fn lio_opcode(&self) -> Option<LioOpcode> {
- match self.aiocb.0.aio_lio_opcode {
- libc::LIO_READ => Some(LioOpcode::LIO_READ),
- libc::LIO_WRITE => Some(LioOpcode::LIO_WRITE),
- libc::LIO_NOP => Some(LioOpcode::LIO_NOP),
- _ => None
+ /// # Arguments
+ ///
+ /// * `fd`: File descriptor to sync.
+ /// * `mode`: Whether to sync file metadata too, or just data.
+ /// * `prio`: If POSIX Prioritized IO is supported, then the
+ /// operation will be prioritized at the process's
+ /// priority level minus `prio`.
+ /// * `sigev_notify`: Determines how you will be notified of event
+ /// completion.
+ pub fn new(
+ fd: RawFd,
+ mode: AioFsyncMode,
+ prio: i32,
+ sigev_notify: SigevNotify,
+ ) -> Self {
+ let mut aiocb = AioCb::common_init(fd, prio, sigev_notify);
+ // To save some memory, store mode in an unused field of the AioCb.
+ // True it isn't very much memory, but downstream creates will likely
+ // create an enum containing this and other AioCb variants and pack
+ // those enums into data structures like Vec, so it adds up.
+ aiocb.aiocb.0.aio_lio_opcode = mode as libc::c_int;
+ AioFsync {
+ aiocb,
+ _pin: PhantomPinned,
}
}
+}
+
+impl Aio for AioFsync {
+ type Output = ();
+
+ aio_methods!();
+
+ fn aio_return(self: Pin<&mut Self>) -> Result<()> {
+ self.aiocb().aio_return().map(drop)
+ }
+
+ fn submit(mut self: Pin<&mut Self>) -> Result<()> {
+ let aiocb = &mut self.as_mut().aiocb().aiocb.0;
+ let mode = mem::replace(&mut aiocb.aio_lio_opcode, 0);
+ let p: *mut libc::aiocb = aiocb;
+ Errno::result(unsafe { libc::aio_fsync(mode, p) }).map(|_| {
+ self.aiocb().set_in_progress();
+ })
+ }
+}
+
+// AioFsync does not need AsMut, since it can't be used with lio_listio
+
+impl AsRef<libc::aiocb> for AioFsync {
+ fn as_ref(&self) -> &libc::aiocb {
+ &self.aiocb.aiocb.0
+ }
+}
+
+/// Asynchronously reads from a file descriptor into a buffer
+///
+/// # References
+///
+/// [aio_read](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_read.html)
+///
+/// # Examples
+///
+///
+/// ```
+/// # use nix::errno::Errno;
+/// # use nix::Error;
+/// # use nix::sys::aio::*;
+/// # use nix::sys::signal::SigevNotify;
+/// # use std::{thread, time};
+/// # use std::io::Write;
+/// # use std::os::unix::io::AsRawFd;
+/// # use tempfile::tempfile;
+/// const INITIAL: &[u8] = b"abcdef123456";
+/// const LEN: usize = 4;
+/// let mut rbuf = vec![0; LEN];
+/// let mut f = tempfile().unwrap();
+/// f.write_all(INITIAL).unwrap();
+/// {
+/// let mut aior = Box::pin(
+/// AioRead::new(
+/// f.as_raw_fd(),
+/// 2, //offset
+/// &mut rbuf,
+/// 0, //priority
+/// SigevNotify::SigevNone
+/// )
+/// );
+/// aior.as_mut().submit().unwrap();
+/// while (aior.as_mut().error() == Err(Errno::EINPROGRESS)) {
+/// thread::sleep(time::Duration::from_millis(10));
+/// }
+/// assert_eq!(aior.as_mut().aio_return().unwrap(), LEN);
+/// }
+/// assert_eq!(rbuf, b"cdef");
+/// ```
+#[derive(Debug)]
+#[repr(transparent)]
+pub struct AioRead<'a> {
+ aiocb: AioCb,
+ _data: PhantomData<&'a [u8]>,
+ _pin: PhantomPinned,
+}
+
+impl<'a> AioRead<'a> {
+ unsafe_pinned!(aiocb: AioCb);
/// Returns the requested length of the aio operation in bytes
///
@@ -604,85 +558,418 @@ impl<'a> AioCb<'a> {
/// number of bytes actually read or written by a completed operation, use
/// `aio_return` instead.
pub fn nbytes(&self) -> usize {
- self.aiocb.0.aio_nbytes
+ self.aiocb.aiocb.0.aio_nbytes
}
- /// Returns the file offset stored in the `AioCb`
+ /// Create a new `AioRead`, placing the data in a mutable slice.
+ ///
+ /// # Arguments
+ ///
+ /// * `fd`: File descriptor to read from
+ /// * `offs`: File offset
+ /// * `buf`: A memory buffer. It must outlive the `AioRead`.
+ /// * `prio`: If POSIX Prioritized IO is supported, then the
+ /// operation will be prioritized at the process's
+ /// priority level minus `prio`
+ /// * `sigev_notify`: Determines how you will be notified of event
+ /// completion.
+ pub fn new(
+ fd: RawFd,
+ offs: off_t,
+ buf: &'a mut [u8],
+ prio: i32,
+ sigev_notify: SigevNotify,
+ ) -> Self {
+ let mut aiocb = AioCb::common_init(fd, prio, sigev_notify);
+ aiocb.aiocb.0.aio_nbytes = buf.len();
+ aiocb.aiocb.0.aio_buf = buf.as_mut_ptr() as *mut c_void;
+ aiocb.aiocb.0.aio_lio_opcode = libc::LIO_READ;
+ aiocb.aiocb.0.aio_offset = offs;
+ AioRead {
+ aiocb,
+ _data: PhantomData,
+ _pin: PhantomPinned,
+ }
+ }
+
+ /// Returns the file offset of the operation.
pub fn offset(&self) -> off_t {
- self.aiocb.0.aio_offset
+ self.aiocb.aiocb.0.aio_offset
+ }
+}
+
+impl<'a> Aio for AioRead<'a> {
+ type Output = usize;
+
+ aio_methods!(aio_read);
+}
+
+impl<'a> AsMut<libc::aiocb> for AioRead<'a> {
+ fn as_mut(&mut self) -> &mut libc::aiocb {
+ &mut self.aiocb.aiocb.0
}
+}
- /// Returns the priority of the `AioCb`
- pub fn priority(&self) -> libc::c_int {
- self.aiocb.0.aio_reqprio
+impl<'a> AsRef<libc::aiocb> for AioRead<'a> {
+ fn as_ref(&self) -> &libc::aiocb {
+ &self.aiocb.aiocb.0
}
+}
- /// Asynchronously reads from a file descriptor into a buffer
+/// Asynchronously reads from a file descriptor into a scatter/gather list of buffers.
+///
+/// # References
+///
+/// [aio_readv](https://www.freebsd.org/cgi/man.cgi?query=aio_readv)
+///
+/// # Examples
+///
+///
+#[cfg_attr(fbsd14, doc = " ```")]
+#[cfg_attr(not(fbsd14), doc = " ```no_run")]
+/// # use nix::errno::Errno;
+/// # use nix::Error;
+/// # use nix::sys::aio::*;
+/// # use nix::sys::signal::SigevNotify;
+/// # use std::{thread, time};
+/// # use std::io::{IoSliceMut, Write};
+/// # use std::os::unix::io::AsRawFd;
+/// # use tempfile::tempfile;
+/// const INITIAL: &[u8] = b"abcdef123456";
+/// let mut rbuf0 = vec![0; 4];
+/// let mut rbuf1 = vec![0; 2];
+/// let expected_len = rbuf0.len() + rbuf1.len();
+/// let mut rbufs = [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)];
+/// let mut f = tempfile().unwrap();
+/// f.write_all(INITIAL).unwrap();
+/// {
+/// let mut aior = Box::pin(
+/// AioReadv::new(
+/// f.as_raw_fd(),
+/// 2, //offset
+/// &mut rbufs,
+/// 0, //priority
+/// SigevNotify::SigevNone
+/// )
+/// );
+/// aior.as_mut().submit().unwrap();
+/// while (aior.as_mut().error() == Err(Errno::EINPROGRESS)) {
+/// thread::sleep(time::Duration::from_millis(10));
+/// }
+/// assert_eq!(aior.as_mut().aio_return().unwrap(), expected_len);
+/// }
+/// assert_eq!(rbuf0, b"cdef");
+/// assert_eq!(rbuf1, b"12");
+/// ```
+#[cfg(target_os = "freebsd")]
+#[derive(Debug)]
+#[repr(transparent)]
+pub struct AioReadv<'a> {
+ aiocb: AioCb,
+ _data: PhantomData<&'a [&'a [u8]]>,
+ _pin: PhantomPinned,
+}
+
+#[cfg(target_os = "freebsd")]
+impl<'a> AioReadv<'a> {
+ unsafe_pinned!(aiocb: AioCb);
+
+ /// Returns the number of buffers the operation will read into.
+ pub fn iovlen(&self) -> usize {
+ self.aiocb.aiocb.0.aio_nbytes
+ }
+
+ /// Create a new `AioReadv`, placing the data in a list of mutable slices.
///
- /// # References
+ /// # Arguments
///
- /// [aio_read](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_read.html)
- pub fn read(self: &mut Pin<Box<Self>>) -> Result<()> {
- assert!(self.mutable, "Can't read into an immutable buffer");
- // Safe because we don't move anything
- let selfp = unsafe {
- self.as_mut().get_unchecked_mut()
- };
- Errno::result({
- let p: *mut libc::aiocb = &mut selfp.aiocb.0;
- unsafe { libc::aio_read(p) }
- }).map(|_| {
- selfp.in_progress = true;
- })
+ /// * `fd`: File descriptor to read from
+ /// * `offs`: File offset
+ /// * `bufs`: A scatter/gather list of memory buffers. They must
+ /// outlive the `AioReadv`.
+ /// * `prio`: If POSIX Prioritized IO is supported, then the
+ /// operation will be prioritized at the process's
+ /// priority level minus `prio`
+ /// * `sigev_notify`: Determines how you will be notified of event
+ /// completion.
+ pub fn new(
+ fd: RawFd,
+ offs: off_t,
+ bufs: &mut [IoSliceMut<'a>],
+ prio: i32,
+ sigev_notify: SigevNotify,
+ ) -> Self {
+ let mut aiocb = AioCb::common_init(fd, prio, sigev_notify);
+ // In vectored mode, aio_nbytes stores the length of the iovec array,
+ // not the byte count.
+ aiocb.aiocb.0.aio_nbytes = bufs.len();
+ aiocb.aiocb.0.aio_buf = bufs.as_mut_ptr() as *mut c_void;
+ aiocb.aiocb.0.aio_lio_opcode = libc::LIO_READV;
+ aiocb.aiocb.0.aio_offset = offs;
+ AioReadv {
+ aiocb,
+ _data: PhantomData,
+ _pin: PhantomPinned,
+ }
}
- /// Returns the `SigEvent` stored in the `AioCb`
- pub fn sigevent(&self) -> SigEvent {
- SigEvent::from(&self.aiocb.0.aio_sigevent)
+ /// Returns the file offset of the operation.
+ pub fn offset(&self) -> off_t {
+ self.aiocb.aiocb.0.aio_offset
}
+}
- fn aio_return_unpinned(&mut self) -> Result<isize> {
- unsafe {
- let p: *mut libc::aiocb = &mut self.aiocb.0;
- self.in_progress = false;
- Errno::result(libc::aio_return(p))
- }
+#[cfg(target_os = "freebsd")]
+impl<'a> Aio for AioReadv<'a> {
+ type Output = usize;
+
+ aio_methods!(aio_readv);
+}
+
+#[cfg(target_os = "freebsd")]
+impl<'a> AsMut<libc::aiocb> for AioReadv<'a> {
+ fn as_mut(&mut self) -> &mut libc::aiocb {
+ &mut self.aiocb.aiocb.0
+ }
+}
+
+#[cfg(target_os = "freebsd")]
+impl<'a> AsRef<libc::aiocb> for AioReadv<'a> {
+ fn as_ref(&self) -> &libc::aiocb {
+ &self.aiocb.aiocb.0
}
+}
- /// Retrieve return status of an asynchronous operation.
+/// Asynchronously writes from a buffer to a file descriptor
+///
+/// # References
+///
+/// [aio_write](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_write.html)
+///
+/// # Examples
+///
+/// ```
+/// # use nix::errno::Errno;
+/// # use nix::Error;
+/// # use nix::sys::aio::*;
+/// # use nix::sys::signal::SigevNotify;
+/// # use std::{thread, time};
+/// # use std::os::unix::io::AsRawFd;
+/// # use tempfile::tempfile;
+/// const WBUF: &[u8] = b"abcdef123456";
+/// let mut f = tempfile().unwrap();
+/// let mut aiow = Box::pin(
+/// AioWrite::new(
+/// f.as_raw_fd(),
+/// 2, //offset
+/// WBUF,
+/// 0, //priority
+/// SigevNotify::SigevNone
+/// )
+/// );
+/// aiow.as_mut().submit().unwrap();
+/// while (aiow.as_mut().error() == Err(Errno::EINPROGRESS)) {
+/// thread::sleep(time::Duration::from_millis(10));
+/// }
+/// assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len());
+/// ```
+#[derive(Debug)]
+#[repr(transparent)]
+pub struct AioWrite<'a> {
+ aiocb: AioCb,
+ _data: PhantomData<&'a [u8]>,
+ _pin: PhantomPinned,
+}
+
+impl<'a> AioWrite<'a> {
+ unsafe_pinned!(aiocb: AioCb);
+
+ /// Returns the requested length of the aio operation in bytes
///
- /// Should only be called once for each `AioCb`, after `AioCb::error`
- /// indicates that it has completed. The result is the same as for the
- /// synchronous `read(2)`, `write(2)`, of `fsync(2)` functions.
+ /// This method returns the *requested* length of the operation. To get the
+ /// number of bytes actually read or written by a completed operation, use
+ /// `aio_return` instead.
+ pub fn nbytes(&self) -> usize {
+ self.aiocb.aiocb.0.aio_nbytes
+ }
+
+ /// Construct a new `AioWrite`.
///
- /// # References
+ /// # Arguments
///
- /// [aio_return](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_return.html)
- // Note: this should be just `return`, but that's a reserved word
- pub fn aio_return(self: &mut Pin<Box<Self>>) -> Result<isize> {
- // Safe because aio_return_unpinned does not move the data
- let selfp = unsafe {
- self.as_mut().get_unchecked_mut()
- };
- selfp.aio_return_unpinned()
+ /// * `fd`: File descriptor to write to
+ /// * `offs`: File offset
+ /// * `buf`: A memory buffer. It must outlive the `AioWrite`.
+ /// * `prio`: If POSIX Prioritized IO is supported, then the
+ /// operation will be prioritized at the process's
+ /// priority level minus `prio`
+ /// * `sigev_notify`: Determines how you will be notified of event
+ /// completion.
+ pub fn new(
+ fd: RawFd,
+ offs: off_t,
+ buf: &'a [u8],
+ prio: i32,
+ sigev_notify: SigevNotify,
+ ) -> Self {
+ let mut aiocb = AioCb::common_init(fd, prio, sigev_notify);
+ aiocb.aiocb.0.aio_nbytes = buf.len();
+ // casting an immutable buffer to a mutable pointer looks unsafe,
+ // but technically its only unsafe to dereference it, not to create
+ // it. Type Safety guarantees that we'll never pass aiocb to
+ // aio_read or aio_readv.
+ aiocb.aiocb.0.aio_buf = buf.as_ptr() as *mut c_void;
+ aiocb.aiocb.0.aio_lio_opcode = libc::LIO_WRITE;
+ aiocb.aiocb.0.aio_offset = offs;
+ AioWrite {
+ aiocb,
+ _data: PhantomData,
+ _pin: PhantomPinned,
+ }
+ }
+
+ /// Returns the file offset of the operation.
+ pub fn offset(&self) -> off_t {
+ self.aiocb.aiocb.0.aio_offset
+ }
+}
+
+impl<'a> Aio for AioWrite<'a> {
+ type Output = usize;
+
+ aio_methods!(aio_write);
+}
+
+impl<'a> AsMut<libc::aiocb> for AioWrite<'a> {
+ fn as_mut(&mut self) -> &mut libc::aiocb {
+ &mut self.aiocb.aiocb.0
+ }
+}
+
+impl<'a> AsRef<libc::aiocb> for AioWrite<'a> {
+ fn as_ref(&self) -> &libc::aiocb {
+ &self.aiocb.aiocb.0
+ }
+}
+
+/// Asynchronously writes from a scatter/gather list of buffers to a file descriptor.
+///
+/// # References
+///
+/// [aio_writev](https://www.freebsd.org/cgi/man.cgi?query=aio_writev)
+///
+/// # Examples
+///
+#[cfg_attr(fbsd14, doc = " ```")]
+#[cfg_attr(not(fbsd14), doc = " ```no_run")]
+/// # use nix::errno::Errno;
+/// # use nix::Error;
+/// # use nix::sys::aio::*;
+/// # use nix::sys::signal::SigevNotify;
+/// # use std::{thread, time};
+/// # use std::io::IoSlice;
+/// # use std::os::unix::io::AsRawFd;
+/// # use tempfile::tempfile;
+/// const wbuf0: &[u8] = b"abcdef";
+/// const wbuf1: &[u8] = b"123456";
+/// let len = wbuf0.len() + wbuf1.len();
+/// let wbufs = [IoSlice::new(wbuf0), IoSlice::new(wbuf1)];
+/// let mut f = tempfile().unwrap();
+/// let mut aiow = Box::pin(
+/// AioWritev::new(
+/// f.as_raw_fd(),
+/// 2, //offset
+/// &wbufs,
+/// 0, //priority
+/// SigevNotify::SigevNone
+/// )
+/// );
+/// aiow.as_mut().submit().unwrap();
+/// while (aiow.as_mut().error() == Err(Errno::EINPROGRESS)) {
+/// thread::sleep(time::Duration::from_millis(10));
+/// }
+/// assert_eq!(aiow.as_mut().aio_return().unwrap(), len);
+/// ```
+#[cfg(target_os = "freebsd")]
+#[derive(Debug)]
+#[repr(transparent)]
+pub struct AioWritev<'a> {
+ aiocb: AioCb,
+ _data: PhantomData<&'a [&'a [u8]]>,
+ _pin: PhantomPinned,
+}
+
+#[cfg(target_os = "freebsd")]
+impl<'a> AioWritev<'a> {
+ unsafe_pinned!(aiocb: AioCb);
+
+ /// Returns the number of buffers the operation will read into.
+ pub fn iovlen(&self) -> usize {
+ self.aiocb.aiocb.0.aio_nbytes
}
- /// Asynchronously writes from a buffer to a file descriptor
+ /// Construct a new `AioWritev`.
///
- /// # References
+ /// # Arguments
///
- /// [aio_write](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_write.html)
- pub fn write(self: &mut Pin<Box<Self>>) -> Result<()> {
- // Safe because we don't move anything
- let selfp = unsafe {
- self.as_mut().get_unchecked_mut()
- };
- Errno::result({
- let p: *mut libc::aiocb = &mut selfp.aiocb.0;
- unsafe{ libc::aio_write(p) }
- }).map(|_| {
- selfp.in_progress = true;
- })
+ /// * `fd`: File descriptor to write to
+ /// * `offs`: File offset
+ /// * `bufs`: A scatter/gather list of memory buffers. They must
+ /// outlive the `AioWritev`.
+ /// * `prio`: If POSIX Prioritized IO is supported, then the
+ /// operation will be prioritized at the process's
+ /// priority level minus `prio`
+ /// * `sigev_notify`: Determines how you will be notified of event
+ /// completion.
+ pub fn new(
+ fd: RawFd,
+ offs: off_t,
+ bufs: &[IoSlice<'a>],
+ prio: i32,
+ sigev_notify: SigevNotify,
+ ) -> Self {
+ let mut aiocb = AioCb::common_init(fd, prio, sigev_notify);
+ // In vectored mode, aio_nbytes stores the length of the iovec array,
+ // not the byte count.
+ aiocb.aiocb.0.aio_nbytes = bufs.len();
+ // casting an immutable buffer to a mutable pointer looks unsafe,
+ // but technically its only unsafe to dereference it, not to create
+ // it. Type Safety guarantees that we'll never pass aiocb to
+ // aio_read or aio_readv.
+ aiocb.aiocb.0.aio_buf = bufs.as_ptr() as *mut c_void;
+ aiocb.aiocb.0.aio_lio_opcode = libc::LIO_WRITEV;
+ aiocb.aiocb.0.aio_offset = offs;
+ AioWritev {
+ aiocb,
+ _data: PhantomData,
+ _pin: PhantomPinned,
+ }
+ }
+
+ /// Returns the file offset of the operation.
+ pub fn offset(&self) -> off_t {
+ self.aiocb.aiocb.0.aio_offset
+ }
+}
+
+#[cfg(target_os = "freebsd")]
+impl<'a> Aio for AioWritev<'a> {
+ type Output = usize;
+
+ aio_methods!(aio_writev);
+}
+
+#[cfg(target_os = "freebsd")]
+impl<'a> AsMut<libc::aiocb> for AioWritev<'a> {
+ fn as_mut(&mut self) -> &mut libc::aiocb {
+ &mut self.aiocb.aiocb.0
+ }
+}
+
+#[cfg(target_os = "freebsd")]
+impl<'a> AsRef<libc::aiocb> for AioWritev<'a> {
+ fn as_ref(&self) -> &libc::aiocb {
+ &self.aiocb.aiocb.0
}
}
@@ -704,38 +991,37 @@ impl<'a> AioCb<'a> {
/// # use tempfile::tempfile;
/// let wbuf = b"CDEF";
/// let mut f = tempfile().unwrap();
-/// let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
+/// let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(),
/// 2, //offset
/// &wbuf[..],
/// 0, //priority
-/// SigevNotify::SigevNone,
-/// LioOpcode::LIO_NOP);
-/// aiocb.write().unwrap();
+/// SigevNotify::SigevNone));
+/// aiocb.as_mut().submit().unwrap();
/// let cs = aio_cancel_all(f.as_raw_fd()).unwrap();
/// if cs == AioCancelStat::AioNotCanceled {
-/// while (aiocb.error() == Err(Errno::EINPROGRESS)) {
+/// while (aiocb.as_mut().error() == Err(Errno::EINPROGRESS)) {
/// thread::sleep(time::Duration::from_millis(10));
/// }
/// }
/// // Must call `aio_return`, but ignore the result
-/// let _ = aiocb.aio_return();
+/// let _ = aiocb.as_mut().aio_return();
/// ```
///
/// # References
///
/// [`aio_cancel`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_cancel.html)
pub fn aio_cancel_all(fd: RawFd) -> Result<AioCancelStat> {
- match unsafe { libc::aio_cancel(fd, null_mut()) } {
+ match unsafe { libc::aio_cancel(fd, ptr::null_mut()) } {
libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled),
libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled),
libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone),
-1 => Err(Errno::last()),
- _ => panic!("unknown aio_cancel return value")
+ _ => panic!("unknown aio_cancel return value"),
}
}
-/// Suspends the calling process until at least one of the specified `AioCb`s
-/// has completed, a signal is delivered, or the timeout has passed.
+/// Suspends the calling process until at least one of the specified operations
+/// have completed, a signal is delivered, or the timeout has passed.
///
/// If `timeout` is `None`, `aio_suspend` will block indefinitely.
///
@@ -750,380 +1036,209 @@ pub fn aio_cancel_all(fd: RawFd) -> Result<AioCancelStat> {
/// # use tempfile::tempfile;
/// const WBUF: &[u8] = b"abcdef123456";
/// let mut f = tempfile().unwrap();
-/// let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
+/// let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(),
/// 2, //offset
/// WBUF,
/// 0, //priority
-/// SigevNotify::SigevNone,
-/// LioOpcode::LIO_NOP);
-/// aiocb.write().unwrap();
-/// aio_suspend(&[aiocb.as_ref()], None).expect("aio_suspend failed");
-/// assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len());
+/// SigevNotify::SigevNone));
+/// aiocb.as_mut().submit().unwrap();
+/// aio_suspend(&[&*aiocb], None).expect("aio_suspend failed");
+/// assert_eq!(aiocb.as_mut().aio_return().unwrap() as usize, WBUF.len());
/// ```
/// # References
///
/// [`aio_suspend`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_suspend.html)
-pub fn aio_suspend(list: &[Pin<&AioCb>], timeout: Option<TimeSpec>) -> Result<()> {
- let plist = list as *const [Pin<&AioCb>] as *const [*const libc::aiocb];
- let p = plist as *const *const libc::aiocb;
+pub fn aio_suspend(
+ list: &[&dyn AsRef<libc::aiocb>],
+ timeout: Option<TimeSpec>,
+) -> Result<()> {
+ let p = list as *const [&dyn AsRef<libc::aiocb>]
+ as *const [*const libc::aiocb]
+ as *const *const libc::aiocb;
let timep = match timeout {
- None => null::<libc::timespec>(),
- Some(x) => x.as_ref() as *const libc::timespec
+ None => ptr::null::<libc::timespec>(),
+ Some(x) => x.as_ref() as *const libc::timespec,
};
- Errno::result(unsafe {
- libc::aio_suspend(p, list.len() as i32, timep)
- }).map(drop)
-}
-
-impl<'a> Debug for AioCb<'a> {
- fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
- fmt.debug_struct("AioCb")
- .field("aiocb", &self.aiocb.0)
- .field("mutable", &self.mutable)
- .field("in_progress", &self.in_progress)
- .finish()
- }
+ Errno::result(unsafe { libc::aio_suspend(p, list.len() as i32, timep) })
+ .map(drop)
}
-impl<'a> Drop for AioCb<'a> {
- /// If the `AioCb` has no remaining state in the kernel, just drop it.
- /// Otherwise, dropping constitutes a resource leak, which is an error
- fn drop(&mut self) {
- assert!(thread::panicking() || !self.in_progress,
- "Dropped an in-progress AioCb");
- }
-}
-
-/// LIO Control Block.
+/// Submits multiple asynchronous I/O requests with a single system call.
///
-/// The basic structure used to issue multiple AIO operations simultaneously.
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-#[cfg_attr(docsrs, doc(cfg(all())))]
-pub struct LioCb<'a> {
- /// A collection of [`AioCb`]s. All of these will be issued simultaneously
- /// by the [`listio`] method.
- ///
- /// [`AioCb`]: struct.AioCb.html
- /// [`listio`]: #method.listio
- // Their locations in memory must be fixed once they are passed to the
- // kernel. So this field must be non-public so the user can't swap.
- aiocbs: Box<[AioCb<'a>]>,
-
- /// The actual list passed to `libc::lio_listio`.
- ///
- /// It must live for as long as any of the operations are still being
- /// processesed, because the aio subsystem uses its address as a unique
- /// identifier.
- list: Vec<*mut libc::aiocb>,
-
- /// A partial set of results. This field will get populated by
- /// `listio_resubmit` when an `LioCb` is resubmitted after an error
- results: Vec<Option<Result<isize>>>
-}
-
-/// LioCb can't automatically impl Send and Sync just because of the raw
-/// pointers in list. But that's stupid. There's no reason that raw pointers
-/// should automatically be non-Send
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-unsafe impl<'a> Send for LioCb<'a> {}
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-unsafe impl<'a> Sync for LioCb<'a> {}
-
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-#[cfg_attr(docsrs, doc(cfg(all())))]
-impl<'a> LioCb<'a> {
- /// Are no [`AioCb`]s contained?
- pub fn is_empty(&self) -> bool {
- self.aiocbs.is_empty()
- }
-
- /// Return the number of individual [`AioCb`]s contained.
- pub fn len(&self) -> usize {
- self.aiocbs.len()
- }
-
- /// Submits multiple asynchronous I/O requests with a single system call.
- ///
- /// They are not guaranteed to complete atomically, and the order in which
- /// the requests are carried out is not specified. Reads, writes, and
- /// fsyncs may be freely mixed.
- ///
- /// This function is useful for reducing the context-switch overhead of
- /// submitting many AIO operations. It can also be used with
- /// `LioMode::LIO_WAIT` to block on the result of several independent
- /// operations. Used that way, it is often useful in programs that
- /// otherwise make little use of AIO.
- ///
- /// # Examples
- ///
- /// Use `listio` to submit an aio operation and wait for its completion. In
- /// this case, there is no need to use [`aio_suspend`] to wait or
- /// [`AioCb::error`] to poll.
- ///
- /// ```
- /// # use nix::sys::aio::*;
- /// # use nix::sys::signal::SigevNotify;
- /// # use std::os::unix::io::AsRawFd;
- /// # use tempfile::tempfile;
- /// const WBUF: &[u8] = b"abcdef123456";
- /// let mut f = tempfile().unwrap();
- /// let mut liocb = LioCbBuilder::with_capacity(1)
- /// .emplace_slice(
- /// f.as_raw_fd(),
- /// 2, //offset
- /// WBUF,
- /// 0, //priority
- /// SigevNotify::SigevNone,
- /// LioOpcode::LIO_WRITE
- /// ).finish();
- /// liocb.listio(LioMode::LIO_WAIT,
- /// SigevNotify::SigevNone).unwrap();
- /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len());
- /// ```
- ///
- /// # References
- ///
- /// [`lio_listio`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html)
- ///
- /// [`aio_suspend`]: fn.aio_suspend.html
- /// [`AioCb::error`]: struct.AioCb.html#method.error
- pub fn listio(&mut self, mode: LioMode,
- sigev_notify: SigevNotify) -> Result<()> {
- let sigev = SigEvent::new(sigev_notify);
- let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
- self.list.clear();
- for a in &mut self.aiocbs.iter_mut() {
- a.in_progress = true;
- self.list.push(a as *mut AioCb<'a>
- as *mut libc::aiocb);
- }
- let p = self.list.as_ptr();
- Errno::result(unsafe {
- libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp)
- }).map(drop)
- }
-
- /// Resubmits any incomplete operations with [`lio_listio`].
- ///
- /// Sometimes, due to system resource limitations, an `lio_listio` call will
- /// return `EIO`, or `EAGAIN`. Or, if a signal is received, it may return
- /// `EINTR`. In any of these cases, only a subset of its constituent
- /// operations will actually have been initiated. `listio_resubmit` will
- /// resubmit any operations that are still uninitiated.
- ///
- /// After calling `listio_resubmit`, results should be collected by
- /// [`LioCb::aio_return`].
- ///
- /// # Examples
- /// ```no_run
- /// # use nix::Error;
- /// # use nix::errno::Errno;
- /// # use nix::sys::aio::*;
- /// # use nix::sys::signal::SigevNotify;
- /// # use std::os::unix::io::AsRawFd;
- /// # use std::{thread, time};
- /// # use tempfile::tempfile;
- /// const WBUF: &[u8] = b"abcdef123456";
- /// let mut f = tempfile().unwrap();
- /// let mut liocb = LioCbBuilder::with_capacity(1)
- /// .emplace_slice(
- /// f.as_raw_fd(),
- /// 2, //offset
- /// WBUF,
- /// 0, //priority
- /// SigevNotify::SigevNone,
- /// LioOpcode::LIO_WRITE
- /// ).finish();
- /// let mut err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone);
- /// while err == Err(Errno::EIO) ||
- /// err == Err(Errno::EAGAIN) {
- /// thread::sleep(time::Duration::from_millis(10));
- /// err = liocb.listio_resubmit(LioMode::LIO_WAIT, SigevNotify::SigevNone);
- /// }
- /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len());
- /// ```
- ///
- /// # References
- ///
- /// [`lio_listio`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html)
- ///
- /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
- /// [`LioCb::aio_return`]: struct.LioCb.html#method.aio_return
- // Note: the addresses of any EINPROGRESS or EOK aiocbs _must_ not be
- // changed by this method, because the kernel relies on their addresses
- // being stable.
- // Note: aiocbs that are Ok(()) must be finalized by aio_return, or else the
- // sigev_notify will immediately refire.
- pub fn listio_resubmit(&mut self, mode:LioMode,
- sigev_notify: SigevNotify) -> Result<()> {
- let sigev = SigEvent::new(sigev_notify);
- let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
- self.list.clear();
-
- while self.results.len() < self.aiocbs.len() {
- self.results.push(None);
- }
-
- for (i, a) in self.aiocbs.iter_mut().enumerate() {
- if self.results[i].is_some() {
- // Already collected final status for this operation
- continue;
- }
- match a.error_unpinned() {
- Ok(()) => {
- // aiocb is complete; collect its status and don't resubmit
- self.results[i] = Some(a.aio_return_unpinned());
- },
- Err(Errno::EAGAIN) => {
- self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb);
- },
- Err(Errno::EINPROGRESS) => {
- // aiocb is was successfully queued; no need to do anything
- },
- Err(Errno::EINVAL) => panic!(
- "AioCb was never submitted, or already finalized"),
- _ => unreachable!()
- }
- }
- let p = self.list.as_ptr();
- Errno::result(unsafe {
- libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp)
- }).map(drop)
- }
-
- /// Collect final status for an individual `AioCb` submitted as part of an
- /// `LioCb`.
- ///
- /// This is just like [`AioCb::aio_return`], except it takes into account
- /// operations that were restarted by [`LioCb::listio_resubmit`]
- ///
- /// [`AioCb::aio_return`]: struct.AioCb.html#method.aio_return
- /// [`LioCb::listio_resubmit`]: #method.listio_resubmit
- pub fn aio_return(&mut self, i: usize) -> Result<isize> {
- if i >= self.results.len() || self.results[i].is_none() {
- self.aiocbs[i].aio_return_unpinned()
- } else {
- self.results[i].unwrap()
- }
- }
-
- /// Retrieve error status of an individual `AioCb` submitted as part of an
- /// `LioCb`.
- ///
- /// This is just like [`AioCb::error`], except it takes into account
- /// operations that were restarted by [`LioCb::listio_resubmit`]
- ///
- /// [`AioCb::error`]: struct.AioCb.html#method.error
- /// [`LioCb::listio_resubmit`]: #method.listio_resubmit
- pub fn error(&mut self, i: usize) -> Result<()> {
- if i >= self.results.len() || self.results[i].is_none() {
- self.aiocbs[i].error_unpinned()
- } else {
- Ok(())
- }
- }
-}
-
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-impl<'a> Debug for LioCb<'a> {
- fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
- fmt.debug_struct("LioCb")
- .field("aiocbs", &self.aiocbs)
- .finish()
- }
-}
-
-/// Used to construct `LioCb`
-// This must be a separate class from LioCb due to pinning constraints. LioCb
-// must use a boxed slice of AioCbs so they will have stable storage, but
-// LioCbBuilder must use a Vec to make construction possible when the final size
-// is unknown.
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-#[cfg_attr(docsrs, doc(cfg(all())))]
-#[derive(Debug)]
-pub struct LioCbBuilder<'a> {
- /// A collection of [`AioCb`]s.
- ///
- /// [`AioCb`]: struct.AioCb.html
- pub aiocbs: Vec<AioCb<'a>>,
+/// They are not guaranteed to complete atomically, and the order in which the
+/// requests are carried out is not specified. Reads, and writes may be freely
+/// mixed.
+///
+/// # Examples
+///
+/// Use `lio_listio` to submit an aio operation and wait for its completion. In
+/// this case, there is no need to use aio_suspend to wait or `error` to poll.
+/// This mode is useful for otherwise-synchronous programs that want to execute
+/// a handful of I/O operations in parallel.
+/// ```
+/// # use std::os::unix::io::AsRawFd;
+/// # use nix::sys::aio::*;
+/// # use nix::sys::signal::SigevNotify;
+/// # use tempfile::tempfile;
+/// const WBUF: &[u8] = b"abcdef123456";
+/// let mut f = tempfile().unwrap();
+/// let mut aiow = Box::pin(AioWrite::new(
+/// f.as_raw_fd(),
+/// 2, // offset
+/// WBUF,
+/// 0, // priority
+/// SigevNotify::SigevNone
+/// ));
+/// lio_listio(LioMode::LIO_WAIT, &mut[aiow.as_mut()], SigevNotify::SigevNone)
+/// .unwrap();
+/// // At this point, we are guaranteed that aiow is complete.
+/// assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len());
+/// ```
+///
+/// Use `lio_listio` to submit multiple asynchronous operations with a single
+/// syscall, but receive notification individually. This is an efficient
+/// technique for reducing overall context-switch overhead, especially when
+/// combined with kqueue.
+/// ```
+/// # use std::os::unix::io::AsRawFd;
+/// # use std::thread;
+/// # use std::time;
+/// # use nix::errno::Errno;
+/// # use nix::sys::aio::*;
+/// # use nix::sys::signal::SigevNotify;
+/// # use tempfile::tempfile;
+/// const WBUF: &[u8] = b"abcdef123456";
+/// let mut f = tempfile().unwrap();
+/// let mut aiow = Box::pin(AioWrite::new(
+/// f.as_raw_fd(),
+/// 2, // offset
+/// WBUF,
+/// 0, // priority
+/// SigevNotify::SigevNone
+/// ));
+/// lio_listio(LioMode::LIO_NOWAIT, &mut[aiow.as_mut()], SigevNotify::SigevNone)
+/// .unwrap();
+/// // We must wait for the completion of each individual operation
+/// while (aiow.as_mut().error() == Err(Errno::EINPROGRESS)) {
+/// thread::sleep(time::Duration::from_millis(10));
+/// }
+/// assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len());
+/// ```
+///
+/// Use `lio_listio` to submit multiple operations, and receive notification
+/// only when all of them are complete. This can be useful when there is some
+/// logical relationship between the operations. But beware! Errors or system
+/// resource limitations may cause `lio_listio` to return `EIO`, `EAGAIN`, or
+/// `EINTR`, in which case some but not all operations may have been submitted.
+/// In that case, you must check the status of each individual operation, and
+/// possibly resubmit some.
+/// ```
+/// # use libc::c_int;
+/// # use std::os::unix::io::AsRawFd;
+/// # use std::sync::atomic::{AtomicBool, Ordering};
+/// # use std::thread;
+/// # use std::time;
+/// # use lazy_static::lazy_static;
+/// # use nix::errno::Errno;
+/// # use nix::sys::aio::*;
+/// # use nix::sys::signal::*;
+/// # use tempfile::tempfile;
+/// lazy_static! {
+/// pub static ref SIGNALED: AtomicBool = AtomicBool::new(false);
+/// }
+///
+/// extern fn sigfunc(_: c_int) {
+/// SIGNALED.store(true, Ordering::Relaxed);
+/// }
+/// let sa = SigAction::new(SigHandler::Handler(sigfunc),
+/// SaFlags::SA_RESETHAND,
+/// SigSet::empty());
+/// SIGNALED.store(false, Ordering::Relaxed);
+/// unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap();
+///
+/// const WBUF: &[u8] = b"abcdef123456";
+/// let mut f = tempfile().unwrap();
+/// let mut aiow = Box::pin(AioWrite::new(
+/// f.as_raw_fd(),
+/// 2, // offset
+/// WBUF,
+/// 0, // priority
+/// SigevNotify::SigevNone
+/// ));
+/// let sev = SigevNotify::SigevSignal { signal: Signal::SIGUSR2, si_value: 0 };
+/// lio_listio(LioMode::LIO_NOWAIT, &mut[aiow.as_mut()], sev).unwrap();
+/// while !SIGNALED.load(Ordering::Relaxed) {
+/// thread::sleep(time::Duration::from_millis(10));
+/// }
+/// // At this point, since `lio_listio` returned success and delivered its
+/// // notification, we know that all operations are complete.
+/// assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len());
+/// ```
+pub fn lio_listio(
+ mode: LioMode,
+ list: &mut [Pin<&mut dyn AsMut<libc::aiocb>>],
+ sigev_notify: SigevNotify,
+) -> Result<()> {
+ let p = list as *mut [Pin<&mut dyn AsMut<libc::aiocb>>]
+ as *mut [*mut libc::aiocb]
+ as *mut *mut libc::aiocb;
+ let sigev = SigEvent::new(sigev_notify);
+ let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
+ Errno::result(unsafe {
+ libc::lio_listio(mode as i32, p, list.len() as i32, sigevp)
+ })
+ .map(drop)
}
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-#[cfg_attr(docsrs, doc(cfg(all())))]
-impl<'a> LioCbBuilder<'a> {
- /// Initialize an empty `LioCb`
- pub fn with_capacity(capacity: usize) -> LioCbBuilder<'a> {
- LioCbBuilder {
- aiocbs: Vec::with_capacity(capacity),
- }
- }
+#[cfg(test)]
+mod t {
+ use super::*;
- /// Add a new operation on an immutable slice to the [`LioCb`] under
- /// construction.
- ///
- /// Arguments are the same as for [`AioCb::from_slice`]
- ///
- /// [`LioCb`]: struct.LioCb.html
- /// [`AioCb::from_slice`]: struct.AioCb.html#method.from_slice
- #[must_use]
- pub fn emplace_slice(mut self, fd: RawFd, offs: off_t, buf: &'a [u8],
- prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> Self
- {
- self.aiocbs.push(AioCb::from_slice_unpinned(fd, offs, buf, prio,
- sigev_notify, opcode));
- self
- }
+ /// aio_suspend relies on casting Rust Aio* struct pointers to libc::aiocb
+ /// pointers. This test ensures that such casts are valid.
+ #[test]
+ fn casting() {
+ let sev = SigevNotify::SigevNone;
+ let aiof = AioFsync::new(666, AioFsyncMode::O_SYNC, 0, sev);
+ assert_eq!(
+ aiof.as_ref() as *const libc::aiocb,
+ &aiof as *const AioFsync as *const libc::aiocb
+ );
- /// Add a new operation on a mutable slice to the [`LioCb`] under
- /// construction.
- ///
- /// Arguments are the same as for [`AioCb::from_mut_slice`]
- ///
- /// [`LioCb`]: struct.LioCb.html
- /// [`AioCb::from_mut_slice`]: struct.AioCb.html#method.from_mut_slice
- #[must_use]
- pub fn emplace_mut_slice(mut self, fd: RawFd, offs: off_t,
- buf: &'a mut [u8], prio: libc::c_int,
- sigev_notify: SigevNotify, opcode: LioOpcode)
- -> Self
- {
- self.aiocbs.push(AioCb::from_mut_slice_unpinned(fd, offs, buf, prio,
- sigev_notify, opcode));
- self
- }
+ let mut rbuf = [];
+ let aior = AioRead::new(666, 0, &mut rbuf, 0, sev);
+ assert_eq!(
+ aior.as_ref() as *const libc::aiocb,
+ &aior as *const AioRead as *const libc::aiocb
+ );
- /// Finalize this [`LioCb`].
- ///
- /// Afterwards it will be possible to issue the operations with
- /// [`LioCb::listio`]. Conversely, it will no longer be possible to add new
- /// operations with [`LioCbBuilder::emplace_slice`] or
- /// [`LioCbBuilder::emplace_mut_slice`].
- ///
- /// [`LioCb::listio`]: struct.LioCb.html#method.listio
- /// [`LioCb::from_mut_slice`]: struct.LioCb.html#method.from_mut_slice
- /// [`LioCb::from_slice`]: struct.LioCb.html#method.from_slice
- pub fn finish(self) -> LioCb<'a> {
- let len = self.aiocbs.len();
- LioCb {
- aiocbs: self.aiocbs.into(),
- list: Vec::with_capacity(len),
- results: Vec::with_capacity(len)
- }
+ let wbuf = [];
+ let aiow = AioWrite::new(666, 0, &wbuf, 0, sev);
+ assert_eq!(
+ aiow.as_ref() as *const libc::aiocb,
+ &aiow as *const AioWrite as *const libc::aiocb
+ );
}
-}
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-#[cfg(test)]
-mod t {
- use super::*;
-
- // It's important that `LioCb` be `UnPin`. The tokio-file crate relies on
- // it.
+ #[cfg(target_os = "freebsd")]
#[test]
- fn liocb_is_unpin() {
- use assert_impl::assert_impl;
+ fn casting_vectored() {
+ let sev = SigevNotify::SigevNone;
+
+ let mut rbuf = [];
+ let mut rbufs = [IoSliceMut::new(&mut rbuf)];
+ let aiorv = AioReadv::new(666, 0, &mut rbufs[..], 0, sev);
+ assert_eq!(
+ aiorv.as_ref() as *const libc::aiocb,
+ &aiorv as *const AioReadv as *const libc::aiocb
+ );
- assert_impl!(Unpin: LioCb);
+ let wbuf = [];
+ let wbufs = [IoSlice::new(&wbuf)];
+ let aiowv = AioWritev::new(666, 0, &wbufs, 0, sev);
+ assert_eq!(
+ aiowv.as_ref() as *const libc::aiocb,
+ &aiowv as *const AioWritev as *const libc::aiocb
+ );
}
}
diff --git a/test/sys/test_aio.rs b/test/sys/test_aio.rs
index 80cd053f..ca35b5f8 100644
--- a/test/sys/test_aio.rs
+++ b/test/sys/test_aio.rs
@@ -1,415 +1,525 @@
-use libc::{c_int, c_void};
-use nix::Result;
-use nix::errno::*;
-use nix::sys::aio::*;
-use nix::sys::signal::{SaFlags, SigAction, sigaction, SigevNotify, SigHandler, Signal, SigSet};
-use nix::sys::time::{TimeSpec, TimeValLike};
-use std::io::{Write, Read, Seek, SeekFrom};
-use std::ops::Deref;
-use std::os::unix::io::AsRawFd;
-use std::pin::Pin;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::{thread, time};
+use std::{
+ io::{Read, Seek, SeekFrom, Write},
+ ops::Deref,
+ os::unix::io::AsRawFd,
+ pin::Pin,
+ sync::atomic::{AtomicBool, Ordering},
+ thread,
+ time,
+};
+
+use libc::c_int;
+use nix::{
+ errno::*,
+ sys::{
+ aio::*,
+ signal::{
+ sigaction,
+ SaFlags,
+ SigAction,
+ SigHandler,
+ SigSet,
+ SigevNotify,
+ Signal,
+ },
+ time::{TimeSpec, TimeValLike},
+ },
+};
use tempfile::tempfile;
-// Helper that polls an AioCb for completion or error
-fn poll_aio(aiocb: &mut Pin<Box<AioCb>>) -> Result<()> {
- loop {
- let err = aiocb.error();
- if err != Err(Errno::EINPROGRESS) { return err; };
- thread::sleep(time::Duration::from_millis(10));
- }
+lazy_static! {
+ pub static ref SIGNALED: AtomicBool = AtomicBool::new(false);
}
-// Helper that polls a component of an LioCb for completion or error
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-fn poll_lio(liocb: &mut LioCb, i: usize) -> Result<()> {
- loop {
- let err = liocb.error(i);
- if err != Err(Errno::EINPROGRESS) { return err; };
- thread::sleep(time::Duration::from_millis(10));
- }
+extern "C" fn sigfunc(_: c_int) {
+ SIGNALED.store(true, Ordering::Relaxed);
}
-#[test]
-fn test_accessors() {
- let mut rbuf = vec![0; 4];
- let aiocb = AioCb::from_mut_slice( 1001,
- 2, //offset
- &mut rbuf,
- 42, //priority
- SigevNotify::SigevSignal {
- signal: Signal::SIGUSR2,
- si_value: 99
- },
- LioOpcode::LIO_NOP);
- assert_eq!(1001, aiocb.fd());
- assert_eq!(Some(LioOpcode::LIO_NOP), aiocb.lio_opcode());
- assert_eq!(4, aiocb.nbytes());
- assert_eq!(2, aiocb.offset());
- assert_eq!(42, aiocb.priority());
- let sev = aiocb.sigevent().sigevent();
- assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo);
- assert_eq!(99, sev.sigev_value.sival_ptr as i64);
+// Helper that polls an AioCb for completion or error
+macro_rules! poll_aio {
+ ($aiocb: expr) => {
+ loop {
+ let err = $aiocb.as_mut().error();
+ if err != Err(Errno::EINPROGRESS) {
+ break err;
+ };
+ thread::sleep(time::Duration::from_millis(10));
+ }
+ };
}
-// Tests AioCb.cancel. We aren't trying to test the OS's implementation, only
-// our bindings. So it's sufficient to check that AioCb.cancel returned any
-// AioCancelStat value.
-#[test]
-#[cfg_attr(target_env = "musl", ignore)]
-fn test_cancel() {
- let wbuf: &[u8] = b"CDEF";
-
- let f = tempfile().unwrap();
- let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
- 0, //offset
- wbuf,
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_NOP);
- aiocb.write().unwrap();
- let err = aiocb.error();
- assert!(err == Ok(()) || err == Err(Errno::EINPROGRESS));
+mod aio_fsync {
+ use super::*;
+
+ #[test]
+ fn test_accessors() {
+ let aiocb = AioFsync::new(
+ 1001,
+ AioFsyncMode::O_SYNC,
+ 42,
+ SigevNotify::SigevSignal {
+ signal: Signal::SIGUSR2,
+ si_value: 99,
+ },
+ );
+ assert_eq!(1001, aiocb.fd());
+ assert_eq!(AioFsyncMode::O_SYNC, aiocb.mode());
+ assert_eq!(42, aiocb.priority());
+ let sev = aiocb.sigevent().sigevent();
+ assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo);
+ assert_eq!(99, sev.sigev_value.sival_ptr as i64);
+ }
- let cancelstat = aiocb.cancel();
- assert!(cancelstat.is_ok());
+ /// `AioFsync::submit` should not modify the `AioCb` object if
+ /// `libc::aio_fsync` returns an error
+ // Skip on Linux, because Linux's AIO implementation can't detect errors
+ // synchronously
+ #[test]
+ #[cfg(any(target_os = "freebsd", target_os = "macos"))]
+ fn error() {
+ use std::mem;
+
+ const INITIAL: &[u8] = b"abcdef123456";
+ // Create an invalid AioFsyncMode
+ let mode = unsafe { mem::transmute(666) };
+ let mut f = tempfile().unwrap();
+ f.write_all(INITIAL).unwrap();
+ let mut aiof = Box::pin(AioFsync::new(
+ f.as_raw_fd(),
+ mode,
+ 0,
+ SigevNotify::SigevNone,
+ ));
+ let err = aiof.as_mut().submit();
+ assert!(err.is_err());
+ }
- // Wait for aiocb to complete, but don't care whether it succeeded
- let _ = poll_aio(&mut aiocb);
- let _ = aiocb.aio_return();
+ #[test]
+ #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
+ fn ok() {
+ const INITIAL: &[u8] = b"abcdef123456";
+ let mut f = tempfile().unwrap();
+ f.write_all(INITIAL).unwrap();
+ let fd = f.as_raw_fd();
+ let mut aiof = Box::pin(AioFsync::new(
+ fd,
+ AioFsyncMode::O_SYNC,
+ 0,
+ SigevNotify::SigevNone,
+ ));
+ let err = aiof.as_mut().submit();
+ assert!(err.is_ok());
+ poll_aio!(&mut aiof).unwrap();
+ aiof.as_mut().aio_return().unwrap();
+ }
}
-// Tests using aio_cancel_all for all outstanding IOs.
-#[test]
-#[cfg_attr(target_env = "musl", ignore)]
-fn test_aio_cancel_all() {
- let wbuf: &[u8] = b"CDEF";
-
- let f = tempfile().unwrap();
- let mut aiocb = AioCb::from_slice(f.as_raw_fd(),
- 0, //offset
- wbuf,
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_NOP);
- aiocb.write().unwrap();
- let err = aiocb.error();
- assert!(err == Ok(()) || err == Err(Errno::EINPROGRESS));
+mod aio_read {
+ use super::*;
+
+ #[test]
+ fn test_accessors() {
+ let mut rbuf = vec![0; 4];
+ let aiocb = AioRead::new(
+ 1001,
+ 2, //offset
+ &mut rbuf,
+ 42, //priority
+ SigevNotify::SigevSignal {
+ signal: Signal::SIGUSR2,
+ si_value: 99,
+ },
+ );
+ assert_eq!(1001, aiocb.fd());
+ assert_eq!(4, aiocb.nbytes());
+ assert_eq!(2, aiocb.offset());
+ assert_eq!(42, aiocb.priority());
+ let sev = aiocb.sigevent().sigevent();
+ assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo);
+ assert_eq!(99, sev.sigev_value.sival_ptr as i64);
+ }
- let cancelstat = aio_cancel_all(f.as_raw_fd());
- assert!(cancelstat.is_ok());
+ // Tests AioWrite.cancel. We aren't trying to test the OS's implementation,
+ // only our bindings. So it's sufficient to check that cancel
+ // returned any AioCancelStat value.
+ #[test]
+ #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
+ fn cancel() {
+ const INITIAL: &[u8] = b"abcdef123456";
+ let mut rbuf = vec![0; 4];
+ let mut f = tempfile().unwrap();
+ f.write_all(INITIAL).unwrap();
+ let fd = f.as_raw_fd();
+ let mut aior =
+ Box::pin(AioRead::new(fd, 2, &mut rbuf, 0, SigevNotify::SigevNone));
+ aior.as_mut().submit().unwrap();
+
+ let cancelstat = aior.as_mut().cancel();
+ assert!(cancelstat.is_ok());
+
+ // Wait for aiow to complete, but don't care whether it succeeded
+ let _ = poll_aio!(&mut aior);
+ let _ = aior.as_mut().aio_return();
+ }
- // Wait for aiocb to complete, but don't care whether it succeeded
- let _ = poll_aio(&mut aiocb);
- let _ = aiocb.aio_return();
-}
+ /// `AioRead::submit` should not modify the `AioCb` object if
+ /// `libc::aio_read` returns an error
+ // Skip on Linux, because Linux's AIO implementation can't detect errors
+ // synchronously
+ #[test]
+ #[cfg(any(target_os = "freebsd", target_os = "macos"))]
+ fn error() {
+ const INITIAL: &[u8] = b"abcdef123456";
+ let mut rbuf = vec![0; 4];
+ let mut f = tempfile().unwrap();
+ f.write_all(INITIAL).unwrap();
+ let mut aior = Box::pin(AioRead::new(
+ f.as_raw_fd(),
+ -1, //an invalid offset
+ &mut rbuf,
+ 0, //priority
+ SigevNotify::SigevNone,
+ ));
+ assert!(aior.as_mut().submit().is_err());
+ }
-#[test]
-#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
-fn test_fsync() {
- const INITIAL: &[u8] = b"abcdef123456";
- let mut f = tempfile().unwrap();
- f.write_all(INITIAL).unwrap();
- let mut aiocb = AioCb::from_fd( f.as_raw_fd(),
- 0, //priority
- SigevNotify::SigevNone);
- let err = aiocb.fsync(AioFsyncMode::O_SYNC);
- assert!(err.is_ok());
- poll_aio(&mut aiocb).unwrap();
- aiocb.aio_return().unwrap();
-}
+ // Test a simple aio operation with no completion notification. We must
+ // poll for completion
+ #[test]
+ #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
+ fn ok() {
+ const INITIAL: &[u8] = b"abcdef123456";
+ let mut rbuf = vec![0; 4];
+ const EXPECT: &[u8] = b"cdef";
+ let mut f = tempfile().unwrap();
+ f.write_all(INITIAL).unwrap();
+ {
+ let fd = f.as_raw_fd();
+ let mut aior = Box::pin(AioRead::new(
+ fd,
+ 2,
+ &mut rbuf,
+ 0,
+ SigevNotify::SigevNone,
+ ));
+ aior.as_mut().submit().unwrap();
-/// `AioCb::fsync` should not modify the `AioCb` object if `libc::aio_fsync` returns
-/// an error
-// Skip on Linux, because Linux's AIO implementation can't detect errors
-// synchronously
-#[test]
-#[cfg(any(target_os = "freebsd", target_os = "macos"))]
-fn test_fsync_error() {
- use std::mem;
+ let err = poll_aio!(&mut aior);
+ assert_eq!(err, Ok(()));
+ assert_eq!(aior.as_mut().aio_return().unwrap(), EXPECT.len());
+ }
+ assert_eq!(EXPECT, rbuf.deref().deref());
+ }
- const INITIAL: &[u8] = b"abcdef123456";
- // Create an invalid AioFsyncMode
- let mode = unsafe { mem::transmute(666) };
- let mut f = tempfile().unwrap();
- f.write_all(INITIAL).unwrap();
- let mut aiocb = AioCb::from_fd( f.as_raw_fd(),
- 0, //priority
- SigevNotify::SigevNone);
- let err = aiocb.fsync(mode);
- assert!(err.is_err());
+ // Like ok, but allocates the structure on the stack.
+ #[test]
+ #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
+ fn on_stack() {
+ const INITIAL: &[u8] = b"abcdef123456";
+ let mut rbuf = vec![0; 4];
+ const EXPECT: &[u8] = b"cdef";
+ let mut f = tempfile().unwrap();
+ f.write_all(INITIAL).unwrap();
+ {
+ let fd = f.as_raw_fd();
+ let mut aior =
+ AioRead::new(fd, 2, &mut rbuf, 0, SigevNotify::SigevNone);
+ let mut aior = unsafe { Pin::new_unchecked(&mut aior) };
+ aior.as_mut().submit().unwrap();
+
+ let err = poll_aio!(&mut aior);
+ assert_eq!(err, Ok(()));
+ assert_eq!(aior.as_mut().aio_return().unwrap(), EXPECT.len());
+ }
+ assert_eq!(EXPECT, rbuf.deref().deref());
+ }
}
-#[test]
-// On Cirrus on Linux, this test fails due to a glibc bug.
-// https://github.com/nix-rust/nix/issues/1099
-#[cfg_attr(target_os = "linux", ignore)]
-// On Cirrus, aio_suspend is failing with EINVAL
-// https://github.com/nix-rust/nix/issues/1361
-#[cfg_attr(target_os = "macos", ignore)]
-fn test_aio_suspend() {
- const INITIAL: &[u8] = b"abcdef123456";
- const WBUF: &[u8] = b"CDEFG";
- let timeout = TimeSpec::seconds(10);
- let mut rbuf = vec![0; 4];
- let rlen = rbuf.len();
- let mut f = tempfile().unwrap();
- f.write_all(INITIAL).unwrap();
+#[cfg(target_os = "freebsd")]
+#[cfg(fbsd14)]
+mod aio_readv {
+ use std::io::IoSliceMut;
+
+ use super::*;
+
+ #[test]
+ fn test_accessors() {
+ let mut rbuf0 = vec![0; 4];
+ let mut rbuf1 = vec![0; 8];
+ let mut rbufs =
+ [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)];
+ let aiocb = AioReadv::new(
+ 1001,
+ 2, //offset
+ &mut rbufs,
+ 42, //priority
+ SigevNotify::SigevSignal {
+ signal: Signal::SIGUSR2,
+ si_value: 99,
+ },
+ );
+ assert_eq!(1001, aiocb.fd());
+ assert_eq!(2, aiocb.iovlen());
+ assert_eq!(2, aiocb.offset());
+ assert_eq!(42, aiocb.priority());
+ let sev = aiocb.sigevent().sigevent();
+ assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo);
+ assert_eq!(99, sev.sigev_value.sival_ptr as i64);
+ }
- let mut wcb = AioCb::from_slice( f.as_raw_fd(),
- 2, //offset
- WBUF,
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_WRITE);
-
- let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(),
- 8, //offset
- &mut rbuf,
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_READ);
- wcb.write().unwrap();
- rcb.read().unwrap();
- loop {
+ #[test]
+ #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
+ fn ok() {
+ const INITIAL: &[u8] = b"abcdef123456";
+ let mut rbuf0 = vec![0; 4];
+ let mut rbuf1 = vec![0; 2];
+ let mut rbufs =
+ [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)];
+ const EXPECT0: &[u8] = b"cdef";
+ const EXPECT1: &[u8] = b"12";
+ let mut f = tempfile().unwrap();
+ f.write_all(INITIAL).unwrap();
{
- let cbbuf = [wcb.as_ref(), rcb.as_ref()];
- let r = aio_suspend(&cbbuf[..], Some(timeout));
- match r {
- Err(Errno::EINTR) => continue,
- Err(e) => panic!("aio_suspend returned {:?}", e),
- Ok(_) => ()
- };
- }
- if rcb.error() != Err(Errno::EINPROGRESS) &&
- wcb.error() != Err(Errno::EINPROGRESS) {
- break
+ let fd = f.as_raw_fd();
+ let mut aior = Box::pin(AioReadv::new(
+ fd,
+ 2,
+ &mut rbufs,
+ 0,
+ SigevNotify::SigevNone,
+ ));
+ aior.as_mut().submit().unwrap();
+
+ let err = poll_aio!(&mut aior);
+ assert_eq!(err, Ok(()));
+ assert_eq!(
+ aior.as_mut().aio_return().unwrap(),
+ EXPECT0.len() + EXPECT1.len()
+ );
}
+ assert_eq!(&EXPECT0, &rbuf0);
+ assert_eq!(&EXPECT1, &rbuf1);
}
-
- assert_eq!(wcb.aio_return().unwrap() as usize, WBUF.len());
- assert_eq!(rcb.aio_return().unwrap() as usize, rlen);
}
-// Test a simple aio operation with no completion notification. We must poll
-// for completion
-#[test]
-#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
-fn test_read() {
- const INITIAL: &[u8] = b"abcdef123456";
- let mut rbuf = vec![0; 4];
- const EXPECT: &[u8] = b"cdef";
- let mut f = tempfile().unwrap();
- f.write_all(INITIAL).unwrap();
- {
- let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(),
- 2, //offset
- &mut rbuf,
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_NOP);
- aiocb.read().unwrap();
-
- let err = poll_aio(&mut aiocb);
- assert_eq!(err, Ok(()));
- assert_eq!(aiocb.aio_return().unwrap() as usize, EXPECT.len());
+mod aio_write {
+ use super::*;
+
+ #[test]
+ fn test_accessors() {
+ let wbuf = vec![0; 4];
+ let aiocb = AioWrite::new(
+ 1001,
+ 2, //offset
+ &wbuf,
+ 42, //priority
+ SigevNotify::SigevSignal {
+ signal: Signal::SIGUSR2,
+ si_value: 99,
+ },
+ );
+ assert_eq!(1001, aiocb.fd());
+ assert_eq!(4, aiocb.nbytes());
+ assert_eq!(2, aiocb.offset());
+ assert_eq!(42, aiocb.priority());
+ let sev = aiocb.sigevent().sigevent();
+ assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo);
+ assert_eq!(99, sev.sigev_value.sival_ptr as i64);
}
- assert_eq!(EXPECT, rbuf.deref().deref());
-}
+ // Tests AioWrite.cancel. We aren't trying to test the OS's implementation,
+ // only our bindings. So it's sufficient to check that cancel
+ // returned any AioCancelStat value.
+ #[test]
+ #[cfg_attr(target_env = "musl", ignore)]
+ fn cancel() {
+ let wbuf: &[u8] = b"CDEF";
-/// `AioCb::read` should not modify the `AioCb` object if `libc::aio_read`
-/// returns an error
-// Skip on Linux, because Linux's AIO implementation can't detect errors
-// synchronously
-#[test]
-#[cfg(any(target_os = "freebsd", target_os = "macos"))]
-fn test_read_error() {
- const INITIAL: &[u8] = b"abcdef123456";
- let mut rbuf = vec![0; 4];
- let mut f = tempfile().unwrap();
- f.write_all(INITIAL).unwrap();
- let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(),
- -1, //an invalid offset
- &mut rbuf,
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_NOP);
- assert!(aiocb.read().is_err());
-}
+ let f = tempfile().unwrap();
+ let mut aiow = Box::pin(AioWrite::new(
+ f.as_raw_fd(),
+ 0,
+ wbuf,
+ 0,
+ SigevNotify::SigevNone,
+ ));
+ aiow.as_mut().submit().unwrap();
+ let err = aiow.as_mut().error();
+ assert!(err == Ok(()) || err == Err(Errno::EINPROGRESS));
-// Tests from_mut_slice
-#[test]
-#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
-fn test_read_into_mut_slice() {
- const INITIAL: &[u8] = b"abcdef123456";
- let mut rbuf = vec![0; 4];
- const EXPECT: &[u8] = b"cdef";
- let mut f = tempfile().unwrap();
- f.write_all(INITIAL).unwrap();
- {
- let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(),
- 2, //offset
- &mut rbuf,
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_NOP);
- aiocb.read().unwrap();
-
- let err = poll_aio(&mut aiocb);
- assert_eq!(err, Ok(()));
- assert_eq!(aiocb.aio_return().unwrap() as usize, EXPECT.len());
+ let cancelstat = aiow.as_mut().cancel();
+ assert!(cancelstat.is_ok());
+
+ // Wait for aiow to complete, but don't care whether it succeeded
+ let _ = poll_aio!(&mut aiow);
+ let _ = aiow.as_mut().aio_return();
}
- assert_eq!(rbuf, EXPECT);
-}
+ // Test a simple aio operation with no completion notification. We must
+ // poll for completion.
+ #[test]
+ #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
+ fn ok() {
+ const INITIAL: &[u8] = b"abcdef123456";
+ let wbuf = "CDEF".to_string().into_bytes();
+ let mut rbuf = Vec::new();
+ const EXPECT: &[u8] = b"abCDEF123456";
+
+ let mut f = tempfile().unwrap();
+ f.write_all(INITIAL).unwrap();
+ let mut aiow = Box::pin(AioWrite::new(
+ f.as_raw_fd(),
+ 2,
+ &wbuf,
+ 0,
+ SigevNotify::SigevNone,
+ ));
+ aiow.as_mut().submit().unwrap();
-// Tests from_ptr
-#[test]
-#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
-fn test_read_into_pointer() {
- const INITIAL: &[u8] = b"abcdef123456";
- let mut rbuf = vec![0; 4];
- const EXPECT: &[u8] = b"cdef";
- let mut f = tempfile().unwrap();
- f.write_all(INITIAL).unwrap();
- {
- // Safety: ok because rbuf lives until after poll_aio
- let mut aiocb = unsafe {
- AioCb::from_mut_ptr( f.as_raw_fd(),
- 2, //offset
- rbuf.as_mut_ptr() as *mut c_void,
- rbuf.len(),
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_NOP)
- };
- aiocb.read().unwrap();
-
- let err = poll_aio(&mut aiocb);
+ let err = poll_aio!(&mut aiow);
assert_eq!(err, Ok(()));
- assert_eq!(aiocb.aio_return().unwrap() as usize, EXPECT.len());
- }
+ assert_eq!(aiow.as_mut().aio_return().unwrap(), wbuf.len());
- assert_eq!(rbuf, EXPECT);
-}
-
-// Test reading into an immutable buffer. It should fail
-// FIXME: This test fails to panic on Linux/musl
-#[test]
-#[should_panic(expected = "Can't read into an immutable buffer")]
-#[cfg_attr(target_env = "musl", ignore)]
-fn test_read_immutable_buffer() {
- let rbuf: &[u8] = b"CDEF";
- let f = tempfile().unwrap();
- let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
- 2, //offset
- rbuf,
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_NOP);
- aiocb.read().unwrap();
-}
+ f.seek(SeekFrom::Start(0)).unwrap();
+ let len = f.read_to_end(&mut rbuf).unwrap();
+ assert_eq!(len, EXPECT.len());
+ assert_eq!(rbuf, EXPECT);
+ }
+ // Like ok, but allocates the structure on the stack.
+ #[test]
+ #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
+ fn on_stack() {
+ const INITIAL: &[u8] = b"abcdef123456";
+ let wbuf = "CDEF".to_string().into_bytes();
+ let mut rbuf = Vec::new();
+ const EXPECT: &[u8] = b"abCDEF123456";
+
+ let mut f = tempfile().unwrap();
+ f.write_all(INITIAL).unwrap();
+ let mut aiow = AioWrite::new(
+ f.as_raw_fd(),
+ 2, //offset
+ &wbuf,
+ 0, //priority
+ SigevNotify::SigevNone,
+ );
+ let mut aiow = unsafe { Pin::new_unchecked(&mut aiow) };
+ aiow.as_mut().submit().unwrap();
-// Test a simple aio operation with no completion notification. We must poll
-// for completion. Unlike test_aio_read, this test uses AioCb::from_slice
-#[test]
-#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
-fn test_write() {
- const INITIAL: &[u8] = b"abcdef123456";
- let wbuf = "CDEF".to_string().into_bytes();
- let mut rbuf = Vec::new();
- const EXPECT: &[u8] = b"abCDEF123456";
+ let err = poll_aio!(&mut aiow);
+ assert_eq!(err, Ok(()));
+ assert_eq!(aiow.as_mut().aio_return().unwrap(), wbuf.len());
- let mut f = tempfile().unwrap();
- f.write_all(INITIAL).unwrap();
- let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
- 2, //offset
- &wbuf,
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_NOP);
- aiocb.write().unwrap();
-
- let err = poll_aio(&mut aiocb);
- assert_eq!(err, Ok(()));
- assert_eq!(aiocb.aio_return().unwrap() as usize, wbuf.len());
+ f.seek(SeekFrom::Start(0)).unwrap();
+ let len = f.read_to_end(&mut rbuf).unwrap();
+ assert_eq!(len, EXPECT.len());
+ assert_eq!(rbuf, EXPECT);
+ }
- f.seek(SeekFrom::Start(0)).unwrap();
- let len = f.read_to_end(&mut rbuf).unwrap();
- assert_eq!(len, EXPECT.len());
- assert_eq!(rbuf, EXPECT);
+ /// `AioWrite::write` should not modify the `AioCb` object if
+ /// `libc::aio_write` returns an error.
+ // Skip on Linux, because Linux's AIO implementation can't detect errors
+ // synchronously
+ #[test]
+ #[cfg(any(target_os = "freebsd", target_os = "macos"))]
+ fn error() {
+ let wbuf = "CDEF".to_string().into_bytes();
+ let mut aiow = Box::pin(AioWrite::new(
+ 666, // An invalid file descriptor
+ 0, //offset
+ &wbuf,
+ 0, //priority
+ SigevNotify::SigevNone,
+ ));
+ assert!(aiow.as_mut().submit().is_err());
+ // Dropping the AioWrite at this point should not panic
+ }
}
-// Tests `AioCb::from_ptr`
-#[test]
-#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
-fn test_write_from_pointer() {
- const INITIAL: &[u8] = b"abcdef123456";
- let wbuf = "CDEF".to_string().into_bytes();
- let mut rbuf = Vec::new();
- const EXPECT: &[u8] = b"abCDEF123456";
-
- let mut f = tempfile().unwrap();
- f.write_all(INITIAL).unwrap();
- // Safety: ok because aiocb outlives poll_aio
- let mut aiocb = unsafe {
- AioCb::from_ptr( f.as_raw_fd(),
- 2, //offset
- wbuf.as_ptr() as *const c_void,
- wbuf.len(),
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_NOP)
- };
- aiocb.write().unwrap();
-
- let err = poll_aio(&mut aiocb);
- assert_eq!(err, Ok(()));
- assert_eq!(aiocb.aio_return().unwrap() as usize, wbuf.len());
-
- f.seek(SeekFrom::Start(0)).unwrap();
- let len = f.read_to_end(&mut rbuf).unwrap();
- assert_eq!(len, EXPECT.len());
- assert_eq!(rbuf, EXPECT);
-}
+#[cfg(target_os = "freebsd")]
+#[cfg(fbsd14)]
+mod aio_writev {
+ use std::io::IoSlice;
+
+ use super::*;
+
+ #[test]
+ fn test_accessors() {
+ let wbuf0 = vec![0; 4];
+ let wbuf1 = vec![0; 8];
+ let wbufs = [IoSlice::new(&wbuf0), IoSlice::new(&wbuf1)];
+ let aiocb = AioWritev::new(
+ 1001,
+ 2, //offset
+ &wbufs,
+ 42, //priority
+ SigevNotify::SigevSignal {
+ signal: Signal::SIGUSR2,
+ si_value: 99,
+ },
+ );
+ assert_eq!(1001, aiocb.fd());
+ assert_eq!(2, aiocb.iovlen());
+ assert_eq!(2, aiocb.offset());
+ assert_eq!(42, aiocb.priority());
+ let sev = aiocb.sigevent().sigevent();
+ assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo);
+ assert_eq!(99, sev.sigev_value.sival_ptr as i64);
+ }
-/// `AioCb::write` should not modify the `AioCb` object if `libc::aio_write`
-/// returns an error
-// Skip on Linux, because Linux's AIO implementation can't detect errors
-// synchronously
-#[test]
-#[cfg(any(target_os = "freebsd", target_os = "macos"))]
-fn test_write_error() {
- let wbuf = "CDEF".to_string().into_bytes();
- let mut aiocb = AioCb::from_slice( 666, // An invalid file descriptor
- 0, //offset
- &wbuf,
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_NOP);
- assert!(aiocb.write().is_err());
-}
+ // Test a simple aio operation with no completion notification. We must
+ // poll for completion.
+ #[test]
+ #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
+ fn ok() {
+ const INITIAL: &[u8] = b"abcdef123456";
+ let wbuf0 = b"BC";
+ let wbuf1 = b"DEF";
+ let wbufs = [IoSlice::new(wbuf0), IoSlice::new(wbuf1)];
+ let wlen = wbuf0.len() + wbuf1.len();
+ let mut rbuf = Vec::new();
+ const EXPECT: &[u8] = b"aBCDEF123456";
+
+ let mut f = tempfile().unwrap();
+ f.write_all(INITIAL).unwrap();
+ let mut aiow = Box::pin(AioWritev::new(
+ f.as_raw_fd(),
+ 1,
+ &wbufs,
+ 0,
+ SigevNotify::SigevNone,
+ ));
+ aiow.as_mut().submit().unwrap();
-lazy_static! {
- pub static ref SIGNALED: AtomicBool = AtomicBool::new(false);
-}
+ let err = poll_aio!(&mut aiow);
+ assert_eq!(err, Ok(()));
+ assert_eq!(aiow.as_mut().aio_return().unwrap(), wlen);
-extern fn sigfunc(_: c_int) {
- SIGNALED.store(true, Ordering::Relaxed);
+ f.seek(SeekFrom::Start(0)).unwrap();
+ let len = f.read_to_end(&mut rbuf).unwrap();
+ assert_eq!(len, EXPECT.len());
+ assert_eq!(rbuf, EXPECT);
+ }
}
// Test an aio operation with completion delivered by a signal
-// FIXME: This test is ignored on mips because of failures in qemu in CI
#[test]
-#[cfg_attr(any(all(target_env = "musl", target_arch = "x86_64"), target_arch = "mips", target_arch = "mips64"), ignore)]
-fn test_write_sigev_signal() {
+#[cfg_attr(
+ any(
+ all(target_env = "musl", target_arch = "x86_64"),
+ target_arch = "mips",
+ target_arch = "mips64"
+ ),
+ ignore
+)]
+fn sigev_signal() {
let _m = crate::SIGNAL_MTX.lock();
- let sa = SigAction::new(SigHandler::Handler(sigfunc),
- SaFlags::SA_RESETHAND,
- SigSet::empty());
+ let sa = SigAction::new(
+ SigHandler::Handler(sigfunc),
+ SaFlags::SA_RESETHAND,
+ SigSet::empty(),
+ );
SIGNALED.store(false, Ordering::Relaxed);
unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap();
@@ -420,201 +530,107 @@ fn test_write_sigev_signal() {
let mut f = tempfile().unwrap();
f.write_all(INITIAL).unwrap();
- let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
- 2, //offset
- WBUF,
- 0, //priority
- SigevNotify::SigevSignal {
- signal: Signal::SIGUSR2,
- si_value: 0 //TODO: validate in sigfunc
- },
- LioOpcode::LIO_NOP);
- aiocb.write().unwrap();
+ let mut aiow = Box::pin(AioWrite::new(
+ f.as_raw_fd(),
+ 2, //offset
+ WBUF,
+ 0, //priority
+ SigevNotify::SigevSignal {
+ signal: Signal::SIGUSR2,
+ si_value: 0, //TODO: validate in sigfunc
+ },
+ ));
+ aiow.as_mut().submit().unwrap();
while !SIGNALED.load(Ordering::Relaxed) {
thread::sleep(time::Duration::from_millis(10));
}
- assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len());
+ assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len());
f.seek(SeekFrom::Start(0)).unwrap();
let len = f.read_to_end(&mut rbuf).unwrap();
assert_eq!(len, EXPECT.len());
assert_eq!(rbuf, EXPECT);
}
-// Test LioCb::listio with LIO_WAIT, so all AIO ops should be complete by the
-// time listio returns.
-#[test]
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
-fn test_liocb_listio_wait() {
- const INITIAL: &[u8] = b"abcdef123456";
- const WBUF: &[u8] = b"CDEF";
- let mut rbuf = vec![0; 4];
- let rlen = rbuf.len();
- let mut rbuf2 = Vec::new();
- const EXPECT: &[u8] = b"abCDEF123456";
- let mut f = tempfile().unwrap();
-
- f.write_all(INITIAL).unwrap();
-
- {
- let mut liocb = LioCbBuilder::with_capacity(2)
- .emplace_slice(
- f.as_raw_fd(),
- 2, //offset
- WBUF,
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_WRITE
- ).emplace_mut_slice(
- f.as_raw_fd(),
- 8, //offset
- &mut rbuf,
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_READ
- ).finish();
- let err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone);
- err.expect("lio_listio");
-
- assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len());
- assert_eq!(liocb.aio_return(1).unwrap() as usize, rlen);
- }
- assert_eq!(rbuf.deref().deref(), b"3456");
-
- f.seek(SeekFrom::Start(0)).unwrap();
- let len = f.read_to_end(&mut rbuf2).unwrap();
- assert_eq!(len, EXPECT.len());
- assert_eq!(rbuf2, EXPECT);
-}
-
-// Test LioCb::listio with LIO_NOWAIT and no SigEvent, so we must use some other
-// mechanism to check for the individual AioCb's completion.
+// Tests using aio_cancel_all for all outstanding IOs.
#[test]
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
-fn test_liocb_listio_nowait() {
- const INITIAL: &[u8] = b"abcdef123456";
- const WBUF: &[u8] = b"CDEF";
- let mut rbuf = vec![0; 4];
- let rlen = rbuf.len();
- let mut rbuf2 = Vec::new();
- const EXPECT: &[u8] = b"abCDEF123456";
- let mut f = tempfile().unwrap();
+#[cfg_attr(target_env = "musl", ignore)]
+fn test_aio_cancel_all() {
+ let wbuf: &[u8] = b"CDEF";
- f.write_all(INITIAL).unwrap();
+ let f = tempfile().unwrap();
+ let mut aiocb = Box::pin(AioWrite::new(
+ f.as_raw_fd(),
+ 0, //offset
+ wbuf,
+ 0, //priority
+ SigevNotify::SigevNone,
+ ));
+ aiocb.as_mut().submit().unwrap();
+ let err = aiocb.as_mut().error();
+ assert!(err == Ok(()) || err == Err(Errno::EINPROGRESS));
- {
- let mut liocb = LioCbBuilder::with_capacity(2)
- .emplace_slice(
- f.as_raw_fd(),
- 2, //offset
- WBUF,
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_WRITE
- ).emplace_mut_slice(
- f.as_raw_fd(),
- 8, //offset
- &mut rbuf,
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_READ
- ).finish();
- let err = liocb.listio(LioMode::LIO_NOWAIT, SigevNotify::SigevNone);
- err.expect("lio_listio");
-
- poll_lio(&mut liocb, 0).unwrap();
- poll_lio(&mut liocb, 1).unwrap();
- assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len());
- assert_eq!(liocb.aio_return(1).unwrap() as usize, rlen);
- }
- assert_eq!(rbuf.deref().deref(), b"3456");
+ let cancelstat = aio_cancel_all(f.as_raw_fd());
+ assert!(cancelstat.is_ok());
- f.seek(SeekFrom::Start(0)).unwrap();
- let len = f.read_to_end(&mut rbuf2).unwrap();
- assert_eq!(len, EXPECT.len());
- assert_eq!(rbuf2, EXPECT);
+ // Wait for aiocb to complete, but don't care whether it succeeded
+ let _ = poll_aio!(&mut aiocb);
+ let _ = aiocb.as_mut().aio_return();
}
-// Test LioCb::listio with LIO_NOWAIT and a SigEvent to indicate when all
-// AioCb's are complete.
-// FIXME: This test is ignored on mips/mips64 because of failures in qemu in CI.
#[test]
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-#[cfg_attr(any(target_arch = "mips", target_arch = "mips64", target_env = "musl"), ignore)]
-fn test_liocb_listio_signal() {
- let _m = crate::SIGNAL_MTX.lock();
+// On Cirrus on Linux, this test fails due to a glibc bug.
+// https://github.com/nix-rust/nix/issues/1099
+#[cfg_attr(target_os = "linux", ignore)]
+// On Cirrus, aio_suspend is failing with EINVAL
+// https://github.com/nix-rust/nix/issues/1361
+#[cfg_attr(target_os = "macos", ignore)]
+fn test_aio_suspend() {
const INITIAL: &[u8] = b"abcdef123456";
- const WBUF: &[u8] = b"CDEF";
+ const WBUF: &[u8] = b"CDEFG";
+ let timeout = TimeSpec::seconds(10);
let mut rbuf = vec![0; 4];
let rlen = rbuf.len();
- let mut rbuf2 = Vec::new();
- const EXPECT: &[u8] = b"abCDEF123456";
let mut f = tempfile().unwrap();
- let sa = SigAction::new(SigHandler::Handler(sigfunc),
- SaFlags::SA_RESETHAND,
- SigSet::empty());
- let sigev_notify = SigevNotify::SigevSignal { signal: Signal::SIGUSR2,
- si_value: 0 };
-
f.write_all(INITIAL).unwrap();
- {
- let mut liocb = LioCbBuilder::with_capacity(2)
- .emplace_slice(
- f.as_raw_fd(),
- 2, //offset
- WBUF,
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_WRITE
- ).emplace_mut_slice(
- f.as_raw_fd(),
- 8, //offset
- &mut rbuf,
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_READ
- ).finish();
- SIGNALED.store(false, Ordering::Relaxed);
- unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap();
- let err = liocb.listio(LioMode::LIO_NOWAIT, sigev_notify);
- err.expect("lio_listio");
- while !SIGNALED.load(Ordering::Relaxed) {
- thread::sleep(time::Duration::from_millis(10));
+ let mut wcb = Box::pin(AioWrite::new(
+ f.as_raw_fd(),
+ 2, //offset
+ WBUF,
+ 0, //priority
+ SigevNotify::SigevNone,
+ ));
+
+ let mut rcb = Box::pin(AioRead::new(
+ f.as_raw_fd(),
+ 8, //offset
+ &mut rbuf,
+ 0, //priority
+ SigevNotify::SigevNone,
+ ));
+ wcb.as_mut().submit().unwrap();
+ rcb.as_mut().submit().unwrap();
+ loop {
+ {
+ let cbbuf = [
+ &*wcb as &dyn AsRef<libc::aiocb>,
+ &*rcb as &dyn AsRef<libc::aiocb>,
+ ];
+ let r = aio_suspend(&cbbuf[..], Some(timeout));
+ match r {
+ Err(Errno::EINTR) => continue,
+ Err(e) => panic!("aio_suspend returned {:?}", e),
+ Ok(_) => (),
+ };
+ }
+ if rcb.as_mut().error() != Err(Errno::EINPROGRESS) &&
+ wcb.as_mut().error() != Err(Errno::EINPROGRESS)
+ {
+ break;
}
-
- assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len());
- assert_eq!(liocb.aio_return(1).unwrap() as usize, rlen);
}
- assert_eq!(rbuf.deref().deref(), b"3456");
-
- f.seek(SeekFrom::Start(0)).unwrap();
- let len = f.read_to_end(&mut rbuf2).unwrap();
- assert_eq!(len, EXPECT.len());
- assert_eq!(rbuf2, EXPECT);
-}
-// Try to use LioCb::listio to read into an immutable buffer. It should fail
-// FIXME: This test fails to panic on Linux/musl
-#[test]
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-#[should_panic(expected = "Can't read into an immutable buffer")]
-#[cfg_attr(target_env = "musl", ignore)]
-fn test_liocb_listio_read_immutable() {
- let rbuf: &[u8] = b"abcd";
- let f = tempfile().unwrap();
-
-
- let mut liocb = LioCbBuilder::with_capacity(1)
- .emplace_slice(
- f.as_raw_fd(),
- 2, //offset
- rbuf,
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_READ
- ).finish();
- let _ = liocb.listio(LioMode::LIO_NOWAIT, SigevNotify::SigevNone);
+ assert_eq!(wcb.as_mut().aio_return().unwrap(), WBUF.len());
+ assert_eq!(rcb.as_mut().aio_return().unwrap(), rlen);
}
diff --git a/test/sys/test_aio_drop.rs b/test/sys/test_aio_drop.rs
index f9ff97af..0836a542 100644
--- a/test/sys/test_aio_drop.rs
+++ b/test/sys/test_aio_drop.rs
@@ -20,11 +20,10 @@ fn test_drop() {
let f = tempfile().unwrap();
f.set_len(6).unwrap();
- let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
- 2, //offset
- WBUF,
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_NOP);
- aiocb.write().unwrap();
+ let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(),
+ 2, //offset
+ WBUF,
+ 0, //priority
+ SigevNotify::SigevNone));
+ aiocb.as_mut().submit().unwrap();
}
diff --git a/test/sys/test_lio_listio_resubmit.rs b/test/sys/test_lio_listio_resubmit.rs
deleted file mode 100644
index 2ed058c2..00000000
--- a/test/sys/test_lio_listio_resubmit.rs
+++ /dev/null
@@ -1,106 +0,0 @@
-// vim: tw=80
-
-// Annoyingly, Cargo is unable to conditionally build an entire test binary. So
-// we must disable the test here rather than in Cargo.toml
-#![cfg(target_os = "freebsd")]
-
-use nix::errno::*;
-use nix::libc::off_t;
-use nix::sys::aio::*;
-use nix::sys::signal::SigevNotify;
-use nix::unistd::{SysconfVar, sysconf};
-use std::os::unix::io::AsRawFd;
-use std::{thread, time};
-use sysctl::{CtlValue, Sysctl};
-use tempfile::tempfile;
-
-const BYTES_PER_OP: usize = 512;
-
-/// Attempt to collect final status for all of `liocb`'s operations, freeing
-/// system resources
-fn finish_liocb(liocb: &mut LioCb) {
- for j in 0..liocb.len() {
- loop {
- let e = liocb.error(j);
- match e {
- Ok(()) => break,
- Err(Errno::EINPROGRESS) =>
- thread::sleep(time::Duration::from_millis(10)),
- Err(x) => panic!("aio_error({:?})", x)
- }
- }
- assert_eq!(liocb.aio_return(j).unwrap(), BYTES_PER_OP as isize);
- }
-}
-
-// Deliberately exceed system resource limits, causing lio_listio to return EIO.
-// This test must run in its own process since it deliberately uses all AIO
-// resources. ATM it is only enabled on FreeBSD, because I don't know how to
-// check system AIO limits on other operating systems.
-#[test]
-fn test_lio_listio_resubmit() {
- let mut resubmit_count = 0;
-
- // Lookup system resource limits
- let alm = sysconf(SysconfVar::AIO_LISTIO_MAX)
- .expect("sysconf").unwrap() as usize;
- let ctl = sysctl::Ctl::new("vfs.aio.max_aio_queue_per_proc").unwrap();
- let maqpp = if let CtlValue::Int(x) = ctl.value().unwrap() {
- x as usize
- } else {
- panic!("unknown sysctl");
- };
-
- // Find lio_listio sizes that satisfy the AIO_LISTIO_MAX constraint and also
- // result in a final lio_listio call that can only partially be queued
- let target_ops = maqpp + alm / 2;
- let num_listios = (target_ops + alm - 3) / (alm - 2);
- let ops_per_listio = (target_ops + num_listios - 1) / num_listios;
- assert!((num_listios - 1) * ops_per_listio < maqpp,
- "the last lio_listio won't make any progress; fix the algorithm");
- println!("Using {:?} LioCbs of {:?} operations apiece", num_listios,
- ops_per_listio);
-
- let f = tempfile().unwrap();
- let buffer_set = (0..num_listios).map(|_| {
- (0..ops_per_listio).map(|_| {
- vec![0u8; BYTES_PER_OP]
- }).collect::<Vec<_>>()
- }).collect::<Vec<_>>();
-
- let mut liocbs = (0..num_listios).map(|i| {
- let mut builder = LioCbBuilder::with_capacity(ops_per_listio);
- for j in 0..ops_per_listio {
- let offset = (BYTES_PER_OP * (i * ops_per_listio + j)) as off_t;
- builder = builder.emplace_slice(f.as_raw_fd(),
- offset,
- &buffer_set[i][j][..],
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_WRITE);
- }
- let mut liocb = builder.finish();
- let mut err = liocb.listio(LioMode::LIO_NOWAIT, SigevNotify::SigevNone);
- while err == Err(Errno::EIO) ||
- err == Err(Errno::EAGAIN) ||
- err == Err(Errno::EINTR) {
- //
- thread::sleep(time::Duration::from_millis(10));
- resubmit_count += 1;
- err = liocb.listio_resubmit(LioMode::LIO_NOWAIT,
- SigevNotify::SigevNone);
- }
- liocb
- }).collect::<Vec<_>>();
-
- // Ensure that every AioCb completed
- for liocb in liocbs.iter_mut() {
- finish_liocb(liocb);
- }
-
- if resubmit_count > 0 {
- println!("Resubmitted {:?} times, test passed", resubmit_count);
- } else {
- println!("Never resubmitted. Test ambiguous");
- }
-}