summaryrefslogtreecommitdiff
path: root/src/message.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/message.rs')
-rw-r--r--src/message.rs111
1 files changed, 107 insertions, 4 deletions
diff --git a/src/message.rs b/src/message.rs
index 6b093a4..8a2e1b6 100644
--- a/src/message.rs
+++ b/src/message.rs
@@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use std::io::Write;
-use byteorder::{LittleEndian, WriteBytesExt};
+use std::io::{Read, Write, Cursor};
+use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
+use std::iter::once;
+use std::collections::HashMap;
use tag::Tag;
use error::Error;
@@ -21,7 +23,7 @@ use error::Error;
///
/// A Roughtime protocol message; a map of u32 tags to arbitrary byte-strings.
///
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct RtMessage {
tags: Vec<Tag>,
values: Vec<Vec<u8>>,
@@ -34,13 +36,77 @@ impl RtMessage {
///
/// * `num_fields` - Reserve space for this many fields.
///
- pub fn new(num_fields: u8) -> Self {
+ pub fn new(num_fields: u32) -> Self {
RtMessage {
tags: Vec::with_capacity(num_fields as usize),
values: Vec::with_capacity(num_fields as usize),
}
}
+ pub fn from_bytes(bytes: &[u8]) -> Result<Self, Error> {
+ let mut msg = Cursor::new(bytes);
+
+ let num_tags = msg.read_u32::<LittleEndian>()?;
+ let mut rt_msg = RtMessage::new(num_tags);
+
+ if num_tags == 1 {
+ let pos = msg.position() as usize;
+ let tag = Tag::from_wire(&bytes[pos..pos+4])?;
+ msg.set_position((pos + 4) as u64);
+
+ let mut value = Vec::new();
+
+ msg.read_to_end(&mut value).unwrap();
+ rt_msg.add_field(tag, &value)?;
+ return Ok(rt_msg)
+ }
+
+ let mut offsets = Vec::with_capacity((num_tags - 1) as usize);
+ let mut tags = Vec::with_capacity(num_tags as usize);
+
+ for _ in 0..num_tags - 1 {
+ let offset = msg.read_u32::<LittleEndian>()?;
+ if offset % 4 != 0 {
+ panic!("Invalid offset {:?} in message {:?}", offset, bytes);
+ }
+ offsets.push(offset as usize);
+ }
+
+
+ let mut buf = [0; 4];
+ for _ in 0..num_tags {
+ msg.read_exact(&mut buf).unwrap();
+ let tag = Tag::from_wire(&buf)?;
+
+ if let Some(last_tag) = tags.last() {
+ if tag <= *last_tag {
+ return Err(Error::TagNotStrictlyIncreasing(tag))
+ }
+ }
+ tags.push(tag);
+ }
+
+ // All offsets are relative to the end of the header,
+ // which is our current position
+ let header_end = msg.position() as usize;
+ // Compute the end of the last value,
+ // as an offset from the end of the header
+ let msg_end = bytes.len() - header_end;
+
+ assert_eq!(offsets.len(), tags.len() - 1);
+
+ for (tag, (value_start, value_end)) in tags.into_iter().zip(
+ once(&0).chain(offsets.iter()).zip(
+ offsets.iter().chain(once(&msg_end))
+ )
+ ) {
+
+ let value = bytes[(header_end + value_start)..(header_end + value_end)].to_vec();
+ rt_msg.add_field(tag, &value)?;
+ }
+ Ok(rt_msg)
+ }
+
/// Add a field to this `RtMessage`
///
/// ## Arguments
@@ -69,6 +135,18 @@ impl RtMessage {
self.tags.len() as u32
}
+ pub fn tags(&self) -> &[Tag] {
+ &self.tags
+ }
+
+ pub fn values(&self) -> &[Vec<u8>] {
+ &self.values
+ }
+
+ pub fn into_hash_map(self) -> HashMap<Tag, Vec<u8>> {
+ self.tags.into_iter().zip(self.values.into_iter()).collect()
+ }
+
/// Encode this message into its on-the-wire representation.
pub fn encode(&self) -> Result<Vec<u8>, Error> {
let num_tags = self.tags.len();
@@ -112,6 +190,31 @@ impl RtMessage {
4 + tags_size + offsets_size + values_size
}
+
+ /// Adds a PAD tag to the end of this message, with a length
+ /// set such that the final encoded size of this message is 1KB
+ ///
+ /// If the encoded size of this message is already >= 1KB,
+ /// this method does nothing
+ pub fn pad_to_kilobyte(&mut self) {
+ let size = self.encoded_size();
+ if size >= 1024 {
+ return;
+ }
+
+ let mut padding_needed = 1024 - size;
+ if self.tags.len() == 1 {
+ // If we currently only have one tag, adding a padding tag will cause
+ // a 32-bit offset values to be written
+ padding_needed -= 4;
+ }
+ padding_needed -= Tag::PAD.wire_value().len();
+ let padding = vec![0; padding_needed];
+
+ self.add_field(Tag::PAD, &padding).unwrap();
+
+ assert_eq!(self.encoded_size(), 1024);
+ }
}
#[cfg(test)]