From 41bece0682c2211eddbf9d7153e72e39ab1979d0 Mon Sep 17 00:00:00 2001 From: Robin Burchell Date: Wed, 17 Jul 2019 14:52:12 +0200 Subject: Make AClientConnection generic --- Libraries/LibAudio/AClientConnection.cpp | 141 ++---------------------- Libraries/LibAudio/AClientConnection.h | 22 +--- Libraries/LibCore/CIPCClientSideConnection.h | 159 +++++++++++++++++++++++++++ 3 files changed, 174 insertions(+), 148 deletions(-) create mode 100644 Libraries/LibCore/CIPCClientSideConnection.h diff --git a/Libraries/LibAudio/AClientConnection.cpp b/Libraries/LibAudio/AClientConnection.cpp index 6c122edcf9..75e3af1126 100644 --- a/Libraries/LibAudio/AClientConnection.cpp +++ b/Libraries/LibAudio/AClientConnection.cpp @@ -1,144 +1,25 @@ -#include "AClientConnection.h" -#include "ABuffer.h" #include -#include -#include -#include -#include -#include -#include -#include +#include +#include "AClientConnection.h" AClientConnection::AClientConnection() - : m_notifier(CNotifier(m_connection.fd(), CNotifier::Read)) -{ - // We want to rate-limit our clients - m_connection.set_blocking(true); - m_notifier.on_ready_to_read = [this] { - drain_messages_from_server(); - }; - m_connection.on_connected = [this] { - ASAPI_ClientMessage request; - request.type = ASAPI_ClientMessage::Type::Greeting; - request.greeting.client_pid = getpid(); - auto response = sync_request(request, ASAPI_ServerMessage::Type::Greeting); - m_server_pid = response.greeting.server_pid; - m_my_client_id = response.greeting.your_client_id; - dbg() << "**** C: Got greeting from AudioServer: client ID " << m_my_client_id << " PID " << m_server_pid; - }; - - int retries = 1000; - while (retries) { - if (m_connection.connect(CSocketAddress::local("/tmp/asportal"))) { - break; - } - -#ifdef ACLIENT_DEBUG - dbgprintf("**** C: AClientConnection: connect failed: %d, %s\n", errno, strerror(errno)); -#endif - sleep(1); - --retries; - } -} - -bool AClientConnection::drain_messages_from_server() -{ - for (;;) { - ASAPI_ServerMessage message; - ssize_t nread = recv(m_connection.fd(), &message, sizeof(ASAPI_ServerMessage), MSG_DONTWAIT); - if (nread < 0) { - if (errno == EAGAIN) { - return true; - } - perror("read"); - exit(1); - return false; - } - if (nread == 0) { - dbgprintf("EOF on IPC fd\n"); - exit(1); - return false; - } - ASSERT(nread == sizeof(message)); - ByteBuffer extra_data; - if (message.extra_size) { - extra_data = ByteBuffer::create_uninitialized(message.extra_size); - int extra_nread = read(m_connection.fd(), extra_data.data(), extra_data.size()); - if (extra_nread < 0) { - perror("read"); - ASSERT_NOT_REACHED(); - } - ASSERT((size_t)extra_nread == message.extra_size); - } - m_unprocessed_bundles.append({ move(message), move(extra_data) }); - } -} - -bool AClientConnection::wait_for_specific_event(ASAPI_ServerMessage::Type type, ASAPI_ServerMessage& event) + : CIPCClientSideConnection() { - for (;;) { - fd_set rfds; - FD_ZERO(&rfds); - FD_SET(m_connection.fd(), &rfds); - int rc = select(m_connection.fd() + 1, &rfds, nullptr, nullptr, nullptr); - if (rc < 0) { - perror("select"); - } - ASSERT(rc > 0); - ASSERT(FD_ISSET(m_connection.fd(), &rfds)); - bool success = drain_messages_from_server(); - if (!success) - return false; - for (ssize_t i = 0; i < m_unprocessed_bundles.size(); ++i) { - if (m_unprocessed_bundles[i].message.type == type) { - event = move(m_unprocessed_bundles[i].message); - m_unprocessed_bundles.remove(i); - return true; - } - } - } -} - -bool AClientConnection::post_message_to_server(const ASAPI_ClientMessage& message, const ByteBuffer& extra_data) -{ - if (!extra_data.is_empty()) - const_cast(message).extra_size = extra_data.size(); - - struct iovec iov[2]; - int iov_count = 1; - iov[0].iov_base = const_cast(&message); - iov[0].iov_len = sizeof(message); - - if (!extra_data.is_empty()) { - iov[1].iov_base = const_cast(extra_data.data()); - iov[1].iov_len = extra_data.size(); - ++iov_count; - } - - int nwritten = writev(m_connection.fd(), iov, iov_count); - if (nwritten < 0) { - perror("writev"); - ASSERT_NOT_REACHED(); - } - ASSERT((size_t)nwritten == sizeof(message) + extra_data.size()); - - return true; } -ASAPI_ServerMessage AClientConnection::sync_request(const ASAPI_ClientMessage& request, ASAPI_ServerMessage::Type response_type) +void AClientConnection::send_greeting() { - bool success = post_message_to_server(request); - ASSERT(success); - - ASAPI_ServerMessage response; - success = wait_for_specific_event(response_type, response); - ASSERT(success); - return response; + ASAPI_ClientMessage request; + request.type = ASAPI_ClientMessage::Type::Greeting; + request.greeting.client_pid = getpid(); + auto response = sync_request(request, ASAPI_ServerMessage::Type::Greeting); + set_server_pid(response.greeting.server_pid); + set_my_client_id(response.greeting.your_client_id); } void AClientConnection::play(const ABuffer& buffer) { - auto shared_buf = SharedBuffer::create(m_server_pid, buffer.size_in_bytes()); + auto shared_buf = SharedBuffer::create(server_pid(), buffer.size_in_bytes()); if (!shared_buf) { dbg() << "Failed to create a shared buffer!"; return; diff --git a/Libraries/LibAudio/AClientConnection.h b/Libraries/LibAudio/AClientConnection.h index 5ef369d942..d98f9c5789 100644 --- a/Libraries/LibAudio/AClientConnection.h +++ b/Libraries/LibAudio/AClientConnection.h @@ -2,29 +2,15 @@ #include #include +#include #include + class ABuffer; -class AClientConnection { +class AClientConnection : public CIPCClientSideConnection { public: AClientConnection(); + void send_greeting() override; void play(const ABuffer& buffer); - -private: - bool drain_messages_from_server(); - bool wait_for_specific_event(ASAPI_ServerMessage::Type type, ASAPI_ServerMessage& event); - bool post_message_to_server(const ASAPI_ClientMessage& message, const ByteBuffer& extra_data = {}); - ASAPI_ServerMessage sync_request(const ASAPI_ClientMessage& request, ASAPI_ServerMessage::Type response_type); - - CLocalSocket m_connection; - CNotifier m_notifier; - - struct IncomingASMessageBundle { - ASAPI_ServerMessage message; - ByteBuffer extra_data; - }; - Vector m_unprocessed_bundles; - int m_server_pid; - int m_my_client_id; }; diff --git a/Libraries/LibCore/CIPCClientSideConnection.h b/Libraries/LibCore/CIPCClientSideConnection.h new file mode 100644 index 0000000000..2255754d0a --- /dev/null +++ b/Libraries/LibCore/CIPCClientSideConnection.h @@ -0,0 +1,159 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +template +class CIPCClientSideConnection { +public: + CIPCClientSideConnection() + : m_notifier(CNotifier(m_connection.fd(), CNotifier::Read)) + { + // We want to rate-limit our clients + m_connection.set_blocking(true); + m_notifier.on_ready_to_read = [this] { + drain_messages_from_server(); + }; + m_connection.on_connected = [this] { + dbg() << "IPC: Connected, sending greeting"; + send_greeting(); + dbg() << "IPC: Greeting sent!"; + }; + + int retries = 1000; + while (retries) { + if (m_connection.connect(CSocketAddress::local("/tmp/asportal"))) { + break; + } + + dbgprintf("CIPCClientSideConnection: connect failed: %d, %s\n", errno, strerror(errno)); + sleep(1); + --retries; + } + } + + virtual void send_greeting() = 0; + void set_server_pid(pid_t pid) { m_server_pid = pid; } + pid_t server_pid() const { return m_server_pid; } + void set_my_client_id(int id) { m_my_client_id = id; } + int my_client_id() const { return m_my_client_id; } + +protected: + template + bool wait_for_specific_event(MessageType type, ServerMessage& event) + { + for (;;) { + fd_set rfds; + FD_ZERO(&rfds); + FD_SET(m_connection.fd(), &rfds); + int rc = select(m_connection.fd() + 1, &rfds, nullptr, nullptr, nullptr); + if (rc < 0) { + perror("select"); + } + ASSERT(rc > 0); + ASSERT(FD_ISSET(m_connection.fd(), &rfds)); + bool success = drain_messages_from_server(); + if (!success) + return false; + for (ssize_t i = 0; i < m_unprocessed_bundles.size(); ++i) { + if (m_unprocessed_bundles[i].message.type == type) { + event = move(m_unprocessed_bundles[i].message); + m_unprocessed_bundles.remove(i); + return true; + } + } + } + } + + bool post_message_to_server(const ClientMessage& message, const ByteBuffer&& extra_data = {}) + { + if (!extra_data.is_empty()) + const_cast(message).extra_size = extra_data.size(); + + struct iovec iov[2]; + int iov_count = 1; + iov[0].iov_base = const_cast(&message); + iov[0].iov_len = sizeof(message); + + if (!extra_data.is_empty()) { + iov[1].iov_base = const_cast(extra_data.data()); + iov[1].iov_len = extra_data.size(); + ++iov_count; + } + + int nwritten = writev(m_connection.fd(), iov, iov_count); + if (nwritten < 0) { + perror("writev"); + ASSERT_NOT_REACHED(); + } + ASSERT((size_t)nwritten == sizeof(message) + extra_data.size()); + + return true; + } + + template + ServerMessage sync_request(const ClientMessage& request, MessageType response_type) + { + bool success = post_message_to_server(request); + ASSERT(success); + + ServerMessage response; + success = wait_for_specific_event(response_type, response); + ASSERT(success); + return response; + } + +private: + bool drain_messages_from_server() + { + for (;;) { + ServerMessage message; + ssize_t nread = recv(m_connection.fd(), &message, sizeof(ServerMessage), MSG_DONTWAIT); + if (nread < 0) { + if (errno == EAGAIN) { + return true; + } + perror("read"); + exit(1); + return false; + } + if (nread == 0) { + dbgprintf("EOF on IPC fd\n"); + exit(1); + return false; + } + ASSERT(nread == sizeof(message)); + ByteBuffer extra_data; + if (message.extra_size) { + extra_data = ByteBuffer::create_uninitialized(message.extra_size); + int extra_nread = read(m_connection.fd(), extra_data.data(), extra_data.size()); + if (extra_nread < 0) { + perror("read"); + ASSERT_NOT_REACHED(); + } + ASSERT((size_t)extra_nread == message.extra_size); + } + m_unprocessed_bundles.append({ move(message), move(extra_data) }); + } + } + + CLocalSocket m_connection; + CNotifier m_notifier; + + struct IncomingASMessageBundle { + ServerMessage message; + ByteBuffer extra_data; + }; + Vector m_unprocessed_bundles; + int m_server_pid; + int m_my_client_id; +}; -- cgit v1.2.3