summaryrefslogtreecommitdiff
path: root/Libraries
diff options
context:
space:
mode:
authorAndreas Kling <awesomekling@gmail.com>2019-08-03 19:41:02 +0200
committerAndreas Kling <awesomekling@gmail.com>2019-08-03 19:49:19 +0200
commit8e684f095982597d548514ccd7f723f6f285b1f5 (patch)
treefb8d83acebd948580467af03bbb0f6b9d51c3462 /Libraries
parent3519b6c201e53b703ced179b18d8570e93845d2d (diff)
downloadserenity-8e684f095982597d548514ccd7f723f6f285b1f5.zip
AudioServer: Port to the new generated IPC mechanism
Fork the IPC Connection classes into Server:: and Client::ConnectionNG. The new IPC messages are serialized very snugly instead of using the same generic data structure for all messages. Remove ASAPI.h since we now generate all of it from AudioServer.ipc :^)
Diffstat (limited to 'Libraries')
-rw-r--r--Libraries/LibAudio/AClientConnection.cpp16
-rw-r--r--Libraries/LibAudio/AClientConnection.h6
-rw-r--r--Libraries/LibAudio/ASAPI.h250
-rw-r--r--Libraries/LibCore/CoreIPCClient.h146
-rw-r--r--Libraries/LibCore/CoreIPCServer.h114
5 files changed, 267 insertions, 265 deletions
diff --git a/Libraries/LibAudio/AClientConnection.cpp b/Libraries/LibAudio/AClientConnection.cpp
index 00f36d6387..3cd78c0aa8 100644
--- a/Libraries/LibAudio/AClientConnection.cpp
+++ b/Libraries/LibAudio/AClientConnection.cpp
@@ -3,23 +3,23 @@
#include <SharedBuffer.h>
AClientConnection::AClientConnection()
- : Connection("/tmp/asportal")
+ : ConnectionNG("/tmp/asportal")
{
}
void AClientConnection::handshake()
{
- auto response = send_sync<ASAPI_Client::Greeting>(getpid());
- set_server_pid(response.server_pid());
- set_my_client_id(response.your_client_id());
+ auto response = send_sync<AudioServer::Greet>(getpid());
+ set_server_pid(response->server_pid());
+ set_my_client_id(response->client_id());
}
void AClientConnection::enqueue(const ABuffer& buffer)
{
for (;;) {
const_cast<ABuffer&>(buffer).shared_buffer().share_with(server_pid());
- auto response = send_sync<ASAPI_Client::EnqueueBuffer>(buffer.shared_buffer_id());
- if (response.success())
+ auto response = send_sync<AudioServer::EnqueueBuffer>(buffer.shared_buffer_id());
+ if (response->success())
break;
sleep(1);
}
@@ -27,10 +27,10 @@ void AClientConnection::enqueue(const ABuffer& buffer)
int AClientConnection::get_main_mix_volume()
{
- return send_sync<ASAPI_Client::GetMainMixVolume>().volume();
+ return send_sync<AudioServer::GetMainMixVolume>()->volume();
}
void AClientConnection::set_main_mix_volume(int volume)
{
- send_sync<ASAPI_Client::SetMainMixVolume>(volume);
+ send_sync<AudioServer::SetMainMixVolume>(volume);
}
diff --git a/Libraries/LibAudio/AClientConnection.h b/Libraries/LibAudio/AClientConnection.h
index 3a40698dad..1978a7f00d 100644
--- a/Libraries/LibAudio/AClientConnection.h
+++ b/Libraries/LibAudio/AClientConnection.h
@@ -1,11 +1,11 @@
#pragma once
-#include <LibAudio/ASAPI.h>
+#include <AudioServer/AudioServerEndpoint.h>
#include <LibCore/CoreIPCClient.h>
class ABuffer;
-class AClientConnection : public IPC::Client::Connection<ASAPI_ServerMessage, ASAPI_ClientMessage> {
+class AClientConnection : public IPC::Client::ConnectionNG<AudioServerEndpoint> {
C_OBJECT(AClientConnection)
public:
AClientConnection();
@@ -13,8 +13,6 @@ public:
virtual void handshake() override;
void enqueue(const ABuffer&);
- virtual void postprocess_bundles(Vector<IncomingMessageBundle>&) override {}
-
int get_main_mix_volume();
void set_main_mix_volume(int);
};
diff --git a/Libraries/LibAudio/ASAPI.h b/Libraries/LibAudio/ASAPI.h
deleted file mode 100644
index 6d91f6bcef..0000000000
--- a/Libraries/LibAudio/ASAPI.h
+++ /dev/null
@@ -1,250 +0,0 @@
-#pragma once
-
-#include <AK/Assertions.h>
-
-struct ASAPI_ServerMessage {
- enum class Type {
- Invalid,
- Greeting,
- FinishedPlayingBuffer,
- EnqueueBufferResponse,
- DidGetMainMixVolume,
- DidSetMainMixVolume,
- };
-
- Type type { Type::Invalid };
- unsigned extra_size { 0 };
- bool success { true };
- int value { 0 };
-
- union {
- struct {
- int server_pid;
- int your_client_id;
- } greeting;
- struct {
- int buffer_id;
- } playing_buffer;
- };
-};
-
-struct ASAPI_ClientMessage {
- enum class Type {
- Invalid,
- Greeting,
- EnqueueBuffer,
- GetMainMixVolume,
- SetMainMixVolume,
- };
-
- Type type { Type::Invalid };
- unsigned extra_size { 0 };
- int value { 0 };
-
- union {
- struct {
- int client_pid;
- } greeting;
- struct {
- int buffer_id;
- } play_buffer;
- };
-};
-
-// FIXME: Everything below this line should be generated from some kind of IPC protocol description.
-
-namespace ASAPI_Server {
-class Greeting;
-class FinishedPlayingBuffer;
-class EnqueueBufferResponse;
-class DidGetMainMixVolume;
-class DidSetMainMixVolume;
-}
-
-namespace ASAPI_Client {
-
-template<ASAPI_ClientMessage::Type type>
-class Message {
-public:
- static ASAPI_ClientMessage::Type message_type() { return type; }
- operator const ASAPI_ClientMessage&() const { return m_message; }
-
-protected:
- Message()
- {
- m_message.type = type;
- }
-
- Message(const ASAPI_ClientMessage& message)
- : m_message(message)
- {
- ASSERT(message.type == type);
- }
-
- ASAPI_ClientMessage m_message;
-};
-
-class Greeting : public Message<ASAPI_ClientMessage::Type::Greeting> {
-public:
- typedef ASAPI_Server::Greeting ResponseType;
- Greeting(const ASAPI_ClientMessage& message)
- : Message(message)
- {
- }
-
- Greeting(int client_pid)
- {
- m_message.greeting.client_pid = client_pid;
- }
-
- int client_pid() const { return m_message.greeting.client_pid; }
-};
-
-class EnqueueBuffer : public Message<ASAPI_ClientMessage::Type::EnqueueBuffer> {
-public:
- typedef ASAPI_Server::EnqueueBufferResponse ResponseType;
-
- EnqueueBuffer(const ASAPI_ClientMessage& message)
- : Message(message)
- {
- }
-
- EnqueueBuffer(int buffer_id)
- {
- m_message.play_buffer.buffer_id = buffer_id;
- }
-
- int buffer_id() const { return m_message.play_buffer.buffer_id; }
-};
-
-class GetMainMixVolume : public Message<ASAPI_ClientMessage::Type::GetMainMixVolume> {
-public:
- typedef ASAPI_Server::DidGetMainMixVolume ResponseType;
-
- GetMainMixVolume(const ASAPI_ClientMessage& message)
- : Message(message)
- {
- }
-
- GetMainMixVolume()
- {
- }
-};
-
-class SetMainMixVolume : public Message<ASAPI_ClientMessage::Type::SetMainMixVolume> {
-public:
- typedef ASAPI_Server::DidSetMainMixVolume ResponseType;
-
- SetMainMixVolume(const ASAPI_ClientMessage& message)
- : Message(message)
- {
- }
-
- SetMainMixVolume(int volume)
- {
- m_message.value = volume;
- }
-};
-
-}
-
-namespace ASAPI_Server {
-
-template<ASAPI_ServerMessage::Type type>
-class Message {
-public:
- static ASAPI_ServerMessage::Type message_type() { return type; }
- operator const ASAPI_ServerMessage&() const { return m_message; }
-
-protected:
- Message()
- {
- m_message.type = type;
- }
-
- Message(const ASAPI_ServerMessage& message)
- : m_message(message)
- {
- ASSERT(message.type == type);
- }
-
- ASAPI_ServerMessage m_message;
-};
-
-class Greeting : public Message<ASAPI_ServerMessage::Type::Greeting> {
-public:
- Greeting(const ASAPI_ServerMessage& message)
- : Message(message)
- {
- }
-
- Greeting(int server_pid, int your_client_id)
- {
- m_message.greeting.server_pid = server_pid;
- m_message.greeting.your_client_id = your_client_id;
- }
-
- int server_pid() const { return m_message.greeting.server_pid; }
- int your_client_id() const { return m_message.greeting.your_client_id; }
-};
-
-class FinishedPlayingBuffer : public Message<ASAPI_ServerMessage::Type::FinishedPlayingBuffer> {
-public:
- FinishedPlayingBuffer(const ASAPI_ServerMessage& message)
- : Message(message)
- {
- }
-
- FinishedPlayingBuffer(int buffer_id)
- {
- m_message.playing_buffer.buffer_id = buffer_id;
- }
-
- int buffer_id() const { return m_message.playing_buffer.buffer_id; }
-};
-
-class EnqueueBufferResponse : public Message<ASAPI_ServerMessage::Type::EnqueueBufferResponse> {
-public:
- EnqueueBufferResponse(const ASAPI_ServerMessage& message)
- : Message(message)
- {
- }
-
- EnqueueBufferResponse(bool success, int buffer_id)
- {
- m_message.success = success;
- m_message.playing_buffer.buffer_id = buffer_id;
- }
-
- bool success() const { return m_message.success; }
- int buffer_id() const { return m_message.playing_buffer.buffer_id; }
-};
-
-class DidGetMainMixVolume : public Message<ASAPI_ServerMessage::Type::DidGetMainMixVolume> {
-public:
- DidGetMainMixVolume(const ASAPI_ServerMessage& message)
- : Message(message)
- {
- }
-
- DidGetMainMixVolume(int volume)
- {
- m_message.value = volume;
- }
-
- int volume() const { return m_message.value; }
-};
-
-class DidSetMainMixVolume : public Message<ASAPI_ServerMessage::Type::DidSetMainMixVolume> {
-public:
- DidSetMainMixVolume(const ASAPI_ServerMessage& message)
- : Message(message)
- {
- }
-
- DidSetMainMixVolume()
- {
- }
-};
-
-}
diff --git a/Libraries/LibCore/CoreIPCClient.h b/Libraries/LibCore/CoreIPCClient.h
index c6169bf11c..f932014aca 100644
--- a/Libraries/LibCore/CoreIPCClient.h
+++ b/Libraries/LibCore/CoreIPCClient.h
@@ -1,12 +1,11 @@
#pragma once
-#include <LibAudio/ASAPI.h>
#include <LibCore/CEvent.h>
#include <LibCore/CEventLoop.h>
#include <LibCore/CLocalSocket.h>
#include <LibCore/CNotifier.h>
#include <LibCore/CSyscallUtils.h>
-
+#include <LibIPC/IMessage.h>
#include <stdio.h>
#include <sys/select.h>
#include <sys/socket.h>
@@ -236,5 +235,148 @@ namespace Client {
int m_my_client_id { -1 };
};
+ template<typename Endpoint>
+ class ConnectionNG : public CObject {
+ C_OBJECT(Connection)
+ public:
+ ConnectionNG(const StringView& address)
+ : m_notifier(CNotifier(m_connection.fd(), CNotifier::Read))
+ {
+ // We want to rate-limit our clients
+ m_connection.set_blocking(true);
+ m_notifier.on_ready_to_read = [this] {
+ drain_messages_from_server();
+ CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection.fd()));
+ };
+
+ int retries = 1000;
+ while (retries) {
+ if (m_connection.connect(CSocketAddress::local(address))) {
+ break;
+ }
+
+ dbgprintf("Client::Connection: connect failed: %d, %s\n", errno, strerror(errno));
+ sleep(1);
+ --retries;
+ }
+ ASSERT(m_connection.is_connected());
+ }
+
+ virtual void handshake() = 0;
+
+ virtual void event(CEvent& event) override
+ {
+ if (event.type() == Event::PostProcess) {
+ postprocess_messages(m_unprocessed_messages);
+ } else {
+ CObject::event(event);
+ }
+ }
+
+ void set_server_pid(pid_t pid) { m_server_pid = pid; }
+ pid_t server_pid() const { return m_server_pid; }
+ void set_my_client_id(int id) { m_my_client_id = id; }
+ int my_client_id() const { return m_my_client_id; }
+
+ template<typename MessageType>
+ OwnPtr<MessageType> wait_for_specific_message()
+ {
+ // Double check we don't already have the event waiting for us.
+ // Otherwise we might end up blocked for a while for no reason.
+ for (ssize_t i = 0; i < m_unprocessed_messages.size(); ++i) {
+ if (m_unprocessed_messages[i]->id() == MessageType::static_message_id()) {
+ auto message = move(m_unprocessed_messages[i]);
+ m_unprocessed_messages.remove(i);
+ CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection.fd()));
+ return message;
+ }
+ }
+ for (;;) {
+ fd_set rfds;
+ FD_ZERO(&rfds);
+ FD_SET(m_connection.fd(), &rfds);
+ int rc = CSyscallUtils::safe_syscall(select, m_connection.fd() + 1, &rfds, nullptr, nullptr, nullptr);
+ if (rc < 0) {
+ perror("select");
+ }
+ ASSERT(rc > 0);
+ ASSERT(FD_ISSET(m_connection.fd(), &rfds));
+ bool success = drain_messages_from_server();
+ if (!success)
+ return nullptr;
+ for (ssize_t i = 0; i < m_unprocessed_messages.size(); ++i) {
+ if (m_unprocessed_messages[i]->id() == MessageType::static_message_id()) {
+ auto message = move(m_unprocessed_messages[i]);
+ m_unprocessed_messages.remove(i);
+ CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection.fd()));
+ return message;
+ }
+ }
+ }
+ }
+
+ bool post_message_to_server(const IMessage& message)
+ {
+ auto buffer = message.encode();
+ int nwritten = write(m_connection.fd(), buffer.data(), (size_t)buffer.size());
+ if (nwritten < 0) {
+ perror("write");
+ ASSERT_NOT_REACHED();
+ return false;
+ }
+ ASSERT(nwritten == buffer.size());
+ return true;
+ }
+
+ template<typename RequestType, typename... Args>
+ OwnPtr<typename RequestType::ResponseType> send_sync(Args&&... args)
+ {
+ bool success = post_message_to_server(RequestType(forward<Args>(args)...));
+ ASSERT(success);
+ auto response = wait_for_specific_message<typename RequestType::ResponseType>();
+ ASSERT(response);
+ return response;
+ }
+
+ protected:
+ virtual void postprocess_messages(Vector<OwnPtr<IMessage>>& new_bundles)
+ {
+ new_bundles.clear();
+ }
+
+ private:
+ bool drain_messages_from_server()
+ {
+ for (;;) {
+ u8 buffer[4096];
+ ssize_t nread = recv(m_connection.fd(), buffer, sizeof(buffer), MSG_DONTWAIT);
+ if (nread < 0) {
+ if (errno == EAGAIN) {
+ return true;
+ }
+ perror("read");
+ exit(1);
+ return false;
+ }
+ if (nread == 0) {
+ dbg() << "EOF on IPC fd";
+ exit(1);
+ return false;
+ }
+
+ auto message = Endpoint::decode_message(ByteBuffer::wrap(buffer, sizeof(buffer)));
+ ASSERT(message);
+
+ m_unprocessed_messages.append(move(message));
+ }
+ }
+
+ CLocalSocket m_connection;
+ CNotifier m_notifier;
+ Vector<OwnPtr<IMessage>> m_unprocessed_messages;
+ int m_server_pid { -1 };
+ int m_my_client_id { -1 };
+ };
+
} // Client
} // IPC
diff --git a/Libraries/LibCore/CoreIPCServer.h b/Libraries/LibCore/CoreIPCServer.h
index 54054826c3..69a1681eb1 100644
--- a/Libraries/LibCore/CoreIPCServer.h
+++ b/Libraries/LibCore/CoreIPCServer.h
@@ -6,6 +6,8 @@
#include <LibCore/CLocalSocket.h>
#include <LibCore/CNotifier.h>
#include <LibCore/CObject.h>
+#include <LibIPC/IEndpoint.h>
+#include <LibIPC/IMessage.h>
#include <errno.h>
#include <stdio.h>
@@ -52,7 +54,13 @@ namespace Server {
auto conn = new T(AK::forward<Args>(args)...) /* arghs */;
conn->send_greeting();
return conn;
- };
+ }
+
+ template<typename T, class... Args>
+ T* new_connection_ng_for_client(Args&&... args)
+ {
+ return new T(AK::forward<Args>(args)...) /* arghs */;
+ }
template<typename ServerMessage, typename ClientMessage>
class Connection : public CObject {
@@ -196,5 +204,109 @@ namespace Server {
int m_client_pid { -1 };
};
+ template<typename Endpoint>
+ class ConnectionNG : public CObject {
+ C_OBJECT(Connection)
+ public:
+ ConnectionNG(Endpoint& endpoint, CLocalSocket& socket, int client_id)
+ : m_endpoint(endpoint)
+ , m_socket(socket)
+ , m_client_id(client_id)
+ {
+ add_child(socket);
+ m_socket.on_ready_to_read = [this] { drain_client(); };
+ }
+
+ virtual ~ConnectionNG() override
+ {
+ }
+
+ void post_message(const IMessage& message)
+ {
+ auto buffer = message.encode();
+
+ int nwritten = write(m_socket.fd(), buffer.data(), (size_t)buffer.size());
+ if (nwritten < 0) {
+ switch (errno) {
+ case EPIPE:
+ dbg() << "Connection::post_message: Disconnected from peer";
+ delete_later();
+ return;
+ case EAGAIN:
+ dbg() << "Connection::post_message: Client buffer overflowed.";
+ did_misbehave();
+ return;
+ break;
+ default:
+ perror("Connection::post_message write");
+ ASSERT_NOT_REACHED();
+ }
+ }
+
+ ASSERT(nwritten == buffer.size());
+ }
+
+ void drain_client()
+ {
+ unsigned messages_received = 0;
+ for (;;) {
+ u8 buffer[4096];
+ ssize_t nread = recv(m_socket.fd(), buffer, sizeof(buffer), MSG_DONTWAIT);
+ if (nread == 0 || (nread == -1 && errno == EAGAIN)) {
+ if (!messages_received) {
+ // TODO: is delete_later() sufficient?
+ CEventLoop::current().post_event(*this, make<DisconnectedEvent>(client_id()));
+ }
+ break;
+ }
+ if (nread < 0) {
+ perror("recv");
+ ASSERT_NOT_REACHED();
+ }
+ auto message = m_endpoint.decode_message(ByteBuffer::wrap(buffer, nread));
+ if (!message) {
+ dbg() << "drain_client: Endpoint didn't recognize message";
+ did_misbehave();
+ return;
+ }
+ ++messages_received;
+
+ auto response = m_endpoint.handle(*message);
+ if (response)
+ post_message(*response);
+ }
+ }
+
+ void did_misbehave()
+ {
+ dbg() << "Connection{" << this << "} (id=" << m_client_id << ", pid=" << m_client_pid << ") misbehaved, disconnecting.";
+ m_socket.close();
+ delete_later();
+ }
+
+ int client_id() const { return m_client_id; }
+ pid_t client_pid() const { return m_client_pid; }
+ void set_client_pid(pid_t pid) { m_client_pid = pid; }
+
+ protected:
+ void event(CEvent& event) override
+ {
+ if (event.type() == Event::Disconnected) {
+ int client_id = static_cast<const DisconnectedEvent&>(event).client_id();
+ dbgprintf("Connection: Client disconnected: %d\n", client_id);
+ delete this;
+ return;
+ }
+
+ CObject::event(event);
+ }
+
+ private:
+ Endpoint& m_endpoint;
+ CLocalSocket& m_socket;
+ int m_client_id { -1 };
+ int m_client_pid { -1 };
+ };
+
} // Server
} // IPC