diff options
Diffstat (limited to 'src/message.rs')
-rw-r--r-- | src/message.rs | 111 |
1 files changed, 107 insertions, 4 deletions
diff --git a/src/message.rs b/src/message.rs index 6b093a4..8a2e1b6 100644 --- a/src/message.rs +++ b/src/message.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::Write; -use byteorder::{LittleEndian, WriteBytesExt}; +use std::io::{Read, Write, Cursor}; +use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; +use std::iter::once; +use std::collections::HashMap; use tag::Tag; use error::Error; @@ -21,7 +23,7 @@ use error::Error; /// /// A Roughtime protocol message; a map of u32 tags to arbitrary byte-strings. /// -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RtMessage { tags: Vec<Tag>, values: Vec<Vec<u8>>, @@ -34,13 +36,77 @@ impl RtMessage { /// /// * `num_fields` - Reserve space for this many fields. /// - pub fn new(num_fields: u8) -> Self { + pub fn new(num_fields: u32) -> Self { RtMessage { tags: Vec::with_capacity(num_fields as usize), values: Vec::with_capacity(num_fields as usize), } } + pub fn from_bytes(bytes: &[u8]) -> Result<Self, Error> { + let mut msg = Cursor::new(bytes); + + let num_tags = msg.read_u32::<LittleEndian>()?; + let mut rt_msg = RtMessage::new(num_tags); + + if num_tags == 1 { + let pos = msg.position() as usize; + let tag = Tag::from_wire(&bytes[pos..pos+4])?; + msg.set_position((pos + 4) as u64); + + let mut value = Vec::new(); + + msg.read_to_end(&mut value).unwrap(); + rt_msg.add_field(tag, &value)?; + return Ok(rt_msg) + } + + let mut offsets = Vec::with_capacity((num_tags - 1) as usize); + let mut tags = Vec::with_capacity(num_tags as usize); + + for _ in 0..num_tags - 1 { + let offset = msg.read_u32::<LittleEndian>()?; + if offset % 4 != 0 { + panic!("Invalid offset {:?} in message {:?}", offset, bytes); + } + offsets.push(offset as usize); + } + + + let mut buf = [0; 4]; + for _ in 0..num_tags { + msg.read_exact(&mut buf).unwrap(); + let tag = Tag::from_wire(&buf)?; + + if let Some(last_tag) = tags.last() { + if tag <= *last_tag { + return Err(Error::TagNotStrictlyIncreasing(tag)) + } + } + tags.push(tag); + } + + // All offsets are relative to the end of the header, + // which is our current position + let header_end = msg.position() as usize; + // Compute the end of the last value, + // as an offset from the end of the header + let msg_end = bytes.len() - header_end; + + assert_eq!(offsets.len(), tags.len() - 1); + + for (tag, (value_start, value_end)) in tags.into_iter().zip( + once(&0).chain(offsets.iter()).zip( + offsets.iter().chain(once(&msg_end)) + ) + ) { + + let value = bytes[(header_end + value_start)..(header_end + value_end)].to_vec(); + rt_msg.add_field(tag, &value)?; + } + Ok(rt_msg) + } + /// Add a field to this `RtMessage` /// /// ## Arguments @@ -69,6 +135,18 @@ impl RtMessage { self.tags.len() as u32 } + pub fn tags(&self) -> &[Tag] { + &self.tags + } + + pub fn values(&self) -> &[Vec<u8>] { + &self.values + } + + pub fn into_hash_map(self) -> HashMap<Tag, Vec<u8>> { + self.tags.into_iter().zip(self.values.into_iter()).collect() + } + /// Encode this message into its on-the-wire representation. pub fn encode(&self) -> Result<Vec<u8>, Error> { let num_tags = self.tags.len(); @@ -112,6 +190,31 @@ impl RtMessage { 4 + tags_size + offsets_size + values_size } + + /// Adds a PAD tag to the end of this message, with a length + /// set such that the final encoded size of this message is 1KB + /// + /// If the encoded size of this message is already >= 1KB, + /// this method does nothing + pub fn pad_to_kilobyte(&mut self) { + let size = self.encoded_size(); + if size >= 1024 { + return; + } + + let mut padding_needed = 1024 - size; + if self.tags.len() == 1 { + // If we currently only have one tag, adding a padding tag will cause + // a 32-bit offset values to be written + padding_needed -= 4; + } + padding_needed -= Tag::PAD.wire_value().len(); + let padding = vec![0; padding_needed]; + + self.add_field(Tag::PAD, &padding).unwrap(); + + assert_eq!(self.encoded_size(), 1024); + } } #[cfg(test)] |