summaryrefslogtreecommitdiff
path: root/Tests
diff options
context:
space:
mode:
authorkleines Filmröllchen <filmroellchen@serenityos.org>2022-01-23 23:31:51 +0100
committerLinus Groh <mail@linusgroh.de>2022-04-21 13:55:00 +0200
commit6b13436ef64b8ae91f71b3fed2e8555f733d7998 (patch)
tree2473662c9b2c8962d1bf926c03106299ea336e0f /Tests
parentb0a2572577af0132bd3a9408ea05219fa093a79b (diff)
downloadserenity-6b13436ef64b8ae91f71b3fed2e8555f733d7998.zip
LibCore: Introduce SharedSingleProducerCircularQueue
This new class with an admittedly long OOP-y name provides a circular queue in shared memory. The queue is a lock-free synchronous queue implemented with atomics, and its implementation is significantly simplified by only accounting for one producer (and multiple consumers). It is intended to be used as a producer-consumer communication datastructure across processes. The original motivation behind this class is efficient short-period transfer of audio data in userspace. This class includes formal proofs of several correctness properties of the main queue operations `enqueue` and `dequeue`. These proofs are not 100% complete in their existing form as the invariants they depend on are "handwaved". This seems fine to me right now, as any proof is better than no proof :^). Anyways, the proofs should build confidence that the implemented algorithms, which are only roughly based on existing work, operate correctly in even the worst-case concurrency scenarios.
Diffstat (limited to 'Tests')
-rw-r--r--Tests/LibCore/CMakeLists.txt2
-rw-r--r--Tests/LibCore/TestLibCoreSharedSingleProducerCircularQueue.cpp203
2 files changed, 205 insertions, 0 deletions
diff --git a/Tests/LibCore/CMakeLists.txt b/Tests/LibCore/CMakeLists.txt
index 922547c80f..e099250646 100644
--- a/Tests/LibCore/CMakeLists.txt
+++ b/Tests/LibCore/CMakeLists.txt
@@ -5,6 +5,7 @@ set(TEST_SOURCES
TestLibCoreDeferredInvoke.cpp
TestLibCoreStream.cpp
TestLibCoreFilePermissionsMask.cpp
+ TestLibCoreSharedSingleProducerCircularQueue.cpp
)
foreach(source IN LISTS TEST_SOURCES)
@@ -13,5 +14,6 @@ endforeach()
# NOTE: Required because of the LocalServer tests
target_link_libraries(TestLibCoreStream LibThreading)
+target_link_libraries(TestLibCoreSharedSingleProducerCircularQueue LibThreading)
install(FILES long_lines.txt 10kb.txt small.txt DESTINATION usr/Tests/LibCore)
diff --git a/Tests/LibCore/TestLibCoreSharedSingleProducerCircularQueue.cpp b/Tests/LibCore/TestLibCoreSharedSingleProducerCircularQueue.cpp
new file mode 100644
index 0000000000..c691ee972e
--- /dev/null
+++ b/Tests/LibCore/TestLibCoreSharedSingleProducerCircularQueue.cpp
@@ -0,0 +1,203 @@
+/*
+ * Copyright (c) 2022, kleines Filmröllchen <filmroellchen@serenityos.org>
+ *
+ * SPDX-License-Identifier: BSD-2-Clause
+ */
+
+#include "sched.h"
+#include <LibCore/SharedCircularQueue.h>
+#include <LibTest/TestCase.h>
+#include <LibThreading/Thread.h>
+
+using TestQueue = Core::SharedSingleProducerCircularQueue<int>;
+using QueueError = ErrorOr<int, TestQueue::QueueStatus>;
+
+Function<intptr_t()> dequeuer(TestQueue& queue, Atomic<size_t>& dequeue_count, size_t test_count);
+
+// These first two cases don't multithread at all.
+
+TEST_CASE(simple_enqueue)
+{
+ auto queue = MUST(TestQueue::try_create());
+ for (size_t i = 0; i < queue.size() - 1; ++i)
+ EXPECT(!queue.try_enqueue((int)i).is_error());
+
+ auto result = queue.try_enqueue(0);
+ EXPECT(result.is_error());
+ EXPECT_EQ(result.release_error(), TestQueue::QueueStatus::Full);
+}
+
+TEST_CASE(simple_dequeue)
+{
+ auto queue = MUST(TestQueue::try_create());
+ auto const test_count = 10;
+ for (int i = 0; i < test_count; ++i)
+ (void)queue.try_enqueue(i);
+ for (int i = 0; i < test_count; ++i) {
+ auto const element = queue.try_dequeue();
+ EXPECT(!element.is_error());
+ EXPECT_EQ(element.value(), i);
+ }
+}
+
+// There is one parallel consumer, but nobody is producing at the same time.
+TEST_CASE(simple_multithread)
+{
+ auto queue = MUST(TestQueue::try_create());
+ auto const test_count = 10;
+
+ for (int i = 0; i < test_count; ++i)
+ (void)queue.try_enqueue(i);
+
+ auto second_thread = Threading::Thread::construct([&queue]() {
+ auto copied_queue = queue;
+ for (int i = 0; i < test_count; ++i) {
+ QueueError result = TestQueue::QueueStatus::Invalid;
+ do {
+ result = copied_queue.try_dequeue();
+ if (!result.is_error())
+ EXPECT_EQ(result.value(), i);
+ } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);
+
+ if (result.is_error())
+ FAIL("Unexpected error while dequeueing.");
+ }
+ return 0;
+ });
+ second_thread->start();
+ (void)second_thread->join();
+
+ EXPECT_EQ(queue.weak_used(), (size_t)0);
+}
+
+// There is one parallel consumer and one parallel producer.
+TEST_CASE(producer_consumer_multithread)
+{
+ auto queue = MUST(TestQueue::try_create());
+ // Ensure that we have the possibility of filling the queue up.
+ auto const test_count = queue.size() * 4;
+
+ Atomic<bool> other_thread_running { false };
+
+ auto second_thread = Threading::Thread::construct([&queue, &other_thread_running]() {
+ auto copied_queue = queue;
+ other_thread_running.store(true);
+ for (size_t i = 0; i < test_count; ++i) {
+ QueueError result = TestQueue::QueueStatus::Invalid;
+ do {
+ result = copied_queue.try_dequeue();
+ if (!result.is_error())
+ EXPECT_EQ(result.value(), (int)i);
+ } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);
+
+ if (result.is_error())
+ FAIL("Unexpected error while dequeueing.");
+ }
+ return 0;
+ });
+ second_thread->start();
+
+ while (!other_thread_running.load())
+ ;
+
+ for (size_t i = 0; i < test_count; ++i) {
+ ErrorOr<void, TestQueue::QueueStatus> result = TestQueue::QueueStatus::Invalid;
+ do {
+ result = queue.try_enqueue((int)i);
+ } while (result.is_error() && result.error() == TestQueue::QueueStatus::Full);
+
+ if (result.is_error())
+ FAIL("Unexpected error while enqueueing.");
+ }
+
+ (void)second_thread->join();
+
+ EXPECT_EQ(queue.weak_used(), (size_t)0);
+}
+
+// There are multiple parallel consumers, but nobody is producing at the same time.
+TEST_CASE(multi_consumer)
+{
+ auto queue = MUST(TestQueue::try_create());
+ // This needs to be divisible by 4!
+ size_t const test_count = queue.size() - 4;
+ Atomic<size_t> dequeue_count = 0;
+
+ auto threads = {
+ Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
+ Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
+ Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
+ Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
+ };
+
+ for (size_t i = 0; i < test_count; ++i)
+ (void)queue.try_enqueue((int)i);
+
+ for (auto thread : threads)
+ thread->start();
+ for (auto thread : threads)
+ (void)thread->join();
+
+ EXPECT_EQ(queue.weak_used(), (size_t)0);
+ EXPECT_EQ(dequeue_count.load(), (size_t)test_count);
+}
+
+// There are multiple parallel consumers and one parallel producer.
+TEST_CASE(single_producer_multi_consumer)
+{
+ auto queue = MUST(TestQueue::try_create());
+ // Choose a higher number to provoke possible race conditions.
+ size_t const test_count = queue.size() * 8;
+ Atomic<size_t> dequeue_count = 0;
+
+ auto threads = {
+ Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
+ Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
+ Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
+ Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
+ };
+ for (auto thread : threads)
+ thread->start();
+
+ for (size_t i = 0; i < test_count; ++i) {
+ ErrorOr<void, TestQueue::QueueStatus> result = TestQueue::QueueStatus::Invalid;
+ do {
+ result = queue.try_enqueue((int)i);
+ // After we put something in the first time, let's wait while nobody has dequeued yet.
+ while (dequeue_count.load() == 0)
+ ;
+ // Give others time to do something.
+ sched_yield();
+ } while (result.is_error() && result.error() == TestQueue::QueueStatus::Full);
+
+ if (result.is_error())
+ FAIL("Unexpected error while enqueueing.");
+ }
+
+ for (auto thread : threads)
+ (void)thread->join();
+
+ EXPECT_EQ(queue.weak_used(), (size_t)0);
+ EXPECT_EQ(dequeue_count.load(), (size_t)test_count);
+}
+
+Function<intptr_t()> dequeuer(TestQueue& queue, Atomic<size_t>& dequeue_count, size_t const test_count)
+{
+ return [&queue, &dequeue_count, test_count]() {
+ auto copied_queue = queue;
+ for (size_t i = 0; i < test_count / 4; ++i) {
+ QueueError result = TestQueue::QueueStatus::Invalid;
+ do {
+ result = copied_queue.try_dequeue();
+ if (!result.is_error())
+ dequeue_count.fetch_add(1);
+ // Give others time to do something.
+ sched_yield();
+ } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);
+
+ if (result.is_error())
+ FAIL("Unexpected error while dequeueing.");
+ }
+ return (intptr_t)0;
+ };
+}