summaryrefslogtreecommitdiff
path: root/Userland
diff options
context:
space:
mode:
authorTim Schumacher <timschumi@gmx.de>2023-01-25 16:22:51 +0100
committerAndrew Kaster <andrewdkaster@gmail.com>2023-01-29 19:16:44 -0700
commit94f139c1117e2870c45df27085b004723b5b86ca (patch)
tree0117e407e522a768fec59131d9904ecdca9985fb /Userland
parent8464da143987e290fa3d9ad739b491e6adbffee6 (diff)
downloadserenity-94f139c1117e2870c45df27085b004723b5b86ca.zip
AK: Move buffered streams from `LibCore`
Diffstat (limited to 'Userland')
-rw-r--r--Userland/Libraries/LibCore/Stream.h309
1 files changed, 1 insertions, 308 deletions
diff --git a/Userland/Libraries/LibCore/Stream.h b/Userland/Libraries/LibCore/Stream.h
index 58361c5179..404600e602 100644
--- a/Userland/Libraries/LibCore/Stream.h
+++ b/Userland/Libraries/LibCore/Stream.h
@@ -8,6 +8,7 @@
#pragma once
#include <AK/Badge.h>
+#include <AK/BufferedStream.h>
#include <AK/CircularBuffer.h>
#include <AK/DeprecatedStream.h>
#include <AK/DeprecatedString.h>
@@ -462,317 +463,9 @@ private:
PosixSocketHelper m_helper { Badge<LocalSocket> {} };
};
-// Buffered stream wrappers
-
-template<typename T>
-concept StreamLike = IsBaseOf<AK::Stream, T>;
-template<typename T>
-concept SeekableStreamLike = IsBaseOf<SeekableStream, T>;
template<typename T>
concept SocketLike = IsBaseOf<Socket, T>;
-template<typename T>
-class BufferedHelper {
- AK_MAKE_NONCOPYABLE(BufferedHelper);
-
-public:
- template<StreamLike U>
- BufferedHelper(Badge<U>, NonnullOwnPtr<T> stream, CircularBuffer buffer)
- : m_stream(move(stream))
- , m_buffer(move(buffer))
- {
- }
-
- BufferedHelper(BufferedHelper&& other)
- : m_stream(move(other.m_stream))
- , m_buffer(move(other.m_buffer))
- {
- }
-
- BufferedHelper& operator=(BufferedHelper&& other)
- {
- m_stream = move(other.m_stream);
- m_buffer = move(other.m_buffer);
- return *this;
- }
-
- template<template<typename> typename BufferedType>
- static ErrorOr<NonnullOwnPtr<BufferedType<T>>> create_buffered(NonnullOwnPtr<T> stream, size_t buffer_size)
- {
- if (!buffer_size)
- return Error::from_errno(EINVAL);
- if (!stream->is_open())
- return Error::from_errno(ENOTCONN);
-
- auto buffer = TRY(CircularBuffer::create_empty(buffer_size));
-
- return adopt_nonnull_own_or_enomem(new BufferedType<T>(move(stream), move(buffer)));
- }
-
- T& stream() { return *m_stream; }
- T const& stream() const { return *m_stream; }
-
- ErrorOr<Bytes> read(Bytes buffer)
- {
- if (!stream().is_open())
- return Error::from_errno(ENOTCONN);
- if (buffer.is_empty())
- return buffer;
-
- // Fill the internal buffer if it has run dry.
- if (m_buffer.used_space() == 0)
- TRY(populate_read_buffer());
-
- // Let's try to take all we can from the buffer first.
- return m_buffer.read(buffer);
- }
-
- // Reads into the buffer until \n is encountered.
- // The size of the Bytes object is the maximum amount of bytes that will be
- // read. Returns the bytes read as a StringView.
- ErrorOr<StringView> read_line(Bytes buffer)
- {
- return StringView { TRY(read_until(buffer, "\n"sv)) };
- }
-
- ErrorOr<Bytes> read_until(Bytes buffer, StringView candidate)
- {
- return read_until_any_of(buffer, Array { candidate });
- }
-
- template<size_t N>
- ErrorOr<Bytes> read_until_any_of(Bytes buffer, Array<StringView, N> candidates)
- {
- if (!stream().is_open())
- return Error::from_errno(ENOTCONN);
-
- if (buffer.is_empty())
- return buffer;
-
- auto const candidate = TRY(find_and_populate_until_any_of(candidates, buffer.size()));
-
- if (stream().is_eof()) {
- if (buffer.size() < m_buffer.used_space()) {
- // Normally, reading from an EOFed stream and receiving bytes
- // would mean that the stream is no longer EOF. However, it's
- // possible with a buffered stream that the user is able to read
- // the buffer contents even when the underlying stream is EOF.
- // We already violate this invariant once by giving the user the
- // chance to read the remaining buffer contents, but if the user
- // doesn't give us a big enough buffer, then we would be
- // violating the invariant twice the next time the user attempts
- // to read, which is No Good. So let's give a descriptive error
- // to the caller about why it can't read.
- return Error::from_errno(EMSGSIZE);
- }
- }
-
- if (candidate.has_value()) {
- auto const read_bytes = m_buffer.read(buffer.trim(candidate->offset));
- TRY(m_buffer.discard(candidate->size));
- return read_bytes;
- }
-
- // If we still haven't found anything, then it's most likely the case
- // that the delimiter ends beyond the length of the caller-passed
- // buffer. Let's just fill the caller's buffer up.
- return m_buffer.read(buffer);
- }
-
- struct Match {
- size_t offset {};
- size_t size {};
- };
-
- template<size_t N>
- ErrorOr<Optional<Match>> find_and_populate_until_any_of(Array<StringView, N> const& candidates, Optional<size_t> max_offset = {})
- {
- Optional<size_t> longest_candidate;
- for (auto& candidate : candidates) {
- if (candidate.length() >= longest_candidate.value_or(candidate.length()))
- longest_candidate = candidate.length();
- }
-
- // The intention here is to try to match all the possible
- // delimiter candidates and try to find the longest one we can
- // remove from the buffer after copying up to the delimiter to the
- // user buffer.
-
- auto const find_candidates = [this, &candidates, &longest_candidate](size_t min_offset, Optional<size_t> max_offset = {}) -> Optional<Match> {
- auto const corrected_minimum_offset = *longest_candidate > min_offset ? 0 : min_offset - *longest_candidate;
- max_offset = max_offset.value_or(m_buffer.used_space());
-
- Optional<size_t> longest_match;
- size_t match_size = 0;
- for (auto& candidate : candidates) {
- auto const result = m_buffer.offset_of(candidate, corrected_minimum_offset, *max_offset);
-
- if (result.has_value()) {
- auto previous_match = longest_match.value_or(*result);
- if ((previous_match < *result) || (previous_match == *result && match_size < candidate.length())) {
- longest_match = result;
- match_size = candidate.length();
- }
- }
- }
-
- if (longest_match.has_value())
- return Match { *longest_match, match_size };
-
- return {};
- };
-
- if (auto first_find = find_candidates(0, max_offset); first_find.has_value())
- return first_find;
-
- auto last_size = m_buffer.used_space();
-
- while (m_buffer.used_space() < max_offset.value_or(m_buffer.capacity())) {
- auto const read_bytes = TRY(populate_read_buffer());
- if (read_bytes == 0)
- break;
-
- if (auto first_find = find_candidates(last_size, max_offset); first_find.has_value())
- return first_find;
- last_size = m_buffer.used_space();
- }
-
- return Optional<Match> {};
- }
-
- // Returns whether a line can be read, populating the buffer in the process.
- ErrorOr<bool> can_read_line()
- {
- if (stream().is_eof())
- return m_buffer.used_space() > 0;
-
- return TRY(find_and_populate_until_any_of(Array<StringView, 1> { "\n"sv })).has_value();
- }
-
- bool is_eof() const
- {
- if (m_buffer.used_space() > 0) {
- return false;
- }
-
- return stream().is_eof();
- }
-
- size_t buffer_size() const
- {
- return m_buffer.capacity();
- }
-
- size_t buffered_data_size() const
- {
- return m_buffer.used_space();
- }
-
- void clear_buffer()
- {
- m_buffer.clear();
- }
-
- ErrorOr<void> discard_bytes(size_t count)
- {
- return m_buffer.discard(count);
- }
-
-private:
- ErrorOr<size_t> populate_read_buffer()
- {
- if (m_buffer.empty_space() == 0)
- return 0;
-
- // TODO: Figure out if we can do direct writes in a comfortable way.
- Array<u8, 1024> temporary_buffer;
- auto const fillable_slice = temporary_buffer.span().trim(min(temporary_buffer.size(), m_buffer.empty_space()));
- size_t nread = 0;
- do {
- auto result = stream().read(fillable_slice);
- if (result.is_error()) {
- if (!result.error().is_errno())
- return result.error();
- if (result.error().code() == EINTR)
- continue;
- if (result.error().code() == EAGAIN)
- break;
- return result.error();
- }
- auto const filled_slice = result.value();
- VERIFY(m_buffer.write(filled_slice) == filled_slice.size());
- nread += filled_slice.size();
- break;
- } while (true);
- return nread;
- }
-
- NonnullOwnPtr<T> m_stream;
- CircularBuffer m_buffer;
-};
-
-// NOTE: A Buffered which accepts any Stream could be added here, but it is not
-// needed at the moment.
-
-template<SeekableStreamLike T>
-class BufferedSeekable final : public SeekableStream {
- friend BufferedHelper<T>;
-
-public:
- static ErrorOr<NonnullOwnPtr<BufferedSeekable<T>>> create(NonnullOwnPtr<T> stream, size_t buffer_size = 16384)
- {
- return BufferedHelper<T>::template create_buffered<BufferedSeekable>(move(stream), buffer_size);
- }
-
- BufferedSeekable(BufferedSeekable&& other) = default;
- BufferedSeekable& operator=(BufferedSeekable&& other) = default;
-
- virtual ErrorOr<Bytes> read(Bytes buffer) override { return m_helper.read(move(buffer)); }
- virtual ErrorOr<size_t> write(ReadonlyBytes buffer) override { return m_helper.stream().write(buffer); }
- virtual bool is_eof() const override { return m_helper.is_eof(); }
- virtual bool is_open() const override { return m_helper.stream().is_open(); }
- virtual void close() override { m_helper.stream().close(); }
- virtual ErrorOr<size_t> seek(i64 offset, SeekMode mode) override
- {
- if (mode == SeekMode::FromCurrentPosition) {
- // If possible, seek using the buffer alone.
- if (0 <= offset && static_cast<u64>(offset) <= m_helper.buffered_data_size()) {
- MUST(m_helper.discard_bytes(offset));
- return TRY(m_helper.stream().tell()) - m_helper.buffered_data_size();
- }
-
- offset = offset - m_helper.buffered_data_size();
- }
-
- auto result = TRY(m_helper.stream().seek(offset, mode));
- m_helper.clear_buffer();
-
- return result;
- }
- virtual ErrorOr<void> truncate(off_t length) override
- {
- return m_helper.stream().truncate(length);
- }
-
- ErrorOr<StringView> read_line(Bytes buffer) { return m_helper.read_line(move(buffer)); }
- ErrorOr<Bytes> read_until(Bytes buffer, StringView candidate) { return m_helper.read_until(move(buffer), move(candidate)); }
- template<size_t N>
- ErrorOr<Bytes> read_until_any_of(Bytes buffer, Array<StringView, N> candidates) { return m_helper.read_until_any_of(move(buffer), move(candidates)); }
- ErrorOr<bool> can_read_line() { return m_helper.can_read_line(); }
-
- size_t buffer_size() const { return m_helper.buffer_size(); }
-
- virtual ~BufferedSeekable() override = default;
-
-private:
- BufferedSeekable(NonnullOwnPtr<T> stream, CircularBuffer buffer)
- : m_helper(Badge<BufferedSeekable<T>> {}, move(stream), move(buffer))
- {
- }
-
- BufferedHelper<T> m_helper;
-};
-
class BufferedSocketBase : public Socket {
public:
virtual ErrorOr<StringView> read_line(Bytes buffer) = 0;