summaryrefslogtreecommitdiff
path: root/Servers
diff options
context:
space:
mode:
Diffstat (limited to 'Servers')
-rw-r--r--Servers/AudioServer/ASClientConnection.cpp176
-rw-r--r--Servers/AudioServer/ASClientConnection.h84
-rw-r--r--Servers/AudioServer/ASEventLoop.cpp37
-rw-r--r--Servers/AudioServer/ASEventLoop.h20
-rw-r--r--Servers/AudioServer/ASMixer.cpp112
-rw-r--r--Servers/AudioServer/ASMixer.h33
-rw-r--r--Servers/AudioServer/Makefile7
-rw-r--r--Servers/AudioServer/main.cpp82
8 files changed, 468 insertions, 83 deletions
diff --git a/Servers/AudioServer/ASClientConnection.cpp b/Servers/AudioServer/ASClientConnection.cpp
new file mode 100644
index 0000000000..1bd4a9e255
--- /dev/null
+++ b/Servers/AudioServer/ASClientConnection.cpp
@@ -0,0 +1,176 @@
+#include "ASClientConnection.h"
+#include "ASMixer.h"
+
+#include <LibCore/CEventLoop.h>
+#include <LibAudio/ASAPI.h>
+#include <LibAudio/ABuffer.h>
+#include <SharedBuffer.h>
+
+#include <errno.h>
+#include <unistd.h>
+#include <sys/uio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <stdio.h>
+
+ASClientConnection::ASClientConnection(int fd, int client_id, ASMixer& mixer)
+ : m_socket(fd)
+ , m_notifier(CNotifier(fd, CNotifier::Read))
+ , m_client_id(client_id)
+ , m_mixer(mixer)
+{
+ m_notifier.on_ready_to_read = [this] { drain_client(); };
+ ASAPI_ServerMessage message;
+ message.type = ASAPI_ServerMessage::Type::Greeting;
+ message.greeting.server_pid = getpid();
+ message.greeting.your_client_id = m_client_id;
+ post_message(message);
+ dbg() << "********** S: Created new ASClientConnection " << fd << client_id << " and said hello";
+}
+
+ASClientConnection::~ASClientConnection()
+{
+ dbg() << "********** S: Destroyed ASClientConnection " << m_socket.fd() << m_client_id;
+}
+
+void ASClientConnection::post_message(const ASAPI_ServerMessage& message, const ByteBuffer& extra_data)
+{
+ if (!extra_data.is_empty())
+ const_cast<ASAPI_ServerMessage&>(message).extra_size = extra_data.size();
+
+ struct iovec iov[2];
+ int iov_count = 1;
+
+ iov[0].iov_base = const_cast<ASAPI_ServerMessage*>(&message);
+ iov[0].iov_len = sizeof(message);
+
+ if (!extra_data.is_empty()) {
+ iov[1].iov_base = const_cast<u8*>(extra_data.data());
+ iov[1].iov_len = extra_data.size();
+ ++iov_count;
+ }
+
+ int nwritten = writev(m_socket.fd(), iov, iov_count);
+ if (nwritten < 0) {
+ switch (errno) {
+ case EPIPE:
+ dbgprintf("WSClientConnection::post_message: Disconnected from peer.\n");
+ delete_later();
+ return;
+ break;
+ case EAGAIN:
+ dbgprintf("WSClientConnection::post_message: Client buffer overflowed.\n");
+ did_misbehave();
+ return;
+ break;
+ default:
+ perror("WSClientConnection::post_message writev");
+ ASSERT_NOT_REACHED();
+ }
+ }
+
+ ASSERT(nwritten == (int)(sizeof(message) + extra_data.size()));
+}
+
+void ASClientConnection::event(CEvent& event)
+{
+ if (event.type() == ASEvent::WM_ClientDisconnected) {
+ int client_id = static_cast<const ASClientDisconnectedNotification&>(event).client_id();
+ dbgprintf("ASClientConnection: Client disconnected: %d\n", client_id);
+ delete this;
+ return;
+ }
+
+ CObject::event(event);
+}
+
+void ASClientConnection::drain_client()
+{
+ unsigned messages_received = 0;
+ for (;;) {
+ ASAPI_ClientMessage message;
+ // FIXME: Don't go one message at a time, that's so much context switching, oof.
+ ssize_t nread = recv(m_socket.fd(), &message, sizeof(ASAPI_ClientMessage), MSG_DONTWAIT);
+ if (nread == 0 || (nread == -1 && errno == EAGAIN)) {
+ if (!messages_received) {
+ // TODO: is delete_later() sufficient?
+ CEventLoop::current().post_event(*this, make<ASClientDisconnectedNotification>(m_client_id));
+ }
+ break;
+ }
+ if (nread < 0) {
+ perror("recv");
+ ASSERT_NOT_REACHED();
+ }
+ ByteBuffer extra_data;
+ if (message.extra_size) {
+ if (message.extra_size >= 32768) {
+ dbgprintf("message.extra_size is way too large\n");
+ return did_misbehave();
+ }
+ extra_data = ByteBuffer::create_uninitialized(message.extra_size);
+ // FIXME: We should allow this to time out. Maybe use a socket timeout?
+ int extra_nread = read(m_socket.fd(), extra_data.data(), extra_data.size());
+ if (extra_nread != (int)message.extra_size) {
+ dbgprintf("extra_nread(%d) != extra_size(%d)\n", extra_nread, extra_data.size());
+ if (extra_nread < 0)
+ perror("read");
+ return did_misbehave();
+ }
+ }
+ if (!handle_message(message, move(extra_data)))
+ return;
+ ++messages_received;
+ }
+}
+
+bool ASClientConnection::handle_message(const ASAPI_ClientMessage& message, const ByteBuffer&)
+{
+ switch (message.type) {
+ case ASAPI_ClientMessage::Type::Greeting:
+ m_pid = message.greeting.client_pid;
+ break;
+ case ASAPI_ClientMessage::Type::PlayBuffer: {
+ // ### ensure that the size is that of a Vector<ASample>
+ Vector<ASample> samples;
+
+ {
+ const auto& shared_buf = SharedBuffer::create_from_shared_buffer_id(message.play_buffer.buffer_id);
+ if (!shared_buf) {
+ did_misbehave();
+ return false;
+ }
+
+ if (shared_buf->size() / sizeof(ASample) > 441000) {
+ did_misbehave();
+ return false;
+ }
+ samples.resize(shared_buf->size() / sizeof(ASample));
+ memcpy(samples.data(), shared_buf->data(), shared_buf->size());
+ }
+
+ // we no longer need the buffer, so acknowledge that it's playing
+ // TODO: rate limit playback here somehow
+ ASAPI_ServerMessage reply;
+ reply.type = ASAPI_ServerMessage::Type::PlayingBuffer;
+ reply.playing_buffer.buffer_id = message.play_buffer.buffer_id;
+ post_message(reply);
+
+ m_mixer.queue(*this, adopt(*new ABuffer(samples)));
+ break;
+ }
+ case ASAPI_ClientMessage::Type::Invalid:
+ default:
+ dbgprintf("ASClientConnection: Unexpected message ID %d\n", int(message.type));
+ did_misbehave();
+ }
+
+ return true;
+}
+
+void ASClientConnection::did_misbehave()
+{
+ dbgprintf("ASClientConnection{%p} (id=%d, pid=%d) misbehaved, disconnecting.\n", this, m_client_id, m_pid);
+ delete_later();
+ m_notifier.set_enabled(false);
+}
diff --git a/Servers/AudioServer/ASClientConnection.h b/Servers/AudioServer/ASClientConnection.h
new file mode 100644
index 0000000000..27f9cd3259
--- /dev/null
+++ b/Servers/AudioServer/ASClientConnection.h
@@ -0,0 +1,84 @@
+#pragma once
+
+#include <LibCore/CObject.h>
+#include <LibCore/CEvent.h>
+#include <LibCore/CIODevice.h>
+#include <LibCore/CNotifier.h>
+
+struct ASAPI_ServerMessage;
+struct ASAPI_ClientMessage;
+
+class ASEvent : public CEvent {
+public:
+ enum Type {
+ Invalid = 2000,
+ WM_ClientDisconnected,
+ };
+ ASEvent() {}
+ explicit ASEvent(Type type)
+ : CEvent(type)
+ {
+ }
+};
+
+class ASClientDisconnectedNotification : public ASEvent {
+public:
+ explicit ASClientDisconnectedNotification(int client_id)
+ : ASEvent(WM_ClientDisconnected)
+ , m_client_id(client_id)
+ {
+ }
+
+ int client_id() const { return m_client_id; }
+
+private:
+ int m_client_id { 0 };
+};
+
+class ASMixer;
+
+class ASClientConnection : public CObject
+{
+public:
+ ASClientConnection(int fd, int client_id, ASMixer& mixer);
+ ~ASClientConnection();
+
+ void post_message(const ASAPI_ServerMessage&, const ByteBuffer& = {});
+ bool handle_message(const ASAPI_ClientMessage&, const ByteBuffer& = {});
+
+ void drain_client();
+
+ void did_misbehave();
+
+ const char* class_name() const override { return "ASClientConnection"; }
+
+protected:
+ void event(CEvent& event) override;
+private:
+ // TODO: A way to create some kind of CIODevice with an open FD would be nice.
+ class ASOpenedSocket : public CIODevice
+ {
+ public:
+ const char* class_name() const override { return "ASOpenedSocket"; }
+ ASOpenedSocket(int fd)
+ {
+ set_fd(fd);
+ set_mode(CIODevice::OpenMode::ReadWrite);
+ }
+
+ bool open(CIODevice::OpenMode) override
+ {
+ ASSERT_NOT_REACHED();
+ return true;
+ };
+
+ int fd() const { return CIODevice::fd(); }
+ };
+
+ ASOpenedSocket m_socket;
+ CNotifier m_notifier;
+ int m_client_id;
+ int m_pid;
+ ASMixer& m_mixer;
+};
+
diff --git a/Servers/AudioServer/ASEventLoop.cpp b/Servers/AudioServer/ASEventLoop.cpp
new file mode 100644
index 0000000000..d68916efd6
--- /dev/null
+++ b/Servers/AudioServer/ASEventLoop.cpp
@@ -0,0 +1,37 @@
+#include "ASEventLoop.h"
+#include "ASClientConnection.h"
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+ASEventLoop::ASEventLoop()
+{
+ unlink("/tmp/asportal");
+
+ sockaddr_un address;
+ address.sun_family = AF_LOCAL;
+ strcpy(address.sun_path, "/tmp/asportal");
+ int rc = bind(m_server_sock.fd(), (const sockaddr*)&address, sizeof(address));
+ ASSERT(rc == 0);
+ rc = listen(m_server_sock.fd(), 5);
+ ASSERT(rc == 0);
+
+ m_server_notifier = make<CNotifier>(m_server_sock.fd(), CNotifier::Read);
+ m_server_notifier->on_ready_to_read = [this] { drain_server(); };
+}
+
+void ASEventLoop::drain_server()
+{
+ sockaddr_un address;
+ socklen_t address_size = sizeof(address);
+ int client_fd = accept(m_server_sock.fd(), (sockaddr*)&address, &address_size);
+ if (client_fd < 0) {
+ dbgprintf("AudioServer: accept() failed: %s\n", strerror(errno));
+ } else {
+ dbgprintf("AudioServer: accept()ed client %d\n", client_fd);
+ static int s_next_client_id = 0;
+ new ASClientConnection(client_fd, s_next_client_id++, m_mixer);
+ }
+}
+
diff --git a/Servers/AudioServer/ASEventLoop.h b/Servers/AudioServer/ASEventLoop.h
new file mode 100644
index 0000000000..c6a5ed259f
--- /dev/null
+++ b/Servers/AudioServer/ASEventLoop.h
@@ -0,0 +1,20 @@
+#pragma once
+
+#include <LibCore/CEventLoop.h>
+#include <LibCore/CLocalSocket.h>
+#include <LibCore/CNotifier.h>
+#include "ASMixer.h"
+
+class ASEventLoop
+{
+public:
+ ASEventLoop();
+ int exec() { return m_event_loop.exec(); }
+private:
+ CEventLoop m_event_loop;
+ CLocalSocket m_server_sock;
+ OwnPtr<CNotifier> m_server_notifier;
+ ASMixer m_mixer;
+
+ void drain_server();
+};
diff --git a/Servers/AudioServer/ASMixer.cpp b/Servers/AudioServer/ASMixer.cpp
new file mode 100644
index 0000000000..15a00dcb4d
--- /dev/null
+++ b/Servers/AudioServer/ASMixer.cpp
@@ -0,0 +1,112 @@
+#include <AK/BufferStream.h>
+#include <LibCore/CThread.h>
+
+#include <limits>
+#include "ASMixer.h"
+
+ASMixer::ASMixer()
+ : m_device("/dev/audio")
+{
+ if (!m_device.open(CIODevice::WriteOnly)) {
+ dbgprintf("Can't open audio device: %s\n", m_device.error_string());
+ return;
+ }
+
+ CThread sound_thread([](void* context) -> int {
+ ASMixer* mixer = (ASMixer*)context;
+ mixer->mix();
+ return 0;
+ }, this);
+}
+
+void ASMixer::queue(ASClientConnection&, const ABuffer& buffer)
+{
+ ASSERT(buffer.size_in_bytes());
+ CLocker lock(m_lock);
+ m_pending_mixing.append(ASMixerBuffer(buffer));
+}
+
+void ASMixer::mix()
+{
+ Vector<ASMixerBuffer> active_mix_buffers;
+
+ for (;;) {
+ {
+ CLocker lock(m_lock);
+ for (const auto& buf : m_pending_mixing)
+ active_mix_buffers.append(buf);
+ m_pending_mixing.clear();
+ }
+
+ // ### use a wakeup of some kind rather than this garbage
+ if (active_mix_buffers.size() == 0) {
+ // nothing to mix yet
+ usleep(10000);
+ continue;
+ }
+
+ int max_size = 0;
+
+ for (auto& buffer : active_mix_buffers) {
+ if (buffer.done)
+ continue;
+ ASSERT(buffer.buffer->size_in_bytes()); // zero sized buffer? how?
+ max_size = max(max_size, buffer.buffer->size_in_bytes() - buffer.pos);
+ }
+
+ // ### clear up 'done' buffers more aggressively
+ if (max_size == 0) {
+ active_mix_buffers.clear();
+ continue;
+ }
+
+ max_size = min(1023, max_size);
+
+ Vector<ASample> mixed_buffer;
+ mixed_buffer.resize(max_size);
+
+ // Mix the buffers together into the output
+ for (auto& buffer : active_mix_buffers) {
+ if (buffer.done)
+ continue;
+ auto& samples = buffer.buffer->samples();
+
+ for (int i = 0; i < max_size && buffer.pos < samples.size(); ++buffer.pos, ++i) {
+ auto& mixed_sample = mixed_buffer[i];
+ mixed_sample += samples[buffer.pos];
+ }
+
+ // clear it later
+ if (buffer.pos == samples.size())
+ buffer.done = true;
+ }
+
+ // output the mixed stuff to the device
+ // max_size is 0 indexed, so add 1.
+ const int output_buffer_byte_size = (max_size + 1) * 2 * 2;
+ ASSERT(output_buffer_byte_size == 4096);
+ ByteBuffer buffer(ByteBuffer::create_uninitialized(output_buffer_byte_size));
+ BufferStream stream(buffer);
+
+ for (int i = 0; i < mixed_buffer.size(); ++i) {
+ auto& mixed_sample = mixed_buffer[i];
+ mixed_sample.clamp();
+
+ i16 out_sample;
+ out_sample = mixed_sample.left * std::numeric_limits<i16>::max();
+ stream << out_sample;
+
+ ASSERT(!stream.at_end()); // we should have enough space for both channels in one buffer!
+ out_sample = mixed_sample.right * std::numeric_limits<i16>::max();
+ stream << out_sample;
+
+ ASSERT(!stream.at_end());
+ }
+
+ if (stream.offset() != 0) {
+ buffer.trim(stream.offset());
+ m_device.write(buffer);
+ mixed_buffer.resize(0);
+ }
+ }
+}
diff --git a/Servers/AudioServer/ASMixer.h b/Servers/AudioServer/ASMixer.h
new file mode 100644
index 0000000000..2cf45fd13f
--- /dev/null
+++ b/Servers/AudioServer/ASMixer.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include <AK/RefCounted.h>
+#include <AK/ByteBuffer.h>
+#include <LibCore/CFile.h>
+#include <LibCore/CLock.h>
+#include <LibAudio/ABuffer.h>
+#include <AK/NonnullRefPtrVector.h>
+
+class ASClientConnection;
+
+class ASMixer : public RefCounted<ASMixer> {
+public:
+ ASMixer();
+
+ void queue(ASClientConnection&, const ABuffer&);
+
+private:
+ struct ASMixerBuffer {
+ ASMixerBuffer(const NonnullRefPtr<ABuffer>& buf)
+ : buffer(buf)
+ {}
+ NonnullRefPtr<ABuffer> buffer;
+ int pos { 0 };
+ bool done { false };
+ };
+
+ Vector<ASMixerBuffer> m_pending_mixing;
+ CFile m_device;
+ CLock m_lock;
+
+ void mix();
+};
diff --git a/Servers/AudioServer/Makefile b/Servers/AudioServer/Makefile
index 74b7b9c22d..e813207d10 100644
--- a/Servers/AudioServer/Makefile
+++ b/Servers/AudioServer/Makefile
@@ -1,7 +1,10 @@
include ../../Makefile.common
AUDIOSERVER_OBJS = \
- main.o
+ main.o \
+ ASMixer.o \
+ ASClientConnection.o \
+ ASEventLoop.o
APP = AudioServer
OBJS = $(AUDIOSERVER_OBJS)
@@ -11,7 +14,7 @@ DEFINES += -DUSERLAND
all: $(APP)
$(APP): $(OBJS)
- $(LD) -o $(APP) $(LDFLAGS) $(OBJS) -lc -lcore -laudio
+ $(LD) -o $(APP) $(LDFLAGS) $(OBJS) -lc -lcore
.cpp.o:
@echo "CXX $<"; $(CXX) $(CXXFLAGS) -o $@ -c $<
diff --git a/Servers/AudioServer/main.cpp b/Servers/AudioServer/main.cpp
index 915aad33f5..ac7ef5297b 100644
--- a/Servers/AudioServer/main.cpp
+++ b/Servers/AudioServer/main.cpp
@@ -1,86 +1,6 @@
#include <LibCore/CFile.h>
-#include <LibCore/CEventLoop.h>
-#include <LibCore/CLocalSocket.h>
-#include <LibCore/CNotifier.h>
-#include <LibAudio/AWavLoader.h>
-#include <LibAudio/AWavFile.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <unistd.h>
-
-class ASEventLoop
-{
-public:
- ASEventLoop();
- int exec() { return m_event_loop.exec(); }
-private:
- CEventLoop m_event_loop;
- CLocalSocket m_server_sock;
- OwnPtr<CNotifier> m_server_notifier;
-
- void drain_server();
-};
-
-void read_and_play_wav()
-{
- CFile audio("/dev/audio");
- if (!audio.open(CIODevice::WriteOnly)) {
- dbgprintf("Can't open audio device: %s\n", audio.error_string());
- return;
- }
-
- AWavLoader loader;
- const auto& file = loader.load_wav("/home/anon/tmp.wav");
- if (!file) {
- dbgprintf("Can't parse WAV: %s\n", loader.error_string());
- return;
- }
-
- dbgprintf("Read WAV of format %d with num_channels %d sample rate %d, bits per sample %d\n", (u8)file->format(), file->channel_count(), file->sample_rate_per_second(), file->bits_per_sample());
-
- auto contents = file->sample_data();
- const int chunk_size = 4096;
- int i = 0;
- while (i < contents.size()) {
- const auto chunk = contents.slice(i, chunk_size);
- audio.write(chunk);
- i += chunk_size;
- }
-}
-
-ASEventLoop::ASEventLoop()
-{
- read_and_play_wav();
-
- unlink("/tmp/asportal");
-
- sockaddr_un address;
- address.sun_family = AF_LOCAL;
- strcpy(address.sun_path, "/tmp/asportal");
- int rc = bind(m_server_sock.fd(), (const sockaddr*)&address, sizeof(address));
- ASSERT(rc == 0);
- rc = listen(m_server_sock.fd(), 5);
- ASSERT(rc == 0);
-
- m_server_notifier = make<CNotifier>(m_server_sock.fd(), CNotifier::Read);
- m_server_notifier->on_ready_to_read = [this] { drain_server(); };
-}
-
-void ASEventLoop::drain_server()
-{
- sockaddr_un address;
- socklen_t address_size = sizeof(address);
- int client_fd = accept(m_server_sock.fd(), (sockaddr*)&address, &address_size);
- if (client_fd < 0) {
- dbgprintf("WindowServer: accept() failed: %s\n", strerror(errno));
- } else {
- dbgprintf("AudioServer: accept()ed client %d\n", client_fd);
- String s("hello, client!\n");
- write(client_fd, s.characters(), s.length());
- close(client_fd);
- }
-}
+#include "ASEventLoop.h"
int main(int, char**)
{