Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,12 @@ BOOST_ENDPOINT_THROTTLE_ADD_OR_UPDATE=10/hour
# Comma-separated hostnames allowed for git clone URLs (HTTPS only).
# Default when unset: github.com. Set to empty to disable host allowlisting (not recommended).
BOOST_ALLOWED_CLONE_HOSTS=github.com

# Redis task-lock settings for Celery add-or-update deduplication (settings_override.py).
# BOOST_TASK_LOCK_TIMEOUT: lock TTL in seconds (default 1800).
# BOOST_TASK_LOCK_ON_CONFLICT: skip (reject a duplicate immediately; default) or wait (block until the lock is free, up to BOOST_TASK_LOCK_WAIT_TIMEOUT seconds).
# BOOST_TASK_LOCK_WAIT_TIMEOUT: max seconds to wait when BOOST_TASK_LOCK_ON_CONFLICT=wait (default 300).

BOOST_TASK_LOCK_TIMEOUT=1800
BOOST_TASK_LOCK_ON_CONFLICT=skip
BOOST_TASK_LOCK_WAIT_TIMEOUT=300
6 changes: 4 additions & 2 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ updates:
labels:
- dependencies
- python
# Note: dependency-type-based groups are not supported for uv yet
# (see dependabot/dependabot-core#13202). Group by semver bump size instead.
# Direct deps only (packaging, Weblate[all]). Transitive lockfile entries are
# pinned by Weblate and updated via weblate-pin-bump, not independently.
allow:
- dependency-type: direct
groups:
uv-patch-minor:
update-types:
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/weblate-pin-bump.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ jobs:
- uses: astral-sh/setup-uv@fac544c07dec837d0ccb6301d7b5580bf5edae39
with:
version: 0.11.12
- name: Install apt dependencies (Weblate venv)
run: sudo ./.github/ci/apt-install
- name: Resolve and apply Weblate bump
id: bump
env:
Expand Down
1 change: 1 addition & 0 deletions src/boost_weblate/endpoint/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class BoostEndpointErrorCode(StrEnum):
REQUIRED_FIELD = "required_field"
TASK_USER_NOT_FOUND = "task_user_not_found"
TASK_INTERNAL_ERROR = "task_internal_error"
TASK_DUPLICATE = "task_duplicate"


class BoostEndpointError(WeblateError):
Expand Down
5 changes: 5 additions & 0 deletions src/boost_weblate/endpoint/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@
wrap_task_error,
)
from boost_weblate.endpoint.services import BoostComponentService
from boost_weblate.utils.task_lock import (
build_add_or_update_lock_key,
redis_task_lock,
)


