summaryrefslogtreecommitdiff
path: root/src/sys/socket/mod.rs
diff options
context:
space:
mode:
authorGleb Pomykalov <gleb@lancastr.com>2020-04-08 12:41:56 +0300
committerGleb Pomykalov <gleb@lancastr.com>2020-04-26 03:07:03 +0300
commit833369b2413a44ed0477cf8618e4088e523ca137 (patch)
treec246dfe95339d12f9d157312ce0b044fbda84cfe /src/sys/socket/mod.rs
parent490e979518256ae38823fd5f4c6c89005d39da84 (diff)
downloadnix-833369b2413a44ed0477cf8618e4088e523ca137.zip
Support sendmmsg/recvmmsg
Diffstat (limited to 'src/sys/socket/mod.rs')
-rw-r--r--src/sys/socket/mod.rs357
1 files changed, 302 insertions, 55 deletions
diff --git a/src/sys/socket/mod.rs b/src/sys/socket/mod.rs
index 18673e16..2d214e7e 100644
--- a/src/sys/socket/mod.rs
+++ b/src/sys/socket/mod.rs
@@ -819,7 +819,297 @@ pub fn sendmsg(fd: RawFd, iov: &[IoVec<&[u8]>], cmsgs: &[ControlMessage],
// First size the buffer needed to hold the cmsgs. It must be zeroed,
// because subsequent code will not clear the padding bytes.
- let cmsg_buffer = vec![0u8; capacity];
+ let mut cmsg_buffer = vec![0u8; capacity];
+
+ let mhdr = pack_mhdr_to_send(&mut cmsg_buffer[..], &iov, &cmsgs, addr);
+
+ let ret = unsafe { libc::sendmsg(fd, &mhdr, flags.bits()) };
+
+ Errno::result(ret).map(|r| r as usize)
+}
+
+#[cfg(any(
+ target_os = "linux",
+ target_os = "android",
+ target_os = "freebsd",
+ target_os = "openbsd",
+ target_os = "netbsd",
+))]
+#[derive(Debug)]
+pub struct SendMmsgData<'a, I, C>
+ where
+ I: AsRef<[IoVec<&'a [u8]>]>,
+ C: AsRef<[ControlMessage<'a>]>
+{
+ pub iov: I,
+ pub cmsgs: C,
+ pub addr: Option<SockAddr>,
+ pub _lt: std::marker::PhantomData<&'a I>,
+}
+
+/// An extension of `sendmsg` that allows the caller to transmit multiple
+/// messages on a socket using a single system call. This has performance
+/// benefits for some applications.
+///
+/// Allocations are performed for cmsgs and to build `msghdr` buffer
+///
+/// # Arguments
+///
+/// * `fd`: Socket file descriptor
+/// * `data`: Struct that implements `IntoIterator` with `SendMmsgData` items
+/// * `flags`: Optional flags passed directly to the operating system.
+///
+/// # Returns
+/// `Vec` with numbers of sent bytes on each sent message.
+///
+/// # References
+/// [`sendmsg`](fn.sendmsg.html)
+#[cfg(any(
+ target_os = "linux",
+ target_os = "android",
+ target_os = "freebsd",
+ target_os = "openbsd",
+ target_os = "netbsd",
+))]
+pub fn sendmmsg<'a, I, C>(
+ fd: RawFd,
+ data: impl std::iter::IntoIterator<Item=&'a SendMmsgData<'a, I, C>>,
+ flags: MsgFlags
+) -> Result<Vec<usize>>
+ where
+ I: AsRef<[IoVec<&'a [u8]>]> + 'a,
+ C: AsRef<[ControlMessage<'a>]> + 'a,
+{
+ let iter = data.into_iter();
+
+ let size_hint = iter.size_hint();
+ let reserve_items = size_hint.1.unwrap_or(size_hint.0);
+
+ let mut output = Vec::<libc::mmsghdr>::with_capacity(reserve_items);
+
+ let mut cmsgs_buffer = vec![0u8; 0];
+
+ for d in iter {
+ let cmsgs_start = cmsgs_buffer.len();
+ let cmsgs_required_capacity: usize = d.cmsgs.as_ref().iter().map(|c| c.space()).sum();
+ let cmsgs_buffer_need_capacity = cmsgs_start + cmsgs_required_capacity;
+ cmsgs_buffer.resize(cmsgs_buffer_need_capacity, 0);
+
+ output.push(libc::mmsghdr {
+ msg_hdr: pack_mhdr_to_send(
+ &mut cmsgs_buffer[cmsgs_start..],
+ &d.iov,
+ &d.cmsgs,
+ d.addr.as_ref()
+ ),
+ msg_len: 0,
+ });
+ };
+
+ let ret = unsafe { libc::sendmmsg(fd, output.as_mut_ptr(), output.len() as _, flags.bits() as _) };
+
+ let sent_messages = Errno::result(ret)? as usize;
+ let mut sent_bytes = Vec::with_capacity(sent_messages);
+
+ for item in &output {
+ sent_bytes.push(item.msg_len as usize);
+ }
+
+ Ok(sent_bytes)
+}
+
+
+#[cfg(any(
+ target_os = "linux",
+ target_os = "android",
+ target_os = "freebsd",
+ target_os = "netbsd",
+))]
+#[derive(Debug)]
+pub struct RecvMmsgData<'a, I>
+ where
+ I: AsRef<[IoVec<&'a mut [u8]>]> + 'a,
+{
+ pub iov: I,
+ pub cmsg_buffer: Option<&'a mut Vec<u8>>,
+}
+
+/// An extension of `recvmsg` that allows the caller to receive multiple
+/// messages from a socket using a single system call. This has
+/// performance benefits for some applications.
+///
+/// `iov` and `cmsg_buffer` should be constructed similarly to `recvmsg`
+///
+/// Multiple allocations are performed
+///
+/// # Arguments
+///
+/// * `fd`: Socket file descriptor
+/// * `data`: Struct that implements `IntoIterator` with `RecvMmsgData` items
+/// * `flags`: Optional flags passed directly to the operating system.
+///
+/// # RecvMmsgData
+///
+/// * `iov`: Scatter-gather list of buffers to receive the message
+/// * `cmsg_buffer`: Space to receive ancillary data. Should be created by
+/// [`cmsg_space!`](macro.cmsg_space.html)
+///
+/// # Returns
+/// A `Vec` with multiple `RecvMsg`, one per received message
+///
+/// # References
+/// - [`recvmsg`](fn.recvmsg.html)
+/// - [`RecvMsg`](struct.RecvMsg.html)
+#[cfg(any(
+ target_os = "linux",
+ target_os = "android",
+ target_os = "freebsd",
+ target_os = "netbsd",
+))]
+pub fn recvmmsg<'a, I>(
+ fd: RawFd,
+ data: impl std::iter::IntoIterator<Item=&'a mut RecvMmsgData<'a, I>,
+ IntoIter=impl ExactSizeIterator + Iterator<Item=&'a mut RecvMmsgData<'a, I>>>,
+ flags: MsgFlags,
+ timeout: Option<crate::sys::time::TimeSpec>
+) -> Result<Vec<RecvMsg<'a>>>
+ where
+ I: AsRef<[IoVec<&'a mut [u8]>]> + 'a,
+{
+ let iter = data.into_iter();
+
+ let num_messages = iter.len();
+
+ let mut output: Vec<libc::mmsghdr> = Vec::with_capacity(num_messages);
+
+ // Addresses should be pre-allocated and never change the address during building
+ // of the input data for `recvmmsg`
+ let mut addresses: Vec<sockaddr_storage> = vec![unsafe { mem::zeroed() }; num_messages];
+
+ let results: Vec<_> = iter.enumerate().map(|(i, d)| {
+ let (msg_controllen, mhdr) = unsafe {
+ pack_mhdr_to_receive(
+ d.iov.as_ref(),
+ &mut d.cmsg_buffer,
+ &mut addresses[i],
+ )
+ };
+
+ output.push(
+ libc::mmsghdr {
+ msg_hdr: mhdr,
+ msg_len: 0,
+ }
+ );
+
+ (msg_controllen as usize, &mut d.cmsg_buffer)
+ }).collect();
+
+ let timeout = if let Some(mut t) = timeout {
+ t.as_mut() as *mut libc::timespec
+ } else {
+ ptr::null_mut()
+ };
+
+ let ret = unsafe { libc::recvmmsg(fd, output.as_mut_ptr(), output.len() as _, flags.bits() as _, timeout) };
+
+ let r = Errno::result(ret)?;
+
+ Ok(output
+ .into_iter()
+ .zip(addresses.into_iter())
+ .zip(results.into_iter())
+ .map(|((mmsghdr, address), (msg_controllen, cmsg_buffer))| {
+ unsafe {
+ read_mhdr(
+ mmsghdr.msg_hdr,
+ r as isize,
+ msg_controllen,
+ address,
+ cmsg_buffer
+ )
+ }
+ })
+ .collect())
+}
+
+unsafe fn read_mhdr<'a, 'b>(
+ mhdr: msghdr,
+ r: isize,
+ msg_controllen: usize,
+ address: sockaddr_storage,
+ cmsg_buffer: &'a mut Option<&'b mut Vec<u8>>
+) -> RecvMsg<'b> {
+ let cmsghdr = {
+ if mhdr.msg_controllen > 0 {
+ // got control message(s)
+ cmsg_buffer
+ .as_mut()
+ .unwrap()
+ .set_len(mhdr.msg_controllen as usize);
+ debug_assert!(!mhdr.msg_control.is_null());
+ debug_assert!(msg_controllen >= mhdr.msg_controllen as usize);
+ CMSG_FIRSTHDR(&mhdr as *const msghdr)
+ } else {
+ ptr::null()
+ }.as_ref()
+ };
+
+ let address = sockaddr_storage_to_addr(
+ &address ,
+ mhdr.msg_namelen as usize
+ ).ok();
+
+ RecvMsg {
+ bytes: r as usize,
+ cmsghdr,
+ address,
+ flags: MsgFlags::from_bits_truncate(mhdr.msg_flags),
+ mhdr,
+ }
+}
+
+unsafe fn pack_mhdr_to_receive<'a, I>(
+ iov: I,
+ cmsg_buffer: &mut Option<&mut Vec<u8>>,
+ address: *mut sockaddr_storage,
+) -> (usize, msghdr)
+ where
+ I: AsRef<[IoVec<&'a mut [u8]>]> + 'a,
+{
+ let (msg_control, msg_controllen) = cmsg_buffer.as_mut()
+ .map(|v| (v.as_mut_ptr(), v.capacity()))
+ .unwrap_or((ptr::null_mut(), 0));
+
+ let mhdr = {
+ // Musl's msghdr has private fields, so this is the only way to
+ // initialize it.
+ let mut mhdr = mem::MaybeUninit::<msghdr>::zeroed();
+ let p = mhdr.as_mut_ptr();
+ (*p).msg_name = address as *mut c_void;
+ (*p).msg_namelen = mem::size_of::<sockaddr_storage>() as socklen_t;
+ (*p).msg_iov = iov.as_ref().as_ptr() as *mut iovec;
+ (*p).msg_iovlen = iov.as_ref().len() as _;
+ (*p).msg_control = msg_control as *mut c_void;
+ (*p).msg_controllen = msg_controllen as _;
+ (*p).msg_flags = 0;
+ mhdr.assume_init()
+ };
+
+ (msg_controllen, mhdr)
+}
+
+fn pack_mhdr_to_send<'a, I, C>(
+ cmsg_buffer: &mut [u8],
+ iov: I,
+ cmsgs: C,
+ addr: Option<&SockAddr>
+) -> msghdr
+ where
+ I: AsRef<[IoVec<&'a [u8]>]>,
+ C: AsRef<[ControlMessage<'a>]>
+{
+ let capacity = cmsg_buffer.len();
// Next encode the sending address, if provided
let (name, namelen) = match addr {
@@ -846,8 +1136,8 @@ pub fn sendmsg(fd: RawFd, iov: &[IoVec<&[u8]>], cmsgs: &[ControlMessage],
(*p).msg_namelen = namelen;
// transmute iov into a mutable pointer. sendmsg doesn't really mutate
// the buffer, but the standard says that it takes a mutable pointer
- (*p).msg_iov = iov.as_ptr() as *mut _;
- (*p).msg_iovlen = iov.len() as _;
+ (*p).msg_iov = iov.as_ref().as_ptr() as *mut _;
+ (*p).msg_iovlen = iov.as_ref().len() as _;
(*p).msg_control = cmsg_ptr;
(*p).msg_controllen = capacity as _;
(*p).msg_flags = 0;
@@ -857,19 +1147,17 @@ pub fn sendmsg(fd: RawFd, iov: &[IoVec<&[u8]>], cmsgs: &[ControlMessage],
// Encode each cmsg. This must happen after initializing the header because
// CMSG_NEXT_HDR and friends read the msg_control and msg_controllen fields.
// CMSG_FIRSTHDR is always safe
- let mut pmhdr: *mut cmsghdr = unsafe{CMSG_FIRSTHDR(&mhdr as *const msghdr)};
- for cmsg in cmsgs {
+ let mut pmhdr: *mut cmsghdr = unsafe { CMSG_FIRSTHDR(&mhdr as *const msghdr) };
+ for cmsg in cmsgs.as_ref() {
assert_ne!(pmhdr, ptr::null_mut());
// Safe because we know that pmhdr is valid, and we initialized it with
// sufficient space
unsafe { cmsg.encode_into(pmhdr) };
// Safe because mhdr is valid
- pmhdr = unsafe{CMSG_NXTHDR(&mhdr as *const msghdr, pmhdr)};
+ pmhdr = unsafe { CMSG_NXTHDR(&mhdr as *const msghdr, pmhdr) };
}
- let ret = unsafe { libc::sendmsg(fd, &mhdr, flags.bits()) };
-
- Errno::result(ret).map(|r| r as usize)
+ mhdr
}
/// Receive message in scatter-gather vectors from a socket, and
@@ -891,57 +1179,16 @@ pub fn recvmsg<'a>(fd: RawFd, iov: &[IoVec<&mut [u8]>],
flags: MsgFlags) -> Result<RecvMsg<'a>>
{
let mut address = mem::MaybeUninit::uninit();
- let (msg_control, msg_controllen) = cmsg_buffer.as_mut()
- .map(|v| (v.as_mut_ptr(), v.capacity()))
- .unwrap_or((ptr::null_mut(), 0));
- let mut mhdr = {
- unsafe {
- // Musl's msghdr has private fields, so this is the only way to
- // initialize it.
- let mut mhdr = mem::MaybeUninit::<msghdr>::zeroed();
- let p = mhdr.as_mut_ptr();
- (*p).msg_name = address.as_mut_ptr() as *mut c_void;
- (*p).msg_namelen = mem::size_of::<sockaddr_storage>() as socklen_t;
- (*p).msg_iov = iov.as_ptr() as *mut iovec;
- (*p).msg_iovlen = iov.len() as _;
- (*p).msg_control = msg_control as *mut c_void;
- (*p).msg_controllen = msg_controllen as _;
- (*p).msg_flags = 0;
- mhdr.assume_init()
- }
+
+ let (msg_controllen, mut mhdr) = unsafe {
+ pack_mhdr_to_receive(&iov, &mut cmsg_buffer, address.as_mut_ptr())
};
let ret = unsafe { libc::recvmsg(fd, &mut mhdr, flags.bits()) };
- Errno::result(ret).map(|r| {
- let cmsghdr = unsafe {
- if mhdr.msg_controllen > 0 {
- // got control message(s)
- cmsg_buffer
- .as_mut()
- .unwrap()
- .set_len(mhdr.msg_controllen as usize);
- debug_assert!(!mhdr.msg_control.is_null());
- debug_assert!(msg_controllen >= mhdr.msg_controllen as usize);
- CMSG_FIRSTHDR(&mhdr as *const msghdr)
- } else {
- ptr::null()
- }.as_ref()
- };
+ let r = Errno::result(ret)?;
- let address = unsafe {
- sockaddr_storage_to_addr(&address.assume_init(),
- mhdr.msg_namelen as usize
- ).ok()
- };
- RecvMsg {
- bytes: r as usize,
- cmsghdr,
- address,
- flags: MsgFlags::from_bits_truncate(mhdr.msg_flags),
- mhdr,
- }
- })
+ Ok(unsafe { read_mhdr(mhdr, r, msg_controllen, address.assume_init(), &mut cmsg_buffer) })
}