diff --git a/include/http/http_callbacks.h b/include/http/http_callbacks.h new file mode 100644 index 0000000..83de0ab --- /dev/null +++ b/include/http/http_callbacks.h @@ -0,0 +1,59 @@ +#pragma once + +#include "common.h" +// , , , provided by common.h + +// Forward declarations +class HttpConnectionHandler; +class ConnectionHandler; +class WebSocketConnection; +struct HttpRequest; +class HttpResponse; + +namespace HTTP_CALLBACKS_NAMESPACE { + + // ---- HttpConnectionHandler callbacks ------------------------------------ + using HttpConnRequestCallback = std::function self, + const HttpRequest& request, + HttpResponse& response + )>; + using HttpConnRouteCheckCallback = std::function; + using HttpConnMiddlewareCallback = std::function; + using HttpConnUpgradeCallback = std::function self, + const HttpRequest& request + )>; + + struct HttpConnCallbacks { + HttpConnRequestCallback request_callback = nullptr; + HttpConnRouteCheckCallback route_check_callback = nullptr; + HttpConnMiddlewareCallback middleware_callback = nullptr; + HttpConnUpgradeCallback upgrade_callback = nullptr; + }; + + // ---- WebSocketConnection callbacks -------------------------------------- + using WsMessageCallback = std::function; + using WsCloseCallback = std::function; + using WsPingCallback = std::function; + using WsErrorCallback = std::function; + + struct WsCallbacks { + WsMessageCallback message_callback = nullptr; + WsCloseCallback close_callback = nullptr; + WsPingCallback ping_callback = nullptr; + WsErrorCallback error_callback = nullptr; + }; + +} // namespace HTTP_CALLBACKS_NAMESPACE diff --git a/include/http/http_connection_handler.h b/include/http/http_connection_handler.h index 3428af4..2b929b4 100644 --- a/include/http/http_connection_handler.h +++ b/include/http/http_connection_handler.h @@ -1,5 +1,6 @@ #pragma once +#include "http/http_callbacks.h" #include "http/http_parser.h" #include "http/http_request.h" #include "http/http_response.h" @@ -13,30 +14,15 @@ class HttpConnectionHandler : public std::enable_shared_from_this conn); - // Handler for complete HTTP requests - using RequestCallback = std::function self, - const HttpRequest& request, - HttpResponse& response - )>; - void SetRequestCallback(RequestCallback callback); + // Public type aliases for backward compatibility with SetupHandlers() callers + using RequestCallback = HTTP_CALLBACKS_NAMESPACE::HttpConnRequestCallback; + using RouteCheckCallback = HTTP_CALLBACKS_NAMESPACE::HttpConnRouteCheckCallback; + using MiddlewareCallback = HTTP_CALLBACKS_NAMESPACE::HttpConnMiddlewareCallback; + using UpgradeCallback = HTTP_CALLBACKS_NAMESPACE::HttpConnUpgradeCallback; - // Check if a WebSocket route exists for the given path. - // Returns true if upgrade should proceed, false to reject. - using RouteCheckCallback = std::function; + void SetRequestCallback(RequestCallback callback); void SetRouteCheckCallback(RouteCheckCallback callback); - - // Run middleware chain before WebSocket upgrade. - // Returns true if all middleware passed, false if any short-circuited (response is set). - using MiddlewareCallback = std::function; void SetMiddlewareCallback(MiddlewareCallback callback); - - // Handler called ONCE after WebSocket upgrade is complete and ws_conn_ exists. - // Wires application-level OnMessage/OnClose callbacks on the WebSocketConnection. - using UpgradeCallback = std::function self, - const HttpRequest& request - )>; void SetUpgradeCallback(UpgradeCallback callback); // Send an HTTP response @@ -83,12 +69,17 @@ class HttpConnectionHandler : public std::enable_shared_from_this conn_; HttpParser parser_; - RequestCallback request_callback_; - RouteCheckCallback route_check_callback_; - MiddlewareCallback middleware_callback_; - UpgradeCallback upgrade_callback_; + HTTP_CALLBACKS_NAMESPACE::HttpConnCallbacks callbacks_; bool upgraded_ = false; std::unique_ptr ws_conn_; }; diff --git a/include/http/http_server.h b/include/http/http_server.h index 0e4b9d5..57eecda 100644 --- a/include/http/http_server.h +++ b/include/http/http_server.h @@ -54,6 +54,9 @@ class HttpServer { void WireNetServerCallbacks(); // Compute the pre-read input buffer cap from configured limits. size_t ComputeInputCap() const; + // Safe WS transport-close notification: null-check, exception-safe, log errors. + // Must be called OUTSIDE conn_mtx_ to prevent deadlock. + void SafeNotifyWsClose(const std::shared_ptr& http_conn); std::shared_ptr tls_ctx_; // Shared with NetServer for safe lifetime diff --git a/include/ws/websocket_connection.h b/include/ws/websocket_connection.h index 1b7939e..ea1eb4e 100644 --- a/include/ws/websocket_connection.h +++ b/include/ws/websocket_connection.h @@ -1,5 +1,6 @@ #pragma once +#include "http/http_callbacks.h" #include "ws/websocket_parser.h" #include "ws/websocket_frame.h" #include "connection_handler.h" @@ -10,11 +11,11 @@ class WebSocketConnection { public: explicit WebSocketConnection(std::shared_ptr conn); - // Message-level callbacks - using MessageCallback = std::function; - using CloseCallback = std::function; - using PingCallback = std::function; - using ErrorCallback = std::function; + // Public type aliases for backward compatibility + using MessageCallback = HTTP_CALLBACKS_NAMESPACE::WsMessageCallback; + using CloseCallback = HTTP_CALLBACKS_NAMESPACE::WsCloseCallback; + using PingCallback = HTTP_CALLBACKS_NAMESPACE::WsPingCallback; + using ErrorCallback = HTTP_CALLBACKS_NAMESPACE::WsErrorCallback; void OnMessage(MessageCallback callback); void OnClose(CloseCallback callback); @@ -58,10 +59,7 @@ class WebSocketConnection { uint16_t sent_close_code_ = 0; // Close code we sent (for NotifyTransportClose) std::string sent_close_reason_; // Close reason we sent - MessageCallback message_callback_; - CloseCallback close_callback_; - PingCallback ping_callback_; - ErrorCallback error_callback_; + HTTP_CALLBACKS_NAMESPACE::WsCallbacks callbacks_; // Fragmentation reassembly std::string fragment_buffer_; diff --git a/server/http_connection_handler.cc b/server/http_connection_handler.cc index e669b8d..44accf0 100644 --- a/server/http_connection_handler.cc +++ b/server/http_connection_handler.cc @@ -6,19 +6,19 @@ HttpConnectionHandler::HttpConnectionHandler(std::shared_ptr : conn_(std::move(conn)) {} void HttpConnectionHandler::SetRequestCallback(RequestCallback callback) { - request_callback_ = std::move(callback); + callbacks_.request_callback = std::move(callback); } void HttpConnectionHandler::SetRouteCheckCallback(RouteCheckCallback callback) { - route_check_callback_ = std::move(callback); + callbacks_.route_check_callback = std::move(callback); } void HttpConnectionHandler::SetMiddlewareCallback(MiddlewareCallback callback) { - middleware_callback_ = std::move(callback); + callbacks_.middleware_callback = std::move(callback); } void HttpConnectionHandler::SetUpgradeCallback(UpgradeCallback callback) { - upgrade_callback_ = std::move(callback); + callbacks_.upgrade_callback = std::move(callback); } void HttpConnectionHandler::SetMaxBodySize(size_t max) { @@ -55,6 +55,449 @@ void HttpConnectionHandler::CloseConnection() { conn_->CloseAfterWrite(); } +// ---- Internal phase methods (split from OnRawData for readability) -------- + +void HttpConnectionHandler::HandleUpgradedData(const std::string& data) { + try { + ws_conn_->OnRawData(data); + } catch (const std::exception& e) { + // App handler threw — log server-side, send WS close 1011. + // Don't call CloseConnection afterward: SendClose arms a 5s deadline + // for the close handshake. CloseConnection would overwrite that deadline + // and tear down the transport before the peer can send their Close reply. + logging::Get()->error("Exception in WS handler: {}", e.what()); + if (ws_conn_->IsOpen()) { + ws_conn_->SendClose(1011, "Internal error"); + } + // If !IsOpen(), a close is already in progress (close_sent_ or !is_open_). + } +} + +void HttpConnectionHandler::HandleParseError() { + // Determine appropriate error response based on parser error type + HttpResponse err_resp; + switch (parser_.GetErrorType()) { + case HttpParser::ParseError::BODY_TOO_LARGE: + err_resp = HttpResponse::PayloadTooLarge(); + break; + case HttpParser::ParseError::HEADER_TOO_LARGE: + err_resp = HttpResponse::HeaderTooLarge(); + break; + default: + err_resp = HttpResponse::BadRequest(parser_.GetError()); + break; + } + err_resp.Header("Connection", "close"); + SendResponse(err_resp); + // Actually close the connection — the stream is in an unknown state + CloseConnection(); +} + +bool HttpConnectionHandler::HandleCompleteRequest(const char*& buf, size_t& remaining, size_t consumed) { + const HttpRequest& req = parser_.GetRequest(); + + // Reject unsupported HTTP versions — only HTTP/1.0 and HTTP/1.1 supported. + // llhttp will parse any major.minor (e.g. HTTP/2.0, HTTP/0.9), but this + // server only speaks HTTP/1.x, so dispatch would produce wrong responses. + if (req.http_major != 1 || (req.http_minor != 0 && req.http_minor != 1)) { + HttpResponse ver_resp = HttpResponse::HttpVersionNotSupported(); + ver_resp.Header("Connection", "close"); + SendResponse(ver_resp); + CloseConnection(); + return false; + } + + // Track the request's HTTP version so SendResponse echoes it correctly + // (e.g. HTTP/1.0 for 1.0 clients). Must be set after the version check. + current_http_minor_ = req.http_minor; + + // RFC 7230 §5.4: HTTP/1.1 requests MUST include Host header + if (req.http_minor >= 1 && !req.HasHeader("host")) { + HttpResponse bad_req = HttpResponse::BadRequest("Missing Host header"); + bad_req.Header("Connection", "close"); + SendResponse(bad_req); + CloseConnection(); + return false; + } + + // RFC 7231 §5.1.1: reject unsupported Expect values. + // For complete requests, 100-continue is a no-op (body already arrived). + // Any other value must be rejected with 417. + if (req.HasHeader("expect")) { + std::string expect = req.GetHeader("expect"); + std::transform(expect.begin(), expect.end(), expect.begin(), ::tolower); + while (!expect.empty() && (expect.front() == ' ' || expect.front() == '\t')) + expect.erase(expect.begin()); + while (!expect.empty() && (expect.back() == ' ' || expect.back() == '\t')) + expect.pop_back(); + if (expect != "100-continue") { + HttpResponse err; + err.Status(417, "Expectation Failed"); + err.Header("Connection", "close"); + SendResponse(err); + CloseConnection(); + return false; + } + } + + // Check for WebSocket upgrade. + // Guard on method == GET: llhttp sets upgrade=1 for CONNECT too, + // but RFC 6455 §4.1 requires GET. Without this, Route("CONNECT", ...) + // is unreachable — CONNECT enters the WS path and fails validation. + if (req.upgrade && req.method == "GET" && callbacks_.route_check_callback) { + try { + // Run middleware before upgrade (auth, CORS, rate limiting, etc.) + // Hoist mw_response so successful middleware headers can be merged + // into the 101 response (e.g., Set-Cookie, auth tokens). + HttpResponse mw_response; + if (callbacks_.middleware_callback) { + if (!callbacks_.middleware_callback(req, mw_response)) { + // Middleware rejected — default to 403 if nothing was set. + // Only apply when the response is completely untouched + // (default status + no body + no headers). This allows + // middleware to intentionally short-circuit with 200 OK + // plus custom headers (e.g., CORS preflight, Set-Cookie). + if (mw_response.GetStatusCode() == 200 && + mw_response.GetBody().empty() && + mw_response.GetHeaders().empty()) { + mw_response.Status(403).Text("Forbidden"); + } + mw_response.Header("Connection", "close"); + SendResponse(mw_response); + CloseConnection(); + return false; + } + } + + // Validate WebSocket handshake per RFC 6455 + std::string ws_error; + if (!WebSocketHandshake::Validate(req, ws_error)) { + int reject_code = 400; + // RFC 6455 §4.4: wrong version → 426 + Sec-WebSocket-Version + if (ws_error.find("version") != std::string::npos || + ws_error.find("Version") != std::string::npos) { + reject_code = 426; + } + HttpResponse reject = WebSocketHandshake::Reject(reject_code, ws_error); + if (reject_code == 426) { + reject.Header("Sec-WebSocket-Version", "13"); + } + reject.Header("Connection", "close"); + SendResponse(reject); + CloseConnection(); + return false; + } + + // Check route existence BEFORE sending 101 + if (!callbacks_.route_check_callback(req.path)) { + auto not_found = HttpResponse::NotFound(); + not_found.Header("Connection", "close"); + SendResponse(not_found); + CloseConnection(); + return false; + } + + // Request completed (as upgrade) — reset timeout tracking + request_in_progress_ = false; + conn_->ClearDeadline(); + conn_->SetDeadlineTimeoutCb(nullptr); + + // Route confirmed — send 101 Switching Protocols. + // Merge safe middleware headers (e.g., Set-Cookie, auth tokens). + // Skip headers that are mandatory parts of the 101 handshake response + // to avoid corruption. + HttpResponse upgrade_resp = WebSocketHandshake::Accept(req); + for (const auto& hdr : mw_response.GetHeaders()) { + std::string key = hdr.first; + std::transform(key.begin(), key.end(), key.begin(), ::tolower); + // Skip 101 mandatory headers, framing headers, and WS + // negotiation headers. This server doesn't implement WS + // extensions or subprotocol negotiation, so allowing + // middleware to inject Sec-WebSocket-Extensions (e.g., + // permessage-deflate) would cause clients to send RSV1 + // frames that the parser rejects as protocol errors. + if (key == "connection" || key == "upgrade" || + key == "sec-websocket-accept" || key == "content-length" || + key == "transfer-encoding" || + key == "sec-websocket-extensions" || + key == "sec-websocket-protocol") { + continue; + } + upgrade_resp.Header(hdr.first, hdr.second); + } + SendResponse(upgrade_resp); + + // If the send failed (client disconnected), don't proceed with upgrade. + // SendRaw may have triggered CallCloseCb via EPIPE/ECONNRESET. + if (conn_->IsClosing()) { + return false; + } + + // Mark as upgraded IMMEDIATELY after 101 is sent, before any + // code that could throw. This ensures the catch block correctly + // identifies post-101 exceptions and sends WS close 1011 + // instead of raw HTTP 500 on an already-upgraded connection. + upgraded_ = true; + + // Create WebSocket connection + ws_conn_ = std::make_unique(conn_); + if (max_ws_message_size_ > 0) { + ws_conn_->GetParser().SetMaxPayloadSize(max_ws_message_size_); + ws_conn_->SetMaxMessageSize(max_ws_message_size_); + } + // Switch input cap to the WS message size limit. The read loop + // stops at the cap (data stays in kernel buffer, nothing is + // discarded) and requeues, so no parser desync. This bounds + // per-cycle memory allocation against a fast peer while the + // WS parser enforces frame/message limits independently. + if (max_ws_message_size_ > 0) { + conn_->SetMaxInputSize(max_ws_message_size_); + } + + // Wire WS callbacks (called exactly once, ws_conn_ guaranteed to exist) + if (callbacks_.upgrade_callback) { + callbacks_.upgrade_callback(shared_from_this(), req); + } + + // Forward any trailing bytes after the HTTP headers as WebSocket data + buf += consumed; + remaining -= consumed; + if (remaining > 0 && ws_conn_) { + std::string trailing(buf, remaining); + ws_conn_->OnRawData(trailing); + } + return false; + + } catch (const std::exception& e) { + // Exception in middleware/upgrade handler — log server-side, + // send generic 500 to client (never leak e.what() over the wire). + logging::Get()->error("Exception in upgrade handler: {}", e.what()); + if (!upgraded_) { + // Pre-101: send HTTP 500, close via HTTP path + HttpResponse err = HttpResponse::InternalError(); + err.Header("Connection", "close"); + SendResponse(err); + CloseConnection(); + } else if (ws_conn_) { + // Post-101 with WS connection: send close 1011. + // SendClose now includes CloseAfterWrite for proper drain. + ws_conn_->SendClose(1011, "Internal error"); + } else { + // Post-101 but ws_conn_ is null — make_unique threw (OOM). + // Connection is in a bad state (101 sent, no WS handler). + // Force close the transport immediately. + conn_->ForceClose(); + } + return false; + } + } + + // Normal HTTP request -- dispatch to handler + if (callbacks_.request_callback) { + HttpResponse response; + try { + callbacks_.request_callback(shared_from_this(), req, response); + } catch (const std::exception& e) { + // Log the exception server-side; never send e.what() to the + // client — it can contain stack traces, file paths, DB strings. + logging::Get()->error("Exception in request handler: {}", e.what()); + response = HttpResponse::InternalError(); + response.Header("Connection", "close"); + SendResponse(response); + CloseConnection(); + return false; + } + + // Determine if response sets Connection: close (needed for + // keep-alive logic AND the close decision after sending). + // Scan ALL Connection headers and parse each as a comma-separated + // token list (RFC 7230 §6.1). Values like "keep-alive, close" or + // "upgrade, close" must be recognized, not just exact "close". + bool resp_close = false; + for (const auto& hdr : response.GetHeaders()) { + std::string key = hdr.first; + std::transform(key.begin(), key.end(), key.begin(), ::tolower); + if (key == "connection") { + std::string val = hdr.second; + std::transform(val.begin(), val.end(), val.begin(), ::tolower); + std::istringstream ss(val); + std::string token; + while (std::getline(ss, token, ',')) { + while (!token.empty() && (token.front() == ' ' || token.front() == '\t')) + token.erase(token.begin()); + while (!token.empty() && (token.back() == ' ' || token.back() == '\t')) + token.pop_back(); + if (token == "close") { + resp_close = true; + } + } + } + } + + // HTTP/1.0 persistence requires explicit Connection: keep-alive + // in the response. Without it, a compliant 1.0 client treats the + // response as close-delimited and closes its end, while the server + // keeps waiting — stranding the connection until idle timeout. + if (req.http_minor == 0 && req.keep_alive && !resp_close) { + response.Header("Connection", "keep-alive"); + } + + // If the server will close after this response (client sent + // Connection: close, or HTTP/1.0 without keep-alive), the response + // must include Connection: close so the wire semantics match. + // Without this, an HTTP/1.1 response implies persistence but the + // server tears the socket down immediately after sending. + if (!req.keep_alive && !resp_close) { + response.Header("Connection", "close"); + } + + // RFC 7231 §4.3.2: HEAD responses MUST NOT include a body, + // but MUST include the same headers as the GET response (including + // Content-Length reflecting the GET body size). + if (req.method == "HEAD") { + // Serialize the full response to get auto-computed Content-Length, + // then strip the body from the wire output. + response.Version(1, current_http_minor_); + std::string wire = response.Serialize(); + // Find the end of headers (blank line) + auto header_end = wire.find("\r\n\r\n"); + if (header_end != std::string::npos) { + wire = wire.substr(0, header_end + 4); // Include the blank line + } + conn_->SendRaw(wire.data(), wire.size()); + } else { + SendResponse(response); + } + + // If SendResponse triggered a connection close (e.g., EPIPE), + // stop processing pipelined requests. + if (conn_->IsClosing()) { + return false; + } + + if (!req.keep_alive || resp_close) { + CloseConnection(); + return false; + } + } + + // Request completed — reset timeout tracking for next request + request_in_progress_ = false; + conn_->ClearDeadline(); + conn_->SetDeadlineTimeoutCb(nullptr); + + // Advance past consumed bytes + buf += consumed; + remaining -= consumed; + + // Reset parser and per-request state for next request (keep-alive / pipelining) + parser_.Reset(); + sent_100_continue_ = false; + + // If there are remaining bytes (pipelined request), arm a new deadline + // AND re-install the 408 callback so timer-driven timeout sends proper response + if (remaining > 0 && request_timeout_sec_ > 0) { + request_in_progress_ = true; + request_start_ = std::chrono::steady_clock::now(); + conn_->SetDeadline(request_start_ + std::chrono::seconds(request_timeout_sec_)); + std::weak_ptr weak_self = shared_from_this(); + conn_->SetDeadlineTimeoutCb([weak_self]() { + if (auto self = weak_self.lock()) { + HttpResponse timeout_resp = HttpResponse::RequestTimeout(); + timeout_resp.Header("Connection", "close"); + self->SendResponse(timeout_resp); + } + }); + } + + return true; // Continue pipelining loop +} + +void HttpConnectionHandler::HandleIncompleteRequest() { + // Incomplete request -- need more data. + // If the peer already closed (close_after_write_ set), no more bytes + // will arrive — the request can never complete. Close immediately + // instead of leaking the connection slot until timeout. + if (conn_->IsCloseDeferred()) { + conn_->ForceClose(); + return; + } + // Perform early validation once headers are complete to avoid + // holding connection slots for requests that can never succeed. + if (!sent_100_continue_ && parser_.GetRequest().headers_complete) { + const auto& partial = parser_.GetRequest(); + + // Early reject: unsupported HTTP version + if (partial.http_major != 1 || + (partial.http_minor != 0 && partial.http_minor != 1)) { + HttpResponse ver_resp = HttpResponse::HttpVersionNotSupported(); + ver_resp.Header("Connection", "close"); + SendResponse(ver_resp); + CloseConnection(); + return; + } + + // Early reject: HTTP/1.1 missing Host + if (partial.http_minor >= 1 && !partial.HasHeader("host")) { + HttpResponse bad_req = HttpResponse::BadRequest("Missing Host header"); + bad_req.Header("Connection", "close"); + SendResponse(bad_req); + CloseConnection(); + return; + } + + // Early reject: Content-Length exceeds body size limit. + // Without this, a client can send headers with a huge Content-Length + // and no body, occupying a connection slot until request timeout. + if (max_body_size_ > 0 && + partial.content_length > max_body_size_) { + HttpResponse err = HttpResponse::PayloadTooLarge(); + err.Header("Connection", "close"); + SendResponse(err); + CloseConnection(); + return; + } + + // RFC 7231 §5.1.1: handle Expect header + if (partial.HasHeader("expect")) { + std::string expect = partial.GetHeader("expect"); + std::transform(expect.begin(), expect.end(), expect.begin(), ::tolower); + // Trim OWS (SP/HTAB per RFC 7230 §3.2.3) + while (!expect.empty() && (expect.front() == ' ' || expect.front() == '\t')) + expect.erase(expect.begin()); + while (!expect.empty() && (expect.back() == ' ' || expect.back() == '\t')) + expect.pop_back(); + if (expect == "100-continue") { + // Don't send 100 Continue for WebSocket upgrade requests — + // WebSocketHandshake::Validate() rejects body-bearing + // upgrades, so acknowledging the body is contradictory. + if (partial.upgrade) { + HttpResponse bad_req = HttpResponse::BadRequest( + "WebSocket upgrade must not have a request body"); + bad_req.Header("Connection", "close"); + SendResponse(bad_req); + CloseConnection(); + return; + } + HttpResponse cont; + cont.Status(100, "Continue"); + SendResponse(cont); + sent_100_continue_ = true; + } else { + // Unrecognized Expect value — RFC 7231 §5.1.1: 417 + HttpResponse err; + err.Status(417, "Expectation Failed"); + err.Header("Connection", "close"); + SendResponse(err); + CloseConnection(); + return; + } + } + } +} + +// ---- Main entry point ----------------------------------------------------- + void HttpConnectionHandler::OnRawData(std::shared_ptr conn, std::string& data) { // For HTTP connections draining a response (close_after_write set), // don't process new data. The parser wasn't Reset after the last @@ -69,19 +512,7 @@ void HttpConnectionHandler::OnRawData(std::shared_ptr conn, s // If upgraded to WebSocket, forward raw bytes to WebSocketConnection if (upgraded_ && ws_conn_) { - try { - ws_conn_->OnRawData(data); - } catch (const std::exception& e) { - // App handler threw — log server-side, send WS close 1011. - // Don't call CloseConnection afterward: SendClose arms a 5s deadline - // for the close handshake. CloseConnection would overwrite that deadline - // and tear down the transport before the peer can send their Close reply. - logging::Get()->error("Exception in WS handler: {}", e.what()); - if (ws_conn_->IsOpen()) { - ws_conn_->SendClose(1011, "Internal error"); - } - // If !IsOpen(), a close is already in progress (close_sent_ or !is_open_). - } + HandleUpgradedData(data); return; } @@ -137,23 +568,7 @@ void HttpConnectionHandler::OnRawData(std::shared_ptr conn, s } if (parser_.HasError()) { - // Determine appropriate error response based on parser error type - HttpResponse err_resp; - switch (parser_.GetErrorType()) { - case HttpParser::ParseError::BODY_TOO_LARGE: - err_resp = HttpResponse::PayloadTooLarge(); - break; - case HttpParser::ParseError::HEADER_TOO_LARGE: - err_resp = HttpResponse::HeaderTooLarge(); - break; - default: - err_resp = HttpResponse::BadRequest(parser_.GetError()); - break; - } - err_resp.Header("Connection", "close"); - SendResponse(err_resp); - // Actually close the connection — the stream is in an unknown state - CloseConnection(); + HandleParseError(); return; } @@ -161,402 +576,12 @@ void HttpConnectionHandler::OnRawData(std::shared_ptr conn, s if (consumed == 0) break; if (parser_.GetRequest().complete) { - const HttpRequest& req = parser_.GetRequest(); - - // Reject unsupported HTTP versions — only HTTP/1.0 and HTTP/1.1 supported. - // llhttp will parse any major.minor (e.g. HTTP/2.0, HTTP/0.9), but this - // server only speaks HTTP/1.x, so dispatch would produce wrong responses. - if (req.http_major != 1 || (req.http_minor != 0 && req.http_minor != 1)) { - HttpResponse ver_resp = HttpResponse::HttpVersionNotSupported(); - ver_resp.Header("Connection", "close"); - SendResponse(ver_resp); - CloseConnection(); - return; - } - - // Track the request's HTTP version so SendResponse echoes it correctly - // (e.g. HTTP/1.0 for 1.0 clients). Must be set after the version check. - current_http_minor_ = req.http_minor; - - // RFC 7230 §5.4: HTTP/1.1 requests MUST include Host header - if (req.http_minor >= 1 && !req.HasHeader("host")) { - HttpResponse bad_req = HttpResponse::BadRequest("Missing Host header"); - bad_req.Header("Connection", "close"); - SendResponse(bad_req); - CloseConnection(); - return; - } - - // RFC 7231 §5.1.1: reject unsupported Expect values. - // For complete requests, 100-continue is a no-op (body already arrived). - // Any other value must be rejected with 417. - if (req.HasHeader("expect")) { - std::string expect = req.GetHeader("expect"); - std::transform(expect.begin(), expect.end(), expect.begin(), ::tolower); - while (!expect.empty() && (expect.front() == ' ' || expect.front() == '\t')) - expect.erase(expect.begin()); - while (!expect.empty() && (expect.back() == ' ' || expect.back() == '\t')) - expect.pop_back(); - if (expect != "100-continue") { - HttpResponse err; - err.Status(417, "Expectation Failed"); - err.Header("Connection", "close"); - SendResponse(err); - CloseConnection(); - return; - } - } - - // Check for WebSocket upgrade. - // Guard on method == GET: llhttp sets upgrade=1 for CONNECT too, - // but RFC 6455 §4.1 requires GET. Without this, Route("CONNECT", ...) - // is unreachable — CONNECT enters the WS path and fails validation. - if (req.upgrade && req.method == "GET" && route_check_callback_) { - try { - // Run middleware before upgrade (auth, CORS, rate limiting, etc.) - // Hoist mw_response so successful middleware headers can be merged - // into the 101 response (e.g., Set-Cookie, auth tokens). - HttpResponse mw_response; - if (middleware_callback_) { - if (!middleware_callback_(req, mw_response)) { - // Middleware rejected — default to 403 if nothing was set. - // Only apply when the response is completely untouched - // (default status + no body + no headers). This allows - // middleware to intentionally short-circuit with 200 OK - // plus custom headers (e.g., CORS preflight, Set-Cookie). - if (mw_response.GetStatusCode() == 200 && - mw_response.GetBody().empty() && - mw_response.GetHeaders().empty()) { - mw_response.Status(403).Text("Forbidden"); - } - mw_response.Header("Connection", "close"); - SendResponse(mw_response); - CloseConnection(); - return; - } - } - - // Validate WebSocket handshake per RFC 6455 - std::string ws_error; - if (!WebSocketHandshake::Validate(req, ws_error)) { - int reject_code = 400; - // RFC 6455 §4.4: wrong version → 426 + Sec-WebSocket-Version - if (ws_error.find("version") != std::string::npos || - ws_error.find("Version") != std::string::npos) { - reject_code = 426; - } - HttpResponse reject = WebSocketHandshake::Reject(reject_code, ws_error); - if (reject_code == 426) { - reject.Header("Sec-WebSocket-Version", "13"); - } - reject.Header("Connection", "close"); - SendResponse(reject); - CloseConnection(); - return; - } - - // Check route existence BEFORE sending 101 - if (!route_check_callback_(req.path)) { - auto not_found = HttpResponse::NotFound(); - not_found.Header("Connection", "close"); - SendResponse(not_found); - CloseConnection(); - return; - } - - // Request completed (as upgrade) — reset timeout tracking - request_in_progress_ = false; - conn_->ClearDeadline(); - conn_->SetDeadlineTimeoutCb(nullptr); - - // Route confirmed — send 101 Switching Protocols. - // Merge safe middleware headers (e.g., Set-Cookie, auth tokens). - // Skip headers that are mandatory parts of the 101 handshake response - // to avoid corruption. - HttpResponse upgrade_resp = WebSocketHandshake::Accept(req); - for (const auto& hdr : mw_response.GetHeaders()) { - std::string key = hdr.first; - std::transform(key.begin(), key.end(), key.begin(), ::tolower); - // Skip 101 mandatory headers, framing headers, and WS - // negotiation headers. This server doesn't implement WS - // extensions or subprotocol negotiation, so allowing - // middleware to inject Sec-WebSocket-Extensions (e.g., - // permessage-deflate) would cause clients to send RSV1 - // frames that the parser rejects as protocol errors. - if (key == "connection" || key == "upgrade" || - key == "sec-websocket-accept" || key == "content-length" || - key == "transfer-encoding" || - key == "sec-websocket-extensions" || - key == "sec-websocket-protocol") { - continue; - } - upgrade_resp.Header(hdr.first, hdr.second); - } - SendResponse(upgrade_resp); - - // If the send failed (client disconnected), don't proceed with upgrade. - // SendRaw may have triggered CallCloseCb via EPIPE/ECONNRESET. - if (conn_->IsClosing()) { - return; - } - - // Mark as upgraded IMMEDIATELY after 101 is sent, before any - // code that could throw. This ensures the catch block correctly - // identifies post-101 exceptions and sends WS close 1011 - // instead of raw HTTP 500 on an already-upgraded connection. - upgraded_ = true; - - // Create WebSocket connection - ws_conn_ = std::make_unique(conn_); - if (max_ws_message_size_ > 0) { - ws_conn_->GetParser().SetMaxPayloadSize(max_ws_message_size_); - ws_conn_->SetMaxMessageSize(max_ws_message_size_); - } - // Switch input cap to the WS message size limit. The read loop - // stops at the cap (data stays in kernel buffer, nothing is - // discarded) and requeues, so no parser desync. This bounds - // per-cycle memory allocation against a fast peer while the - // WS parser enforces frame/message limits independently. - if (max_ws_message_size_ > 0) { - conn_->SetMaxInputSize(max_ws_message_size_); - } - - // Wire WS callbacks (called exactly once, ws_conn_ guaranteed to exist) - if (upgrade_callback_) { - upgrade_callback_(shared_from_this(), req); - } - - // Forward any trailing bytes after the HTTP headers as WebSocket data - buf += consumed; - remaining -= consumed; - if (remaining > 0 && ws_conn_) { - std::string trailing(buf, remaining); - ws_conn_->OnRawData(trailing); - } + if (!HandleCompleteRequest(buf, remaining, consumed)) { return; - - } catch (const std::exception& e) { - // Exception in middleware/upgrade handler — log server-side, - // send generic 500 to client (never leak e.what() over the wire). - logging::Get()->error("Exception in upgrade handler: {}", e.what()); - if (!upgraded_) { - // Pre-101: send HTTP 500, close via HTTP path - HttpResponse err = HttpResponse::InternalError(); - err.Header("Connection", "close"); - SendResponse(err); - CloseConnection(); - } else if (ws_conn_) { - // Post-101 with WS connection: send close 1011. - // SendClose now includes CloseAfterWrite for proper drain. - ws_conn_->SendClose(1011, "Internal error"); - } else { - // Post-101 but ws_conn_ is null — make_unique threw (OOM). - // Connection is in a bad state (101 sent, no WS handler). - // Force close the transport immediately. - conn_->ForceClose(); - } - return; - } - } - - // Normal HTTP request -- dispatch to handler - if (request_callback_) { - HttpResponse response; - try { - request_callback_(shared_from_this(), req, response); - } catch (const std::exception& e) { - // Log the exception server-side; never send e.what() to the - // client — it can contain stack traces, file paths, DB strings. - logging::Get()->error("Exception in request handler: {}", e.what()); - response = HttpResponse::InternalError(); - response.Header("Connection", "close"); - SendResponse(response); - CloseConnection(); - return; - } - - // Determine if response sets Connection: close (needed for - // keep-alive logic AND the close decision after sending). - // Scan ALL Connection headers and parse each as a comma-separated - // token list (RFC 7230 §6.1). Values like "keep-alive, close" or - // "upgrade, close" must be recognized, not just exact "close". - bool resp_close = false; - for (const auto& hdr : response.GetHeaders()) { - std::string key = hdr.first; - std::transform(key.begin(), key.end(), key.begin(), ::tolower); - if (key == "connection") { - std::string val = hdr.second; - std::transform(val.begin(), val.end(), val.begin(), ::tolower); - std::istringstream ss(val); - std::string token; - while (std::getline(ss, token, ',')) { - while (!token.empty() && (token.front() == ' ' || token.front() == '\t')) - token.erase(token.begin()); - while (!token.empty() && (token.back() == ' ' || token.back() == '\t')) - token.pop_back(); - if (token == "close") { - resp_close = true; - } - } - } - } - - // HTTP/1.0 persistence requires explicit Connection: keep-alive - // in the response. Without it, a compliant 1.0 client treats the - // response as close-delimited and closes its end, while the server - // keeps waiting — stranding the connection until idle timeout. - if (req.http_minor == 0 && req.keep_alive && !resp_close) { - response.Header("Connection", "keep-alive"); - } - - // If the server will close after this response (client sent - // Connection: close, or HTTP/1.0 without keep-alive), the response - // must include Connection: close so the wire semantics match. - // Without this, an HTTP/1.1 response implies persistence but the - // server tears the socket down immediately after sending. - if (!req.keep_alive && !resp_close) { - response.Header("Connection", "close"); - } - - // RFC 7231 §4.3.2: HEAD responses MUST NOT include a body, - // but MUST include the same headers as the GET response (including - // Content-Length reflecting the GET body size). - if (req.method == "HEAD") { - // Serialize the full response to get auto-computed Content-Length, - // then strip the body from the wire output. - response.Version(1, current_http_minor_); - std::string wire = response.Serialize(); - // Find the end of headers (blank line) - auto header_end = wire.find("\r\n\r\n"); - if (header_end != std::string::npos) { - wire = wire.substr(0, header_end + 4); // Include the blank line - } - conn_->SendRaw(wire.data(), wire.size()); - } else { - SendResponse(response); - } - - // If SendResponse triggered a connection close (e.g., EPIPE), - // stop processing pipelined requests. - if (conn_->IsClosing()) { - return; - } - - if (!req.keep_alive || resp_close) { - CloseConnection(); - return; - } - } - - // Request completed — reset timeout tracking for next request - request_in_progress_ = false; - conn_->ClearDeadline(); - conn_->SetDeadlineTimeoutCb(nullptr); - - // Advance past consumed bytes - buf += consumed; - remaining -= consumed; - - // Reset parser and per-request state for next request (keep-alive / pipelining) - parser_.Reset(); - sent_100_continue_ = false; - - // If there are remaining bytes (pipelined request), arm a new deadline - // AND re-install the 408 callback so timer-driven timeout sends proper response - if (remaining > 0 && request_timeout_sec_ > 0) { - request_in_progress_ = true; - request_start_ = std::chrono::steady_clock::now(); - conn_->SetDeadline(request_start_ + std::chrono::seconds(request_timeout_sec_)); - std::weak_ptr weak_self = shared_from_this(); - conn_->SetDeadlineTimeoutCb([weak_self]() { - if (auto self = weak_self.lock()) { - HttpResponse timeout_resp = HttpResponse::RequestTimeout(); - timeout_resp.Header("Connection", "close"); - self->SendResponse(timeout_resp); - } - }); } + // Continue pipelining loop } else { - // Incomplete request -- need more data. - // If the peer already closed (close_after_write_ set), no more bytes - // will arrive — the request can never complete. Close immediately - // instead of leaking the connection slot until timeout. - if (conn_->IsCloseDeferred()) { - conn_->ForceClose(); - return; - } - // Perform early validation once headers are complete to avoid - // holding connection slots for requests that can never succeed. - if (!sent_100_continue_ && parser_.GetRequest().headers_complete) { - const auto& partial = parser_.GetRequest(); - - // Early reject: unsupported HTTP version - if (partial.http_major != 1 || - (partial.http_minor != 0 && partial.http_minor != 1)) { - HttpResponse ver_resp = HttpResponse::HttpVersionNotSupported(); - ver_resp.Header("Connection", "close"); - SendResponse(ver_resp); - CloseConnection(); - return; - } - - // Early reject: HTTP/1.1 missing Host - if (partial.http_minor >= 1 && !partial.HasHeader("host")) { - HttpResponse bad_req = HttpResponse::BadRequest("Missing Host header"); - bad_req.Header("Connection", "close"); - SendResponse(bad_req); - CloseConnection(); - return; - } - - // Early reject: Content-Length exceeds body size limit. - // Without this, a client can send headers with a huge Content-Length - // and no body, occupying a connection slot until request timeout. - if (max_body_size_ > 0 && - partial.content_length > max_body_size_) { - HttpResponse err = HttpResponse::PayloadTooLarge(); - err.Header("Connection", "close"); - SendResponse(err); - CloseConnection(); - return; - } - - // RFC 7231 §5.1.1: handle Expect header - if (partial.HasHeader("expect")) { - std::string expect = partial.GetHeader("expect"); - std::transform(expect.begin(), expect.end(), expect.begin(), ::tolower); - // Trim OWS (SP/HTAB per RFC 7230 §3.2.3) - while (!expect.empty() && (expect.front() == ' ' || expect.front() == '\t')) - expect.erase(expect.begin()); - while (!expect.empty() && (expect.back() == ' ' || expect.back() == '\t')) - expect.pop_back(); - if (expect == "100-continue") { - // Don't send 100 Continue for WebSocket upgrade requests — - // WebSocketHandshake::Validate() rejects body-bearing - // upgrades, so acknowledging the body is contradictory. - if (partial.upgrade) { - HttpResponse bad_req = HttpResponse::BadRequest( - "WebSocket upgrade must not have a request body"); - bad_req.Header("Connection", "close"); - SendResponse(bad_req); - CloseConnection(); - return; - } - HttpResponse cont; - cont.Status(100, "Continue"); - SendResponse(cont); - sent_100_continue_ = true; - } else { - // Unrecognized Expect value — RFC 7231 §5.1.1: 417 - HttpResponse err; - err.Status(417, "Expectation Failed"); - err.Header("Connection", "close"); - SendResponse(err); - CloseConnection(); - return; - } - } - } + HandleIncompleteRequest(); break; } } diff --git a/server/http_server.cc b/server/http_server.cc index 1bac4fe..ac04086 100644 --- a/server/http_server.cc +++ b/server/http_server.cc @@ -256,6 +256,17 @@ void HttpServer::SetupHandlers(std::shared_ptr http_conn) ); } +void HttpServer::SafeNotifyWsClose(const std::shared_ptr& http_conn) { + if (!http_conn) return; + auto* ws = http_conn->GetWebSocket(); + if (ws) { + try { ws->NotifyTransportClose(); } + catch (const std::exception& e) { + logging::Get()->error("Exception in WS close handler: {}", e.what()); + } + } +} + void HttpServer::HandleNewConnection(std::shared_ptr conn) { // Guard: if the connection already closed (fast disconnect between // RegisterCallbacks enabling epoll and new_conn_callback running here), @@ -288,17 +299,7 @@ void HttpServer::HandleNewConnection(std::shared_ptr conn) { } } // Notify old WS handler outside the lock to prevent deadlock. - // Catch exceptions from user close handlers to prevent breaking - // core cleanup (ReleaseFd, timer removal, connection map erasure). - if (old_handler) { - auto* ws = old_handler->GetWebSocket(); - if (ws) { - try { ws->NotifyTransportClose(); } - catch (const std::exception& e) { - logging::Get()->error("Exception in WS close handler: {}", e.what()); - } - } - } + SafeNotifyWsClose(old_handler); // Arm a connection-level deadline covering the TLS handshake + first HTTP request. // Without this, a client can slow-drip the TLS handshake indefinitely, bypassing @@ -331,16 +332,7 @@ void HttpServer::HandleCloseConnection(std::shared_ptr conn) } } // Notify WS close handler OUTSIDE the lock to prevent deadlock. - // Catch exceptions to prevent breaking core cleanup (ReleaseFd, etc.). - if (http_conn) { - auto* ws = http_conn->GetWebSocket(); - if (ws) { - try { ws->NotifyTransportClose(); } - catch (const std::exception& e) { - logging::Get()->error("Exception in WS close handler: {}", e.what()); - } - } - } + SafeNotifyWsClose(http_conn); } void HttpServer::HandleErrorConnection(std::shared_ptr conn) { @@ -354,15 +346,7 @@ void HttpServer::HandleErrorConnection(std::shared_ptr conn) http_connections_.erase(it); } } - if (http_conn) { - auto* ws = http_conn->GetWebSocket(); - if (ws) { - try { ws->NotifyTransportClose(); } - catch (const std::exception& e) { - logging::Get()->error("Exception in WS close handler: {}", e.what()); - } - } - } + SafeNotifyWsClose(http_conn); } void HttpServer::HandleMessage(std::shared_ptr conn, std::string& message) { @@ -386,13 +370,7 @@ void HttpServer::HandleMessage(std::shared_ptr conn, std::str // notify the old WS handler and replace with a fresh one. if (http_conn->GetConnection() != conn) { // Notify old WebSocket close handler before discarding - auto* old_ws = http_conn->GetWebSocket(); - if (old_ws) { - try { old_ws->NotifyTransportClose(); } - catch (const std::exception& e) { - logging::Get()->error("Exception in WS close handler: {}", e.what()); - } - } + SafeNotifyWsClose(http_conn); http_conn = std::make_shared(conn); SetupHandlers(http_conn); std::lock_guard lck(conn_mtx_); diff --git a/server/websocket_connection.cc b/server/websocket_connection.cc index db5d910..e38f3c7 100644 --- a/server/websocket_connection.cc +++ b/server/websocket_connection.cc @@ -4,10 +4,10 @@ WebSocketConnection::WebSocketConnection(std::shared_ptr conn) : conn_(std::move(conn)) {} -void WebSocketConnection::OnMessage(MessageCallback callback) { message_callback_ = std::move(callback); } -void WebSocketConnection::OnClose(CloseCallback callback) { close_callback_ = std::move(callback); } -void WebSocketConnection::OnPing(PingCallback callback) { ping_callback_ = std::move(callback); } -void WebSocketConnection::OnError(ErrorCallback callback) { error_callback_ = std::move(callback); } +void WebSocketConnection::OnMessage(MessageCallback callback) { callbacks_.message_callback = std::move(callback); } +void WebSocketConnection::OnClose(CloseCallback callback) { callbacks_.close_callback = std::move(callback); } +void WebSocketConnection::OnPing(PingCallback callback) { callbacks_.ping_callback = std::move(callback); } +void WebSocketConnection::OnError(ErrorCallback callback) { callbacks_.error_callback = std::move(callback); } void WebSocketConnection::SendText(const std::string& message) { std::lock_guard lck(send_mtx_); @@ -16,7 +16,7 @@ void WebSocketConnection::SendText(const std::string& message) { // Validate outbound text to prevent emitting protocol-invalid frames // that would cause compliant clients to close with 1007. if (!IsValidUtf8(message)) { - if (error_callback_) error_callback_(*this, "Outbound text message is not valid UTF-8"); + if (callbacks_.error_callback) callbacks_.error_callback(*this, "Outbound text message is not valid UTF-8"); return; } SendFrame(WebSocketFrame::TextFrame(message)); @@ -72,8 +72,8 @@ void WebSocketConnection::NotifyTransportClose() { // Always report 1006 (Abnormal Closure) for transport-level disconnects. // Even if we sent a Close frame, the peer never completed the handshake // (no Close reply received), so RFC 6455 classifies this as abnormal. - if (close_callback_) { - close_callback_(*this, 1006, "Transport closed"); + if (callbacks_.close_callback) { + callbacks_.close_callback(*this, 1006, "Transport closed"); } } @@ -90,8 +90,8 @@ void WebSocketConnection::OnRawData(const std::string& data) { if (parser_.HasError() && is_open_) { if (!close_sent_) { - if (error_callback_) { - error_callback_(*this, parser_.GetError()); + if (callbacks_.error_callback) { + callbacks_.error_callback(*this, parser_.GetError()); } // Use the correct close code based on the error type std::string err_msg = parser_.GetError(); @@ -129,8 +129,8 @@ void WebSocketConnection::ProcessFrame(const WebSocketFrame& frame) { // IMPORTANT: Receiving a new Text/Binary frame while in_fragment_ is true // is a protocol error per RFC 6455 -- send Close(1002) and return if (in_fragment_) { - if (error_callback_) { - error_callback_(*this, "New data frame received during fragmented message"); + if (callbacks_.error_callback) { + callbacks_.error_callback(*this, "New data frame received during fragmented message"); } SendClose(1002, "Protocol error: interleaved data frames"); return; @@ -140,18 +140,18 @@ void WebSocketConnection::ProcessFrame(const WebSocketFrame& frame) { // Complete single-frame message // RFC 6455 §5.6: text frames must contain valid UTF-8 if (frame.opcode == WebSocketOpcode::Text && !IsValidUtf8(frame.payload)) { - if (error_callback_) error_callback_(*this, "Invalid UTF-8 in text message"); + if (callbacks_.error_callback) callbacks_.error_callback(*this, "Invalid UTF-8 in text message"); SendClose(1007, "Invalid UTF-8"); return; } - if (message_callback_) { - message_callback_(*this, frame.payload, + if (callbacks_.message_callback) { + callbacks_.message_callback(*this, frame.payload, frame.opcode == WebSocketOpcode::Binary); } } else { // First fragment if (max_message_size_ > 0 && frame.payload.size() > max_message_size_) { - if (error_callback_) error_callback_(*this, "Message exceeds maximum size"); + if (callbacks_.error_callback) callbacks_.error_callback(*this, "Message exceeds maximum size"); SendClose(1009, "Message too big"); in_fragment_ = false; fragment_buffer_.clear(); @@ -166,13 +166,13 @@ void WebSocketConnection::ProcessFrame(const WebSocketFrame& frame) { case WebSocketOpcode::Continuation: { if (!in_fragment_) { - if (error_callback_) error_callback_(*this, "Unexpected continuation frame"); + if (callbacks_.error_callback) callbacks_.error_callback(*this, "Unexpected continuation frame"); SendClose(1002, "Protocol error: unexpected continuation"); return; } if (max_message_size_ > 0 && fragment_buffer_.size() + frame.payload.size() > max_message_size_) { - if (error_callback_) error_callback_(*this, "Message exceeds maximum size"); + if (callbacks_.error_callback) callbacks_.error_callback(*this, "Message exceeds maximum size"); SendClose(1009, "Message too big"); in_fragment_ = false; fragment_buffer_.clear(); @@ -182,14 +182,14 @@ void WebSocketConnection::ProcessFrame(const WebSocketFrame& frame) { if (frame.fin) { // RFC 6455 §5.6: validate reassembled text messages if (fragment_opcode_ == WebSocketOpcode::Text && !IsValidUtf8(fragment_buffer_)) { - if (error_callback_) error_callback_(*this, "Invalid UTF-8 in text message"); + if (callbacks_.error_callback) callbacks_.error_callback(*this, "Invalid UTF-8 in text message"); SendClose(1007, "Invalid UTF-8"); in_fragment_ = false; fragment_buffer_.clear(); return; } - if (message_callback_) { - message_callback_(*this, fragment_buffer_, + if (callbacks_.message_callback) { + callbacks_.message_callback(*this, fragment_buffer_, fragment_opcode_ == WebSocketOpcode::Binary); } in_fragment_ = false; @@ -201,7 +201,7 @@ void WebSocketConnection::ProcessFrame(const WebSocketFrame& frame) { case WebSocketOpcode::Close: { // RFC 6455 §7.1.5: close body must be 0 bytes or >= 2 bytes if (frame.payload.size() == 1) { - if (error_callback_) error_callback_(*this, "Invalid close frame: 1-byte payload"); + if (callbacks_.error_callback) callbacks_.error_callback(*this, "Invalid close frame: 1-byte payload"); // Set is_open_ = false BEFORE SendClose so that if the send // fails synchronously (→ CallCloseCb → NotifyTransportClose), // the transport-close path sees is_open_ == false and skips, @@ -212,7 +212,7 @@ void WebSocketConnection::ProcessFrame(const WebSocketFrame& frame) { // complete once we send ours. Close the transport immediately // instead of waiting 5s for a reply that won't come. if (conn_) conn_->CloseAfterWrite(); - if (close_callback_) close_callback_(*this, 1002, "Protocol error"); + if (callbacks_.close_callback) callbacks_.close_callback(*this, 1002, "Protocol error"); return; } // If payload is empty, echo an empty close frame (no code/reason) @@ -227,7 +227,7 @@ void WebSocketConnection::ProcessFrame(const WebSocketFrame& frame) { close_sent_ = true; } if (conn_) conn_->CloseAfterWrite(); - if (close_callback_) close_callback_(*this, 1005, ""); // 1005 = no status received + if (callbacks_.close_callback) callbacks_.close_callback(*this, 1005, ""); // 1005 = no status received break; } @@ -242,13 +242,13 @@ void WebSocketConnection::ProcessFrame(const WebSocketFrame& frame) { } // RFC 6455 §7.1.6: close reason must be valid UTF-8 if (!reason.empty() && !IsValidUtf8(reason)) { - if (error_callback_) error_callback_(*this, "Close reason is not valid UTF-8"); + if (callbacks_.error_callback) callbacks_.error_callback(*this, "Close reason is not valid UTF-8"); is_open_ = false; SendClose(1007, "Invalid UTF-8 in close reason"); // The peer already sent their Close frame — handshake complete. // Close transport immediately instead of waiting 5s. if (conn_) conn_->CloseAfterWrite(); - if (close_callback_) close_callback_(*this, 1007, "Invalid UTF-8 in close reason"); + if (callbacks_.close_callback) callbacks_.close_callback(*this, 1007, "Invalid UTF-8 in close reason"); return; } if (!WebSocketFrame::IsValidCloseCode(code) && frame.payload.size() >= 2) { @@ -275,8 +275,8 @@ void WebSocketConnection::ProcessFrame(const WebSocketFrame& frame) { if (conn_) { conn_->CloseAfterWrite(); } - if (close_callback_) { - close_callback_(*this, code, reason); + if (callbacks_.close_callback) { + callbacks_.close_callback(*this, code, reason); } break; } @@ -288,8 +288,8 @@ void WebSocketConnection::ProcessFrame(const WebSocketFrame& frame) { // no-ops after close_sent_, but the RFC requires auto-pong until // the Close frame is received. SendFrame(WebSocketFrame::PongFrame(frame.payload)); - if (ping_callback_) { - ping_callback_(*this, frame.payload); + if (callbacks_.ping_callback) { + callbacks_.ping_callback(*this, frame.payload); } break; } @@ -300,8 +300,8 @@ void WebSocketConnection::ProcessFrame(const WebSocketFrame& frame) { } default: { - if (error_callback_) { - error_callback_(*this, "Unknown opcode"); + if (callbacks_.error_callback) { + callbacks_.error_callback(*this, "Unknown opcode"); } break; }