summaryrefslogtreecommitdiff
path: root/Userland/Libraries/LibWebSocket/WebSocket.cpp
blob: 16f557644cd6a5f43a56e24be7a5e212785dea07 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
/*
 * Copyright (c) 2021, Dex♪ <dexes.ttp@gmail.com>
 *
 * SPDX-License-Identifier: BSD-2-Clause
 */

#include <AK/Base64.h>
#include <AK/Random.h>
#include <LibCrypto/Hash/HashManager.h>
#include <LibWebSocket/WebSocket.h>
#include <unistd.h>

namespace WebSocket {

// Note : The websocket protocol is defined by RFC 6455, found at https://tools.ietf.org/html/rfc6455
// In this file, section numbers will refer to the RFC 6455

NonnullRefPtr<WebSocket> WebSocket::create(ConnectionInfo connection)
{
    return adopt_ref(*new WebSocket(move(connection)));
}

WebSocket::WebSocket(ConnectionInfo connection)
    : m_connection(move(connection))
{
}

WebSocket::~WebSocket()
{
}

void WebSocket::start()
{
    VERIFY(m_state == WebSocket::InternalState::NotStarted);
    VERIFY(!m_impl);
    m_impl = WebSocketImpl::construct();

    m_impl->on_connection_error = [this] {
        dbgln("WebSocket: Connection error (underlying socket)");
        fatal_error(WebSocket::Error::CouldNotEstablishConnection);
    };
    m_impl->on_connected = [this] {
        if (m_state != WebSocket::InternalState::EstablishingProtocolConnection)
            return;
        m_state = WebSocket::InternalState::SendingClientHandshake;
        send_client_handshake();
        drain_read();
    };
    m_impl->on_ready_to_read = [this] {
        drain_read();
    };
    m_state = WebSocket::InternalState::EstablishingProtocolConnection;
    m_impl->connect(m_connection);
}

ReadyState WebSocket::ready_state()
{
    switch (m_state) {
    case WebSocket::InternalState::NotStarted:
    case WebSocket::InternalState::EstablishingProtocolConnection:
    case WebSocket::InternalState::SendingClientHandshake:
    case WebSocket::InternalState::WaitingForServerHandshake:
        return ReadyState::Connecting;
    case WebSocket::InternalState::Open:
        return ReadyState::Open;
    case WebSocket::InternalState::Closing:
        return ReadyState::Closing;
    case WebSocket::InternalState::Closed:
    case WebSocket::InternalState::Errored:
        return ReadyState::Closed;
    default:
        VERIFY_NOT_REACHED();
        return ReadyState::Closed;
    }
}

void WebSocket::send(Message const& message)
{
    // Calling send on a socket that is not opened is not allowed
    VERIFY(m_state == WebSocket::InternalState::Open);
    VERIFY(m_impl);
    if (message.is_text())
        send_frame(WebSocket::OpCode::Text, message.data(), true);
    else
        send_frame(WebSocket::OpCode::Binary, message.data(), true);
}

void WebSocket::close(u16 code, String const& message)
{
    // Calling close on a socket that is not opened is not allowed
    VERIFY(m_state == WebSocket::InternalState::Open);
    VERIFY(m_impl);
    auto message_bytes = message.bytes();
    auto close_payload = ByteBuffer::create_uninitialized(message_bytes.size() + 2).release_value_but_fixme_should_propagate_errors(); // FIXME: Handle possible OOM situation.
    close_payload.overwrite(0, (u8*)&code, 2);
    close_payload.overwrite(2, message_bytes.data(), message_bytes.size());
    send_frame(WebSocket::OpCode::ConnectionClose, close_payload, true);
}

void WebSocket::drain_read()
{
    if (m_impl->eof()) {
        // The connection got closed by the server
        m_state = WebSocket::InternalState::Closed;
        notify_close(m_last_close_code, m_last_close_message, true);
        discard_connection();
        return;
    }

    switch (m_state) {
    case InternalState::NotStarted:
    case InternalState::EstablishingProtocolConnection:
    case InternalState::SendingClientHandshake: {
        auto initializing_bytes = m_impl->read(1024);
        if (!initializing_bytes.is_error())
            dbgln("drain_read() was called on a websocket that isn't opened yet. Read {} bytes from the socket.", initializing_bytes.value().size());
    } break;
    case InternalState::WaitingForServerHandshake: {
        read_server_handshake();
    } break;
    case InternalState::Open:
    case InternalState::Closing: {
        read_frame();
    } break;
    case InternalState::Closed:
    case InternalState::Errored: {
        auto closed_bytes = m_impl->read(1024);
        if (!closed_bytes.is_error())
            dbgln("drain_read() was called on a closed websocket. Read {} bytes from the socket.", closed_bytes.value().size());
    } break;
    default:
        VERIFY_NOT_REACHED();
    }
}

// The client handshake message is defined in the second list of section 4.1
void WebSocket::send_client_handshake()
{
    VERIFY(m_impl);
    VERIFY(m_state == WebSocket::InternalState::SendingClientHandshake);
    StringBuilder builder;

    // 2. and 3. GET /resource name/ HTTP 1.1
    builder.appendff("GET {} HTTP/1.1\r\n", m_connection.resource_name());

    // 4. Host
    auto url = m_connection.url();
    builder.appendff("Host: {}", url.host());
    if (!m_connection.is_secure() && url.port_or_default() != 80)
        builder.appendff(":{}", url.port_or_default());
    else if (m_connection.is_secure() && url.port_or_default() != 443)
        builder.appendff(":{}", url.port_or_default());
    builder.append("\r\n");

    // 5. and 6. Connection Upgrade
    builder.append("Upgrade: websocket\r\n");
    builder.append("Connection: Upgrade\r\n");

    // 7. 16-byte nonce encoded as Base64
    u8 nonce_data[16];
    fill_with_random(nonce_data, 16);
    m_websocket_key = encode_base64(ReadonlyBytes(nonce_data, 16));
    builder.appendff("Sec-WebSocket-Key: {}\r\n", m_websocket_key);

    // 8. Origin (optional field)
    if (!m_connection.origin().is_empty()) {
        builder.appendff("Origin: {}\r\n", m_connection.origin());
    }

    // 9. Websocket version
    builder.append("Sec-WebSocket-Version: 13\r\n");

    // 10. Websocket protocol (optional field)
    if (!m_connection.protocols().is_empty()) {
        builder.append("Sec-WebSocket-Protocol: ");
        builder.join(",", m_connection.protocols());
        builder.append("\r\n");
    }

    // 11. Websocket extensions (optional field)
    if (!m_connection.extensions().is_empty()) {
        builder.append("Sec-WebSocket-Extensions: ");
        builder.join(",", m_connection.extensions());
        builder.append("\r\n");
    }

    // 12. Additional headers
    for (auto& header : m_connection.headers()) {
        builder.appendff("{}: {}\r\n", header.name, header.value);
    }

    builder.append("\r\n");

    m_state = WebSocket::InternalState::WaitingForServerHandshake;
    auto success = m_impl->send(builder.to_string().bytes());
    VERIFY(success);
}

// The server handshake message is defined in the third list of section 4.1
void WebSocket::read_server_handshake()
{
    VERIFY(m_impl);
    VERIFY(m_state == WebSocket::InternalState::WaitingForServerHandshake);
    // Read the server handshake
    if (!m_impl->can_read_line())
        return;

    if (!m_has_read_server_handshake_first_line) {
        auto header = m_impl->read_line(PAGE_SIZE).release_value_but_fixme_should_propagate_errors();
        auto parts = header.split(' ');
        if (parts.size() < 2) {
            dbgln("WebSocket: Server HTTP Handshake contained HTTP header was malformed");
            fatal_error(WebSocket::Error::ConnectionUpgradeFailed);
            discard_connection();
            return;
        }
        if (parts[0] != "HTTP/1.1") {
            dbgln("WebSocket: Server HTTP Handshake contained HTTP header {} which isn't supported", parts[0]);
            fatal_error(WebSocket::Error::ConnectionUpgradeFailed);
            discard_connection();
            return;
        }
        if (parts[1] != "101") {
            // 1. If the status code is not 101, handle as per HTTP procedures.
            // FIXME : This could be a redirect or a 401 authentication request, which we do not handle.
            dbgln("WebSocket: Server HTTP Handshake return status {} which isn't supported", parts[1]);
            fatal_error(WebSocket::Error::ConnectionUpgradeFailed);
            return;
        }
        m_has_read_server_handshake_first_line = true;
    }

    // Read the rest of the reply until we find an empty line
    while (m_impl->can_read_line()) {
        auto line = m_impl->read_line(PAGE_SIZE).release_value_but_fixme_should_propagate_errors();
        if (line.is_whitespace()) {
            // We're done with the HTTP headers.
            // Fail the connection if we're missing any of the following:
            if (!m_has_read_server_handshake_upgrade) {
                // 2. |Upgrade| should be present
                dbgln("WebSocket: Server HTTP Handshake didn't contain an |Upgrade| header");
                fatal_error(WebSocket::Error::ConnectionUpgradeFailed);
                return;
            }
            if (!m_has_read_server_handshake_connection) {
                // 2. |Connection| should be present
                dbgln("WebSocket: Server HTTP Handshake didn't contain a |Connection| header");
                fatal_error(WebSocket::Error::ConnectionUpgradeFailed);
                return;
            }
            if (!m_has_read_server_handshake_accept) {
                // 2. |Sec-WebSocket-Accept| should be present
                dbgln("WebSocket: Server HTTP Handshake didn't contain a |Sec-WebSocket-Accept| header");
                fatal_error(WebSocket::Error::ConnectionUpgradeFailed);
                return;
            }

            m_state = WebSocket::InternalState::Open;
            notify_open();
            return;
        }

        auto parts = line.split(':');
        if (parts.size() < 2) {
            // The header field is not valid
            dbgln("WebSocket: Got invalid header line {} in the Server HTTP handshake", line);
            fatal_error(WebSocket::Error::ConnectionUpgradeFailed);
            return;
        }

        auto header_name = parts[0];

        if (header_name.equals_ignoring_case("Upgrade")) {
            // 2. |Upgrade| should be case-insensitive "websocket"
            if (!parts[1].trim_whitespace().equals_ignoring_case("websocket")) {
                dbgln("WebSocket: Server HTTP Handshake Header |Upgrade| should be 'websocket', got '{}'. Failing connection.", parts[1]);
                fatal_error(WebSocket::Error::ConnectionUpgradeFailed);
                return;
            }

            m_has_read_server_handshake_upgrade = true;
            continue;
        }

        if (header_name.equals_ignoring_case("Connection")) {
            // 3. |Connection| should be case-insensitive "Upgrade"
            if (!parts[1].trim_whitespace().equals_ignoring_case("Upgrade")) {
                dbgln("WebSocket: Server HTTP Handshake Header |Connection| should be 'Upgrade', got '{}'. Failing connection.", parts[1]);
                return;
            }

            m_has_read_server_handshake_connection = true;
            continue;
        }

        if (header_name.equals_ignoring_case("Sec-WebSocket-Accept")) {
            // 4. |Sec-WebSocket-Accept| should be base64(SHA1(|Sec-WebSocket-Key| + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"))
            auto expected_content = String::formatted("{}258EAFA5-E914-47DA-95CA-C5AB0DC85B11", m_websocket_key);

            Crypto::Hash::Manager hash;
            hash.initialize(Crypto::Hash::HashKind::SHA1);
            hash.update(expected_content);
            auto expected_sha1 = hash.digest();
            auto expected_sha1_string = encode_base64(ReadonlyBytes(expected_sha1.immutable_data(), expected_sha1.data_length()));
            if (!parts[1].trim_whitespace().equals_ignoring_case(expected_sha1_string)) {
                dbgln("WebSocket: Server HTTP Handshake Header |Sec-Websocket-Accept| should be '{}', got '{}'. Failing connection.", expected_sha1_string, parts[1]);
                fatal_error(WebSocket::Error::ConnectionUpgradeFailed);
                return;
            }

            m_has_read_server_handshake_accept = true;
            continue;
        }

        if (header_name.equals_ignoring_case("Sec-WebSocket-Extensions")) {
            // 5. |Sec-WebSocket-Extensions| should not contain an extension that doesn't appear in m_connection->extensions()
            auto server_extensions = parts[1].split(',');
            for (auto const& extension : server_extensions) {
                auto trimmed_extension = extension.trim_whitespace();
                bool found_extension = false;
                for (auto const& supported_extension : m_connection.extensions()) {
                    if (trimmed_extension.equals_ignoring_case(supported_extension)) {
                        found_extension = true;
                    }
                }
                if (!found_extension) {
                    dbgln("WebSocket: Server HTTP Handshake Header |Sec-WebSocket-Extensions| contains '{}', which is not supported by the client. Failing connection.", trimmed_extension);
                    fatal_error(WebSocket::Error::ConnectionUpgradeFailed);
                    return;
                }
            }
            continue;
        }

        if (header_name.equals_ignoring_case("Sec-WebSocket-Protocol")) {
            // 6. |Sec-WebSocket-Protocol| should not contain an extension that doesn't appear in m_connection->protocols()
            auto server_protocols = parts[1].split(',');
            for (auto const& protocol : server_protocols) {
                auto trimmed_protocol = protocol.trim_whitespace();
                bool found_protocol = false;
                for (auto const& supported_protocol : m_connection.protocols()) {
                    if (trimmed_protocol.equals_ignoring_case(supported_protocol)) {
                        found_protocol = true;
                    }
                }
                if (!found_protocol) {
                    dbgln("WebSocket: Server HTTP Handshake Header |Sec-WebSocket-Protocol| contains '{}', which is not supported by the client. Failing connection.", trimmed_protocol);
                    fatal_error(WebSocket::Error::ConnectionUpgradeFailed);
                    return;
                }
            }
            continue;
        }
    }

    // If needed, we will keep reading the header on the next drain_read call
}

void WebSocket::read_frame()
{
    VERIFY(m_impl);
    VERIFY(m_state == WebSocket::InternalState::Open || m_state == WebSocket::InternalState::Closing);

    auto head_bytes_result = m_impl->read(2);
    if (head_bytes_result.is_error() || head_bytes_result.value().is_empty()) {
        // The connection got closed.
        m_state = WebSocket::InternalState::Closed;
        notify_close(m_last_close_code, m_last_close_message, true);
        discard_connection();
        return;
    }
    auto head_bytes = head_bytes_result.release_value();
    VERIFY(head_bytes.size() == 2);

    bool is_final_frame = head_bytes[0] & 0x80;
    if (!is_final_frame) {
        // FIXME: Support fragmented frames
        TODO();
    }

    auto op_code = (WebSocket::OpCode)(head_bytes[0] & 0x0f);
    bool is_masked = head_bytes[1] & 0x80;

    // Parse the payload length.
    size_t payload_length;
    auto payload_length_bits = head_bytes[1] & 0x7f;
    if (payload_length_bits == 127) {
        // A code of 127 means that the next 8 bytes contains the payload length
        auto actual_bytes = MUST(m_impl->read(8));
        VERIFY(actual_bytes.size() == 8);
        u64 full_payload_length = (u64)((u64)(actual_bytes[0] & 0xff) << 56)
            | (u64)((u64)(actual_bytes[1] & 0xff) << 48)
            | (u64)((u64)(actual_bytes[2] & 0xff) << 40)
            | (u64)((u64)(actual_bytes[3] & 0xff) << 32)
            | (u64)((u64)(actual_bytes[4] & 0xff) << 24)
            | (u64)((u64)(actual_bytes[5] & 0xff) << 16)
            | (u64)((u64)(actual_bytes[6] & 0xff) << 8)
            | (u64)((u64)(actual_bytes[7] & 0xff) << 0);
        VERIFY(full_payload_length <= NumericLimits<size_t>::max());
        payload_length = (size_t)full_payload_length;
    } else if (payload_length_bits == 126) {
        // A code of 126 means that the next 2 bytes contains the payload length
        auto actual_bytes = MUST(m_impl->read(2));
        VERIFY(actual_bytes.size() == 2);
        payload_length = (size_t)((size_t)(actual_bytes[0] & 0xff) << 8)
            | (size_t)((size_t)(actual_bytes[1] & 0xff) << 0);
    } else {
        payload_length = (size_t)payload_length_bits;
    }

    // Parse the mask, if it exists.
    // Note : this is technically non-conformant with Section 5.1 :
    // > A server MUST NOT mask any frames that it sends to the client.
    // > A client MUST close a connection if it detects a masked frame.
    // > (These rules might be relaxed in a future specification.)
    // But because it doesn't cost much, we can support receiving masked frames anyways.
    u8 masking_key[4];
    if (is_masked) {
        auto masking_key_data = MUST(m_impl->read(4));
        VERIFY(masking_key_data.size() == 4);
        masking_key[0] = masking_key_data[0];
        masking_key[1] = masking_key_data[1];
        masking_key[2] = masking_key_data[2];
        masking_key[3] = masking_key_data[3];
    }

    auto payload = ByteBuffer::create_uninitialized(payload_length).release_value_but_fixme_should_propagate_errors(); // FIXME: Handle possible OOM situation.
    u64 read_length = 0;
    while (read_length < payload_length) {
        auto payload_part_result = m_impl->read(payload_length - read_length);
        if (payload_part_result.is_error() || payload_part_result.value().is_empty()) {
            // We got disconnected, somehow.
            dbgln("Websocket: Server disconnected while sending payload ({} bytes read out of {})", read_length, payload_length);
            fatal_error(WebSocket::Error::ServerClosedSocket);
            return;
        }
        auto payload_part = payload_part_result.release_value();
        // We read at most "actual_length - read" bytes, so this is safe to do.
        payload.overwrite(read_length, payload_part.data(), payload_part.size());
        read_length += payload_part.size();
    }

    if (is_masked) {
        // Unmask the payload
        for (size_t i = 0; i < payload.size(); ++i) {
            payload[i] = payload[i] ^ (masking_key[i % 4]);
        }
    }

    if (op_code == WebSocket::OpCode::ConnectionClose) {
        if (payload.size() > 1) {
            m_last_close_code = (((u16)(payload[0] & 0xff) << 8) | ((u16)(payload[1] & 0xff)));
            m_last_close_message = String(ReadonlyBytes(payload.offset_pointer(2), payload.size() - 2));
        }
        m_state = WebSocket::InternalState::Closing;
        return;
    }
    if (op_code == WebSocket::OpCode::Ping) {
        // Immediately send a pong frame as a reply, with the given payload.
        send_frame(WebSocket::OpCode::Pong, payload, true);
        return;
    }
    if (op_code == WebSocket::OpCode::Pong) {
        // We can safely ignore the pong
        return;
    }
    if (op_code == WebSocket::OpCode::Continuation) {
        // FIXME: Support fragmented frames
        TODO();
    }
    if (op_code == WebSocket::OpCode::Text) {
        notify_message(Message(payload, true));
        return;
    }
    if (op_code == WebSocket::OpCode::Binary) {
        notify_message(Message(payload, false));
        return;
    }
    dbgln("Websocket: Found unknown opcode {}", (u8)op_code);
}

void WebSocket::send_frame(WebSocket::OpCode op_code, ReadonlyBytes payload, bool is_final)
{
    VERIFY(m_impl);
    VERIFY(m_state == WebSocket::InternalState::Open);
    u8 frame_head[1] = { (u8)((is_final ? 0x80 : 0x00) | ((u8)(op_code)&0xf)) };
    m_impl->send(ReadonlyBytes(frame_head, 1));
    // Section 5.1 : a client MUST mask all frames that it sends to the server
    bool has_mask = true;
    // FIXME: If the payload has a size > size_t max on a 32-bit platform, we could
    //     technically stream it via non-final packets. However, the size was already
    //     truncated earlier in the call stack when stuffing into a ReadonlyBytes
    if (payload.size() > NumericLimits<u16>::max()) {
        // Send (the 'mask' flag + 127) + the 8-byte payload length
        if constexpr (sizeof(size_t) >= 8) {
            u8 payload_length[9] = {
                (u8)((has_mask ? 0x80 : 0x00) | 127),
                (u8)((payload.size() >> 56) & 0xff),
                (u8)((payload.size() >> 48) & 0xff),
                (u8)((payload.size() >> 40) & 0xff),
                (u8)((payload.size() >> 32) & 0xff),
                (u8)((payload.size() >> 24) & 0xff),
                (u8)((payload.size() >> 16) & 0xff),
                (u8)((payload.size() >> 8) & 0xff),
                (u8)((payload.size() >> 0) & 0xff),
            };
            m_impl->send(ReadonlyBytes(payload_length, 9));
        } else {
            u8 payload_length[9] = {
                (u8)((has_mask ? 0x80 : 0x00) | 127),
                0,
                0,
                0,
                0,
                (u8)((payload.size() >> 24) & 0xff),
                (u8)((payload.size() >> 16) & 0xff),
                (u8)((payload.size() >> 8) & 0xff),
                (u8)((payload.size() >> 0) & 0xff),
            };
            m_impl->send(ReadonlyBytes(payload_length, 9));
        }
    } else if (payload.size() >= 126) {
        // Send (the 'mask' flag + 126) + the 2-byte payload length
        u8 payload_length[3] = {
            (u8)((has_mask ? 0x80 : 0x00) | 126),
            (u8)((payload.size() >> 8) & 0xff),
            (u8)((payload.size() >> 0) & 0xff),
        };
        m_impl->send(ReadonlyBytes(payload_length, 3));
    } else {
        // Send the mask flag + the payload in a single byte
        u8 payload_length[1] = {
            (u8)((has_mask ? 0x80 : 0x00) | (u8)(payload.size() & 0x7f)),
        };
        m_impl->send(ReadonlyBytes(payload_length, 1));
    }
    if (has_mask) {
        // Section 10.3 :
        // > Clients MUST choose a new masking key for each frame, using an algorithm
        // > that cannot be predicted by end applications that provide data
        u8 masking_key[4];
        fill_with_random(masking_key, 4);
        m_impl->send(ReadonlyBytes(masking_key, 4));
        // don't try to send empty payload
        if (payload.size() == 0)
            return;
        // Mask the payload
        auto buffer_result = ByteBuffer::create_uninitialized(payload.size());
        if (!buffer_result.is_error()) {
            auto& masked_payload = buffer_result.value();
            for (size_t i = 0; i < payload.size(); ++i) {
                masked_payload[i] = payload[i] ^ (masking_key[i % 4]);
            }
            m_impl->send(masked_payload);
        }
    } else if (payload.size() > 0) {
        m_impl->send(payload);
    }
}

void WebSocket::fatal_error(WebSocket::Error error)
{
    m_state = WebSocket::InternalState::Errored;
    notify_error(error);
    discard_connection();
}

void WebSocket::discard_connection()
{
    deferred_invoke([this] {
        VERIFY(m_impl);
        m_impl->discard_connection();
        m_impl->on_connection_error = nullptr;
        m_impl->on_connected = nullptr;
        m_impl->on_ready_to_read = nullptr;
        m_impl = nullptr;
    });
}

void WebSocket::notify_open()
{
    if (!on_open)
        return;
    on_open();
}

void WebSocket::notify_close(u16 code, String reason, bool was_clean)
{
    if (!on_close)
        return;
    on_close(code, move(reason), was_clean);
}

void WebSocket::notify_error(WebSocket::Error error)
{
    if (!on_error)
        return;
    on_error(error);
}

void WebSocket::notify_message(Message message)
{
    if (!on_message)
        return;
    on_message(move(message));
}

}