From d3f9b96adc81bb862770f330466d2f2e8d9ab1bc Mon Sep 17 00:00:00 2001 From: Alan Somers Date: Sun, 18 Sep 2016 21:17:41 -0600 Subject: Add POSIX AIO support POSIX AIO is a standard for asynchronous file I/O. Read, write, and fsync operations can all take place in the background, with completion notification delivered by a signal, by a new thread, by kqueue, or not at all. This commit supports all standard AIO functions. However, lio_listio is disabled on macos because it doesn't seem to work, even though the syscall is present. The SigEvent class, used for AIO notifications among other things, is also added. Also, impl AsRef for TimeVal and TimeSpec --- test/sys/test_aio.rs | 375 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 375 insertions(+) create mode 100644 test/sys/test_aio.rs (limited to 'test/sys/test_aio.rs') diff --git a/test/sys/test_aio.rs b/test/sys/test_aio.rs new file mode 100644 index 00000000..4659fd50 --- /dev/null +++ b/test/sys/test_aio.rs @@ -0,0 +1,375 @@ +use libc::c_int; +use nix::{Error, Result}; +use nix::errno::*; +use nix::sys::aio::*; +use nix::sys::signal::*; +use nix::sys::time::{TimeSpec, TimeValLike}; +use std::io::{Write, Read, Seek, SeekFrom}; +use std::os::unix::io::AsRawFd; +use std::{thread, time}; +use tempfile::tempfile; + +// Helper that polls an AioCb for completion or error +fn poll_aio(mut aiocb: &mut AioCb) -> Result<()> { + loop { + let err = aio_error(&mut aiocb); + if err != Err(Error::from(Errno::EINPROGRESS)) { return err; }; + thread::sleep(time::Duration::from_millis(10)); + } +} + +// Tests aio_cancel. We aren't trying to test the OS's implementation, only our +// bindings. So it's sufficient to check that aio_cancel returned any +// AioCancelStat value. +#[test] +fn test_aio_cancel() { + let mut wbuf = "CDEF".to_string().into_bytes(); + + let f = tempfile().unwrap(); + let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(), + 0, //offset + &mut wbuf, + 0, //priority + SigevNotify::SigevNone, + LioOpcode::LIO_NOP); + aio_write(&mut aiocb).unwrap(); + let err = aio_error(&mut aiocb); + assert!(err == Ok(()) || err == Err(Error::from(Errno::EINPROGRESS))); + + let cancelstat = aio_cancel(f.as_raw_fd(), Some(&mut aiocb)); + assert!(cancelstat.is_ok()); + + // Wait for aiocb to complete, but don't care whether it succeeded + let _ = poll_aio(&mut aiocb); + let _ = aio_return(&mut aiocb); +} + +// Tests using aio_cancel for all outstanding IOs. +#[test] +fn test_aio_cancel_all() { + let mut wbuf = "CDEF".to_string().into_bytes(); + + let f = tempfile().unwrap(); + let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(), + 0, //offset + &mut wbuf, + 0, //priority + SigevNotify::SigevNone, + LioOpcode::LIO_NOP); + aio_write(&mut aiocb).unwrap(); + let err = aio_error(&mut aiocb); + assert!(err == Ok(()) || err == Err(Error::from(Errno::EINPROGRESS))); + + let cancelstat = aio_cancel(f.as_raw_fd(), None); + assert!(cancelstat.is_ok()); + + // Wait for aiocb to complete, but don't care whether it succeeded + let _ = poll_aio(&mut aiocb); + let _ = aio_return(&mut aiocb); +} + +#[test] +fn test_aio_fsync() { + const INITIAL: &'static [u8] = b"abcdef123456"; + let mut f = tempfile().unwrap(); + f.write(INITIAL).unwrap(); + let mut aiocb = AioCb::from_fd( f.as_raw_fd(), + 0, //priority + SigevNotify::SigevNone); + let err = aio_fsync(AioFsyncMode::O_SYNC, &mut aiocb); + assert!(err.is_ok()); + poll_aio(&mut aiocb).unwrap(); + aio_return(&mut aiocb).unwrap(); +} + + +#[test] +fn test_aio_suspend() { + const INITIAL: &'static [u8] = b"abcdef123456"; + const WBUF: &'static [u8] = b"CDEF"; + let timeout = TimeSpec::seconds(10); + let mut rbuf = vec![0; 4]; + let mut f = tempfile().unwrap(); + f.write(INITIAL).unwrap(); + + let mut wcb = unsafe { + AioCb::from_slice( f.as_raw_fd(), + 2, //offset + &mut WBUF, + 0, //priority + SigevNotify::SigevNone, + LioOpcode::LIO_WRITE) + }; + + let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(), + 8, //offset + &mut rbuf, + 0, //priority + SigevNotify::SigevNone, + LioOpcode::LIO_READ); + aio_write(&mut wcb).unwrap(); + aio_read(&mut rcb).unwrap(); + loop { + { + let cbbuf = [&wcb, &rcb]; + assert!(aio_suspend(&cbbuf[..], Some(timeout)).is_ok()); + } + if aio_error(&mut rcb) != Err(Error::from(Errno::EINPROGRESS)) && + aio_error(&mut wcb) != Err(Error::from(Errno::EINPROGRESS)) { + break + } + } + + assert!(aio_return(&mut wcb).unwrap() as usize == WBUF.len()); + assert!(aio_return(&mut rcb).unwrap() as usize == rbuf.len()); +} + +// Test a simple aio operation with no completion notification. We must poll +// for completion +#[test] +fn test_aio_read() { + const INITIAL: &'static [u8] = b"abcdef123456"; + let mut rbuf = vec![0; 4]; + const EXPECT: &'static [u8] = b"cdef"; + let mut f = tempfile().unwrap(); + f.write(INITIAL).unwrap(); + let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(), + 2, //offset + &mut rbuf, + 0, //priority + SigevNotify::SigevNone, + LioOpcode::LIO_NOP); + aio_read(&mut aiocb).unwrap(); + + let err = poll_aio(&mut aiocb); + assert!(err == Ok(())); + assert!(aio_return(&mut aiocb).unwrap() as usize == rbuf.len()); + + assert!(rbuf == EXPECT); +} + +// Test a simple aio operation with no completion notification. We must poll +// for completion. Unlike test_aio_read, this test uses AioCb::from_slice +#[test] +fn test_aio_write() { + const INITIAL: &'static [u8] = b"abcdef123456"; + const WBUF: &'static [u8] = b"CDEF"; //"CDEF".to_string().into_bytes(); + let mut rbuf = Vec::new(); + const EXPECT: &'static [u8] = b"abCDEF123456"; + + let mut f = tempfile().unwrap(); + f.write(INITIAL).unwrap(); + let mut aiocb = unsafe { + AioCb::from_slice( f.as_raw_fd(), + 2, //offset + &WBUF, + 0, //priority + SigevNotify::SigevNone, + LioOpcode::LIO_NOP) + }; + aio_write(&mut aiocb).unwrap(); + + let err = poll_aio(&mut aiocb); + assert!(err == Ok(())); + assert!(aio_return(&mut aiocb).unwrap() as usize == WBUF.len()); + + f.seek(SeekFrom::Start(0)).unwrap(); + let len = f.read_to_end(&mut rbuf).unwrap(); + assert!(len == EXPECT.len()); + assert!(rbuf == EXPECT); +} + +// XXX: should be sig_atomic_t, but rust's libc doesn't define that yet +static mut signaled: i32 = 0; + +extern fn sigfunc(_: c_int) { + // It's a pity that Rust can't understand that static mutable sig_atomic_t + // variables can be safely accessed + unsafe { signaled = 1 }; +} + +// Test an aio operation with completion delivered by a signal +#[test] +fn test_aio_write_sigev_signal() { + let sa = SigAction::new(SigHandler::Handler(sigfunc), + SA_RESETHAND, + SigSet::empty()); + unsafe {signaled = 0 }; + unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap(); + + const INITIAL: &'static [u8] = b"abcdef123456"; + const WBUF: &'static [u8] = b"CDEF"; + let mut rbuf = Vec::new(); + const EXPECT: &'static [u8] = b"abCDEF123456"; + + let mut f = tempfile().unwrap(); + f.write(INITIAL).unwrap(); + let mut aiocb = unsafe { + AioCb::from_slice( f.as_raw_fd(), + 2, //offset + &WBUF, + 0, //priority + SigevNotify::SigevSignal { + signal: Signal::SIGUSR2, + si_value: 0 //TODO: validate in sigfunc + }, + LioOpcode::LIO_NOP) + }; + aio_write(&mut aiocb).unwrap(); + while unsafe { signaled == 0 } { + thread::sleep(time::Duration::from_millis(10)); + } + + assert!(aio_return(&mut aiocb).unwrap() as usize == WBUF.len()); + f.seek(SeekFrom::Start(0)).unwrap(); + let len = f.read_to_end(&mut rbuf).unwrap(); + assert!(len == EXPECT.len()); + assert!(rbuf == EXPECT); +} + +// Test lio_listio with LIO_WAIT, so all AIO ops should be complete by the time +// lio_listio returns. +#[test] +#[cfg(not(any(target_os = "ios", target_os = "macos")))] +fn test_lio_listio_wait() { + const INITIAL: &'static [u8] = b"abcdef123456"; + const WBUF: &'static [u8] = b"CDEF"; + let mut rbuf = vec![0; 4]; + let mut rbuf2 = Vec::new(); + const EXPECT: &'static [u8] = b"abCDEF123456"; + let mut f = tempfile().unwrap(); + + f.write(INITIAL).unwrap(); + + let mut wcb = unsafe { + AioCb::from_slice( f.as_raw_fd(), + 2, //offset + &WBUF, + 0, //priority + SigevNotify::SigevNone, + LioOpcode::LIO_WRITE) + }; + + let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(), + 8, //offset + &mut rbuf, + 0, //priority + SigevNotify::SigevNone, + LioOpcode::LIO_READ); + { + let cbbuf = [&mut wcb, &mut rcb]; + let err = lio_listio(LioMode::LIO_WAIT, &cbbuf[..], SigevNotify::SigevNone); + err.expect("lio_listio failed"); + } + + assert!(aio_return(&mut wcb).unwrap() as usize == WBUF.len()); + assert!(aio_return(&mut rcb).unwrap() as usize == rbuf.len()); + assert!(rbuf == b"3456"); + + f.seek(SeekFrom::Start(0)).unwrap(); + let len = f.read_to_end(&mut rbuf2).unwrap(); + assert!(len == EXPECT.len()); + assert!(rbuf2 == EXPECT); +} + +// Test lio_listio with LIO_NOWAIT and no SigEvent, so we must use some other +// mechanism to check for the individual AioCb's completion. +#[test] +#[cfg(not(any(target_os = "ios", target_os = "macos")))] +fn test_lio_listio_nowait() { + const INITIAL: &'static [u8] = b"abcdef123456"; + const WBUF: &'static [u8] = b"CDEF"; + let mut rbuf = vec![0; 4]; + let mut rbuf2 = Vec::new(); + const EXPECT: &'static [u8] = b"abCDEF123456"; + let mut f = tempfile().unwrap(); + + f.write(INITIAL).unwrap(); + + let mut wcb = unsafe { + AioCb::from_slice( f.as_raw_fd(), + 2, //offset + &WBUF, + 0, //priority + SigevNotify::SigevNone, + LioOpcode::LIO_WRITE) + }; + + let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(), + 8, //offset + &mut rbuf, + 0, //priority + SigevNotify::SigevNone, + LioOpcode::LIO_READ); + { + let cbbuf = [&mut wcb, &mut rcb]; + let err = lio_listio(LioMode::LIO_NOWAIT, &cbbuf[..], SigevNotify::SigevNone); + err.expect("lio_listio failed"); + } + + poll_aio(&mut wcb).unwrap(); + poll_aio(&mut rcb).unwrap(); + assert!(aio_return(&mut wcb).unwrap() as usize == WBUF.len()); + assert!(aio_return(&mut rcb).unwrap() as usize == rbuf.len()); + assert!(rbuf == b"3456"); + + f.seek(SeekFrom::Start(0)).unwrap(); + let len = f.read_to_end(&mut rbuf2).unwrap(); + assert!(len == EXPECT.len()); + assert!(rbuf2 == EXPECT); +} + +// Test lio_listio with LIO_NOWAIT and a SigEvent to indicate when all AioCb's +// are complete. +#[test] +#[cfg(not(any(target_os = "ios", target_os = "macos")))] +fn test_lio_listio_signal() { + const INITIAL: &'static [u8] = b"abcdef123456"; + const WBUF: &'static [u8] = b"CDEF"; + let mut rbuf = vec![0; 4]; + let mut rbuf2 = Vec::new(); + const EXPECT: &'static [u8] = b"abCDEF123456"; + let mut f = tempfile().unwrap(); + let sa = SigAction::new(SigHandler::Handler(sigfunc), + SA_RESETHAND, + SigSet::empty()); + let sigev_notify = SigevNotify::SigevSignal { signal: Signal::SIGUSR2, + si_value: 0 }; + + f.write(INITIAL).unwrap(); + + let mut wcb = unsafe { + AioCb::from_slice( f.as_raw_fd(), + 2, //offset + &WBUF, + 0, //priority + SigevNotify::SigevNone, + LioOpcode::LIO_WRITE) + }; + + let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(), + 8, //offset + &mut rbuf, + 0, //priority + SigevNotify::SigevNone, + LioOpcode::LIO_READ); + unsafe {signaled = 0 }; + unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap(); + { + let cbbuf = [&mut wcb, &mut rcb]; + let err = lio_listio(LioMode::LIO_NOWAIT, &cbbuf[..], sigev_notify); + err.expect("lio_listio failed"); + } + while unsafe { signaled == 0 } { + thread::sleep(time::Duration::from_millis(10)); + } + + assert!(aio_return(&mut wcb).unwrap() as usize == WBUF.len()); + assert!(aio_return(&mut rcb).unwrap() as usize == rbuf.len()); + assert!(rbuf == b"3456"); + + f.seek(SeekFrom::Start(0)).unwrap(); + let len = f.read_to_end(&mut rbuf2).unwrap(); + assert!(len == EXPECT.len()); + assert!(rbuf2 == EXPECT); +} -- cgit v1.2.3