diff --git a/.env.example b/.env.example index c65267d..8afaffe 100644 --- a/.env.example +++ b/.env.example @@ -177,3 +177,12 @@ BOOST_ENDPOINT_THROTTLE_ADD_OR_UPDATE=10/hour # Comma-separated hostnames allowed for git clone URLs (HTTPS only). # Default when unset: github.com. Set to empty to disable host allowlisting (not recommended). BOOST_ALLOWED_CLONE_HOSTS=github.com + +# Redis task-lock settings for Celery add-or-update deduplication (settings_override.py). +# BOOST_TASK_LOCK_TIMEOUT: lock TTL in seconds (default 1800). +# BOOST_TASK_LOCK_ON_CONFLICT: skip (reject duplicate immediately) or wait (block up to wait_timeout). +# BOOST_TASK_LOCK_WAIT_TIMEOUT: max seconds to wait when on_conflict=wait (default 300). + +BOOST_TASK_LOCK_TIMEOUT=1800 +BOOST_TASK_LOCK_ON_CONFLICT=skip +BOOST_TASK_LOCK_WAIT_TIMEOUT=300 diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 1c53c1d..b452eee 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -21,8 +21,10 @@ updates: labels: - dependencies - python - # Note: dependency-type-based groups are not supported for uv yet - # (see dependabot/dependabot-core#13202). Group by semver bump size instead. + # Direct deps only (packaging, Weblate[all]). Transitive lockfile entries are + # pinned by Weblate and updated via weblate-pin-bump, not independently. + allow: + - dependency-type: direct groups: uv-patch-minor: update-types: diff --git a/.github/workflows/weblate-pin-bump.yml b/.github/workflows/weblate-pin-bump.yml index 7c2c0ff..2683998 100644 --- a/.github/workflows/weblate-pin-bump.yml +++ b/.github/workflows/weblate-pin-bump.yml @@ -29,6 +29,8 @@ jobs: - uses: astral-sh/setup-uv@fac544c07dec837d0ccb6301d7b5580bf5edae39 with: version: 0.11.12 + - name: Install apt dependencies (Weblate venv) + run: sudo ./.github/ci/apt-install - name: Resolve and apply Weblate bump id: bump env: diff --git a/src/boost_weblate/endpoint/errors.py b/src/boost_weblate/endpoint/errors.py index 95b4152..8050c9d 100644 --- a/src/boost_weblate/endpoint/errors.py +++ b/src/boost_weblate/endpoint/errors.py @@ -31,6 +31,7 @@ class BoostEndpointErrorCode(StrEnum): REQUIRED_FIELD = "required_field" TASK_USER_NOT_FOUND = "task_user_not_found" TASK_INTERNAL_ERROR = "task_internal_error" + TASK_DUPLICATE = "task_duplicate" class BoostEndpointError(WeblateError): diff --git a/src/boost_weblate/endpoint/tasks.py b/src/boost_weblate/endpoint/tasks.py index 27a9b43..8638d96 100644 --- a/src/boost_weblate/endpoint/tasks.py +++ b/src/boost_weblate/endpoint/tasks.py @@ -18,9 +18,14 @@ wrap_task_error, ) from boost_weblate.endpoint.services import BoostComponentService +from boost_weblate.utils.task_lock import ( + build_add_or_update_lock_key, + redis_task_lock, +) @app.task(trail=False) +@redis_task_lock(key_builder=build_add_or_update_lock_key) def boost_add_or_update_task( *, organization: str, diff --git a/src/boost_weblate/settings_override.py b/src/boost_weblate/settings_override.py index 75c0359..7162cb4 100644 --- a/src/boost_weblate/settings_override.py +++ b/src/boost_weblate/settings_override.py @@ -134,6 +134,40 @@ def allowed_clone_hosts() -> list[str]: ALLOWED_CLONE_HOSTS = allowed_clone_hosts() +_DEFAULT_BOOST_TASK_LOCK_TIMEOUT = 1800 +_DEFAULT_BOOST_TASK_LOCK_ON_CONFLICT = "skip" +_DEFAULT_BOOST_TASK_LOCK_WAIT_TIMEOUT = 300 + + +def boost_task_lock_settings() -> dict[str, Any]: + """Redis task-lock settings for Celery deduplication (env overrides optional).""" + return { + "timeout": int( + os.environ.get( + "BOOST_TASK_LOCK_TIMEOUT", + str(_DEFAULT_BOOST_TASK_LOCK_TIMEOUT), + ) + ), + "on_conflict": os.environ.get( + "BOOST_TASK_LOCK_ON_CONFLICT", + _DEFAULT_BOOST_TASK_LOCK_ON_CONFLICT, + ) + .lower() + .strip(), + "wait_timeout": int( + os.environ.get( + "BOOST_TASK_LOCK_WAIT_TIMEOUT", + str(_DEFAULT_BOOST_TASK_LOCK_WAIT_TIMEOUT), + ) + ), + } + + +_task_lock_settings = boost_task_lock_settings() +BOOST_TASK_LOCK_TIMEOUT = _task_lock_settings["timeout"] +BOOST_TASK_LOCK_ON_CONFLICT = _task_lock_settings["on_conflict"] +BOOST_TASK_LOCK_WAIT_TIMEOUT = _task_lock_settings["wait_timeout"] + def merge_boost_endpoint_throttle_rates( rest_framework: dict[str, Any], diff --git a/src/boost_weblate/utils/task_lock.py b/src/boost_weblate/utils/task_lock.py new file mode 100644 index 0000000..3e49fb6 --- /dev/null +++ b/src/boost_weblate/utils/task_lock.py @@ -0,0 +1,122 @@ +# SPDX-FileCopyrightText: 2026 Andrew Zhang +# +# SPDX-License-Identifier: BSL-1.0 + +"""Redis distributed lock decorator for Celery task deduplication.""" + +from __future__ import annotations + +import functools +import hashlib +import json +import logging +from collections.abc import Callable +from typing import Any, ParamSpec, TypeVar + +from django_redis import get_redis_connection +from redis.exceptions import LockError, RedisError +from redis.lock import Lock + +from boost_weblate.endpoint.errors import BoostEndpointError, BoostEndpointErrorCode +from boost_weblate.settings_override import ( + BOOST_TASK_LOCK_ON_CONFLICT, + BOOST_TASK_LOCK_TIMEOUT, + BOOST_TASK_LOCK_WAIT_TIMEOUT, +) + +logger = logging.getLogger(__name__) + +P = ParamSpec("P") +R = TypeVar("R") + +_ADD_OR_UPDATE_TASK_NAME = "boost_add_or_update_task" + + +def _get_redis_client(): + return get_redis_connection("default") + + +def build_add_or_update_lock_key(**task_kwargs: Any) -> str: + """Build a stable Redis lock key from add-or-update task kwargs.""" + add_or_update = task_kwargs["add_or_update"] + canonical_add_or_update = { + lang: sorted(submodules) for lang, submodules in sorted(add_or_update.items()) + } + extensions = task_kwargs.get("extensions") + if extensions is not None: + extensions = sorted(extensions) + payload = { + "organization": task_kwargs["organization"], + "version": task_kwargs["version"], + "extensions": extensions, + "add_or_update": canonical_add_or_update, + } + digest = hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()[ + :32 + ] + return f"boost_weblate:task_lock:{_ADD_OR_UPDATE_TASK_NAME}:{digest}" + + +def redis_task_lock( + key_builder: Callable[..., str], + *, + timeout: int | None = None, + on_conflict: str | None = None, + wait_timeout: int | None = None, +) -> Callable[[Callable[P, R]], Callable[P, R]]: + """Acquire a Redis lock before running a Celery task body.""" + + def decorator(func: Callable[P, R]) -> Callable[P, R]: + @functools.wraps(func) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + key = key_builder(**kwargs) + client = _get_redis_client() + lock_ttl = timeout if timeout is not None else BOOST_TASK_LOCK_TIMEOUT + conflict_mode = ( + on_conflict if on_conflict is not None else BOOST_TASK_LOCK_ON_CONFLICT + ) + wait_limit = ( + wait_timeout + if wait_timeout is not None + else BOOST_TASK_LOCK_WAIT_TIMEOUT + ) + + lock = Lock(client, name=key, timeout=lock_ttl, thread_local=False) + + if conflict_mode == "wait": + acquired = lock.acquire(blocking=True, blocking_timeout=wait_limit) + else: + acquired = lock.acquire(blocking=False) + + if not acquired: + logger.info( + "Duplicate task rejected (lock held): lock_key=%s", + key, + ) + raise BoostEndpointError( + "Duplicate add-or-update task already running", + code=BoostEndpointErrorCode.TASK_DUPLICATE, + metadata={"lock_key": key}, + ) + + try: + return func(*args, **kwargs) + finally: + try: + lock.release() + except LockError: + logger.debug( + "Lock release failed (lock-specific): lock_key=%s", + key, + exc_info=True, + ) + except RedisError: + logger.warning( + "Lock release failed (Redis infrastructure): lock_key=%s", + key, + exc_info=True, + ) + + return wrapper + + return decorator diff --git a/tests/endpoint/test_views.py b/tests/endpoint/test_views.py index e7017da..27fd5b8 100644 --- a/tests/endpoint/test_views.py +++ b/tests/endpoint/test_views.py @@ -38,6 +38,43 @@ "add_or_update": {"zh_Hans": ["json"]}, } +_TASK_KWARGS = { + "organization": "org", + "add_or_update": {"zh_Hans": ["a"], "ja": ["json"]}, + "version": "boost-1.0", + "extensions": [".md"], + "user_id": 7, +} + + +class _FakeLock: + """Redis lock stub that always acquires.""" + + def __init__(self) -> None: + self.released = False + + def acquire( + self, blocking: bool = True, blocking_timeout: float | None = None + ) -> bool: + return True + + def release(self) -> None: + self.released = True + + +@pytest.fixture(autouse=True) +def _mock_task_lock_acquire(monkeypatch: pytest.MonkeyPatch) -> None: + """Default: task lock always acquired unless a test overrides Lock.""" + + def _fake_lock(*_args, **_kwargs) -> _FakeLock: + return _FakeLock() + + monkeypatch.setattr("boost_weblate.utils.task_lock.Lock", _fake_lock) + monkeypatch.setattr( + "boost_weblate.utils.task_lock._get_redis_client", + lambda: MagicMock(), + ) + def _throttle_rest_framework(**rate_overrides: str) -> dict: rf = deepcopy(settings.REST_FRAMEWORK) @@ -275,6 +312,109 @@ def process_all(self, _submodules, *, user, request=None): # noqa: ANN001 assert exc_info.value.code == BoostEndpointErrorCode.TASK_INTERNAL_ERROR +def test_boost_add_or_update_task_duplicate_raises_task_duplicate( + monkeypatch: pytest.MonkeyPatch, +) -> None: + from boost_weblate.endpoint import tasks as tasks_mod + + user = MagicMock() + monkeypatch.setattr(tasks_mod.User.objects, "get", lambda pk: user) + + process_calls = 0 + + class FakeService: + def __init__(self, **_kw): # noqa: ANN003 + pass + + def process_all(self, _submodules, *, user, request=None): # noqa: ANN001 + nonlocal process_calls + process_calls += 1 + return {} + + monkeypatch.setattr(tasks_mod, "BoostComponentService", FakeService) + + acquire_results = iter([True, False]) + + class DuplicateFakeLock: + def acquire( + self, blocking: bool = True, blocking_timeout: float | None = None + ) -> bool: + return next(acquire_results) + + def release(self) -> None: + pass + + monkeypatch.setattr( + "boost_weblate.utils.task_lock.Lock", + lambda *_a, **_k: DuplicateFakeLock(), + ) + + tasks_mod.boost_add_or_update_task.run(**_TASK_KWARGS) + + with pytest.raises(BoostEndpointError) as exc_info: + tasks_mod.boost_add_or_update_task.run(**_TASK_KWARGS) + + assert exc_info.value.code == BoostEndpointErrorCode.TASK_DUPLICATE + assert "lock_key" in exc_info.value.metadata + assert process_calls == 2 + + +def test_boost_add_or_update_task_lock_released_on_success( + monkeypatch: pytest.MonkeyPatch, +) -> None: + from boost_weblate.endpoint import tasks as tasks_mod + + user = MagicMock() + monkeypatch.setattr(tasks_mod.User.objects, "get", lambda pk: user) + + class FakeService: + def __init__(self, **_kw): # noqa: ANN003 + pass + + def process_all(self, _submodules, *, user, request=None): # noqa: ANN001 + return {} + + monkeypatch.setattr(tasks_mod, "BoostComponentService", FakeService) + + lock = _FakeLock() + monkeypatch.setattr( + "boost_weblate.utils.task_lock.Lock", + lambda *_a, **_k: lock, + ) + + tasks_mod.boost_add_or_update_task.run(**_TASK_KWARGS) + assert lock.released is True + + +def test_build_add_or_update_lock_key_is_stable() -> None: + from boost_weblate.utils.task_lock import build_add_or_update_lock_key + + key_a = build_add_or_update_lock_key( + organization="org", + version="boost-1.0", + extensions=[".md", ".adoc"], + add_or_update={"ja": ["json", "asio"], "zh_Hans": ["a"]}, + user_id=1, + ) + key_b = build_add_or_update_lock_key( + organization="org", + version="boost-1.0", + extensions=[".adoc", ".md"], + add_or_update={"zh_Hans": ["a"], "ja": ["asio", "json"]}, + user_id=99, + ) + key_c = build_add_or_update_lock_key( + organization="org", + version="boost-2.0", + extensions=[".md", ".adoc"], + add_or_update={"ja": ["json", "asio"], "zh_Hans": ["a"]}, + user_id=1, + ) + + assert key_a == key_b + assert key_a != key_c + + def test_boost_endpoint_info_returns_429_when_scoped_throttled( scoped_low_throttle_rates, ) -> None: