diff options
author | Yehuda Katz <wycats@gmail.com> | 2014-10-10 16:16:45 -0700 |
---|---|---|
committer | Yehuda Katz <wycats@gmail.com> | 2014-10-10 16:16:45 -0700 |
commit | c4ff023569ad58f31e4c3eab1ab646afd8cce66a (patch) | |
tree | e2719b6ec28bb15be9f02e4f876311a5681dd53a | |
parent | 7e173143af1d82c6213739480967f6749477214a (diff) | |
parent | 28caf48e6fd199eb13b54239d67cf609964aa83e (diff) | |
download | nix-c4ff023569ad58f31e4c3eab1ab646afd8cce66a.zip |
Merge pull request #8 from little-arhat/feature-scatter-gather
Implement scatter/gather IO: writev & readv.
-rw-r--r-- | src/lib.rs | 1 | ||||
-rw-r--r-- | src/unistd.rs | 82 | ||||
-rw-r--r-- | tests/unistd.rs | 101 |
3 files changed, 183 insertions, 1 deletions
@@ -6,6 +6,7 @@ #![allow(non_camel_case_types)] extern crate libc; +extern crate core; // Re-export some libc constants pub use libc::{c_int, c_void}; diff --git a/src/unistd.rs b/src/unistd.rs index 4f3140dd..7abaf410 100644 --- a/src/unistd.rs +++ b/src/unistd.rs @@ -3,14 +3,17 @@ use std::c_str::{CString, ToCStr}; use libc::{c_char, c_void, c_int, size_t, pid_t}; use fcntl::{fcntl, Fd, OFlag, O_NONBLOCK, O_CLOEXEC, FD_CLOEXEC, F_SETFD, F_SETFL}; use errno::{SysResult, SysError, from_ffi}; +use core::raw::Slice as RawSlice; #[cfg(target_os = "linux")] pub use self::linux::*; mod ffi { - use libc::{c_char, c_int, size_t}; + use super::{IovecR,IovecW}; + use libc::{c_char, c_int, size_t, ssize_t}; pub use libc::{close, read, write, pipe}; pub use libc::funcs::posix88::unistd::fork; + use fcntl::Fd; extern { // duplicate a file descriptor @@ -37,6 +40,14 @@ mod ffi { // gets the hostname // doc: http://man7.org/linux/man-pages/man2/gethostname.2.html pub fn sethostname(name: *const c_char, len: size_t) -> c_int; + + // vectorized version of write + // doc: http://man7.org/linux/man-pages/man2/writev.2.html + pub fn writev(fd: Fd, iov: *const IovecW, iovcnt: c_int) -> ssize_t; + + // vectorized version of read + // doc: http://man7.org/linux/man-pages/man2/readv.2.html + pub fn readv(fd: Fd, iov: *const IovecR, iovcnt: c_int) -> ssize_t; } } @@ -73,6 +84,56 @@ pub fn fork() -> SysResult<Fork> { } } +// We use phantom types to maintain memory safety. +// If readv/writev were using simple &[Iovec] we could initialize +// Iovec with immutable slice and then pass it to readv, overwriting content +// we dont have write access to: +// let mut v = Vec::new(); +// let iov = Iovec::from_slice(immutable_vec.as_slice()); +// v.push(iov); +// let _:SysResult<uint> = readv(fd, v.as_slice()); + +// We do not want <T> to appear in ffi functions, so we provide this aliases. +type IovecR = Iovec<ToRead>; +type IovecW = Iovec<ToWrite>; + +pub struct ToRead; +pub struct ToWrite; + +#[repr(C)] +pub struct Iovec<T> { + iov_base: *mut c_void, + iov_len: size_t, +} + +impl <T> Iovec<T> { + #[inline] + pub fn as_slice<'a>(&'a self) -> &'a [u8] { + unsafe { mem::transmute(RawSlice { data: self.iov_base as *const u8, len: self.iov_len as uint }) } + } +} + +impl Iovec<ToWrite> { + #[inline] + pub fn from_slice(buf: &[u8]) -> Iovec<ToWrite> { + Iovec { + iov_base: buf.as_ptr() as *mut c_void, + iov_len: buf.len() as size_t + } + } +} + +impl Iovec<ToRead> { + #[inline] + pub fn from_mut_slice(buf: &mut [u8]) -> Iovec<ToRead> { + Iovec { + iov_base: buf.as_ptr() as *mut c_void, + iov_len: buf.len() as size_t + } + } +} + + #[inline] pub fn dup(oldfd: Fd) -> SysResult<Fd> { let res = unsafe { ffi::dup(oldfd) }; @@ -222,6 +283,25 @@ pub fn write(fd: Fd, buf: &[u8]) -> SysResult<uint> { return Ok(res as uint) } +pub fn writev(fd: Fd, iov: &[Iovec<ToWrite>]) -> SysResult<uint> { + let res = unsafe { ffi::writev(fd, iov.as_ptr(), iov.len() as c_int) }; + if res < 0 { + return Err(SysError::last()); + } + + return Ok(res as uint) +} + +pub fn readv(fd: Fd, iov: &mut [Iovec<ToRead>]) -> SysResult<uint> { + let res = unsafe { ffi::readv(fd, iov.as_ptr(), iov.len() as c_int) }; + if res < 0 { + return Err(SysError::last()); + } + + return Ok(res as uint) +} + + pub fn pipe() -> SysResult<(Fd, Fd)> { unsafe { let mut res; diff --git a/tests/unistd.rs b/tests/unistd.rs new file mode 100644 index 00000000..ba43246c --- /dev/null +++ b/tests/unistd.rs @@ -0,0 +1,101 @@ + +extern crate nix; +extern crate native; + +#[cfg(test)] +mod test { + use nix::unistd::{writev, readv, Iovec, pipe, close}; + use native::io::FileDesc; + use std::rt::rtio::RtioFileStream; + use std::rand::{task_rng, Rng}; + use std::cmp::min; + + #[test] + fn test_writev() { + let mut to_write = Vec::with_capacity(16 * 128); + for _ in range(0u, 16) { + let s:String = task_rng().gen_ascii_chars().take(128).collect(); + let b = s.as_bytes(); + to_write.extend(b.iter().map(|x| x.clone())); + } + // Allocate and fill iovecs + let mut iovecs = Vec::new(); + let mut consumed = 0; + while consumed < to_write.len() { + let left = to_write.len() - consumed; + let slice_len = if left < 64 { left } else { task_rng().gen_range(64, min(256, left)) }; + let b = to_write.slice(consumed, consumed + slice_len); + iovecs.push(Iovec::from_slice(b)); + consumed += slice_len; + } + let pipe_res = pipe(); + assert!(pipe_res.is_ok()); + let (reader, writer) = pipe_res.ok().unwrap(); + // FileDesc will close its filedesc (reader). + let mut reader = FileDesc::new(reader, true); + let mut read_buf = Vec::from_elem(128 * 16, 0u8); + // Blocking io, should write all data. + let write_res = writev(writer, iovecs.as_slice()); + // Successful write + assert!(write_res.is_ok()); + let written = write_res.ok().unwrap(); + // Check whether we written all data + assert_eq!(to_write.len(), written); + let read_res = reader.read(read_buf.as_mut_slice()); + // Successful read + assert!(read_res.is_ok()); + let read = read_res.ok().unwrap() as uint; + // Check we have read as much as we written + assert_eq!(read, written); + // Check equality of written and read data + assert_eq!(to_write.as_slice(), read_buf.as_slice()); + let close_res = close(writer); + assert!(close_res.is_ok()); + } + + #[test] + fn test_readv() { + let s:String = task_rng().gen_ascii_chars().take(128).collect(); + let to_write = s.as_bytes().to_vec(); + let mut storage = Vec::new(); + let mut allocated = 0; + while allocated < to_write.len() { + let left = to_write.len() - allocated; + let vec_len = if left < 64 { left } else { task_rng().gen_range(64, min(256, left)) }; + let v = Vec::from_elem(vec_len, 0u8); + storage.push(v); + allocated += vec_len; + } + let mut iovecs = Vec::with_capacity(storage.len()); + for v in storage.iter_mut() { + iovecs.push(Iovec::from_mut_slice(v.as_mut_slice())); + } + let pipe_res = pipe(); + assert!(pipe_res.is_ok()); + let (reader, writer) = pipe_res.ok().unwrap(); + // FileDesc will close its filedesc (writer). + let mut writer = FileDesc::new(writer, true); + // Blocking io, should write all data. + let write_res = writer.write(to_write.as_slice()); + // Successful write + assert!(write_res.is_ok()); + let read_res = readv(reader, iovecs.as_mut_slice()); + assert!(read_res.is_ok()); + let read = read_res.ok().unwrap(); + // Check whether we've read all data + assert_eq!(to_write.len(), read); + // Cccumulate data from iovecs + let mut read_buf = Vec::with_capacity(to_write.len()); + for iovec in iovecs.iter() { + read_buf.extend(iovec.as_slice().iter().map(|x| x.clone())); + } + // Check whether iovecs contain all written data + assert_eq!(read_buf.len(), to_write.len()); + // Check equality of written and read data + assert_eq!(read_buf.as_slice(), to_write.as_slice()); + let close_res = close(reader); + assert!(close_res.is_ok()); + } + + +} |