diff options
author | Sergey Bugaev <bugaevc@serenityos.org> | 2020-11-21 21:55:00 +0300 |
---|---|---|
committer | Andreas Kling <kling@serenityos.org> | 2020-11-23 18:37:40 +0100 |
commit | fa2e3e2be42277b3c565c308e4aaef0972301563 (patch) | |
tree | a4e843a8959a76cb7f8c6c9590e955c1e8666c75 /Libraries | |
parent | d62346c0b1e3d55a27c1a35f9606154ca19d422f (diff) | |
download | serenity-fa2e3e2be42277b3c565c308e4aaef0972301563.zip |
LibIPC: Prepend each message with its size
This makes it much simpler to determine when we've read a complete message, and
will make it possible to integrate recvfd() in the future commit.
Diffstat (limited to 'Libraries')
-rw-r--r-- | Libraries/LibIPC/Connection.h | 47 |
1 files changed, 30 insertions, 17 deletions
diff --git a/Libraries/LibIPC/Connection.h b/Libraries/LibIPC/Connection.h index 4db21a6dc7..781a2d4c4f 100644 --- a/Libraries/LibIPC/Connection.h +++ b/Libraries/LibIPC/Connection.h @@ -35,6 +35,7 @@ #include <LibCore/SyscallUtils.h> #include <LibCore/Timer.h> #include <LibIPC/Message.h> +#include <stdint.h> #include <stdio.h> #include <stdlib.h> #include <sys/select.h> @@ -75,10 +76,13 @@ public: return; auto buffer = message.encode(); + // Prepend the message size. + uint32_t message_size = buffer.size(); + buffer.prepend(reinterpret_cast<const u8*>(&message_size), sizeof(message_size)); - auto bytes_remaining = buffer.size(); - while (bytes_remaining) { - auto nwritten = write(m_socket->fd(), buffer.data(), buffer.size()); + size_t total_nwritten = 0; + while (total_nwritten < buffer.size()) { + auto nwritten = write(m_socket->fd(), buffer.data() + total_nwritten, buffer.size() - total_nwritten); if (nwritten < 0) { switch (errno) { case EPIPE: @@ -95,7 +99,7 @@ public: return; } } - bytes_remaining -= nwritten; + total_nwritten += nwritten; } m_responsiveness_timer->start(); @@ -190,25 +194,34 @@ protected: did_become_responsive(); } - size_t decoded_bytes = 0; - for (size_t index = 0; index < bytes.size(); index += decoded_bytes) { + size_t index = 0; + uint32_t message_size = 0; + for (; index + sizeof(message_size) < bytes.size(); index += message_size) { + message_size = *reinterpret_cast<uint32_t*>(bytes.data() + index); + if (message_size == 0 || bytes.size() - index - sizeof(uint32_t) < message_size) + break; + index += sizeof(message_size); auto remaining_bytes = ByteBuffer::wrap(bytes.data() + index, bytes.size() - index); - if (auto message = LocalEndpoint::decode_message(remaining_bytes, decoded_bytes)) { + if (auto message = LocalEndpoint::decode_message(remaining_bytes)) { m_unprocessed_messages.append(message.release_nonnull()); - } else if (auto message = PeerEndpoint::decode_message(remaining_bytes, decoded_bytes)) { + } else if (auto message = PeerEndpoint::decode_message(remaining_bytes)) { m_unprocessed_messages.append(message.release_nonnull()); } else { - // Sometimes we might receive a partial message. That's okay, just stash away - // the unprocessed bytes and we'll prepend them to the next incoming message - // in the next run of this function. - if (!m_unprocessed_bytes.is_empty()) { - dbg() << *this << "::drain_messages_from_peer: Already have unprocessed bytes"; - shutdown(); - } - m_unprocessed_bytes = remaining_bytes.isolated_copy(); + dbgln("Failed to parse a message"); break; } - ASSERT(decoded_bytes); + } + + if (index < bytes.size()) { + // Sometimes we might receive a partial message. That's okay, just stash away + // the unprocessed bytes and we'll prepend them to the next incoming message + // in the next run of this function. + auto remaining_bytes = ByteBuffer::wrap(bytes.data() + index, bytes.size() - index); + if (!m_unprocessed_bytes.is_empty()) { + dbg() << *this << "::drain_messages_from_peer: Already have unprocessed bytes"; + shutdown(); + } + m_unprocessed_bytes = remaining_bytes.isolated_copy(); } if (!m_unprocessed_messages.is_empty()) { |