summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarkus Jais <markusjais@gmx.de>2015-04-27 13:35:16 -0700
committerCarl Lerche <me@carllerche.com>2015-04-27 13:37:18 -0700
commit8127b4dd6d37ae028a31c013b7d5758d66362b04 (patch)
tree7820835fbcec08166f7a9cd45e7efc2eb0e61ef6
parentdc363728f17e4c6085f6855f5a9a191cceb408ab (diff)
downloadnix-8127b4dd6d37ae028a31c013b7d5758d66362b04.zip
Basic Posix MQ support
-rw-r--r--src/lib.rs3
-rw-r--r--src/mq.rs97
-rw-r--r--test/test.rs3
-rw-r--r--test/test_mq.rs60
4 files changed, 163 insertions, 0 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 66dd6ac2..c3807876 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -29,6 +29,9 @@ pub mod fcntl;
#[cfg(any(target_os = "linux", target_os = "android"))]
pub mod mount;
+#[cfg(any(target_os = "linux"))]
+pub mod mq;
+
#[cfg(any(target_os = "linux", target_os = "android"))]
pub mod sched;
diff --git a/src/mq.rs b/src/mq.rs
new file mode 100644
index 00000000..63078192
--- /dev/null
+++ b/src/mq.rs
@@ -0,0 +1,97 @@
+use {Error, Result, from_ffi};
+use errno::Errno;
+
+use libc::{c_int, c_long, c_char, size_t, mode_t, strlen};
+use std::ffi::CString;
+use sys::stat::Mode;
+
+pub use self::consts::*;
+
+pub type MQd = c_int;
+
+#[cfg(target_os = "linux")]
+mod consts {
+ use libc::c_int;
+
+ bitflags!(
+ flags MQ_OFlag: c_int {
+ const O_RDONLY = 0o00000000,
+ const O_WRONLY = 0o00000001,
+ const O_RDWR = 0o00000002,
+ const O_CREAT = 0o00000100,
+ const O_EXCL = 0o00000200,
+ const O_NONBLOCK = 0o00004000,
+ const O_CLOEXEC = 0o02000000,
+ }
+ );
+
+ bitflags!(
+ flags FdFlag: c_int {
+ const FD_CLOEXEC = 1
+ }
+ );
+}
+
+mod ffi {
+ use libc::{c_char, size_t, ssize_t, c_uint, c_int, mode_t};
+ use super::MQd;
+ use super::MqAttr;
+
+ extern {
+ pub fn mq_open(name: *const c_char, oflag: c_int, mode: mode_t, attr: *const MqAttr) -> MQd;
+
+ pub fn mq_close (mqdes: MQd) -> c_int;
+
+ pub fn mq_receive (mqdes: MQd, msg_ptr: *const c_char, msg_len: size_t, msq_prio: *const c_uint) -> ssize_t;
+
+ pub fn mq_send (mqdes: MQd, msg_ptr: *const c_char, msg_len: size_t, msq_prio: c_uint) -> c_int;
+ }
+}
+
+#[repr(C)]
+#[derive(Clone, Copy, Debug)]
+pub struct MqAttr {
+ pub mq_flags: c_long,
+ pub mq_maxmsg: c_long,
+ pub mq_msgsize: c_long,
+ pub mq_curmsgs: c_long,
+}
+
+#[inline]
+pub fn mq_open(name: &CString, oflag: MQ_OFlag, mode: Mode, attr: &MqAttr) -> Result<MQd> {
+ let res = unsafe { ffi::mq_open(name.as_ptr(), oflag.bits(), mode.bits() as mode_t, attr as *const MqAttr) };
+
+ if res < 0 {
+ return Err(Error::Sys(Errno::last()));
+ }
+
+ Ok(res)
+}
+
+pub fn mq_close(mqdes: MQd) -> Result<()> {
+ let res = unsafe { ffi::mq_close(mqdes) };
+ from_ffi(res)
+}
+
+
+pub fn mq_receive(mqdes: MQd, message: &mut [u8], msq_prio: u32) -> Result<usize> {
+ let len = message.len() as size_t;
+ let res = unsafe { ffi::mq_receive(mqdes, message.as_mut_ptr() as *mut c_char, len, &msq_prio) };
+
+ if res < 0 {
+ return Err(Error::Sys(Errno::last()));
+ }
+
+ Ok(res as usize)
+}
+
+pub fn mq_send(mqdes: MQd, message: &CString, msq_prio: u32) -> Result<usize> {
+ let len = unsafe { strlen(message.as_ptr()) as size_t };
+ let res = unsafe { ffi::mq_send(mqdes, message.as_ptr(), len, msq_prio) };
+
+ if res < 0 {
+ return Err(Error::Sys(Errno::last()));
+ }
+
+ Ok(res as usize)
+}
diff --git a/test/test.rs b/test/test.rs
index be1e0ad8..338de581 100644
--- a/test/test.rs
+++ b/test/test.rs
@@ -7,6 +7,9 @@ mod test_nix_path;
mod test_stat;
mod test_unistd;
+#[cfg(any(target_os = "linux"))]
+mod test_mq;
+
mod ports {
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT};
use std::sync::atomic::Ordering::SeqCst;
diff --git a/test/test_mq.rs b/test/test_mq.rs
new file mode 100644
index 00000000..6d03bd06
--- /dev/null
+++ b/test/test_mq.rs
@@ -0,0 +1,60 @@
+use nix::mq::{mq_open, mq_close, mq_send, mq_receive};
+use nix::mq::{O_CREAT, O_WRONLY, O_RDONLY};
+use nix::mq::MqAttr;
+use nix::sys::stat::{S_IWUSR, S_IRUSR, S_IRGRP, S_IROTH};
+use std::ffi::CString;
+use libc::{c_long, strlen};
+
+use nix::unistd::{fork, read, write, pipe};
+use nix::unistd::Fork::{Child, Parent};
+use nix::sys::wait::*;
+
+
+#[test]
+fn mq_send_and_receive() {
+
+ const MSG_SIZE: c_long = 32;
+
+ let attr = MqAttr { mq_flags: 0, mq_maxmsg: 10, mq_msgsize: MSG_SIZE, mq_curmsgs: 0 };
+ let mq_name_in_parent = &CString::new(b"/a_nix_test_queue".as_ref()).unwrap();
+ let mqd_in_parent = mq_open(mq_name_in_parent, O_CREAT | O_WRONLY, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH, &attr).unwrap();
+ let msg_to_send = &CString::new("msg_1").unwrap();
+ let len = unsafe { strlen(msg_to_send.as_ptr()) as usize };
+
+ mq_send(mqd_in_parent, msg_to_send, 1).unwrap();
+
+ let (reader, writer) = pipe().unwrap();
+
+ let pid = fork();
+ match pid {
+ Ok(Child) => {
+ let mq_name_in_child = &CString::new(b"/a_nix_test_queue".as_ref()).unwrap();
+ let mqd_in_child = mq_open(mq_name_in_child, O_CREAT | O_RDONLY, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH, &attr).unwrap();
+ let mut buf = [0u8; 32];
+ let length_msg_received = mq_receive(mqd_in_child, &mut buf, 1).unwrap();
+ assert!(length_msg_received == len);
+ let message_str = String::from_utf8_lossy(&buf[0 .. len]);
+ let expected_str = String::from_utf8_lossy(msg_to_send.as_bytes());
+ assert!(message_str == expected_str);
+ write(writer, &buf).unwrap(); // pipe result to parent process. Otherwise cargo does not report test failures correctly
+ mq_close(mqd_in_child).unwrap();
+ }
+ Ok(Parent(child_pid)) => {
+ mq_close(mqd_in_parent).unwrap();
+
+ // Wait for the child to exit.
+ waitpid(child_pid, None).unwrap();
+ // Read 1024 bytes.
+ let mut read_buf = [0u8; 32];
+ read(reader, &mut read_buf).unwrap();
+ let message_str = String::from_utf8_lossy(&read_buf);
+ assert!(message_str.contains("msg_1"));
+ },
+ // panic, fork should never fail unless there is a serious problem with the OS
+ Err(_) => panic!("Error: Fork Failed")
+ }
+}
+
+
+
+// cargo clean; cargo test -- --nocapture