summaryrefslogtreecommitdiff
path: root/AK/CircularDuplexStream.h
blob: d581c9e79c961666f07ed4a25ff287a2ce073d1d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
/*
 * Copyright (c) 2020, the SerenityOS developers.
 *
 * SPDX-License-Identifier: BSD-2-Clause
 */

#pragma once

#include <AK/CircularQueue.h>
#include <AK/Stream.h>

namespace AK {

// FIXME: There are a lot of raw loops here, that's not necessary an issue but it
//        has to be verified that the optimizer is able to insert memcpy instead.
template<size_t Capacity>
class CircularDuplexStream : public AK::DuplexStream {
public:
    size_t write(ReadonlyBytes bytes) override
    {
        const auto nwritten = min(bytes.size(), Capacity - m_queue.size());

        for (size_t idx = 0; idx < nwritten; ++idx)
            m_queue.enqueue(bytes[idx]);

        m_total_written += nwritten;
        return nwritten;
    }

    bool write_or_error(ReadonlyBytes bytes) override
    {
        if (Capacity - m_queue.size() < bytes.size()) {
            set_recoverable_error();
            return false;
        }

        const auto nwritten = write(bytes);
        VERIFY(nwritten == bytes.size());
        return true;
    }

    size_t read(Bytes bytes) override
    {
        if (has_any_error())
            return 0;

        const auto nread = min(bytes.size(), m_queue.size());

        for (size_t idx = 0; idx < nread; ++idx)
            bytes[idx] = m_queue.dequeue();

        return nread;
    }

    size_t read(Bytes bytes, size_t seekback)
    {
        if (seekback > Capacity || seekback > m_total_written) {
            set_recoverable_error();
            return 0;
        }

        const auto nread = min(bytes.size(), seekback);

        for (size_t idx = 0; idx < nread; ++idx) {
            const auto index = (m_total_written - seekback + idx) % Capacity;
            bytes[idx] = m_queue.m_storage[index];
        }

        return nread;
    }

    bool read_or_error(Bytes bytes) override
    {
        if (m_queue.size() < bytes.size()) {
            set_recoverable_error();
            return false;
        }

        read(bytes);
        return true;
    }

    bool discard_or_error(size_t count) override
    {
        if (m_queue.size() < count) {
            set_recoverable_error();
            return false;
        }

        for (size_t idx = 0; idx < count; ++idx)
            m_queue.dequeue();

        return true;
    }

    bool unreliable_eof() const override { return eof(); }
    bool eof() const { return m_queue.size() == 0; }

    size_t remaining_contigous_space() const
    {
        return min(Capacity - m_queue.size(), m_queue.capacity() - (m_queue.head_index() + m_queue.size()) % Capacity);
    }

    Bytes reserve_contigous_space(size_t count)
    {
        VERIFY(count <= remaining_contigous_space());

        Bytes bytes { m_queue.m_storage + (m_queue.head_index() + m_queue.size()) % Capacity, count };

        m_queue.m_size += count;
        m_total_written += count;

        return bytes;
    }

private:
    CircularQueue<u8, Capacity> m_queue;
    size_t m_total_written { 0 };
};

}

using AK::CircularDuplexStream;