blob: e6d9b23826d1d116b342bab1d8141fd3265fc7e0 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
/*
* Copyright (c) 2020, the SerenityOS developers.
*
* SPDX-License-Identifier: BSD-2-Clause
*/
#pragma once
#include <AK/CircularQueue.h>
#include <AK/Stream.h>
namespace AK {
// FIXME: There are a lot of raw loops here, that's not necessary an issue but it
// has to be verified that the optimizer is able to insert memcpy instead.
template<size_t Capacity>
class CircularDuplexStream : public AK::DuplexStream {
public:
size_t write(ReadonlyBytes bytes) override
{
const auto nwritten = min(bytes.size(), Capacity - m_queue.size());
for (size_t idx = 0; idx < nwritten; ++idx)
m_queue.enqueue(bytes[idx]);
m_total_written += nwritten;
return nwritten;
}
bool write_or_error(ReadonlyBytes bytes) override
{
if (Capacity - m_queue.size() < bytes.size()) {
set_recoverable_error();
return false;
}
const auto nwritten = write(bytes);
VERIFY(nwritten == bytes.size());
return true;
}
size_t read(Bytes bytes) override
{
if (has_any_error())
return 0;
const auto nread = min(bytes.size(), m_queue.size());
for (size_t idx = 0; idx < nread; ++idx)
bytes[idx] = m_queue.dequeue();
return nread;
}
size_t read(Bytes bytes, size_t seekback)
{
if (seekback > Capacity || seekback > m_total_written) {
set_recoverable_error();
return 0;
}
const auto nread = min(bytes.size(), seekback);
for (size_t idx = 0; idx < nread; ++idx) {
const auto index = (m_total_written - seekback + idx) % Capacity;
bytes[idx] = m_queue.m_storage[index];
}
return nread;
}
bool read_or_error(Bytes bytes) override
{
if (m_queue.size() < bytes.size()) {
set_recoverable_error();
return false;
}
read(bytes);
return true;
}
bool discard_or_error(size_t count) override
{
if (m_queue.size() < count) {
set_recoverable_error();
return false;
}
for (size_t idx = 0; idx < count; ++idx)
m_queue.dequeue();
return true;
}
bool unreliable_eof() const override { return eof(); }
bool eof() const { return m_queue.size() == 0; }
size_t remaining_contiguous_space() const
{
return min(Capacity - m_queue.size(), m_queue.capacity() - (m_queue.head_index() + m_queue.size()) % Capacity);
}
Bytes reserve_contiguous_space(size_t count)
{
VERIFY(count <= remaining_contiguous_space());
Bytes bytes { m_queue.m_storage + (m_queue.head_index() + m_queue.size()) % Capacity, count };
m_queue.m_size += count;
m_total_written += count;
return bytes;
}
private:
CircularQueue<u8, Capacity> m_queue;
size_t m_total_written { 0 };
};
}
using AK::CircularDuplexStream;
|