From 833369b2413a44ed0477cf8618e4088e523ca137 Mon Sep 17 00:00:00 2001 From: Gleb Pomykalov Date: Wed, 8 Apr 2020 12:41:56 +0300 Subject: Support sendmmsg/recvmmsg --- src/sys/socket/mod.rs | 357 ++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 302 insertions(+), 55 deletions(-) (limited to 'src/sys/socket/mod.rs') diff --git a/src/sys/socket/mod.rs b/src/sys/socket/mod.rs index 18673e16..2d214e7e 100644 --- a/src/sys/socket/mod.rs +++ b/src/sys/socket/mod.rs @@ -819,7 +819,297 @@ pub fn sendmsg(fd: RawFd, iov: &[IoVec<&[u8]>], cmsgs: &[ControlMessage], // First size the buffer needed to hold the cmsgs. It must be zeroed, // because subsequent code will not clear the padding bytes. - let cmsg_buffer = vec![0u8; capacity]; + let mut cmsg_buffer = vec![0u8; capacity]; + + let mhdr = pack_mhdr_to_send(&mut cmsg_buffer[..], &iov, &cmsgs, addr); + + let ret = unsafe { libc::sendmsg(fd, &mhdr, flags.bits()) }; + + Errno::result(ret).map(|r| r as usize) +} + +#[cfg(any( + target_os = "linux", + target_os = "android", + target_os = "freebsd", + target_os = "openbsd", + target_os = "netbsd", +))] +#[derive(Debug)] +pub struct SendMmsgData<'a, I, C> + where + I: AsRef<[IoVec<&'a [u8]>]>, + C: AsRef<[ControlMessage<'a>]> +{ + pub iov: I, + pub cmsgs: C, + pub addr: Option, + pub _lt: std::marker::PhantomData<&'a I>, +} + +/// An extension of `sendmsg` that allows the caller to transmit multiple +/// messages on a socket using a single system call. This has performance +/// benefits for some applications. +/// +/// Allocations are performed for cmsgs and to build `msghdr` buffer +/// +/// # Arguments +/// +/// * `fd`: Socket file descriptor +/// * `data`: Struct that implements `IntoIterator` with `SendMmsgData` items +/// * `flags`: Optional flags passed directly to the operating system. +/// +/// # Returns +/// `Vec` with numbers of sent bytes on each sent message. +/// +/// # References +/// [`sendmsg`](fn.sendmsg.html) +#[cfg(any( + target_os = "linux", + target_os = "android", + target_os = "freebsd", + target_os = "openbsd", + target_os = "netbsd", +))] +pub fn sendmmsg<'a, I, C>( + fd: RawFd, + data: impl std::iter::IntoIterator>, + flags: MsgFlags +) -> Result> + where + I: AsRef<[IoVec<&'a [u8]>]> + 'a, + C: AsRef<[ControlMessage<'a>]> + 'a, +{ + let iter = data.into_iter(); + + let size_hint = iter.size_hint(); + let reserve_items = size_hint.1.unwrap_or(size_hint.0); + + let mut output = Vec::::with_capacity(reserve_items); + + let mut cmsgs_buffer = vec![0u8; 0]; + + for d in iter { + let cmsgs_start = cmsgs_buffer.len(); + let cmsgs_required_capacity: usize = d.cmsgs.as_ref().iter().map(|c| c.space()).sum(); + let cmsgs_buffer_need_capacity = cmsgs_start + cmsgs_required_capacity; + cmsgs_buffer.resize(cmsgs_buffer_need_capacity, 0); + + output.push(libc::mmsghdr { + msg_hdr: pack_mhdr_to_send( + &mut cmsgs_buffer[cmsgs_start..], + &d.iov, + &d.cmsgs, + d.addr.as_ref() + ), + msg_len: 0, + }); + }; + + let ret = unsafe { libc::sendmmsg(fd, output.as_mut_ptr(), output.len() as _, flags.bits() as _) }; + + let sent_messages = Errno::result(ret)? as usize; + let mut sent_bytes = Vec::with_capacity(sent_messages); + + for item in &output { + sent_bytes.push(item.msg_len as usize); + } + + Ok(sent_bytes) +} + + +#[cfg(any( + target_os = "linux", + target_os = "android", + target_os = "freebsd", + target_os = "netbsd", +))] +#[derive(Debug)] +pub struct RecvMmsgData<'a, I> + where + I: AsRef<[IoVec<&'a mut [u8]>]> + 'a, +{ + pub iov: I, + pub cmsg_buffer: Option<&'a mut Vec>, +} + +/// An extension of `recvmsg` that allows the caller to receive multiple +/// messages from a socket using a single system call. This has +/// performance benefits for some applications. +/// +/// `iov` and `cmsg_buffer` should be constructed similarly to `recvmsg` +/// +/// Multiple allocations are performed +/// +/// # Arguments +/// +/// * `fd`: Socket file descriptor +/// * `data`: Struct that implements `IntoIterator` with `RecvMmsgData` items +/// * `flags`: Optional flags passed directly to the operating system. +/// +/// # RecvMmsgData +/// +/// * `iov`: Scatter-gather list of buffers to receive the message +/// * `cmsg_buffer`: Space to receive ancillary data. Should be created by +/// [`cmsg_space!`](macro.cmsg_space.html) +/// +/// # Returns +/// A `Vec` with multiple `RecvMsg`, one per received message +/// +/// # References +/// - [`recvmsg`](fn.recvmsg.html) +/// - [`RecvMsg`](struct.RecvMsg.html) +#[cfg(any( + target_os = "linux", + target_os = "android", + target_os = "freebsd", + target_os = "netbsd", +))] +pub fn recvmmsg<'a, I>( + fd: RawFd, + data: impl std::iter::IntoIterator, + IntoIter=impl ExactSizeIterator + Iterator>>, + flags: MsgFlags, + timeout: Option +) -> Result>> + where + I: AsRef<[IoVec<&'a mut [u8]>]> + 'a, +{ + let iter = data.into_iter(); + + let num_messages = iter.len(); + + let mut output: Vec = Vec::with_capacity(num_messages); + + // Addresses should be pre-allocated and never change the address during building + // of the input data for `recvmmsg` + let mut addresses: Vec = vec![unsafe { mem::zeroed() }; num_messages]; + + let results: Vec<_> = iter.enumerate().map(|(i, d)| { + let (msg_controllen, mhdr) = unsafe { + pack_mhdr_to_receive( + d.iov.as_ref(), + &mut d.cmsg_buffer, + &mut addresses[i], + ) + }; + + output.push( + libc::mmsghdr { + msg_hdr: mhdr, + msg_len: 0, + } + ); + + (msg_controllen as usize, &mut d.cmsg_buffer) + }).collect(); + + let timeout = if let Some(mut t) = timeout { + t.as_mut() as *mut libc::timespec + } else { + ptr::null_mut() + }; + + let ret = unsafe { libc::recvmmsg(fd, output.as_mut_ptr(), output.len() as _, flags.bits() as _, timeout) }; + + let r = Errno::result(ret)?; + + Ok(output + .into_iter() + .zip(addresses.into_iter()) + .zip(results.into_iter()) + .map(|((mmsghdr, address), (msg_controllen, cmsg_buffer))| { + unsafe { + read_mhdr( + mmsghdr.msg_hdr, + r as isize, + msg_controllen, + address, + cmsg_buffer + ) + } + }) + .collect()) +} + +unsafe fn read_mhdr<'a, 'b>( + mhdr: msghdr, + r: isize, + msg_controllen: usize, + address: sockaddr_storage, + cmsg_buffer: &'a mut Option<&'b mut Vec> +) -> RecvMsg<'b> { + let cmsghdr = { + if mhdr.msg_controllen > 0 { + // got control message(s) + cmsg_buffer + .as_mut() + .unwrap() + .set_len(mhdr.msg_controllen as usize); + debug_assert!(!mhdr.msg_control.is_null()); + debug_assert!(msg_controllen >= mhdr.msg_controllen as usize); + CMSG_FIRSTHDR(&mhdr as *const msghdr) + } else { + ptr::null() + }.as_ref() + }; + + let address = sockaddr_storage_to_addr( + &address , + mhdr.msg_namelen as usize + ).ok(); + + RecvMsg { + bytes: r as usize, + cmsghdr, + address, + flags: MsgFlags::from_bits_truncate(mhdr.msg_flags), + mhdr, + } +} + +unsafe fn pack_mhdr_to_receive<'a, I>( + iov: I, + cmsg_buffer: &mut Option<&mut Vec>, + address: *mut sockaddr_storage, +) -> (usize, msghdr) + where + I: AsRef<[IoVec<&'a mut [u8]>]> + 'a, +{ + let (msg_control, msg_controllen) = cmsg_buffer.as_mut() + .map(|v| (v.as_mut_ptr(), v.capacity())) + .unwrap_or((ptr::null_mut(), 0)); + + let mhdr = { + // Musl's msghdr has private fields, so this is the only way to + // initialize it. + let mut mhdr = mem::MaybeUninit::::zeroed(); + let p = mhdr.as_mut_ptr(); + (*p).msg_name = address as *mut c_void; + (*p).msg_namelen = mem::size_of::() as socklen_t; + (*p).msg_iov = iov.as_ref().as_ptr() as *mut iovec; + (*p).msg_iovlen = iov.as_ref().len() as _; + (*p).msg_control = msg_control as *mut c_void; + (*p).msg_controllen = msg_controllen as _; + (*p).msg_flags = 0; + mhdr.assume_init() + }; + + (msg_controllen, mhdr) +} + +fn pack_mhdr_to_send<'a, I, C>( + cmsg_buffer: &mut [u8], + iov: I, + cmsgs: C, + addr: Option<&SockAddr> +) -> msghdr + where + I: AsRef<[IoVec<&'a [u8]>]>, + C: AsRef<[ControlMessage<'a>]> +{ + let capacity = cmsg_buffer.len(); // Next encode the sending address, if provided let (name, namelen) = match addr { @@ -846,8 +1136,8 @@ pub fn sendmsg(fd: RawFd, iov: &[IoVec<&[u8]>], cmsgs: &[ControlMessage], (*p).msg_namelen = namelen; // transmute iov into a mutable pointer. sendmsg doesn't really mutate // the buffer, but the standard says that it takes a mutable pointer - (*p).msg_iov = iov.as_ptr() as *mut _; - (*p).msg_iovlen = iov.len() as _; + (*p).msg_iov = iov.as_ref().as_ptr() as *mut _; + (*p).msg_iovlen = iov.as_ref().len() as _; (*p).msg_control = cmsg_ptr; (*p).msg_controllen = capacity as _; (*p).msg_flags = 0; @@ -857,19 +1147,17 @@ pub fn sendmsg(fd: RawFd, iov: &[IoVec<&[u8]>], cmsgs: &[ControlMessage], // Encode each cmsg. This must happen after initializing the header because // CMSG_NEXT_HDR and friends read the msg_control and msg_controllen fields. // CMSG_FIRSTHDR is always safe - let mut pmhdr: *mut cmsghdr = unsafe{CMSG_FIRSTHDR(&mhdr as *const msghdr)}; - for cmsg in cmsgs { + let mut pmhdr: *mut cmsghdr = unsafe { CMSG_FIRSTHDR(&mhdr as *const msghdr) }; + for cmsg in cmsgs.as_ref() { assert_ne!(pmhdr, ptr::null_mut()); // Safe because we know that pmhdr is valid, and we initialized it with // sufficient space unsafe { cmsg.encode_into(pmhdr) }; // Safe because mhdr is valid - pmhdr = unsafe{CMSG_NXTHDR(&mhdr as *const msghdr, pmhdr)}; + pmhdr = unsafe { CMSG_NXTHDR(&mhdr as *const msghdr, pmhdr) }; } - let ret = unsafe { libc::sendmsg(fd, &mhdr, flags.bits()) }; - - Errno::result(ret).map(|r| r as usize) + mhdr } /// Receive message in scatter-gather vectors from a socket, and @@ -891,57 +1179,16 @@ pub fn recvmsg<'a>(fd: RawFd, iov: &[IoVec<&mut [u8]>], flags: MsgFlags) -> Result> { let mut address = mem::MaybeUninit::uninit(); - let (msg_control, msg_controllen) = cmsg_buffer.as_mut() - .map(|v| (v.as_mut_ptr(), v.capacity())) - .unwrap_or((ptr::null_mut(), 0)); - let mut mhdr = { - unsafe { - // Musl's msghdr has private fields, so this is the only way to - // initialize it. - let mut mhdr = mem::MaybeUninit::::zeroed(); - let p = mhdr.as_mut_ptr(); - (*p).msg_name = address.as_mut_ptr() as *mut c_void; - (*p).msg_namelen = mem::size_of::() as socklen_t; - (*p).msg_iov = iov.as_ptr() as *mut iovec; - (*p).msg_iovlen = iov.len() as _; - (*p).msg_control = msg_control as *mut c_void; - (*p).msg_controllen = msg_controllen as _; - (*p).msg_flags = 0; - mhdr.assume_init() - } + + let (msg_controllen, mut mhdr) = unsafe { + pack_mhdr_to_receive(&iov, &mut cmsg_buffer, address.as_mut_ptr()) }; let ret = unsafe { libc::recvmsg(fd, &mut mhdr, flags.bits()) }; - Errno::result(ret).map(|r| { - let cmsghdr = unsafe { - if mhdr.msg_controllen > 0 { - // got control message(s) - cmsg_buffer - .as_mut() - .unwrap() - .set_len(mhdr.msg_controllen as usize); - debug_assert!(!mhdr.msg_control.is_null()); - debug_assert!(msg_controllen >= mhdr.msg_controllen as usize); - CMSG_FIRSTHDR(&mhdr as *const msghdr) - } else { - ptr::null() - }.as_ref() - }; + let r = Errno::result(ret)?; - let address = unsafe { - sockaddr_storage_to_addr(&address.assume_init(), - mhdr.msg_namelen as usize - ).ok() - }; - RecvMsg { - bytes: r as usize, - cmsghdr, - address, - flags: MsgFlags::from_bits_truncate(mhdr.msg_flags), - mhdr, - } - }) + Ok(unsafe { read_mhdr(mhdr, r, msg_controllen, address.assume_init(), &mut cmsg_buffer) }) } -- cgit v1.2.3