diff options
Diffstat (limited to 'Servers')
-rw-r--r-- | Servers/AudioServer/ASClientConnection.cpp | 176 | ||||
-rw-r--r-- | Servers/AudioServer/ASClientConnection.h | 84 | ||||
-rw-r--r-- | Servers/AudioServer/ASEventLoop.cpp | 37 | ||||
-rw-r--r-- | Servers/AudioServer/ASEventLoop.h | 20 | ||||
-rw-r--r-- | Servers/AudioServer/ASMixer.cpp | 112 | ||||
-rw-r--r-- | Servers/AudioServer/ASMixer.h | 33 | ||||
-rw-r--r-- | Servers/AudioServer/Makefile | 7 | ||||
-rw-r--r-- | Servers/AudioServer/main.cpp | 82 |
8 files changed, 468 insertions, 83 deletions
diff --git a/Servers/AudioServer/ASClientConnection.cpp b/Servers/AudioServer/ASClientConnection.cpp new file mode 100644 index 0000000000..1bd4a9e255 --- /dev/null +++ b/Servers/AudioServer/ASClientConnection.cpp @@ -0,0 +1,176 @@ +#include "ASClientConnection.h" +#include "ASMixer.h" + +#include <LibCore/CEventLoop.h> +#include <LibAudio/ASAPI.h> +#include <LibAudio/ABuffer.h> +#include <SharedBuffer.h> + +#include <errno.h> +#include <unistd.h> +#include <sys/uio.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <stdio.h> + +ASClientConnection::ASClientConnection(int fd, int client_id, ASMixer& mixer) + : m_socket(fd) + , m_notifier(CNotifier(fd, CNotifier::Read)) + , m_client_id(client_id) + , m_mixer(mixer) +{ + m_notifier.on_ready_to_read = [this] { drain_client(); }; + ASAPI_ServerMessage message; + message.type = ASAPI_ServerMessage::Type::Greeting; + message.greeting.server_pid = getpid(); + message.greeting.your_client_id = m_client_id; + post_message(message); + dbg() << "********** S: Created new ASClientConnection " << fd << client_id << " and said hello"; +} + +ASClientConnection::~ASClientConnection() +{ + dbg() << "********** S: Destroyed ASClientConnection " << m_socket.fd() << m_client_id; +} + +void ASClientConnection::post_message(const ASAPI_ServerMessage& message, const ByteBuffer& extra_data) +{ + if (!extra_data.is_empty()) + const_cast<ASAPI_ServerMessage&>(message).extra_size = extra_data.size(); + + struct iovec iov[2]; + int iov_count = 1; + + iov[0].iov_base = const_cast<ASAPI_ServerMessage*>(&message); + iov[0].iov_len = sizeof(message); + + if (!extra_data.is_empty()) { + iov[1].iov_base = const_cast<u8*>(extra_data.data()); + iov[1].iov_len = extra_data.size(); + ++iov_count; + } + + int nwritten = writev(m_socket.fd(), iov, iov_count); + if (nwritten < 0) { + switch (errno) { + case EPIPE: + dbgprintf("WSClientConnection::post_message: Disconnected from peer.\n"); + delete_later(); + return; + break; + case EAGAIN: + dbgprintf("WSClientConnection::post_message: Client buffer overflowed.\n"); + did_misbehave(); + return; + break; + default: + perror("WSClientConnection::post_message writev"); + ASSERT_NOT_REACHED(); + } + } + + ASSERT(nwritten == (int)(sizeof(message) + extra_data.size())); +} + +void ASClientConnection::event(CEvent& event) +{ + if (event.type() == ASEvent::WM_ClientDisconnected) { + int client_id = static_cast<const ASClientDisconnectedNotification&>(event).client_id(); + dbgprintf("ASClientConnection: Client disconnected: %d\n", client_id); + delete this; + return; + } + + CObject::event(event); +} + +void ASClientConnection::drain_client() +{ + unsigned messages_received = 0; + for (;;) { + ASAPI_ClientMessage message; + // FIXME: Don't go one message at a time, that's so much context switching, oof. + ssize_t nread = recv(m_socket.fd(), &message, sizeof(ASAPI_ClientMessage), MSG_DONTWAIT); + if (nread == 0 || (nread == -1 && errno == EAGAIN)) { + if (!messages_received) { + // TODO: is delete_later() sufficient? + CEventLoop::current().post_event(*this, make<ASClientDisconnectedNotification>(m_client_id)); + } + break; + } + if (nread < 0) { + perror("recv"); + ASSERT_NOT_REACHED(); + } + ByteBuffer extra_data; + if (message.extra_size) { + if (message.extra_size >= 32768) { + dbgprintf("message.extra_size is way too large\n"); + return did_misbehave(); + } + extra_data = ByteBuffer::create_uninitialized(message.extra_size); + // FIXME: We should allow this to time out. Maybe use a socket timeout? + int extra_nread = read(m_socket.fd(), extra_data.data(), extra_data.size()); + if (extra_nread != (int)message.extra_size) { + dbgprintf("extra_nread(%d) != extra_size(%d)\n", extra_nread, extra_data.size()); + if (extra_nread < 0) + perror("read"); + return did_misbehave(); + } + } + if (!handle_message(message, move(extra_data))) + return; + ++messages_received; + } +} + +bool ASClientConnection::handle_message(const ASAPI_ClientMessage& message, const ByteBuffer&) +{ + switch (message.type) { + case ASAPI_ClientMessage::Type::Greeting: + m_pid = message.greeting.client_pid; + break; + case ASAPI_ClientMessage::Type::PlayBuffer: { + // ### ensure that the size is that of a Vector<ASample> + Vector<ASample> samples; + + { + const auto& shared_buf = SharedBuffer::create_from_shared_buffer_id(message.play_buffer.buffer_id); + if (!shared_buf) { + did_misbehave(); + return false; + } + + if (shared_buf->size() / sizeof(ASample) > 441000) { + did_misbehave(); + return false; + } + samples.resize(shared_buf->size() / sizeof(ASample)); + memcpy(samples.data(), shared_buf->data(), shared_buf->size()); + } + + // we no longer need the buffer, so acknowledge that it's playing + // TODO: rate limit playback here somehow + ASAPI_ServerMessage reply; + reply.type = ASAPI_ServerMessage::Type::PlayingBuffer; + reply.playing_buffer.buffer_id = message.play_buffer.buffer_id; + post_message(reply); + + m_mixer.queue(*this, adopt(*new ABuffer(samples))); + break; + } + case ASAPI_ClientMessage::Type::Invalid: + default: + dbgprintf("ASClientConnection: Unexpected message ID %d\n", int(message.type)); + did_misbehave(); + } + + return true; +} + +void ASClientConnection::did_misbehave() +{ + dbgprintf("ASClientConnection{%p} (id=%d, pid=%d) misbehaved, disconnecting.\n", this, m_client_id, m_pid); + delete_later(); + m_notifier.set_enabled(false); +} diff --git a/Servers/AudioServer/ASClientConnection.h b/Servers/AudioServer/ASClientConnection.h new file mode 100644 index 0000000000..27f9cd3259 --- /dev/null +++ b/Servers/AudioServer/ASClientConnection.h @@ -0,0 +1,84 @@ +#pragma once + +#include <LibCore/CObject.h> +#include <LibCore/CEvent.h> +#include <LibCore/CIODevice.h> +#include <LibCore/CNotifier.h> + +struct ASAPI_ServerMessage; +struct ASAPI_ClientMessage; + +class ASEvent : public CEvent { +public: + enum Type { + Invalid = 2000, + WM_ClientDisconnected, + }; + ASEvent() {} + explicit ASEvent(Type type) + : CEvent(type) + { + } +}; + +class ASClientDisconnectedNotification : public ASEvent { +public: + explicit ASClientDisconnectedNotification(int client_id) + : ASEvent(WM_ClientDisconnected) + , m_client_id(client_id) + { + } + + int client_id() const { return m_client_id; } + +private: + int m_client_id { 0 }; +}; + +class ASMixer; + +class ASClientConnection : public CObject +{ +public: + ASClientConnection(int fd, int client_id, ASMixer& mixer); + ~ASClientConnection(); + + void post_message(const ASAPI_ServerMessage&, const ByteBuffer& = {}); + bool handle_message(const ASAPI_ClientMessage&, const ByteBuffer& = {}); + + void drain_client(); + + void did_misbehave(); + + const char* class_name() const override { return "ASClientConnection"; } + +protected: + void event(CEvent& event) override; +private: + // TODO: A way to create some kind of CIODevice with an open FD would be nice. + class ASOpenedSocket : public CIODevice + { + public: + const char* class_name() const override { return "ASOpenedSocket"; } + ASOpenedSocket(int fd) + { + set_fd(fd); + set_mode(CIODevice::OpenMode::ReadWrite); + } + + bool open(CIODevice::OpenMode) override + { + ASSERT_NOT_REACHED(); + return true; + }; + + int fd() const { return CIODevice::fd(); } + }; + + ASOpenedSocket m_socket; + CNotifier m_notifier; + int m_client_id; + int m_pid; + ASMixer& m_mixer; +}; + diff --git a/Servers/AudioServer/ASEventLoop.cpp b/Servers/AudioServer/ASEventLoop.cpp new file mode 100644 index 0000000000..d68916efd6 --- /dev/null +++ b/Servers/AudioServer/ASEventLoop.cpp @@ -0,0 +1,37 @@ +#include "ASEventLoop.h" +#include "ASClientConnection.h" + +#include <sys/types.h> +#include <sys/socket.h> +#include <unistd.h> + +ASEventLoop::ASEventLoop() +{ + unlink("/tmp/asportal"); + + sockaddr_un address; + address.sun_family = AF_LOCAL; + strcpy(address.sun_path, "/tmp/asportal"); + int rc = bind(m_server_sock.fd(), (const sockaddr*)&address, sizeof(address)); + ASSERT(rc == 0); + rc = listen(m_server_sock.fd(), 5); + ASSERT(rc == 0); + + m_server_notifier = make<CNotifier>(m_server_sock.fd(), CNotifier::Read); + m_server_notifier->on_ready_to_read = [this] { drain_server(); }; +} + +void ASEventLoop::drain_server() +{ + sockaddr_un address; + socklen_t address_size = sizeof(address); + int client_fd = accept(m_server_sock.fd(), (sockaddr*)&address, &address_size); + if (client_fd < 0) { + dbgprintf("AudioServer: accept() failed: %s\n", strerror(errno)); + } else { + dbgprintf("AudioServer: accept()ed client %d\n", client_fd); + static int s_next_client_id = 0; + new ASClientConnection(client_fd, s_next_client_id++, m_mixer); + } +} + diff --git a/Servers/AudioServer/ASEventLoop.h b/Servers/AudioServer/ASEventLoop.h new file mode 100644 index 0000000000..c6a5ed259f --- /dev/null +++ b/Servers/AudioServer/ASEventLoop.h @@ -0,0 +1,20 @@ +#pragma once + +#include <LibCore/CEventLoop.h> +#include <LibCore/CLocalSocket.h> +#include <LibCore/CNotifier.h> +#include "ASMixer.h" + +class ASEventLoop +{ +public: + ASEventLoop(); + int exec() { return m_event_loop.exec(); } +private: + CEventLoop m_event_loop; + CLocalSocket m_server_sock; + OwnPtr<CNotifier> m_server_notifier; + ASMixer m_mixer; + + void drain_server(); +}; diff --git a/Servers/AudioServer/ASMixer.cpp b/Servers/AudioServer/ASMixer.cpp new file mode 100644 index 0000000000..15a00dcb4d --- /dev/null +++ b/Servers/AudioServer/ASMixer.cpp @@ -0,0 +1,112 @@ +#include <AK/BufferStream.h> +#include <LibCore/CThread.h> + +#include <limits> +#include "ASMixer.h" + +ASMixer::ASMixer() + : m_device("/dev/audio") +{ + if (!m_device.open(CIODevice::WriteOnly)) { + dbgprintf("Can't open audio device: %s\n", m_device.error_string()); + return; + } + + CThread sound_thread([](void* context) -> int { + ASMixer* mixer = (ASMixer*)context; + mixer->mix(); + return 0; + }, this); +} + +void ASMixer::queue(ASClientConnection&, const ABuffer& buffer) +{ + ASSERT(buffer.size_in_bytes()); + CLocker lock(m_lock); + m_pending_mixing.append(ASMixerBuffer(buffer)); +} + +void ASMixer::mix() +{ + Vector<ASMixerBuffer> active_mix_buffers; + + for (;;) { + { + CLocker lock(m_lock); + for (const auto& buf : m_pending_mixing) + active_mix_buffers.append(buf); + m_pending_mixing.clear(); + } + + // ### use a wakeup of some kind rather than this garbage + if (active_mix_buffers.size() == 0) { + // nothing to mix yet + usleep(10000); + continue; + } + + int max_size = 0; + + for (auto& buffer : active_mix_buffers) { + if (buffer.done) + continue; + ASSERT(buffer.buffer->size_in_bytes()); // zero sized buffer? how? + max_size = max(max_size, buffer.buffer->size_in_bytes() - buffer.pos); + } + + // ### clear up 'done' buffers more aggressively + if (max_size == 0) { + active_mix_buffers.clear(); + continue; + } + + max_size = min(1023, max_size); + + Vector<ASample> mixed_buffer; + mixed_buffer.resize(max_size); + + // Mix the buffers together into the output + for (auto& buffer : active_mix_buffers) { + if (buffer.done) + continue; + auto& samples = buffer.buffer->samples(); + + for (int i = 0; i < max_size && buffer.pos < samples.size(); ++buffer.pos, ++i) { + auto& mixed_sample = mixed_buffer[i]; + mixed_sample += samples[buffer.pos]; + } + + // clear it later + if (buffer.pos == samples.size()) + buffer.done = true; + } + + // output the mixed stuff to the device + // max_size is 0 indexed, so add 1. + const int output_buffer_byte_size = (max_size + 1) * 2 * 2; + ASSERT(output_buffer_byte_size == 4096); + ByteBuffer buffer(ByteBuffer::create_uninitialized(output_buffer_byte_size)); + BufferStream stream(buffer); + + for (int i = 0; i < mixed_buffer.size(); ++i) { + auto& mixed_sample = mixed_buffer[i]; + mixed_sample.clamp(); + + i16 out_sample; + out_sample = mixed_sample.left * std::numeric_limits<i16>::max(); + stream << out_sample; + + ASSERT(!stream.at_end()); // we should have enough space for both channels in one buffer! + out_sample = mixed_sample.right * std::numeric_limits<i16>::max(); + stream << out_sample; + + ASSERT(!stream.at_end()); + } + + if (stream.offset() != 0) { + buffer.trim(stream.offset()); + m_device.write(buffer); + mixed_buffer.resize(0); + } + } +} diff --git a/Servers/AudioServer/ASMixer.h b/Servers/AudioServer/ASMixer.h new file mode 100644 index 0000000000..2cf45fd13f --- /dev/null +++ b/Servers/AudioServer/ASMixer.h @@ -0,0 +1,33 @@ +#pragma once + +#include <AK/RefCounted.h> +#include <AK/ByteBuffer.h> +#include <LibCore/CFile.h> +#include <LibCore/CLock.h> +#include <LibAudio/ABuffer.h> +#include <AK/NonnullRefPtrVector.h> + +class ASClientConnection; + +class ASMixer : public RefCounted<ASMixer> { +public: + ASMixer(); + + void queue(ASClientConnection&, const ABuffer&); + +private: + struct ASMixerBuffer { + ASMixerBuffer(const NonnullRefPtr<ABuffer>& buf) + : buffer(buf) + {} + NonnullRefPtr<ABuffer> buffer; + int pos { 0 }; + bool done { false }; + }; + + Vector<ASMixerBuffer> m_pending_mixing; + CFile m_device; + CLock m_lock; + + void mix(); +}; diff --git a/Servers/AudioServer/Makefile b/Servers/AudioServer/Makefile index 74b7b9c22d..e813207d10 100644 --- a/Servers/AudioServer/Makefile +++ b/Servers/AudioServer/Makefile @@ -1,7 +1,10 @@ include ../../Makefile.common AUDIOSERVER_OBJS = \ - main.o + main.o \ + ASMixer.o \ + ASClientConnection.o \ + ASEventLoop.o APP = AudioServer OBJS = $(AUDIOSERVER_OBJS) @@ -11,7 +14,7 @@ DEFINES += -DUSERLAND all: $(APP) $(APP): $(OBJS) - $(LD) -o $(APP) $(LDFLAGS) $(OBJS) -lc -lcore -laudio + $(LD) -o $(APP) $(LDFLAGS) $(OBJS) -lc -lcore .cpp.o: @echo "CXX $<"; $(CXX) $(CXXFLAGS) -o $@ -c $< diff --git a/Servers/AudioServer/main.cpp b/Servers/AudioServer/main.cpp index 915aad33f5..ac7ef5297b 100644 --- a/Servers/AudioServer/main.cpp +++ b/Servers/AudioServer/main.cpp @@ -1,86 +1,6 @@ #include <LibCore/CFile.h> -#include <LibCore/CEventLoop.h> -#include <LibCore/CLocalSocket.h> -#include <LibCore/CNotifier.h> -#include <LibAudio/AWavLoader.h> -#include <LibAudio/AWavFile.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <unistd.h> - -class ASEventLoop -{ -public: - ASEventLoop(); - int exec() { return m_event_loop.exec(); } -private: - CEventLoop m_event_loop; - CLocalSocket m_server_sock; - OwnPtr<CNotifier> m_server_notifier; - - void drain_server(); -}; - -void read_and_play_wav() -{ - CFile audio("/dev/audio"); - if (!audio.open(CIODevice::WriteOnly)) { - dbgprintf("Can't open audio device: %s\n", audio.error_string()); - return; - } - - AWavLoader loader; - const auto& file = loader.load_wav("/home/anon/tmp.wav"); - if (!file) { - dbgprintf("Can't parse WAV: %s\n", loader.error_string()); - return; - } - - dbgprintf("Read WAV of format %d with num_channels %d sample rate %d, bits per sample %d\n", (u8)file->format(), file->channel_count(), file->sample_rate_per_second(), file->bits_per_sample()); - - auto contents = file->sample_data(); - const int chunk_size = 4096; - int i = 0; - while (i < contents.size()) { - const auto chunk = contents.slice(i, chunk_size); - audio.write(chunk); - i += chunk_size; - } -} - -ASEventLoop::ASEventLoop() -{ - read_and_play_wav(); - - unlink("/tmp/asportal"); - - sockaddr_un address; - address.sun_family = AF_LOCAL; - strcpy(address.sun_path, "/tmp/asportal"); - int rc = bind(m_server_sock.fd(), (const sockaddr*)&address, sizeof(address)); - ASSERT(rc == 0); - rc = listen(m_server_sock.fd(), 5); - ASSERT(rc == 0); - - m_server_notifier = make<CNotifier>(m_server_sock.fd(), CNotifier::Read); - m_server_notifier->on_ready_to_read = [this] { drain_server(); }; -} - -void ASEventLoop::drain_server() -{ - sockaddr_un address; - socklen_t address_size = sizeof(address); - int client_fd = accept(m_server_sock.fd(), (sockaddr*)&address, &address_size); - if (client_fd < 0) { - dbgprintf("WindowServer: accept() failed: %s\n", strerror(errno)); - } else { - dbgprintf("AudioServer: accept()ed client %d\n", client_fd); - String s("hello, client!\n"); - write(client_fd, s.characters(), s.length()); - close(client_fd); - } -} +#include "ASEventLoop.h" int main(int, char**) { |