summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/sys/socket/mod.rs509
-rw-r--r--test/sys/test_socket.rs108
2 files changed, 420 insertions, 197 deletions
diff --git a/src/sys/socket/mod.rs b/src/sys/socket/mod.rs
index cabc2301..aec09dc8 100644
--- a/src/sys/socket/mod.rs
+++ b/src/sys/socket/mod.rs
@@ -567,15 +567,20 @@ macro_rules! cmsg_space {
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
-pub struct RecvMsg<'a, S> {
+/// Contains outcome of sending or receiving a message
+///
+/// Use [`cmsgs`][RecvMsg::cmsgs] to access all the control messages present, and
+/// [`iovs`][RecvMsg::iovs`] to access underlying io slices.
+pub struct RecvMsg<'a, 's, S> {
pub bytes: usize,
cmsghdr: Option<&'a cmsghdr>,
pub address: Option<S>,
pub flags: MsgFlags,
+ iobufs: std::marker::PhantomData<& 's()>,
mhdr: msghdr,
}
-impl<'a, S> RecvMsg<'a, S> {
+impl<'a, S> RecvMsg<'a, '_, S> {
/// Iterate over the valid control messages pointed to by this
/// msghdr.
pub fn cmsgs(&self) -> CmsgIterator {
@@ -1454,24 +1459,6 @@ pub fn sendmsg<S>(fd: RawFd, iov: &[IoSlice<'_>], cmsgs: &[ControlMessage],
Errno::result(ret).map(|r| r as usize)
}
-#[cfg(any(
- target_os = "linux",
- target_os = "android",
- target_os = "freebsd",
- target_os = "netbsd",
-))]
-#[derive(Debug)]
-pub struct SendMmsgData<'a, I, C, S>
- where
- I: AsRef<[IoSlice<'a>]>,
- C: AsRef<[ControlMessage<'a>]>,
- S: SockaddrLike + 'a
-{
- pub iov: I,
- pub cmsgs: C,
- pub addr: Option<S>,
- pub _lt: std::marker::PhantomData<&'a I>,
-}
/// An extension of `sendmsg` that allows the caller to transmit multiple
/// messages on a socket using a single system call. This has performance
@@ -1496,51 +1483,66 @@ pub struct SendMmsgData<'a, I, C, S>
target_os = "freebsd",
target_os = "netbsd",
))]
-pub fn sendmmsg<'a, I, C, S>(
+pub fn sendmmsg<'a, XS, AS, C, I, S>(
fd: RawFd,
- data: impl std::iter::IntoIterator<Item=&'a SendMmsgData<'a, I, C, S>>,
+ data: &'a mut MultiHeaders<S>,
+ slices: XS,
+ // one address per group of slices
+ addrs: AS,
+ // shared across all the messages
+ cmsgs: C,
flags: MsgFlags
-) -> Result<Vec<usize>>
+) -> crate::Result<MultiResults<'a, S>>
where
+ XS: IntoIterator<Item = &'a I>,
+ AS: AsRef<[Option<S>]>,
I: AsRef<[IoSlice<'a>]> + 'a,
C: AsRef<[ControlMessage<'a>]> + 'a,
S: SockaddrLike + 'a
{
- let iter = data.into_iter();
- let size_hint = iter.size_hint();
- let reserve_items = size_hint.1.unwrap_or(size_hint.0);
+ let mut count = 0;
- let mut output = Vec::<libc::mmsghdr>::with_capacity(reserve_items);
- let mut cmsgs_buffers = Vec::<Vec<u8>>::with_capacity(reserve_items);
+ for (i, ((slice, addr), mmsghdr)) in slices.into_iter().zip(addrs.as_ref()).zip(data.items.iter_mut() ).enumerate() {
+ let mut p = &mut mmsghdr.msg_hdr;
+ p.msg_iov = slice.as_ref().as_ptr() as *mut libc::iovec;
+ p.msg_iovlen = slice.as_ref().len() as _;
- for d in iter {
- let capacity: usize = d.cmsgs.as_ref().iter().map(|c| c.space()).sum();
- let mut cmsgs_buffer = vec![0u8; capacity];
+ p.msg_namelen = addr.as_ref().map_or(0, S::len);
+ p.msg_name = addr.as_ref().map_or(ptr::null(), S::as_ptr) as _;
- output.push(libc::mmsghdr {
- msg_hdr: pack_mhdr_to_send(
- &mut cmsgs_buffer,
- &d.iov,
- &d.cmsgs,
- d.addr.as_ref()
- ),
- msg_len: 0,
- });
- cmsgs_buffers.push(cmsgs_buffer);
- };
+ // Encode each cmsg. This must happen after initializing the header because
+ // CMSG_NEXT_HDR and friends read the msg_control and msg_controllen fields.
+ // CMSG_FIRSTHDR is always safe
+ let mut pmhdr: *mut cmsghdr = unsafe { CMSG_FIRSTHDR(p) };
+ for cmsg in cmsgs.as_ref() {
+ assert_ne!(pmhdr, ptr::null_mut());
+ // Safe because we know that pmhdr is valid, and we initialized it with
+ // sufficient space
+ unsafe { cmsg.encode_into(pmhdr) };
+ // Safe because mhdr is valid
+ pmhdr = unsafe { CMSG_NXTHDR(p, pmhdr) };
+ }
- let ret = unsafe { libc::sendmmsg(fd, output.as_mut_ptr(), output.len() as _, flags.bits() as _) };
+ count = i+1;
+ }
- let sent_messages = Errno::result(ret)? as usize;
- let mut sent_bytes = Vec::with_capacity(sent_messages);
+ let sent = Errno::result(unsafe {
+ libc::sendmmsg(
+ fd,
+ data.items.as_mut_ptr(),
+ count as _,
+ flags.bits() as _
+ )
+ })? as usize;
- for item in &output {
- sent_bytes.push(item.msg_len as usize);
- }
+ Ok(MultiResults {
+ rmm: data,
+ current_index: 0,
+ received: sent
+ })
- Ok(sent_bytes)
}
@@ -1551,128 +1553,334 @@ pub fn sendmmsg<'a, I, C, S>(
target_os = "netbsd",
))]
#[derive(Debug)]
-pub struct RecvMmsgData<'a, I>
+/// Preallocated structures needed for [`recvmmsg`] and [`sendmmsg`] functions
+pub struct MultiHeaders<S> {
+ // preallocated boxed slice of mmsghdr
+ items: Box<[libc::mmsghdr]>,
+ addresses: Box<[mem::MaybeUninit<S>]>,
+ // while we are not using it directly - this is used to store control messages
+ // and we retain pointers to them inside items array
+ #[allow(dead_code)]
+ cmsg_buffers: Option<Box<[u8]>>,
+ msg_controllen: usize,
+}
+
+#[cfg(any(
+ target_os = "linux",
+ target_os = "android",
+ target_os = "freebsd",
+ target_os = "netbsd",
+))]
+impl<S> MultiHeaders<S> {
+ /// Preallocate structure used by [`recvmmsg`] and [`sendmmsg`] takes number of headers to preallocate
+ ///
+ /// `cmsg_buffer` should be created with [`cmsg_space!`] if needed
+ pub fn preallocate(num_slices: usize, cmsg_buffer: Option<Vec<u8>>) -> Self
where
- I: AsRef<[IoSliceMut<'a>]> + 'a,
-{
- pub iov: I,
- pub cmsg_buffer: Option<&'a mut Vec<u8>>,
+ S: Copy + SockaddrLike,
+ {
+ // we will be storing pointers to addresses inside mhdr - convert it into boxed
+ // slice so it can'be changed later by pushing anything into self.addresses
+ let mut addresses = vec![std::mem::MaybeUninit::uninit(); num_slices].into_boxed_slice();
+
+ let msg_controllen = cmsg_buffer.as_ref().map_or(0, |v| v.capacity());
+
+ // we'll need a cmsg_buffer for each slice, we preallocate a vector and split
+ // it into "slices" parts
+ let cmsg_buffers =
+ cmsg_buffer.map(|v| vec![0u8; v.capacity() * num_slices].into_boxed_slice());
+
+ let items = addresses
+ .iter_mut()
+ .enumerate()
+ .map(|(ix, address)| {
+ let (ptr, cap) = match &cmsg_buffers {
+ Some(v) => ((&v[ix * msg_controllen] as *const u8), msg_controllen),
+ None => (std::ptr::null(), 0),
+ };
+ let msg_hdr = unsafe { pack_mhdr_to_receive(std::ptr::null(), 0, ptr, cap, address.as_mut_ptr()) };
+ libc::mmsghdr {
+ msg_hdr,
+ msg_len: 0,
+ }
+ })
+ .collect::<Vec<_>>();
+
+ Self {
+ items: items.into_boxed_slice(),
+ addresses,
+ cmsg_buffers,
+ msg_controllen,
+ }
+ }
}
-/// An extension of `recvmsg` that allows the caller to receive multiple
-/// messages from a socket using a single system call. This has
-/// performance benefits for some applications.
-///
-/// `iov` and `cmsg_buffer` should be constructed similarly to `recvmsg`
-///
-/// Multiple allocations are performed
+/// An extension of recvmsg that allows the caller to receive multiple messages from a socket using a single system call.
///
-/// # Arguments
-///
-/// * `fd`: Socket file descriptor
-/// * `data`: Struct that implements `IntoIterator` with `RecvMmsgData` items
-/// * `flags`: Optional flags passed directly to the operating system.
+/// This has performance benefits for some applications.
///
-/// # RecvMmsgData
+/// This method performs no allocations.
///
-/// * `iov`: Scatter-gather list of buffers to receive the message
-/// * `cmsg_buffer`: Space to receive ancillary data. Should be created by
-/// [`cmsg_space!`](../../macro.cmsg_space.html)
+/// Returns an iterator producing [`RecvMsg`], one per received messages. Each `RecvMsg` can produce
+/// iterators over [`IoSlice`] with [`iovs`][RecvMsg::iovs`] and
+/// `ControlMessageOwned` with [`cmsgs`][RecvMsg::cmsgs].
///
-/// # Returns
-/// A `Vec` with multiple `RecvMsg`, one per received message
+/// # Bugs (in underlying implementation, at least in Linux)
+/// The timeout argument does not work as intended. The timeout is checked only after the receipt
+/// of each datagram, so that if up to `vlen`-1 datagrams are received before the timeout expires,
+/// but then no further datagrams are received, the call will block forever.
///
-/// # References
-/// - [`recvmsg`](fn.recvmsg.html)
-/// - [`RecvMsg`](struct.RecvMsg.html)
+/// If an error occurs after at least one message has been received, the call succeeds, and returns
+/// the number of messages received. The error code is expected to be returned on a subsequent
+/// call to recvmmsg(). In the current implementation, however, the error code can be
+/// overwritten in the meantime by an unrelated network event on a socket, for example an
+/// incoming ICMP packet.
#[cfg(any(
target_os = "linux",
target_os = "android",
target_os = "freebsd",
target_os = "netbsd",
))]
-#[allow(clippy::needless_collect)] // Complicated false positive
-pub fn recvmmsg<'a, I, S>(
+pub fn recvmmsg<'a, XS, S, I>(
fd: RawFd,
- data: impl std::iter::IntoIterator<Item=&'a mut RecvMmsgData<'a, I>,
- IntoIter=impl ExactSizeIterator + Iterator<Item=&'a mut RecvMmsgData<'a, I>>>,
+ data: &'a mut MultiHeaders<S>,
+ slices: XS,
flags: MsgFlags,
- timeout: Option<crate::sys::time::TimeSpec>
-) -> Result<Vec<RecvMsg<'a, S>>>
- where
- I: AsRef<[IoSliceMut<'a>]> + 'a,
- S: Copy + SockaddrLike + 'a
+ mut timeout: Option<crate::sys::time::TimeSpec>,
+) -> crate::Result<MultiResults<'a, S>>
+where
+ XS: IntoIterator<Item = &'a I>,
+ I: AsRef<[IoSliceMut<'a>]> + 'a,
{
- let iter = data.into_iter();
-
- let num_messages = iter.len();
-
- let mut output: Vec<libc::mmsghdr> = Vec::with_capacity(num_messages);
-
- // Addresses should be pre-allocated. pack_mhdr_to_receive will store them
- // as raw pointers, so we may not move them. Turn the vec into a boxed
- // slice so we won't inadvertently reallocate the vec.
- let mut addresses = vec![mem::MaybeUninit::uninit(); num_messages]
- .into_boxed_slice();
-
- let results: Vec<_> = iter.enumerate().map(|(i, d)| {
- let (msg_control, msg_controllen) = d.cmsg_buffer.as_mut()
- .map(|v| (v.as_mut_ptr(), v.capacity()))
- .unwrap_or((ptr::null_mut(), 0));
- let mhdr = unsafe {
- pack_mhdr_to_receive(
- d.iov.as_ref().as_ptr(),
- d.iov.as_ref().len(),
- msg_control,
- msg_controllen,
- addresses[i].as_mut_ptr(),
+ let mut count = 0;
+ for (i, (slice, mmsghdr)) in slices.into_iter().zip(data.items.iter_mut()).enumerate() {
+ let mut p = &mut mmsghdr.msg_hdr;
+ p.msg_iov = slice.as_ref().as_ptr() as *mut libc::iovec;
+ p.msg_iovlen = slice.as_ref().len() as _;
+ count = i + 1;
+ }
+
+ let timeout_ptr = timeout
+ .as_mut()
+ .map_or_else(std::ptr::null_mut, |t| t as *mut _ as *mut libc::timespec);
+
+ let received = Errno::result(unsafe {
+ libc::recvmmsg(
+ fd,
+ data.items.as_mut_ptr(),
+ count as _,
+ flags.bits() as _,
+ timeout_ptr,
+ )
+ })? as usize;
+
+ Ok(MultiResults {
+ rmm: data,
+ current_index: 0,
+ received,
+ })
+}
+
+#[cfg(any(
+ target_os = "linux",
+ target_os = "android",
+ target_os = "freebsd",
+ target_os = "netbsd",
+))]
+#[derive(Debug)]
+/// Iterator over results of [`recvmmsg`]/[`sendmmsg`]
+///
+///
+pub struct MultiResults<'a, S> {
+ // preallocated structures
+ rmm: &'a MultiHeaders<S>,
+ current_index: usize,
+ received: usize,
+}
+
+#[cfg(any(
+ target_os = "linux",
+ target_os = "android",
+ target_os = "freebsd",
+ target_os = "netbsd",
+))]
+impl<'a, S> Iterator for MultiResults<'a, S>
+where
+ S: Copy + SockaddrLike,
+{
+ type Item = RecvMsg<'a, 'a, S>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ if self.current_index >= self.received {
+ return None;
+ }
+ let mmsghdr = self.rmm.items[self.current_index];
+
+ // as long as we are not reading past the index writen by recvmmsg - address
+ // will be initialized
+ let address = unsafe { self.rmm.addresses[self.current_index].assume_init() };
+
+ self.current_index += 1;
+ Some(unsafe {
+ read_mhdr(
+ mmsghdr.msg_hdr,
+ mmsghdr.msg_len as isize,
+ self.rmm.msg_controllen,
+ address,
)
+ })
+ }
+}
+
+impl<'a, S> RecvMsg<'_, 'a, S> {
+ /// Iterate over the filled io slices pointed by this msghdr
+ pub fn iovs(&self) -> IoSliceIterator<'a> {
+ IoSliceIterator {
+ index: 0,
+ remaining: self.bytes,
+ slices: unsafe {
+ // safe for as long as mgdr is properly initialized and references are valid.
+ // for multi messages API we initialize it with an empty
+ // slice and replace with a concrete buffer
+ // for single message API we hold a lifetime reference to ioslices
+ std::slice::from_raw_parts(self.mhdr.msg_iov as *const _, self.mhdr.msg_iovlen as _)
+ },
+ }
+ }
+}
+
+#[derive(Debug)]
+pub struct IoSliceIterator<'a> {
+ index: usize,
+ remaining: usize,
+ slices: &'a [IoSlice<'a>],
+}
+
+impl<'a> Iterator for IoSliceIterator<'a> {
+ type Item = &'a [u8];
+
+ fn next(&mut self) -> Option<Self::Item> {
+ if self.index >= self.slices.len() {
+ return None;
+ }
+ let slice = &self.slices[self.index][..self.remaining.min(self.slices[self.index].len())];
+ self.remaining -= slice.len();
+ self.index += 1;
+ if slice.is_empty() {
+ return None;
+ }
+
+ Some(slice)
+ }
+}
+
+// test contains both recvmmsg and timestaping which is linux only
+// there are existing tests for recvmmsg only in tests/
+#[cfg(target_os = "linux")]
+#[cfg(test)]
+mod test {
+ use crate::sys::socket::{AddressFamily, ControlMessageOwned};
+ use crate::*;
+ use std::str::FromStr;
+
+ #[cfg_attr(qemu, ignore)]
+ #[test]
+ fn test_recvmm2() -> crate::Result<()> {
+ use crate::sys::socket::{
+ sendmsg, setsockopt, socket, sockopt::Timestamping, MsgFlags, SockFlag, SockType,
+ SockaddrIn, TimestampingFlag,
};
+ use std::io::{IoSlice, IoSliceMut};
- output.push(
- libc::mmsghdr {
- msg_hdr: mhdr,
- msg_len: 0,
- }
- );
+ let sock_addr = SockaddrIn::from_str("127.0.0.1:6790").unwrap();
- msg_controllen as usize
- }).collect();
+ let ssock = socket(
+ AddressFamily::Inet,
+ SockType::Datagram,
+ SockFlag::empty(),
+ None,
+ )?;
- let timeout = if let Some(mut t) = timeout {
- t.as_mut() as *mut libc::timespec
- } else {
- ptr::null_mut()
- };
+ let rsock = socket(
+ AddressFamily::Inet,
+ SockType::Datagram,
+ SockFlag::SOCK_NONBLOCK,
+ None,
+ )?;
+
+ crate::sys::socket::bind(rsock, &sock_addr)?;
+
+ setsockopt(rsock, Timestamping, &TimestampingFlag::all())?;
+
+ let sbuf = (0..400).map(|i| i as u8).collect::<Vec<_>>();
+
+ let mut recv_buf = vec![0; 1024];
- let ret = unsafe { libc::recvmmsg(fd, output.as_mut_ptr(), output.len() as _, flags.bits() as _, timeout) };
-
- let _ = Errno::result(ret)?;
-
- Ok(output
- .into_iter()
- .take(ret as usize)
- .zip(addresses.iter().map(|addr| unsafe{addr.assume_init()}))
- .zip(results.into_iter())
- .map(|((mmsghdr, address), msg_controllen)| {
- unsafe {
- read_mhdr(
- mmsghdr.msg_hdr,
- mmsghdr.msg_len as isize,
- msg_controllen,
- address,
- )
+ let mut recv_iovs = Vec::new();
+ let mut pkt_iovs = Vec::new();
+
+ for (ix, chunk) in recv_buf.chunks_mut(256).enumerate() {
+ pkt_iovs.push(IoSliceMut::new(chunk));
+ if ix % 2 == 1 {
+ recv_iovs.push(pkt_iovs);
+ pkt_iovs = Vec::new();
+ }
+ }
+ drop(pkt_iovs);
+
+ let flags = MsgFlags::empty();
+ let iov1 = [IoSlice::new(&sbuf)];
+
+ let cmsg = cmsg_space!(crate::sys::socket::Timestamps);
+ sendmsg(ssock, &iov1, &[], flags, Some(&sock_addr)).unwrap();
+
+ let mut data = super::MultiHeaders::<()>::preallocate(recv_iovs.len(), Some(cmsg));
+
+ let t = sys::time::TimeSpec::from_duration(std::time::Duration::from_secs(10));
+
+ let recv = super::recvmmsg(rsock, &mut data, recv_iovs.iter(), flags, Some(t))?;
+
+ for rmsg in recv {
+ #[cfg(not(qemu))]
+ let mut saw_time = false;
+ let mut recvd = 0;
+ for cmsg in rmsg.cmsgs() {
+ if let ControlMessageOwned::ScmTimestampsns(timestamps) = cmsg {
+ let ts = timestamps.system;
+
+ let sys_time =
+ crate::time::clock_gettime(crate::time::ClockId::CLOCK_REALTIME)?;
+ let diff = if ts > sys_time {
+ ts - sys_time
+ } else {
+ sys_time - ts
+ };
+ assert!(std::time::Duration::from(diff).as_secs() < 60);
+ #[cfg(not(qemu))]
+ {
+ saw_time = true;
+ }
+ }
}
- })
- .collect())
-}
-unsafe fn read_mhdr<'a, S>(
+ #[cfg(not(qemu))]
+ assert!(saw_time);
+
+ for iov in rmsg.iovs() {
+ recvd += iov.len();
+ }
+ assert_eq!(recvd, 400);
+ }
+
+ Ok(())
+ }
+}
+unsafe fn read_mhdr<'a, 'i, S>(
mhdr: msghdr,
r: isize,
msg_controllen: usize,
address: S,
-) -> RecvMsg<'a, S>
+) -> RecvMsg<'a, 'i, S>
where S: SockaddrLike
{
// The cast is not unnecessary on all platforms.
@@ -1693,9 +1901,21 @@ unsafe fn read_mhdr<'a, S>(
address: Some(address),
flags: MsgFlags::from_bits_truncate(mhdr.msg_flags),
mhdr,
+ iobufs: std::marker::PhantomData,
}
}
+/// Pack pointers to various structures into into msghdr
+///
+/// # Safety
+/// `iov_buffer` and `iov_buffer_len` must point to a slice
+/// of `IoSliceMut` and number of available elements or be a null pointer and 0
+///
+/// `cmsg_buffer` and `cmsg_capacity` must point to a byte buffer used
+/// to store control headers later or be a null pointer and 0 if control
+/// headers are not used
+///
+/// Buffers must remain valid for the whole lifetime of msghdr
unsafe fn pack_mhdr_to_receive<S>(
iov_buffer: *const IoSliceMut,
iov_buffer_len: usize,
@@ -1789,8 +2009,9 @@ fn pack_mhdr_to_send<'a, I, C, S>(
/// [recvmsg(2)](https://pubs.opengroup.org/onlinepubs/9699919799/functions/recvmsg.html)
pub fn recvmsg<'a, 'outer, 'inner, S>(fd: RawFd, iov: &'outer mut [IoSliceMut<'inner>],
mut cmsg_buffer: Option<&'a mut Vec<u8>>,
- flags: MsgFlags) -> Result<RecvMsg<'a, S>>
- where S: SockaddrLike + 'a
+ flags: MsgFlags) -> Result<RecvMsg<'a, 'inner, S>>
+ where S: SockaddrLike + 'a,
+ 'inner: 'outer
{
let mut address = mem::MaybeUninit::uninit();
diff --git a/test/sys/test_socket.rs b/test/sys/test_socket.rs
index b4ca279d..7ab60ecc 100644
--- a/test/sys/test_socket.rs
+++ b/test/sys/test_socket.rs
@@ -501,31 +501,31 @@ mod recvfrom {
rsock,
ssock,
move |s, m, flags| {
- let iov = [IoSlice::new(m)];
- let mut msgs = vec![SendMmsgData {
- iov: &iov,
- cmsgs: &[],
- addr: Some(sock_addr),
- _lt: Default::default(),
- }];
-
let batch_size = 15;
+ let mut iovs = Vec::with_capacity(1 + batch_size);
+ let mut addrs = Vec::with_capacity(1 + batch_size);
+ let mut data = MultiHeaders::preallocate(1 + batch_size, None);
+ let iov = IoSlice::new(m);
+ // first chunk:
+ iovs.push([iov]);
+ addrs.push(Some(sock_addr));
for _ in 0..batch_size {
- msgs.push(SendMmsgData {
- iov: &iov,
- cmsgs: &[],
- addr: Some(sock_addr2),
- _lt: Default::default(),
- });
+ iovs.push([iov]);
+ addrs.push(Some(sock_addr2));
}
- sendmmsg(s, msgs.iter(), flags).map(move |sent_bytes| {
- assert!(!sent_bytes.is_empty());
- for sent in &sent_bytes {
- assert_eq!(*sent, m.len());
- }
- sent_bytes.len()
- })
+
+ let res = sendmmsg(s, &mut data, &iovs, addrs, [], flags)?;
+ let mut sent_messages = 0;
+ let mut sent_bytes = 0;
+ for item in res {
+ sent_messages += 1;
+ sent_bytes += item.bytes;
+ }
+ //
+ assert_eq!(sent_messages, iovs.len());
+ assert_eq!(sent_bytes, sent_messages * m.len());
+ Ok(sent_messages)
},
|_, _| {},
);
@@ -577,21 +577,19 @@ mod recvfrom {
// Buffers to receive exactly `NUM_MESSAGES_SENT` messages
let mut receive_buffers = [[0u8; 32]; NUM_MESSAGES_SENT];
- let iovs: Vec<_> = receive_buffers
- .iter_mut()
- .map(|buf| [IoSliceMut::new(&mut buf[..])])
- .collect();
+ msgs.extend(
+ receive_buffers
+ .iter_mut()
+ .map(|buf| [IoSliceMut::new(&mut buf[..])]),
+ );
- for iov in &iovs {
- msgs.push_back(RecvMmsgData {
- iov,
- cmsg_buffer: None,
- })
- }
+ let mut data =
+ MultiHeaders::<SockaddrIn>::preallocate(msgs.len(), None);
let res: Vec<RecvMsg<SockaddrIn>> =
- recvmmsg(rsock, &mut msgs, MsgFlags::empty(), None)
- .expect("recvmmsg");
+ recvmmsg(rsock, &mut data, msgs.iter(), MsgFlags::empty(), None)
+ .expect("recvmmsg")
+ .collect();
assert_eq!(res.len(), DATA.len());
for RecvMsg { address, bytes, .. } in res.into_iter() {
@@ -655,21 +653,26 @@ mod recvfrom {
// will return when there are fewer than requested messages in the
// kernel buffers when using `MSG_DONTWAIT`.
let mut receive_buffers = [[0u8; 32]; NUM_MESSAGES_SENT + 2];
- let iovs: Vec<_> = receive_buffers
- .iter_mut()
- .map(|buf| [IoSliceMut::new(&mut buf[..])])
- .collect();
+ msgs.extend(
+ receive_buffers
+ .iter_mut()
+ .map(|buf| [IoSliceMut::new(&mut buf[..])]),
+ );
- for iov in &iovs {
- msgs.push_back(RecvMmsgData {
- iov,
- cmsg_buffer: None,
- })
- }
+ let mut data = MultiHeaders::<SockaddrIn>::preallocate(
+ NUM_MESSAGES_SENT + 2,
+ None,
+ );
- let res: Vec<RecvMsg<SockaddrIn>> =
- recvmmsg(rsock, &mut msgs, MsgFlags::MSG_DONTWAIT, None)
- .expect("recvmmsg");
+ let res: Vec<RecvMsg<SockaddrIn>> = recvmmsg(
+ rsock,
+ &mut data,
+ msgs.iter(),
+ MsgFlags::MSG_DONTWAIT,
+ None,
+ )
+ .expect("recvmmsg")
+ .collect();
assert_eq!(res.len(), NUM_MESSAGES_SENT);
for RecvMsg { address, bytes, .. } in res.into_iter() {
@@ -2205,14 +2208,13 @@ fn test_recvmmsg_timestampns() {
assert_eq!(message.len(), l);
// Receive the message
let mut buffer = vec![0u8; message.len()];
- let mut cmsgspace = nix::cmsg_space!(TimeSpec);
- let iov = [IoSliceMut::new(&mut buffer)];
- let mut data = vec![RecvMmsgData {
- iov,
- cmsg_buffer: Some(&mut cmsgspace),
- }];
+ let cmsgspace = nix::cmsg_space!(TimeSpec);
+ let iov = vec![[IoSliceMut::new(&mut buffer)]];
+ let mut data = MultiHeaders::preallocate(1, Some(cmsgspace));
let r: Vec<RecvMsg<()>> =
- recvmmsg(in_socket, &mut data, flags, None).unwrap();
+ recvmmsg(in_socket, &mut data, iov.iter(), flags, None)
+ .unwrap()
+ .collect();
let rtime = match r[0].cmsgs().next() {
Some(ControlMessageOwned::ScmTimestampns(rtime)) => rtime,
Some(_) => panic!("Unexpected control message"),