From 8127b4dd6d37ae028a31c013b7d5758d66362b04 Mon Sep 17 00:00:00 2001 From: Markus Jais Date: Mon, 27 Apr 2015 13:35:16 -0700 Subject: Basic Posix MQ support --- src/lib.rs | 3 ++ src/mq.rs | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ test/test.rs | 3 ++ test/test_mq.rs | 60 +++++++++++++++++++++++++++++++++++ 4 files changed, 163 insertions(+) create mode 100644 src/mq.rs create mode 100644 test/test_mq.rs diff --git a/src/lib.rs b/src/lib.rs index 66dd6ac2..c3807876 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,6 +29,9 @@ pub mod fcntl; #[cfg(any(target_os = "linux", target_os = "android"))] pub mod mount; +#[cfg(any(target_os = "linux"))] +pub mod mq; + #[cfg(any(target_os = "linux", target_os = "android"))] pub mod sched; diff --git a/src/mq.rs b/src/mq.rs new file mode 100644 index 00000000..63078192 --- /dev/null +++ b/src/mq.rs @@ -0,0 +1,97 @@ +use {Error, Result, from_ffi}; +use errno::Errno; + +use libc::{c_int, c_long, c_char, size_t, mode_t, strlen}; +use std::ffi::CString; +use sys::stat::Mode; + +pub use self::consts::*; + +pub type MQd = c_int; + +#[cfg(target_os = "linux")] +mod consts { + use libc::c_int; + + bitflags!( + flags MQ_OFlag: c_int { + const O_RDONLY = 0o00000000, + const O_WRONLY = 0o00000001, + const O_RDWR = 0o00000002, + const O_CREAT = 0o00000100, + const O_EXCL = 0o00000200, + const O_NONBLOCK = 0o00004000, + const O_CLOEXEC = 0o02000000, + } + ); + + bitflags!( + flags FdFlag: c_int { + const FD_CLOEXEC = 1 + } + ); +} + +mod ffi { + use libc::{c_char, size_t, ssize_t, c_uint, c_int, mode_t}; + use super::MQd; + use super::MqAttr; + + extern { + pub fn mq_open(name: *const c_char, oflag: c_int, mode: mode_t, attr: *const MqAttr) -> MQd; + + pub fn mq_close (mqdes: MQd) -> c_int; + + pub fn mq_receive (mqdes: MQd, msg_ptr: *const c_char, msg_len: size_t, msq_prio: *const c_uint) -> ssize_t; + + pub fn mq_send (mqdes: MQd, msg_ptr: *const c_char, msg_len: size_t, msq_prio: c_uint) -> c_int; + } +} + +#[repr(C)] +#[derive(Clone, Copy, Debug)] +pub struct MqAttr { + pub mq_flags: c_long, + pub mq_maxmsg: c_long, + pub mq_msgsize: c_long, + pub mq_curmsgs: c_long, +} + +#[inline] +pub fn mq_open(name: &CString, oflag: MQ_OFlag, mode: Mode, attr: &MqAttr) -> Result { + let res = unsafe { ffi::mq_open(name.as_ptr(), oflag.bits(), mode.bits() as mode_t, attr as *const MqAttr) }; + + if res < 0 { + return Err(Error::Sys(Errno::last())); + } + + Ok(res) +} + +pub fn mq_close(mqdes: MQd) -> Result<()> { + let res = unsafe { ffi::mq_close(mqdes) }; + from_ffi(res) +} + + +pub fn mq_receive(mqdes: MQd, message: &mut [u8], msq_prio: u32) -> Result { + let len = message.len() as size_t; + let res = unsafe { ffi::mq_receive(mqdes, message.as_mut_ptr() as *mut c_char, len, &msq_prio) }; + + if res < 0 { + return Err(Error::Sys(Errno::last())); + } + + Ok(res as usize) +} + +pub fn mq_send(mqdes: MQd, message: &CString, msq_prio: u32) -> Result { + let len = unsafe { strlen(message.as_ptr()) as size_t }; + let res = unsafe { ffi::mq_send(mqdes, message.as_ptr(), len, msq_prio) }; + + if res < 0 { + return Err(Error::Sys(Errno::last())); + } + + Ok(res as usize) +} diff --git a/test/test.rs b/test/test.rs index be1e0ad8..338de581 100644 --- a/test/test.rs +++ b/test/test.rs @@ -7,6 +7,9 @@ mod test_nix_path; mod test_stat; mod test_unistd; +#[cfg(any(target_os = "linux"))] +mod test_mq; + mod ports { use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT}; use std::sync::atomic::Ordering::SeqCst; diff --git a/test/test_mq.rs b/test/test_mq.rs new file mode 100644 index 00000000..6d03bd06 --- /dev/null +++ b/test/test_mq.rs @@ -0,0 +1,60 @@ +use nix::mq::{mq_open, mq_close, mq_send, mq_receive}; +use nix::mq::{O_CREAT, O_WRONLY, O_RDONLY}; +use nix::mq::MqAttr; +use nix::sys::stat::{S_IWUSR, S_IRUSR, S_IRGRP, S_IROTH}; +use std::ffi::CString; +use libc::{c_long, strlen}; + +use nix::unistd::{fork, read, write, pipe}; +use nix::unistd::Fork::{Child, Parent}; +use nix::sys::wait::*; + + +#[test] +fn mq_send_and_receive() { + + const MSG_SIZE: c_long = 32; + + let attr = MqAttr { mq_flags: 0, mq_maxmsg: 10, mq_msgsize: MSG_SIZE, mq_curmsgs: 0 }; + let mq_name_in_parent = &CString::new(b"/a_nix_test_queue".as_ref()).unwrap(); + let mqd_in_parent = mq_open(mq_name_in_parent, O_CREAT | O_WRONLY, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH, &attr).unwrap(); + let msg_to_send = &CString::new("msg_1").unwrap(); + let len = unsafe { strlen(msg_to_send.as_ptr()) as usize }; + + mq_send(mqd_in_parent, msg_to_send, 1).unwrap(); + + let (reader, writer) = pipe().unwrap(); + + let pid = fork(); + match pid { + Ok(Child) => { + let mq_name_in_child = &CString::new(b"/a_nix_test_queue".as_ref()).unwrap(); + let mqd_in_child = mq_open(mq_name_in_child, O_CREAT | O_RDONLY, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH, &attr).unwrap(); + let mut buf = [0u8; 32]; + let length_msg_received = mq_receive(mqd_in_child, &mut buf, 1).unwrap(); + assert!(length_msg_received == len); + let message_str = String::from_utf8_lossy(&buf[0 .. len]); + let expected_str = String::from_utf8_lossy(msg_to_send.as_bytes()); + assert!(message_str == expected_str); + write(writer, &buf).unwrap(); // pipe result to parent process. Otherwise cargo does not report test failures correctly + mq_close(mqd_in_child).unwrap(); + } + Ok(Parent(child_pid)) => { + mq_close(mqd_in_parent).unwrap(); + + // Wait for the child to exit. + waitpid(child_pid, None).unwrap(); + // Read 1024 bytes. + let mut read_buf = [0u8; 32]; + read(reader, &mut read_buf).unwrap(); + let message_str = String::from_utf8_lossy(&read_buf); + assert!(message_str.contains("msg_1")); + }, + // panic, fork should never fail unless there is a serious problem with the OS + Err(_) => panic!("Error: Fork Failed") + } +} + + + +// cargo clean; cargo test -- --nocapture -- cgit v1.2.3