summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndreas Kling <kling@serenityos.org>2023-04-23 19:45:12 +0200
committerAndreas Kling <kling@serenityos.org>2023-04-25 14:48:40 +0200
commit1587caef842c66ed382fc75c8f116d09ea7acc06 (patch)
tree731e8f371274050da5559a2387ef3a1c272d859b
parent3a70a16ca78d2627c0b754ae872b8363c8154f8f (diff)
downloadserenity-1587caef842c66ed382fc75c8f116d09ea7acc06.zip
LibCore: Move event queueing to a per-thread event queue
Instead of juggling events between individual instances of Core::EventLoop, move queueing and processing to a separate per-thread queue (ThreadEventQueue).
-rw-r--r--Userland/Libraries/LibCore/CMakeLists.txt1
-rw-r--r--Userland/Libraries/LibCore/Event.h1
-rw-r--r--Userland/Libraries/LibCore/EventLoop.cpp98
-rw-r--r--Userland/Libraries/LibCore/EventLoop.h19
-rw-r--r--Userland/Libraries/LibCore/ThreadEventQueue.cpp123
-rw-r--r--Userland/Libraries/LibCore/ThreadEventQueue.h44
6 files changed, 184 insertions, 102 deletions
diff --git a/Userland/Libraries/LibCore/CMakeLists.txt b/Userland/Libraries/LibCore/CMakeLists.txt
index 1a5b3fcbd3..17359fad50 100644
--- a/Userland/Libraries/LibCore/CMakeLists.txt
+++ b/Userland/Libraries/LibCore/CMakeLists.txt
@@ -30,6 +30,7 @@ set(SOURCES
System.cpp
SystemServerTakeover.cpp
TCPServer.cpp
+ ThreadEventQueue.cpp
Timer.cpp
UDPServer.cpp
Version.cpp
diff --git a/Userland/Libraries/LibCore/Event.h b/Userland/Libraries/LibCore/Event.h
index fa3cf1697a..f3da4419e4 100644
--- a/Userland/Libraries/LibCore/Event.h
+++ b/Userland/Libraries/LibCore/Event.h
@@ -49,6 +49,7 @@ private:
class DeferredInvocationEvent : public Event {
friend class EventLoop;
+ friend class ThreadEventQueue;
public:
DeferredInvocationEvent(NonnullRefPtr<DeferredInvocationContext> context, Function<void()> invokee)
diff --git a/Userland/Libraries/LibCore/EventLoop.cpp b/Userland/Libraries/LibCore/EventLoop.cpp
index f07ef2fed4..979e551047 100644
--- a/Userland/Libraries/LibCore/EventLoop.cpp
+++ b/Userland/Libraries/LibCore/EventLoop.cpp
@@ -25,6 +25,7 @@
#include <LibCore/Promise.h>
#include <LibCore/SessionManagement.h>
#include <LibCore/Socket.h>
+#include <LibCore/ThreadEventQueue.h>
#include <LibThreading/Mutex.h>
#include <LibThreading/MutexProtected.h>
#include <errno.h>
@@ -65,6 +66,12 @@ struct EventLoopTimer {
struct EventLoop::Private {
Threading::Mutex lock;
+ ThreadEventQueue& thread_event_queue;
+
+ Private()
+ : thread_event_queue(ThreadEventQueue::current())
+ {
+ }
};
static Threading::MutexProtected<NeverDestroyed<IDAllocator>> s_id_allocator;
@@ -78,7 +85,6 @@ static thread_local HashTable<Notifier*>* s_notifiers;
// While wake() pushes zero into the pipe, signal numbers (by defintion nonzero, see signal_numbers.h) are pushed into the pipe verbatim.
thread_local int EventLoop::s_wake_pipe_fds[2];
thread_local bool EventLoop::s_wake_pipe_initialized { false };
-thread_local bool s_warned_promise_count { false };
void EventLoop::initialize_wake_pipes()
{
@@ -421,7 +427,6 @@ public:
: m_event_loop(event_loop)
{
if (EventLoop::has_been_instantiated()) {
- m_event_loop.take_pending_events_from(EventLoop::current());
s_event_loop_stack->append(event_loop);
}
}
@@ -429,12 +434,6 @@ public:
{
if (EventLoop::has_been_instantiated()) {
s_event_loop_stack->take_last();
- for (auto& job : m_event_loop.m_pending_promises) {
- // When this event loop was not running below another event loop, the jobs may very well have finished in the meantime.
- if (!job->is_resolved())
- job->cancel(Error::from_string_view("EventLoop is exiting"sv));
- }
- EventLoop::current().take_pending_events_from(m_event_loop);
}
}
@@ -462,72 +461,21 @@ void EventLoop::spin_until(Function<bool()> goal_condition)
size_t EventLoop::pump(WaitMode mode)
{
- wait_for_event(mode);
+ // Pumping the event loop from another thread is not allowed.
+ VERIFY(&m_private->thread_event_queue == &ThreadEventQueue::current());
- decltype(m_queued_events) events;
- {
- Threading::MutexLocker locker(m_private->lock);
- events = move(m_queued_events);
- }
-
- m_pending_promises.remove_all_matching([](auto& job) { return job->is_resolved() || job->is_canceled(); });
-
- size_t processed_events = 0;
- for (size_t i = 0; i < events.size(); ++i) {
- auto& queued_event = events.at(i);
- auto receiver = queued_event.receiver.strong_ref();
- auto& event = *queued_event.event;
- if (receiver)
- dbgln_if(EVENTLOOP_DEBUG, "Core::EventLoop: {} event {}", *receiver, event.type());
-
- if (!receiver) {
- switch (event.type()) {
- case Event::Quit:
- VERIFY_NOT_REACHED();
- default:
- dbgln_if(EVENTLOOP_DEBUG, "Event type {} with no receiver :(", event.type());
- break;
- }
- } else if (event.type() == Event::Type::DeferredInvoke) {
- dbgln_if(DEFERRED_INVOKE_DEBUG, "DeferredInvoke: receiver = {}", *receiver);
- static_cast<DeferredInvocationEvent&>(event).m_invokee();
- } else {
- NonnullRefPtr<Object> protector(*receiver);
- receiver->dispatch_event(event);
- }
- ++processed_events;
-
- if (m_exit_requested) {
- Threading::MutexLocker locker(m_private->lock);
- dbgln_if(EVENTLOOP_DEBUG, "Core::EventLoop: Exit requested. Rejigging {} events.", events.size() - i);
- decltype(m_queued_events) new_event_queue;
- new_event_queue.ensure_capacity(m_queued_events.size() + events.size());
- for (++i; i < events.size(); ++i)
- new_event_queue.unchecked_append(move(events[i]));
- new_event_queue.extend(move(m_queued_events));
- m_queued_events = move(new_event_queue);
- break;
- }
- }
-
- if (m_pending_promises.size() > 30 && !s_warned_promise_count) {
- s_warned_promise_count = true;
- dbgln("EventLoop {:p} warning: Job queue wasn't designed for this load ({} promises). Please begin optimizing EventLoop::pump() -> m_pending_promises.remove_all_matching", this, m_pending_promises.size());
- }
-
- return processed_events;
+ wait_for_event(mode);
+ return m_private->thread_event_queue.process();
}
void EventLoop::post_event(Object& receiver, NonnullOwnPtr<Event>&& event)
{
- Threading::MutexLocker lock(m_private->lock);
- dbgln_if(EVENTLOOP_DEBUG, "Core::EventLoop::post_event: ({}) << receiver={}, event={}", m_queued_events.size(), receiver, event);
- m_queued_events.empend(receiver, move(event));
+ m_private->thread_event_queue.post_event(receiver, move(event));
}
void EventLoop::add_job(NonnullRefPtr<Promise<NonnullRefPtr<Object>>> job_promise)
{
- m_pending_promises.append(move(job_promise));
+ ThreadEventQueue::current().add_job(move(job_promise));
}
SignalHandlers::SignalHandlers(int signo, void (*handle_signal)(int))
@@ -711,18 +659,14 @@ retry:
VERIFY_NOT_REACHED();
}
- bool queued_events_is_empty;
- {
- Threading::MutexLocker locker(m_private->lock);
- queued_events_is_empty = m_queued_events.is_empty();
- }
+ bool has_pending_events = m_private->thread_event_queue.has_pending_events();
// Figure out how long to wait at maximum.
// This mainly depends on the WaitMode and whether we have pending events, but also the next expiring timer.
Time now;
struct timeval timeout = { 0, 0 };
bool should_wait_forever = false;
- if (mode == WaitMode::WaitForEvents && queued_events_is_empty) {
+ if (mode == WaitMode::WaitForEvents && !has_pending_events) {
auto next_timer_expiration = get_next_timer_expiration();
if (next_timer_expiration.has_value()) {
now = Time::now_monotonic_coarse();
@@ -905,16 +849,4 @@ void EventLoop::wake()
}
}
-EventLoop::QueuedEvent::QueuedEvent(Object& receiver, NonnullOwnPtr<Event> event)
- : receiver(receiver)
- , event(move(event))
-{
-}
-
-EventLoop::QueuedEvent::QueuedEvent(QueuedEvent&& other)
- : receiver(other.receiver)
- , event(move(other.event))
-{
-}
-
}
diff --git a/Userland/Libraries/LibCore/EventLoop.h b/Userland/Libraries/LibCore/EventLoop.h
index d9a75cb4e4..8feaf4e84e 100644
--- a/Userland/Libraries/LibCore/EventLoop.h
+++ b/Userland/Libraries/LibCore/EventLoop.h
@@ -111,11 +111,6 @@ public:
};
static void notify_forked(ForkEvent);
- void take_pending_events_from(EventLoop& other)
- {
- m_queued_events.extend(move(other.m_queued_events));
- }
-
static EventLoop& current();
private:
@@ -124,20 +119,6 @@ private:
static void dispatch_signal(int);
static void handle_signal(int);
- struct QueuedEvent {
- AK_MAKE_NONCOPYABLE(QueuedEvent);
-
- public:
- QueuedEvent(Object& receiver, NonnullOwnPtr<Event>);
- QueuedEvent(QueuedEvent&&);
- ~QueuedEvent() = default;
-
- WeakPtr<Object> receiver;
- NonnullOwnPtr<Event> event;
- };
-
- Vector<QueuedEvent, 64> m_queued_events;
- Vector<NonnullRefPtr<Promise<NonnullRefPtr<Object>>>> m_pending_promises;
static pid_t s_pid;
bool m_exit_requested { false };
diff --git a/Userland/Libraries/LibCore/ThreadEventQueue.cpp b/Userland/Libraries/LibCore/ThreadEventQueue.cpp
new file mode 100644
index 0000000000..97249a2a69
--- /dev/null
+++ b/Userland/Libraries/LibCore/ThreadEventQueue.cpp
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2023, Andreas Kling <kling@serenityos.org>
+ *
+ * SPDX-License-Identifier: BSD-2-Clause
+ */
+
+#include <AK/Vector.h>
+#include <LibCore/DeferredInvocationContext.h>
+#include <LibCore/Object.h>
+#include <LibCore/Promise.h>
+#include <LibCore/ThreadEventQueue.h>
+#include <LibThreading/Mutex.h>
+
+namespace Core {
+
+struct ThreadEventQueue::Private {
+ struct QueuedEvent {
+ AK_MAKE_NONCOPYABLE(QueuedEvent);
+
+ public:
+ QueuedEvent(Object& receiver, NonnullOwnPtr<Event> event)
+ : receiver(receiver)
+ , event(move(event))
+ {
+ }
+
+ QueuedEvent(QueuedEvent&& other)
+ : receiver(other.receiver)
+ , event(move(other.event))
+ {
+ }
+
+ ~QueuedEvent() = default;
+
+ WeakPtr<Object> receiver;
+ NonnullOwnPtr<Event> event;
+ };
+
+ Threading::Mutex mutex;
+ Vector<QueuedEvent, 128> queued_events;
+ Vector<NonnullRefPtr<Promise<NonnullRefPtr<Object>>>, 16> pending_promises;
+ bool warned_promise_count { false };
+};
+
+static thread_local ThreadEventQueue* s_current_thread_event_queue;
+
+ThreadEventQueue& ThreadEventQueue::current()
+{
+ if (!s_current_thread_event_queue) {
+ // FIXME: Don't leak these.
+ s_current_thread_event_queue = new ThreadEventQueue;
+ }
+ return *s_current_thread_event_queue;
+}
+
+ThreadEventQueue::ThreadEventQueue()
+ : m_private(make<Private>())
+{
+}
+
+ThreadEventQueue::~ThreadEventQueue() = default;
+
+void ThreadEventQueue::post_event(Core::Object& receiver, NonnullOwnPtr<Core::Event> event)
+{
+ Threading::MutexLocker lock(m_private->mutex);
+ m_private->queued_events.empend(receiver, move(event));
+}
+
+void ThreadEventQueue::add_job(NonnullRefPtr<Promise<NonnullRefPtr<Object>>> promise)
+{
+ Threading::MutexLocker lock(m_private->mutex);
+ m_private->pending_promises.append(move(promise));
+}
+
+size_t ThreadEventQueue::process()
+{
+ decltype(m_private->queued_events) events;
+ {
+ Threading::MutexLocker locker(m_private->mutex);
+ events = move(m_private->queued_events);
+ m_private->pending_promises.remove_all_matching([](auto& job) { return job->is_resolved() || job->is_canceled(); });
+ }
+
+ size_t processed_events = 0;
+ for (size_t i = 0; i < events.size(); ++i) {
+ auto& queued_event = events.at(i);
+ auto receiver = queued_event.receiver.strong_ref();
+ auto& event = *queued_event.event;
+
+ if (!receiver) {
+ switch (event.type()) {
+ case Event::Quit:
+ VERIFY_NOT_REACHED();
+ default:
+ dbgln("ThreadEventQueue::process: Event of type {} with no receiver", event.type());
+ break;
+ }
+ } else if (event.type() == Event::Type::DeferredInvoke) {
+ static_cast<DeferredInvocationEvent&>(event).m_invokee();
+ } else {
+ NonnullRefPtr<Object> protector(*receiver);
+ receiver->dispatch_event(event);
+ }
+ ++processed_events;
+ }
+
+ {
+ Threading::MutexLocker locker(m_private->mutex);
+ if (m_private->pending_promises.size() > 30 && !m_private->warned_promise_count) {
+ m_private->warned_promise_count = true;
+ dbgln("ThreadEventQueue::process: Job queue wasn't designed for this load ({} promises)", m_private->pending_promises.size());
+ }
+ }
+ return processed_events;
+}
+
+bool ThreadEventQueue::has_pending_events() const
+{
+ Threading::MutexLocker locker(m_private->mutex);
+ return !m_private->queued_events.is_empty();
+}
+
+}
diff --git a/Userland/Libraries/LibCore/ThreadEventQueue.h b/Userland/Libraries/LibCore/ThreadEventQueue.h
new file mode 100644
index 0000000000..675e7b8e76
--- /dev/null
+++ b/Userland/Libraries/LibCore/ThreadEventQueue.h
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2023, Andreas Kling <kling@serenityos.org>
+ *
+ * SPDX-License-Identifier: BSD-2-Clause
+ */
+
+#pragma once
+
+#include <AK/NonnullOwnPtr.h>
+#include <AK/OwnPtr.h>
+
+namespace Core {
+
+// Per-thread global event queue. This is where events are queued for the EventLoop to process.
+// There is only one ThreadEventQueue per thread, and it is accessed via ThreadEventQueue::current().
+// It is allowed to post events to other threads' event queues.
+class ThreadEventQueue {
+ AK_MAKE_NONCOPYABLE(ThreadEventQueue);
+ AK_MAKE_NONMOVABLE(ThreadEventQueue);
+
+public:
+ static ThreadEventQueue& current();
+
+ // Process all queued events. Returns the number of events that were processed.
+ size_t process();
+
+ // Posts an event to the event queue.
+ void post_event(Object& receiver, NonnullOwnPtr<Event>);
+
+ // Used by Threading::BackgroundAction.
+ void add_job(NonnullRefPtr<Promise<NonnullRefPtr<Object>>>);
+
+ // Returns true if there are events waiting to be flushed.
+ bool has_pending_events() const;
+
+private:
+ ThreadEventQueue();
+ ~ThreadEventQueue();
+
+ struct Private;
+ OwnPtr<Private> m_private;
+};
+
+}