From 4b769a42d5b55367e5908058e8ca59c3c474cacf Mon Sep 17 00:00:00 2001 From: Alan Somers Date: Thu, 22 Mar 2018 09:44:54 -0600 Subject: Add LioCb::listio_resubmit It helps deal with errors like EAGAIN, which can result in a subset of an LioCb's operations being queued. The test is only enabled on FreeBSD, because it requires intimate knowledge of AIO system limits. --- src/sys/aio.rs | 160 ++++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 147 insertions(+), 13 deletions(-) (limited to 'src/sys') diff --git a/src/sys/aio.rs b/src/sys/aio.rs index 8a958c84..3d539821 100644 --- a/src/sys/aio.rs +++ b/src/sys/aio.rs @@ -1,3 +1,4 @@ +// vim: tw=80 //! POSIX Asynchronous I/O //! //! The POSIX AIO interface is used for asynchronous I/O on files and disk-like @@ -31,8 +32,8 @@ use std::fmt::Debug; use std::marker::PhantomData; use std::mem; use std::ptr::{null, null_mut}; -use std::thread; use sys::signal::*; +use std::thread; use sys::time::TimeSpec; libc_enum! { @@ -111,15 +112,15 @@ impl<'a> Debug for Buffer<'a> { // not today. // https://github.com/rust-lang/rust/issues/1563 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - match self { - &Buffer::None => write!(fmt, "None"), - &Buffer::Phantom(p) => p.fmt(fmt), - &Buffer::BoxedSlice(ref bs) => { + match *self { + Buffer::None => write!(fmt, "None"), + Buffer::Phantom(p) => p.fmt(fmt), + Buffer::BoxedSlice(ref bs) => { let borrowed : &Borrow<[u8]> = bs.borrow(); write!(fmt, "BoxedSlice({:?})", borrowed as *const Borrow<[u8]>) }, - &Buffer::BoxedMutSlice(ref bms) => { + Buffer::BoxedMutSlice(ref bms) => { let borrowed : &BorrowMut<[u8]> = bms.borrow(); write!(fmt, "BoxedMutSlice({:?})", borrowed as *const BorrowMut<[u8]>) @@ -265,7 +266,7 @@ impl<'a> AioCb<'a> { /// operations, but only if the borrow checker can guarantee that the slice /// will outlive the `AioCb`. That will usually be the case if the `AioCb` /// is stack-allocated. If the borrow checker gives you trouble, try using - /// [`from_bytes_mut`](#method.from_bytes_mut) instead. + /// [`from_boxed_mut_slice`](#method.from_boxed_mut_slice) instead. /// /// # Parameters /// @@ -1059,7 +1060,11 @@ pub struct LioCb<'a> { /// It must live for as long as any of the operations are still being /// processesed, because the aio subsystem uses its address as a unique /// identifier. - list: Vec<*mut libc::aiocb> + list: Vec<*mut libc::aiocb>, + + /// A partial set of results. This field will get populated by + /// `listio_resubmit` when an `LioCb` is resubmitted after an error + results: Vec>> } #[cfg(not(any(target_os = "ios", target_os = "macos")))] @@ -1068,7 +1073,8 @@ impl<'a> LioCb<'a> { pub fn with_capacity(capacity: usize) -> LioCb<'a> { LioCb { aiocbs: Vec::with_capacity(capacity), - list: Vec::with_capacity(capacity) + list: Vec::with_capacity(capacity), + results: Vec::with_capacity(capacity) } } @@ -1087,8 +1093,8 @@ impl<'a> LioCb<'a> { /// # Examples /// /// Use `listio` to submit an aio operation and wait for its completion. In - /// this case, there is no need to use `aio_suspend` to wait or - /// `AioCb#error` to poll. + /// this case, there is no need to use [`aio_suspend`] to wait or + /// [`AioCb::error`] to poll. /// /// ``` /// # extern crate tempfile; @@ -1109,19 +1115,23 @@ impl<'a> LioCb<'a> { /// LioOpcode::LIO_WRITE)); /// liocb.listio(LioMode::LIO_WAIT, /// SigevNotify::SigevNone).unwrap(); - /// assert_eq!(liocb.aiocbs[0].aio_return().unwrap() as usize, WBUF.len()); + /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len()); /// # } /// ``` /// /// # References /// /// [`lio_listio`](http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html) + /// + /// [`aio_suspend`]: fn.aio_suspend.html + /// [`AioCb::error`]: struct.AioCb.html#method.error pub fn listio(&mut self, mode: LioMode, sigev_notify: SigevNotify) -> Result<()> { let sigev = SigEvent::new(sigev_notify); let sigevp = &mut sigev.sigevent() as *mut libc::sigevent; self.list.clear(); - for a in self.aiocbs.iter_mut() { + for a in &mut self.aiocbs { + a.in_progress = true; self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb); } @@ -1130,6 +1140,129 @@ impl<'a> LioCb<'a> { libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp) }).map(|_| ()) } + + /// Resubmits any incomplete operations with [`lio_listio`]. + /// + /// Sometimes, due to system resource limitations, an `lio_listio` call will + /// return `EIO`, or `EAGAIN`. Or, if a signal is received, it may return + /// `EINTR`. In any of these cases, only a subset of its constituent + /// operations will actually have been initiated. `listio_resubmit` will + /// resubmit any operations that are still uninitiated. + /// + /// After calling `listio_resubmit`, results should be collected by + /// [`LioCb::aio_return`]. + /// + /// # Examples + /// ```no_run + /// # extern crate tempfile; + /// # extern crate nix; + /// # use nix::Error; + /// # use nix::errno::Errno; + /// # use nix::sys::aio::*; + /// # use nix::sys::signal::SigevNotify; + /// # use std::os::unix::io::AsRawFd; + /// # use std::{thread, time}; + /// # use tempfile::tempfile; + /// # fn main() { + /// const WBUF: &[u8] = b"abcdef123456"; + /// let mut f = tempfile().unwrap(); + /// let mut liocb = LioCb::with_capacity(1); + /// liocb.aiocbs.push(AioCb::from_slice( f.as_raw_fd(), + /// 2, //offset + /// WBUF, + /// 0, //priority + /// SigevNotify::SigevNone, + /// LioOpcode::LIO_WRITE)); + /// let mut err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone); + /// while err == Err(Error::Sys(Errno::EIO)) || + /// err == Err(Error::Sys(Errno::EAGAIN)) { + /// thread::sleep(time::Duration::from_millis(10)); + /// err = liocb.listio_resubmit(LioMode::LIO_WAIT, SigevNotify::SigevNone); + /// } + /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len()); + /// # } + /// ``` + /// + /// # References + /// + /// [`lio_listio`](http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html) + /// + /// [`lio_listio`]: http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html + /// [`LioCb::aio_return`]: struct.LioCb.html#method.aio_return + // Note: the addresses of any EINPROGRESS or EOK aiocbs _must_ not be + // changed by this method, because the kernel relies on their addresses + // being stable. + // Note: aiocbs that are Ok(()) must be finalized by aio_return, or else the + // sigev_notify will immediately refire. + pub fn listio_resubmit(&mut self, mode:LioMode, + sigev_notify: SigevNotify) -> Result<()> { + let sigev = SigEvent::new(sigev_notify); + let sigevp = &mut sigev.sigevent() as *mut libc::sigevent; + self.list.clear(); + + while self.results.len() < self.aiocbs.len() { + self.results.push(None); + } + + for (i, a) in self.aiocbs.iter_mut().enumerate() { + if self.results[i].is_some() { + // Already collected final status for this operation + continue; + } + match a.error() { + Ok(()) => { + // aiocb is complete; collect its status and don't resubmit + self.results[i] = Some(a.aio_return()); + }, + Err(Error::Sys(Errno::EAGAIN)) => { + self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb); + }, + Err(Error::Sys(Errno::EINPROGRESS)) => { + // aiocb is was successfully queued; no need to do anything + () + }, + Err(Error::Sys(Errno::EINVAL)) => panic!( + "AioCb was never submitted, or already finalized"), + _ => unreachable!() + } + } + let p = self.list.as_ptr(); + Errno::result(unsafe { + libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp) + }).map(|_| ()) + } + + /// Collect final status for an individual `AioCb` submitted as part of an + /// `LioCb`. + /// + /// This is just like [`AioCb::aio_return`], except it takes into account + /// operations that were restarted by [`LioCb::listio_resubmit`] + /// + /// [`AioCb::aio_return`]: struct.AioCb.html#method.aio_return + /// [`LioCb::listio_resubmit`]: #method.listio_resubmit + pub fn aio_return(&mut self, i: usize) -> Result { + if i >= self.results.len() || self.results[i].is_none() { + self.aiocbs[i].aio_return() + } else { + self.results[i].unwrap() + } + } + + /// Retrieve error status of an individual `AioCb` submitted as part of an + /// `LioCb`. + /// + /// This is just like [`AioCb::error`], except it takes into account + /// operations that were restarted by [`LioCb::listio_resubmit`] + /// + /// [`AioCb::error`]: struct.AioCb.html#method.error + /// [`LioCb::listio_resubmit`]: #method.listio_resubmit + pub fn error(&mut self, i: usize) -> Result<()> { + if i >= self.results.len() || self.results[i].is_none() { + self.aiocbs[i].error() + } else { + Ok(()) + } + } } #[cfg(not(any(target_os = "ios", target_os = "macos")))] @@ -1146,6 +1279,7 @@ impl<'a> From>> for LioCb<'a> { fn from(src: Vec>) -> LioCb<'a> { LioCb { list: Vec::with_capacity(src.capacity()), + results: Vec::with_capacity(src.capacity()), aiocbs: src, } } -- cgit v1.2.3