diff options
author | Andreas Kling <awesomekling@gmail.com> | 2019-08-03 19:41:02 +0200 |
---|---|---|
committer | Andreas Kling <awesomekling@gmail.com> | 2019-08-03 19:49:19 +0200 |
commit | 8e684f095982597d548514ccd7f723f6f285b1f5 (patch) | |
tree | fb8d83acebd948580467af03bbb0f6b9d51c3462 /Libraries | |
parent | 3519b6c201e53b703ced179b18d8570e93845d2d (diff) | |
download | serenity-8e684f095982597d548514ccd7f723f6f285b1f5.zip |
AudioServer: Port to the new generated IPC mechanism
Fork the IPC Connection classes into Server:: and Client::ConnectionNG.
The new IPC messages are serialized very snugly instead of using the
same generic data structure for all messages.
Remove ASAPI.h since we now generate all of it from AudioServer.ipc :^)
Diffstat (limited to 'Libraries')
-rw-r--r-- | Libraries/LibAudio/AClientConnection.cpp | 16 | ||||
-rw-r--r-- | Libraries/LibAudio/AClientConnection.h | 6 | ||||
-rw-r--r-- | Libraries/LibAudio/ASAPI.h | 250 | ||||
-rw-r--r-- | Libraries/LibCore/CoreIPCClient.h | 146 | ||||
-rw-r--r-- | Libraries/LibCore/CoreIPCServer.h | 114 |
5 files changed, 267 insertions, 265 deletions
diff --git a/Libraries/LibAudio/AClientConnection.cpp b/Libraries/LibAudio/AClientConnection.cpp index 00f36d6387..3cd78c0aa8 100644 --- a/Libraries/LibAudio/AClientConnection.cpp +++ b/Libraries/LibAudio/AClientConnection.cpp @@ -3,23 +3,23 @@ #include <SharedBuffer.h> AClientConnection::AClientConnection() - : Connection("/tmp/asportal") + : ConnectionNG("/tmp/asportal") { } void AClientConnection::handshake() { - auto response = send_sync<ASAPI_Client::Greeting>(getpid()); - set_server_pid(response.server_pid()); - set_my_client_id(response.your_client_id()); + auto response = send_sync<AudioServer::Greet>(getpid()); + set_server_pid(response->server_pid()); + set_my_client_id(response->client_id()); } void AClientConnection::enqueue(const ABuffer& buffer) { for (;;) { const_cast<ABuffer&>(buffer).shared_buffer().share_with(server_pid()); - auto response = send_sync<ASAPI_Client::EnqueueBuffer>(buffer.shared_buffer_id()); - if (response.success()) + auto response = send_sync<AudioServer::EnqueueBuffer>(buffer.shared_buffer_id()); + if (response->success()) break; sleep(1); } @@ -27,10 +27,10 @@ void AClientConnection::enqueue(const ABuffer& buffer) int AClientConnection::get_main_mix_volume() { - return send_sync<ASAPI_Client::GetMainMixVolume>().volume(); + return send_sync<AudioServer::GetMainMixVolume>()->volume(); } void AClientConnection::set_main_mix_volume(int volume) { - send_sync<ASAPI_Client::SetMainMixVolume>(volume); + send_sync<AudioServer::SetMainMixVolume>(volume); } diff --git a/Libraries/LibAudio/AClientConnection.h b/Libraries/LibAudio/AClientConnection.h index 3a40698dad..1978a7f00d 100644 --- a/Libraries/LibAudio/AClientConnection.h +++ b/Libraries/LibAudio/AClientConnection.h @@ -1,11 +1,11 @@ #pragma once -#include <LibAudio/ASAPI.h> +#include <AudioServer/AudioServerEndpoint.h> #include <LibCore/CoreIPCClient.h> class ABuffer; -class AClientConnection : public IPC::Client::Connection<ASAPI_ServerMessage, ASAPI_ClientMessage> { +class AClientConnection : public IPC::Client::ConnectionNG<AudioServerEndpoint> { C_OBJECT(AClientConnection) public: AClientConnection(); @@ -13,8 +13,6 @@ public: virtual void handshake() override; void enqueue(const ABuffer&); - virtual void postprocess_bundles(Vector<IncomingMessageBundle>&) override {} - int get_main_mix_volume(); void set_main_mix_volume(int); }; diff --git a/Libraries/LibAudio/ASAPI.h b/Libraries/LibAudio/ASAPI.h deleted file mode 100644 index 6d91f6bcef..0000000000 --- a/Libraries/LibAudio/ASAPI.h +++ /dev/null @@ -1,250 +0,0 @@ -#pragma once - -#include <AK/Assertions.h> - -struct ASAPI_ServerMessage { - enum class Type { - Invalid, - Greeting, - FinishedPlayingBuffer, - EnqueueBufferResponse, - DidGetMainMixVolume, - DidSetMainMixVolume, - }; - - Type type { Type::Invalid }; - unsigned extra_size { 0 }; - bool success { true }; - int value { 0 }; - - union { - struct { - int server_pid; - int your_client_id; - } greeting; - struct { - int buffer_id; - } playing_buffer; - }; -}; - -struct ASAPI_ClientMessage { - enum class Type { - Invalid, - Greeting, - EnqueueBuffer, - GetMainMixVolume, - SetMainMixVolume, - }; - - Type type { Type::Invalid }; - unsigned extra_size { 0 }; - int value { 0 }; - - union { - struct { - int client_pid; - } greeting; - struct { - int buffer_id; - } play_buffer; - }; -}; - -// FIXME: Everything below this line should be generated from some kind of IPC protocol description. - -namespace ASAPI_Server { -class Greeting; -class FinishedPlayingBuffer; -class EnqueueBufferResponse; -class DidGetMainMixVolume; -class DidSetMainMixVolume; -} - -namespace ASAPI_Client { - -template<ASAPI_ClientMessage::Type type> -class Message { -public: - static ASAPI_ClientMessage::Type message_type() { return type; } - operator const ASAPI_ClientMessage&() const { return m_message; } - -protected: - Message() - { - m_message.type = type; - } - - Message(const ASAPI_ClientMessage& message) - : m_message(message) - { - ASSERT(message.type == type); - } - - ASAPI_ClientMessage m_message; -}; - -class Greeting : public Message<ASAPI_ClientMessage::Type::Greeting> { -public: - typedef ASAPI_Server::Greeting ResponseType; - Greeting(const ASAPI_ClientMessage& message) - : Message(message) - { - } - - Greeting(int client_pid) - { - m_message.greeting.client_pid = client_pid; - } - - int client_pid() const { return m_message.greeting.client_pid; } -}; - -class EnqueueBuffer : public Message<ASAPI_ClientMessage::Type::EnqueueBuffer> { -public: - typedef ASAPI_Server::EnqueueBufferResponse ResponseType; - - EnqueueBuffer(const ASAPI_ClientMessage& message) - : Message(message) - { - } - - EnqueueBuffer(int buffer_id) - { - m_message.play_buffer.buffer_id = buffer_id; - } - - int buffer_id() const { return m_message.play_buffer.buffer_id; } -}; - -class GetMainMixVolume : public Message<ASAPI_ClientMessage::Type::GetMainMixVolume> { -public: - typedef ASAPI_Server::DidGetMainMixVolume ResponseType; - - GetMainMixVolume(const ASAPI_ClientMessage& message) - : Message(message) - { - } - - GetMainMixVolume() - { - } -}; - -class SetMainMixVolume : public Message<ASAPI_ClientMessage::Type::SetMainMixVolume> { -public: - typedef ASAPI_Server::DidSetMainMixVolume ResponseType; - - SetMainMixVolume(const ASAPI_ClientMessage& message) - : Message(message) - { - } - - SetMainMixVolume(int volume) - { - m_message.value = volume; - } -}; - -} - -namespace ASAPI_Server { - -template<ASAPI_ServerMessage::Type type> -class Message { -public: - static ASAPI_ServerMessage::Type message_type() { return type; } - operator const ASAPI_ServerMessage&() const { return m_message; } - -protected: - Message() - { - m_message.type = type; - } - - Message(const ASAPI_ServerMessage& message) - : m_message(message) - { - ASSERT(message.type == type); - } - - ASAPI_ServerMessage m_message; -}; - -class Greeting : public Message<ASAPI_ServerMessage::Type::Greeting> { -public: - Greeting(const ASAPI_ServerMessage& message) - : Message(message) - { - } - - Greeting(int server_pid, int your_client_id) - { - m_message.greeting.server_pid = server_pid; - m_message.greeting.your_client_id = your_client_id; - } - - int server_pid() const { return m_message.greeting.server_pid; } - int your_client_id() const { return m_message.greeting.your_client_id; } -}; - -class FinishedPlayingBuffer : public Message<ASAPI_ServerMessage::Type::FinishedPlayingBuffer> { -public: - FinishedPlayingBuffer(const ASAPI_ServerMessage& message) - : Message(message) - { - } - - FinishedPlayingBuffer(int buffer_id) - { - m_message.playing_buffer.buffer_id = buffer_id; - } - - int buffer_id() const { return m_message.playing_buffer.buffer_id; } -}; - -class EnqueueBufferResponse : public Message<ASAPI_ServerMessage::Type::EnqueueBufferResponse> { -public: - EnqueueBufferResponse(const ASAPI_ServerMessage& message) - : Message(message) - { - } - - EnqueueBufferResponse(bool success, int buffer_id) - { - m_message.success = success; - m_message.playing_buffer.buffer_id = buffer_id; - } - - bool success() const { return m_message.success; } - int buffer_id() const { return m_message.playing_buffer.buffer_id; } -}; - -class DidGetMainMixVolume : public Message<ASAPI_ServerMessage::Type::DidGetMainMixVolume> { -public: - DidGetMainMixVolume(const ASAPI_ServerMessage& message) - : Message(message) - { - } - - DidGetMainMixVolume(int volume) - { - m_message.value = volume; - } - - int volume() const { return m_message.value; } -}; - -class DidSetMainMixVolume : public Message<ASAPI_ServerMessage::Type::DidSetMainMixVolume> { -public: - DidSetMainMixVolume(const ASAPI_ServerMessage& message) - : Message(message) - { - } - - DidSetMainMixVolume() - { - } -}; - -} diff --git a/Libraries/LibCore/CoreIPCClient.h b/Libraries/LibCore/CoreIPCClient.h index c6169bf11c..f932014aca 100644 --- a/Libraries/LibCore/CoreIPCClient.h +++ b/Libraries/LibCore/CoreIPCClient.h @@ -1,12 +1,11 @@ #pragma once -#include <LibAudio/ASAPI.h> #include <LibCore/CEvent.h> #include <LibCore/CEventLoop.h> #include <LibCore/CLocalSocket.h> #include <LibCore/CNotifier.h> #include <LibCore/CSyscallUtils.h> - +#include <LibIPC/IMessage.h> #include <stdio.h> #include <sys/select.h> #include <sys/socket.h> @@ -236,5 +235,148 @@ namespace Client { int m_my_client_id { -1 }; }; + template<typename Endpoint> + class ConnectionNG : public CObject { + C_OBJECT(Connection) + public: + ConnectionNG(const StringView& address) + : m_notifier(CNotifier(m_connection.fd(), CNotifier::Read)) + { + // We want to rate-limit our clients + m_connection.set_blocking(true); + m_notifier.on_ready_to_read = [this] { + drain_messages_from_server(); + CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection.fd())); + }; + + int retries = 1000; + while (retries) { + if (m_connection.connect(CSocketAddress::local(address))) { + break; + } + + dbgprintf("Client::Connection: connect failed: %d, %s\n", errno, strerror(errno)); + sleep(1); + --retries; + } + ASSERT(m_connection.is_connected()); + } + + virtual void handshake() = 0; + + virtual void event(CEvent& event) override + { + if (event.type() == Event::PostProcess) { + postprocess_messages(m_unprocessed_messages); + } else { + CObject::event(event); + } + } + + void set_server_pid(pid_t pid) { m_server_pid = pid; } + pid_t server_pid() const { return m_server_pid; } + void set_my_client_id(int id) { m_my_client_id = id; } + int my_client_id() const { return m_my_client_id; } + + template<typename MessageType> + OwnPtr<MessageType> wait_for_specific_message() + { + // Double check we don't already have the event waiting for us. + // Otherwise we might end up blocked for a while for no reason. + for (ssize_t i = 0; i < m_unprocessed_messages.size(); ++i) { + if (m_unprocessed_messages[i]->id() == MessageType::static_message_id()) { + auto message = move(m_unprocessed_messages[i]); + m_unprocessed_messages.remove(i); + CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection.fd())); + return message; + } + } + for (;;) { + fd_set rfds; + FD_ZERO(&rfds); + FD_SET(m_connection.fd(), &rfds); + int rc = CSyscallUtils::safe_syscall(select, m_connection.fd() + 1, &rfds, nullptr, nullptr, nullptr); + if (rc < 0) { + perror("select"); + } + ASSERT(rc > 0); + ASSERT(FD_ISSET(m_connection.fd(), &rfds)); + bool success = drain_messages_from_server(); + if (!success) + return nullptr; + for (ssize_t i = 0; i < m_unprocessed_messages.size(); ++i) { + if (m_unprocessed_messages[i]->id() == MessageType::static_message_id()) { + auto message = move(m_unprocessed_messages[i]); + m_unprocessed_messages.remove(i); + CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection.fd())); + return message; + } + } + } + } + + bool post_message_to_server(const IMessage& message) + { + auto buffer = message.encode(); + int nwritten = write(m_connection.fd(), buffer.data(), (size_t)buffer.size()); + if (nwritten < 0) { + perror("write"); + ASSERT_NOT_REACHED(); + return false; + } + ASSERT(nwritten == buffer.size()); + return true; + } + + template<typename RequestType, typename... Args> + OwnPtr<typename RequestType::ResponseType> send_sync(Args&&... args) + { + bool success = post_message_to_server(RequestType(forward<Args>(args)...)); + ASSERT(success); + auto response = wait_for_specific_message<typename RequestType::ResponseType>(); + ASSERT(response); + return response; + } + + protected: + virtual void postprocess_messages(Vector<OwnPtr<IMessage>>& new_bundles) + { + new_bundles.clear(); + } + + private: + bool drain_messages_from_server() + { + for (;;) { + u8 buffer[4096]; + ssize_t nread = recv(m_connection.fd(), buffer, sizeof(buffer), MSG_DONTWAIT); + if (nread < 0) { + if (errno == EAGAIN) { + return true; + } + perror("read"); + exit(1); + return false; + } + if (nread == 0) { + dbg() << "EOF on IPC fd"; + exit(1); + return false; + } + + auto message = Endpoint::decode_message(ByteBuffer::wrap(buffer, sizeof(buffer))); + ASSERT(message); + + m_unprocessed_messages.append(move(message)); + } + } + + CLocalSocket m_connection; + CNotifier m_notifier; + Vector<OwnPtr<IMessage>> m_unprocessed_messages; + int m_server_pid { -1 }; + int m_my_client_id { -1 }; + }; + } // Client } // IPC diff --git a/Libraries/LibCore/CoreIPCServer.h b/Libraries/LibCore/CoreIPCServer.h index 54054826c3..69a1681eb1 100644 --- a/Libraries/LibCore/CoreIPCServer.h +++ b/Libraries/LibCore/CoreIPCServer.h @@ -6,6 +6,8 @@ #include <LibCore/CLocalSocket.h> #include <LibCore/CNotifier.h> #include <LibCore/CObject.h> +#include <LibIPC/IEndpoint.h> +#include <LibIPC/IMessage.h> #include <errno.h> #include <stdio.h> @@ -52,7 +54,13 @@ namespace Server { auto conn = new T(AK::forward<Args>(args)...) /* arghs */; conn->send_greeting(); return conn; - }; + } + + template<typename T, class... Args> + T* new_connection_ng_for_client(Args&&... args) + { + return new T(AK::forward<Args>(args)...) /* arghs */; + } template<typename ServerMessage, typename ClientMessage> class Connection : public CObject { @@ -196,5 +204,109 @@ namespace Server { int m_client_pid { -1 }; }; + template<typename Endpoint> + class ConnectionNG : public CObject { + C_OBJECT(Connection) + public: + ConnectionNG(Endpoint& endpoint, CLocalSocket& socket, int client_id) + : m_endpoint(endpoint) + , m_socket(socket) + , m_client_id(client_id) + { + add_child(socket); + m_socket.on_ready_to_read = [this] { drain_client(); }; + } + + virtual ~ConnectionNG() override + { + } + + void post_message(const IMessage& message) + { + auto buffer = message.encode(); + + int nwritten = write(m_socket.fd(), buffer.data(), (size_t)buffer.size()); + if (nwritten < 0) { + switch (errno) { + case EPIPE: + dbg() << "Connection::post_message: Disconnected from peer"; + delete_later(); + return; + case EAGAIN: + dbg() << "Connection::post_message: Client buffer overflowed."; + did_misbehave(); + return; + break; + default: + perror("Connection::post_message write"); + ASSERT_NOT_REACHED(); + } + } + + ASSERT(nwritten == buffer.size()); + } + + void drain_client() + { + unsigned messages_received = 0; + for (;;) { + u8 buffer[4096]; + ssize_t nread = recv(m_socket.fd(), buffer, sizeof(buffer), MSG_DONTWAIT); + if (nread == 0 || (nread == -1 && errno == EAGAIN)) { + if (!messages_received) { + // TODO: is delete_later() sufficient? + CEventLoop::current().post_event(*this, make<DisconnectedEvent>(client_id())); + } + break; + } + if (nread < 0) { + perror("recv"); + ASSERT_NOT_REACHED(); + } + auto message = m_endpoint.decode_message(ByteBuffer::wrap(buffer, nread)); + if (!message) { + dbg() << "drain_client: Endpoint didn't recognize message"; + did_misbehave(); + return; + } + ++messages_received; + + auto response = m_endpoint.handle(*message); + if (response) + post_message(*response); + } + } + + void did_misbehave() + { + dbg() << "Connection{" << this << "} (id=" << m_client_id << ", pid=" << m_client_pid << ") misbehaved, disconnecting."; + m_socket.close(); + delete_later(); + } + + int client_id() const { return m_client_id; } + pid_t client_pid() const { return m_client_pid; } + void set_client_pid(pid_t pid) { m_client_pid = pid; } + + protected: + void event(CEvent& event) override + { + if (event.type() == Event::Disconnected) { + int client_id = static_cast<const DisconnectedEvent&>(event).client_id(); + dbgprintf("Connection: Client disconnected: %d\n", client_id); + delete this; + return; + } + + CObject::event(event); + } + + private: + Endpoint& m_endpoint; + CLocalSocket& m_socket; + int m_client_id { -1 }; + int m_client_pid { -1 }; + }; + } // Server } // IPC |