diff options
author | kleines Filmröllchen <filmroellchen@serenityos.org> | 2022-01-23 23:31:51 +0100 |
---|---|---|
committer | Linus Groh <mail@linusgroh.de> | 2022-04-21 13:55:00 +0200 |
commit | 6b13436ef64b8ae91f71b3fed2e8555f733d7998 (patch) | |
tree | 2473662c9b2c8962d1bf926c03106299ea336e0f /Tests | |
parent | b0a2572577af0132bd3a9408ea05219fa093a79b (diff) | |
download | serenity-6b13436ef64b8ae91f71b3fed2e8555f733d7998.zip |
LibCore: Introduce SharedSingleProducerCircularQueue
This new class with an admittedly long OOP-y name provides a circular
queue in shared memory. The queue is a lock-free synchronous queue
implemented with atomics, and its implementation is significantly
simplified by only accounting for one producer (and multiple consumers).
It is intended to be used as a producer-consumer communication
datastructure across processes. The original motivation behind this
class is efficient short-period transfer of audio data in userspace.
This class includes formal proofs of several correctness properties of
the main queue operations `enqueue` and `dequeue`. These proofs are not
100% complete in their existing form as the invariants they depend on
are "handwaved". This seems fine to me right now, as any proof is better
than no proof :^). Anyways, the proofs should build confidence that the
implemented algorithms, which are only roughly based on existing work,
operate correctly in even the worst-case concurrency scenarios.
Diffstat (limited to 'Tests')
-rw-r--r-- | Tests/LibCore/CMakeLists.txt | 2 | ||||
-rw-r--r-- | Tests/LibCore/TestLibCoreSharedSingleProducerCircularQueue.cpp | 203 |
2 files changed, 205 insertions, 0 deletions
diff --git a/Tests/LibCore/CMakeLists.txt b/Tests/LibCore/CMakeLists.txt index 922547c80f..e099250646 100644 --- a/Tests/LibCore/CMakeLists.txt +++ b/Tests/LibCore/CMakeLists.txt @@ -5,6 +5,7 @@ set(TEST_SOURCES TestLibCoreDeferredInvoke.cpp TestLibCoreStream.cpp TestLibCoreFilePermissionsMask.cpp + TestLibCoreSharedSingleProducerCircularQueue.cpp ) foreach(source IN LISTS TEST_SOURCES) @@ -13,5 +14,6 @@ endforeach() # NOTE: Required because of the LocalServer tests target_link_libraries(TestLibCoreStream LibThreading) +target_link_libraries(TestLibCoreSharedSingleProducerCircularQueue LibThreading) install(FILES long_lines.txt 10kb.txt small.txt DESTINATION usr/Tests/LibCore) diff --git a/Tests/LibCore/TestLibCoreSharedSingleProducerCircularQueue.cpp b/Tests/LibCore/TestLibCoreSharedSingleProducerCircularQueue.cpp new file mode 100644 index 0000000000..c691ee972e --- /dev/null +++ b/Tests/LibCore/TestLibCoreSharedSingleProducerCircularQueue.cpp @@ -0,0 +1,203 @@ +/* + * Copyright (c) 2022, kleines Filmröllchen <filmroellchen@serenityos.org> + * + * SPDX-License-Identifier: BSD-2-Clause + */ + +#include "sched.h" +#include <LibCore/SharedCircularQueue.h> +#include <LibTest/TestCase.h> +#include <LibThreading/Thread.h> + +using TestQueue = Core::SharedSingleProducerCircularQueue<int>; +using QueueError = ErrorOr<int, TestQueue::QueueStatus>; + +Function<intptr_t()> dequeuer(TestQueue& queue, Atomic<size_t>& dequeue_count, size_t test_count); + +// These first two cases don't multithread at all. + +TEST_CASE(simple_enqueue) +{ + auto queue = MUST(TestQueue::try_create()); + for (size_t i = 0; i < queue.size() - 1; ++i) + EXPECT(!queue.try_enqueue((int)i).is_error()); + + auto result = queue.try_enqueue(0); + EXPECT(result.is_error()); + EXPECT_EQ(result.release_error(), TestQueue::QueueStatus::Full); +} + +TEST_CASE(simple_dequeue) +{ + auto queue = MUST(TestQueue::try_create()); + auto const test_count = 10; + for (int i = 0; i < test_count; ++i) + (void)queue.try_enqueue(i); + for (int i = 0; i < test_count; ++i) { + auto const element = queue.try_dequeue(); + EXPECT(!element.is_error()); + EXPECT_EQ(element.value(), i); + } +} + +// There is one parallel consumer, but nobody is producing at the same time. +TEST_CASE(simple_multithread) +{ + auto queue = MUST(TestQueue::try_create()); + auto const test_count = 10; + + for (int i = 0; i < test_count; ++i) + (void)queue.try_enqueue(i); + + auto second_thread = Threading::Thread::construct([&queue]() { + auto copied_queue = queue; + for (int i = 0; i < test_count; ++i) { + QueueError result = TestQueue::QueueStatus::Invalid; + do { + result = copied_queue.try_dequeue(); + if (!result.is_error()) + EXPECT_EQ(result.value(), i); + } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty); + + if (result.is_error()) + FAIL("Unexpected error while dequeueing."); + } + return 0; + }); + second_thread->start(); + (void)second_thread->join(); + + EXPECT_EQ(queue.weak_used(), (size_t)0); +} + +// There is one parallel consumer and one parallel producer. +TEST_CASE(producer_consumer_multithread) +{ + auto queue = MUST(TestQueue::try_create()); + // Ensure that we have the possibility of filling the queue up. + auto const test_count = queue.size() * 4; + + Atomic<bool> other_thread_running { false }; + + auto second_thread = Threading::Thread::construct([&queue, &other_thread_running]() { + auto copied_queue = queue; + other_thread_running.store(true); + for (size_t i = 0; i < test_count; ++i) { + QueueError result = TestQueue::QueueStatus::Invalid; + do { + result = copied_queue.try_dequeue(); + if (!result.is_error()) + EXPECT_EQ(result.value(), (int)i); + } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty); + + if (result.is_error()) + FAIL("Unexpected error while dequeueing."); + } + return 0; + }); + second_thread->start(); + + while (!other_thread_running.load()) + ; + + for (size_t i = 0; i < test_count; ++i) { + ErrorOr<void, TestQueue::QueueStatus> result = TestQueue::QueueStatus::Invalid; + do { + result = queue.try_enqueue((int)i); + } while (result.is_error() && result.error() == TestQueue::QueueStatus::Full); + + if (result.is_error()) + FAIL("Unexpected error while enqueueing."); + } + + (void)second_thread->join(); + + EXPECT_EQ(queue.weak_used(), (size_t)0); +} + +// There are multiple parallel consumers, but nobody is producing at the same time. +TEST_CASE(multi_consumer) +{ + auto queue = MUST(TestQueue::try_create()); + // This needs to be divisible by 4! + size_t const test_count = queue.size() - 4; + Atomic<size_t> dequeue_count = 0; + + auto threads = { + Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)), + Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)), + Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)), + Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)), + }; + + for (size_t i = 0; i < test_count; ++i) + (void)queue.try_enqueue((int)i); + + for (auto thread : threads) + thread->start(); + for (auto thread : threads) + (void)thread->join(); + + EXPECT_EQ(queue.weak_used(), (size_t)0); + EXPECT_EQ(dequeue_count.load(), (size_t)test_count); +} + +// There are multiple parallel consumers and one parallel producer. +TEST_CASE(single_producer_multi_consumer) +{ + auto queue = MUST(TestQueue::try_create()); + // Choose a higher number to provoke possible race conditions. + size_t const test_count = queue.size() * 8; + Atomic<size_t> dequeue_count = 0; + + auto threads = { + Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)), + Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)), + Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)), + Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)), + }; + for (auto thread : threads) + thread->start(); + + for (size_t i = 0; i < test_count; ++i) { + ErrorOr<void, TestQueue::QueueStatus> result = TestQueue::QueueStatus::Invalid; + do { + result = queue.try_enqueue((int)i); + // After we put something in the first time, let's wait while nobody has dequeued yet. + while (dequeue_count.load() == 0) + ; + // Give others time to do something. + sched_yield(); + } while (result.is_error() && result.error() == TestQueue::QueueStatus::Full); + + if (result.is_error()) + FAIL("Unexpected error while enqueueing."); + } + + for (auto thread : threads) + (void)thread->join(); + + EXPECT_EQ(queue.weak_used(), (size_t)0); + EXPECT_EQ(dequeue_count.load(), (size_t)test_count); +} + +Function<intptr_t()> dequeuer(TestQueue& queue, Atomic<size_t>& dequeue_count, size_t const test_count) +{ + return [&queue, &dequeue_count, test_count]() { + auto copied_queue = queue; + for (size_t i = 0; i < test_count / 4; ++i) { + QueueError result = TestQueue::QueueStatus::Invalid; + do { + result = copied_queue.try_dequeue(); + if (!result.is_error()) + dequeue_count.fetch_add(1); + // Give others time to do something. + sched_yield(); + } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty); + + if (result.is_error()) + FAIL("Unexpected error while dequeueing."); + } + return (intptr_t)0; + }; +} |