diff options
-rw-r--r-- | Userland/Libraries/LibIPC/ClientConnection.h | 2 | ||||
-rw-r--r-- | Userland/Libraries/LibIPC/Connection.cpp | 32 | ||||
-rw-r--r-- | Userland/Libraries/LibIPC/Connection.h | 36 |
3 files changed, 39 insertions, 31 deletions
diff --git a/Userland/Libraries/LibIPC/ClientConnection.h b/Userland/Libraries/LibIPC/ClientConnection.h index 52d8cdcaef..48767fa4e5 100644 --- a/Userland/Libraries/LibIPC/ClientConnection.h +++ b/Userland/Libraries/LibIPC/ClientConnection.h @@ -30,7 +30,7 @@ public: , m_client_id(client_id) { VERIFY(this->socket().is_connected()); - this->socket().on_ready_to_read = [this] { this->drain_messages_from_peer(); }; + this->socket().on_ready_to_read = [this] { this->drain_messages_from_peer(ServerEndpoint::static_magic()); }; } virtual ~ClientConnection() override diff --git a/Userland/Libraries/LibIPC/Connection.cpp b/Userland/Libraries/LibIPC/Connection.cpp index 91b24a2553..4d2c29ea80 100644 --- a/Userland/Libraries/LibIPC/Connection.cpp +++ b/Userland/Libraries/LibIPC/Connection.cpp @@ -149,4 +149,36 @@ Result<Vector<u8>, bool> ConnectionBase::read_as_much_as_possible_from_socket_wi return bytes; } +bool ConnectionBase::drain_messages_from_peer(u32 local_endpoint_magic) +{ + auto bytes = TRY(read_as_much_as_possible_from_socket_without_blocking()); + + size_t index = 0; + try_parse_messages(bytes, index); + + if (index < bytes.size()) { + // Sometimes we might receive a partial message. That's okay, just stash away + // the unprocessed bytes and we'll prepend them to the next incoming message + // in the next run of this function. + auto remaining_bytes_result = ByteBuffer::copy(bytes.span().slice(index)); + if (!remaining_bytes_result.has_value()) { + dbgln("{}::drain_messages_from_peer: Failed to allocate buffer", static_cast<Core::Object const&>(*this)); + return false; + } + if (!m_unprocessed_bytes.is_empty()) { + dbgln("{}::drain_messages_from_peer: Already have unprocessed bytes", static_cast<Core::Object const&>(*this)); + shutdown(); + return false; + } + m_unprocessed_bytes = remaining_bytes_result.release_value(); + } + + if (!m_unprocessed_messages.is_empty()) { + deferred_invoke([this, local_endpoint_magic] { + handle_messages(local_endpoint_magic); + }); + } + return true; +} + } diff --git a/Userland/Libraries/LibIPC/Connection.h b/Userland/Libraries/LibIPC/Connection.h index 81856941be..43707f2486 100644 --- a/Userland/Libraries/LibIPC/Connection.h +++ b/Userland/Libraries/LibIPC/Connection.h @@ -46,9 +46,12 @@ protected: virtual void may_have_become_unresponsive() { } virtual void did_become_responsive() { } + virtual void try_parse_messages(Vector<u8> const& bytes, size_t& index) = 0; void wait_for_socket_to_become_readable(); Result<Vector<u8>, bool> read_as_much_as_possible_from_socket_without_blocking(); + bool drain_messages_from_peer(u32 local_endpoint_magic); + void post_message(MessageBuffer); void handle_messages(u32 local_endpoint_magic); @@ -70,7 +73,7 @@ public: { m_notifier->on_ready_to_read = [this] { NonnullRefPtr protect = *this; - drain_messages_from_peer(); + drain_messages_from_peer(LocalEndpoint::static_magic()); handle_messages(LocalEndpoint::static_magic()); }; } @@ -117,17 +120,14 @@ protected: wait_for_socket_to_become_readable(); - if (!drain_messages_from_peer()) + if (!drain_messages_from_peer(LocalEndpoint::static_magic())) break; } return {}; } - bool drain_messages_from_peer() + virtual void try_parse_messages(Vector<u8> const& bytes, size_t& index) override { - auto bytes = TRY(read_as_much_as_possible_from_socket_without_blocking()); - - size_t index = 0; u32 message_size = 0; for (; index + sizeof(message_size) < bytes.size(); index += message_size) { memcpy(&message_size, bytes.data() + index, sizeof(message_size)); @@ -144,30 +144,6 @@ protected: break; } } - - if (index < bytes.size()) { - // Sometimes we might receive a partial message. That's okay, just stash away - // the unprocessed bytes and we'll prepend them to the next incoming message - // in the next run of this function. - auto remaining_bytes_result = ByteBuffer::copy(bytes.span().slice(index)); - if (!remaining_bytes_result.has_value()) { - dbgln("{}::drain_messages_from_peer: Failed to allocate buffer", *this); - return false; - } - if (!m_unprocessed_bytes.is_empty()) { - dbgln("{}::drain_messages_from_peer: Already have unprocessed bytes", *this); - shutdown(); - return false; - } - m_unprocessed_bytes = remaining_bytes_result.release_value(); - } - - if (!m_unprocessed_messages.is_empty()) { - deferred_invoke([this] { - handle_messages(LocalEndpoint::static_magic()); - }); - } - return true; } }; |