diff options
author | Matthew Olsson <matthewcolsson@gmail.com> | 2023-03-28 18:56:11 -0700 |
---|---|---|
committer | Linus Groh <mail@linusgroh.de> | 2023-04-01 23:43:07 +0100 |
commit | bc9919178ee99a843aa081b5af38e1ef74546d97 (patch) | |
tree | 1ad8d40d1e4c246464a6927e5283eabd4f7f1ea4 /Userland/Libraries/LibWeb/Streams | |
parent | f99d6e177f564064eef42a93e832ade22e7c3a99 (diff) | |
download | serenity-bc9919178ee99a843aa081b5af38e1ef74546d97.zip |
LibWeb: Add ReadableStreamDefaultController
Diffstat (limited to 'Userland/Libraries/LibWeb/Streams')
7 files changed, 652 insertions, 10 deletions
diff --git a/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp b/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp index fc9bc45020..ac51a21aed 100644 --- a/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp +++ b/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp @@ -5,8 +5,12 @@ * SPDX-License-Identifier: BSD-2-Clause */ +#include <LibJS/Runtime/PromiseCapability.h> +#include <LibJS/Runtime/PromiseConstructor.h> +#include <LibWeb/Bindings/ExceptionOrUtils.h> #include <LibWeb/Streams/AbstractOperations.h> #include <LibWeb/Streams/ReadableStream.h> +#include <LibWeb/Streams/ReadableStreamDefaultController.h> #include <LibWeb/Streams/ReadableStreamDefaultReader.h> #include <LibWeb/Streams/ReadableStreamGenericReader.h> #include <LibWeb/WebIDL/ExceptionOr.h> @@ -53,12 +57,71 @@ WebIDL::ExceptionOr<JS::NonnullGCPtr<WebIDL::Promise>> readable_stream_cancel(Re // 2. Set reader.[[readIntoRequests]] to an empty list. // 3. For each readIntoRequest of readIntoRequests, // 1. Perform readIntoRequest’s close steps, given undefined. + (void)reader; + // 7. Let sourceCancelPromise be ! stream.[[controller]].[[CancelSteps]](reason). + auto source_cancel_promise = MUST(stream.controller()->cancel_steps(reason)); + // 8. Return the result of reacting to sourceCancelPromise with a fulfillment step that returns undefined. - (void)reader; - (void)reason; + auto react_result = WebIDL::react_to_promise(*source_cancel_promise, + [](auto const&) -> WebIDL::ExceptionOr<JS::Value> { return JS::js_undefined(); }, + {}); + + return WebIDL::create_resolved_promise(realm, react_result); +} + +// https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request +void readable_stream_fulfill_read_request(ReadableStream& stream, JS::Value chunk, bool done) +{ + // 1. Assert: ! ReadableStreamHasDefaultReader(stream) is true. + VERIFY(readable_stream_has_default_reader(stream)); + + // 2. Let reader be stream.[[reader]]. + auto& reader = *stream.reader(); + + // 3. Assert: reader.[[readRequests]] is not empty. + VERIFY(!reader.read_requests().is_empty()); + + // 4. Let readRequest be reader.[[readRequests]][0]. + // 5. Remove readRequest from reader.[[readRequests]]. + auto read_request = reader.read_requests().take_first(); + + // 6. If done is true, perform readRequest’s close steps. + if (done) { + read_request->on_close(); + } + // 7. Otherwise, perform readRequest’s chunk steps, given chunk. + else { + read_request->on_chunk(chunk); + } +} + +// https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests +size_t readable_stream_get_num_read_requests(ReadableStream& stream) +{ + // 1. Assert: ! ReadableStreamHasDefaultReader(stream) is true. + VERIFY(readable_stream_has_default_reader(stream)); - return WebIDL::create_resolved_promise(realm, JS::js_undefined()); + // 2. Return stream.[[reader]].[[readRequests]]'s size. + return stream.reader()->read_requests().size(); +} + +// https://streams.spec.whatwg.org/#readable-stream-has-default-reader +bool readable_stream_has_default_reader(ReadableStream& stream) +{ + // 1. Let reader be stream.[[reader]]. + auto reader = stream.reader(); + + // 2. If reader is undefined, return false. + if (!reader) + return false; + + // 3. If reader implements ReadableStreamDefaultReader, return true. + if (reader->is_default_reader()) + return true; + + // 4. Return false. + return false; } // https://streams.spec.whatwg.org/#readable-stream-close @@ -96,6 +159,62 @@ void readable_stream_close(ReadableStream& stream) } } +// https://streams.spec.whatwg.org/#readable-stream-error +void readable_stream_error(ReadableStream& stream, JS::Value error) +{ + auto& realm = stream.realm(); + + // 1. Assert: stream.[[state]] is "readable". + VERIFY(stream.is_readable()); + + // 2. Set stream.[[state]] to "errored". + stream.set_stream_state(ReadableStream::State::Errored); + + // 3. Set stream.[[storedError]] to e. + stream.set_stored_error(error); + + // 4. Let reader be stream.[[reader]]. + auto reader = stream.reader(); + + // 5. If reader is undefined, return. + if (!reader) + return; + + // 6. Reject reader.[[closedPromise]] with e. + WebIDL::reject_promise(realm, *reader->closed_promise_capability(), error); + + // 7. Set reader.[[closedPromise]].[[PromiseIsHandled]] to true. + WebIDL::mark_promise_as_handled(*reader->closed_promise_capability()); + + // 8. If reader implements ReadableStreamDefaultReader, + if (reader->is_default_reader()) { + // 1. Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e). + readable_stream_default_reader_error_read_requests(*reader, error); + } + // 9. Otherwise, + else { + // 1. Assert: reader implements ReadableStreamBYOBReader. + // 2. Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e). + + // FIXME: Handle BYOBReader + TODO(); + } +} + +// https://streams.spec.whatwg.org/#readable-stream-add-read-request +void readable_stream_add_read_request(ReadableStream& stream, ReadRequest const& read_request) +{ + // FIXME: Check implementation type + // 1. Assert: stream.[[reader]] implements ReadableStreamDefaultReader. + VERIFY(stream.reader()); + + // 2. Assert: stream.[[state]] is "readable". + VERIFY(stream.is_readable()); + + // 3. Append readRequest to stream.[[reader]].[[readRequests]]. + stream.reader()->read_requests().append(read_request); +} + // https://streams.spec.whatwg.org/#readable-stream-reader-generic-cancel JS::NonnullGCPtr<WebIDL::Promise> readable_stream_reader_generic_cancel(ReadableStreamGenericReaderMixin& reader, JS::Value reason) { @@ -173,7 +292,8 @@ WebIDL::ExceptionOr<void> readable_stream_reader_generic_release(ReadableStreamG // 6. Set reader.[[closedPromise]].[[PromiseIsHandled]] to true. WebIDL::mark_promise_as_handled(*reader.closed_promise_capability()); - // FIXME: 7. Perform ! stream.[[controller]].[[ReleaseSteps]](). + // 7. Perform ! stream.[[controller]].[[ReleaseSteps]](). + stream->controller()->release_steps(); // 8. Set stream.[[reader]] to undefined. stream->set_reader({}); @@ -223,7 +343,8 @@ void readable_stream_default_reader_read(ReadableStreamDefaultReader& reader, Re // 1. Assert: stream.[[state]] is "readable". VERIFY(stream->is_readable()); - // FIXME: 2. Perform ! stream.[[controller]].[[PullSteps]](readRequest). + // 2. Perform ! stream.[[controller]].[[PullSteps]](readRequest). + MUST(stream->controller()->pull_steps(read_request)); } } @@ -256,4 +377,228 @@ WebIDL::ExceptionOr<void> set_up_readable_stream_default_reader(ReadableStreamDe return {}; } +// https://streams.spec.whatwg.org/#readable-stream-default-controller-close +void readable_stream_default_controller_close(ReadableStreamDefaultController& controller) +{ + // 1. If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return. + if (!readable_stream_default_controller_can_close_or_enqueue(controller)) + return; + + // 2. Let stream be controller.[[stream]]. + auto stream = controller.stream(); + + // 3. Set controller.[[closeRequested]] to true. + controller.set_close_requested(true); + + // 4. If controller.[[queue]] is empty, + if (controller.queue().is_empty()) { + // 1. Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller). + readable_stream_default_controller_clear_algorithms(controller); + + // 2. Perform ! ReadableStreamClose(stream). + readable_stream_close(*stream); + } +} + +// https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue +WebIDL::ExceptionOr<void> readable_stream_default_controller_enqueue(ReadableStreamDefaultController& controller, JS::Value chunk) +{ + auto& vm = controller.vm(); + + // 1. If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return. + if (!readable_stream_default_controller_can_close_or_enqueue(controller)) + return {}; + + // 2. Let stream be controller.[[stream]]. + auto stream = controller.stream(); + + // 3. If ! IsReadableStreamLocked(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0, perform ! ReadableStreamFulfillReadRequest(stream, chunk, false). + if (is_readable_stream_locked(*stream) && readable_stream_get_num_read_requests(*stream) > 0) { + readable_stream_fulfill_read_request(*stream, chunk, false); + } + // 4. Otherwise, + else { + // 1. Let result be the result of performing controller.[[strategySizeAlgorithm]], passing in chunk, and interpreting the result as a completion record. + auto result = (*controller.strategy_size_algorithm())(chunk); + + // 2. If result is an abrupt completion, + if (result.is_abrupt()) { + // 1. Perform ! ReadableStreamDefaultControllerError(controller, result.[[Value]]). + readable_stream_default_controller_error(controller, result.value().value()); + + // 2. Return result. + return result; + } + + // 3. Let chunkSize be result.[[Value]]. + auto chunk_size = TRY(result.release_value().release_value().to_double(vm)); + + // 4. Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize). + auto enqueue_result = enqueue_value_with_size(controller, chunk, chunk_size); + + // 5. If enqueueResult is an abrupt completion, + if (enqueue_result.is_error()) { + auto throw_completion = Bindings::throw_dom_exception_if_needed(vm, [&] { return enqueue_result; }).throw_completion(); + + // 1. Perform ! ReadableStreamDefaultControllerError(controller, enqueueResult.[[Value]]). + readable_stream_default_controller_error(controller, throw_completion.value().value()); + + // 2. Return enqueueResult. + return enqueue_result; + } + } + + // 5. Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller). + return readable_stream_default_controller_can_pull_if_needed(controller); +} + +// https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed +WebIDL::ExceptionOr<void> readable_stream_default_controller_can_pull_if_needed(ReadableStreamDefaultController& controller) +{ + // 1. Let shouldPull be ! ReadableStreamDefaultControllerShouldCallPull(controller). + auto should_pull = readable_stream_default_controller_should_call_pull(controller); + + // 2. If shouldPull is false, return. + if (!should_pull) + return {}; + + // 3. If controller.[[pulling]] is true, + if (controller.pulling()) { + // 1. Set controller.[[pullAgain]] to true. + controller.set_pull_again(true); + + // 2. Return. + return {}; + } + + // 4. Assert: controller.[[pullAgain]] is false. + VERIFY(!controller.pull_again()); + + // 5. Set controller.[[pulling]] to true. + controller.set_pulling(true); + + // 6. Let pullPromise be the result of performing controller.[[pullAlgorithm]]. + auto pull_promise = TRY((*controller.pull_algorithm())()); + + // 7. Upon fulfillment of pullPromise, + WebIDL::upon_fulfillment(*pull_promise, [&](auto const&) -> WebIDL::ExceptionOr<JS::Value> { + // 1. Set controller.[[pulling]] to false. + controller.set_pulling(false); + + // 2. If controller.[[pullAgain]] is true, + if (controller.pull_again()) { + // 1. Set controller.[[pullAgain]] to false. + controller.set_pull_again(false); + + // 2. Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller). + TRY(readable_stream_default_controller_can_pull_if_needed(controller)); + } + + return JS::js_undefined(); + }); + + // 8. Upon rejection of pullPromise with reason e, + WebIDL::upon_rejection(*pull_promise, [&](auto const& e) -> WebIDL::ExceptionOr<JS::Value> { + // 1. Perform ! ReadableStreamDefaultControllerError(controller, e). + readable_stream_default_controller_error(controller, e); + + return JS::js_undefined(); + }); + + return {}; +} + +// https://streams.spec.whatwg.org/#readable-stream-default-controller-should-call-pull +bool readable_stream_default_controller_should_call_pull(ReadableStreamDefaultController& controller) +{ + // 1. Let stream be controller.[[stream]]. + auto stream = controller.stream(); + + // 2. If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return false. + if (!readable_stream_default_controller_can_close_or_enqueue(controller)) + return false; + + // 3. If controller.[[started]] is false, return false. + if (!controller.started()) + return false; + + // 4. If ! IsReadableStreamLocked(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0, return true. + if (is_readable_stream_locked(*stream) && readable_stream_get_num_read_requests(*stream) > 0) + return true; + + // 5. Let desiredSize be ! ReadableStreamDefaultControllerGetDesiredSize(controller). + auto desired_size = readable_stream_default_controller_get_desired_size(controller); + + // 6. Assert: desiredSize is not null. + VERIFY(desired_size.has_value()); + + // 7. If desiredSize > 0, return true. + if (desired_size.release_value() > 0) + return true; + + // 8. Return false. + return false; +} + +// https://streams.spec.whatwg.org/#readable-stream-default-controller-clear-algorithms +void readable_stream_default_controller_clear_algorithms(ReadableStreamDefaultController& controller) +{ + // 1. Set controller.[[pullAlgorithm]] to undefined. + controller.set_pull_algorithm({}); + + // 2. Set controller.[[cancelAlgorithm]] to undefined. + controller.set_cancel_algorithm({}); + + // 3. Set controller.[[strategySizeAlgorithm]] to undefined. + controller.set_strategy_size_algorithm({}); +} + +// https://streams.spec.whatwg.org/#readable-stream-default-controller-error +void readable_stream_default_controller_error(ReadableStreamDefaultController& controller, JS::Value error) +{ + // 1. Let stream be controller.[[stream]]. + auto stream = controller.stream(); + + // 2. If stream.[[state]] is not "readable", return. + if (!stream->is_readable()) + return; + + // 3. Perform ! ResetQueue(controller). + reset_queue(controller); + + // 4. Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller). + readable_stream_default_controller_clear_algorithms(controller); + + // 5. Perform ! ReadableStreamError(stream, e). + readable_stream_error(*stream, error); +} + +// https://streams.spec.whatwg.org/#readable-stream-default-controller-get-desired-size +Optional<float> readable_stream_default_controller_get_desired_size(ReadableStreamDefaultController& controller) +{ + auto stream = controller.stream(); + + // 1. Let state be controller.[[stream]].[[state]]. + + // 2. If state is "errored", return null. + if (stream->is_errored()) + return {}; + + // 3. If state is "closed", return 0. + if (stream->is_closed()) + return 0.0f; + + // 4. Return controller.[[strategyHWM]] − controller.[[queueTotalSize]]. + return controller.strategy_hwm() - controller.queue_total_size(); +} + +// https://streams.spec.whatwg.org/#readable-stream-default-controller-can-close-or-enqueue +bool readable_stream_default_controller_can_close_or_enqueue(ReadableStreamDefaultController& controller) +{ + // 1. Let state be controller.[[stream]].[[state]]. + // 2. If controller.[[closeRequested]] is false and state is "readable", return true. + // 3. Otherwise, return false. + return !controller.close_requested() && controller.stream()->is_readable(); +} + } diff --git a/Userland/Libraries/LibWeb/Streams/AbstractOperations.h b/Userland/Libraries/LibWeb/Streams/AbstractOperations.h index e419bc2403..33dea67b50 100644 --- a/Userland/Libraries/LibWeb/Streams/AbstractOperations.h +++ b/Userland/Libraries/LibWeb/Streams/AbstractOperations.h @@ -14,10 +14,19 @@ namespace Web::Streams { +using SizeAlgorithm = JS::SafeFunction<JS::Completion(JS::Value)>; +using PullAlgorithm = JS::SafeFunction<WebIDL::ExceptionOr<JS::GCPtr<WebIDL::Promise>>()>; +using CancelAlgorithm = JS::SafeFunction<WebIDL::ExceptionOr<JS::GCPtr<WebIDL::Promise>>(JS::Value)>; + bool is_readable_stream_locked(ReadableStream const&); void readable_stream_close(ReadableStream&); +void readable_stream_error(ReadableStream&, JS::Value error); +void readable_stream_add_read_request(ReadableStream&, ReadRequest const&); WebIDL::ExceptionOr<JS::NonnullGCPtr<WebIDL::Promise>> readable_stream_cancel(ReadableStream&, JS::Value reason); +void readable_stream_fulfill_read_request(ReadableStream&, JS::Value chunk, bool done); +size_t readable_stream_get_num_read_requests(ReadableStream&); +bool readable_stream_has_default_reader(ReadableStream&); JS::NonnullGCPtr<WebIDL::Promise> readable_stream_reader_generic_cancel(ReadableStreamGenericReaderMixin&, JS::Value reason); void readable_stream_reader_generic_initialize(ReadableStreamGenericReaderMixin&, ReadableStream&); @@ -27,5 +36,14 @@ void readable_stream_default_reader_error_read_requests(ReadableStreamDefaultRea void readable_stream_default_reader_read(ReadableStreamDefaultReader&, ReadRequest&); WebIDL::ExceptionOr<void> readable_stream_default_reader_release(ReadableStreamDefaultReader&); WebIDL::ExceptionOr<void> set_up_readable_stream_default_reader(ReadableStreamDefaultReader&, ReadableStream&); +void readable_stream_default_controller_close(ReadableStreamDefaultController&); +WebIDL::ExceptionOr<void> readable_stream_default_controller_enqueue(ReadableStreamDefaultController&, JS::Value chunk); +WebIDL::ExceptionOr<void> readable_stream_default_controller_can_pull_if_needed(ReadableStreamDefaultController&); +bool readable_stream_default_controller_should_call_pull(ReadableStreamDefaultController&); +void readable_stream_default_controller_clear_algorithms(ReadableStreamDefaultController&); + +void readable_stream_default_controller_error(ReadableStreamDefaultController&, JS::Value error); +Optional<float> readable_stream_default_controller_get_desired_size(ReadableStreamDefaultController&); +bool readable_stream_default_controller_can_close_or_enqueue(ReadableStreamDefaultController&); } diff --git a/Userland/Libraries/LibWeb/Streams/ReadableStream.cpp b/Userland/Libraries/LibWeb/Streams/ReadableStream.cpp index 6e96ec2c2e..8eda868bee 100644 --- a/Userland/Libraries/LibWeb/Streams/ReadableStream.cpp +++ b/Userland/Libraries/LibWeb/Streams/ReadableStream.cpp @@ -7,6 +7,7 @@ #include <LibWeb/Bindings/Intrinsics.h> #include <LibWeb/Streams/AbstractOperations.h> #include <LibWeb/Streams/ReadableStream.h> +#include <LibWeb/Streams/ReadableStreamDefaultController.h> #include <LibWeb/Streams/ReadableStreamDefaultReader.h> #include <LibWeb/WebIDL/ExceptionOr.h> diff --git a/Userland/Libraries/LibWeb/Streams/ReadableStream.h b/Userland/Libraries/LibWeb/Streams/ReadableStream.h index 174a790f74..790a1ac78d 100644 --- a/Userland/Libraries/LibWeb/Streams/ReadableStream.h +++ b/Userland/Libraries/LibWeb/Streams/ReadableStream.h @@ -17,6 +17,10 @@ namespace Web::Streams { // https://streams.spec.whatwg.org/#typedefdef-readablestreamreader using ReadableStreamReader = JS::GCPtr<ReadableStreamDefaultReader>; +// FIXME: Variant<DefaultController, ByteStreamController> +// https://streams.spec.whatwg.org/#typedefdef-readablestreamcontroller +using ReadableStreamController = JS::GCPtr<ReadableStreamDefaultController>; + // https://streams.spec.whatwg.org/#readablestream class ReadableStream final : public Bindings::PlatformObject { WEB_PLATFORM_OBJECT(ReadableStream, Bindings::PlatformObject); @@ -32,19 +36,22 @@ public: virtual ~ReadableStream() override; - JS::GCPtr<JS::Object> controller() const { return m_controller; } + ReadableStreamController controller() { return m_controller; } + void set_controller(ReadableStreamController value) { m_controller = value; } + JS::Value stored_error() const { return m_stored_error; } + void set_stored_error(JS::Value value) { m_stored_error = value; } ReadableStreamReader reader() const { return m_reader; } void set_reader(ReadableStreamReader value) { m_reader = value; } + bool is_disturbed() const; + void set_disturbed(bool value) { m_disturbed = value; } + bool is_readable() const; bool is_closed() const; bool is_errored() const; bool is_locked() const; - bool is_disturbed() const; - - void set_disturbed(bool value) { m_disturbed = value; } void set_stream_state(State value) { m_state = value; } private: @@ -55,7 +62,7 @@ private: // https://streams.spec.whatwg.org/#readablestream-controller // A ReadableStreamDefaultController or ReadableByteStreamController created with the ability to control the state and queue of this stream - JS::GCPtr<JS::Object> m_controller; + ReadableStreamController m_controller; // https://streams.spec.whatwg.org/#readablestream-detached // A boolean flag set to true when the stream is transferred diff --git a/Userland/Libraries/LibWeb/Streams/ReadableStreamDefaultController.cpp b/Userland/Libraries/LibWeb/Streams/ReadableStreamDefaultController.cpp new file mode 100644 index 0000000000..e0c20b6b8e --- /dev/null +++ b/Userland/Libraries/LibWeb/Streams/ReadableStreamDefaultController.cpp @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2023, Matthew Olsson <mattco@serenityos.org> + * + * SPDX-License-Identifier: BSD-2-Clause + */ + +#include <LibJS/SafeFunction.h> +#include <LibWeb/Bindings/Intrinsics.h> +#include <LibWeb/Bindings/ReadableStreamDefaultControllerPrototype.h> +#include <LibWeb/Streams/AbstractOperations.h> +#include <LibWeb/Streams/ReadableStream.h> +#include <LibWeb/Streams/ReadableStreamDefaultController.h> +#include <LibWeb/Streams/ReadableStreamDefaultReader.h> +#include <LibWeb/WebIDL/ExceptionOr.h> +#include <LibWeb/WebIDL/Promise.h> + +namespace Web::Streams { + +ReadableStreamDefaultController::ReadableStreamDefaultController(JS::Realm& realm) + : Bindings::PlatformObject(realm) +{ +} + +// https://streams.spec.whatwg.org/#rs-default-controller-desired-size +Optional<float> ReadableStreamDefaultController::desired_size() +{ + // 1. Return ! ReadableStreamDefaultControllerGetDesiredSize(this). + return readable_stream_default_controller_get_desired_size(*this); +} + +// https://streams.spec.whatwg.org/#rs-default-controller-close +WebIDL::ExceptionOr<void> ReadableStreamDefaultController::close() +{ + // 1. If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false, throw a TypeError exception. + if (!readable_stream_default_controller_can_close_or_enqueue(*this)) { + return WebIDL::SimpleException { WebIDL::SimpleExceptionType::TypeError, "Stream is not closable"sv }; + } + + // 2. Perform ! ReadableStreamDefaultControllerClose(this). + readable_stream_default_controller_close(*this); + + return {}; +} + +// https://streams.spec.whatwg.org/#rs-default-controller-enqueue +WebIDL::ExceptionOr<void> ReadableStreamDefaultController::enqueue(JS::Value chunk) +{ + // 1. If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false, throw a TypeError exception. + if (!readable_stream_default_controller_can_close_or_enqueue(*this)) + return WebIDL::SimpleException { WebIDL::SimpleExceptionType::TypeError, "Cannot enqueue chunk to stream"sv }; + + // 2. Perform ? ReadableStreamDefaultControllerEnqueue(this, chunk). + TRY(readable_stream_default_controller_enqueue(*this, chunk)); + + return {}; +} + +// https://streams.spec.whatwg.org/#rs-default-controller-error +void ReadableStreamDefaultController::error(JS::Value error) +{ + // 1. Perform ! ReadableStreamDefaultControllerError(this, e). + readable_stream_default_controller_error(*this, error); +} + +// https://streams.spec.whatwg.org/#rs-default-controller-private-cancel +WebIDL::ExceptionOr<JS::GCPtr<WebIDL::Promise>> ReadableStreamDefaultController::cancel_steps(JS::Value reason) +{ + // 1. Perform ! ResetQueue(this). + reset_queue(*this); + + // 2. Let result be the result of performing this.[[cancelAlgorithm]], passing reason. + auto result = (*cancel_algorithm())(reason); + + // 3. Perform ! ReadableStreamDefaultControllerClearAlgorithms(this). + readable_stream_default_controller_clear_algorithms(*this); + + // 4. Return result. + return result; +} + +// https://streams.spec.whatwg.org/#rs-default-controller-private-pull +WebIDL::ExceptionOr<void> ReadableStreamDefaultController::pull_steps(Web::Streams::ReadRequest& read_request) +{ + // 1. Let stream be this.[[stream]]. + auto& stream = *m_stream; + + // 2. If this.[[queue]] is not empty, + if (!m_queue.is_empty()) { + // 1. Let chunk be ! DequeueValue(this). + auto chunk = dequeue_value(*this); + + // 2. If this.[[closeRequested]] is true and this.[[queue]] is empty, + if (m_close_requested && m_queue.is_empty()) { + // 1. Perform ! ReadableStreamDefaultControllerClearAlgorithms(this). + readable_stream_default_controller_clear_algorithms(*this); + + // 2. Perform ! ReadableStreamClose(stream). + readable_stream_close(stream); + } + // 3. Otherwise, perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this). + else { + TRY(readable_stream_default_controller_can_pull_if_needed(*this)); + } + + // 4. Perform readRequest’s chunk steps, given chunk. + read_request.on_chunk(chunk); + } + // 3. Otherwise, + else { + // 1. Perform ! ReadableStreamAddReadRequest(stream, readRequest). + readable_stream_add_read_request(stream, read_request); + + // 2. Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this). + TRY(readable_stream_default_controller_can_pull_if_needed(*this)); + } + + return {}; +} + +// https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultcontroller-releasesteps +void ReadableStreamDefaultController::release_steps() +{ + // 1. Return. +} + +JS::ThrowCompletionOr<void> ReadableStreamDefaultController::initialize(JS::Realm& realm) +{ + MUST_OR_THROW_OOM(Base::initialize(realm)); + set_prototype(&Bindings::ensure_web_prototype<Bindings::ReadableStreamDefaultControllerPrototype>(realm, "ReadableStreamDefaultController")); + + return {}; +} + +void ReadableStreamDefaultController::visit_edges(Cell::Visitor& visitor) +{ + Base::visit_edges(visitor); + for (auto const& item : m_queue) + visitor.visit(item.value); + visitor.visit(m_stream); +} + +} diff --git a/Userland/Libraries/LibWeb/Streams/ReadableStreamDefaultController.h b/Userland/Libraries/LibWeb/Streams/ReadableStreamDefaultController.h new file mode 100644 index 0000000000..d731d19f14 --- /dev/null +++ b/Userland/Libraries/LibWeb/Streams/ReadableStreamDefaultController.h @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2023, Matthew Olsson <mattco@serenityos.org> + * + * SPDX-License-Identifier: BSD-2-Clause + */ + +#pragma once + +#include <AK/Forward.h> +#include <AK/Optional.h> +#include <AK/SinglyLinkedList.h> +#include <LibJS/Forward.h> +#include <LibWeb/Bindings/PlatformObject.h> +#include <LibWeb/Forward.h> +#include <LibWeb/Streams/AbstractOperations.h> +#include <LibWeb/WebIDL/Promise.h> + +namespace Web::Streams { + +// https://streams.spec.whatwg.org/#readablestreamdefaultcontroller +class ReadableStreamDefaultController : public Bindings::PlatformObject { + WEB_PLATFORM_OBJECT(ReadableStreamDefaultController, Bindings::PlatformObject); + +public: + explicit ReadableStreamDefaultController(JS::Realm&); + virtual ~ReadableStreamDefaultController() override = default; + + Optional<float> desired_size(); + + WebIDL::ExceptionOr<void> close(); + WebIDL::ExceptionOr<void> enqueue(JS::Value chunk); + void error(JS::Value error); + + auto& cancel_algorithm() { return m_cancel_algorithm; } + void set_cancel_algorithm(Optional<CancelAlgorithm> value) { m_cancel_algorithm = move(value); } + + bool close_requested() const { return m_close_requested; } + void set_close_requested(bool value) { m_close_requested = value; } + + bool pull_again() const { return m_pull_again; } + void set_pull_again(bool value) { m_pull_again = value; } + + auto& pull_algorithm() { return m_pull_algorithm; } + void set_pull_algorithm(Optional<PullAlgorithm> value) { m_pull_algorithm = move(value); } + + bool pulling() const { return m_pulling; } + void set_pulling(bool value) { m_pulling = value; } + + SinglyLinkedList<ValueWithSize>& queue() { return m_queue; } + + double queue_total_size() const { return m_queue_total_size; } + void set_queue_total_size(double value) { m_queue_total_size = value; } + + bool started() const { return m_started; } + void set_started(bool value) { m_started = value; } + + size_t strategy_hwm() const { return m_strategy_hwm; } + void set_strategy_hwm(size_t value) { m_strategy_hwm = value; } + + auto& strategy_size_algorithm() { return m_strategy_size_algorithm; } + void set_strategy_size_algorithm(Optional<SizeAlgorithm> value) { m_strategy_size_algorithm = move(value); } + + JS::GCPtr<ReadableStream> stream() { return m_stream; } + void set_stream(JS::GCPtr<ReadableStream> value) { m_stream = value; } + + WebIDL::ExceptionOr<JS::GCPtr<WebIDL::Promise>> cancel_steps(JS::Value reason); + WebIDL::ExceptionOr<void> pull_steps(ReadRequest&); + void release_steps(); + +private: + virtual JS::ThrowCompletionOr<void> initialize(JS::Realm&) override; + + virtual void visit_edges(Cell::Visitor&) override; + + // https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-cancelalgorithm + // A promise-returning algorithm, taking one argument (the cancel reason), which communicates a requested cancelation to the underlying source + Optional<CancelAlgorithm> m_cancel_algorithm; + + // https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-closerequested + // A boolean flag indicating whether the stream has been closed by its underlying source, but still has chunks in its internal queue that have not yet been read + bool m_close_requested { false }; + + // https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullagain + // A boolean flag set to true if the stream’s mechanisms requested a call to the underlying source's pull algorithm to pull more data, but the pull could not yet be done since a previous call is still executing + bool m_pull_again { false }; + + // https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullalgorithm + // A promise-returning algorithm that pulls data from the underlying source + Optional<PullAlgorithm> m_pull_algorithm; + + // https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pulling + // A boolean flag set to true while the underlying source's pull algorithm is executing and the returned promise has not yet fulfilled, used to prevent reentrant calls + bool m_pulling { false }; + + // https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queue + // A list representing the stream’s internal queue of chunks + SinglyLinkedList<ValueWithSize> m_queue; + + // https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queuetotalsize + // The total size of all the chunks stored in [[queue]] + double m_queue_total_size { 0 }; + + // https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-started + // A boolean flag indicating whether the underlying source has finished starting + bool m_started { false }; + + // https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategyhwm + // A number supplied to the constructor as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying source + size_t m_strategy_hwm { 0 }; + + // https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategysizealgorithm + // An algorithm to calculate the size of enqueued chunks, as part of the stream’s queuing strategy + Optional<SizeAlgorithm> m_strategy_size_algorithm; + + // https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-stream + // The ReadableStream instance controlled + JS::GCPtr<ReadableStream> m_stream; +}; + +} diff --git a/Userland/Libraries/LibWeb/Streams/ReadableStreamDefaultController.idl b/Userland/Libraries/LibWeb/Streams/ReadableStreamDefaultController.idl new file mode 100644 index 0000000000..ec695a80ab --- /dev/null +++ b/Userland/Libraries/LibWeb/Streams/ReadableStreamDefaultController.idl @@ -0,0 +1,9 @@ +// https://streams.spec.whatwg.org/#readablestreamdefaultcontroller +[Exposed=*] +interface ReadableStreamDefaultController { + readonly attribute unrestricted double? desiredSize; + + undefined close(); + undefined enqueue(optional any chunk); + undefined error(optional any e); +}; |