summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Userland/Libraries/LibIPC/ClientConnection.h2
-rw-r--r--Userland/Libraries/LibIPC/Connection.cpp32
-rw-r--r--Userland/Libraries/LibIPC/Connection.h36
3 files changed, 39 insertions, 31 deletions
diff --git a/Userland/Libraries/LibIPC/ClientConnection.h b/Userland/Libraries/LibIPC/ClientConnection.h
index 52d8cdcaef..48767fa4e5 100644
--- a/Userland/Libraries/LibIPC/ClientConnection.h
+++ b/Userland/Libraries/LibIPC/ClientConnection.h
@@ -30,7 +30,7 @@ public:
, m_client_id(client_id)
{
VERIFY(this->socket().is_connected());
- this->socket().on_ready_to_read = [this] { this->drain_messages_from_peer(); };
+ this->socket().on_ready_to_read = [this] { this->drain_messages_from_peer(ServerEndpoint::static_magic()); };
}
virtual ~ClientConnection() override
diff --git a/Userland/Libraries/LibIPC/Connection.cpp b/Userland/Libraries/LibIPC/Connection.cpp
index 91b24a2553..4d2c29ea80 100644
--- a/Userland/Libraries/LibIPC/Connection.cpp
+++ b/Userland/Libraries/LibIPC/Connection.cpp
@@ -149,4 +149,36 @@ Result<Vector<u8>, bool> ConnectionBase::read_as_much_as_possible_from_socket_wi
return bytes;
}
+bool ConnectionBase::drain_messages_from_peer(u32 local_endpoint_magic)
+{
+ auto bytes = TRY(read_as_much_as_possible_from_socket_without_blocking());
+
+ size_t index = 0;
+ try_parse_messages(bytes, index);
+
+ if (index < bytes.size()) {
+ // Sometimes we might receive a partial message. That's okay, just stash away
+ // the unprocessed bytes and we'll prepend them to the next incoming message
+ // in the next run of this function.
+ auto remaining_bytes_result = ByteBuffer::copy(bytes.span().slice(index));
+ if (!remaining_bytes_result.has_value()) {
+ dbgln("{}::drain_messages_from_peer: Failed to allocate buffer", static_cast<Core::Object const&>(*this));
+ return false;
+ }
+ if (!m_unprocessed_bytes.is_empty()) {
+ dbgln("{}::drain_messages_from_peer: Already have unprocessed bytes", static_cast<Core::Object const&>(*this));
+ shutdown();
+ return false;
+ }
+ m_unprocessed_bytes = remaining_bytes_result.release_value();
+ }
+
+ if (!m_unprocessed_messages.is_empty()) {
+ deferred_invoke([this, local_endpoint_magic] {
+ handle_messages(local_endpoint_magic);
+ });
+ }
+ return true;
+}
+
}
diff --git a/Userland/Libraries/LibIPC/Connection.h b/Userland/Libraries/LibIPC/Connection.h
index 81856941be..43707f2486 100644
--- a/Userland/Libraries/LibIPC/Connection.h
+++ b/Userland/Libraries/LibIPC/Connection.h
@@ -46,9 +46,12 @@ protected:
virtual void may_have_become_unresponsive() { }
virtual void did_become_responsive() { }
+ virtual void try_parse_messages(Vector<u8> const& bytes, size_t& index) = 0;
void wait_for_socket_to_become_readable();
Result<Vector<u8>, bool> read_as_much_as_possible_from_socket_without_blocking();
+ bool drain_messages_from_peer(u32 local_endpoint_magic);
+
void post_message(MessageBuffer);
void handle_messages(u32 local_endpoint_magic);
@@ -70,7 +73,7 @@ public:
{
m_notifier->on_ready_to_read = [this] {
NonnullRefPtr protect = *this;
- drain_messages_from_peer();
+ drain_messages_from_peer(LocalEndpoint::static_magic());
handle_messages(LocalEndpoint::static_magic());
};
}
@@ -117,17 +120,14 @@ protected:
wait_for_socket_to_become_readable();
- if (!drain_messages_from_peer())
+ if (!drain_messages_from_peer(LocalEndpoint::static_magic()))
break;
}
return {};
}
- bool drain_messages_from_peer()
+ virtual void try_parse_messages(Vector<u8> const& bytes, size_t& index) override
{
- auto bytes = TRY(read_as_much_as_possible_from_socket_without_blocking());
-
- size_t index = 0;
u32 message_size = 0;
for (; index + sizeof(message_size) < bytes.size(); index += message_size) {
memcpy(&message_size, bytes.data() + index, sizeof(message_size));
@@ -144,30 +144,6 @@ protected:
break;
}
}
-
- if (index < bytes.size()) {
- // Sometimes we might receive a partial message. That's okay, just stash away
- // the unprocessed bytes and we'll prepend them to the next incoming message
- // in the next run of this function.
- auto remaining_bytes_result = ByteBuffer::copy(bytes.span().slice(index));
- if (!remaining_bytes_result.has_value()) {
- dbgln("{}::drain_messages_from_peer: Failed to allocate buffer", *this);
- return false;
- }
- if (!m_unprocessed_bytes.is_empty()) {
- dbgln("{}::drain_messages_from_peer: Already have unprocessed bytes", *this);
- shutdown();
- return false;
- }
- m_unprocessed_bytes = remaining_bytes_result.release_value();
- }
-
- if (!m_unprocessed_messages.is_empty()) {
- deferred_invoke([this] {
- handle_messages(LocalEndpoint::static_magic());
- });
- }
- return true;
}
};