diff options
author | bors[bot] <bors[bot]@users.noreply.github.com> | 2018-04-07 04:33:36 +0000 |
---|---|---|
committer | bors[bot] <bors[bot]@users.noreply.github.com> | 2018-04-07 04:33:36 +0000 |
commit | 31e901b4bf36be5a4b27f28ae43105913c5d6245 (patch) | |
tree | 8d04f119124460ac48915ed7f3b87ccf7c67b67a /src/sys | |
parent | 2def43d3bcb4de42d4783e4c18413f7999ee5caf (diff) | |
parent | 4b769a42d5b55367e5908058e8ca59c3c474cacf (diff) | |
download | nix-31e901b4bf36be5a4b27f28ae43105913c5d6245.zip |
Merge #872
872: Change sys::aio::lio_listio to sys::aio::LioCb::listio r=asomers a=asomers
The new LioCb structure allows us to control the exact arguments passed
to lio_listio, guaranteeing that each call gets a unique storage
location for the list argument. This prevents clients from misusing
lio_listio in a way that causes events to get dropped from a kqueue
Fixes #870
Diffstat (limited to 'src/sys')
-rw-r--r-- | src/sys/aio.rs | 613 |
1 files changed, 426 insertions, 187 deletions
diff --git a/src/sys/aio.rs b/src/sys/aio.rs index fe8e9ed2..3d539821 100644 --- a/src/sys/aio.rs +++ b/src/sys/aio.rs @@ -1,3 +1,4 @@ +// vim: tw=80 //! POSIX Asynchronous I/O //! //! The POSIX AIO interface is used for asynchronous I/O on files and disk-like @@ -21,18 +22,18 @@ //! not support this for all filesystems and devices. use {Error, Result}; -use bytes::{Bytes, BytesMut}; use errno::Errno; use std::os::unix::io::RawFd; use libc::{c_void, off_t, size_t}; use libc; +use std::borrow::{Borrow, BorrowMut}; use std::fmt; use std::fmt::Debug; use std::marker::PhantomData; use std::mem; -use std::ops::Deref; use std::ptr::{null, null_mut}; use sys::signal::*; +use std::thread; use sys::time::TimeSpec; libc_enum! { @@ -92,46 +93,38 @@ pub enum AioCancelStat { /// Owns (uniquely or shared) a memory buffer to keep it from `Drop`ing while /// the kernel has a pointer to it. -#[derive(Clone, Debug)] pub enum Buffer<'a> { /// No buffer to own. /// /// Used for operations like `aio_fsync` that have no data, or for unsafe /// operations that work with raw pointers. None, - /// Immutable shared ownership `Bytes` object - // Must use out-of-line allocation so the address of the data will be - // stable. `Bytes` and `BytesMut` sometimes dynamically allocate a buffer, - // and sometimes inline the data within the struct itself. - Bytes(Bytes), - /// Mutable uniquely owned `BytesMut` object - BytesMut(BytesMut), /// Keeps a reference to a slice - Phantom(PhantomData<&'a mut [u8]>) + Phantom(PhantomData<&'a mut [u8]>), + /// Generic thing that keeps a buffer from dropping + BoxedSlice(Box<Borrow<[u8]>>), + /// Generic thing that keeps a mutable buffer from dropping + BoxedMutSlice(Box<BorrowMut<[u8]>>), } -impl<'a> Buffer<'a> { - /// Return the inner `Bytes`, if any - pub fn bytes(&self) -> Option<&Bytes> { - match *self { - Buffer::Bytes(ref x) => Some(x), - _ => None - } - } - - /// Return the inner `BytesMut`, if any - pub fn bytes_mut(&self) -> Option<&BytesMut> { - match *self { - Buffer::BytesMut(ref x) => Some(x), - _ => None - } - } - - /// Is this `Buffer` `None`? - pub fn is_none(&self) -> bool { +impl<'a> Debug for Buffer<'a> { + // Note: someday it may be possible to Derive Debug for a trait object, but + // not today. + // https://github.com/rust-lang/rust/issues/1563 + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { match *self { - Buffer::None => true, - _ => false, + Buffer::None => write!(fmt, "None"), + Buffer::Phantom(p) => p.fmt(fmt), + Buffer::BoxedSlice(ref bs) => { + let borrowed : &Borrow<[u8]> = bs.borrow(); + write!(fmt, "BoxedSlice({:?})", + borrowed as *const Borrow<[u8]>) + }, + Buffer::BoxedMutSlice(ref bms) => { + let borrowed : &BorrowMut<[u8]> = bms.borrow(); + write!(fmt, "BoxedMutSlice({:?})", + borrowed as *const BorrowMut<[u8]>) + } } } } @@ -149,7 +142,7 @@ pub struct AioCb<'a> { /// Optionally keeps a reference to the data. /// /// Used to keep buffers from `Drop`'ing, and may be returned once the - /// `AioCb` is completed by `into_buffer`. + /// `AioCb` is completed by [`buffer`](#method.buffer). buffer: Buffer<'a> } @@ -165,6 +158,50 @@ impl<'a> AioCb<'a> { x } + /// Remove the inner boxed slice, if any, and return it. + /// + /// The returned value will be the argument that was passed to + /// `from_boxed_slice` when this `AioCb` was created. + /// + /// It is an error to call this method while the `AioCb` is still in + /// progress. + pub fn boxed_slice(&mut self) -> Option<Box<Borrow<[u8]>>> { + assert!(!self.in_progress, "Can't remove the buffer from an AioCb that's still in-progress. Did you forget to call aio_return?"); + if let Buffer::BoxedSlice(_) = self.buffer { + let mut oldbuffer = Buffer::None; + mem::swap(&mut self.buffer, &mut oldbuffer); + if let Buffer::BoxedSlice(inner) = oldbuffer { + Some(inner) + } else { + unreachable!(); + } + } else { + None + } + } + + /// Remove the inner boxed mutable slice, if any, and return it. + /// + /// The returned value will be the argument that was passed to + /// `from_boxed_mut_slice` when this `AioCb` was created. + /// + /// It is an error to call this method while the `AioCb` is still in + /// progress. + pub fn boxed_mut_slice(&mut self) -> Option<Box<BorrowMut<[u8]>>> { + assert!(!self.in_progress, "Can't remove the buffer from an AioCb that's still in-progress. Did you forget to call aio_return?"); + if let Buffer::BoxedMutSlice(_) = self.buffer { + let mut oldbuffer = Buffer::None; + mem::swap(&mut self.buffer, &mut oldbuffer); + if let Buffer::BoxedMutSlice(inner) = oldbuffer { + Some(inner) + } else { + unreachable!(); + } + } else { + None + } + } + /// Returns the underlying file descriptor associated with the `AioCb` pub fn fd(&self) -> RawFd { self.aiocb.aio_fildes @@ -186,7 +223,7 @@ impl<'a> AioCb<'a> { /// # Examples /// /// Create an `AioCb` from a raw file descriptor and use it for an - /// [`fsync`](#method.from_bytes_mut) operation. + /// [`fsync`](#method.fsync) operation. /// /// ``` /// # extern crate tempfile; @@ -229,7 +266,7 @@ impl<'a> AioCb<'a> { /// operations, but only if the borrow checker can guarantee that the slice /// will outlive the `AioCb`. That will usually be the case if the `AioCb` /// is stack-allocated. If the borrow checker gives you trouble, try using - /// [`from_bytes_mut`](#method.from_bytes_mut) instead. + /// [`from_boxed_mut_slice`](#method.from_boxed_mut_slice) instead. /// /// # Parameters /// @@ -299,17 +336,19 @@ impl<'a> AioCb<'a> { } } - /// Constructs a new `AioCb` from a `Bytes` object. + /// The safest and most flexible way to create an `AioCb`. /// - /// Unlike `from_slice`, this method returns a structure suitable for + /// Unlike [`from_slice`], this method returns a structure suitable for /// placement on the heap. It may be used for write operations, but not - /// read operations. + /// read operations. Unlike `from_ptr`, this method will ensure that the + /// buffer doesn't `drop` while the kernel is still processing it. Any + /// object that can be borrowed as a boxed slice will work. /// /// # Parameters /// /// * `fd`: File descriptor. Required for all aio functions. /// * `offs`: File offset - /// * `buf`: A shared memory buffer + /// * `buf`: A boxed slice-like object /// * `prio`: If POSIX Prioritized IO is supported, then the /// operation will be prioritized at the process's /// priority level minus `prio` @@ -321,15 +360,13 @@ impl<'a> AioCb<'a> { /// /// # Examples /// - /// Create an `AioCb` from a `Bytes` object and use it for writing. + /// Create an `AioCb` from a Vector and use it for writing /// /// ``` - /// # extern crate bytes; /// # extern crate tempfile; /// # extern crate nix; /// # use nix::errno::Errno; /// # use nix::Error; - /// # use bytes::Bytes; /// # use nix::sys::aio::*; /// # use nix::sys::signal::SigevNotify; /// # use std::{thread, time}; @@ -337,11 +374,12 @@ impl<'a> AioCb<'a> { /// # use std::os::unix::io::AsRawFd; /// # use tempfile::tempfile; /// # fn main() { - /// let wbuf = Bytes::from(&b"CDEF"[..]); + /// let wbuf = Box::new(Vec::from("CDEF")); + /// let expected_len = wbuf.len(); /// let mut f = tempfile().unwrap(); - /// let mut aiocb = AioCb::from_bytes( f.as_raw_fd(), + /// let mut aiocb = AioCb::from_boxed_slice( f.as_raw_fd(), /// 2, //offset - /// wbuf.clone(), + /// wbuf, /// 0, //priority /// SigevNotify::SigevNone, /// LioOpcode::LIO_NOP); @@ -349,72 +387,103 @@ impl<'a> AioCb<'a> { /// while (aiocb.error() == Err(Error::from(Errno::EINPROGRESS))) { /// thread::sleep(time::Duration::from_millis(10)); /// } - /// assert_eq!(aiocb.aio_return().unwrap() as usize, wbuf.len()); + /// assert_eq!(aiocb.aio_return().unwrap() as usize, expected_len); /// # } /// ``` - pub fn from_bytes(fd: RawFd, offs: off_t, buf: Bytes, + /// + /// Create an `AioCb` from a `Bytes` object + /// + /// ``` + /// # extern crate bytes; + /// # extern crate tempfile; + /// # extern crate nix; + /// # use bytes::Bytes; + /// # use nix::sys::aio::*; + /// # use nix::sys::signal::SigevNotify; + /// # use std::os::unix::io::AsRawFd; + /// # use tempfile::tempfile; + /// # fn main() { + /// let wbuf = Box::new(Bytes::from(&b"CDEF"[..])); + /// let mut f = tempfile().unwrap(); + /// let mut aiocb = AioCb::from_boxed_slice( f.as_raw_fd(), + /// 2, //offset + /// wbuf, + /// 0, //priority + /// SigevNotify::SigevNone, + /// LioOpcode::LIO_NOP); + /// # } + /// ``` + /// + /// If a library needs to work with buffers that aren't `Box`ed, it can + /// create a `Box`ed container for use with this method. Here's an example + /// using an un`Box`ed `Bytes` object. + /// + /// ``` + /// # extern crate bytes; + /// # extern crate tempfile; + /// # extern crate nix; + /// # use bytes::Bytes; + /// # use nix::sys::aio::*; + /// # use nix::sys::signal::SigevNotify; + /// # use std::borrow::Borrow; + /// # use std::os::unix::io::AsRawFd; + /// # use tempfile::tempfile; + /// struct BytesContainer(Bytes); + /// impl Borrow<[u8]> for BytesContainer { + /// fn borrow(&self) -> &[u8] { + /// self.0.as_ref() + /// } + /// } + /// fn main() { + /// let wbuf = Bytes::from(&b"CDEF"[..]); + /// let boxed_wbuf = Box::new(BytesContainer(wbuf)); + /// let mut f = tempfile().unwrap(); + /// let mut aiocb = AioCb::from_boxed_slice( f.as_raw_fd(), + /// 2, //offset + /// boxed_wbuf, + /// 0, //priority + /// SigevNotify::SigevNone, + /// LioOpcode::LIO_NOP); + /// } + /// ``` + /// + /// [`from_slice`]: #method.from_slice + pub fn from_boxed_slice(fd: RawFd, offs: off_t, buf: Box<Borrow<[u8]>>, prio: libc::c_int, sigev_notify: SigevNotify, opcode: LioOpcode) -> AioCb<'a> { - // Small BytesMuts are stored inline. Inline storage is a no-no, - // because we store a pointer to the buffer in the AioCb before - // returning the Buffer by move. If the buffer is too small, reallocate - // it to force out-of-line storage - // TODO: Add an is_inline() method to BytesMut, and a way to explicitly - // force out-of-line allocation. - let buf2 = if buf.len() < 64 { - // Reallocate to force out-of-line allocation - let mut ool = Bytes::with_capacity(64); - ool.extend_from_slice(buf.deref()); - ool - } else { - buf - }; let mut a = AioCb::common_init(fd, prio, sigev_notify); + { + let borrowed : &Borrow<[u8]> = buf.borrow(); + let slice : &[u8] = borrowed.borrow(); + a.aio_nbytes = slice.len() as size_t; + a.aio_buf = slice.as_ptr() as *mut c_void; + } a.aio_offset = offs; - a.aio_nbytes = buf2.len() as size_t; - a.aio_buf = buf2.as_ptr() as *mut c_void; a.aio_lio_opcode = opcode as libc::c_int; AioCb { aiocb: a, mutable: false, in_progress: false, - buffer: Buffer::Bytes(buf2), + buffer: Buffer::BoxedSlice(buf), } } - /// Constructs a new `AioCb` from a `BytesMut` object. + /// The safest and most flexible way to create an `AioCb` for reading. /// - /// Unlike `from_mut_slice`, this method returns a structure suitable for - /// placement on the heap. It may be used for both reads and writes. - /// - /// # Parameters - /// - /// * `fd`: File descriptor. Required for all aio functions. - /// * `offs`: File offset - /// * `buf`: An owned memory buffer - /// * `prio`: If POSIX Prioritized IO is supported, then the - /// operation will be prioritized at the process's - /// priority level minus `prio` - /// * `sigev_notify`: Determines how you will be notified of event - /// completion. - /// * `opcode`: This field is only used for `lio_listio`. It - /// determines which operation to use for this individual - /// aiocb + /// Like [`from_boxed_slice`], but the slice is a mutable one. More + /// flexible than [`from_mut_slice`], because a wide range of objects can be + /// used. /// /// # Examples /// - /// Create an `AioCb` from a `BytesMut` and use it for reading. In this - /// example the `AioCb` is stack-allocated, so we could've used - /// `from_mut_slice` instead. + /// Create an `AioCb` from a Vector and use it for reading /// /// ``` - /// # extern crate bytes; /// # extern crate tempfile; /// # extern crate nix; /// # use nix::errno::Errno; /// # use nix::Error; - /// # use bytes::BytesMut; /// # use nix::sys::aio::*; /// # use nix::sys::signal::SigevNotify; /// # use std::{thread, time}; @@ -424,10 +493,10 @@ impl<'a> AioCb<'a> { /// # fn main() { /// const INITIAL: &[u8] = b"abcdef123456"; /// const LEN: usize = 4; - /// let rbuf = BytesMut::from(vec![0; LEN]); + /// let rbuf = Box::new(vec![0; LEN]); /// let mut f = tempfile().unwrap(); /// f.write_all(INITIAL).unwrap(); - /// let mut aiocb = AioCb::from_bytes_mut( f.as_raw_fd(), + /// let mut aiocb = AioCb::from_boxed_mut_slice( f.as_raw_fd(), /// 2, //offset /// rbuf, /// 0, //priority @@ -438,33 +507,33 @@ impl<'a> AioCb<'a> { /// thread::sleep(time::Duration::from_millis(10)); /// } /// assert_eq!(aiocb.aio_return().unwrap() as usize, LEN); - /// let buffer = aiocb.into_buffer(); + /// let mut buffer = aiocb.boxed_mut_slice().unwrap(); /// const EXPECT: &[u8] = b"cdef"; - /// assert_eq!(buffer.bytes_mut().unwrap(), EXPECT); + /// assert_eq!(buffer.borrow_mut(), EXPECT); /// # } /// ``` - pub fn from_bytes_mut(fd: RawFd, offs: off_t, buf: BytesMut, - prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> AioCb<'a> { - let mut buf2 = if buf.len() < 64 { - // Reallocate to force out-of-line allocation - let mut ool = BytesMut::with_capacity(64); - ool.extend_from_slice(buf.deref()); - ool - } else { - buf - }; + /// + /// [`from_boxed_slice`]: #method.from_boxed_slice + /// [`from_mut_slice`]: #method.from_mut_slice + pub fn from_boxed_mut_slice(fd: RawFd, offs: off_t, + mut buf: Box<BorrowMut<[u8]>>, + prio: libc::c_int, sigev_notify: SigevNotify, + opcode: LioOpcode) -> AioCb<'a> { let mut a = AioCb::common_init(fd, prio, sigev_notify); + { + let borrowed : &mut BorrowMut<[u8]> = buf.borrow_mut(); + let slice : &mut [u8] = borrowed.borrow_mut(); + a.aio_nbytes = slice.len() as size_t; + a.aio_buf = slice.as_mut_ptr() as *mut c_void; + } a.aio_offset = offs; - a.aio_nbytes = buf2.len() as size_t; - a.aio_buf = buf2.as_mut_ptr() as *mut c_void; a.aio_lio_opcode = opcode as libc::c_int; AioCb { aiocb: a, mutable: true, in_progress: false, - buffer: Buffer::BytesMut(buf2), + buffer: Buffer::BoxedMutSlice(buf), } } @@ -474,7 +543,7 @@ impl<'a> AioCb<'a> { /// placement on the heap. It may be used for both reads and writes. Due /// to its unsafety, this method is not recommended. It is most useful when /// heap allocation is required but for some reason the data cannot be - /// converted to a `BytesMut`. + /// wrapped in a `struct` that implements `BorrowMut<[u8]>` /// /// # Parameters /// @@ -518,7 +587,8 @@ impl<'a> AioCb<'a> { /// Unlike `from_slice`, this method returns a structure suitable for /// placement on the heap. Due to its unsafety, this method is not /// recommended. It is most useful when heap allocation is required but for - /// some reason the data cannot be converted to a `Bytes`. + /// some reason the data cannot be wrapped in a `struct` that implements + /// `Borrow<[u8]>` /// /// # Parameters /// @@ -623,19 +693,6 @@ impl<'a> AioCb<'a> { } } - /// Consumes the `aiocb` and returns its inner `Buffer`, if any. - /// - /// This method is especially useful when reading into a `BytesMut`, because - /// that type does not support shared ownership. - pub fn into_buffer(mut self) -> Buffer<'static> { - let buf = self.buffer(); - match buf { - Buffer::BytesMut(x) => Buffer::BytesMut(x), - Buffer::Bytes(x) => Buffer::Bytes(x), - _ => Buffer::None - } - } - fn common_init(fd: RawFd, prio: libc::c_int, sigev_notify: SigevNotify) -> libc::aiocb { // Use mem::zeroed instead of explicitly zeroing each field, because the @@ -669,12 +726,10 @@ impl<'a> AioCb<'a> { /// result. /// /// ``` - /// # extern crate bytes; /// # extern crate tempfile; /// # extern crate nix; /// # use nix::errno::Errno; /// # use nix::Error; - /// # use bytes::Bytes; /// # use nix::sys::aio::*; /// # use nix::sys::signal::SigevNotify; /// # use std::{thread, time}; @@ -682,11 +737,11 @@ impl<'a> AioCb<'a> { /// # use std::os::unix::io::AsRawFd; /// # use tempfile::tempfile; /// # fn main() { - /// let wbuf = Bytes::from(&b"CDEF"[..]); + /// let wbuf = b"CDEF"; /// let mut f = tempfile().unwrap(); - /// let mut aiocb = AioCb::from_bytes( f.as_raw_fd(), + /// let mut aiocb = AioCb::from_slice( f.as_raw_fd(), /// 2, //offset - /// wbuf.clone(), + /// &wbuf[..], /// 0, //priority /// SigevNotify::SigevNone, /// LioOpcode::LIO_NOP); @@ -870,12 +925,10 @@ impl<'a> AioCb<'a> { /// descriptor. /// /// ``` -/// # extern crate bytes; /// # extern crate tempfile; /// # extern crate nix; /// # use nix::errno::Errno; /// # use nix::Error; -/// # use bytes::Bytes; /// # use nix::sys::aio::*; /// # use nix::sys::signal::SigevNotify; /// # use std::{thread, time}; @@ -883,11 +936,11 @@ impl<'a> AioCb<'a> { /// # use std::os::unix::io::AsRawFd; /// # use tempfile::tempfile; /// # fn main() { -/// let wbuf = Bytes::from(&b"CDEF"[..]); +/// let wbuf = b"CDEF"; /// let mut f = tempfile().unwrap(); -/// let mut aiocb = AioCb::from_bytes( f.as_raw_fd(), +/// let mut aiocb = AioCb::from_slice( f.as_raw_fd(), /// 2, //offset -/// wbuf.clone(), +/// &wbuf[..], /// 0, //priority /// SigevNotify::SigevNone, /// LioOpcode::LIO_NOP); @@ -965,63 +1018,6 @@ pub fn aio_suspend(list: &[&AioCb], timeout: Option<TimeSpec>) -> Result<()> { }).map(drop) } - -/// Submits multiple asynchronous I/O requests with a single system call. -/// -/// They are not guaranteed to complete atomically, and the order in which the -/// requests are carried out is not specified. Reads, writes, and fsyncs may be -/// freely mixed. -/// -/// This function is useful for reducing the context-switch overhead of -/// submitting many AIO operations. It can also be used with -/// `LioMode::LIO_WAIT` to block on the result of several independent -/// operations. Used that way, it is often useful in programs that otherwise -/// make little use of AIO. -/// -/// # Examples -/// -/// Use `lio_listio` to submit an aio operation and wait for its completion. In -/// this case, there is no need to use `aio_suspend` to wait or `AioCb#error` to -/// poll. -/// -/// ``` -/// # extern crate tempfile; -/// # extern crate nix; -/// # use nix::sys::aio::*; -/// # use nix::sys::signal::SigevNotify; -/// # use std::os::unix::io::AsRawFd; -/// # use tempfile::tempfile; -/// # fn main() { -/// const WBUF: &[u8] = b"abcdef123456"; -/// let mut f = tempfile().unwrap(); -/// let mut aiocb = AioCb::from_slice( f.as_raw_fd(), -/// 2, //offset -/// WBUF, -/// 0, //priority -/// SigevNotify::SigevNone, -/// LioOpcode::LIO_WRITE); -/// lio_listio(LioMode::LIO_WAIT, -/// &[&mut aiocb], -/// SigevNotify::SigevNone).unwrap(); -/// assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len()); -/// # } -/// ``` -/// -/// # References -/// -/// [`lio_listio`](http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html) -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -pub fn lio_listio(mode: LioMode, list: &[&mut AioCb], - sigev_notify: SigevNotify) -> Result<()> { - let sigev = SigEvent::new(sigev_notify); - let sigevp = &mut sigev.sigevent() as *mut libc::sigevent; - let plist = list as *const [&mut AioCb] as *const [*mut libc::aiocb]; - let p = plist as *const *mut libc::aiocb; - Errno::result(unsafe { - libc::lio_listio(mode as i32, p, list.len() as i32, sigevp) - }).map(drop) -} - impl<'a> Debug for AioCb<'a> { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("AioCb") @@ -1042,6 +1038,249 @@ impl<'a> Drop for AioCb<'a> { /// If the `AioCb` has no remaining state in the kernel, just drop it. /// Otherwise, dropping constitutes a resource leak, which is an error fn drop(&mut self) { - assert!(!self.in_progress, "Dropped an in-progress AioCb"); + assert!(thread::panicking() || !self.in_progress, + "Dropped an in-progress AioCb"); + } +} + +/// LIO Control Block. +/// +/// The basic structure used to issue multiple AIO operations simultaneously. +#[cfg(not(any(target_os = "ios", target_os = "macos")))] +pub struct LioCb<'a> { + /// A collection of [`AioCb`]s. All of these will be issued simultaneously + /// by the [`listio`] method. + /// + /// [`AioCb`]: struct.AioCb.html + /// [`listio`]: #method.listio + pub aiocbs: Vec<AioCb<'a>>, + + /// The actual list passed to `libc::lio_listio`. + /// + /// It must live for as long as any of the operations are still being + /// processesed, because the aio subsystem uses its address as a unique + /// identifier. + list: Vec<*mut libc::aiocb>, + + /// A partial set of results. This field will get populated by + /// `listio_resubmit` when an `LioCb` is resubmitted after an error + results: Vec<Option<Result<isize>>> +} + +#[cfg(not(any(target_os = "ios", target_os = "macos")))] +impl<'a> LioCb<'a> { + /// Initialize an empty `LioCb` + pub fn with_capacity(capacity: usize) -> LioCb<'a> { + LioCb { + aiocbs: Vec::with_capacity(capacity), + list: Vec::with_capacity(capacity), + results: Vec::with_capacity(capacity) + } + } + + /// Submits multiple asynchronous I/O requests with a single system call. + /// + /// They are not guaranteed to complete atomically, and the order in which + /// the requests are carried out is not specified. Reads, writes, and + /// fsyncs may be freely mixed. + /// + /// This function is useful for reducing the context-switch overhead of + /// submitting many AIO operations. It can also be used with + /// `LioMode::LIO_WAIT` to block on the result of several independent + /// operations. Used that way, it is often useful in programs that + /// otherwise make little use of AIO. + /// + /// # Examples + /// + /// Use `listio` to submit an aio operation and wait for its completion. In + /// this case, there is no need to use [`aio_suspend`] to wait or + /// [`AioCb::error`] to poll. + /// + /// ``` + /// # extern crate tempfile; + /// # extern crate nix; + /// # use nix::sys::aio::*; + /// # use nix::sys::signal::SigevNotify; + /// # use std::os::unix::io::AsRawFd; + /// # use tempfile::tempfile; + /// # fn main() { + /// const WBUF: &[u8] = b"abcdef123456"; + /// let mut f = tempfile().unwrap(); + /// let mut liocb = LioCb::with_capacity(1); + /// liocb.aiocbs.push(AioCb::from_slice( f.as_raw_fd(), + /// 2, //offset + /// WBUF, + /// 0, //priority + /// SigevNotify::SigevNone, + /// LioOpcode::LIO_WRITE)); + /// liocb.listio(LioMode::LIO_WAIT, + /// SigevNotify::SigevNone).unwrap(); + /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len()); + /// # } + /// ``` + /// + /// # References + /// + /// [`lio_listio`](http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html) + /// + /// [`aio_suspend`]: fn.aio_suspend.html + /// [`AioCb::error`]: struct.AioCb.html#method.error + pub fn listio(&mut self, mode: LioMode, + sigev_notify: SigevNotify) -> Result<()> { + let sigev = SigEvent::new(sigev_notify); + let sigevp = &mut sigev.sigevent() as *mut libc::sigevent; + self.list.clear(); + for a in &mut self.aiocbs { + a.in_progress = true; + self.list.push(a as *mut AioCb<'a> + as *mut libc::aiocb); + } + let p = self.list.as_ptr(); + Errno::result(unsafe { + libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp) + }).map(|_| ()) + } + + /// Resubmits any incomplete operations with [`lio_listio`]. + /// + /// Sometimes, due to system resource limitations, an `lio_listio` call will + /// return `EIO`, or `EAGAIN`. Or, if a signal is received, it may return + /// `EINTR`. In any of these cases, only a subset of its constituent + /// operations will actually have been initiated. `listio_resubmit` will + /// resubmit any operations that are still uninitiated. + /// + /// After calling `listio_resubmit`, results should be collected by + /// [`LioCb::aio_return`]. + /// + /// # Examples + /// ```no_run + /// # extern crate tempfile; + /// # extern crate nix; + /// # use nix::Error; + /// # use nix::errno::Errno; + /// # use nix::sys::aio::*; + /// # use nix::sys::signal::SigevNotify; + /// # use std::os::unix::io::AsRawFd; + /// # use std::{thread, time}; + /// # use tempfile::tempfile; + /// # fn main() { + /// const WBUF: &[u8] = b"abcdef123456"; + /// let mut f = tempfile().unwrap(); + /// let mut liocb = LioCb::with_capacity(1); + /// liocb.aiocbs.push(AioCb::from_slice( f.as_raw_fd(), + /// 2, //offset + /// WBUF, + /// 0, //priority + /// SigevNotify::SigevNone, + /// LioOpcode::LIO_WRITE)); + /// let mut err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone); + /// while err == Err(Error::Sys(Errno::EIO)) || + /// err == Err(Error::Sys(Errno::EAGAIN)) { + /// thread::sleep(time::Duration::from_millis(10)); + /// err = liocb.listio_resubmit(LioMode::LIO_WAIT, SigevNotify::SigevNone); + /// } + /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len()); + /// # } + /// ``` + /// + /// # References + /// + /// [`lio_listio`](http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html) + /// + /// [`lio_listio`]: http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html + /// [`LioCb::aio_return`]: struct.LioCb.html#method.aio_return + // Note: the addresses of any EINPROGRESS or EOK aiocbs _must_ not be + // changed by this method, because the kernel relies on their addresses + // being stable. + // Note: aiocbs that are Ok(()) must be finalized by aio_return, or else the + // sigev_notify will immediately refire. + pub fn listio_resubmit(&mut self, mode:LioMode, + sigev_notify: SigevNotify) -> Result<()> { + let sigev = SigEvent::new(sigev_notify); + let sigevp = &mut sigev.sigevent() as *mut libc::sigevent; + self.list.clear(); + + while self.results.len() < self.aiocbs.len() { + self.results.push(None); + } + + for (i, a) in self.aiocbs.iter_mut().enumerate() { + if self.results[i].is_some() { + // Already collected final status for this operation + continue; + } + match a.error() { + Ok(()) => { + // aiocb is complete; collect its status and don't resubmit + self.results[i] = Some(a.aio_return()); + }, + Err(Error::Sys(Errno::EAGAIN)) => { + self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb); + }, + Err(Error::Sys(Errno::EINPROGRESS)) => { + // aiocb is was successfully queued; no need to do anything + () + }, + Err(Error::Sys(Errno::EINVAL)) => panic!( + "AioCb was never submitted, or already finalized"), + _ => unreachable!() + } + } + let p = self.list.as_ptr(); + Errno::result(unsafe { + libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp) + }).map(|_| ()) + } + + /// Collect final status for an individual `AioCb` submitted as part of an + /// `LioCb`. + /// + /// This is just like [`AioCb::aio_return`], except it takes into account + /// operations that were restarted by [`LioCb::listio_resubmit`] + /// + /// [`AioCb::aio_return`]: struct.AioCb.html#method.aio_return + /// [`LioCb::listio_resubmit`]: #method.listio_resubmit + pub fn aio_return(&mut self, i: usize) -> Result<isize> { + if i >= self.results.len() || self.results[i].is_none() { + self.aiocbs[i].aio_return() + } else { + self.results[i].unwrap() + } + } + + /// Retrieve error status of an individual `AioCb` submitted as part of an + /// `LioCb`. + /// + /// This is just like [`AioCb::error`], except it takes into account + /// operations that were restarted by [`LioCb::listio_resubmit`] + /// + /// [`AioCb::error`]: struct.AioCb.html#method.error + /// [`LioCb::listio_resubmit`]: #method.listio_resubmit + pub fn error(&mut self, i: usize) -> Result<()> { + if i >= self.results.len() || self.results[i].is_none() { + self.aiocbs[i].error() + } else { + Ok(()) + } + } +} + +#[cfg(not(any(target_os = "ios", target_os = "macos")))] +impl<'a> Debug for LioCb<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("LioCb") + .field("aiocbs", &self.aiocbs) + .finish() + } +} + +#[cfg(not(any(target_os = "ios", target_os = "macos")))] +impl<'a> From<Vec<AioCb<'a>>> for LioCb<'a> { + fn from(src: Vec<AioCb<'a>>) -> LioCb<'a> { + LioCb { + list: Vec::with_capacity(src.capacity()), + results: Vec::with_capacity(src.capacity()), + aiocbs: src, + } } } |