@app.task(trail=False)
@redis_task_lock(key_builder=build_add_or_update_lock_key)
def boost_add_or_update_task(
*,
organization: str,
Expand Down
34 changes: 34 additions & 0 deletions src/boost_weblate/settings_override.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,40 @@ def allowed_clone_hosts() -> list[str]:

ALLOWED_CLONE_HOSTS = allowed_clone_hosts()

_DEFAULT_BOOST_TASK_LOCK_TIMEOUT = 1800
_DEFAULT_BOOST_TASK_LOCK_ON_CONFLICT = "skip"
_DEFAULT_BOOST_TASK_LOCK_WAIT_TIMEOUT = 300


def boost_task_lock_settings() -> dict[str, Any]:
    """Read the Redis task-lock settings from the environment.

    Each variable is optional. An unset or blank value falls back to the
    module default, so an empty ``BOOST_TASK_LOCK_TIMEOUT=`` line in a
    ``.env`` file does not crash settings import.

    Returns:
        A dict with three keys:

        * ``"timeout"`` -- lock TTL in seconds (``BOOST_TASK_LOCK_TIMEOUT``),
          a positive integer.
        * ``"on_conflict"`` -- ``"skip"`` or ``"wait"``
          (``BOOST_TASK_LOCK_ON_CONFLICT``), case-insensitive and stripped.
        * ``"wait_timeout"`` -- maximum seconds to block in ``wait`` mode
          (``BOOST_TASK_LOCK_WAIT_TIMEOUT``), a non-negative integer.

    Raises:
        ValueError: If a value is not an integer, is below its minimum, or
            the conflict mode is not one of ``skip`` / ``wait``. The message
            names the offending environment variable.
    """

    def _int_from_env(name: str, default: int, *, minimum: int) -> int:
        """Parse integer variable *name*, enforcing a lower bound."""
        raw = os.environ.get(name, "").strip()
        if not raw:
            return default
        try:
            value = int(raw)
        except ValueError as err:
            raise ValueError(f"{name} must be an integer, got {raw!r}") from err
        if value < minimum:
            raise ValueError(f"{name} must be >= {minimum}, got {value}")
        return value

    # A lock TTL of zero (or less) can never protect a running task.
    timeout = _int_from_env(
        "BOOST_TASK_LOCK_TIMEOUT",
        _DEFAULT_BOOST_TASK_LOCK_TIMEOUT,
        minimum=1,
    )

    on_conflict = (
        os.environ.get("BOOST_TASK_LOCK_ON_CONFLICT", "").strip().lower()
        or _DEFAULT_BOOST_TASK_LOCK_ON_CONFLICT
    )
    # Reject typos here: the lock decorator treats every value other than
    # "wait" as "skip", which would otherwise hide a misconfiguration.
    if on_conflict not in ("skip", "wait"):
        raise ValueError(
            "BOOST_TASK_LOCK_ON_CONFLICT must be 'skip' or 'wait', "
            f"got {on_conflict!r}"
        )

    # Zero is allowed: it means "try once, do not block".
    wait_timeout = _int_from_env(
        "BOOST_TASK_LOCK_WAIT_TIMEOUT",
        _DEFAULT_BOOST_TASK_LOCK_WAIT_TIMEOUT,
        minimum=0,
    )

    return {
        "timeout": timeout,
        "on_conflict": on_conflict,
        "wait_timeout": wait_timeout,
    }


# Resolved once at import time; other modules import the three constants below.
_task_lock_settings = boost_task_lock_settings()
(
    BOOST_TASK_LOCK_TIMEOUT,
    BOOST_TASK_LOCK_ON_CONFLICT,
    BOOST_TASK_LOCK_WAIT_TIMEOUT,
) = (
    _task_lock_settings[setting_name]
    for setting_name in ("timeout", "on_conflict", "wait_timeout")
)


def merge_boost_endpoint_throttle_rates(
rest_framework: dict[str, Any],
Expand Down
122 changes: 122 additions & 0 deletions src/boost_weblate/utils/task_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# SPDX-FileCopyrightText: 2026 Andrew Zhang <whisper67265@outlook.com>
#
# SPDX-License-Identifier: BSL-1.0

"""Redis distributed lock decorator for Celery task deduplication."""

from __future__ import annotations

import functools
import hashlib
import json
import logging
from collections.abc import Callable
from typing import Any, ParamSpec, TypeVar

from django_redis import get_redis_connection
from redis.exceptions import LockError, RedisError
from redis.lock import Lock

from boost_weblate.endpoint.errors import BoostEndpointError, BoostEndpointErrorCode
from boost_weblate.settings_override import (
BOOST_TASK_LOCK_ON_CONFLICT,
BOOST_TASK_LOCK_TIMEOUT,
BOOST_TASK_LOCK_WAIT_TIMEOUT,
)

logger = logging.getLogger(__name__)

P = ParamSpec("P")
R = TypeVar("R")

_ADD_OR_UPDATE_TASK_NAME = "boost_add_or_update_task"


def _get_redis_client():
    """Return the Redis connection registered under the ``default`` alias."""
    return get_redis_connection("default")


def build_add_or_update_lock_key(**task_kwargs: Any) -> str:
    """Build a stable Redis lock key from add-or-update task kwargs.

    The key depends only on ``organization``, ``version``, ``extensions`` and
    ``add_or_update``; every other kwarg (for example ``user_id``) is ignored.
    Requests that differ only in the ordering or repetition of languages,
    submodules or extensions therefore map to the same key.

    Args:
        **task_kwargs: The task's keyword arguments. ``organization``,
            ``version`` and ``add_or_update`` (a mapping of language code to
            an iterable of submodule names) are required; ``extensions`` is
            optional and may be ``None``.

    Returns:
        ``boost_weblate:task_lock:<task name>:<digest>`` where ``<digest>`` is
        the first 32 hex characters of the SHA-256 of the canonical payload.

    Raises:
        KeyError: If a required kwarg is missing.
    """
    add_or_update = task_kwargs["add_or_update"]
    # De-duplicate before sorting: ["json", "json"] and ["json"] describe the
    # same work and must contend for the same lock.
    canonical_add_or_update = {
        lang: sorted(set(submodules))
        for lang, submodules in sorted(add_or_update.items())
    }
    extensions = task_kwargs.get("extensions")
    if extensions is not None:
        extensions = sorted(set(extensions))
    payload = {
        "organization": task_kwargs["organization"],
        "version": task_kwargs["version"],
        "extensions": extensions,
        "add_or_update": canonical_add_or_update,
    }
    serialized = json.dumps(payload, sort_keys=True).encode()
    digest = hashlib.sha256(serialized).hexdigest()[:32]
    return f"boost_weblate:task_lock:{_ADD_OR_UPDATE_TASK_NAME}:{digest}"


def redis_task_lock(
    key_builder: Callable[..., str],
    *,
    timeout: int | None = None,
    on_conflict: str | None = None,
    wait_timeout: int | None = None,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
    """Build a decorator that runs a task body only while holding a Redis lock.

    Args:
        key_builder: Called with the task's keyword arguments; returns the
            lock name.
        timeout: Lock TTL handed to ``Lock``. ``None`` uses
            ``BOOST_TASK_LOCK_TIMEOUT``.
        on_conflict: ``"wait"`` blocks for the lock; any other value makes a
            single non-blocking attempt. ``None`` uses
            ``BOOST_TASK_LOCK_ON_CONFLICT``.
        wait_timeout: Maximum blocking time in ``wait`` mode. ``None`` uses
            ``BOOST_TASK_LOCK_WAIT_TIMEOUT``.

    Returns:
        A decorator whose wrapper raises ``BoostEndpointError`` with code
        ``TASK_DUPLICATE`` when the lock cannot be acquired. Otherwise it
        returns the wrapped callable's result and releases the lock
        afterwards. Release failures are logged, not raised.
    """

    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            lock_name = key_builder(**kwargs)
            redis_client = _get_redis_client()

            # Explicit decorator arguments win over the module-level settings.
            ttl = BOOST_TASK_LOCK_TIMEOUT if timeout is None else timeout
            mode = BOOST_TASK_LOCK_ON_CONFLICT if on_conflict is None else on_conflict
            max_wait = (
                BOOST_TASK_LOCK_WAIT_TIMEOUT if wait_timeout is None else wait_timeout
            )

            task_lock = Lock(
                redis_client,
                name=lock_name,
                timeout=ttl,
                thread_local=False,
            )

            if mode == "wait":
                acquire_options = {"blocking": True, "blocking_timeout": max_wait}
            else:
                acquire_options = {"blocking": False}

            if task_lock.acquire(**acquire_options):
                try:
                    return func(*args, **kwargs)
                finally:
                    # Never let a release problem mask the task's own outcome.
                    try:
                        task_lock.release()
                    except LockError:
                        logger.debug(
                            "Lock release failed (lock-specific): lock_key=%s",
                            lock_name,
                            exc_info=True,
                        )
                    except RedisError:
                        logger.warning(
                            "Lock release failed (Redis infrastructure): lock_key=%s",
                            lock_name,
                            exc_info=True,
                        )

            # Reached only when the lock was not acquired.
            logger.info(
                "Duplicate task rejected (lock held): lock_key=%s",
                lock_name,
            )
            raise BoostEndpointError(
                "Duplicate add-or-update task already running",
                code=BoostEndpointErrorCode.TASK_DUPLICATE,
                metadata={"lock_key": lock_name},
            )

        return wrapper

    return decorator
140 changes: 140 additions & 0 deletions tests/endpoint/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,43 @@
"add_or_update": {"zh_Hans": ["json"]},
}

# Keyword arguments passed to ``boost_add_or_update_task.run`` by the task-lock
# tests in this module. ``add_or_update`` lists two languages.
_TASK_KWARGS = {
    "organization": "org",
    "add_or_update": {"zh_Hans": ["a"], "ja": ["json"]},
    "version": "boost-1.0",
    "extensions": [".md"],
    "user_id": 7,
}


class _FakeLock:
    """Lock stand-in whose ``acquire`` always succeeds."""

    # Flips to True the first time release() is called on this instance.
    released: bool

    def __init__(self) -> None:
        self.released = False

    def acquire(
        self,
        blocking: bool = True,
        blocking_timeout: float | None = None,
    ) -> bool:
        """Report success regardless of the blocking options."""
        return True

    def release(self) -> None:
        """Record that the lock has been released."""
        self.released = True


@pytest.fixture(autouse=True)
def _mock_task_lock_acquire(monkeypatch: pytest.MonkeyPatch) -> None:
    """Patch the task-lock module so every test acquires the lock by default.

    ``Lock`` is replaced by a factory returning a fresh ``_FakeLock`` and
    ``_get_redis_client`` by a factory returning a ``MagicMock``. Individual
    tests may patch ``Lock`` again to simulate contention.
    """

    def _always_acquired_lock(*_args, **_kwargs) -> _FakeLock:
        return _FakeLock()

    def _fake_redis_client() -> MagicMock:
        return MagicMock()

    monkeypatch.setattr("boost_weblate.utils.task_lock.Lock", _always_acquired_lock)
    monkeypatch.setattr(
        "boost_weblate.utils.task_lock._get_redis_client",
        _fake_redis_client,
    )


def _throttle_rest_framework(**rate_overrides: str) -> dict:
rf = deepcopy(settings.REST_FRAMEWORK)
Expand Down Expand Up @@ -275,6 +312,109 @@ def process_all(self, _submodules, *, user, request=None): # noqa: ANN001
assert exc_info.value.code == BoostEndpointErrorCode.TASK_INTERNAL_ERROR


def test_boost_add_or_update_task_duplicate_raises_task_duplicate(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A run that cannot take the lock fails with ``TASK_DUPLICATE``."""
    from boost_weblate.endpoint import tasks as tasks_mod

    fake_user = MagicMock()
    monkeypatch.setattr(tasks_mod.User.objects, "get", lambda pk: fake_user)

    processed: list = []

    class FakeService:
        def __init__(self, **_kw):  # noqa: ANN003
            pass

        def process_all(self, _submodules, *, user, request=None):  # noqa: ANN001
            processed.append(_submodules)
            return {}

    monkeypatch.setattr(tasks_mod, "BoostComponentService", FakeService)

    class DuplicateFakeLock:
        # Shared by every instance: the first acquire succeeds, the second fails.
        outcomes = iter([True, False])

        def acquire(
            self, blocking: bool = True, blocking_timeout: float | None = None
        ) -> bool:
            return next(self.outcomes)

        def release(self) -> None:
            pass

    def _make_lock(*_a, **_k) -> DuplicateFakeLock:
        return DuplicateFakeLock()

    monkeypatch.setattr("boost_weblate.utils.task_lock.Lock", _make_lock)

    run_task = tasks_mod.boost_add_or_update_task.run

    # First run acquires the lock and executes the task body.
    run_task(**_TASK_KWARGS)

    # Second run finds the lock held and must be rejected.
    with pytest.raises(BoostEndpointError) as exc_info:
        run_task(**_TASK_KWARGS)

    error = exc_info.value
    assert error.code == BoostEndpointErrorCode.TASK_DUPLICATE
    assert "lock_key" in error.metadata
    # NOTE(review): presumably the first run calls process_all once per language
    # in _TASK_KWARGS (two) and the rejected run adds none -- confirm against
    # the task body.
    assert len(processed) == 2


def test_boost_add_or_update_task_lock_released_on_success(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """The lock is released after the task body completes normally."""
    from boost_weblate.endpoint import tasks as tasks_mod

    fake_user = MagicMock()
    monkeypatch.setattr(tasks_mod.User.objects, "get", lambda pk: fake_user)

    class FakeService:
        def __init__(self, **_kw):  # noqa: ANN003
            pass

        def process_all(self, _submodules, *, user, request=None):  # noqa: ANN001
            return {}

    monkeypatch.setattr(tasks_mod, "BoostComponentService", FakeService)

    # One shared instance, so the release can be observed after the run.
    shared_lock = _FakeLock()

    def _make_lock(*_a, **_k) -> _FakeLock:
        return shared_lock

    monkeypatch.setattr("boost_weblate.utils.task_lock.Lock", _make_lock)

    tasks_mod.boost_add_or_update_task.run(**_TASK_KWARGS)

    assert shared_lock.released is True


def test_build_add_or_update_lock_key_is_stable() -> None:
    """Ordering and ``user_id`` do not affect the key; ``version`` does."""
    from boost_weblate.utils.task_lock import build_add_or_update_lock_key

    reference = {
        "organization": "org",
        "version": "boost-1.0",
        "extensions": [".md", ".adoc"],
        "add_or_update": {"ja": ["json", "asio"], "zh_Hans": ["a"]},
        "user_id": 1,
    }
    # Same request with every collection reordered and a different user.
    reordered = {
        "organization": "org",
        "version": "boost-1.0",
        "extensions": [".adoc", ".md"],
        "add_or_update": {"zh_Hans": ["a"], "ja": ["asio", "json"]},
        "user_id": 99,
    }
    # Identical to the reference except for the version.
    other_version = {**reference, "version": "boost-2.0"}

    key_a = build_add_or_update_lock_key(**reference)
    key_b = build_add_or_update_lock_key(**reordered)
    key_c = build_add_or_update_lock_key(**other_version)

    assert key_a == key_b
    assert key_a != key_c


def test_boost_endpoint_info_returns_429_when_scoped_throttled(
scoped_low_throttle_rates,
) -> None:
Expand Down
Loading