diff options
-rw-r--r-- | .cirrus.yml | 13 | ||||
-rw-r--r-- | CHANGELOG.md | 16 | ||||
-rw-r--r-- | Cargo.toml | 9 | ||||
-rw-r--r-- | bors.toml | 3 | ||||
-rw-r--r-- | src/fcntl.rs | 8 | ||||
-rw-r--r-- | src/sys/aio.rs | 1801 | ||||
-rw-r--r-- | test/sys/test_aio.rs | 1086 | ||||
-rw-r--r-- | test/sys/test_aio_drop.rs | 13 | ||||
-rw-r--r-- | test/sys/test_lio_listio_resubmit.rs | 106 |
9 files changed, 1550 insertions, 1505 deletions
diff --git a/.cirrus.yml b/.cirrus.yml index 1045f4f5..231cc38f 100644 --- a/.cirrus.yml +++ b/.cirrus.yml @@ -36,11 +36,18 @@ test: &TEST # 64-bit kernel and in a 64-bit environment. Our tests don't execute any of # the system's binaries, so the environment shouldn't matter. task: - name: FreeBSD amd64 & i686 env: TARGET: x86_64-unknown-freebsd - freebsd_instance: - image: freebsd-12-3-release-amd64 + matrix: + - name: FreeBSD 12 amd64 & i686 + freebsd_instance: + image: freebsd-12-3-release-amd64 + - name: FreeBSD 14 amd64 & i686 + freebsd_instance: + image_family: freebsd-14-0-snap + # Enable tests that would fail on FreeBSD 12 + RUSTFLAGS: --cfg fbsd14 -D warnings + RUSTDOCFLAGS: --cfg fbsd14 setup_script: - kldload mqueuefs - fetch https://sh.rustup.rs -o rustup.sh diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e9baf90..de4d22e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,14 +6,30 @@ This project adheres to [Semantic Versioning](https://semver.org/). ## [Unreleased] - ReleaseDate ### Added +- Added `aio_writev` and `aio_readv`. + (#[1713](https://github.com/nix-rust/nix/pull/1713)) + - impl From<SockaddrIn> for std::net::SocketAddrV4 and impl From<SockaddrIn6> for std::net::SocketAddrV6. (#[1711](https://github.com/nix-rust/nix/pull/1711)) ### Changed + +- Rewrote the aio module. The new module: + * Does more type checking at compile time rather than runtime. + * Gives the caller control over whether and when to `Box` an aio operation. + * Changes the type of the `priority` arguments to `i32`. + * Changes the return type of `aio_return` to `usize`. + (#[1713](https://github.com/nix-rust/nix/pull/1713)) + ### Fixed ### Removed +- Removed support for resubmitting partially complete `lio_listio` operations. + It was too complicated, and didn't fit Nix's theme of zero-cost abstractions. + Instead, it can be reimplemented downstream. + (#[1713](https://github.com/nix-rust/nix/pull/1713)) + ## [0.24.1] - 2022-04-22 ### Added ### Changed @@ -27,9 +27,10 @@ targets = [ ] [dependencies] -libc = { version = "0.2.124", features = [ "extra_traits" ] } +libc = { git = "http://github.com/rust-lang/libc.git", rev = "cd99f681181c310abfba742aef11115d2eff03dc", features = [ "extra_traits" ] } bitflags = "1.1" cfg-if = "1.0" +pin-utils = { version = "0.1.0", optional = true } [target.'cfg(not(target_os = "redox"))'.dependencies] memoffset = { version = "0.6.3", optional = true } @@ -44,7 +45,7 @@ default = [ ] acct = [] -aio = [] +aio = ["pin-utils"] dir = ["fs"] env = [] event = [] @@ -103,10 +104,6 @@ name = "test-clearenv" path = "test/test_clearenv.rs" [[test]] -name = "test-lio-listio-resubmit" -path = "test/sys/test_lio_listio_resubmit.rs" - -[[test]] name = "test-mount" path = "test/test_mount.rs" harness = false @@ -5,7 +5,8 @@ status = [ "Android i686", "Android x86_64", "DragonFly BSD x86_64", - "FreeBSD amd64 & i686", + "FreeBSD 12 amd64 & i686", + "FreeBSD 14 amd64 & i686", "Fuchsia x86_64", "Linux MIPS", "Linux MIPS64 el", diff --git a/src/fcntl.rs b/src/fcntl.rs index fa64c8ea..5272c809 100644 --- a/src/fcntl.rs +++ b/src/fcntl.rs @@ -742,8 +742,8 @@ impl SpacectlRange { /// /// # Example /// -// no_run because it fails to link until FreeBSD 14.0 -/// ```no_run +#[cfg_attr(fbsd14, doc = " ```")] +#[cfg_attr(not(fbsd14), doc = " ```no_run")] /// # use std::io::Write; /// # use std::os::unix::fs::FileExt; /// # use std::os::unix::io::AsRawFd; @@ -788,8 +788,8 @@ pub fn fspacectl(fd: RawFd, range: SpacectlRange) -> Result<SpacectlRange> { /// /// # Example /// -// no_run because it fails to link until FreeBSD 14.0 -/// ```no_run +#[cfg_attr(fbsd14, doc = " ```")] +#[cfg_attr(not(fbsd14), doc = " ```no_run")] /// # use std::io::Write; /// # use std::os::unix::fs::FileExt; /// # use std::os::unix::io::AsRawFd; diff --git a/src/sys/aio.rs b/src/sys/aio.rs index 4780cdee..6ff88469 100644 --- a/src/sys/aio.rs +++ b/src/sys/aio.rs @@ -2,9 +2,12 @@ //! POSIX Asynchronous I/O //! //! The POSIX AIO interface is used for asynchronous I/O on files and disk-like -//! devices. It supports [`read`](struct.AioCb.html#method.read), -//! [`write`](struct.AioCb.html#method.write), and -//! [`fsync`](struct.AioCb.html#method.fsync) operations. Completion +//! devices. It supports [`read`](struct.AioRead.html#method.new), +//! [`write`](struct.AioWrite.html#method.new), +//! [`fsync`](struct.AioFsync.html#method.new), +//! [`readv`](struct.AioReadv.html#method.new), and +//! [`writev`](struct.AioWritev.html#method.new), operations, subject to +//! platform support. Completion //! notifications can optionally be delivered via //! [signals](../signal/enum.SigevNotify.html#variant.SigevSignal), via the //! [`aio_suspend`](fn.aio_suspend.html) function, or via polling. Some @@ -17,23 +20,30 @@ //! that they will be executed atomically. //! //! Outstanding operations may be cancelled with -//! [`cancel`](struct.AioCb.html#method.cancel) or +//! [`cancel`](trait.Aio.html#method.cancel) or //! [`aio_cancel_all`](fn.aio_cancel_all.html), though the operating system may //! not support this for all filesystems and devices. +#[cfg(target_os = "freebsd")] +use std::io::{IoSlice, IoSliceMut}; +use std::{ + convert::TryFrom, + fmt::{self, Debug}, + marker::{PhantomData, PhantomPinned}, + mem, + os::unix::io::RawFd, + pin::Pin, + ptr, + thread, +}; -use crate::Result; -use crate::errno::Errno; -use std::os::unix::io::RawFd; -use libc::{c_void, off_t, size_t}; -use std::fmt; -use std::fmt::Debug; -use std::marker::PhantomData; -use std::mem; -use std::pin::Pin; -use std::ptr::{null, null_mut}; -use crate::sys::signal::*; -use std::thread; -use crate::sys::time::TimeSpec; +use libc::{c_void, off_t}; +use pin_utils::unsafe_pinned; + +use crate::{ + errno::Errno, + sys::{signal::*, time::TimeSpec}, + Result, +}; libc_enum! { /// Mode for `AioCb::fsync`. Controls whether only data or both data and @@ -52,22 +62,7 @@ libc_enum! { #[cfg_attr(docsrs, doc(cfg(all())))] O_DSYNC } -} - -libc_enum! { - /// When used with [`lio_listio`](fn.lio_listio.html), determines whether a - /// given `aiocb` should be used for a read operation, a write operation, or - /// ignored. Has no effect for any other aio functions. - #[repr(i32)] - #[non_exhaustive] - pub enum LioOpcode { - /// No operation - LIO_NOP, - /// Write data as if by a call to [`AioCb::write`] - LIO_WRITE, - /// Write data as if by a call to [`AioCb::read`] - LIO_READ, - } + impl TryFrom<i32> } libc_enum! { @@ -103,354 +98,133 @@ struct LibcAiocb(libc::aiocb); unsafe impl Send for LibcAiocb {} unsafe impl Sync for LibcAiocb {} -/// AIO Control Block. -/// -/// The basic structure used by all aio functions. Each `AioCb` represents one -/// I/O request. -pub struct AioCb<'a> { - aiocb: LibcAiocb, - /// Tracks whether the buffer pointed to by `libc::aiocb.aio_buf` is mutable - mutable: bool, +/// Base class for all AIO operations. Should only be used directly when +/// checking for completion. +// We could create some kind of AsPinnedMut trait, and implement it for all aio +// ops, allowing the crate's users to get pinned references to `AioCb`. That +// could save some code for things like polling methods. But IMHO it would +// provide polymorphism at the wrong level. Instead, the best place for +// polymorphism is at the level of `Futures`. +#[repr(C)] +struct AioCb { + aiocb: LibcAiocb, /// Could this `AioCb` potentially have any in-kernel state? + // It would be really nice to perform the in-progress check entirely at + // compile time. But I can't figure out how, because: + // * Future::poll takes a `Pin<&mut self>` rather than `self`, and + // * Rust's lack of an equivalent of C++'s Guaranteed Copy Elision means + // that there's no way to write an AioCb constructor that neither boxes + // the object itself, nor moves it during return. in_progress: bool, - _buffer: std::marker::PhantomData<&'a [u8]>, - _pin: std::marker::PhantomPinned } -impl<'a> AioCb<'a> { - /// Returns the underlying file descriptor associated with the `AioCb` - pub fn fd(&self) -> RawFd { - self.aiocb.0.aio_fildes - } +impl AioCb { + pin_utils::unsafe_unpinned!(aiocb: LibcAiocb); - /// Constructs a new `AioCb` with no associated buffer. - /// - /// The resulting `AioCb` structure is suitable for use with `AioCb::fsync`. - /// - /// # Parameters - /// - /// * `fd`: File descriptor. Required for all aio functions. - /// * `prio`: If POSIX Prioritized IO is supported, then the - /// operation will be prioritized at the process's - /// priority level minus `prio`. - /// * `sigev_notify`: Determines how you will be notified of event - /// completion. - /// - /// # Examples - /// - /// Create an `AioCb` from a raw file descriptor and use it for an - /// [`fsync`](#method.fsync) operation. - /// - /// ``` - /// # use nix::errno::Errno; - /// # use nix::Error; - /// # use nix::sys::aio::*; - /// # use nix::sys::signal::SigevNotify::SigevNone; - /// # use std::{thread, time}; - /// # use std::os::unix::io::AsRawFd; - /// # use tempfile::tempfile; - /// let f = tempfile().unwrap(); - /// let mut aiocb = AioCb::from_fd( f.as_raw_fd(), 0, SigevNone); - /// aiocb.fsync(AioFsyncMode::O_SYNC).expect("aio_fsync failed early"); - /// while (aiocb.error() == Err(Errno::EINPROGRESS)) { - /// thread::sleep(time::Duration::from_millis(10)); - /// } - /// aiocb.aio_return().expect("aio_fsync failed late"); - /// ``` - pub fn from_fd(fd: RawFd, prio: libc::c_int, - sigev_notify: SigevNotify) -> Pin<Box<AioCb<'a>>> { - let mut a = AioCb::common_init(fd, prio, sigev_notify); - a.0.aio_offset = 0; - a.0.aio_nbytes = 0; - a.0.aio_buf = null_mut(); - - Box::pin(AioCb { - aiocb: a, - mutable: false, - in_progress: false, - _buffer: PhantomData, - _pin: std::marker::PhantomPinned - }) + fn aio_return(mut self: Pin<&mut Self>) -> Result<usize> { + self.in_progress = false; + unsafe { + let p: *mut libc::aiocb = &mut self.aiocb.0; + Errno::result(libc::aio_return(p)) + } + .map(|r| r as usize) } - // Private helper - #[cfg(not(any(target_os = "ios", target_os = "macos")))] - fn from_mut_slice_unpinned(fd: RawFd, offs: off_t, buf: &'a mut [u8], - prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> AioCb<'a> - { - let mut a = AioCb::common_init(fd, prio, sigev_notify); - a.0.aio_offset = offs; - a.0.aio_nbytes = buf.len() as size_t; - a.0.aio_buf = buf.as_ptr() as *mut c_void; - a.0.aio_lio_opcode = opcode as libc::c_int; + fn cancel(mut self: Pin<&mut Self>) -> Result<AioCancelStat> { + let r = unsafe { + libc::aio_cancel(self.aiocb.0.aio_fildes, &mut self.aiocb.0) + }; + match r { + libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled), + libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled), + libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone), + -1 => Err(Errno::last()), + _ => panic!("unknown aio_cancel return value"), + } + } + fn common_init(fd: RawFd, prio: i32, sigev_notify: SigevNotify) -> Self { + // Use mem::zeroed instead of explicitly zeroing each field, because the + // number and name of reserved fields is OS-dependent. On some OSes, + // some reserved fields are used the kernel for state, and must be + // explicitly zeroed when allocated. + let mut a = unsafe { mem::zeroed::<libc::aiocb>() }; + a.aio_fildes = fd; + a.aio_reqprio = prio; + a.aio_sigevent = SigEvent::new(sigev_notify).sigevent(); AioCb { - aiocb: a, - mutable: true, + aiocb: LibcAiocb(a), in_progress: false, - _buffer: PhantomData, - _pin: std::marker::PhantomPinned } } - /// Constructs a new `AioCb` from a mutable slice. - /// - /// The resulting `AioCb` will be suitable for both read and write - /// operations, but only if the borrow checker can guarantee that the slice - /// will outlive the `AioCb`. That will usually be the case if the `AioCb` - /// is stack-allocated. - /// - /// # Parameters - /// - /// * `fd`: File descriptor. Required for all aio functions. - /// * `offs`: File offset - /// * `buf`: A memory buffer - /// * `prio`: If POSIX Prioritized IO is supported, then the - /// operation will be prioritized at the process's - /// priority level minus `prio` - /// * `sigev_notify`: Determines how you will be notified of event - /// completion. - /// * `opcode`: This field is only used for `lio_listio`. It - /// determines which operation to use for this individual - /// aiocb - /// - /// # Examples - /// - /// Create an `AioCb` from a mutable slice and read into it. - /// - /// ``` - /// # use nix::errno::Errno; - /// # use nix::Error; - /// # use nix::sys::aio::*; - /// # use nix::sys::signal::SigevNotify; - /// # use std::{thread, time}; - /// # use std::io::Write; - /// # use std::os::unix::io::AsRawFd; - /// # use tempfile::tempfile; - /// const INITIAL: &[u8] = b"abcdef123456"; - /// const LEN: usize = 4; - /// let mut rbuf = vec![0; LEN]; - /// let mut f = tempfile().unwrap(); - /// f.write_all(INITIAL).unwrap(); - /// { - /// let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(), - /// 2, //offset - /// &mut rbuf, - /// 0, //priority - /// SigevNotify::SigevNone, - /// LioOpcode::LIO_NOP); - /// aiocb.read().unwrap(); - /// while (aiocb.error() == Err(Errno::EINPROGRESS)) { - /// thread::sleep(time::Duration::from_millis(10)); - /// } - /// assert_eq!(aiocb.aio_return().unwrap() as usize, LEN); - /// } - /// assert_eq!(rbuf, b"cdef"); - /// ``` - pub fn from_mut_slice(fd: RawFd, offs: off_t, buf: &'a mut [u8], - prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> Pin<Box<AioCb<'a>>> { - let mut a = AioCb::common_init(fd, prio, sigev_notify); - a.0.aio_offset = offs; - a.0.aio_nbytes = buf.len() as size_t; - a.0.aio_buf = buf.as_ptr() as *mut c_void; - a.0.aio_lio_opcode = opcode as libc::c_int; - - Box::pin(AioCb { - aiocb: a, - mutable: true, - in_progress: false, - _buffer: PhantomData, - _pin: std::marker::PhantomPinned - }) + fn error(self: Pin<&mut Self>) -> Result<()> { + let r = unsafe { libc::aio_error(&self.aiocb().0) }; + match r { + 0 => Ok(()), + num if num > 0 => Err(Errno::from_i32(num)), + -1 => Err(Errno::last()), + num => panic!("unknown aio_error return value {:?}", num), + } } - /// Constructs a new `AioCb` from a mutable raw pointer - /// - /// Unlike `from_mut_slice`, this method returns a structure suitable for - /// placement on the heap. It may be used for both reads and writes. Due - /// to its unsafety, this method is not recommended. It is most useful when - /// heap allocation is required. - /// - /// # Parameters - /// - /// * `fd`: File descriptor. Required for all aio functions. - /// * `offs`: File offset - /// * `buf`: Pointer to the memory buffer - /// * `len`: Length of the buffer pointed to by `buf` - /// * `prio`: If POSIX Prioritized IO is supported, then the - /// operation will be prioritized at the process's - /// priority level minus `prio` - /// * `sigev_notify`: Determines how you will be notified of event - /// completion. - /// * `opcode`: This field is only used for `lio_listio`. It - /// determines which operation to use for this individual - /// aiocb - /// - /// # Safety - /// - /// The caller must ensure that the storage pointed to by `buf` outlives the - /// `AioCb`. The lifetime checker can't help here. - pub unsafe fn from_mut_ptr(fd: RawFd, offs: off_t, - buf: *mut c_void, len: usize, - prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> Pin<Box<AioCb<'a>>> { - let mut a = AioCb::common_init(fd, prio, sigev_notify); - a.0.aio_offset = offs; - a.0.aio_nbytes = len; - a.0.aio_buf = buf; - a.0.aio_lio_opcode = opcode as libc::c_int; - - Box::pin(AioCb { - aiocb: a, - mutable: true, - in_progress: false, - _buffer: PhantomData, - _pin: std::marker::PhantomPinned, - }) + fn in_progress(&self) -> bool { + self.in_progress } - /// Constructs a new `AioCb` from a raw pointer. - /// - /// Unlike `from_slice`, this method returns a structure suitable for - /// placement on the heap. Due to its unsafety, this method is not - /// recommended. It is most useful when heap allocation is required. - /// - /// # Parameters - /// - /// * `fd`: File descriptor. Required for all aio functions. - /// * `offs`: File offset - /// * `buf`: Pointer to the memory buffer - /// * `len`: Length of the buffer pointed to by `buf` - /// * `prio`: If POSIX Prioritized IO is supported, then the - /// operation will be prioritized at the process's - /// priority level minus `prio` - /// * `sigev_notify`: Determines how you will be notified of event - /// completion. - /// * `opcode`: This field is only used for `lio_listio`. It - /// determines which operation to use for this individual - /// aiocb - /// - /// # Safety - /// - /// The caller must ensure that the storage pointed to by `buf` outlives the - /// `AioCb`. The lifetime checker can't help here. - pub unsafe fn from_ptr(fd: RawFd, offs: off_t, - buf: *const c_void, len: usize, - prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> Pin<Box<AioCb<'a>>> { - let mut a = AioCb::common_init(fd, prio, sigev_notify); - a.0.aio_offset = offs; - a.0.aio_nbytes = len; - // casting a const ptr to a mutable ptr here is ok, because we set the - // AioCb's mutable field to false - a.0.aio_buf = buf as *mut c_void; - a.0.aio_lio_opcode = opcode as libc::c_int; - - Box::pin(AioCb { - aiocb: a, - mutable: false, - in_progress: false, - _buffer: PhantomData, - _pin: std::marker::PhantomPinned - }) + fn set_in_progress(mut self: Pin<&mut Self>) { + self.as_mut().in_progress = true; } - // Private helper - fn from_slice_unpinned(fd: RawFd, offs: off_t, buf: &'a [u8], - prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> AioCb - { - let mut a = AioCb::common_init(fd, prio, sigev_notify); - a.0.aio_offset = offs; - a.0.aio_nbytes = buf.len() as size_t; - // casting an immutable buffer to a mutable pointer looks unsafe, - // but technically its only unsafe to dereference it, not to create - // it. - a.0.aio_buf = buf.as_ptr() as *mut c_void; - assert!(opcode != LioOpcode::LIO_READ, "Can't read into an immutable buffer"); - a.0.aio_lio_opcode = opcode as libc::c_int; - - AioCb { - aiocb: a, - mutable: false, - in_progress: false, - _buffer: PhantomData, - _pin: std::marker::PhantomPinned - } + /// Update the notification settings for an existing AIO operation that has + /// not yet been submitted. + // Takes a normal reference rather than a pinned one because this method is + // normally called before the object needs to be pinned, that is, before + // it's been submitted to the kernel. + fn set_sigev_notify(&mut self, sigev_notify: SigevNotify) { + assert!( + !self.in_progress, + "Can't change notification settings for an in-progress operation" + ); + self.aiocb.0.aio_sigevent = SigEvent::new(sigev_notify).sigevent(); } +} - /// Like [`AioCb::from_mut_slice`], but works on constant slices rather than - /// mutable slices. - /// - /// An `AioCb` created this way cannot be used with `read`, and its - /// `LioOpcode` cannot be set to `LIO_READ`. This method is useful when - /// writing a const buffer with `AioCb::write`, since `from_mut_slice` can't - /// work with const buffers. - /// - /// # Examples - /// - /// Construct an `AioCb` from a slice and use it for writing. - /// - /// ``` - /// # use nix::errno::Errno; - /// # use nix::Error; - /// # use nix::sys::aio::*; - /// # use nix::sys::signal::SigevNotify; - /// # use std::{thread, time}; - /// # use std::os::unix::io::AsRawFd; - /// # use tempfile::tempfile; - /// const WBUF: &[u8] = b"abcdef123456"; - /// let mut f = tempfile().unwrap(); - /// let mut aiocb = AioCb::from_slice( f.as_raw_fd(), - /// 2, //offset - /// WBUF, - /// 0, //priority - /// SigevNotify::SigevNone, - /// LioOpcode::LIO_NOP); - /// aiocb.write().unwrap(); - /// while (aiocb.error() == Err(Errno::EINPROGRESS)) { - /// thread::sleep(time::Duration::from_millis(10)); - /// } - /// assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len()); - /// ``` - // Note: another solution to the problem of writing const buffers would be - // to genericize AioCb for both &mut [u8] and &[u8] buffers. AioCb::read - // could take the former and AioCb::write could take the latter. However, - // then lio_listio wouldn't work, because that function needs a slice of - // AioCb, and they must all be of the same type. - pub fn from_slice(fd: RawFd, offs: off_t, buf: &'a [u8], - prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> Pin<Box<AioCb>> - { - Box::pin(AioCb::from_slice_unpinned(fd, offs, buf, prio, sigev_notify, - opcode)) +impl Debug for AioCb { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("AioCb") + .field("aiocb", &self.aiocb.0) + .field("in_progress", &self.in_progress) + .finish() } +} - fn common_init(fd: RawFd, prio: libc::c_int, - sigev_notify: SigevNotify) -> LibcAiocb { - // Use mem::zeroed instead of explicitly zeroing each field, because the - // number and name of reserved fields is OS-dependent. On some OSes, - // some reserved fields are used the kernel for state, and must be - // explicitly zeroed when allocated. - let mut a = unsafe { mem::zeroed::<libc::aiocb>()}; - a.aio_fildes = fd; - a.aio_reqprio = prio; - a.aio_sigevent = SigEvent::new(sigev_notify).sigevent(); - LibcAiocb(a) +impl Drop for AioCb { + /// If the `AioCb` has no remaining state in the kernel, just drop it. + /// Otherwise, dropping constitutes a resource leak, which is an error + fn drop(&mut self) { + assert!( + thread::panicking() || !self.in_progress, + "Dropped an in-progress AioCb" + ); } +} - /// Update the notification settings for an existing `aiocb` - pub fn set_sigev_notify(self: &mut Pin<Box<Self>>, - sigev_notify: SigevNotify) - { - // Safe because we don't move any of the data - let selfp = unsafe { - self.as_mut().get_unchecked_mut() - }; - selfp.aiocb.0.aio_sigevent = SigEvent::new(sigev_notify).sigevent(); - } +/// Methods common to all AIO operations +pub trait Aio { + /// The return type of [`Aio::aio_return`]. + type Output; + + /// Retrieve return status of an asynchronous operation. + /// + /// Should only be called once for each operation, after [`Aio::error`] + /// indicates that it has completed. The result is the same as for the + /// synchronous `read(2)`, `write(2)`, of `fsync(2)` functions. + /// + /// # References + /// + /// [aio_return](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_return.html) + fn aio_return(self: Pin<&mut Self>) -> Result<Self::Output>; /// Cancels an outstanding AIO request. /// @@ -477,51 +251,26 @@ impl<'a> AioCb<'a> { /// # use tempfile::tempfile; /// let wbuf = b"CDEF"; /// let mut f = tempfile().unwrap(); - /// let mut aiocb = AioCb::from_slice( f.as_raw_fd(), + /// let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(), /// 2, //offset /// &wbuf[..], /// 0, //priority - /// SigevNotify::SigevNone, - /// LioOpcode::LIO_NOP); - /// aiocb.write().unwrap(); - /// let cs = aiocb.cancel().unwrap(); + /// SigevNotify::SigevNone)); + /// aiocb.as_mut().submit().unwrap(); + /// let cs = aiocb.as_mut().cancel().unwrap(); /// if cs == AioCancelStat::AioNotCanceled { - /// while (aiocb.error() == Err(Errno::EINPROGRESS)) { + /// while (aiocb.as_mut().error() == Err(Errno::EINPROGRESS)) { /// thread::sleep(time::Duration::from_millis(10)); /// } /// } /// // Must call `aio_return`, but ignore the result - /// let _ = aiocb.aio_return(); + /// let _ = aiocb.as_mut().aio_return(); /// ``` /// /// # References /// /// [aio_cancel](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_cancel.html) - pub fn cancel(self: &mut Pin<Box<Self>>) -> Result<AioCancelStat> { - let r = unsafe { - let selfp = self.as_mut().get_unchecked_mut(); - libc::aio_cancel(selfp.aiocb.0.aio_fildes, &mut selfp.aiocb.0) - }; - match r { - libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled), - libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled), - libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone), - -1 => Err(Errno::last()), - _ => panic!("unknown aio_cancel return value") - } - } - - fn error_unpinned(&mut self) -> Result<()> { - let r = unsafe { - libc::aio_error(&mut self.aiocb.0 as *mut libc::aiocb) - }; - match r { - 0 => Ok(()), - num if num > 0 => Err(Errno::from_i32(num)), - -1 => Err(Errno::last()), - num => panic!("unknown aio_error return value {:?}", num) - } - } + fn cancel(self: Pin<&mut Self>) -> Result<AioCancelStat>; /// Retrieve error status of an asynchronous operation. /// @@ -543,60 +292,265 @@ impl<'a> AioCb<'a> { /// # use tempfile::tempfile; /// const WBUF: &[u8] = b"abcdef123456"; /// let mut f = tempfile().unwrap(); - /// let mut aiocb = AioCb::from_slice( f.as_raw_fd(), + /// let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(), /// 2, //offset /// WBUF, /// 0, //priority - /// SigevNotify::SigevNone, - /// LioOpcode::LIO_NOP); - /// aiocb.write().unwrap(); - /// while (aiocb.error() == Err(Errno::EINPROGRESS)) { + /// SigevNotify::SigevNone)); + /// aiocb.as_mut().submit().unwrap(); + /// while (aiocb.as_mut().error() == Err(Errno::EINPROGRESS)) { /// thread::sleep(time::Duration::from_millis(10)); /// } - /// assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len()); + /// assert_eq!(aiocb.as_mut().aio_return().unwrap(), WBUF.len()); /// ``` /// /// # References /// /// [aio_error](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_error.html) - pub fn error(self: &mut Pin<Box<Self>>) -> Result<()> { - // Safe because error_unpinned doesn't move the data - let selfp = unsafe { - self.as_mut().get_unchecked_mut() - }; - selfp.error_unpinned() - } + fn error(self: Pin<&mut Self>) -> Result<()>; + + /// Returns the underlying file descriptor associated with the operation. + fn fd(&self) -> RawFd; - /// An asynchronous version of `fsync(2)`. + /// Does this operation currently have any in-kernel state? /// - /// # References + /// Dropping an operation that does have in-kernel state constitutes a + /// resource leak. /// - /// [aio_fsync](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_fsync.html) - pub fn fsync(self: &mut Pin<Box<Self>>, mode: AioFsyncMode) -> Result<()> { - // Safe because we don't move the libc::aiocb - unsafe { - let selfp = self.as_mut().get_unchecked_mut(); - Errno::result({ - let p: *mut libc::aiocb = &mut selfp.aiocb.0; - libc::aio_fsync(mode as libc::c_int, p) - }).map(|_| { - selfp.in_progress = true; + /// # Examples + /// + /// ``` + /// # use nix::errno::Errno; + /// # use nix::Error; + /// # use nix::sys::aio::*; + /// # use nix::sys::signal::SigevNotify::SigevNone; + /// # use std::{thread, time}; + /// # use std::os::unix::io::AsRawFd; + /// # use tempfile::tempfile; + /// let f = tempfile().unwrap(); + /// let mut aiof = Box::pin(AioFsync::new(f.as_raw_fd(), AioFsyncMode::O_SYNC, + /// 0, SigevNone)); + /// assert!(!aiof.as_mut().in_progress()); + /// aiof.as_mut().submit().expect("aio_fsync failed early"); + /// assert!(aiof.as_mut().in_progress()); + /// while (aiof.as_mut().error() == Err(Errno::EINPROGRESS)) { + /// thread::sleep(time::Duration::from_millis(10)); + /// } + /// aiof.as_mut().aio_return().expect("aio_fsync failed late"); + /// assert!(!aiof.as_mut().in_progress()); + /// ``` + fn in_progress(&self) -> bool; + + /// Returns the priority of the `AioCb` + fn priority(&self) -> i32; + + /// Update the notification settings for an existing AIO operation that has + /// not yet been submitted. + fn set_sigev_notify(&mut self, sev: SigevNotify); + + /// Returns the `SigEvent` that will be used for notification. + fn sigevent(&self) -> SigEvent; + + /// Actually start the I/O operation. + /// + /// After calling this method and until [`Aio::aio_return`] returns `Ok`, + /// the structure may not be moved in memory. + fn submit(self: Pin<&mut Self>) -> Result<()>; +} + +macro_rules! aio_methods { + () => { + fn cancel(self: Pin<&mut Self>) -> Result<AioCancelStat> { + self.aiocb().cancel() + } + + fn error(self: Pin<&mut Self>) -> Result<()> { + self.aiocb().error() + } + + fn fd(&self) -> RawFd { + self.aiocb.aiocb.0.aio_fildes + } + + fn in_progress(&self) -> bool { + self.aiocb.in_progress() + } + + fn priority(&self) -> i32 { + self.aiocb.aiocb.0.aio_reqprio + } + + fn set_sigev_notify(&mut self, sev: SigevNotify) { + self.aiocb.set_sigev_notify(sev) + } + + fn sigevent(&self) -> SigEvent { + SigEvent::from(&self.aiocb.aiocb.0.aio_sigevent) + } + }; + ($func:ident) => { + aio_methods!(); + + fn aio_return(self: Pin<&mut Self>) -> Result<<Self as Aio>::Output> { + self.aiocb().aio_return() + } + + fn submit(mut self: Pin<&mut Self>) -> Result<()> { + let p: *mut libc::aiocb = &mut self.as_mut().aiocb().aiocb.0; + Errno::result({ unsafe { libc::$func(p) } }).map(|_| { + self.aiocb().set_in_progress(); }) } + }; +} + +/// An asynchronous version of `fsync(2)`. +/// +/// # References +/// +/// [aio_fsync](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_fsync.html) +/// # Examples +/// +/// ``` +/// # use nix::errno::Errno; +/// # use nix::Error; +/// # use nix::sys::aio::*; +/// # use nix::sys::signal::SigevNotify::SigevNone; +/// # use std::{thread, time}; +/// # use std::os::unix::io::AsRawFd; +/// # use tempfile::tempfile; +/// let f = tempfile().unwrap(); +/// let mut aiof = Box::pin(AioFsync::new(f.as_raw_fd(), AioFsyncMode::O_SYNC, +/// 0, SigevNone)); +/// aiof.as_mut().submit().expect("aio_fsync failed early"); +/// while (aiof.as_mut().error() == Err(Errno::EINPROGRESS)) { +/// thread::sleep(time::Duration::from_millis(10)); +/// } +/// aiof.as_mut().aio_return().expect("aio_fsync failed late"); +/// ``` +#[derive(Debug)] +#[repr(transparent)] +pub struct AioFsync { + aiocb: AioCb, + _pin: PhantomPinned, +} + +impl AioFsync { + unsafe_pinned!(aiocb: AioCb); + + /// Returns the operation's fsync mode: data and metadata or data only? + pub fn mode(&self) -> AioFsyncMode { + AioFsyncMode::try_from(self.aiocb.aiocb.0.aio_lio_opcode).unwrap() } - /// Returns the `aiocb`'s `LioOpcode` field + /// Create a new `AioFsync`. /// - /// If the value cannot be represented as an `LioOpcode`, returns `None` - /// instead. - pub fn lio_opcode(&self) -> Option<LioOpcode> { - match self.aiocb.0.aio_lio_opcode { - libc::LIO_READ => Some(LioOpcode::LIO_READ), - libc::LIO_WRITE => Some(LioOpcode::LIO_WRITE), - libc::LIO_NOP => Some(LioOpcode::LIO_NOP), - _ => None + /// # Arguments + /// + /// * `fd`: File descriptor to sync. + /// * `mode`: Whether to sync file metadata too, or just data. + /// * `prio`: If POSIX Prioritized IO is supported, then the + /// operation will be prioritized at the process's + /// priority level minus `prio`. + /// * `sigev_notify`: Determines how you will be notified of event + /// completion. + pub fn new( + fd: RawFd, + mode: AioFsyncMode, + prio: i32, + sigev_notify: SigevNotify, + ) -> Self { + let mut aiocb = AioCb::common_init(fd, prio, sigev_notify); + // To save some memory, store mode in an unused field of the AioCb. + // True it isn't very much memory, but downstream creates will likely + // create an enum containing this and other AioCb variants and pack + // those enums into data structures like Vec, so it adds up. + aiocb.aiocb.0.aio_lio_opcode = mode as libc::c_int; + AioFsync { + aiocb, + _pin: PhantomPinned, } } +} + +impl Aio for AioFsync { + type Output = (); + + aio_methods!(); + + fn aio_return(self: Pin<&mut Self>) -> Result<()> { + self.aiocb().aio_return().map(drop) + } + + fn submit(mut self: Pin<&mut Self>) -> Result<()> { + let aiocb = &mut self.as_mut().aiocb().aiocb.0; + let mode = mem::replace(&mut aiocb.aio_lio_opcode, 0); + let p: *mut libc::aiocb = aiocb; + Errno::result(unsafe { libc::aio_fsync(mode, p) }).map(|_| { + self.aiocb().set_in_progress(); + }) + } +} + +// AioFsync does not need AsMut, since it can't be used with lio_listio + +impl AsRef<libc::aiocb> for AioFsync { + fn as_ref(&self) -> &libc::aiocb { + &self.aiocb.aiocb.0 + } +} + +/// Asynchronously reads from a file descriptor into a buffer +/// +/// # References +/// +/// [aio_read](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_read.html) +/// +/// # Examples +/// +/// +/// ``` +/// # use nix::errno::Errno; +/// # use nix::Error; +/// # use nix::sys::aio::*; +/// # use nix::sys::signal::SigevNotify; +/// # use std::{thread, time}; +/// # use std::io::Write; +/// # use std::os::unix::io::AsRawFd; +/// # use tempfile::tempfile; +/// const INITIAL: &[u8] = b"abcdef123456"; +/// const LEN: usize = 4; +/// let mut rbuf = vec![0; LEN]; +/// let mut f = tempfile().unwrap(); +/// f.write_all(INITIAL).unwrap(); +/// { +/// let mut aior = Box::pin( +/// AioRead::new( +/// f.as_raw_fd(), +/// 2, //offset +/// &mut rbuf, +/// 0, //priority +/// SigevNotify::SigevNone +/// ) +/// ); +/// aior.as_mut().submit().unwrap(); +/// while (aior.as_mut().error() == Err(Errno::EINPROGRESS)) { +/// thread::sleep(time::Duration::from_millis(10)); +/// } +/// assert_eq!(aior.as_mut().aio_return().unwrap(), LEN); +/// } +/// assert_eq!(rbuf, b"cdef"); +/// ``` +#[derive(Debug)] +#[repr(transparent)] +pub struct AioRead<'a> { + aiocb: AioCb, + _data: PhantomData<&'a [u8]>, + _pin: PhantomPinned, +} + +impl<'a> AioRead<'a> { + unsafe_pinned!(aiocb: AioCb); /// Returns the requested length of the aio operation in bytes /// @@ -604,85 +558,418 @@ impl<'a> AioCb<'a> { /// number of bytes actually read or written by a completed operation, use /// `aio_return` instead. pub fn nbytes(&self) -> usize { - self.aiocb.0.aio_nbytes + self.aiocb.aiocb.0.aio_nbytes } - /// Returns the file offset stored in the `AioCb` + /// Create a new `AioRead`, placing the data in a mutable slice. + /// + /// # Arguments + /// + /// * `fd`: File descriptor to read from + /// * `offs`: File offset + /// * `buf`: A memory buffer. It must outlive the `AioRead`. + /// * `prio`: If POSIX Prioritized IO is supported, then the + /// operation will be prioritized at the process's + /// priority level minus `prio` + /// * `sigev_notify`: Determines how you will be notified of event + /// completion. + pub fn new( + fd: RawFd, + offs: off_t, + buf: &'a mut [u8], + prio: i32, + sigev_notify: SigevNotify, + ) -> Self { + let mut aiocb = AioCb::common_init(fd, prio, sigev_notify); + aiocb.aiocb.0.aio_nbytes = buf.len(); + aiocb.aiocb.0.aio_buf = buf.as_mut_ptr() as *mut c_void; + aiocb.aiocb.0.aio_lio_opcode = libc::LIO_READ; + aiocb.aiocb.0.aio_offset = offs; + AioRead { + aiocb, + _data: PhantomData, + _pin: PhantomPinned, + } + } + + /// Returns the file offset of the operation. pub fn offset(&self) -> off_t { - self.aiocb.0.aio_offset + self.aiocb.aiocb.0.aio_offset + } +} + +impl<'a> Aio for AioRead<'a> { + type Output = usize; + + aio_methods!(aio_read); +} + +impl<'a> AsMut<libc::aiocb> for AioRead<'a> { + fn as_mut(&mut self) -> &mut libc::aiocb { + &mut self.aiocb.aiocb.0 } +} - /// Returns the priority of the `AioCb` - pub fn priority(&self) -> libc::c_int { - self.aiocb.0.aio_reqprio +impl<'a> AsRef<libc::aiocb> for AioRead<'a> { + fn as_ref(&self) -> &libc::aiocb { + &self.aiocb.aiocb.0 } +} - /// Asynchronously reads from a file descriptor into a buffer +/// Asynchronously reads from a file descriptor into a scatter/gather list of buffers. +/// +/// # References +/// +/// [aio_readv](https://www.freebsd.org/cgi/man.cgi?query=aio_readv) +/// +/// # Examples +/// +/// +#[cfg_attr(fbsd14, doc = " ```")] +#[cfg_attr(not(fbsd14), doc = " ```no_run")] +/// # use nix::errno::Errno; +/// # use nix::Error; +/// # use nix::sys::aio::*; +/// # use nix::sys::signal::SigevNotify; +/// # use std::{thread, time}; +/// # use std::io::{IoSliceMut, Write}; +/// # use std::os::unix::io::AsRawFd; +/// # use tempfile::tempfile; +/// const INITIAL: &[u8] = b"abcdef123456"; +/// let mut rbuf0 = vec![0; 4]; +/// let mut rbuf1 = vec![0; 2]; +/// let expected_len = rbuf0.len() + rbuf1.len(); +/// let mut rbufs = [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)]; +/// let mut f = tempfile().unwrap(); +/// f.write_all(INITIAL).unwrap(); +/// { +/// let mut aior = Box::pin( +/// AioReadv::new( +/// f.as_raw_fd(), +/// 2, //offset +/// &mut rbufs, +/// 0, //priority +/// SigevNotify::SigevNone +/// ) +/// ); +/// aior.as_mut().submit().unwrap(); +/// while (aior.as_mut().error() == Err(Errno::EINPROGRESS)) { +/// thread::sleep(time::Duration::from_millis(10)); +/// } +/// assert_eq!(aior.as_mut().aio_return().unwrap(), expected_len); +/// } +/// assert_eq!(rbuf0, b"cdef"); +/// assert_eq!(rbuf1, b"12"); +/// ``` +#[cfg(target_os = "freebsd")] +#[derive(Debug)] +#[repr(transparent)] +pub struct AioReadv<'a> { + aiocb: AioCb, + _data: PhantomData<&'a [&'a [u8]]>, + _pin: PhantomPinned, +} + +#[cfg(target_os = "freebsd")] +impl<'a> AioReadv<'a> { + unsafe_pinned!(aiocb: AioCb); + + /// Returns the number of buffers the operation will read into. + pub fn iovlen(&self) -> usize { + self.aiocb.aiocb.0.aio_nbytes + } + + /// Create a new `AioReadv`, placing the data in a list of mutable slices. /// - /// # References + /// # Arguments /// - /// [aio_read](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_read.html) - pub fn read(self: &mut Pin<Box<Self>>) -> Result<()> { - assert!(self.mutable, "Can't read into an immutable buffer"); - // Safe because we don't move anything - let selfp = unsafe { - self.as_mut().get_unchecked_mut() - }; - Errno::result({ - let p: *mut libc::aiocb = &mut selfp.aiocb.0; - unsafe { libc::aio_read(p) } - }).map(|_| { - selfp.in_progress = true; - }) + /// * `fd`: File descriptor to read from + /// * `offs`: File offset + /// * `bufs`: A scatter/gather list of memory buffers. They must + /// outlive the `AioReadv`. + /// * `prio`: If POSIX Prioritized IO is supported, then the + /// operation will be prioritized at the process's + /// priority level minus `prio` + /// * `sigev_notify`: Determines how you will be notified of event + /// completion. + pub fn new( + fd: RawFd, + offs: off_t, + bufs: &mut [IoSliceMut<'a>], + prio: i32, + sigev_notify: SigevNotify, + ) -> Self { + let mut aiocb = AioCb::common_init(fd, prio, sigev_notify); + // In vectored mode, aio_nbytes stores the length of the iovec array, + // not the byte count. + aiocb.aiocb.0.aio_nbytes = bufs.len(); + aiocb.aiocb.0.aio_buf = bufs.as_mut_ptr() as *mut c_void; + aiocb.aiocb.0.aio_lio_opcode = libc::LIO_READV; + aiocb.aiocb.0.aio_offset = offs; + AioReadv { + aiocb, + _data: PhantomData, + _pin: PhantomPinned, + } } - /// Returns the `SigEvent` stored in the `AioCb` - pub fn sigevent(&self) -> SigEvent { - SigEvent::from(&self.aiocb.0.aio_sigevent) + /// Returns the file offset of the operation. + pub fn offset(&self) -> off_t { + self.aiocb.aiocb.0.aio_offset } +} - fn aio_return_unpinned(&mut self) -> Result<isize> { - unsafe { - let p: *mut libc::aiocb = &mut self.aiocb.0; - self.in_progress = false; - Errno::result(libc::aio_return(p)) - } +#[cfg(target_os = "freebsd")] +impl<'a> Aio for AioReadv<'a> { + type Output = usize; + + aio_methods!(aio_readv); +} + +#[cfg(target_os = "freebsd")] +impl<'a> AsMut<libc::aiocb> for AioReadv<'a> { + fn as_mut(&mut self) -> &mut libc::aiocb { + &mut self.aiocb.aiocb.0 + } +} + +#[cfg(target_os = "freebsd")] +impl<'a> AsRef<libc::aiocb> for AioReadv<'a> { + fn as_ref(&self) -> &libc::aiocb { + &self.aiocb.aiocb.0 } +} - /// Retrieve return status of an asynchronous operation. +/// Asynchronously writes from a buffer to a file descriptor +/// +/// # References +/// +/// [aio_write](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_write.html) +/// +/// # Examples +/// +/// ``` +/// # use nix::errno::Errno; +/// # use nix::Error; +/// # use nix::sys::aio::*; +/// # use nix::sys::signal::SigevNotify; +/// # use std::{thread, time}; +/// # use std::os::unix::io::AsRawFd; +/// # use tempfile::tempfile; +/// const WBUF: &[u8] = b"abcdef123456"; +/// let mut f = tempfile().unwrap(); +/// let mut aiow = Box::pin( +/// AioWrite::new( +/// f.as_raw_fd(), +/// 2, //offset +/// WBUF, +/// 0, //priority +/// SigevNotify::SigevNone +/// ) +/// ); +/// aiow.as_mut().submit().unwrap(); +/// while (aiow.as_mut().error() == Err(Errno::EINPROGRESS)) { +/// thread::sleep(time::Duration::from_millis(10)); +/// } +/// assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len()); +/// ``` +#[derive(Debug)] +#[repr(transparent)] +pub struct AioWrite<'a> { + aiocb: AioCb, + _data: PhantomData<&'a [u8]>, + _pin: PhantomPinned, +} + +impl<'a> AioWrite<'a> { + unsafe_pinned!(aiocb: AioCb); + + /// Returns the requested length of the aio operation in bytes /// - /// Should only be called once for each `AioCb`, after `AioCb::error` - /// indicates that it has completed. The result is the same as for the - /// synchronous `read(2)`, `write(2)`, of `fsync(2)` functions. + /// This method returns the *requested* length of the operation. To get the + /// number of bytes actually read or written by a completed operation, use + /// `aio_return` instead. + pub fn nbytes(&self) -> usize { + self.aiocb.aiocb.0.aio_nbytes + } + + /// Construct a new `AioWrite`. /// - /// # References + /// # Arguments /// - /// [aio_return](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_return.html) - // Note: this should be just `return`, but that's a reserved word - pub fn aio_return(self: &mut Pin<Box<Self>>) -> Result<isize> { - // Safe because aio_return_unpinned does not move the data - let selfp = unsafe { - self.as_mut().get_unchecked_mut() - }; - selfp.aio_return_unpinned() + /// * `fd`: File descriptor to write to + /// * `offs`: File offset + /// * `buf`: A memory buffer. It must outlive the `AioWrite`. + /// * `prio`: If POSIX Prioritized IO is supported, then the + /// operation will be prioritized at the process's + /// priority level minus `prio` + /// * `sigev_notify`: Determines how you will be notified of event + /// completion. + pub fn new( + fd: RawFd, + offs: off_t, + buf: &'a [u8], + prio: i32, + sigev_notify: SigevNotify, + ) -> Self { + let mut aiocb = AioCb::common_init(fd, prio, sigev_notify); + aiocb.aiocb.0.aio_nbytes = buf.len(); + // casting an immutable buffer to a mutable pointer looks unsafe, + // but technically its only unsafe to dereference it, not to create + // it. Type Safety guarantees that we'll never pass aiocb to + // aio_read or aio_readv. + aiocb.aiocb.0.aio_buf = buf.as_ptr() as *mut c_void; + aiocb.aiocb.0.aio_lio_opcode = libc::LIO_WRITE; + aiocb.aiocb.0.aio_offset = offs; + AioWrite { + aiocb, + _data: PhantomData, + _pin: PhantomPinned, + } + } + + /// Returns the file offset of the operation. + pub fn offset(&self) -> off_t { + self.aiocb.aiocb.0.aio_offset + } +} + +impl<'a> Aio for AioWrite<'a> { + type Output = usize; + + aio_methods!(aio_write); +} + +impl<'a> AsMut<libc::aiocb> for AioWrite<'a> { + fn as_mut(&mut self) -> &mut libc::aiocb { + &mut self.aiocb.aiocb.0 + } +} + +impl<'a> AsRef<libc::aiocb> for AioWrite<'a> { + fn as_ref(&self) -> &libc::aiocb { + &self.aiocb.aiocb.0 + } +} + +/// Asynchronously writes from a scatter/gather list of buffers to a file descriptor. +/// +/// # References +/// +/// [aio_writev](https://www.freebsd.org/cgi/man.cgi?query=aio_writev) +/// +/// # Examples +/// +#[cfg_attr(fbsd14, doc = " ```")] +#[cfg_attr(not(fbsd14), doc = " ```no_run")] +/// # use nix::errno::Errno; +/// # use nix::Error; +/// # use nix::sys::aio::*; +/// # use nix::sys::signal::SigevNotify; +/// # use std::{thread, time}; +/// # use std::io::IoSlice; +/// # use std::os::unix::io::AsRawFd; +/// # use tempfile::tempfile; +/// const wbuf0: &[u8] = b"abcdef"; +/// const wbuf1: &[u8] = b"123456"; +/// let len = wbuf0.len() + wbuf1.len(); +/// let wbufs = [IoSlice::new(wbuf0), IoSlice::new(wbuf1)]; +/// let mut f = tempfile().unwrap(); +/// let mut aiow = Box::pin( +/// AioWritev::new( +/// f.as_raw_fd(), +/// 2, //offset +/// &wbufs, +/// 0, //priority +/// SigevNotify::SigevNone +/// ) +/// ); +/// aiow.as_mut().submit().unwrap(); +/// while (aiow.as_mut().error() == Err(Errno::EINPROGRESS)) { +/// thread::sleep(time::Duration::from_millis(10)); +/// } +/// assert_eq!(aiow.as_mut().aio_return().unwrap(), len); +/// ``` +#[cfg(target_os = "freebsd")] +#[derive(Debug)] +#[repr(transparent)] +pub struct AioWritev<'a> { + aiocb: AioCb, + _data: PhantomData<&'a [&'a [u8]]>, + _pin: PhantomPinned, +} + +#[cfg(target_os = "freebsd")] +impl<'a> AioWritev<'a> { + unsafe_pinned!(aiocb: AioCb); + + /// Returns the number of buffers the operation will read into. + pub fn iovlen(&self) -> usize { + self.aiocb.aiocb.0.aio_nbytes } - /// Asynchronously writes from a buffer to a file descriptor + /// Construct a new `AioWritev`. /// - /// # References + /// # Arguments /// - /// [aio_write](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_write.html) - pub fn write(self: &mut Pin<Box<Self>>) -> Result<()> { - // Safe because we don't move anything - let selfp = unsafe { - self.as_mut().get_unchecked_mut() - }; - Errno::result({ - let p: *mut libc::aiocb = &mut selfp.aiocb.0; - unsafe{ libc::aio_write(p) } - }).map(|_| { - selfp.in_progress = true; - }) + /// * `fd`: File descriptor to write to + /// * `offs`: File offset + /// * `bufs`: A scatter/gather list of memory buffers. They must + /// outlive the `AioWritev`. + /// * `prio`: If POSIX Prioritized IO is supported, then the + /// operation will be prioritized at the process's + /// priority level minus `prio` + /// * `sigev_notify`: Determines how you will be notified of event + /// completion. + pub fn new( + fd: RawFd, + offs: off_t, + bufs: &[IoSlice<'a>], + prio: i32, + sigev_notify: SigevNotify, + ) -> Self { + let mut aiocb = AioCb::common_init(fd, prio, sigev_notify); + // In vectored mode, aio_nbytes stores the length of the iovec array, + // not the byte count. + aiocb.aiocb.0.aio_nbytes = bufs.len(); + // casting an immutable buffer to a mutable pointer looks unsafe, + // but technically its only unsafe to dereference it, not to create + // it. Type Safety guarantees that we'll never pass aiocb to + // aio_read or aio_readv. + aiocb.aiocb.0.aio_buf = bufs.as_ptr() as *mut c_void; + aiocb.aiocb.0.aio_lio_opcode = libc::LIO_WRITEV; + aiocb.aiocb.0.aio_offset = offs; + AioWritev { + aiocb, + _data: PhantomData, + _pin: PhantomPinned, + } + } + + /// Returns the file offset of the operation. + pub fn offset(&self) -> off_t { + self.aiocb.aiocb.0.aio_offset + } +} + +#[cfg(target_os = "freebsd")] +impl<'a> Aio for AioWritev<'a> { + type Output = usize; + + aio_methods!(aio_writev); +} + +#[cfg(target_os = "freebsd")] +impl<'a> AsMut<libc::aiocb> for AioWritev<'a> { + fn as_mut(&mut self) -> &mut libc::aiocb { + &mut self.aiocb.aiocb.0 + } +} + +#[cfg(target_os = "freebsd")] +impl<'a> AsRef<libc::aiocb> for AioWritev<'a> { + fn as_ref(&self) -> &libc::aiocb { + &self.aiocb.aiocb.0 } } @@ -704,38 +991,37 @@ impl<'a> AioCb<'a> { /// # use tempfile::tempfile; /// let wbuf = b"CDEF"; /// let mut f = tempfile().unwrap(); -/// let mut aiocb = AioCb::from_slice( f.as_raw_fd(), +/// let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(), /// 2, //offset /// &wbuf[..], /// 0, //priority -/// SigevNotify::SigevNone, -/// LioOpcode::LIO_NOP); -/// aiocb.write().unwrap(); +/// SigevNotify::SigevNone)); +/// aiocb.as_mut().submit().unwrap(); /// let cs = aio_cancel_all(f.as_raw_fd()).unwrap(); /// if cs == AioCancelStat::AioNotCanceled { -/// while (aiocb.error() == Err(Errno::EINPROGRESS)) { +/// while (aiocb.as_mut().error() == Err(Errno::EINPROGRESS)) { /// thread::sleep(time::Duration::from_millis(10)); /// } /// } /// // Must call `aio_return`, but ignore the result -/// let _ = aiocb.aio_return(); +/// let _ = aiocb.as_mut().aio_return(); /// ``` /// /// # References /// /// [`aio_cancel`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_cancel.html) pub fn aio_cancel_all(fd: RawFd) -> Result<AioCancelStat> { - match unsafe { libc::aio_cancel(fd, null_mut()) } { + match unsafe { libc::aio_cancel(fd, ptr::null_mut()) } { libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled), libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled), libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone), -1 => Err(Errno::last()), - _ => panic!("unknown aio_cancel return value") + _ => panic!("unknown aio_cancel return value"), } } -/// Suspends the calling process until at least one of the specified `AioCb`s -/// has completed, a signal is delivered, or the timeout has passed. +/// Suspends the calling process until at least one of the specified operations +/// have completed, a signal is delivered, or the timeout has passed. /// /// If `timeout` is `None`, `aio_suspend` will block indefinitely. /// @@ -750,380 +1036,209 @@ pub fn aio_cancel_all(fd: RawFd) -> Result<AioCancelStat> { /// # use tempfile::tempfile; /// const WBUF: &[u8] = b"abcdef123456"; /// let mut f = tempfile().unwrap(); -/// let mut aiocb = AioCb::from_slice( f.as_raw_fd(), +/// let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(), /// 2, //offset /// WBUF, /// 0, //priority -/// SigevNotify::SigevNone, -/// LioOpcode::LIO_NOP); -/// aiocb.write().unwrap(); -/// aio_suspend(&[aiocb.as_ref()], None).expect("aio_suspend failed"); -/// assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len()); +/// SigevNotify::SigevNone)); +/// aiocb.as_mut().submit().unwrap(); +/// aio_suspend(&[&*aiocb], None).expect("aio_suspend failed"); +/// assert_eq!(aiocb.as_mut().aio_return().unwrap() as usize, WBUF.len()); /// ``` /// # References /// /// [`aio_suspend`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_suspend.html) -pub fn aio_suspend(list: &[Pin<&AioCb>], timeout: Option<TimeSpec>) -> Result<()> { - let plist = list as *const [Pin<&AioCb>] as *const [*const libc::aiocb]; - let p = plist as *const *const libc::aiocb; +pub fn aio_suspend( + list: &[&dyn AsRef<libc::aiocb>], + timeout: Option<TimeSpec>, +) -> Result<()> { + let p = list as *const [&dyn AsRef<libc::aiocb>] + as *const [*const libc::aiocb] + as *const *const libc::aiocb; let timep = match timeout { - None => null::<libc::timespec>(), - Some(x) => x.as_ref() as *const libc::timespec + None => ptr::null::<libc::timespec>(), + Some(x) => x.as_ref() as *const libc::timespec, }; - Errno::result(unsafe { - libc::aio_suspend(p, list.len() as i32, timep) - }).map(drop) -} - -impl<'a> Debug for AioCb<'a> { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("AioCb") - .field("aiocb", &self.aiocb.0) - .field("mutable", &self.mutable) - .field("in_progress", &self.in_progress) - .finish() - } + Errno::result(unsafe { libc::aio_suspend(p, list.len() as i32, timep) }) + .map(drop) } -impl<'a> Drop for AioCb<'a> { - /// If the `AioCb` has no remaining state in the kernel, just drop it. - /// Otherwise, dropping constitutes a resource leak, which is an error - fn drop(&mut self) { - assert!(thread::panicking() || !self.in_progress, - "Dropped an in-progress AioCb"); - } -} - -/// LIO Control Block. +/// Submits multiple asynchronous I/O requests with a single system call. /// -/// The basic structure used to issue multiple AIO operations simultaneously. -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -#[cfg_attr(docsrs, doc(cfg(all())))] -pub struct LioCb<'a> { - /// A collection of [`AioCb`]s. All of these will be issued simultaneously - /// by the [`listio`] method. - /// - /// [`AioCb`]: struct.AioCb.html - /// [`listio`]: #method.listio - // Their locations in memory must be fixed once they are passed to the - // kernel. So this field must be non-public so the user can't swap. - aiocbs: Box<[AioCb<'a>]>, - - /// The actual list passed to `libc::lio_listio`. - /// - /// It must live for as long as any of the operations are still being - /// processesed, because the aio subsystem uses its address as a unique - /// identifier. - list: Vec<*mut libc::aiocb>, - - /// A partial set of results. This field will get populated by - /// `listio_resubmit` when an `LioCb` is resubmitted after an error - results: Vec<Option<Result<isize>>> -} - -/// LioCb can't automatically impl Send and Sync just because of the raw -/// pointers in list. But that's stupid. There's no reason that raw pointers -/// should automatically be non-Send -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -unsafe impl<'a> Send for LioCb<'a> {} -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -unsafe impl<'a> Sync for LioCb<'a> {} - -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -#[cfg_attr(docsrs, doc(cfg(all())))] -impl<'a> LioCb<'a> { - /// Are no [`AioCb`]s contained? - pub fn is_empty(&self) -> bool { - self.aiocbs.is_empty() - } - - /// Return the number of individual [`AioCb`]s contained. - pub fn len(&self) -> usize { - self.aiocbs.len() - } - - /// Submits multiple asynchronous I/O requests with a single system call. - /// - /// They are not guaranteed to complete atomically, and the order in which - /// the requests are carried out is not specified. Reads, writes, and - /// fsyncs may be freely mixed. - /// - /// This function is useful for reducing the context-switch overhead of - /// submitting many AIO operations. It can also be used with - /// `LioMode::LIO_WAIT` to block on the result of several independent - /// operations. Used that way, it is often useful in programs that - /// otherwise make little use of AIO. - /// - /// # Examples - /// - /// Use `listio` to submit an aio operation and wait for its completion. In - /// this case, there is no need to use [`aio_suspend`] to wait or - /// [`AioCb::error`] to poll. - /// - /// ``` - /// # use nix::sys::aio::*; - /// # use nix::sys::signal::SigevNotify; - /// # use std::os::unix::io::AsRawFd; - /// # use tempfile::tempfile; - /// const WBUF: &[u8] = b"abcdef123456"; - /// let mut f = tempfile().unwrap(); - /// let mut liocb = LioCbBuilder::with_capacity(1) - /// .emplace_slice( - /// f.as_raw_fd(), - /// 2, //offset - /// WBUF, - /// 0, //priority - /// SigevNotify::SigevNone, - /// LioOpcode::LIO_WRITE - /// ).finish(); - /// liocb.listio(LioMode::LIO_WAIT, - /// SigevNotify::SigevNone).unwrap(); - /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len()); - /// ``` - /// - /// # References - /// - /// [`lio_listio`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html) - /// - /// [`aio_suspend`]: fn.aio_suspend.html - /// [`AioCb::error`]: struct.AioCb.html#method.error - pub fn listio(&mut self, mode: LioMode, - sigev_notify: SigevNotify) -> Result<()> { - let sigev = SigEvent::new(sigev_notify); - let sigevp = &mut sigev.sigevent() as *mut libc::sigevent; - self.list.clear(); - for a in &mut self.aiocbs.iter_mut() { - a.in_progress = true; - self.list.push(a as *mut AioCb<'a> - as *mut libc::aiocb); - } - let p = self.list.as_ptr(); - Errno::result(unsafe { - libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp) - }).map(drop) - } - - /// Resubmits any incomplete operations with [`lio_listio`]. - /// - /// Sometimes, due to system resource limitations, an `lio_listio` call will - /// return `EIO`, or `EAGAIN`. Or, if a signal is received, it may return - /// `EINTR`. In any of these cases, only a subset of its constituent - /// operations will actually have been initiated. `listio_resubmit` will - /// resubmit any operations that are still uninitiated. - /// - /// After calling `listio_resubmit`, results should be collected by - /// [`LioCb::aio_return`]. - /// - /// # Examples - /// ```no_run - /// # use nix::Error; - /// # use nix::errno::Errno; - /// # use nix::sys::aio::*; - /// # use nix::sys::signal::SigevNotify; - /// # use std::os::unix::io::AsRawFd; - /// # use std::{thread, time}; - /// # use tempfile::tempfile; - /// const WBUF: &[u8] = b"abcdef123456"; - /// let mut f = tempfile().unwrap(); - /// let mut liocb = LioCbBuilder::with_capacity(1) - /// .emplace_slice( - /// f.as_raw_fd(), - /// 2, //offset - /// WBUF, - /// 0, //priority - /// SigevNotify::SigevNone, - /// LioOpcode::LIO_WRITE - /// ).finish(); - /// let mut err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone); - /// while err == Err(Errno::EIO) || - /// err == Err(Errno::EAGAIN) { - /// thread::sleep(time::Duration::from_millis(10)); - /// err = liocb.listio_resubmit(LioMode::LIO_WAIT, SigevNotify::SigevNone); - /// } - /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len()); - /// ``` - /// - /// # References - /// - /// [`lio_listio`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html) - /// - /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html - /// [`LioCb::aio_return`]: struct.LioCb.html#method.aio_return - // Note: the addresses of any EINPROGRESS or EOK aiocbs _must_ not be - // changed by this method, because the kernel relies on their addresses - // being stable. - // Note: aiocbs that are Ok(()) must be finalized by aio_return, or else the - // sigev_notify will immediately refire. - pub fn listio_resubmit(&mut self, mode:LioMode, - sigev_notify: SigevNotify) -> Result<()> { - let sigev = SigEvent::new(sigev_notify); - let sigevp = &mut sigev.sigevent() as *mut libc::sigevent; - self.list.clear(); - - while self.results.len() < self.aiocbs.len() { - self.results.push(None); - } - - for (i, a) in self.aiocbs.iter_mut().enumerate() { - if self.results[i].is_some() { - // Already collected final status for this operation - continue; - } - match a.error_unpinned() { - Ok(()) => { - // aiocb is complete; collect its status and don't resubmit - self.results[i] = Some(a.aio_return_unpinned()); - }, - Err(Errno::EAGAIN) => { - self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb); - }, - Err(Errno::EINPROGRESS) => { - // aiocb is was successfully queued; no need to do anything - }, - Err(Errno::EINVAL) => panic!( - "AioCb was never submitted, or already finalized"), - _ => unreachable!() - } - } - let p = self.list.as_ptr(); - Errno::result(unsafe { - libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp) - }).map(drop) - } - - /// Collect final status for an individual `AioCb` submitted as part of an - /// `LioCb`. - /// - /// This is just like [`AioCb::aio_return`], except it takes into account - /// operations that were restarted by [`LioCb::listio_resubmit`] - /// - /// [`AioCb::aio_return`]: struct.AioCb.html#method.aio_return - /// [`LioCb::listio_resubmit`]: #method.listio_resubmit - pub fn aio_return(&mut self, i: usize) -> Result<isize> { - if i >= self.results.len() || self.results[i].is_none() { - self.aiocbs[i].aio_return_unpinned() - } else { - self.results[i].unwrap() - } - } - - /// Retrieve error status of an individual `AioCb` submitted as part of an - /// `LioCb`. - /// - /// This is just like [`AioCb::error`], except it takes into account - /// operations that were restarted by [`LioCb::listio_resubmit`] - /// - /// [`AioCb::error`]: struct.AioCb.html#method.error - /// [`LioCb::listio_resubmit`]: #method.listio_resubmit - pub fn error(&mut self, i: usize) -> Result<()> { - if i >= self.results.len() || self.results[i].is_none() { - self.aiocbs[i].error_unpinned() - } else { - Ok(()) - } - } -} - -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -impl<'a> Debug for LioCb<'a> { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("LioCb") - .field("aiocbs", &self.aiocbs) - .finish() - } -} - -/// Used to construct `LioCb` -// This must be a separate class from LioCb due to pinning constraints. LioCb -// must use a boxed slice of AioCbs so they will have stable storage, but -// LioCbBuilder must use a Vec to make construction possible when the final size -// is unknown. -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -#[cfg_attr(docsrs, doc(cfg(all())))] -#[derive(Debug)] -pub struct LioCbBuilder<'a> { - /// A collection of [`AioCb`]s. - /// - /// [`AioCb`]: struct.AioCb.html - pub aiocbs: Vec<AioCb<'a>>, +/// They are not guaranteed to complete atomically, and the order in which the +/// requests are carried out is not specified. Reads, and writes may be freely +/// mixed. +/// +/// # Examples +/// +/// Use `lio_listio` to submit an aio operation and wait for its completion. In +/// this case, there is no need to use aio_suspend to wait or `error` to poll. +/// This mode is useful for otherwise-synchronous programs that want to execute +/// a handful of I/O operations in parallel. +/// ``` +/// # use std::os::unix::io::AsRawFd; +/// # use nix::sys::aio::*; +/// # use nix::sys::signal::SigevNotify; +/// # use tempfile::tempfile; +/// const WBUF: &[u8] = b"abcdef123456"; +/// let mut f = tempfile().unwrap(); +/// let mut aiow = Box::pin(AioWrite::new( +/// f.as_raw_fd(), +/// 2, // offset +/// WBUF, +/// 0, // priority +/// SigevNotify::SigevNone +/// )); +/// lio_listio(LioMode::LIO_WAIT, &mut[aiow.as_mut()], SigevNotify::SigevNone) +/// .unwrap(); +/// // At this point, we are guaranteed that aiow is complete. +/// assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len()); +/// ``` +/// +/// Use `lio_listio` to submit multiple asynchronous operations with a single +/// syscall, but receive notification individually. This is an efficient +/// technique for reducing overall context-switch overhead, especially when +/// combined with kqueue. +/// ``` +/// # use std::os::unix::io::AsRawFd; +/// # use std::thread; +/// # use std::time; +/// # use nix::errno::Errno; +/// # use nix::sys::aio::*; +/// # use nix::sys::signal::SigevNotify; +/// # use tempfile::tempfile; +/// const WBUF: &[u8] = b"abcdef123456"; +/// let mut f = tempfile().unwrap(); +/// let mut aiow = Box::pin(AioWrite::new( +/// f.as_raw_fd(), +/// 2, // offset +/// WBUF, +/// 0, // priority +/// SigevNotify::SigevNone +/// )); +/// lio_listio(LioMode::LIO_NOWAIT, &mut[aiow.as_mut()], SigevNotify::SigevNone) +/// .unwrap(); +/// // We must wait for the completion of each individual operation +/// while (aiow.as_mut().error() == Err(Errno::EINPROGRESS)) { +/// thread::sleep(time::Duration::from_millis(10)); +/// } +/// assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len()); +/// ``` +/// +/// Use `lio_listio` to submit multiple operations, and receive notification +/// only when all of them are complete. This can be useful when there is some +/// logical relationship between the operations. But beware! Errors or system +/// resource limitations may cause `lio_listio` to return `EIO`, `EAGAIN`, or +/// `EINTR`, in which case some but not all operations may have been submitted. +/// In that case, you must check the status of each individual operation, and +/// possibly resubmit some. +/// ``` +/// # use libc::c_int; +/// # use std::os::unix::io::AsRawFd; +/// # use std::sync::atomic::{AtomicBool, Ordering}; +/// # use std::thread; +/// # use std::time; +/// # use lazy_static::lazy_static; +/// # use nix::errno::Errno; +/// # use nix::sys::aio::*; +/// # use nix::sys::signal::*; +/// # use tempfile::tempfile; +/// lazy_static! { +/// pub static ref SIGNALED: AtomicBool = AtomicBool::new(false); +/// } +/// +/// extern fn sigfunc(_: c_int) { +/// SIGNALED.store(true, Ordering::Relaxed); +/// } +/// let sa = SigAction::new(SigHandler::Handler(sigfunc), +/// SaFlags::SA_RESETHAND, +/// SigSet::empty()); +/// SIGNALED.store(false, Ordering::Relaxed); +/// unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap(); +/// +/// const WBUF: &[u8] = b"abcdef123456"; +/// let mut f = tempfile().unwrap(); +/// let mut aiow = Box::pin(AioWrite::new( +/// f.as_raw_fd(), +/// 2, // offset +/// WBUF, +/// 0, // priority +/// SigevNotify::SigevNone +/// )); +/// let sev = SigevNotify::SigevSignal { signal: Signal::SIGUSR2, si_value: 0 }; +/// lio_listio(LioMode::LIO_NOWAIT, &mut[aiow.as_mut()], sev).unwrap(); +/// while !SIGNALED.load(Ordering::Relaxed) { +/// thread::sleep(time::Duration::from_millis(10)); +/// } +/// // At this point, since `lio_listio` returned success and delivered its +/// // notification, we know that all operations are complete. +/// assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len()); +/// ``` +pub fn lio_listio( + mode: LioMode, + list: &mut [Pin<&mut dyn AsMut<libc::aiocb>>], + sigev_notify: SigevNotify, +) -> Result<()> { + let p = list as *mut [Pin<&mut dyn AsMut<libc::aiocb>>] + as *mut [*mut libc::aiocb] + as *mut *mut libc::aiocb; + let sigev = SigEvent::new(sigev_notify); + let sigevp = &mut sigev.sigevent() as *mut libc::sigevent; + Errno::result(unsafe { + libc::lio_listio(mode as i32, p, list.len() as i32, sigevp) + }) + .map(drop) } -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -#[cfg_attr(docsrs, doc(cfg(all())))] -impl<'a> LioCbBuilder<'a> { - /// Initialize an empty `LioCb` - pub fn with_capacity(capacity: usize) -> LioCbBuilder<'a> { - LioCbBuilder { - aiocbs: Vec::with_capacity(capacity), - } - } +#[cfg(test)] +mod t { + use super::*; - /// Add a new operation on an immutable slice to the [`LioCb`] under - /// construction. - /// - /// Arguments are the same as for [`AioCb::from_slice`] - /// - /// [`LioCb`]: struct.LioCb.html - /// [`AioCb::from_slice`]: struct.AioCb.html#method.from_slice - #[must_use] - pub fn emplace_slice(mut self, fd: RawFd, offs: off_t, buf: &'a [u8], - prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> Self - { - self.aiocbs.push(AioCb::from_slice_unpinned(fd, offs, buf, prio, - sigev_notify, opcode)); - self - } + /// aio_suspend relies on casting Rust Aio* struct pointers to libc::aiocb + /// pointers. This test ensures that such casts are valid. + #[test] + fn casting() { + let sev = SigevNotify::SigevNone; + let aiof = AioFsync::new(666, AioFsyncMode::O_SYNC, 0, sev); + assert_eq!( + aiof.as_ref() as *const libc::aiocb, + &aiof as *const AioFsync as *const libc::aiocb + ); - /// Add a new operation on a mutable slice to the [`LioCb`] under - /// construction. - /// - /// Arguments are the same as for [`AioCb::from_mut_slice`] - /// - /// [`LioCb`]: struct.LioCb.html - /// [`AioCb::from_mut_slice`]: struct.AioCb.html#method.from_mut_slice - #[must_use] - pub fn emplace_mut_slice(mut self, fd: RawFd, offs: off_t, - buf: &'a mut [u8], prio: libc::c_int, - sigev_notify: SigevNotify, opcode: LioOpcode) - -> Self - { - self.aiocbs.push(AioCb::from_mut_slice_unpinned(fd, offs, buf, prio, - sigev_notify, opcode)); - self - } + let mut rbuf = []; + let aior = AioRead::new(666, 0, &mut rbuf, 0, sev); + assert_eq!( + aior.as_ref() as *const libc::aiocb, + &aior as *const AioRead as *const libc::aiocb + ); - /// Finalize this [`LioCb`]. - /// - /// Afterwards it will be possible to issue the operations with - /// [`LioCb::listio`]. Conversely, it will no longer be possible to add new - /// operations with [`LioCbBuilder::emplace_slice`] or - /// [`LioCbBuilder::emplace_mut_slice`]. - /// - /// [`LioCb::listio`]: struct.LioCb.html#method.listio - /// [`LioCb::from_mut_slice`]: struct.LioCb.html#method.from_mut_slice - /// [`LioCb::from_slice`]: struct.LioCb.html#method.from_slice - pub fn finish(self) -> LioCb<'a> { - let len = self.aiocbs.len(); - LioCb { - aiocbs: self.aiocbs.into(), - list: Vec::with_capacity(len), - results: Vec::with_capacity(len) - } + let wbuf = []; + let aiow = AioWrite::new(666, 0, &wbuf, 0, sev); + assert_eq!( + aiow.as_ref() as *const libc::aiocb, + &aiow as *const AioWrite as *const libc::aiocb + ); } -} -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -#[cfg(test)] -mod t { - use super::*; - - // It's important that `LioCb` be `UnPin`. The tokio-file crate relies on - // it. + #[cfg(target_os = "freebsd")] #[test] - fn liocb_is_unpin() { - use assert_impl::assert_impl; + fn casting_vectored() { + let sev = SigevNotify::SigevNone; + + let mut rbuf = []; + let mut rbufs = [IoSliceMut::new(&mut rbuf)]; + let aiorv = AioReadv::new(666, 0, &mut rbufs[..], 0, sev); + assert_eq!( + aiorv.as_ref() as *const libc::aiocb, + &aiorv as *const AioReadv as *const libc::aiocb + ); - assert_impl!(Unpin: LioCb); + let wbuf = []; + let wbufs = [IoSlice::new(&wbuf)]; + let aiowv = AioWritev::new(666, 0, &wbufs, 0, sev); + assert_eq!( + aiowv.as_ref() as *const libc::aiocb, + &aiowv as *const AioWritev as *const libc::aiocb + ); } } diff --git a/test/sys/test_aio.rs b/test/sys/test_aio.rs index 80cd053f..ca35b5f8 100644 --- a/test/sys/test_aio.rs +++ b/test/sys/test_aio.rs @@ -1,415 +1,525 @@ -use libc::{c_int, c_void}; -use nix::Result; -use nix::errno::*; -use nix::sys::aio::*; -use nix::sys::signal::{SaFlags, SigAction, sigaction, SigevNotify, SigHandler, Signal, SigSet}; -use nix::sys::time::{TimeSpec, TimeValLike}; -use std::io::{Write, Read, Seek, SeekFrom}; -use std::ops::Deref; -use std::os::unix::io::AsRawFd; -use std::pin::Pin; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::{thread, time}; +use std::{ + io::{Read, Seek, SeekFrom, Write}, + ops::Deref, + os::unix::io::AsRawFd, + pin::Pin, + sync::atomic::{AtomicBool, Ordering}, + thread, + time, +}; + +use libc::c_int; +use nix::{ + errno::*, + sys::{ + aio::*, + signal::{ + sigaction, + SaFlags, + SigAction, + SigHandler, + SigSet, + SigevNotify, + Signal, + }, + time::{TimeSpec, TimeValLike}, + }, +}; use tempfile::tempfile; -// Helper that polls an AioCb for completion or error -fn poll_aio(aiocb: &mut Pin<Box<AioCb>>) -> Result<()> { - loop { - let err = aiocb.error(); - if err != Err(Errno::EINPROGRESS) { return err; }; - thread::sleep(time::Duration::from_millis(10)); - } +lazy_static! { + pub static ref SIGNALED: AtomicBool = AtomicBool::new(false); } -// Helper that polls a component of an LioCb for completion or error -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -fn poll_lio(liocb: &mut LioCb, i: usize) -> Result<()> { - loop { - let err = liocb.error(i); - if err != Err(Errno::EINPROGRESS) { return err; }; - thread::sleep(time::Duration::from_millis(10)); - } +extern "C" fn sigfunc(_: c_int) { + SIGNALED.store(true, Ordering::Relaxed); } -#[test] -fn test_accessors() { - let mut rbuf = vec![0; 4]; - let aiocb = AioCb::from_mut_slice( 1001, - 2, //offset - &mut rbuf, - 42, //priority - SigevNotify::SigevSignal { - signal: Signal::SIGUSR2, - si_value: 99 - }, - LioOpcode::LIO_NOP); - assert_eq!(1001, aiocb.fd()); - assert_eq!(Some(LioOpcode::LIO_NOP), aiocb.lio_opcode()); - assert_eq!(4, aiocb.nbytes()); - assert_eq!(2, aiocb.offset()); - assert_eq!(42, aiocb.priority()); - let sev = aiocb.sigevent().sigevent(); - assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo); - assert_eq!(99, sev.sigev_value.sival_ptr as i64); +// Helper that polls an AioCb for completion or error +macro_rules! poll_aio { + ($aiocb: expr) => { + loop { + let err = $aiocb.as_mut().error(); + if err != Err(Errno::EINPROGRESS) { + break err; + }; + thread::sleep(time::Duration::from_millis(10)); + } + }; } -// Tests AioCb.cancel. We aren't trying to test the OS's implementation, only -// our bindings. So it's sufficient to check that AioCb.cancel returned any -// AioCancelStat value. -#[test] -#[cfg_attr(target_env = "musl", ignore)] -fn test_cancel() { - let wbuf: &[u8] = b"CDEF"; - - let f = tempfile().unwrap(); - let mut aiocb = AioCb::from_slice( f.as_raw_fd(), - 0, //offset - wbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP); - aiocb.write().unwrap(); - let err = aiocb.error(); - assert!(err == Ok(()) || err == Err(Errno::EINPROGRESS)); +mod aio_fsync { + use super::*; + + #[test] + fn test_accessors() { + let aiocb = AioFsync::new( + 1001, + AioFsyncMode::O_SYNC, + 42, + SigevNotify::SigevSignal { + signal: Signal::SIGUSR2, + si_value: 99, + }, + ); + assert_eq!(1001, aiocb.fd()); + assert_eq!(AioFsyncMode::O_SYNC, aiocb.mode()); + assert_eq!(42, aiocb.priority()); + let sev = aiocb.sigevent().sigevent(); + assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo); + assert_eq!(99, sev.sigev_value.sival_ptr as i64); + } - let cancelstat = aiocb.cancel(); - assert!(cancelstat.is_ok()); + /// `AioFsync::submit` should not modify the `AioCb` object if + /// `libc::aio_fsync` returns an error + // Skip on Linux, because Linux's AIO implementation can't detect errors + // synchronously + #[test] + #[cfg(any(target_os = "freebsd", target_os = "macos"))] + fn error() { + use std::mem; + + const INITIAL: &[u8] = b"abcdef123456"; + // Create an invalid AioFsyncMode + let mode = unsafe { mem::transmute(666) }; + let mut f = tempfile().unwrap(); + f.write_all(INITIAL).unwrap(); + let mut aiof = Box::pin(AioFsync::new( + f.as_raw_fd(), + mode, + 0, + SigevNotify::SigevNone, + )); + let err = aiof.as_mut().submit(); + assert!(err.is_err()); + } - // Wait for aiocb to complete, but don't care whether it succeeded - let _ = poll_aio(&mut aiocb); - let _ = aiocb.aio_return(); + #[test] + #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] + fn ok() { + const INITIAL: &[u8] = b"abcdef123456"; + let mut f = tempfile().unwrap(); + f.write_all(INITIAL).unwrap(); + let fd = f.as_raw_fd(); + let mut aiof = Box::pin(AioFsync::new( + fd, + AioFsyncMode::O_SYNC, + 0, + SigevNotify::SigevNone, + )); + let err = aiof.as_mut().submit(); + assert!(err.is_ok()); + poll_aio!(&mut aiof).unwrap(); + aiof.as_mut().aio_return().unwrap(); + } } -// Tests using aio_cancel_all for all outstanding IOs. -#[test] -#[cfg_attr(target_env = "musl", ignore)] -fn test_aio_cancel_all() { - let wbuf: &[u8] = b"CDEF"; - - let f = tempfile().unwrap(); - let mut aiocb = AioCb::from_slice(f.as_raw_fd(), - 0, //offset - wbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP); - aiocb.write().unwrap(); - let err = aiocb.error(); - assert!(err == Ok(()) || err == Err(Errno::EINPROGRESS)); +mod aio_read { + use super::*; + + #[test] + fn test_accessors() { + let mut rbuf = vec![0; 4]; + let aiocb = AioRead::new( + 1001, + 2, //offset + &mut rbuf, + 42, //priority + SigevNotify::SigevSignal { + signal: Signal::SIGUSR2, + si_value: 99, + }, + ); + assert_eq!(1001, aiocb.fd()); + assert_eq!(4, aiocb.nbytes()); + assert_eq!(2, aiocb.offset()); + assert_eq!(42, aiocb.priority()); + let sev = aiocb.sigevent().sigevent(); + assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo); + assert_eq!(99, sev.sigev_value.sival_ptr as i64); + } - let cancelstat = aio_cancel_all(f.as_raw_fd()); - assert!(cancelstat.is_ok()); + // Tests AioWrite.cancel. We aren't trying to test the OS's implementation, + // only our bindings. So it's sufficient to check that cancel + // returned any AioCancelStat value. + #[test] + #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] + fn cancel() { + const INITIAL: &[u8] = b"abcdef123456"; + let mut rbuf = vec![0; 4]; + let mut f = tempfile().unwrap(); + f.write_all(INITIAL).unwrap(); + let fd = f.as_raw_fd(); + let mut aior = + Box::pin(AioRead::new(fd, 2, &mut rbuf, 0, SigevNotify::SigevNone)); + aior.as_mut().submit().unwrap(); + + let cancelstat = aior.as_mut().cancel(); + assert!(cancelstat.is_ok()); + + // Wait for aiow to complete, but don't care whether it succeeded + let _ = poll_aio!(&mut aior); + let _ = aior.as_mut().aio_return(); + } - // Wait for aiocb to complete, but don't care whether it succeeded - let _ = poll_aio(&mut aiocb); - let _ = aiocb.aio_return(); -} + /// `AioRead::submit` should not modify the `AioCb` object if + /// `libc::aio_read` returns an error + // Skip on Linux, because Linux's AIO implementation can't detect errors + // synchronously + #[test] + #[cfg(any(target_os = "freebsd", target_os = "macos"))] + fn error() { + const INITIAL: &[u8] = b"abcdef123456"; + let mut rbuf = vec![0; 4]; + let mut f = tempfile().unwrap(); + f.write_all(INITIAL).unwrap(); + let mut aior = Box::pin(AioRead::new( + f.as_raw_fd(), + -1, //an invalid offset + &mut rbuf, + 0, //priority + SigevNotify::SigevNone, + )); + assert!(aior.as_mut().submit().is_err()); + } -#[test] -#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] -fn test_fsync() { - const INITIAL: &[u8] = b"abcdef123456"; - let mut f = tempfile().unwrap(); - f.write_all(INITIAL).unwrap(); - let mut aiocb = AioCb::from_fd( f.as_raw_fd(), - 0, //priority - SigevNotify::SigevNone); - let err = aiocb.fsync(AioFsyncMode::O_SYNC); - assert!(err.is_ok()); - poll_aio(&mut aiocb).unwrap(); - aiocb.aio_return().unwrap(); -} + // Test a simple aio operation with no completion notification. We must + // poll for completion + #[test] + #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] + fn ok() { + const INITIAL: &[u8] = b"abcdef123456"; + let mut rbuf = vec![0; 4]; + const EXPECT: &[u8] = b"cdef"; + let mut f = tempfile().unwrap(); + f.write_all(INITIAL).unwrap(); + { + let fd = f.as_raw_fd(); + let mut aior = Box::pin(AioRead::new( + fd, + 2, + &mut rbuf, + 0, + SigevNotify::SigevNone, + )); + aior.as_mut().submit().unwrap(); -/// `AioCb::fsync` should not modify the `AioCb` object if `libc::aio_fsync` returns -/// an error -// Skip on Linux, because Linux's AIO implementation can't detect errors -// synchronously -#[test] -#[cfg(any(target_os = "freebsd", target_os = "macos"))] -fn test_fsync_error() { - use std::mem; + let err = poll_aio!(&mut aior); + assert_eq!(err, Ok(())); + assert_eq!(aior.as_mut().aio_return().unwrap(), EXPECT.len()); + } + assert_eq!(EXPECT, rbuf.deref().deref()); + } - const INITIAL: &[u8] = b"abcdef123456"; - // Create an invalid AioFsyncMode - let mode = unsafe { mem::transmute(666) }; - let mut f = tempfile().unwrap(); - f.write_all(INITIAL).unwrap(); - let mut aiocb = AioCb::from_fd( f.as_raw_fd(), - 0, //priority - SigevNotify::SigevNone); - let err = aiocb.fsync(mode); - assert!(err.is_err()); + // Like ok, but allocates the structure on the stack. + #[test] + #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] + fn on_stack() { + const INITIAL: &[u8] = b"abcdef123456"; + let mut rbuf = vec![0; 4]; + const EXPECT: &[u8] = b"cdef"; + let mut f = tempfile().unwrap(); + f.write_all(INITIAL).unwrap(); + { + let fd = f.as_raw_fd(); + let mut aior = + AioRead::new(fd, 2, &mut rbuf, 0, SigevNotify::SigevNone); + let mut aior = unsafe { Pin::new_unchecked(&mut aior) }; + aior.as_mut().submit().unwrap(); + + let err = poll_aio!(&mut aior); + assert_eq!(err, Ok(())); + assert_eq!(aior.as_mut().aio_return().unwrap(), EXPECT.len()); + } + assert_eq!(EXPECT, rbuf.deref().deref()); + } } -#[test] -// On Cirrus on Linux, this test fails due to a glibc bug. -// https://github.com/nix-rust/nix/issues/1099 -#[cfg_attr(target_os = "linux", ignore)] -// On Cirrus, aio_suspend is failing with EINVAL -// https://github.com/nix-rust/nix/issues/1361 -#[cfg_attr(target_os = "macos", ignore)] -fn test_aio_suspend() { - const INITIAL: &[u8] = b"abcdef123456"; - const WBUF: &[u8] = b"CDEFG"; - let timeout = TimeSpec::seconds(10); - let mut rbuf = vec![0; 4]; - let rlen = rbuf.len(); - let mut f = tempfile().unwrap(); - f.write_all(INITIAL).unwrap(); +#[cfg(target_os = "freebsd")] +#[cfg(fbsd14)] +mod aio_readv { + use std::io::IoSliceMut; + + use super::*; + + #[test] + fn test_accessors() { + let mut rbuf0 = vec![0; 4]; + let mut rbuf1 = vec![0; 8]; + let mut rbufs = + [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)]; + let aiocb = AioReadv::new( + 1001, + 2, //offset + &mut rbufs, + 42, //priority + SigevNotify::SigevSignal { + signal: Signal::SIGUSR2, + si_value: 99, + }, + ); + assert_eq!(1001, aiocb.fd()); + assert_eq!(2, aiocb.iovlen()); + assert_eq!(2, aiocb.offset()); + assert_eq!(42, aiocb.priority()); + let sev = aiocb.sigevent().sigevent(); + assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo); + assert_eq!(99, sev.sigev_value.sival_ptr as i64); + } - let mut wcb = AioCb::from_slice( f.as_raw_fd(), - 2, //offset - WBUF, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_WRITE); - - let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(), - 8, //offset - &mut rbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_READ); - wcb.write().unwrap(); - rcb.read().unwrap(); - loop { + #[test] + #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] + fn ok() { + const INITIAL: &[u8] = b"abcdef123456"; + let mut rbuf0 = vec![0; 4]; + let mut rbuf1 = vec![0; 2]; + let mut rbufs = + [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)]; + const EXPECT0: &[u8] = b"cdef"; + const EXPECT1: &[u8] = b"12"; + let mut f = tempfile().unwrap(); + f.write_all(INITIAL).unwrap(); { - let cbbuf = [wcb.as_ref(), rcb.as_ref()]; - let r = aio_suspend(&cbbuf[..], Some(timeout)); - match r { - Err(Errno::EINTR) => continue, - Err(e) => panic!("aio_suspend returned {:?}", e), - Ok(_) => () - }; - } - if rcb.error() != Err(Errno::EINPROGRESS) && - wcb.error() != Err(Errno::EINPROGRESS) { - break + let fd = f.as_raw_fd(); + let mut aior = Box::pin(AioReadv::new( + fd, + 2, + &mut rbufs, + 0, + SigevNotify::SigevNone, + )); + aior.as_mut().submit().unwrap(); + + let err = poll_aio!(&mut aior); + assert_eq!(err, Ok(())); + assert_eq!( + aior.as_mut().aio_return().unwrap(), + EXPECT0.len() + EXPECT1.len() + ); } + assert_eq!(&EXPECT0, &rbuf0); + assert_eq!(&EXPECT1, &rbuf1); } - - assert_eq!(wcb.aio_return().unwrap() as usize, WBUF.len()); - assert_eq!(rcb.aio_return().unwrap() as usize, rlen); } -// Test a simple aio operation with no completion notification. We must poll -// for completion -#[test] -#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] -fn test_read() { - const INITIAL: &[u8] = b"abcdef123456"; - let mut rbuf = vec![0; 4]; - const EXPECT: &[u8] = b"cdef"; - let mut f = tempfile().unwrap(); - f.write_all(INITIAL).unwrap(); - { - let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(), - 2, //offset - &mut rbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP); - aiocb.read().unwrap(); - - let err = poll_aio(&mut aiocb); - assert_eq!(err, Ok(())); - assert_eq!(aiocb.aio_return().unwrap() as usize, EXPECT.len()); +mod aio_write { + use super::*; + + #[test] + fn test_accessors() { + let wbuf = vec![0; 4]; + let aiocb = AioWrite::new( + 1001, + 2, //offset + &wbuf, + 42, //priority + SigevNotify::SigevSignal { + signal: Signal::SIGUSR2, + si_value: 99, + }, + ); + assert_eq!(1001, aiocb.fd()); + assert_eq!(4, aiocb.nbytes()); + assert_eq!(2, aiocb.offset()); + assert_eq!(42, aiocb.priority()); + let sev = aiocb.sigevent().sigevent(); + assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo); + assert_eq!(99, sev.sigev_value.sival_ptr as i64); } - assert_eq!(EXPECT, rbuf.deref().deref()); -} + // Tests AioWrite.cancel. We aren't trying to test the OS's implementation, + // only our bindings. So it's sufficient to check that cancel + // returned any AioCancelStat value. + #[test] + #[cfg_attr(target_env = "musl", ignore)] + fn cancel() { + let wbuf: &[u8] = b"CDEF"; -/// `AioCb::read` should not modify the `AioCb` object if `libc::aio_read` -/// returns an error -// Skip on Linux, because Linux's AIO implementation can't detect errors -// synchronously -#[test] -#[cfg(any(target_os = "freebsd", target_os = "macos"))] -fn test_read_error() { - const INITIAL: &[u8] = b"abcdef123456"; - let mut rbuf = vec![0; 4]; - let mut f = tempfile().unwrap(); - f.write_all(INITIAL).unwrap(); - let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(), - -1, //an invalid offset - &mut rbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP); - assert!(aiocb.read().is_err()); -} + let f = tempfile().unwrap(); + let mut aiow = Box::pin(AioWrite::new( + f.as_raw_fd(), + 0, + wbuf, + 0, + SigevNotify::SigevNone, + )); + aiow.as_mut().submit().unwrap(); + let err = aiow.as_mut().error(); + assert!(err == Ok(()) || err == Err(Errno::EINPROGRESS)); -// Tests from_mut_slice -#[test] -#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] -fn test_read_into_mut_slice() { - const INITIAL: &[u8] = b"abcdef123456"; - let mut rbuf = vec![0; 4]; - const EXPECT: &[u8] = b"cdef"; - let mut f = tempfile().unwrap(); - f.write_all(INITIAL).unwrap(); - { - let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(), - 2, //offset - &mut rbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP); - aiocb.read().unwrap(); - - let err = poll_aio(&mut aiocb); - assert_eq!(err, Ok(())); - assert_eq!(aiocb.aio_return().unwrap() as usize, EXPECT.len()); + let cancelstat = aiow.as_mut().cancel(); + assert!(cancelstat.is_ok()); + + // Wait for aiow to complete, but don't care whether it succeeded + let _ = poll_aio!(&mut aiow); + let _ = aiow.as_mut().aio_return(); } - assert_eq!(rbuf, EXPECT); -} + // Test a simple aio operation with no completion notification. We must + // poll for completion. + #[test] + #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] + fn ok() { + const INITIAL: &[u8] = b"abcdef123456"; + let wbuf = "CDEF".to_string().into_bytes(); + let mut rbuf = Vec::new(); + const EXPECT: &[u8] = b"abCDEF123456"; + + let mut f = tempfile().unwrap(); + f.write_all(INITIAL).unwrap(); + let mut aiow = Box::pin(AioWrite::new( + f.as_raw_fd(), + 2, + &wbuf, + 0, + SigevNotify::SigevNone, + )); + aiow.as_mut().submit().unwrap(); -// Tests from_ptr -#[test] -#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] -fn test_read_into_pointer() { - const INITIAL: &[u8] = b"abcdef123456"; - let mut rbuf = vec![0; 4]; - const EXPECT: &[u8] = b"cdef"; - let mut f = tempfile().unwrap(); - f.write_all(INITIAL).unwrap(); - { - // Safety: ok because rbuf lives until after poll_aio - let mut aiocb = unsafe { - AioCb::from_mut_ptr( f.as_raw_fd(), - 2, //offset - rbuf.as_mut_ptr() as *mut c_void, - rbuf.len(), - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP) - }; - aiocb.read().unwrap(); - - let err = poll_aio(&mut aiocb); + let err = poll_aio!(&mut aiow); assert_eq!(err, Ok(())); - assert_eq!(aiocb.aio_return().unwrap() as usize, EXPECT.len()); - } + assert_eq!(aiow.as_mut().aio_return().unwrap(), wbuf.len()); - assert_eq!(rbuf, EXPECT); -} - -// Test reading into an immutable buffer. It should fail -// FIXME: This test fails to panic on Linux/musl -#[test] -#[should_panic(expected = "Can't read into an immutable buffer")] -#[cfg_attr(target_env = "musl", ignore)] -fn test_read_immutable_buffer() { - let rbuf: &[u8] = b"CDEF"; - let f = tempfile().unwrap(); - let mut aiocb = AioCb::from_slice( f.as_raw_fd(), - 2, //offset - rbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP); - aiocb.read().unwrap(); -} + f.seek(SeekFrom::Start(0)).unwrap(); + let len = f.read_to_end(&mut rbuf).unwrap(); + assert_eq!(len, EXPECT.len()); + assert_eq!(rbuf, EXPECT); + } + // Like ok, but allocates the structure on the stack. + #[test] + #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] + fn on_stack() { + const INITIAL: &[u8] = b"abcdef123456"; + let wbuf = "CDEF".to_string().into_bytes(); + let mut rbuf = Vec::new(); + const EXPECT: &[u8] = b"abCDEF123456"; + + let mut f = tempfile().unwrap(); + f.write_all(INITIAL).unwrap(); + let mut aiow = AioWrite::new( + f.as_raw_fd(), + 2, //offset + &wbuf, + 0, //priority + SigevNotify::SigevNone, + ); + let mut aiow = unsafe { Pin::new_unchecked(&mut aiow) }; + aiow.as_mut().submit().unwrap(); -// Test a simple aio operation with no completion notification. We must poll -// for completion. Unlike test_aio_read, this test uses AioCb::from_slice -#[test] -#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] -fn test_write() { - const INITIAL: &[u8] = b"abcdef123456"; - let wbuf = "CDEF".to_string().into_bytes(); - let mut rbuf = Vec::new(); - const EXPECT: &[u8] = b"abCDEF123456"; + let err = poll_aio!(&mut aiow); + assert_eq!(err, Ok(())); + assert_eq!(aiow.as_mut().aio_return().unwrap(), wbuf.len()); - let mut f = tempfile().unwrap(); - f.write_all(INITIAL).unwrap(); - let mut aiocb = AioCb::from_slice( f.as_raw_fd(), - 2, //offset - &wbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP); - aiocb.write().unwrap(); - - let err = poll_aio(&mut aiocb); - assert_eq!(err, Ok(())); - assert_eq!(aiocb.aio_return().unwrap() as usize, wbuf.len()); + f.seek(SeekFrom::Start(0)).unwrap(); + let len = f.read_to_end(&mut rbuf).unwrap(); + assert_eq!(len, EXPECT.len()); + assert_eq!(rbuf, EXPECT); + } - f.seek(SeekFrom::Start(0)).unwrap(); - let len = f.read_to_end(&mut rbuf).unwrap(); - assert_eq!(len, EXPECT.len()); - assert_eq!(rbuf, EXPECT); + /// `AioWrite::write` should not modify the `AioCb` object if + /// `libc::aio_write` returns an error. + // Skip on Linux, because Linux's AIO implementation can't detect errors + // synchronously + #[test] + #[cfg(any(target_os = "freebsd", target_os = "macos"))] + fn error() { + let wbuf = "CDEF".to_string().into_bytes(); + let mut aiow = Box::pin(AioWrite::new( + 666, // An invalid file descriptor + 0, //offset + &wbuf, + 0, //priority + SigevNotify::SigevNone, + )); + assert!(aiow.as_mut().submit().is_err()); + // Dropping the AioWrite at this point should not panic + } } -// Tests `AioCb::from_ptr` -#[test] -#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] -fn test_write_from_pointer() { - const INITIAL: &[u8] = b"abcdef123456"; - let wbuf = "CDEF".to_string().into_bytes(); - let mut rbuf = Vec::new(); - const EXPECT: &[u8] = b"abCDEF123456"; - - let mut f = tempfile().unwrap(); - f.write_all(INITIAL).unwrap(); - // Safety: ok because aiocb outlives poll_aio - let mut aiocb = unsafe { - AioCb::from_ptr( f.as_raw_fd(), - 2, //offset - wbuf.as_ptr() as *const c_void, - wbuf.len(), - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP) - }; - aiocb.write().unwrap(); - - let err = poll_aio(&mut aiocb); - assert_eq!(err, Ok(())); - assert_eq!(aiocb.aio_return().unwrap() as usize, wbuf.len()); - - f.seek(SeekFrom::Start(0)).unwrap(); - let len = f.read_to_end(&mut rbuf).unwrap(); - assert_eq!(len, EXPECT.len()); - assert_eq!(rbuf, EXPECT); -} +#[cfg(target_os = "freebsd")] +#[cfg(fbsd14)] +mod aio_writev { + use std::io::IoSlice; + + use super::*; + + #[test] + fn test_accessors() { + let wbuf0 = vec![0; 4]; + let wbuf1 = vec![0; 8]; + let wbufs = [IoSlice::new(&wbuf0), IoSlice::new(&wbuf1)]; + let aiocb = AioWritev::new( + 1001, + 2, //offset + &wbufs, + 42, //priority + SigevNotify::SigevSignal { + signal: Signal::SIGUSR2, + si_value: 99, + }, + ); + assert_eq!(1001, aiocb.fd()); + assert_eq!(2, aiocb.iovlen()); + assert_eq!(2, aiocb.offset()); + assert_eq!(42, aiocb.priority()); + let sev = aiocb.sigevent().sigevent(); + assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo); + assert_eq!(99, sev.sigev_value.sival_ptr as i64); + } -/// `AioCb::write` should not modify the `AioCb` object if `libc::aio_write` -/// returns an error -// Skip on Linux, because Linux's AIO implementation can't detect errors -// synchronously -#[test] -#[cfg(any(target_os = "freebsd", target_os = "macos"))] -fn test_write_error() { - let wbuf = "CDEF".to_string().into_bytes(); - let mut aiocb = AioCb::from_slice( 666, // An invalid file descriptor - 0, //offset - &wbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP); - assert!(aiocb.write().is_err()); -} + // Test a simple aio operation with no completion notification. We must + // poll for completion. + #[test] + #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] + fn ok() { + const INITIAL: &[u8] = b"abcdef123456"; + let wbuf0 = b"BC"; + let wbuf1 = b"DEF"; + let wbufs = [IoSlice::new(wbuf0), IoSlice::new(wbuf1)]; + let wlen = wbuf0.len() + wbuf1.len(); + let mut rbuf = Vec::new(); + const EXPECT: &[u8] = b"aBCDEF123456"; + + let mut f = tempfile().unwrap(); + f.write_all(INITIAL).unwrap(); + let mut aiow = Box::pin(AioWritev::new( + f.as_raw_fd(), + 1, + &wbufs, + 0, + SigevNotify::SigevNone, + )); + aiow.as_mut().submit().unwrap(); -lazy_static! { - pub static ref SIGNALED: AtomicBool = AtomicBool::new(false); -} + let err = poll_aio!(&mut aiow); + assert_eq!(err, Ok(())); + assert_eq!(aiow.as_mut().aio_return().unwrap(), wlen); -extern fn sigfunc(_: c_int) { - SIGNALED.store(true, Ordering::Relaxed); + f.seek(SeekFrom::Start(0)).unwrap(); + let len = f.read_to_end(&mut rbuf).unwrap(); + assert_eq!(len, EXPECT.len()); + assert_eq!(rbuf, EXPECT); + } } // Test an aio operation with completion delivered by a signal -// FIXME: This test is ignored on mips because of failures in qemu in CI #[test] -#[cfg_attr(any(all(target_env = "musl", target_arch = "x86_64"), target_arch = "mips", target_arch = "mips64"), ignore)] -fn test_write_sigev_signal() { +#[cfg_attr( + any( + all(target_env = "musl", target_arch = "x86_64"), + target_arch = "mips", + target_arch = "mips64" + ), + ignore +)] +fn sigev_signal() { let _m = crate::SIGNAL_MTX.lock(); - let sa = SigAction::new(SigHandler::Handler(sigfunc), - SaFlags::SA_RESETHAND, - SigSet::empty()); + let sa = SigAction::new( + SigHandler::Handler(sigfunc), + SaFlags::SA_RESETHAND, + SigSet::empty(), + ); SIGNALED.store(false, Ordering::Relaxed); unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap(); @@ -420,201 +530,107 @@ fn test_write_sigev_signal() { let mut f = tempfile().unwrap(); f.write_all(INITIAL).unwrap(); - let mut aiocb = AioCb::from_slice( f.as_raw_fd(), - 2, //offset - WBUF, - 0, //priority - SigevNotify::SigevSignal { - signal: Signal::SIGUSR2, - si_value: 0 //TODO: validate in sigfunc - }, - LioOpcode::LIO_NOP); - aiocb.write().unwrap(); + let mut aiow = Box::pin(AioWrite::new( + f.as_raw_fd(), + 2, //offset + WBUF, + 0, //priority + SigevNotify::SigevSignal { + signal: Signal::SIGUSR2, + si_value: 0, //TODO: validate in sigfunc + }, + )); + aiow.as_mut().submit().unwrap(); while !SIGNALED.load(Ordering::Relaxed) { thread::sleep(time::Duration::from_millis(10)); } - assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len()); + assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len()); f.seek(SeekFrom::Start(0)).unwrap(); let len = f.read_to_end(&mut rbuf).unwrap(); assert_eq!(len, EXPECT.len()); assert_eq!(rbuf, EXPECT); } -// Test LioCb::listio with LIO_WAIT, so all AIO ops should be complete by the -// time listio returns. -#[test] -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] -fn test_liocb_listio_wait() { - const INITIAL: &[u8] = b"abcdef123456"; - const WBUF: &[u8] = b"CDEF"; - let mut rbuf = vec![0; 4]; - let rlen = rbuf.len(); - let mut rbuf2 = Vec::new(); - const EXPECT: &[u8] = b"abCDEF123456"; - let mut f = tempfile().unwrap(); - - f.write_all(INITIAL).unwrap(); - - { - let mut liocb = LioCbBuilder::with_capacity(2) - .emplace_slice( - f.as_raw_fd(), - 2, //offset - WBUF, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_WRITE - ).emplace_mut_slice( - f.as_raw_fd(), - 8, //offset - &mut rbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_READ - ).finish(); - let err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone); - err.expect("lio_listio"); - - assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len()); - assert_eq!(liocb.aio_return(1).unwrap() as usize, rlen); - } - assert_eq!(rbuf.deref().deref(), b"3456"); - - f.seek(SeekFrom::Start(0)).unwrap(); - let len = f.read_to_end(&mut rbuf2).unwrap(); - assert_eq!(len, EXPECT.len()); - assert_eq!(rbuf2, EXPECT); -} - -// Test LioCb::listio with LIO_NOWAIT and no SigEvent, so we must use some other -// mechanism to check for the individual AioCb's completion. +// Tests using aio_cancel_all for all outstanding IOs. #[test] -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] -fn test_liocb_listio_nowait() { - const INITIAL: &[u8] = b"abcdef123456"; - const WBUF: &[u8] = b"CDEF"; - let mut rbuf = vec![0; 4]; - let rlen = rbuf.len(); - let mut rbuf2 = Vec::new(); - const EXPECT: &[u8] = b"abCDEF123456"; - let mut f = tempfile().unwrap(); +#[cfg_attr(target_env = "musl", ignore)] +fn test_aio_cancel_all() { + let wbuf: &[u8] = b"CDEF"; - f.write_all(INITIAL).unwrap(); + let f = tempfile().unwrap(); + let mut aiocb = Box::pin(AioWrite::new( + f.as_raw_fd(), + 0, //offset + wbuf, + 0, //priority + SigevNotify::SigevNone, + )); + aiocb.as_mut().submit().unwrap(); + let err = aiocb.as_mut().error(); + assert!(err == Ok(()) || err == Err(Errno::EINPROGRESS)); - { - let mut liocb = LioCbBuilder::with_capacity(2) - .emplace_slice( - f.as_raw_fd(), - 2, //offset - WBUF, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_WRITE - ).emplace_mut_slice( - f.as_raw_fd(), - 8, //offset - &mut rbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_READ - ).finish(); - let err = liocb.listio(LioMode::LIO_NOWAIT, SigevNotify::SigevNone); - err.expect("lio_listio"); - - poll_lio(&mut liocb, 0).unwrap(); - poll_lio(&mut liocb, 1).unwrap(); - assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len()); - assert_eq!(liocb.aio_return(1).unwrap() as usize, rlen); - } - assert_eq!(rbuf.deref().deref(), b"3456"); + let cancelstat = aio_cancel_all(f.as_raw_fd()); + assert!(cancelstat.is_ok()); - f.seek(SeekFrom::Start(0)).unwrap(); - let len = f.read_to_end(&mut rbuf2).unwrap(); - assert_eq!(len, EXPECT.len()); - assert_eq!(rbuf2, EXPECT); + // Wait for aiocb to complete, but don't care whether it succeeded + let _ = poll_aio!(&mut aiocb); + let _ = aiocb.as_mut().aio_return(); } -// Test LioCb::listio with LIO_NOWAIT and a SigEvent to indicate when all -// AioCb's are complete. -// FIXME: This test is ignored on mips/mips64 because of failures in qemu in CI. #[test] -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -#[cfg_attr(any(target_arch = "mips", target_arch = "mips64", target_env = "musl"), ignore)] -fn test_liocb_listio_signal() { - let _m = crate::SIGNAL_MTX.lock(); +// On Cirrus on Linux, this test fails due to a glibc bug. +// https://github.com/nix-rust/nix/issues/1099 +#[cfg_attr(target_os = "linux", ignore)] +// On Cirrus, aio_suspend is failing with EINVAL +// https://github.com/nix-rust/nix/issues/1361 +#[cfg_attr(target_os = "macos", ignore)] +fn test_aio_suspend() { const INITIAL: &[u8] = b"abcdef123456"; - const WBUF: &[u8] = b"CDEF"; + const WBUF: &[u8] = b"CDEFG"; + let timeout = TimeSpec::seconds(10); let mut rbuf = vec![0; 4]; let rlen = rbuf.len(); - let mut rbuf2 = Vec::new(); - const EXPECT: &[u8] = b"abCDEF123456"; let mut f = tempfile().unwrap(); - let sa = SigAction::new(SigHandler::Handler(sigfunc), - SaFlags::SA_RESETHAND, - SigSet::empty()); - let sigev_notify = SigevNotify::SigevSignal { signal: Signal::SIGUSR2, - si_value: 0 }; - f.write_all(INITIAL).unwrap(); - { - let mut liocb = LioCbBuilder::with_capacity(2) - .emplace_slice( - f.as_raw_fd(), - 2, //offset - WBUF, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_WRITE - ).emplace_mut_slice( - f.as_raw_fd(), - 8, //offset - &mut rbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_READ - ).finish(); - SIGNALED.store(false, Ordering::Relaxed); - unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap(); - let err = liocb.listio(LioMode::LIO_NOWAIT, sigev_notify); - err.expect("lio_listio"); - while !SIGNALED.load(Ordering::Relaxed) { - thread::sleep(time::Duration::from_millis(10)); + let mut wcb = Box::pin(AioWrite::new( + f.as_raw_fd(), + 2, //offset + WBUF, + 0, //priority + SigevNotify::SigevNone, + )); + + let mut rcb = Box::pin(AioRead::new( + f.as_raw_fd(), + 8, //offset + &mut rbuf, + 0, //priority + SigevNotify::SigevNone, + )); + wcb.as_mut().submit().unwrap(); + rcb.as_mut().submit().unwrap(); + loop { + { + let cbbuf = [ + &*wcb as &dyn AsRef<libc::aiocb>, + &*rcb as &dyn AsRef<libc::aiocb>, + ]; + let r = aio_suspend(&cbbuf[..], Some(timeout)); + match r { + Err(Errno::EINTR) => continue, + Err(e) => panic!("aio_suspend returned {:?}", e), + Ok(_) => (), + }; + } + if rcb.as_mut().error() != Err(Errno::EINPROGRESS) && + wcb.as_mut().error() != Err(Errno::EINPROGRESS) + { + break; } - - assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len()); - assert_eq!(liocb.aio_return(1).unwrap() as usize, rlen); } - assert_eq!(rbuf.deref().deref(), b"3456"); - - f.seek(SeekFrom::Start(0)).unwrap(); - let len = f.read_to_end(&mut rbuf2).unwrap(); - assert_eq!(len, EXPECT.len()); - assert_eq!(rbuf2, EXPECT); -} -// Try to use LioCb::listio to read into an immutable buffer. It should fail -// FIXME: This test fails to panic on Linux/musl -#[test] -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -#[should_panic(expected = "Can't read into an immutable buffer")] -#[cfg_attr(target_env = "musl", ignore)] -fn test_liocb_listio_read_immutable() { - let rbuf: &[u8] = b"abcd"; - let f = tempfile().unwrap(); - - - let mut liocb = LioCbBuilder::with_capacity(1) - .emplace_slice( - f.as_raw_fd(), - 2, //offset - rbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_READ - ).finish(); - let _ = liocb.listio(LioMode::LIO_NOWAIT, SigevNotify::SigevNone); + assert_eq!(wcb.as_mut().aio_return().unwrap(), WBUF.len()); + assert_eq!(rcb.as_mut().aio_return().unwrap(), rlen); } diff --git a/test/sys/test_aio_drop.rs b/test/sys/test_aio_drop.rs index f9ff97af..0836a542 100644 --- a/test/sys/test_aio_drop.rs +++ b/test/sys/test_aio_drop.rs @@ -20,11 +20,10 @@ fn test_drop() { let f = tempfile().unwrap(); f.set_len(6).unwrap(); - let mut aiocb = AioCb::from_slice( f.as_raw_fd(), - 2, //offset - WBUF, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP); - aiocb.write().unwrap(); + let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(), + 2, //offset + WBUF, + 0, //priority + SigevNotify::SigevNone)); + aiocb.as_mut().submit().unwrap(); } diff --git a/test/sys/test_lio_listio_resubmit.rs b/test/sys/test_lio_listio_resubmit.rs deleted file mode 100644 index 2ed058c2..00000000 --- a/test/sys/test_lio_listio_resubmit.rs +++ /dev/null @@ -1,106 +0,0 @@ -// vim: tw=80 - -// Annoyingly, Cargo is unable to conditionally build an entire test binary. So -// we must disable the test here rather than in Cargo.toml -#![cfg(target_os = "freebsd")] - -use nix::errno::*; -use nix::libc::off_t; -use nix::sys::aio::*; -use nix::sys::signal::SigevNotify; -use nix::unistd::{SysconfVar, sysconf}; -use std::os::unix::io::AsRawFd; -use std::{thread, time}; -use sysctl::{CtlValue, Sysctl}; -use tempfile::tempfile; - -const BYTES_PER_OP: usize = 512; - -/// Attempt to collect final status for all of `liocb`'s operations, freeing -/// system resources -fn finish_liocb(liocb: &mut LioCb) { - for j in 0..liocb.len() { - loop { - let e = liocb.error(j); - match e { - Ok(()) => break, - Err(Errno::EINPROGRESS) => - thread::sleep(time::Duration::from_millis(10)), - Err(x) => panic!("aio_error({:?})", x) - } - } - assert_eq!(liocb.aio_return(j).unwrap(), BYTES_PER_OP as isize); - } -} - -// Deliberately exceed system resource limits, causing lio_listio to return EIO. -// This test must run in its own process since it deliberately uses all AIO -// resources. ATM it is only enabled on FreeBSD, because I don't know how to -// check system AIO limits on other operating systems. -#[test] -fn test_lio_listio_resubmit() { - let mut resubmit_count = 0; - - // Lookup system resource limits - let alm = sysconf(SysconfVar::AIO_LISTIO_MAX) - .expect("sysconf").unwrap() as usize; - let ctl = sysctl::Ctl::new("vfs.aio.max_aio_queue_per_proc").unwrap(); - let maqpp = if let CtlValue::Int(x) = ctl.value().unwrap() { - x as usize - } else { - panic!("unknown sysctl"); - }; - - // Find lio_listio sizes that satisfy the AIO_LISTIO_MAX constraint and also - // result in a final lio_listio call that can only partially be queued - let target_ops = maqpp + alm / 2; - let num_listios = (target_ops + alm - 3) / (alm - 2); - let ops_per_listio = (target_ops + num_listios - 1) / num_listios; - assert!((num_listios - 1) * ops_per_listio < maqpp, - "the last lio_listio won't make any progress; fix the algorithm"); - println!("Using {:?} LioCbs of {:?} operations apiece", num_listios, - ops_per_listio); - - let f = tempfile().unwrap(); - let buffer_set = (0..num_listios).map(|_| { - (0..ops_per_listio).map(|_| { - vec![0u8; BYTES_PER_OP] - }).collect::<Vec<_>>() - }).collect::<Vec<_>>(); - - let mut liocbs = (0..num_listios).map(|i| { - let mut builder = LioCbBuilder::with_capacity(ops_per_listio); - for j in 0..ops_per_listio { - let offset = (BYTES_PER_OP * (i * ops_per_listio + j)) as off_t; - builder = builder.emplace_slice(f.as_raw_fd(), - offset, - &buffer_set[i][j][..], - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_WRITE); - } - let mut liocb = builder.finish(); - let mut err = liocb.listio(LioMode::LIO_NOWAIT, SigevNotify::SigevNone); - while err == Err(Errno::EIO) || - err == Err(Errno::EAGAIN) || - err == Err(Errno::EINTR) { - // - thread::sleep(time::Duration::from_millis(10)); - resubmit_count += 1; - err = liocb.listio_resubmit(LioMode::LIO_NOWAIT, - SigevNotify::SigevNone); - } - liocb - }).collect::<Vec<_>>(); - - // Ensure that every AioCb completed - for liocb in liocbs.iter_mut() { - finish_liocb(liocb); - } - - if resubmit_count > 0 { - println!("Resubmitted {:?} times, test passed", resubmit_count); - } else { - println!("Never resubmitted. Test ambiguous"); - } -} |