1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
|
/*
* Copyright (c) 2022, Gregory Bertilson <zaggy1024@gmail.com>
*
* SPDX-License-Identifier: BSD-2-Clause
*/
#pragma once
#include <AK/Debug.h>
#include <AK/Variant.h>
#include <LibThreading/ConditionVariable.h>
#include <LibThreading/Mutex.h>
#include <LibThreading/Thread.h>
namespace Threading {
// Macro to allow single-line logging prints with fields that only exist in debug mode.
#if WORKER_THREAD_DEBUG
# define WORKER_LOG(args...) ({ dbgln(args); })
#else
# define WORKER_LOG(args...)
#endif
template<typename ErrorType>
class WorkerThread {
enum class State {
Idle,
Working,
Stopped,
};
using WorkerTask = Function<ErrorOr<void, ErrorType>()>;
using WorkerState = Variant<State, WorkerTask, ErrorType>;
public:
static ErrorOr<NonnullOwnPtr<WorkerThread>> create(StringView name)
{
auto worker_thread = TRY(adopt_nonnull_own_or_enomem(new (nothrow) WorkerThread()));
worker_thread->m_thread = TRY(Threading::Thread::try_create([&self = *worker_thread]() {
WORKER_LOG("Starting worker loop {}", self.m_id);
while (true) {
self.m_mutex.lock();
if (self.m_stop) {
WORKER_LOG("Exiting {}", self.m_id);
self.m_state = State::Stopped;
self.m_condition.broadcast();
self.m_mutex.unlock();
return 0;
}
if (self.m_state.template has<WorkerTask>()) {
auto task = move(self.m_state.template get<WorkerTask>());
self.m_state = State::Working;
self.m_mutex.unlock();
WORKER_LOG("Starting task on {}", self.m_id);
auto result = task();
if (result.is_error()) {
WORKER_LOG("Task finished on {} with error", self.m_id);
self.m_mutex.lock();
self.m_state = result.release_error();
self.m_condition.broadcast();
} else {
WORKER_LOG("Task finished successfully on {}", self.m_id);
self.m_mutex.lock();
self.m_state = State::Idle;
self.m_condition.broadcast();
}
}
WORKER_LOG("Awaiting new task in {}...", self.m_id);
self.m_condition.wait();
WORKER_LOG("Worker thread awoken in {}", self.m_id);
self.m_mutex.unlock();
}
return 0;
},
name));
worker_thread->m_thread->start();
return worker_thread;
}
~WorkerThread()
{
m_mutex.lock();
m_stop = true;
m_condition.broadcast();
while (!is_in_state(State::Stopped))
m_condition.wait();
m_mutex.unlock();
(void)m_thread->join();
WORKER_LOG("Worker thread {} joined successfully", m_id);
}
// Returns whether the task is starting.
bool start_task(WorkerTask&& task)
{
m_mutex.lock();
VERIFY(!is_in_state(State::Stopped));
bool start_work = false;
if (is_in_state(State::Idle)) {
start_work = true;
} else if (m_state.template has<ErrorType>()) {
WORKER_LOG("Starting task and ignoring previous error: {}", m_state.template get<ErrorType>().string_literal());
start_work = true;
}
if (start_work) {
WORKER_LOG("Queuing task on {}", m_id);
m_state = move(task);
m_condition.broadcast();
}
m_mutex.unlock();
return start_work;
}
ErrorOr<void, ErrorType> wait_until_task_is_finished()
{
WORKER_LOG("Waiting for task to finish on {}...", m_id);
m_mutex.lock();
while (true) {
if (m_state.template has<WorkerTask>() || is_in_state(State::Working)) {
m_condition.wait();
} else if (m_state.template has<ErrorType>()) {
auto error = move(m_state.template get<ErrorType>());
m_state = State::Idle;
m_mutex.unlock();
WORKER_LOG("Finished waiting with error on {}: {}", m_id, error.string_literal());
return error;
} else {
m_mutex.unlock();
WORKER_LOG("Finished waiting on {}", m_id);
return {};
}
}
m_mutex.unlock();
}
private:
#if WORKER_THREAD_DEBUG
static inline size_t current_id = 0;
#endif
WorkerThread()
: m_condition(m_mutex)
#if WORKER_THREAD_DEBUG
, m_id(current_id++)
#endif
{
}
WorkerThread(WorkerThread const&) = delete;
WorkerThread(WorkerThread&&) = delete;
// Must be called with the mutex locked.
bool is_in_state(State state)
{
return m_state.template has<State>() && m_state.template get<State>() == state;
}
RefPtr<Threading::Thread> m_thread;
Threading::Mutex m_mutex;
Threading::ConditionVariable m_condition;
WorkerState m_state { State::Idle };
bool m_stop { false };
#if WORKER_THREAD_DEBUG
size_t m_id;
#endif
};
#undef WORKER_LOG
}
|