summaryrefslogtreecommitdiff
path: root/src/sys/aio.rs
diff options
context:
space:
mode:
authorAlan Somers <asomers@gmail.com>2018-03-22 09:44:54 -0600
committerAlan Somers <asomers@gmail.com>2018-04-06 21:26:40 -0600
commit4b769a42d5b55367e5908058e8ca59c3c474cacf (patch)
treee9363fa44160f16b083e7c9e24bed3ff5d7e4b9d /src/sys/aio.rs
parent4729935dec575d1d0353f3cdf7e318b4157d2cc3 (diff)
downloadnix-4b769a42d5b55367e5908058e8ca59c3c474cacf.zip
Add LioCb::listio_resubmit
It helps deal with errors like EAGAIN, which can result in a subset of an LioCb's operations being queued. The test is only enabled on FreeBSD, because it requires intimate knowledge of AIO system limits.
Diffstat (limited to 'src/sys/aio.rs')
-rw-r--r--src/sys/aio.rs160
1 files changed, 147 insertions, 13 deletions
diff --git a/src/sys/aio.rs b/src/sys/aio.rs
index 8a958c84..3d539821 100644
--- a/src/sys/aio.rs
+++ b/src/sys/aio.rs
@@ -1,3 +1,4 @@
+// vim: tw=80
//! POSIX Asynchronous I/O
//!
//! The POSIX AIO interface is used for asynchronous I/O on files and disk-like
@@ -31,8 +32,8 @@ use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::ptr::{null, null_mut};
-use std::thread;
use sys::signal::*;
+use std::thread;
use sys::time::TimeSpec;
libc_enum! {
@@ -111,15 +112,15 @@ impl<'a> Debug for Buffer<'a> {
// not today.
// https://github.com/rust-lang/rust/issues/1563
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
- match self {
- &Buffer::None => write!(fmt, "None"),
- &Buffer::Phantom(p) => p.fmt(fmt),
- &Buffer::BoxedSlice(ref bs) => {
+ match *self {
+ Buffer::None => write!(fmt, "None"),
+ Buffer::Phantom(p) => p.fmt(fmt),
+ Buffer::BoxedSlice(ref bs) => {
let borrowed : &Borrow<[u8]> = bs.borrow();
write!(fmt, "BoxedSlice({:?})",
borrowed as *const Borrow<[u8]>)
},
- &Buffer::BoxedMutSlice(ref bms) => {
+ Buffer::BoxedMutSlice(ref bms) => {
let borrowed : &BorrowMut<[u8]> = bms.borrow();
write!(fmt, "BoxedMutSlice({:?})",
borrowed as *const BorrowMut<[u8]>)
@@ -265,7 +266,7 @@ impl<'a> AioCb<'a> {
/// operations, but only if the borrow checker can guarantee that the slice
/// will outlive the `AioCb`. That will usually be the case if the `AioCb`
/// is stack-allocated. If the borrow checker gives you trouble, try using
- /// [`from_bytes_mut`](#method.from_bytes_mut) instead.
+ /// [`from_boxed_mut_slice`](#method.from_boxed_mut_slice) instead.
///
/// # Parameters
///
@@ -1059,7 +1060,11 @@ pub struct LioCb<'a> {
/// It must live for as long as any of the operations are still being
/// processesed, because the aio subsystem uses its address as a unique
/// identifier.
- list: Vec<*mut libc::aiocb>
+ list: Vec<*mut libc::aiocb>,
+
+ /// A partial set of results. This field will get populated by
+ /// `listio_resubmit` when an `LioCb` is resubmitted after an error
+ results: Vec<Option<Result<isize>>>
}
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
@@ -1068,7 +1073,8 @@ impl<'a> LioCb<'a> {
pub fn with_capacity(capacity: usize) -> LioCb<'a> {
LioCb {
aiocbs: Vec::with_capacity(capacity),
- list: Vec::with_capacity(capacity)
+ list: Vec::with_capacity(capacity),
+ results: Vec::with_capacity(capacity)
}
}
@@ -1087,8 +1093,8 @@ impl<'a> LioCb<'a> {
/// # Examples
///
/// Use `listio` to submit an aio operation and wait for its completion. In
- /// this case, there is no need to use `aio_suspend` to wait or
- /// `AioCb#error` to poll.
+ /// this case, there is no need to use [`aio_suspend`] to wait or
+ /// [`AioCb::error`] to poll.
///
/// ```
/// # extern crate tempfile;
@@ -1109,19 +1115,23 @@ impl<'a> LioCb<'a> {
/// LioOpcode::LIO_WRITE));
/// liocb.listio(LioMode::LIO_WAIT,
/// SigevNotify::SigevNone).unwrap();
- /// assert_eq!(liocb.aiocbs[0].aio_return().unwrap() as usize, WBUF.len());
+ /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len());
/// # }
/// ```
///
/// # References
///
/// [`lio_listio`](http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html)
+ ///
+ /// [`aio_suspend`]: fn.aio_suspend.html
+ /// [`AioCb::error`]: struct.AioCb.html#method.error
pub fn listio(&mut self, mode: LioMode,
sigev_notify: SigevNotify) -> Result<()> {
let sigev = SigEvent::new(sigev_notify);
let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
self.list.clear();
- for a in self.aiocbs.iter_mut() {
+ for a in &mut self.aiocbs {
+ a.in_progress = true;
self.list.push(a as *mut AioCb<'a>
as *mut libc::aiocb);
}
@@ -1130,6 +1140,129 @@ impl<'a> LioCb<'a> {
libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp)
}).map(|_| ())
}
+
+ /// Resubmits any incomplete operations with [`lio_listio`].
+ ///
+ /// Sometimes, due to system resource limitations, an `lio_listio` call will
+ /// return `EIO`, or `EAGAIN`. Or, if a signal is received, it may return
+ /// `EINTR`. In any of these cases, only a subset of its constituent
+ /// operations will actually have been initiated. `listio_resubmit` will
+ /// resubmit any operations that are still uninitiated.
+ ///
+ /// After calling `listio_resubmit`, results should be collected by
+ /// [`LioCb::aio_return`].
+ ///
+ /// # Examples
+ /// ```no_run
+ /// # extern crate tempfile;
+ /// # extern crate nix;
+ /// # use nix::Error;
+ /// # use nix::errno::Errno;
+ /// # use nix::sys::aio::*;
+ /// # use nix::sys::signal::SigevNotify;
+ /// # use std::os::unix::io::AsRawFd;
+ /// # use std::{thread, time};
+ /// # use tempfile::tempfile;
+ /// # fn main() {
+ /// const WBUF: &[u8] = b"abcdef123456";
+ /// let mut f = tempfile().unwrap();
+ /// let mut liocb = LioCb::with_capacity(1);
+ /// liocb.aiocbs.push(AioCb::from_slice( f.as_raw_fd(),
+ /// 2, //offset
+ /// WBUF,
+ /// 0, //priority
+ /// SigevNotify::SigevNone,
+ /// LioOpcode::LIO_WRITE));
+ /// let mut err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone);
+ /// while err == Err(Error::Sys(Errno::EIO)) ||
+ /// err == Err(Error::Sys(Errno::EAGAIN)) {
+ /// thread::sleep(time::Duration::from_millis(10));
+ /// err = liocb.listio_resubmit(LioMode::LIO_WAIT, SigevNotify::SigevNone);
+ /// }
+ /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len());
+ /// # }
+ /// ```
+ ///
+ /// # References
+ ///
+ /// [`lio_listio`](http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html)
+ ///
+ /// [`lio_listio`]: http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
+ /// [`LioCb::aio_return`]: struct.LioCb.html#method.aio_return
+ // Note: the addresses of any EINPROGRESS or EOK aiocbs _must_ not be
+ // changed by this method, because the kernel relies on their addresses
+ // being stable.
+ // Note: aiocbs that are Ok(()) must be finalized by aio_return, or else the
+ // sigev_notify will immediately refire.
+ pub fn listio_resubmit(&mut self, mode:LioMode,
+ sigev_notify: SigevNotify) -> Result<()> {
+ let sigev = SigEvent::new(sigev_notify);
+ let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
+ self.list.clear();
+
+ while self.results.len() < self.aiocbs.len() {
+ self.results.push(None);
+ }
+
+ for (i, a) in self.aiocbs.iter_mut().enumerate() {
+ if self.results[i].is_some() {
+ // Already collected final status for this operation
+ continue;
+ }
+ match a.error() {
+ Ok(()) => {
+ // aiocb is complete; collect its status and don't resubmit
+ self.results[i] = Some(a.aio_return());
+ },
+ Err(Error::Sys(Errno::EAGAIN)) => {
+ self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb);
+ },
+ Err(Error::Sys(Errno::EINPROGRESS)) => {
+ // aiocb is was successfully queued; no need to do anything
+ ()
+ },
+ Err(Error::Sys(Errno::EINVAL)) => panic!(
+ "AioCb was never submitted, or already finalized"),
+ _ => unreachable!()
+ }
+ }
+ let p = self.list.as_ptr();
+ Errno::result(unsafe {
+ libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp)
+ }).map(|_| ())
+ }
+
+ /// Collect final status for an individual `AioCb` submitted as part of an
+ /// `LioCb`.
+ ///
+ /// This is just like [`AioCb::aio_return`], except it takes into account
+ /// operations that were restarted by [`LioCb::listio_resubmit`]
+ ///
+ /// [`AioCb::aio_return`]: struct.AioCb.html#method.aio_return
+ /// [`LioCb::listio_resubmit`]: #method.listio_resubmit
+ pub fn aio_return(&mut self, i: usize) -> Result<isize> {
+ if i >= self.results.len() || self.results[i].is_none() {
+ self.aiocbs[i].aio_return()
+ } else {
+ self.results[i].unwrap()
+ }
+ }
+
+ /// Retrieve error status of an individual `AioCb` submitted as part of an
+ /// `LioCb`.
+ ///
+ /// This is just like [`AioCb::error`], except it takes into account
+ /// operations that were restarted by [`LioCb::listio_resubmit`]
+ ///
+ /// [`AioCb::error`]: struct.AioCb.html#method.error
+ /// [`LioCb::listio_resubmit`]: #method.listio_resubmit
+ pub fn error(&mut self, i: usize) -> Result<()> {
+ if i >= self.results.len() || self.results[i].is_none() {
+ self.aiocbs[i].error()
+ } else {
+ Ok(())
+ }
+ }
}
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
@@ -1146,6 +1279,7 @@ impl<'a> From<Vec<AioCb<'a>>> for LioCb<'a> {
fn from(src: Vec<AioCb<'a>>) -> LioCb<'a> {
LioCb {
list: Vec::with_capacity(src.capacity()),
+ results: Vec::with_capacity(src.capacity()),
aiocbs: src,
}
}