summaryrefslogtreecommitdiff
path: root/Libraries
diff options
context:
space:
mode:
authorSergey Bugaev <bugaevc@serenityos.org>2020-11-21 21:55:00 +0300
committerAndreas Kling <kling@serenityos.org>2020-11-23 18:37:40 +0100
commitfa2e3e2be42277b3c565c308e4aaef0972301563 (patch)
treea4e843a8959a76cb7f8c6c9590e955c1e8666c75 /Libraries
parentd62346c0b1e3d55a27c1a35f9606154ca19d422f (diff)
downloadserenity-fa2e3e2be42277b3c565c308e4aaef0972301563.zip
LibIPC: Prepend each message with its size
This makes it much simpler to determine when we've read a complete message, and will make it possible to integrate recvfd() in the future commit.
Diffstat (limited to 'Libraries')
-rw-r--r--Libraries/LibIPC/Connection.h47
1 files changed, 30 insertions, 17 deletions
diff --git a/Libraries/LibIPC/Connection.h b/Libraries/LibIPC/Connection.h
index 4db21a6dc7..781a2d4c4f 100644
--- a/Libraries/LibIPC/Connection.h
+++ b/Libraries/LibIPC/Connection.h
@@ -35,6 +35,7 @@
#include <LibCore/SyscallUtils.h>
#include <LibCore/Timer.h>
#include <LibIPC/Message.h>
+#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/select.h>
@@ -75,10 +76,13 @@ public:
return;
auto buffer = message.encode();
+ // Prepend the message size.
+ uint32_t message_size = buffer.size();
+ buffer.prepend(reinterpret_cast<const u8*>(&message_size), sizeof(message_size));
- auto bytes_remaining = buffer.size();
- while (bytes_remaining) {
- auto nwritten = write(m_socket->fd(), buffer.data(), buffer.size());
+ size_t total_nwritten = 0;
+ while (total_nwritten < buffer.size()) {
+ auto nwritten = write(m_socket->fd(), buffer.data() + total_nwritten, buffer.size() - total_nwritten);
if (nwritten < 0) {
switch (errno) {
case EPIPE:
@@ -95,7 +99,7 @@ public:
return;
}
}
- bytes_remaining -= nwritten;
+ total_nwritten += nwritten;
}
m_responsiveness_timer->start();
@@ -190,25 +194,34 @@ protected:
did_become_responsive();
}
- size_t decoded_bytes = 0;
- for (size_t index = 0; index < bytes.size(); index += decoded_bytes) {
+ size_t index = 0;
+ uint32_t message_size = 0;
+ for (; index + sizeof(message_size) < bytes.size(); index += message_size) {
+ message_size = *reinterpret_cast<uint32_t*>(bytes.data() + index);
+ if (message_size == 0 || bytes.size() - index - sizeof(uint32_t) < message_size)
+ break;
+ index += sizeof(message_size);
auto remaining_bytes = ByteBuffer::wrap(bytes.data() + index, bytes.size() - index);
- if (auto message = LocalEndpoint::decode_message(remaining_bytes, decoded_bytes)) {
+ if (auto message = LocalEndpoint::decode_message(remaining_bytes)) {
m_unprocessed_messages.append(message.release_nonnull());
- } else if (auto message = PeerEndpoint::decode_message(remaining_bytes, decoded_bytes)) {
+ } else if (auto message = PeerEndpoint::decode_message(remaining_bytes)) {
m_unprocessed_messages.append(message.release_nonnull());
} else {
- // Sometimes we might receive a partial message. That's okay, just stash away
- // the unprocessed bytes and we'll prepend them to the next incoming message
- // in the next run of this function.
- if (!m_unprocessed_bytes.is_empty()) {
- dbg() << *this << "::drain_messages_from_peer: Already have unprocessed bytes";
- shutdown();
- }
- m_unprocessed_bytes = remaining_bytes.isolated_copy();
+ dbgln("Failed to parse a message");
break;
}
- ASSERT(decoded_bytes);
+ }
+
+ if (index < bytes.size()) {
+ // Sometimes we might receive a partial message. That's okay, just stash away
+ // the unprocessed bytes and we'll prepend them to the next incoming message
+ // in the next run of this function.
+ auto remaining_bytes = ByteBuffer::wrap(bytes.data() + index, bytes.size() - index);
+ if (!m_unprocessed_bytes.is_empty()) {
+ dbg() << *this << "::drain_messages_from_peer: Already have unprocessed bytes";
+ shutdown();
+ }
+ m_unprocessed_bytes = remaining_bytes.isolated_copy();
}
if (!m_unprocessed_messages.is_empty()) {