summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbors[bot] <bors[bot]@users.noreply.github.com>2018-04-07 04:33:36 +0000
committerbors[bot] <bors[bot]@users.noreply.github.com>2018-04-07 04:33:36 +0000
commit31e901b4bf36be5a4b27f28ae43105913c5d6245 (patch)
tree8d04f119124460ac48915ed7f3b87ccf7c67b67a
parent2def43d3bcb4de42d4783e4c18413f7999ee5caf (diff)
parent4b769a42d5b55367e5908058e8ca59c3c474cacf (diff)
downloadnix-31e901b4bf36be5a4b27f28ae43105913c5d6245.zip
Merge #872
872: Change sys::aio::lio_listio to sys::aio::LioCb::listio r=asomers a=asomers The new LioCb structure allows us to control the exact arguments passed to lio_listio, guaranteeing that each call gets a unique storage location for the list argument. This prevents clients from misusing lio_listio in a way that causes events to get dropped from a kqueue Fixes #870
-rw-r--r--CHANGELOG.md9
-rw-r--r--Cargo.toml15
-rw-r--r--src/lib.rs1
-rw-r--r--src/sys/aio.rs613
-rw-r--r--test/sys/test_aio.rs139
-rw-r--r--test/sys/test_lio_listio_resubmit.rs111
6 files changed, 617 insertions, 271 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5fffab92..c8c15867 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,10 @@ This project adheres to [Semantic Versioning](http://semver.org/).
([#876](https://github.com/nix-rust/nix/pull/876))
- Added `SO_MARK` on Linux.
- ([#873](https://github.com/nix-rust/nix/pull/873))
+- Added safe support for nearly any buffer type in the `sys::aio` module.
+ ([#872](https://github.com/nix-rust/nix/pull/872))
+- Added `sys::aio::LioCb` as a wrapper for `libc::lio_listio`.
+ ([#872](https://github.com/nix-rust/nix/pull/872))
- Added `getsid` in `::nix::unistd`
([#850](https://github.com/nix-rust/nix/pull/850))
- Added `alarm`. ([#830](https://github.com/nix-rust/nix/pull/830))
@@ -35,6 +39,11 @@ This project adheres to [Semantic Versioning](http://semver.org/).
([#837](https://github.com/nix-rust/nix/pull/837))
### Removed
+- Removed explicit support for the `bytes` crate from the `sys::aio` module.
+ See `sys::aio::AioCb::from_boxed_slice`s examples for alternatives.
+ ([#872](https://github.com/nix-rust/nix/pull/872))
+- Removed `sys::aio::lio_listio`. Use `sys::aio::LioCb::listio` instead.
+ ([#872](https://github.com/nix-rust/nix/pull/872))
## [0.10.0] 2018-01-26
diff --git a/Cargo.toml b/Cargo.toml
index 8b1c90e7..6f46d916 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,20 +17,21 @@ bitflags = "1.0"
cfg-if = "0.1.0"
void = "1.0.2"
-[dependencies.bytes]
-version = "0.4.5"
-# Don't include the optional serde feature
-default-features = false
-
[target.'cfg(target_os = "dragonfly")'.build-dependencies]
cc = "1"
[dev-dependencies]
+# The examples use a new feature of Bytes which should be available in 0.4.7
+# https://github.com/carllerche/bytes/pull/192
+bytes = { git = "https://github.com/carllerche/bytes", rev = "ae1b454" }
lazy_static = "1"
rand = "0.4"
tempdir = "0.3"
tempfile = "2"
+[target.'cfg(target_os = "freebsd")'.dev-dependencies]
+sysctl = "0.1"
+
[[test]]
name = "test"
path = "test/test.rs"
@@ -40,6 +41,10 @@ name = "test-aio-drop"
path = "test/sys/test_aio_drop.rs"
[[test]]
+name = "test-lio-listio-resubmit"
+path = "test/sys/test_lio_listio_resubmit.rs"
+
+[[test]]
name = "test-mount"
path = "test/test_mount.rs"
harness = false
diff --git a/src/lib.rs b/src/lib.rs
index 3873f4a1..07e84c12 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -16,7 +16,6 @@
#![deny(missing_debug_implementations)]
// External crates
-extern crate bytes;
#[macro_use]
extern crate bitflags;
#[macro_use]
diff --git a/src/sys/aio.rs b/src/sys/aio.rs
index fe8e9ed2..3d539821 100644
--- a/src/sys/aio.rs
+++ b/src/sys/aio.rs
@@ -1,3 +1,4 @@
+// vim: tw=80
//! POSIX Asynchronous I/O
//!
//! The POSIX AIO interface is used for asynchronous I/O on files and disk-like
@@ -21,18 +22,18 @@
//! not support this for all filesystems and devices.
use {Error, Result};
-use bytes::{Bytes, BytesMut};
use errno::Errno;
use std::os::unix::io::RawFd;
use libc::{c_void, off_t, size_t};
use libc;
+use std::borrow::{Borrow, BorrowMut};
use std::fmt;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
-use std::ops::Deref;
use std::ptr::{null, null_mut};
use sys::signal::*;
+use std::thread;
use sys::time::TimeSpec;
libc_enum! {
@@ -92,46 +93,38 @@ pub enum AioCancelStat {
/// Owns (uniquely or shared) a memory buffer to keep it from `Drop`ing while
/// the kernel has a pointer to it.
-#[derive(Clone, Debug)]
pub enum Buffer<'a> {
/// No buffer to own.
///
/// Used for operations like `aio_fsync` that have no data, or for unsafe
/// operations that work with raw pointers.
None,
- /// Immutable shared ownership `Bytes` object
- // Must use out-of-line allocation so the address of the data will be
- // stable. `Bytes` and `BytesMut` sometimes dynamically allocate a buffer,
- // and sometimes inline the data within the struct itself.
- Bytes(Bytes),
- /// Mutable uniquely owned `BytesMut` object
- BytesMut(BytesMut),
/// Keeps a reference to a slice
- Phantom(PhantomData<&'a mut [u8]>)
+ Phantom(PhantomData<&'a mut [u8]>),
+ /// Generic thing that keeps a buffer from dropping
+ BoxedSlice(Box<Borrow<[u8]>>),
+ /// Generic thing that keeps a mutable buffer from dropping
+ BoxedMutSlice(Box<BorrowMut<[u8]>>),
}
-impl<'a> Buffer<'a> {
- /// Return the inner `Bytes`, if any
- pub fn bytes(&self) -> Option<&Bytes> {
- match *self {
- Buffer::Bytes(ref x) => Some(x),
- _ => None
- }
- }
-
- /// Return the inner `BytesMut`, if any
- pub fn bytes_mut(&self) -> Option<&BytesMut> {
- match *self {
- Buffer::BytesMut(ref x) => Some(x),
- _ => None
- }
- }
-
- /// Is this `Buffer` `None`?
- pub fn is_none(&self) -> bool {
+impl<'a> Debug for Buffer<'a> {
+ // Note: someday it may be possible to Derive Debug for a trait object, but
+ // not today.
+ // https://github.com/rust-lang/rust/issues/1563
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
- Buffer::None => true,
- _ => false,
+ Buffer::None => write!(fmt, "None"),
+ Buffer::Phantom(p) => p.fmt(fmt),
+ Buffer::BoxedSlice(ref bs) => {
+ let borrowed : &Borrow<[u8]> = bs.borrow();
+ write!(fmt, "BoxedSlice({:?})",
+ borrowed as *const Borrow<[u8]>)
+ },
+ Buffer::BoxedMutSlice(ref bms) => {
+ let borrowed : &BorrowMut<[u8]> = bms.borrow();
+ write!(fmt, "BoxedMutSlice({:?})",
+ borrowed as *const BorrowMut<[u8]>)
+ }
}
}
}
@@ -149,7 +142,7 @@ pub struct AioCb<'a> {
/// Optionally keeps a reference to the data.
///
/// Used to keep buffers from `Drop`'ing, and may be returned once the
- /// `AioCb` is completed by `into_buffer`.
+ /// `AioCb` is completed by [`buffer`](#method.buffer).
buffer: Buffer<'a>
}
@@ -165,6 +158,50 @@ impl<'a> AioCb<'a> {
x
}
+ /// Remove the inner boxed slice, if any, and return it.
+ ///
+ /// The returned value will be the argument that was passed to
+ /// `from_boxed_slice` when this `AioCb` was created.
+ ///
+ /// It is an error to call this method while the `AioCb` is still in
+ /// progress.
+ pub fn boxed_slice(&mut self) -> Option<Box<Borrow<[u8]>>> {
+ assert!(!self.in_progress, "Can't remove the buffer from an AioCb that's still in-progress. Did you forget to call aio_return?");
+ if let Buffer::BoxedSlice(_) = self.buffer {
+ let mut oldbuffer = Buffer::None;
+ mem::swap(&mut self.buffer, &mut oldbuffer);
+ if let Buffer::BoxedSlice(inner) = oldbuffer {
+ Some(inner)
+ } else {
+ unreachable!();
+ }
+ } else {
+ None
+ }
+ }
+
+ /// Remove the inner boxed mutable slice, if any, and return it.
+ ///
+ /// The returned value will be the argument that was passed to
+ /// `from_boxed_mut_slice` when this `AioCb` was created.
+ ///
+ /// It is an error to call this method while the `AioCb` is still in
+ /// progress.
+ pub fn boxed_mut_slice(&mut self) -> Option<Box<BorrowMut<[u8]>>> {
+ assert!(!self.in_progress, "Can't remove the buffer from an AioCb that's still in-progress. Did you forget to call aio_return?");
+ if let Buffer::BoxedMutSlice(_) = self.buffer {
+ let mut oldbuffer = Buffer::None;
+ mem::swap(&mut self.buffer, &mut oldbuffer);
+ if let Buffer::BoxedMutSlice(inner) = oldbuffer {
+ Some(inner)
+ } else {
+ unreachable!();
+ }
+ } else {
+ None
+ }
+ }
+
/// Returns the underlying file descriptor associated with the `AioCb`
pub fn fd(&self) -> RawFd {
self.aiocb.aio_fildes
@@ -186,7 +223,7 @@ impl<'a> AioCb<'a> {
/// # Examples
///
/// Create an `AioCb` from a raw file descriptor and use it for an
- /// [`fsync`](#method.from_bytes_mut) operation.
+ /// [`fsync`](#method.fsync) operation.
///
/// ```
/// # extern crate tempfile;
@@ -229,7 +266,7 @@ impl<'a> AioCb<'a> {
/// operations, but only if the borrow checker can guarantee that the slice
/// will outlive the `AioCb`. That will usually be the case if the `AioCb`
/// is stack-allocated. If the borrow checker gives you trouble, try using
- /// [`from_bytes_mut`](#method.from_bytes_mut) instead.
+ /// [`from_boxed_mut_slice`](#method.from_boxed_mut_slice) instead.
///
/// # Parameters
///
@@ -299,17 +336,19 @@ impl<'a> AioCb<'a> {
}
}
- /// Constructs a new `AioCb` from a `Bytes` object.
+ /// The safest and most flexible way to create an `AioCb`.
///
- /// Unlike `from_slice`, this method returns a structure suitable for
+ /// Unlike [`from_slice`], this method returns a structure suitable for
/// placement on the heap. It may be used for write operations, but not
- /// read operations.
+ /// read operations. Unlike `from_ptr`, this method will ensure that the
+ /// buffer doesn't `drop` while the kernel is still processing it. Any
+ /// object that can be borrowed as a boxed slice will work.
///
/// # Parameters
///
/// * `fd`: File descriptor. Required for all aio functions.
/// * `offs`: File offset
- /// * `buf`: A shared memory buffer
+ /// * `buf`: A boxed slice-like object
/// * `prio`: If POSIX Prioritized IO is supported, then the
/// operation will be prioritized at the process's
/// priority level minus `prio`
@@ -321,15 +360,13 @@ impl<'a> AioCb<'a> {
///
/// # Examples
///
- /// Create an `AioCb` from a `Bytes` object and use it for writing.
+ /// Create an `AioCb` from a Vector and use it for writing
///
/// ```
- /// # extern crate bytes;
/// # extern crate tempfile;
/// # extern crate nix;
/// # use nix::errno::Errno;
/// # use nix::Error;
- /// # use bytes::Bytes;
/// # use nix::sys::aio::*;
/// # use nix::sys::signal::SigevNotify;
/// # use std::{thread, time};
@@ -337,11 +374,12 @@ impl<'a> AioCb<'a> {
/// # use std::os::unix::io::AsRawFd;
/// # use tempfile::tempfile;
/// # fn main() {
- /// let wbuf = Bytes::from(&b"CDEF"[..]);
+ /// let wbuf = Box::new(Vec::from("CDEF"));
+ /// let expected_len = wbuf.len();
/// let mut f = tempfile().unwrap();
- /// let mut aiocb = AioCb::from_bytes( f.as_raw_fd(),
+ /// let mut aiocb = AioCb::from_boxed_slice( f.as_raw_fd(),
/// 2, //offset
- /// wbuf.clone(),
+ /// wbuf,
/// 0, //priority
/// SigevNotify::SigevNone,
/// LioOpcode::LIO_NOP);
@@ -349,72 +387,103 @@ impl<'a> AioCb<'a> {
/// while (aiocb.error() == Err(Error::from(Errno::EINPROGRESS))) {
/// thread::sleep(time::Duration::from_millis(10));
/// }
- /// assert_eq!(aiocb.aio_return().unwrap() as usize, wbuf.len());
+ /// assert_eq!(aiocb.aio_return().unwrap() as usize, expected_len);
/// # }
/// ```
- pub fn from_bytes(fd: RawFd, offs: off_t, buf: Bytes,
+ ///
+ /// Create an `AioCb` from a `Bytes` object
+ ///
+ /// ```
+ /// # extern crate bytes;
+ /// # extern crate tempfile;
+ /// # extern crate nix;
+ /// # use bytes::Bytes;
+ /// # use nix::sys::aio::*;
+ /// # use nix::sys::signal::SigevNotify;
+ /// # use std::os::unix::io::AsRawFd;
+ /// # use tempfile::tempfile;
+ /// # fn main() {
+ /// let wbuf = Box::new(Bytes::from(&b"CDEF"[..]));
+ /// let mut f = tempfile().unwrap();
+ /// let mut aiocb = AioCb::from_boxed_slice( f.as_raw_fd(),
+ /// 2, //offset
+ /// wbuf,
+ /// 0, //priority
+ /// SigevNotify::SigevNone,
+ /// LioOpcode::LIO_NOP);
+ /// # }
+ /// ```
+ ///
+ /// If a library needs to work with buffers that aren't `Box`ed, it can
+ /// create a `Box`ed container for use with this method. Here's an example
+ /// using an un`Box`ed `Bytes` object.
+ ///
+ /// ```
+ /// # extern crate bytes;
+ /// # extern crate tempfile;
+ /// # extern crate nix;
+ /// # use bytes::Bytes;
+ /// # use nix::sys::aio::*;
+ /// # use nix::sys::signal::SigevNotify;
+ /// # use std::borrow::Borrow;
+ /// # use std::os::unix::io::AsRawFd;
+ /// # use tempfile::tempfile;
+ /// struct BytesContainer(Bytes);
+ /// impl Borrow<[u8]> for BytesContainer {
+ /// fn borrow(&self) -> &[u8] {
+ /// self.0.as_ref()
+ /// }
+ /// }
+ /// fn main() {
+ /// let wbuf = Bytes::from(&b"CDEF"[..]);
+ /// let boxed_wbuf = Box::new(BytesContainer(wbuf));
+ /// let mut f = tempfile().unwrap();
+ /// let mut aiocb = AioCb::from_boxed_slice( f.as_raw_fd(),
+ /// 2, //offset
+ /// boxed_wbuf,
+ /// 0, //priority
+ /// SigevNotify::SigevNone,
+ /// LioOpcode::LIO_NOP);
+ /// }
+ /// ```
+ ///
+ /// [`from_slice`]: #method.from_slice
+ pub fn from_boxed_slice(fd: RawFd, offs: off_t, buf: Box<Borrow<[u8]>>,
prio: libc::c_int, sigev_notify: SigevNotify,
opcode: LioOpcode) -> AioCb<'a> {
- // Small BytesMuts are stored inline. Inline storage is a no-no,
- // because we store a pointer to the buffer in the AioCb before
- // returning the Buffer by move. If the buffer is too small, reallocate
- // it to force out-of-line storage
- // TODO: Add an is_inline() method to BytesMut, and a way to explicitly
- // force out-of-line allocation.
- let buf2 = if buf.len() < 64 {
- // Reallocate to force out-of-line allocation
- let mut ool = Bytes::with_capacity(64);
- ool.extend_from_slice(buf.deref());
- ool
- } else {
- buf
- };
let mut a = AioCb::common_init(fd, prio, sigev_notify);
+ {
+ let borrowed : &Borrow<[u8]> = buf.borrow();
+ let slice : &[u8] = borrowed.borrow();
+ a.aio_nbytes = slice.len() as size_t;
+ a.aio_buf = slice.as_ptr() as *mut c_void;
+ }
a.aio_offset = offs;
- a.aio_nbytes = buf2.len() as size_t;
- a.aio_buf = buf2.as_ptr() as *mut c_void;
a.aio_lio_opcode = opcode as libc::c_int;
AioCb {
aiocb: a,
mutable: false,
in_progress: false,
- buffer: Buffer::Bytes(buf2),
+ buffer: Buffer::BoxedSlice(buf),
}
}
- /// Constructs a new `AioCb` from a `BytesMut` object.
+ /// The safest and most flexible way to create an `AioCb` for reading.
///
- /// Unlike `from_mut_slice`, this method returns a structure suitable for
- /// placement on the heap. It may be used for both reads and writes.
- ///
- /// # Parameters
- ///
- /// * `fd`: File descriptor. Required for all aio functions.
- /// * `offs`: File offset
- /// * `buf`: An owned memory buffer
- /// * `prio`: If POSIX Prioritized IO is supported, then the
- /// operation will be prioritized at the process's
- /// priority level minus `prio`
- /// * `sigev_notify`: Determines how you will be notified of event
- /// completion.
- /// * `opcode`: This field is only used for `lio_listio`. It
- /// determines which operation to use for this individual
- /// aiocb
+ /// Like [`from_boxed_slice`], but the slice is a mutable one. More
+ /// flexible than [`from_mut_slice`], because a wide range of objects can be
+ /// used.
///
/// # Examples
///
- /// Create an `AioCb` from a `BytesMut` and use it for reading. In this
- /// example the `AioCb` is stack-allocated, so we could've used
- /// `from_mut_slice` instead.
+ /// Create an `AioCb` from a Vector and use it for reading
///
/// ```
- /// # extern crate bytes;
/// # extern crate tempfile;
/// # extern crate nix;
/// # use nix::errno::Errno;
/// # use nix::Error;
- /// # use bytes::BytesMut;
/// # use nix::sys::aio::*;
/// # use nix::sys::signal::SigevNotify;
/// # use std::{thread, time};
@@ -424,10 +493,10 @@ impl<'a> AioCb<'a> {
/// # fn main() {
/// const INITIAL: &[u8] = b"abcdef123456";
/// const LEN: usize = 4;
- /// let rbuf = BytesMut::from(vec![0; LEN]);
+ /// let rbuf = Box::new(vec![0; LEN]);
/// let mut f = tempfile().unwrap();
/// f.write_all(INITIAL).unwrap();
- /// let mut aiocb = AioCb::from_bytes_mut( f.as_raw_fd(),
+ /// let mut aiocb = AioCb::from_boxed_mut_slice( f.as_raw_fd(),
/// 2, //offset
/// rbuf,
/// 0, //priority
@@ -438,33 +507,33 @@ impl<'a> AioCb<'a> {
/// thread::sleep(time::Duration::from_millis(10));
/// }
/// assert_eq!(aiocb.aio_return().unwrap() as usize, LEN);
- /// let buffer = aiocb.into_buffer();
+ /// let mut buffer = aiocb.boxed_mut_slice().unwrap();
/// const EXPECT: &[u8] = b"cdef";
- /// assert_eq!(buffer.bytes_mut().unwrap(), EXPECT);
+ /// assert_eq!(buffer.borrow_mut(), EXPECT);
/// # }
/// ```
- pub fn from_bytes_mut(fd: RawFd, offs: off_t, buf: BytesMut,
- prio: libc::c_int, sigev_notify: SigevNotify,
- opcode: LioOpcode) -> AioCb<'a> {
- let mut buf2 = if buf.len() < 64 {
- // Reallocate to force out-of-line allocation
- let mut ool = BytesMut::with_capacity(64);
- ool.extend_from_slice(buf.deref());
- ool
- } else {
- buf
- };
+ ///
+ /// [`from_boxed_slice`]: #method.from_boxed_slice
+ /// [`from_mut_slice`]: #method.from_mut_slice
+ pub fn from_boxed_mut_slice(fd: RawFd, offs: off_t,
+ mut buf: Box<BorrowMut<[u8]>>,
+ prio: libc::c_int, sigev_notify: SigevNotify,
+ opcode: LioOpcode) -> AioCb<'a> {
let mut a = AioCb::common_init(fd, prio, sigev_notify);
+ {
+ let borrowed : &mut BorrowMut<[u8]> = buf.borrow_mut();
+ let slice : &mut [u8] = borrowed.borrow_mut();
+ a.aio_nbytes = slice.len() as size_t;
+ a.aio_buf = slice.as_mut_ptr() as *mut c_void;
+ }
a.aio_offset = offs;
- a.aio_nbytes = buf2.len() as size_t;
- a.aio_buf = buf2.as_mut_ptr() as *mut c_void;
a.aio_lio_opcode = opcode as libc::c_int;
AioCb {
aiocb: a,
mutable: true,
in_progress: false,
- buffer: Buffer::BytesMut(buf2),
+ buffer: Buffer::BoxedMutSlice(buf),
}
}
@@ -474,7 +543,7 @@ impl<'a> AioCb<'a> {
/// placement on the heap. It may be used for both reads and writes. Due
/// to its unsafety, this method is not recommended. It is most useful when
/// heap allocation is required but for some reason the data cannot be
- /// converted to a `BytesMut`.
+ /// wrapped in a `struct` that implements `BorrowMut<[u8]>`
///
/// # Parameters
///
@@ -518,7 +587,8 @@ impl<'a> AioCb<'a> {
/// Unlike `from_slice`, this method returns a structure suitable for
/// placement on the heap. Due to its unsafety, this method is not
/// recommended. It is most useful when heap allocation is required but for
- /// some reason the data cannot be converted to a `Bytes`.
+ /// some reason the data cannot be wrapped in a `struct` that implements
+ /// `Borrow<[u8]>`
///
/// # Parameters
///
@@ -623,19 +693,6 @@ impl<'a> AioCb<'a> {
}
}
- /// Consumes the `aiocb` and returns its inner `Buffer`, if any.
- ///
- /// This method is especially useful when reading into a `BytesMut`, because
- /// that type does not support shared ownership.
- pub fn into_buffer(mut self) -> Buffer<'static> {
- let buf = self.buffer();
- match buf {
- Buffer::BytesMut(x) => Buffer::BytesMut(x),
- Buffer::Bytes(x) => Buffer::Bytes(x),
- _ => Buffer::None
- }
- }
-
fn common_init(fd: RawFd, prio: libc::c_int,
sigev_notify: SigevNotify) -> libc::aiocb {
// Use mem::zeroed instead of explicitly zeroing each field, because the
@@ -669,12 +726,10 @@ impl<'a> AioCb<'a> {
/// result.
///
/// ```
- /// # extern crate bytes;
/// # extern crate tempfile;
/// # extern crate nix;
/// # use nix::errno::Errno;
/// # use nix::Error;
- /// # use bytes::Bytes;
/// # use nix::sys::aio::*;
/// # use nix::sys::signal::SigevNotify;
/// # use std::{thread, time};
@@ -682,11 +737,11 @@ impl<'a> AioCb<'a> {
/// # use std::os::unix::io::AsRawFd;
/// # use tempfile::tempfile;
/// # fn main() {
- /// let wbuf = Bytes::from(&b"CDEF"[..]);
+ /// let wbuf = b"CDEF";
/// let mut f = tempfile().unwrap();
- /// let mut aiocb = AioCb::from_bytes( f.as_raw_fd(),
+ /// let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
/// 2, //offset
- /// wbuf.clone(),
+ /// &wbuf[..],
/// 0, //priority
/// SigevNotify::SigevNone,
/// LioOpcode::LIO_NOP);
@@ -870,12 +925,10 @@ impl<'a> AioCb<'a> {
/// descriptor.
///
/// ```
-/// # extern crate bytes;
/// # extern crate tempfile;
/// # extern crate nix;
/// # use nix::errno::Errno;
/// # use nix::Error;
-/// # use bytes::Bytes;
/// # use nix::sys::aio::*;
/// # use nix::sys::signal::SigevNotify;
/// # use std::{thread, time};
@@ -883,11 +936,11 @@ impl<'a> AioCb<'a> {
/// # use std::os::unix::io::AsRawFd;
/// # use tempfile::tempfile;
/// # fn main() {
-/// let wbuf = Bytes::from(&b"CDEF"[..]);
+/// let wbuf = b"CDEF";
/// let mut f = tempfile().unwrap();
-/// let mut aiocb = AioCb::from_bytes( f.as_raw_fd(),
+/// let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
/// 2, //offset
-/// wbuf.clone(),
+/// &wbuf[..],
/// 0, //priority
/// SigevNotify::SigevNone,
/// LioOpcode::LIO_NOP);
@@ -965,63 +1018,6 @@ pub fn aio_suspend(list: &[&AioCb], timeout: Option<TimeSpec>) -> Result<()> {
}).map(drop)
}
-
-/// Submits multiple asynchronous I/O requests with a single system call.
-///
-/// They are not guaranteed to complete atomically, and the order in which the
-/// requests are carried out is not specified. Reads, writes, and fsyncs may be
-/// freely mixed.
-///
-/// This function is useful for reducing the context-switch overhead of
-/// submitting many AIO operations. It can also be used with
-/// `LioMode::LIO_WAIT` to block on the result of several independent
-/// operations. Used that way, it is often useful in programs that otherwise
-/// make little use of AIO.
-///
-/// # Examples
-///
-/// Use `lio_listio` to submit an aio operation and wait for its completion. In
-/// this case, there is no need to use `aio_suspend` to wait or `AioCb#error` to
-/// poll.
-///
-/// ```
-/// # extern crate tempfile;
-/// # extern crate nix;
-/// # use nix::sys::aio::*;
-/// # use nix::sys::signal::SigevNotify;
-/// # use std::os::unix::io::AsRawFd;
-/// # use tempfile::tempfile;
-/// # fn main() {
-/// const WBUF: &[u8] = b"abcdef123456";
-/// let mut f = tempfile().unwrap();
-/// let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
-/// 2, //offset
-/// WBUF,
-/// 0, //priority
-/// SigevNotify::SigevNone,
-/// LioOpcode::LIO_WRITE);
-/// lio_listio(LioMode::LIO_WAIT,
-/// &[&mut aiocb],
-/// SigevNotify::SigevNone).unwrap();
-/// assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len());
-/// # }
-/// ```
-///
-/// # References
-///
-/// [`lio_listio`](http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html)
-#[cfg(not(any(target_os = "ios", target_os = "macos")))]
-pub fn lio_listio(mode: LioMode, list: &[&mut AioCb],
- sigev_notify: SigevNotify) -> Result<()> {
- let sigev = SigEvent::new(sigev_notify);
- let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
- let plist = list as *const [&mut AioCb] as *const [*mut libc::aiocb];
- let p = plist as *const *mut libc::aiocb;
- Errno::result(unsafe {
- libc::lio_listio(mode as i32, p, list.len() as i32, sigevp)
- }).map(drop)
-}
-
impl<'a> Debug for AioCb<'a> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("AioCb")
@@ -1042,6 +1038,249 @@ impl<'a> Drop for AioCb<'a> {
/// If the `AioCb` has no remaining state in the kernel, just drop it.
/// Otherwise, dropping constitutes a resource leak, which is an error
fn drop(&mut self) {
- assert!(!self.in_progress, "Dropped an in-progress AioCb");
+ assert!(thread::panicking() || !self.in_progress,
+ "Dropped an in-progress AioCb");
+ }
+}
+
+/// LIO Control Block.
+///
+/// The basic structure used to issue multiple AIO operations simultaneously.
+#[cfg(not(any(target_os = "ios", target_os = "macos")))]
+pub struct LioCb<'a> {
+ /// A collection of [`AioCb`]s. All of these will be issued simultaneously
+ /// by the [`listio`] method.
+ ///
+ /// [`AioCb`]: struct.AioCb.html
+ /// [`listio`]: #method.listio
+ pub aiocbs: Vec<AioCb<'a>>,
+
+ /// The actual list passed to `libc::lio_listio`.
+ ///
+ /// It must live for as long as any of the operations are still being
+ /// processesed, because the aio subsystem uses its address as a unique
+ /// identifier.
+ list: Vec<*mut libc::aiocb>,
+
+ /// A partial set of results. This field will get populated by
+ /// `listio_resubmit` when an `LioCb` is resubmitted after an error
+ results: Vec<Option<Result<isize>>>
+}
+
+#[cfg(not(any(target_os = "ios", target_os = "macos")))]
+impl<'a> LioCb<'a> {
+ /// Initialize an empty `LioCb`
+ pub fn with_capacity(capacity: usize) -> LioCb<'a> {
+ LioCb {
+ aiocbs: Vec::with_capacity(capacity),
+ list: Vec::with_capacity(capacity),
+ results: Vec::with_capacity(capacity)
+ }
+ }
+
+ /// Submits multiple asynchronous I/O requests with a single system call.
+ ///
+ /// They are not guaranteed to complete atomically, and the order in which
+ /// the requests are carried out is not specified. Reads, writes, and
+ /// fsyncs may be freely mixed.
+ ///
+ /// This function is useful for reducing the context-switch overhead of
+ /// submitting many AIO operations. It can also be used with
+ /// `LioMode::LIO_WAIT` to block on the result of several independent
+ /// operations. Used that way, it is often useful in programs that
+ /// otherwise make little use of AIO.
+ ///
+ /// # Examples
+ ///
+ /// Use `listio` to submit an aio operation and wait for its completion. In
+ /// this case, there is no need to use [`aio_suspend`] to wait or
+ /// [`AioCb::error`] to poll.
+ ///
+ /// ```
+ /// # extern crate tempfile;
+ /// # extern crate nix;
+ /// # use nix::sys::aio::*;
+ /// # use nix::sys::signal::SigevNotify;
+ /// # use std::os::unix::io::AsRawFd;
+ /// # use tempfile::tempfile;
+ /// # fn main() {
+ /// const WBUF: &[u8] = b"abcdef123456";
+ /// let mut f = tempfile().unwrap();
+ /// let mut liocb = LioCb::with_capacity(1);
+ /// liocb.aiocbs.push(AioCb::from_slice( f.as_raw_fd(),
+ /// 2, //offset
+ /// WBUF,
+ /// 0, //priority
+ /// SigevNotify::SigevNone,
+ /// LioOpcode::LIO_WRITE));
+ /// liocb.listio(LioMode::LIO_WAIT,
+ /// SigevNotify::SigevNone).unwrap();
+ /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len());
+ /// # }
+ /// ```
+ ///
+ /// # References
+ ///
+ /// [`lio_listio`](http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html)
+ ///
+ /// [`aio_suspend`]: fn.aio_suspend.html
+ /// [`AioCb::error`]: struct.AioCb.html#method.error
+ pub fn listio(&mut self, mode: LioMode,
+ sigev_notify: SigevNotify) -> Result<()> {
+ let sigev = SigEvent::new(sigev_notify);
+ let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
+ self.list.clear();
+ for a in &mut self.aiocbs {
+ a.in_progress = true;
+ self.list.push(a as *mut AioCb<'a>
+ as *mut libc::aiocb);
+ }
+ let p = self.list.as_ptr();
+ Errno::result(unsafe {
+ libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp)
+ }).map(|_| ())
+ }
+
+ /// Resubmits any incomplete operations with [`lio_listio`].
+ ///
+ /// Sometimes, due to system resource limitations, an `lio_listio` call will
+ /// return `EIO`, or `EAGAIN`. Or, if a signal is received, it may return
+ /// `EINTR`. In any of these cases, only a subset of its constituent
+ /// operations will actually have been initiated. `listio_resubmit` will
+ /// resubmit any operations that are still uninitiated.
+ ///
+ /// After calling `listio_resubmit`, results should be collected by
+ /// [`LioCb::aio_return`].
+ ///
+ /// # Examples
+ /// ```no_run
+ /// # extern crate tempfile;
+ /// # extern crate nix;
+ /// # use nix::Error;
+ /// # use nix::errno::Errno;
+ /// # use nix::sys::aio::*;
+ /// # use nix::sys::signal::SigevNotify;
+ /// # use std::os::unix::io::AsRawFd;
+ /// # use std::{thread, time};
+ /// # use tempfile::tempfile;
+ /// # fn main() {
+ /// const WBUF: &[u8] = b"abcdef123456";
+ /// let mut f = tempfile().unwrap();
+ /// let mut liocb = LioCb::with_capacity(1);
+ /// liocb.aiocbs.push(AioCb::from_slice( f.as_raw_fd(),
+ /// 2, //offset
+ /// WBUF,
+ /// 0, //priority
+ /// SigevNotify::SigevNone,
+ /// LioOpcode::LIO_WRITE));
+ /// let mut err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone);
+ /// while err == Err(Error::Sys(Errno::EIO)) ||
+ /// err == Err(Error::Sys(Errno::EAGAIN)) {
+ /// thread::sleep(time::Duration::from_millis(10));
+ /// err = liocb.listio_resubmit(LioMode::LIO_WAIT, SigevNotify::SigevNone);
+ /// }
+ /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len());
+ /// # }
+ /// ```
+ ///
+ /// # References
+ ///
+ /// [`lio_listio`](http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html)
+ ///
+ /// [`lio_listio`]: http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
+ /// [`LioCb::aio_return`]: struct.LioCb.html#method.aio_return
+ // Note: the addresses of any EINPROGRESS or EOK aiocbs _must_ not be
+ // changed by this method, because the kernel relies on their addresses
+ // being stable.
+ // Note: aiocbs that are Ok(()) must be finalized by aio_return, or else the
+ // sigev_notify will immediately refire.
+ pub fn listio_resubmit(&mut self, mode:LioMode,
+ sigev_notify: SigevNotify) -> Result<()> {
+ let sigev = SigEvent::new(sigev_notify);
+ let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
+ self.list.clear();
+
+ while self.results.len() < self.aiocbs.len() {
+ self.results.push(None);
+ }
+
+ for (i, a) in self.aiocbs.iter_mut().enumerate() {
+ if self.results[i].is_some() {
+ // Already collected final status for this operation
+ continue;
+ }
+ match a.error() {
+ Ok(()) => {
+ // aiocb is complete; collect its status and don't resubmit
+ self.results[i] = Some(a.aio_return());
+ },
+ Err(Error::Sys(Errno::EAGAIN)) => {
+ self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb);
+ },
+ Err(Error::Sys(Errno::EINPROGRESS)) => {
+ // aiocb is was successfully queued; no need to do anything
+ ()
+ },
+ Err(Error::Sys(Errno::EINVAL)) => panic!(
+ "AioCb was never submitted, or already finalized"),
+ _ => unreachable!()
+ }
+ }
+ let p = self.list.as_ptr();
+ Errno::result(unsafe {
+ libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp)
+ }).map(|_| ())
+ }
+
+ /// Collect final status for an individual `AioCb` submitted as part of an
+ /// `LioCb`.
+ ///
+ /// This is just like [`AioCb::aio_return`], except it takes into account
+ /// operations that were restarted by [`LioCb::listio_resubmit`]
+ ///
+ /// [`AioCb::aio_return`]: struct.AioCb.html#method.aio_return
+ /// [`LioCb::listio_resubmit`]: #method.listio_resubmit
+ pub fn aio_return(&mut self, i: usize) -> Result<isize> {
+ if i >= self.results.len() || self.results[i].is_none() {
+ self.aiocbs[i].aio_return()
+ } else {
+ self.results[i].unwrap()
+ }
+ }
+
+ /// Retrieve error status of an individual `AioCb` submitted as part of an
+ /// `LioCb`.
+ ///
+ /// This is just like [`AioCb::error`], except it takes into account
+ /// operations that were restarted by [`LioCb::listio_resubmit`]
+ ///
+ /// [`AioCb::error`]: struct.AioCb.html#method.error
+ /// [`LioCb::listio_resubmit`]: #method.listio_resubmit
+ pub fn error(&mut self, i: usize) -> Result<()> {
+ if i >= self.results.len() || self.results[i].is_none() {
+ self.aiocbs[i].error()
+ } else {
+ Ok(())
+ }
+ }
+}
+
+#[cfg(not(any(target_os = "ios", target_os = "macos")))]
+impl<'a> Debug for LioCb<'a> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("LioCb")
+ .field("aiocbs", &self.aiocbs)
+ .finish()
+ }
+}
+
+#[cfg(not(any(target_os = "ios", target_os = "macos")))]
+impl<'a> From<Vec<AioCb<'a>>> for LioCb<'a> {
+ fn from(src: Vec<AioCb<'a>>) -> LioCb<'a> {
+ LioCb {
+ list: Vec::with_capacity(src.capacity()),
+ results: Vec::with_capacity(src.capacity()),
+ aiocbs: src,
+ }
}
}
diff --git a/test/sys/test_aio.rs b/test/sys/test_aio.rs
index f88fc268..48399fbd 100644
--- a/test/sys/test_aio.rs
+++ b/test/sys/test_aio.rs
@@ -323,20 +323,21 @@ fn test_write() {
assert!(rbuf == EXPECT);
}
-// Tests `AioCb::from_bytes`
+// Tests `AioCb::from_boxed_slice` with `Bytes`
#[test]
#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
fn test_write_bytes() {
const INITIAL: &[u8] = b"abcdef123456";
- let wbuf = Bytes::from(&b"CDEF"[..]);
+ let wbuf = Box::new(Bytes::from(&b"CDEF"[..]));
let mut rbuf = Vec::new();
const EXPECT: &[u8] = b"abCDEF123456";
+ let expected_len = wbuf.len();
let mut f = tempfile().unwrap();
f.write_all(INITIAL).unwrap();
- let mut aiocb = AioCb::from_bytes( f.as_raw_fd(),
+ let mut aiocb = AioCb::from_boxed_slice( f.as_raw_fd(),
2, //offset
- wbuf.clone(),
+ wbuf,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_NOP);
@@ -344,7 +345,7 @@ fn test_write_bytes() {
let err = poll_aio(&mut aiocb);
assert!(err == Ok(()));
- assert!(aiocb.aio_return().unwrap() as usize == wbuf.len());
+ assert!(aiocb.aio_return().unwrap() as usize == expected_len);
f.seek(SeekFrom::Start(0)).unwrap();
let len = f.read_to_end(&mut rbuf).unwrap();
@@ -352,46 +353,17 @@ fn test_write_bytes() {
assert!(rbuf == EXPECT);
}
-// Tests `AioCb::from_bytes_mut`
-#[test]
-#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
-fn test_read_bytes_mut_big() {
- const INITIAL: &[u8] = b"abcdefgh12345678abcdefgh12345678abcdefgh12345678abcdefgh12345678abcdefgh12345678";
- // rbuf needs to be larger than 32 bytes (64 on 32-bit systems) so
- // BytesMut::clone is implemented by reference.
- let rbuf = BytesMut::from(vec![0; 70]);
- const EXPECT: &[u8] = b"cdefgh12345678abcdefgh12345678abcdefgh12345678abcdefgh12345678abcdefgh";
- let mut f = tempfile().unwrap();
- f.write_all(INITIAL).unwrap();
-
- let mut aiocb = AioCb::from_bytes_mut( f.as_raw_fd(),
- 2, //offset
- rbuf,
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_NOP);
- aiocb.read().unwrap();
-
- let err = poll_aio(&mut aiocb);
- assert_eq!(err, Ok(()));
- assert_eq!(aiocb.aio_return().unwrap() as usize, EXPECT.len());
- let buffer = aiocb.into_buffer();
- assert_eq!(buffer.bytes_mut().unwrap(), EXPECT);
-}
-
-// Tests reallocation in `AioCb::from_bytes_mut`
+// Tests `AioCb::from_boxed_mut_slice` with `BytesMut`
#[test]
#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
fn test_read_bytes_mut_small() {
const INITIAL: &[u8] = b"abcdef";
- // rbuf needs to be no more than 32 bytes (64 on 32-bit systems) so
- // BytesMut::clone is implemented inline.
- let rbuf = BytesMut::from(vec![0; 4]);
+ let rbuf = Box::new(BytesMut::from(vec![0; 4]));
const EXPECT: &[u8] = b"cdef";
let mut f = tempfile().unwrap();
f.write_all(INITIAL).unwrap();
- let mut aiocb = AioCb::from_bytes_mut( f.as_raw_fd(),
+ let mut aiocb = AioCb::from_boxed_mut_slice( f.as_raw_fd(),
2, //offset
rbuf,
0, //priority
@@ -402,8 +374,8 @@ fn test_read_bytes_mut_small() {
let err = poll_aio(&mut aiocb);
assert_eq!(err, Ok(()));
assert_eq!(aiocb.aio_return().unwrap() as usize, EXPECT.len());
- let buffer = aiocb.into_buffer();
- assert_eq!(buffer.bytes_mut().unwrap(), EXPECT);
+ let buffer = aiocb.boxed_mut_slice().unwrap();
+ assert_eq!(buffer.borrow(), EXPECT);
}
// Tests `AioCb::from_ptr`
@@ -505,12 +477,12 @@ fn test_write_sigev_signal() {
assert!(rbuf == EXPECT);
}
-// Test lio_listio with LIO_WAIT, so all AIO ops should be complete by the time
-// lio_listio returns.
+// Test LioCb::listio with LIO_WAIT, so all AIO ops should be complete by the
+// time listio returns.
#[test]
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
-fn test_lio_listio_wait() {
+fn test_liocb_listio_wait() {
const INITIAL: &[u8] = b"abcdef123456";
const WBUF: &[u8] = b"CDEF";
let mut rbuf = vec![0; 4];
@@ -522,24 +494,27 @@ fn test_lio_listio_wait() {
f.write_all(INITIAL).unwrap();
{
- let mut wcb = AioCb::from_slice( f.as_raw_fd(),
+ let wcb = AioCb::from_slice( f.as_raw_fd(),
2, //offset
WBUF,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_WRITE);
- let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(),
+ let rcb = AioCb::from_mut_slice( f.as_raw_fd(),
8, //offset
&mut rbuf,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_READ);
- let err = lio_listio(LioMode::LIO_WAIT, &[&mut wcb, &mut rcb], SigevNotify::SigevNone);
- err.expect("lio_listio failed");
-
- assert!(wcb.aio_return().unwrap() as usize == WBUF.len());
- assert!(rcb.aio_return().unwrap() as usize == rlen);
+ let mut liocb = LioCb::with_capacity(2);
+ liocb.aiocbs.push(wcb);
+ liocb.aiocbs.push(rcb);
+ let err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone);
+ err.expect("lio_listio");
+
+ assert!(liocb.aio_return(0).unwrap() as usize == WBUF.len());
+ assert!(liocb.aio_return(1).unwrap() as usize == rlen);
}
assert!(rbuf.deref().deref() == b"3456");
@@ -549,12 +524,12 @@ fn test_lio_listio_wait() {
assert!(rbuf2 == EXPECT);
}
-// Test lio_listio with LIO_NOWAIT and no SigEvent, so we must use some other
+// Test LioCb::listio with LIO_NOWAIT and no SigEvent, so we must use some other
// mechanism to check for the individual AioCb's completion.
#[test]
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
-fn test_lio_listio_nowait() {
+fn test_liocb_listio_nowait() {
const INITIAL: &[u8] = b"abcdef123456";
const WBUF: &[u8] = b"CDEF";
let mut rbuf = vec![0; 4];
@@ -566,26 +541,29 @@ fn test_lio_listio_nowait() {
f.write_all(INITIAL).unwrap();
{
- let mut wcb = AioCb::from_slice( f.as_raw_fd(),
+ let wcb = AioCb::from_slice( f.as_raw_fd(),
2, //offset
WBUF,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_WRITE);
- let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(),
+ let rcb = AioCb::from_mut_slice( f.as_raw_fd(),
8, //offset
&mut rbuf,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_READ);
- let err = lio_listio(LioMode::LIO_NOWAIT, &[&mut wcb, &mut rcb], SigevNotify::SigevNone);
- err.expect("lio_listio failed");
-
- poll_aio(&mut wcb).unwrap();
- poll_aio(&mut rcb).unwrap();
- assert!(wcb.aio_return().unwrap() as usize == WBUF.len());
- assert!(rcb.aio_return().unwrap() as usize == rlen);
+ let mut liocb = LioCb::with_capacity(2);
+ liocb.aiocbs.push(wcb);
+ liocb.aiocbs.push(rcb);
+ let err = liocb.listio(LioMode::LIO_NOWAIT, SigevNotify::SigevNone);
+ err.expect("lio_listio");
+
+ poll_aio(&mut liocb.aiocbs[0]).unwrap();
+ poll_aio(&mut liocb.aiocbs[1]).unwrap();
+ assert!(liocb.aiocbs[0].aio_return().unwrap() as usize == WBUF.len());
+ assert!(liocb.aiocbs[1].aio_return().unwrap() as usize == rlen);
}
assert!(rbuf.deref().deref() == b"3456");
@@ -595,13 +573,13 @@ fn test_lio_listio_nowait() {
assert!(rbuf2 == EXPECT);
}
-// Test lio_listio with LIO_NOWAIT and a SigEvent to indicate when all AioCb's
-// are complete.
+// Test LioCb::listio with LIO_NOWAIT and a SigEvent to indicate when all
+// AioCb's are complete.
// FIXME: This test is ignored on mips/mips64 because of failures in qemu in CI.
#[test]
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[cfg_attr(any(target_arch = "mips", target_arch = "mips64", target_env = "musl"), ignore)]
-fn test_lio_listio_signal() {
+fn test_liocb_listio_signal() {
#[allow(unused_variables)]
let m = ::SIGNAL_MTX.lock().expect("Mutex got poisoned by another test");
const INITIAL: &[u8] = b"abcdef123456";
@@ -620,29 +598,32 @@ fn test_lio_listio_signal() {
f.write_all(INITIAL).unwrap();
{
- let mut wcb = AioCb::from_slice( f.as_raw_fd(),
+ let wcb = AioCb::from_slice( f.as_raw_fd(),
2, //offset
WBUF,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_WRITE);
- let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(),
+ let rcb = AioCb::from_mut_slice( f.as_raw_fd(),
8, //offset
&mut rbuf,
0, //priority
SigevNotify::SigevNone,
LioOpcode::LIO_READ);
+ let mut liocb = LioCb::with_capacity(2);
+ liocb.aiocbs.push(wcb);
+ liocb.aiocbs.push(rcb);
SIGNALED.store(false, Ordering::Relaxed);
unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap();
- let err = lio_listio(LioMode::LIO_NOWAIT, &[&mut wcb, &mut rcb], sigev_notify);
- err.expect("lio_listio failed");
+ let err = liocb.listio(LioMode::LIO_NOWAIT, sigev_notify);
+ err.expect("lio_listio");
while !SIGNALED.load(Ordering::Relaxed) {
thread::sleep(time::Duration::from_millis(10));
}
- assert!(wcb.aio_return().unwrap() as usize == WBUF.len());
- assert!(rcb.aio_return().unwrap() as usize == rlen);
+ assert!(liocb.aiocbs[0].aio_return().unwrap() as usize == WBUF.len());
+ assert!(liocb.aiocbs[1].aio_return().unwrap() as usize == rlen);
}
assert!(rbuf.deref().deref() == b"3456");
@@ -652,22 +633,24 @@ fn test_lio_listio_signal() {
assert!(rbuf2 == EXPECT);
}
-// Try to use lio_listio to read into an immutable buffer. It should fail
+// Try to use LioCb::listio to read into an immutable buffer. It should fail
// FIXME: This test fails to panic on Linux/musl
#[test]
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[should_panic(expected = "Can't read into an immutable buffer")]
#[cfg_attr(target_env = "musl", ignore)]
-fn test_lio_listio_read_immutable() {
+fn test_liocb_listio_read_immutable() {
let rbuf: &[u8] = b"abcd";
let f = tempfile().unwrap();
- let mut rcb = AioCb::from_slice( f.as_raw_fd(),
- 2, //offset
- rbuf,
- 0, //priority
- SigevNotify::SigevNone,
- LioOpcode::LIO_READ);
- let _ = lio_listio(LioMode::LIO_NOWAIT, &[&mut rcb], SigevNotify::SigevNone);
+ let mut liocb = LioCb::from(vec![
+ AioCb::from_slice( f.as_raw_fd(),
+ 2, //offset
+ rbuf,
+ 0, //priority
+ SigevNotify::SigevNone,
+ LioOpcode::LIO_READ)
+ ]);
+ let _ = liocb.listio(LioMode::LIO_NOWAIT, SigevNotify::SigevNone);
}
diff --git a/test/sys/test_lio_listio_resubmit.rs b/test/sys/test_lio_listio_resubmit.rs
new file mode 100644
index 00000000..19ee3fac
--- /dev/null
+++ b/test/sys/test_lio_listio_resubmit.rs
@@ -0,0 +1,111 @@
+// vim: tw=80
+
+// Annoyingly, Cargo is unable to conditionally build an entire test binary. So
+// we must disable the test here rather than in Cargo.toml
+#![cfg(target_os = "freebsd")]
+
+extern crate nix;
+extern crate sysctl;
+extern crate tempfile;
+
+use nix::Error;
+use nix::errno::*;
+use nix::libc::off_t;
+use nix::sys::aio::*;
+use nix::sys::signal::SigevNotify;
+use nix::unistd::{SysconfVar, sysconf};
+use std::os::unix::io::AsRawFd;
+use std::{thread, time};
+use sysctl::CtlValue;
+use tempfile::tempfile;
+
+const BYTES_PER_OP: usize = 512;
+
+/// Attempt to collect final status for all of `liocb`'s operations, freeing
+/// system resources
+fn finish_liocb(liocb: &mut LioCb) {
+ for j in 0..liocb.aiocbs.len() {
+ loop {
+ let e = liocb.error(j);
+ match e {
+ Ok(()) => break,
+ Err(Error::Sys(Errno::EINPROGRESS)) =>
+ thread::sleep(time::Duration::from_millis(10)),
+ Err(x) => panic!("aio_error({:?})", x)
+ }
+ }
+ assert_eq!(liocb.aio_return(j).unwrap(), BYTES_PER_OP as isize);
+ }
+}
+
+// Deliberately exceed system resource limits, causing lio_listio to return EIO.
+// This test must run in its own process since it deliberately uses all AIO
+// resources. ATM it is only enabled on FreeBSD, because I don't know how to
+// check system AIO limits on other operating systems.
+#[test]
+fn test_lio_listio_resubmit() {
+ let mut resubmit_count = 0;
+
+ // Lookup system resource limits
+ let alm = sysconf(SysconfVar::AIO_LISTIO_MAX)
+ .expect("sysconf").unwrap() as usize;
+ let maqpp = if let CtlValue::Int(x) = sysctl::value(
+ "vfs.aio.max_aio_queue_per_proc").unwrap(){
+ x as usize
+ } else {
+ panic!("unknown sysctl");
+ };
+
+ // Find lio_listio sizes that satisfy the AIO_LISTIO_MAX constraint and also
+ // result in a final lio_listio call that can only partially be queued
+ let target_ops = maqpp + alm / 2;
+ let num_listios = (target_ops + alm - 3) / (alm - 2);
+ let ops_per_listio = (target_ops + num_listios - 1) / num_listios;
+ assert!((num_listios - 1) * ops_per_listio < maqpp,
+ "the last lio_listio won't make any progress; fix the algorithm");
+ println!("Using {:?} LioCbs of {:?} operations apiece", num_listios,
+ ops_per_listio);
+
+ let f = tempfile().unwrap();
+ let buffer_set = (0..num_listios).map(|_| {
+ (0..ops_per_listio).map(|_| {
+ vec![0u8; BYTES_PER_OP]
+ }).collect::<Vec<_>>()
+ }).collect::<Vec<_>>();
+
+ let mut liocbs = (0..num_listios).map(|i| {
+ let mut liocb = LioCb::with_capacity(ops_per_listio);
+ for j in 0..ops_per_listio {
+ let offset = (BYTES_PER_OP * (i * ops_per_listio + j)) as off_t;
+ let wcb = AioCb::from_slice( f.as_raw_fd(),
+ offset,
+ &buffer_set[i][j][..],
+ 0, //priority
+ SigevNotify::SigevNone,
+ LioOpcode::LIO_WRITE);
+ liocb.aiocbs.push(wcb);
+ }
+ let mut err = liocb.listio(LioMode::LIO_NOWAIT, SigevNotify::SigevNone);
+ while err == Err(Error::Sys(Errno::EIO)) ||
+ err == Err(Error::Sys(Errno::EAGAIN)) ||
+ err == Err(Error::Sys(Errno::EINTR)) {
+ //
+ thread::sleep(time::Duration::from_millis(10));
+ resubmit_count += 1;
+ err = liocb.listio_resubmit(LioMode::LIO_NOWAIT,
+ SigevNotify::SigevNone);
+ }
+ liocb
+ }).collect::<Vec<_>>();
+
+ // Ensure that every AioCb completed
+ for liocb in liocbs.iter_mut() {
+ finish_liocb(liocb);
+ }
+
+ if resubmit_count > 0 {
+ println!("Resubmitted {:?} times, test passed", resubmit_count);
+ } else {
+ println!("Never resubmitted. Test ambiguous");
+ }
+}