diff options
author | Andreas Kling <kling@serenityos.org> | 2023-04-23 19:45:12 +0200 |
---|---|---|
committer | Andreas Kling <kling@serenityos.org> | 2023-04-25 14:48:40 +0200 |
commit | 1587caef842c66ed382fc75c8f116d09ea7acc06 (patch) | |
tree | 731e8f371274050da5559a2387ef3a1c272d859b | |
parent | 3a70a16ca78d2627c0b754ae872b8363c8154f8f (diff) | |
download | serenity-1587caef842c66ed382fc75c8f116d09ea7acc06.zip |
LibCore: Move event queueing to a per-thread event queue
Instead of juggling events between individual instances of
Core::EventLoop, move queueing and processing to a separate per-thread
queue (ThreadEventQueue).
-rw-r--r-- | Userland/Libraries/LibCore/CMakeLists.txt | 1 | ||||
-rw-r--r-- | Userland/Libraries/LibCore/Event.h | 1 | ||||
-rw-r--r-- | Userland/Libraries/LibCore/EventLoop.cpp | 98 | ||||
-rw-r--r-- | Userland/Libraries/LibCore/EventLoop.h | 19 | ||||
-rw-r--r-- | Userland/Libraries/LibCore/ThreadEventQueue.cpp | 123 | ||||
-rw-r--r-- | Userland/Libraries/LibCore/ThreadEventQueue.h | 44 |
6 files changed, 184 insertions, 102 deletions
diff --git a/Userland/Libraries/LibCore/CMakeLists.txt b/Userland/Libraries/LibCore/CMakeLists.txt index 1a5b3fcbd3..17359fad50 100644 --- a/Userland/Libraries/LibCore/CMakeLists.txt +++ b/Userland/Libraries/LibCore/CMakeLists.txt @@ -30,6 +30,7 @@ set(SOURCES System.cpp SystemServerTakeover.cpp TCPServer.cpp + ThreadEventQueue.cpp Timer.cpp UDPServer.cpp Version.cpp diff --git a/Userland/Libraries/LibCore/Event.h b/Userland/Libraries/LibCore/Event.h index fa3cf1697a..f3da4419e4 100644 --- a/Userland/Libraries/LibCore/Event.h +++ b/Userland/Libraries/LibCore/Event.h @@ -49,6 +49,7 @@ private: class DeferredInvocationEvent : public Event { friend class EventLoop; + friend class ThreadEventQueue; public: DeferredInvocationEvent(NonnullRefPtr<DeferredInvocationContext> context, Function<void()> invokee) diff --git a/Userland/Libraries/LibCore/EventLoop.cpp b/Userland/Libraries/LibCore/EventLoop.cpp index f07ef2fed4..979e551047 100644 --- a/Userland/Libraries/LibCore/EventLoop.cpp +++ b/Userland/Libraries/LibCore/EventLoop.cpp @@ -25,6 +25,7 @@ #include <LibCore/Promise.h> #include <LibCore/SessionManagement.h> #include <LibCore/Socket.h> +#include <LibCore/ThreadEventQueue.h> #include <LibThreading/Mutex.h> #include <LibThreading/MutexProtected.h> #include <errno.h> @@ -65,6 +66,12 @@ struct EventLoopTimer { struct EventLoop::Private { Threading::Mutex lock; + ThreadEventQueue& thread_event_queue; + + Private() + : thread_event_queue(ThreadEventQueue::current()) + { + } }; static Threading::MutexProtected<NeverDestroyed<IDAllocator>> s_id_allocator; @@ -78,7 +85,6 @@ static thread_local HashTable<Notifier*>* s_notifiers; // While wake() pushes zero into the pipe, signal numbers (by defintion nonzero, see signal_numbers.h) are pushed into the pipe verbatim. thread_local int EventLoop::s_wake_pipe_fds[2]; thread_local bool EventLoop::s_wake_pipe_initialized { false }; -thread_local bool s_warned_promise_count { false }; void EventLoop::initialize_wake_pipes() { @@ -421,7 +427,6 @@ public: : m_event_loop(event_loop) { if (EventLoop::has_been_instantiated()) { - m_event_loop.take_pending_events_from(EventLoop::current()); s_event_loop_stack->append(event_loop); } } @@ -429,12 +434,6 @@ public: { if (EventLoop::has_been_instantiated()) { s_event_loop_stack->take_last(); - for (auto& job : m_event_loop.m_pending_promises) { - // When this event loop was not running below another event loop, the jobs may very well have finished in the meantime. - if (!job->is_resolved()) - job->cancel(Error::from_string_view("EventLoop is exiting"sv)); - } - EventLoop::current().take_pending_events_from(m_event_loop); } } @@ -462,72 +461,21 @@ void EventLoop::spin_until(Function<bool()> goal_condition) size_t EventLoop::pump(WaitMode mode) { - wait_for_event(mode); + // Pumping the event loop from another thread is not allowed. + VERIFY(&m_private->thread_event_queue == &ThreadEventQueue::current()); - decltype(m_queued_events) events; - { - Threading::MutexLocker locker(m_private->lock); - events = move(m_queued_events); - } - - m_pending_promises.remove_all_matching([](auto& job) { return job->is_resolved() || job->is_canceled(); }); - - size_t processed_events = 0; - for (size_t i = 0; i < events.size(); ++i) { - auto& queued_event = events.at(i); - auto receiver = queued_event.receiver.strong_ref(); - auto& event = *queued_event.event; - if (receiver) - dbgln_if(EVENTLOOP_DEBUG, "Core::EventLoop: {} event {}", *receiver, event.type()); - - if (!receiver) { - switch (event.type()) { - case Event::Quit: - VERIFY_NOT_REACHED(); - default: - dbgln_if(EVENTLOOP_DEBUG, "Event type {} with no receiver :(", event.type()); - break; - } - } else if (event.type() == Event::Type::DeferredInvoke) { - dbgln_if(DEFERRED_INVOKE_DEBUG, "DeferredInvoke: receiver = {}", *receiver); - static_cast<DeferredInvocationEvent&>(event).m_invokee(); - } else { - NonnullRefPtr<Object> protector(*receiver); - receiver->dispatch_event(event); - } - ++processed_events; - - if (m_exit_requested) { - Threading::MutexLocker locker(m_private->lock); - dbgln_if(EVENTLOOP_DEBUG, "Core::EventLoop: Exit requested. Rejigging {} events.", events.size() - i); - decltype(m_queued_events) new_event_queue; - new_event_queue.ensure_capacity(m_queued_events.size() + events.size()); - for (++i; i < events.size(); ++i) - new_event_queue.unchecked_append(move(events[i])); - new_event_queue.extend(move(m_queued_events)); - m_queued_events = move(new_event_queue); - break; - } - } - - if (m_pending_promises.size() > 30 && !s_warned_promise_count) { - s_warned_promise_count = true; - dbgln("EventLoop {:p} warning: Job queue wasn't designed for this load ({} promises). Please begin optimizing EventLoop::pump() -> m_pending_promises.remove_all_matching", this, m_pending_promises.size()); - } - - return processed_events; + wait_for_event(mode); + return m_private->thread_event_queue.process(); } void EventLoop::post_event(Object& receiver, NonnullOwnPtr<Event>&& event) { - Threading::MutexLocker lock(m_private->lock); - dbgln_if(EVENTLOOP_DEBUG, "Core::EventLoop::post_event: ({}) << receiver={}, event={}", m_queued_events.size(), receiver, event); - m_queued_events.empend(receiver, move(event)); + m_private->thread_event_queue.post_event(receiver, move(event)); } void EventLoop::add_job(NonnullRefPtr<Promise<NonnullRefPtr<Object>>> job_promise) { - m_pending_promises.append(move(job_promise)); + ThreadEventQueue::current().add_job(move(job_promise)); } SignalHandlers::SignalHandlers(int signo, void (*handle_signal)(int)) @@ -711,18 +659,14 @@ retry: VERIFY_NOT_REACHED(); } - bool queued_events_is_empty; - { - Threading::MutexLocker locker(m_private->lock); - queued_events_is_empty = m_queued_events.is_empty(); - } + bool has_pending_events = m_private->thread_event_queue.has_pending_events(); // Figure out how long to wait at maximum. // This mainly depends on the WaitMode and whether we have pending events, but also the next expiring timer. Time now; struct timeval timeout = { 0, 0 }; bool should_wait_forever = false; - if (mode == WaitMode::WaitForEvents && queued_events_is_empty) { + if (mode == WaitMode::WaitForEvents && !has_pending_events) { auto next_timer_expiration = get_next_timer_expiration(); if (next_timer_expiration.has_value()) { now = Time::now_monotonic_coarse(); @@ -905,16 +849,4 @@ void EventLoop::wake() } } -EventLoop::QueuedEvent::QueuedEvent(Object& receiver, NonnullOwnPtr<Event> event) - : receiver(receiver) - , event(move(event)) -{ -} - -EventLoop::QueuedEvent::QueuedEvent(QueuedEvent&& other) - : receiver(other.receiver) - , event(move(other.event)) -{ -} - } diff --git a/Userland/Libraries/LibCore/EventLoop.h b/Userland/Libraries/LibCore/EventLoop.h index d9a75cb4e4..8feaf4e84e 100644 --- a/Userland/Libraries/LibCore/EventLoop.h +++ b/Userland/Libraries/LibCore/EventLoop.h @@ -111,11 +111,6 @@ public: }; static void notify_forked(ForkEvent); - void take_pending_events_from(EventLoop& other) - { - m_queued_events.extend(move(other.m_queued_events)); - } - static EventLoop& current(); private: @@ -124,20 +119,6 @@ private: static void dispatch_signal(int); static void handle_signal(int); - struct QueuedEvent { - AK_MAKE_NONCOPYABLE(QueuedEvent); - - public: - QueuedEvent(Object& receiver, NonnullOwnPtr<Event>); - QueuedEvent(QueuedEvent&&); - ~QueuedEvent() = default; - - WeakPtr<Object> receiver; - NonnullOwnPtr<Event> event; - }; - - Vector<QueuedEvent, 64> m_queued_events; - Vector<NonnullRefPtr<Promise<NonnullRefPtr<Object>>>> m_pending_promises; static pid_t s_pid; bool m_exit_requested { false }; diff --git a/Userland/Libraries/LibCore/ThreadEventQueue.cpp b/Userland/Libraries/LibCore/ThreadEventQueue.cpp new file mode 100644 index 0000000000..97249a2a69 --- /dev/null +++ b/Userland/Libraries/LibCore/ThreadEventQueue.cpp @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2023, Andreas Kling <kling@serenityos.org> + * + * SPDX-License-Identifier: BSD-2-Clause + */ + +#include <AK/Vector.h> +#include <LibCore/DeferredInvocationContext.h> +#include <LibCore/Object.h> +#include <LibCore/Promise.h> +#include <LibCore/ThreadEventQueue.h> +#include <LibThreading/Mutex.h> + +namespace Core { + +struct ThreadEventQueue::Private { + struct QueuedEvent { + AK_MAKE_NONCOPYABLE(QueuedEvent); + + public: + QueuedEvent(Object& receiver, NonnullOwnPtr<Event> event) + : receiver(receiver) + , event(move(event)) + { + } + + QueuedEvent(QueuedEvent&& other) + : receiver(other.receiver) + , event(move(other.event)) + { + } + + ~QueuedEvent() = default; + + WeakPtr<Object> receiver; + NonnullOwnPtr<Event> event; + }; + + Threading::Mutex mutex; + Vector<QueuedEvent, 128> queued_events; + Vector<NonnullRefPtr<Promise<NonnullRefPtr<Object>>>, 16> pending_promises; + bool warned_promise_count { false }; +}; + +static thread_local ThreadEventQueue* s_current_thread_event_queue; + +ThreadEventQueue& ThreadEventQueue::current() +{ + if (!s_current_thread_event_queue) { + // FIXME: Don't leak these. + s_current_thread_event_queue = new ThreadEventQueue; + } + return *s_current_thread_event_queue; +} + +ThreadEventQueue::ThreadEventQueue() + : m_private(make<Private>()) +{ +} + +ThreadEventQueue::~ThreadEventQueue() = default; + +void ThreadEventQueue::post_event(Core::Object& receiver, NonnullOwnPtr<Core::Event> event) +{ + Threading::MutexLocker lock(m_private->mutex); + m_private->queued_events.empend(receiver, move(event)); +} + +void ThreadEventQueue::add_job(NonnullRefPtr<Promise<NonnullRefPtr<Object>>> promise) +{ + Threading::MutexLocker lock(m_private->mutex); + m_private->pending_promises.append(move(promise)); +} + +size_t ThreadEventQueue::process() +{ + decltype(m_private->queued_events) events; + { + Threading::MutexLocker locker(m_private->mutex); + events = move(m_private->queued_events); + m_private->pending_promises.remove_all_matching([](auto& job) { return job->is_resolved() || job->is_canceled(); }); + } + + size_t processed_events = 0; + for (size_t i = 0; i < events.size(); ++i) { + auto& queued_event = events.at(i); + auto receiver = queued_event.receiver.strong_ref(); + auto& event = *queued_event.event; + + if (!receiver) { + switch (event.type()) { + case Event::Quit: + VERIFY_NOT_REACHED(); + default: + dbgln("ThreadEventQueue::process: Event of type {} with no receiver", event.type()); + break; + } + } else if (event.type() == Event::Type::DeferredInvoke) { + static_cast<DeferredInvocationEvent&>(event).m_invokee(); + } else { + NonnullRefPtr<Object> protector(*receiver); + receiver->dispatch_event(event); + } + ++processed_events; + } + + { + Threading::MutexLocker locker(m_private->mutex); + if (m_private->pending_promises.size() > 30 && !m_private->warned_promise_count) { + m_private->warned_promise_count = true; + dbgln("ThreadEventQueue::process: Job queue wasn't designed for this load ({} promises)", m_private->pending_promises.size()); + } + } + return processed_events; +} + +bool ThreadEventQueue::has_pending_events() const +{ + Threading::MutexLocker locker(m_private->mutex); + return !m_private->queued_events.is_empty(); +} + +} diff --git a/Userland/Libraries/LibCore/ThreadEventQueue.h b/Userland/Libraries/LibCore/ThreadEventQueue.h new file mode 100644 index 0000000000..675e7b8e76 --- /dev/null +++ b/Userland/Libraries/LibCore/ThreadEventQueue.h @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2023, Andreas Kling <kling@serenityos.org> + * + * SPDX-License-Identifier: BSD-2-Clause + */ + +#pragma once + +#include <AK/NonnullOwnPtr.h> +#include <AK/OwnPtr.h> + +namespace Core { + +// Per-thread global event queue. This is where events are queued for the EventLoop to process. +// There is only one ThreadEventQueue per thread, and it is accessed via ThreadEventQueue::current(). +// It is allowed to post events to other threads' event queues. +class ThreadEventQueue { + AK_MAKE_NONCOPYABLE(ThreadEventQueue); + AK_MAKE_NONMOVABLE(ThreadEventQueue); + +public: + static ThreadEventQueue& current(); + + // Process all queued events. Returns the number of events that were processed. + size_t process(); + + // Posts an event to the event queue. + void post_event(Object& receiver, NonnullOwnPtr<Event>); + + // Used by Threading::BackgroundAction. + void add_job(NonnullRefPtr<Promise<NonnullRefPtr<Object>>>); + + // Returns true if there are events waiting to be flushed. + bool has_pending_events() const; + +private: + ThreadEventQueue(); + ~ThreadEventQueue(); + + struct Private; + OwnPtr<Private> m_private; +}; + +} |