diff --git a/api/app/settings/common.py b/api/app/settings/common.py index 3be0cbe10f20..f125d38e9900 100644 --- a/api/app/settings/common.py +++ b/api/app/settings/common.py @@ -364,6 +364,7 @@ "django.middleware.common.CommonMiddleware", "django.middleware.csrf.CsrfViewMiddleware", "django.contrib.auth.middleware.AuthenticationMiddleware", + "core.middleware.structlog_context.StructlogContextMiddleware", "django.contrib.messages.middleware.MessageMiddleware", "django.middleware.clickjacking.XFrameOptionsMiddleware", "simple_history.middleware.HistoryRequestMiddleware", @@ -1449,6 +1450,9 @@ PROMETHEUS_ENABLED = env.bool("PROMETHEUS_ENABLED", False) +if PROMETHEUS_ENABLED: + MIDDLEWARE.append("core.middleware.worker_rss.WorkerRSSMiddleware") + DOCGEN_MODE = env.bool("DOCGEN_MODE", default=False) REQUIRE_AUTHENTICATION_FOR_API_DOCS = env.bool( diff --git a/api/core/middleware/structlog_context.py b/api/core/middleware/structlog_context.py new file mode 100644 index 000000000000..5960478ef1e6 --- /dev/null +++ b/api/core/middleware/structlog_context.py @@ -0,0 +1,66 @@ +from typing import Any + +from django.http import HttpRequest, HttpResponse +from structlog.contextvars import bind_contextvars, clear_contextvars + +from api_keys.user import APIKeyUser +from environments.models import Environment +from users.models import FFAdminUser + + +class StructlogContextMiddleware: + """ + Scopes structlog.contextvars to a single HTTP request and binds + request-derived identifiers so every log event emitted during the + request automatically carries them. + + Bindings (best-effort, only what the middleware can derive): + - user.id — from request.user.uuid (FFAdminUser only) + - organisation.id — from request.user (FFAdminUser or APIKeyUser) + - project.id — from project_pk URL kwarg + - environment.id — from environment_api_key URL kwarg (cached lookup) + + Cleanup runs in `finally` to prevent Gunicorn's long-lived gthread + workers from leaking one request's bindings into the next. + """ + + def __init__(self, get_response): # type: ignore[no-untyped-def] + self.get_response = get_response + + def __call__(self, request: HttpRequest) -> HttpResponse: + try: + self._bind_user_and_organisation(request) + return self.get_response(request) # type: ignore[no-any-return] + finally: + clear_contextvars() + + def process_view( + self, + request: HttpRequest, + view_func: Any, + view_args: tuple[Any, ...], + view_kwargs: dict[str, Any], + ) -> None: + project_pk = view_kwargs.get("project_pk") + if project_pk is not None: + bind_contextvars(project__id=project_pk) + + environment_api_key = view_kwargs.get("environment_api_key") + if environment_api_key is not None: + environment = Environment.get_from_cache(environment_api_key) + if environment is not None: + bind_contextvars(environment__id=environment.id) + + @staticmethod + def _bind_user_and_organisation(request: HttpRequest) -> None: + user = getattr(request, "user", None) + if user is None or not getattr(user, "is_authenticated", False): + return + + if isinstance(user, FFAdminUser): + bind_contextvars(user__id=str(user.uuid)) + first_org = user.organisations.first() + if first_org is not None: + bind_contextvars(organisation__id=first_org.id) + elif isinstance(user, APIKeyUser): + bind_contextvars(organisation__id=user.key.organisation_id) diff --git a/api/core/middleware/worker_rss.py b/api/core/middleware/worker_rss.py new file mode 100644 index 000000000000..cd883f5df9ec --- /dev/null +++ b/api/core/middleware/worker_rss.py @@ -0,0 +1,16 @@ +from django.http import HttpRequest, HttpResponse + +from metrics.worker_metrics import update_worker_metrics + + +class WorkerRSSMiddleware: + def __init__(self, get_response): # type: ignore[no-untyped-def] + self.get_response = get_response + + def __call__(self, request: HttpRequest) -> HttpResponse: + response = self.get_response(request) + try: + update_worker_metrics() + except Exception: + pass + return response diff --git a/api/metrics/worker_metrics.py b/api/metrics/worker_metrics.py new file mode 100644 index 000000000000..2fd1e777e66e --- /dev/null +++ b/api/metrics/worker_metrics.py @@ -0,0 +1,86 @@ +import os as os +from pathlib import Path +from typing import Iterable + +import prometheus_client + +PROC_SELF_STATUS_PATH = Path("/proc/self/status") +MAX_RSS_KB_TO_BYTES = 1024 +MAX_RSS_STATUS_FIELD = "VmHWM" + +flagsmith_worker_rss_bytes = prometheus_client.Gauge( + "flagsmith_worker_rss_bytes", + "Maximum RSS (high-water mark) of the worker process in bytes, read from VmHWM in /proc/self/status.", + ["pid"], + multiprocess_mode="liveall", +) + + +def update_worker_metrics() -> None: + """ + Update the RSS gauge with the current worker process high-water mark. + """ + current_pid = os.getpid() + + rss_value = get_current_process_max_rss_bytes() + if rss_value is not None: + flagsmith_worker_rss_bytes.labels(pid=str(current_pid)).set(rss_value) + + +def clear_worker_metrics() -> None: + """ + Clear the RSS memory usage metric for the current worker process. + This should be called when a worker process is shutting down to prevent stale metrics. + """ + current_pid = os.getpid() + try: + flagsmith_worker_rss_bytes.remove(str(current_pid)) + except (KeyError, ValueError): + pass + + +def get_current_process_max_rss_bytes() -> int | None: + try: + proc_status_lines = PROC_SELF_STATUS_PATH.read_text( + encoding="utf-8" + ).splitlines() + except (FileNotFoundError, OSError, UnicodeDecodeError): + return None + + max_rss_kb = _get_proc_status_memory_kb(proc_status_lines, MAX_RSS_STATUS_FIELD) + if max_rss_kb is None: + return None + + return max_rss_kb * MAX_RSS_KB_TO_BYTES + + +def _get_proc_status_memory_kb( + proc_status_lines: Iterable[str], + field_name: str, +) -> int | None: + for line in proc_status_lines: + name, separator, value = line.strip().partition(":") + if separator and name == field_name: + return _parse_proc_status_memory_kb(value) + + return None + + +def _parse_proc_status_memory_kb(value: str) -> int | None: + parts = value.split() + if len(parts) != 2: + return None + + memory_kb_text, unit = parts + if unit != "kB": + return None + + try: + memory_kb = int(memory_kb_text) + except ValueError: + return None + + if memory_kb < 0: + return None + + return memory_kb diff --git a/api/tests/integration/core/test_integration_core_worker_rss_metric.py b/api/tests/integration/core/test_integration_core_worker_rss_metric.py new file mode 100644 index 000000000000..92f2c06d0817 --- /dev/null +++ b/api/tests/integration/core/test_integration_core_worker_rss_metric.py @@ -0,0 +1,46 @@ +import os + +from django.conf import settings as django_settings +from django.test import Client, override_settings +from prometheus_client import REGISTRY, generate_latest +from pytest_mock import MockerFixture + +from metrics.worker_metrics import clear_worker_metrics + + +@override_settings( + MIDDLEWARE=[ + *django_settings.MIDDLEWARE, + "core.middleware.worker_rss.WorkerRSSMiddleware", + ] +) +def test_worker_rss_metric__request_through_middleware__appears_in_prometheus_output( + client: Client, + mocker: MockerFixture, +) -> None: + # Given - deterministic RSS reading so the test is independent of /proc availability + # on macOS/Windows CI runners. + expected_rss = 12_345_678 + mocker.patch( + "metrics.worker_metrics.get_current_process_max_rss_bytes", + return_value=expected_rss, + ) + + # When - any cheap, known-reachable endpoint trips the middleware after response. + response = client.get("/api/v1/swagger.json", HTTP_ACCEPT="application/json") + + # Then - the response is unaffected by the middleware, and the gauge is exposed + # with a sample for the current worker's PID via the Prometheus exposition format. + assert response.status_code == 200 + output = generate_latest(REGISTRY).decode() + assert "flagsmith_worker_rss_bytes" in output + assert f'pid="{os.getpid()}"' in output + + +def teardown_function(function: object) -> None: + # Prevent labelled-child leakage to other tests in the same xdist worker by removing + # this PID's sample after each test. Uses the existing module API. + try: + clear_worker_metrics() + except Exception: + pass diff --git a/api/tests/unit/core/middleware/test_unit_core_middleware_structlog_context.py b/api/tests/unit/core/middleware/test_unit_core_middleware_structlog_context.py new file mode 100644 index 000000000000..0b2b03cdfcbc --- /dev/null +++ b/api/tests/unit/core/middleware/test_unit_core_middleware_structlog_context.py @@ -0,0 +1,386 @@ +import pytest +import structlog +from django.http import HttpResponse +from pytest_structlog import StructuredLogCapture +from structlog.contextvars import bind_contextvars, clear_contextvars, get_contextvars + +from api_keys.user import APIKeyUser +from core.middleware.structlog_context import StructlogContextMiddleware +from users.models import FFAdminUser + + +@pytest.fixture(autouse=True) +def _clear_structlog_contextvars(): # type: ignore[no-untyped-def] + """Ensure each test starts and ends with a clean structlog.contextvars state.""" + clear_contextvars() + yield + clear_contextvars() + + +def test_structlog_context__without_middleware__event_only_has_manual_kwargs( + log: StructuredLogCapture, +) -> None: + """ + BEFORE snapshot for Flagsmith #7298. + + Demonstrates current behaviour: a logger.info call only emits the + kwargs the caller passes explicitly. No automatic user/org/project/ + environment context is added — because no middleware binds anything + onto structlog.contextvars yet. + + After the request-context middleware lands, a call site running inside + a request scope would additionally pick up user.id, organisation.id, + project.id, environment.id automatically. + """ + # Given + logger = structlog.get_logger("demo") + + # When — emit the kind of event that exists today at e.g. + # api/projects/code_references/views.py:61 + logger.info( + "scan.created", + organisation__id=42, + code_references__count=5, + ) + + # Then — captured event contains only what was passed in + assert log.events == [ + { + "level": "info", + "event": "scan.created", + "organisation__id": 42, + "code_references__count": 5, + } + ] + + +def test_structlog_context__with_middleware__event_inherits_bound_contextvars( + mocker, # type: ignore[no-untyped-def] + log: StructuredLogCapture, +) -> None: + """ + AFTER snapshot for Flagsmith #7298. + + Demonstrates the behaviour with the request-context middleware in place. + A logger.info call inside a request scope automatically picks up the + user/org context from the middleware's contextvars bindings, even when + the call site does NOT pass them as kwargs. + + Compare to test_structlog_context__without_middleware__event_only_has_manual_kwargs + above (the BEFORE snapshot) — the same call site now emits an event + carrying identifiers the caller never passed. + """ + # Given — an authenticated FFAdminUser + user = mocker.MagicMock(spec=FFAdminUser) + user.is_authenticated = True + user.uuid = "alice-uuid" + user.organisations.first.return_value = mocker.MagicMock(id=7) + request = mocker.MagicMock() + request.user = user + + logger = structlog.get_logger("demo") + + def view(_request): # type: ignore[no-untyped-def] + # The view emits its event with ONLY the kwargs not covered by the + # middleware. Note the absence of organisation__id — it's now bound + # globally by the middleware and merged in automatically. + + logger.info( + "scan.created", + code_references__count=5, + ) + return HttpResponse() + + middleware = StructlogContextMiddleware(view) + + # When — request flows through the middleware + middleware(request) + + # Then — the captured event has the manual kwarg AND the auto-bound + # user.id and organisation.id from the middleware + assert log.events == [ + { + "level": "info", + "event": "scan.created", + "user__id": "alice-uuid", + "organisation__id": 7, + "code_references__count": 5, + } + ] + + +def test_structlog_context_middleware__any_request__returns_response_unchanged(mocker): # type: ignore[no-untyped-def] + # Given + expected_response = HttpResponse(status=200) + middleware = StructlogContextMiddleware(lambda _request: expected_response) + + # When + result = middleware(mocker.MagicMock()) + + # Then + assert result is expected_response + + +def test_structlog_context_middleware__bindings_made_in_view__cleared_after_response( + mocker, +): # type: ignore[no-untyped-def] + # Given — a view that binds something onto contextvars + def view_that_binds(_request): # type: ignore[no-untyped-def] + bind_contextvars(some_key="some_value") + return HttpResponse() + + middleware = StructlogContextMiddleware(view_that_binds) + + # When + middleware(mocker.MagicMock()) + + # Then — bindings made during the request are cleared on exit + assert get_contextvars() == {} + + +def test_structlog_context_middleware__view_exception__contextvars_still_cleared( + mocker, +): # type: ignore[no-untyped-def] + # Given — a view that binds then raises + def faulty_view(_request): # type: ignore[no-untyped-def] + bind_contextvars(some_key="some_value") + raise ValueError("boom") + + middleware = StructlogContextMiddleware(faulty_view) + + # When / Then — exception propagates, but contextvars are still cleared + with pytest.raises(ValueError): + middleware(mocker.MagicMock()) + + assert get_contextvars() == {} + + +def test_structlog_context_middleware__ffadmin_user__binds_user_id_and_organisation_id( + mocker, +): # type: ignore[no-untyped-def] + # Given — an authenticated FFAdminUser with one organisation + user = mocker.MagicMock(spec=FFAdminUser) + user.is_authenticated = True + user.uuid = "alice-uuid" + user.organisations.first.return_value = mocker.MagicMock(id=42) + + captured: dict = {} + + def view_captures_context(_request): # type: ignore[no-untyped-def] + captured.update(get_contextvars()) + return HttpResponse() + + request = mocker.MagicMock() + request.user = user + + middleware = StructlogContextMiddleware(view_captures_context) + + # When + middleware(request) + + # Then — both user.id and organisation.id bound during the request + assert captured == {"user__id": "alice-uuid", "organisation__id": 42} + + +def test_structlog_context_middleware__api_key_user__binds_only_organisation_id(mocker): # type: ignore[no-untyped-def] + # Given — an authenticated APIKeyUser (Master API Key principal). + # APIKeyUser.key is set in __init__, so we instantiate a real one with + # a mocked MasterAPIKey rather than spec-mocking the class (spec only + # exposes class-level attributes). + master_key = mocker.MagicMock() + master_key.organisation_id = 7 + user = APIKeyUser(key=master_key) + + captured: dict = {} + + def view_captures_context(_request): # type: ignore[no-untyped-def] + captured.update(get_contextvars()) + return HttpResponse() + + request = mocker.MagicMock() + request.user = user + + middleware = StructlogContextMiddleware(view_captures_context) + + # When + middleware(request) + + # Then — only organisation.id bound. APIKeyUser has no uuid, so no + # user.id binding (per ticket: avoids mixing identifier kinds) + assert captured == {"organisation__id": 7} + + +def test_structlog_context_middleware__anonymous_user__binds_nothing(mocker): # type: ignore[no-untyped-def] + # Given — an unauthenticated request (e.g. DRF auth not yet resolved + # at middleware time, or an actually-anonymous request) + user = mocker.MagicMock() + user.is_authenticated = False + + captured: dict = {} + + def view_captures_context(_request): # type: ignore[no-untyped-def] + captured.update(get_contextvars()) + return HttpResponse() + + request = mocker.MagicMock() + request.user = user + + middleware = StructlogContextMiddleware(view_captures_context) + + # When + middleware(request) + + # Then — nothing bound + assert captured == {} + + +def test_structlog_context_middleware__ffadmin_user_with_no_organisations__binds_only_user_id( + mocker, +): # type: ignore[no-untyped-def] + # Given — an FFAdminUser that belongs to no organisations + user = mocker.MagicMock(spec=FFAdminUser) + user.is_authenticated = True + user.uuid = "alice-uuid" + user.organisations.first.return_value = None + + captured: dict = {} + + def view_captures_context(_request): # type: ignore[no-untyped-def] + captured.update(get_contextvars()) + return HttpResponse() + + request = mocker.MagicMock() + request.user = user + + middleware = StructlogContextMiddleware(view_captures_context) + + # When + middleware(request) + + # Then — only user.id bound; organisation.id is skipped gracefully + assert captured == {"user__id": "alice-uuid"} + + +def test_structlog_context_middleware__process_view_with_project_pk__binds_project_id( + mocker, +): # type: ignore[no-untyped-def] + # Given + middleware = StructlogContextMiddleware(lambda _r: HttpResponse()) + + # When — process_view is invoked by Django with URL kwargs resolved + middleware.process_view( + request=mocker.MagicMock(), + view_func=mocker.MagicMock(), + view_args=(), + view_kwargs={"project_pk": 42}, + ) + + # Then + assert get_contextvars() == {"project__id": 42} + + +def test_structlog_context_middleware__process_view_with_environment_api_key__binds_environment_id( + mocker, +): # type: ignore[no-untyped-def] + # Given — Environment.get_from_cache returns a hit + environment = mocker.MagicMock(id=99) + mocker.patch( + "core.middleware.structlog_context.Environment.get_from_cache", + return_value=environment, + ) + middleware = StructlogContextMiddleware(lambda _r: HttpResponse()) + + # When + middleware.process_view( + request=mocker.MagicMock(), + view_func=mocker.MagicMock(), + view_args=(), + view_kwargs={"environment_api_key": "some-key"}, + ) + + # Then + assert get_contextvars() == {"environment__id": 99} + + +def test_structlog_context_middleware__process_view_with_unknown_environment_key__binds_nothing( + mocker, +): # type: ignore[no-untyped-def] + # Given — get_from_cache returns None (invalid/unknown key) + mocker.patch( + "core.middleware.structlog_context.Environment.get_from_cache", + return_value=None, + ) + middleware = StructlogContextMiddleware(lambda _r: HttpResponse()) + + # When + middleware.process_view( + request=mocker.MagicMock(), + view_func=mocker.MagicMock(), + view_args=(), + view_kwargs={"environment_api_key": "bad-key"}, + ) + + # Then — no environment.id binding; lookup failure is silent + assert get_contextvars() == {} + + +def test_structlog_context_middleware__process_view_without_url_kwargs__binds_nothing( + mocker, +): # type: ignore[no-untyped-def] + # Given + middleware = StructlogContextMiddleware(lambda _r: HttpResponse()) + + # When — process_view called with no relevant kwargs + middleware.process_view( + request=mocker.MagicMock(), + view_func=mocker.MagicMock(), + view_args=(), + view_kwargs={}, + ) + + # Then + assert get_contextvars() == {} + + +def test_structlog_context_middleware__sequential_requests_on_same_instance__no_leakage( + mocker, +): # type: ignore[no-untyped-def] + """ + Cross-request leakage test (Flagsmith #7298 acceptance criterion). + + Django reuses one middleware instance per worker, and Gunicorn's gthread + workers reuse threads across requests. Without `clear_contextvars()` in + `finally`, request B on the same thread would inherit request A's + bindings. This test verifies the cleanup actually works under that + realistic reuse pattern. + """ + # Given — a single middleware instance (the "long-lived worker") + captured_states: list[dict] = [] + + def view_captures_state(_request): # type: ignore[no-untyped-def] + captured_states.append(dict(get_contextvars())) + return HttpResponse() + + middleware = StructlogContextMiddleware(view_captures_state) + + # First request: an authenticated FFAdminUser — bindings expected + first_user = mocker.MagicMock(spec=FFAdminUser) + first_user.is_authenticated = True + first_user.uuid = "alice-uuid" + first_user.organisations.first.return_value = mocker.MagicMock(id=42) + first_request = mocker.MagicMock() + first_request.user = first_user + + # Second request: anonymous — should see ZERO bindings + second_user = mocker.MagicMock() + second_user.is_authenticated = False + second_request = mocker.MagicMock() + second_request.user = second_user + + # When — both requests run sequentially on the same middleware + middleware(first_request) + middleware(second_request) + + # Then — first saw its bindings, second saw nothing leaked through + assert captured_states[0] == {"user__id": "alice-uuid", "organisation__id": 42} + assert captured_states[1] == {} diff --git a/api/tests/unit/core/middleware/test_unit_core_middleware_worker_rss.py b/api/tests/unit/core/middleware/test_unit_core_middleware_worker_rss.py new file mode 100644 index 000000000000..f9b4bb71fd64 --- /dev/null +++ b/api/tests/unit/core/middleware/test_unit_core_middleware_worker_rss.py @@ -0,0 +1,53 @@ +from django.http import HttpResponse + +from core.middleware.worker_rss import WorkerRSSMiddleware + + +def test_worker_rss_middleware__any_request__calls_update_after_response(mocker): # type: ignore[no-untyped-def] + # Given + call_order = [] + + def fake_get_response(request): # type: ignore[no-untyped-def] + call_order.append("handled") + return HttpResponse() + + mocker.patch( + "core.middleware.worker_rss.update_worker_metrics", + side_effect=lambda: call_order.append("updated"), + ) + middleware = WorkerRSSMiddleware(fake_get_response) + + # When + middleware(mocker.MagicMock()) + + # Then — metric must be updated after the request is handled, not before + assert call_order == ["handled", "updated"] + + +def test_worker_rss_middleware__any_request__returns_response_unchanged(mocker): # type: ignore[no-untyped-def] + # Given + expected_response = HttpResponse(status=200) + mocker.patch("core.middleware.worker_rss.update_worker_metrics") + middleware = WorkerRSSMiddleware(lambda _request: expected_response) + + # When + result = middleware(mocker.MagicMock()) + + # Then + assert result is expected_response + + +def test_worker_rss_middleware__update_raises__request_still_completes(mocker): # type: ignore[no-untyped-def] + # Given + expected_response = HttpResponse(status=200) + mocker.patch( + "core.middleware.worker_rss.update_worker_metrics", + side_effect=Exception("metric failure"), + ) + middleware = WorkerRSSMiddleware(lambda _request: expected_response) + + # When + result = middleware(mocker.MagicMock()) + + # Then — exception is swallowed, response still returned + assert result is expected_response diff --git a/api/tests/unit/metrics/test_unit_worker_metrics.py b/api/tests/unit/metrics/test_unit_worker_metrics.py new file mode 100644 index 000000000000..c9b3f9238e4c --- /dev/null +++ b/api/tests/unit/metrics/test_unit_worker_metrics.py @@ -0,0 +1,244 @@ +from pathlib import Path +from typing import Optional + +import pytest + +from metrics import worker_metrics + + +class MockGaugeLabels: + def __init__(self) -> None: + self.set_called_with: Optional[int] = None + + def set(self, value: int) -> None: + self.set_called_with = value + + +class MockGauge: + def __init__(self) -> None: + self.labels_called_with: Optional[str] = None + self.remove_called_with: Optional[str] = None + self.mock_labels: MockGaugeLabels = MockGaugeLabels() + self.should_raise_on_remove: Optional[Exception] = None + + def labels(self, *, pid: str) -> MockGaugeLabels: + self.labels_called_with = pid + return self.mock_labels + + def remove(self, *, pid: str) -> None: + self.remove_called_with = pid + if self.should_raise_on_remove: + raise self.should_raise_on_remove + + +class UnreadableStatusPath: + def read_text(self, encoding: str) -> str: + raise OSError("status file unavailable") + + +def test_get_current_process_max_rss_bytes__vmhwm_available__returns_bytes( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + # Given + max_rss_kb = 123 + status_path = tmp_path / "status" + status_path.write_text( + f"Name:\tgunicorn\nVmHWM:\t{max_rss_kb} kB\n", + encoding="utf-8", + ) + monkeypatch.setattr(worker_metrics, "PROC_SELF_STATUS_PATH", status_path) + + # When + result = worker_metrics.get_current_process_max_rss_bytes() + + # Then + assert result == max_rss_kb * worker_metrics.MAX_RSS_KB_TO_BYTES + + +def test_get_current_process_max_rss_bytes__vmhwm_has_extra_whitespace__returns_bytes( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + # Given + max_rss_kb = 456 + status_path = tmp_path / "status" + status_path.write_text( + f"Name:\tgunicorn\n VmHWM: {max_rss_kb} kB \nVmRSS:\t10 kB\n", + encoding="utf-8", + ) + monkeypatch.setattr(worker_metrics, "PROC_SELF_STATUS_PATH", status_path) + + # When + result = worker_metrics.get_current_process_max_rss_bytes() + + # Then + assert result == max_rss_kb * worker_metrics.MAX_RSS_KB_TO_BYTES + + +def test_get_current_process_max_rss_bytes__status_file_missing__returns_none( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + # Given + monkeypatch.setattr(worker_metrics, "PROC_SELF_STATUS_PATH", tmp_path / "missing") + + # When + result = worker_metrics.get_current_process_max_rss_bytes() + + # Then + assert result is None + + +def test_get_current_process_max_rss_bytes__vmhwm_missing__returns_none( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + # Given + status_path = tmp_path / "status" + status_path.write_text("Name:\tgunicorn\nVmRSS:\t10 kB\n", encoding="utf-8") + monkeypatch.setattr(worker_metrics, "PROC_SELF_STATUS_PATH", status_path) + + # When + result = worker_metrics.get_current_process_max_rss_bytes() + + # Then + assert result is None + + +@pytest.mark.parametrize( + "vmhwm_value", + [ + "-1 kB", + "not-a-number kB", + "123 MB", + "123", + "123 kB extra", + ], +) +def test_get_current_process_max_rss_bytes__vmhwm_invalid__returns_none( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, + vmhwm_value: str, +) -> None: + # Given + status_path = tmp_path / "status" + status_path.write_text( + f"Name:\tgunicorn\nVmHWM:\t{vmhwm_value}\n", + encoding="utf-8", + ) + monkeypatch.setattr(worker_metrics, "PROC_SELF_STATUS_PATH", status_path) + + # When + result = worker_metrics.get_current_process_max_rss_bytes() + + # Then + assert result is None + + +def test_get_current_process_max_rss_bytes__status_file_read_error__returns_none( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Given + monkeypatch.setattr(worker_metrics, "PROC_SELF_STATUS_PATH", UnreadableStatusPath()) + + # When + result = worker_metrics.get_current_process_max_rss_bytes() + + # Then + assert result is None + + +def test_update_worker_metrics__rss_available__updates_gauge( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Given + mock_rss = 1048576 # 1 MB + mock_pid = 12345 + mock_gauge = MockGauge() + + monkeypatch.setattr( + worker_metrics, "get_current_process_max_rss_bytes", lambda: mock_rss + ) + monkeypatch.setattr(worker_metrics.os, "getpid", lambda: mock_pid) + monkeypatch.setattr(worker_metrics, "flagsmith_worker_rss_bytes", mock_gauge) + + # When + worker_metrics.update_worker_metrics() + + # Then + assert mock_gauge.labels_called_with == str(mock_pid) + assert mock_gauge.mock_labels.set_called_with == mock_rss + + +def test_update_worker_metrics__rss_none__does_not_update_gauge( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Given + mock_pid = 12345 + mock_gauge = MockGauge() + + monkeypatch.setattr( + worker_metrics, "get_current_process_max_rss_bytes", lambda: None + ) + monkeypatch.setattr(worker_metrics.os, "getpid", lambda: mock_pid) + monkeypatch.setattr(worker_metrics, "flagsmith_worker_rss_bytes", mock_gauge) + + # When + worker_metrics.update_worker_metrics() + + # Then + assert mock_gauge.labels_called_with is None + + +def test_clear_worker_metrics__label_present__removes_gauge_label( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Given + mock_pid = 67890 + mock_gauge = MockGauge() + + monkeypatch.setattr(worker_metrics.os, "getpid", lambda: mock_pid) + monkeypatch.setattr(worker_metrics, "flagsmith_worker_rss_bytes", mock_gauge) + + # When + worker_metrics.clear_worker_metrics() + + # Then + assert mock_gauge.remove_called_with == str(mock_pid) + + +def test_clear_worker_metrics__keyerror__silently_handles( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Given + mock_pid = 67890 + mock_gauge = MockGauge() + mock_gauge.should_raise_on_remove = KeyError("Label not found") + + monkeypatch.setattr(worker_metrics.os, "getpid", lambda: mock_pid) + monkeypatch.setattr(worker_metrics, "flagsmith_worker_rss_bytes", mock_gauge) + + # When/Then (should not raise) + worker_metrics.clear_worker_metrics() + + # Then + assert mock_gauge.remove_called_with == str(mock_pid) + + +def test_clear_worker_metrics__valueerror__silently_handles( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Given + mock_pid = 67890 + mock_gauge = MockGauge() + mock_gauge.should_raise_on_remove = ValueError("Invalid label") + + monkeypatch.setattr(worker_metrics.os, "getpid", lambda: mock_pid) + monkeypatch.setattr(worker_metrics, "flagsmith_worker_rss_bytes", mock_gauge) + + # When/Then (should not raise) + worker_metrics.clear_worker_metrics() + + # Then + assert mock_gauge.remove_called_with == str(mock_pid) diff --git a/docs/docs/deployment-self-hosting/observability/_metrics-catalogue.md b/docs/docs/deployment-self-hosting/observability/_metrics-catalogue.md index b931a958595e..96638e6f6579 100644 --- a/docs/docs/deployment-self-hosting/observability/_metrics-catalogue.md +++ b/docs/docs/deployment-self-hosting/observability/_metrics-catalogue.md @@ -133,3 +133,14 @@ Labels: - `task_type` - `result` +### `flagsmith_worker_rss_bytes` + +Gauge. + +Maximum RSS (high-water mark) of the worker process in bytes, read from VmHWM in /proc/self/status. + +Labels: + - `pid` + + + diff --git a/docs/docs/deployment-self-hosting/observability/metrics.mdx b/docs/docs/deployment-self-hosting/observability/metrics.mdx index 3becac991b2c..1d02f3b5ad40 100644 --- a/docs/docs/deployment-self-hosting/observability/metrics.mdx +++ b/docs/docs/deployment-self-hosting/observability/metrics.mdx @@ -16,7 +16,12 @@ The metrics provided by Flagsmith are described below. +## Monitoring worker memory + +The `flagsmith_worker_rss_bytes` gauge reports the peak resident-set size for each worker process. See the +[Worker RSS monitoring guide](./worker-rss-monitoring) for PromQL examples, Grafana panels, and interpretation notes. + ## StatsD -The Flagsmith WSGI worker emits per-request access log metrics (request counts, durations, HTTP statuses) to StatsD -when configured. See [StatsD](/deployment-self-hosting/observability/monitoring#statsd) for setup. +The Flagsmith WSGI worker emits per-request access log metrics (request counts, durations, HTTP statuses) to StatsD when +configured. See [StatsD](/deployment-self-hosting/observability/monitoring#statsd) for setup. diff --git a/docs/docs/deployment-self-hosting/observability/worker-rss-monitoring.md b/docs/docs/deployment-self-hosting/observability/worker-rss-monitoring.md new file mode 100644 index 000000000000..2bb4a86e8883 --- /dev/null +++ b/docs/docs/deployment-self-hosting/observability/worker-rss-monitoring.md @@ -0,0 +1,129 @@ +--- +title: Worker RSS monitoring +sidebar_position: 15 +description: Track the peak memory of each Flagsmith API worker process with Prometheus and Grafana. +--- + +The `flagsmith_worker_rss_bytes` gauge exposes the peak resident-set size of every API worker process, labelled by +process ID. This is the most reliable signal for detecting workers that grow unboundedly (a leak) versus workers that +grow under load and stabilise. Use this page once you have Prometheus scraping configured — see +[Monitoring](./monitoring) for setup. + +## Overview + +A worker's RSS is the amount of physical memory the operating system currently has mapped for that process. Python-level +profilers tend to miss leaks that live in C extensions, page caches, or the allocator's free lists, so process-level RSS +is often the only reliable signal in production. + +`flagsmith_worker_rss_bytes` reports the **high-water mark** — the peak RSS observed for the worker since it started. +The value is read from the `VmHWM` line of `/proc/self/status`, which the Linux kernel maintains atomically. The metric +is updated once per HTTP request handled by the worker. + +The gauge has a single label, `pid`, identifying the worker process. When Flagsmith is deployed with multiple gunicorn +workers, you will see one time series per worker. + +## Enabling + +Set the environment variable: + +```bash +PROMETHEUS_ENABLED=true +``` + +This activates the `WorkerRSSMiddleware` that updates the gauge after each request. No further configuration is required +for single-process deployments. + +### Multi-worker deployments + +To aggregate metrics across gunicorn workers, set `PROMETHEUS_MULTIPROC_DIR` to a writable directory: + +```bash +PROMETHEUS_MULTIPROC_DIR=/tmp/prometheus +``` + +The official Flagsmith Docker image sets this automatically. For bare-metal or custom-container deployments, configure +it yourself; otherwise the `/metrics` endpoint will only report data from whichever worker happened to handle the scrape +request. + +## Sample output + +Scraping `/metrics` on a Flagsmith API with two workers running yields output similar to: + +```text +# HELP flagsmith_worker_rss_bytes Maximum RSS (high-water mark) of the worker process in bytes, read from VmHWM in /proc/self/status. +# TYPE flagsmith_worker_rss_bytes gauge +flagsmith_worker_rss_bytes{pid="1234"} 4.8259072e+07 +flagsmith_worker_rss_bytes{pid="1235"} 5.2215808e+07 +``` + +Each `pid` corresponds to a live worker. Values are in bytes; the example above shows roughly 46 MiB and 50 MiB +respectively. + +## PromQL examples + +Useful queries to drop into dashboards or alerts. + +**Per-worker peak RSS (raw):** + +```promql +flagsmith_worker_rss_bytes +``` + +**Maximum peak across all workers:** + +```promql +max(flagsmith_worker_rss_bytes) +``` + +**Peak per worker over the last hour:** + +```promql +max_over_time(flagsmith_worker_rss_bytes[1h]) +``` + +**Growth indicator — peak RSS now minus peak RSS one hour ago:** + +```promql +flagsmith_worker_rss_bytes - flagsmith_worker_rss_bytes offset 1h +``` + +A consistently positive value across many workers and time windows points to a leak. A value that spikes once after a +deployment and then stays flat is normal — the workers grew under load and levelled off. + +## Grafana panel + +A reasonable starting point for a "Worker memory" panel: + +| Setting | Value | +| ------------- | -------------------------------------------- | +| Visualisation | Time series | +| Query | `flagsmith_worker_rss_bytes` | +| Legend | `{{pid}}` | +| Unit | bytes (IEC) — Grafana renders as KiB/MiB/GiB | +| Stacking | Disabled — each worker is independent | + +Add a second panel showing `max(flagsmith_worker_rss_bytes)` for a single-number overview. + +## Interpretation notes + +The metric is a high-water mark, not a current reading. Understanding the implications avoids false alerts. + +- **The value never decreases for a given PID.** Once a worker has peaked at a particular RSS, the gauge for that PID + will stay at that value until the worker process exits. Recovery is observed through PID rotation: when a worker is + recycled (for example, by gunicorn's `--max-requests` setting or by a deployment), the old PID's time series goes + stale and a new PID appears with a fresh, lower value. +- **Steady high RSS is normal after warm-up.** A worker that loads caches at startup will reach its steady-state peak + quickly and stay there. This appears as a flat line in Grafana, not a leak. +- **Periodic large workloads inflate the peak.** If a worker occasionally processes a large payload (for example, a bulk + export), the gauge will pin at that peak for the rest of the worker's lifetime even after the memory has been freed. + Investigate via PID rotation rather than waiting for the value to fall. +- **Leak signature.** A genuine leak shows up as the peak climbing across many worker restarts — every newly forked + worker reaches a higher peak than its predecessor. +- **Quirk: parent-process inheritance.** On Linux, the kernel may preserve the high-water mark across `execve()`, so a + freshly spawned worker can report a non-zero baseline inherited from its parent. Treat the first scrape after a + deployment as informational rather than a true zero. + +## Related documentation + +- [Metrics reference](./metrics) — full catalogue of exported Prometheus metrics. +- [Monitoring](./monitoring) — enabling `/metrics` and other vendor integrations.