diff --git a/.gitignore b/.gitignore index 4611596..f8b7d5f 100644 --- a/.gitignore +++ b/.gitignore @@ -3,10 +3,6 @@ __pycache__/ *.py[cod] *$py.class -# Build tools -# we use `Taskfile` -Makefile - # C extensions *.so @@ -118,6 +114,7 @@ venv.bak/ dmypy.json .pyre/ .pytype/ +ty.toml # Linting and formatting .ruff_cache/ diff --git a/.mypy.ini b/.mypy.ini index e4046a3..cda4398 100644 --- a/.mypy.ini +++ b/.mypy.ini @@ -5,8 +5,9 @@ exclude = (?x)( | ^tests/ ) local_partial_types = True -no_implicit_reexport = True -python_version = 3.8 +; We target Python 3.9+ for type checking because modern mypy releases +; have dropped support for Python 3.8, even though our library supports 3.8. +python_version = 3.9 strict = True warn_unreachable = True diff --git a/.ruff.toml b/.ruff.toml index d8f7d95..eef858c 100644 --- a/.ruff.toml +++ b/.ruff.toml @@ -36,30 +36,33 @@ ignore = [ [lint.flake8-bugbear] extend-immutable-calls = [ "faceit.http.client.env", - "faceit.resources.pagination.MaxPages", "faceit.resources.pagination.pages", ] [lint.flake8-import-conventions] banned-from = [ "asyncio", + "decouple", "httpx", + "inspect", "json", "logging", "math", "os", "re", "reprlib", + "tenacity", "typing", + "warnings", ] [lint.pep8-naming] -classmethod-decorators = ["field_validator"] +classmethod-decorators = ["field_validator", "model_validator"] +ignore-names = ["env", "pages"] [lint.per-file-ignores] "__init__.py" = ["F401", "PLC041"] -"faceit.py" = ["S106"] -"types.py" = ["F401", "ICN003", "PLC041", "PYI018"] +"types.py" = ["E302", "F401", "ICN003", "PLC041", "PYI018"] "scripts/*" = ["T201", "INP001"] "docs/*" = ["ALL"] "examples/*" = ["ALL"] diff --git a/README.md b/README.md index d31710c..b558bf0 100644 --- a/README.md +++ b/README.md @@ -11,27 +11,27 @@ This library makes it easy to access and use data from the FACEIT gaming platfor ## Requirements -- Python 3.8 or higher +- Python 3.8 or higher ## Features -- **High-level, idiomatic API** – Interact with FACEIT as if it were a native Python service. -- **Full type safety** – Compatible with [mypy](https://mypy-lang.org/) and other type checkers. -- **Sync & async support** – Powered by [httpx](https://www.python-httpx.org/). -- **Pydantic models** – All data models inherit from [`pydantic.BaseModel`](https://docs.pydantic.dev/latest/usage/models/). -- **Advanced pagination** – Supports both cursor-based and unix-time-based iterators. -- **Flexible data access** – Choose between raw data and parsed models (e.g., `.raw_players` / `.players`). -- **Page collection utilities** – Paginated responses in model mode are wrapped in an `ItemPage` collection with convenient methods, such as `.map()`, `.filter()`, `.find()`, and more. +- **High-level, idiomatic API** – Interact with FACEIT as if it were a native Python service. +- **Full type safety** – Compatible with [mypy](https://mypy-lang.org/) and other type checkers. +- **Sync & async support** – Powered by [httpx](https://www.python-httpx.org/). +- **Pydantic models** – All data models inherit from [`pydantic.BaseModel`](https://docs.pydantic.dev/latest/usage/models/). +- **Advanced pagination** – Supports both cursor-based and unix-time-based iterators. +- **Flexible data access** – Choose between raw data and parsed models (e.g., `.raw_players` / `.players`). +- **Page collection utilities** – Paginated responses in model mode are wrapped in an `ItemPage` collection with convenient methods, such as `.map()`, `.filter()`, `.find()`, and more. ## Installation -```console +``` pip install faceit ``` You can also install with the `env` extra to enable loading the API key from environment files (details below): -```console +``` pip install faceit[env] ``` @@ -51,9 +51,7 @@ By default, the key is read from the `FACEIT_API_KEY` variable. To use a different variable, pass an instance of `EnvKey` to the constructor: ```py -from faceit import Faceit, EnvKey - -data = Faceit.data(EnvKey("SECRET")) +data = faceit.SyncDataResource(faceit.EnvKey("SECRET")) ``` > [!NOTE] @@ -62,23 +60,23 @@ data = Faceit.data(EnvKey("SECRET")) ### Minimal Example ```py -from faceit import Faceit, GameID +import faceit # Initialize the API client. # If FACEIT_API_KEY is set in your environment, you can omit the argument. -data = Faceit.data() # or Faceit.data("YOUR_API_KEY") +data = faceit.SyncDataResource() # or faceit.SyncDataResource("YOUR_API_KEY") # Fetch player information by nickname. -player = data.players.get("s1mple") +player = data.players.get("m0NESY") # Retrieve all CS2 match history for the player. # Returns an ItemPage collection (fully-featured iterable). -matches = data.players.all_history(player.id, GameID.CS2) +matches = data.players.all_history(player.id, faceit.GameID.CS2) print(f"Total CS2 matches for {player.nickname}: {len(matches)}") # Example: Find a match by its ID. -match_id = "1-441ff69f-09e3-4c58-b5c4-a0a7424fe8e0" +match_id = "1-964ea204-03cf-4292-99f8-44da63968463" some_match = matches.find("id", match_id) if some_match: @@ -87,8 +85,6 @@ else: print(f"No match found with ID {match_id}") ``` -### More Examples - See additional usage examples in the [examples/](examples/) directory. ## Motivation @@ -109,9 +105,9 @@ The goal is to provide a solution approaching enterprise-level quality, while re ### Planned Improvements -- Support for more endpoints and models -- Webhooks and chat API integration -- Complete documentation and usage guides +- Support for more endpoints and models +- Webhooks and chat API integration +- Complete documentation and usage guides --- diff --git a/Taskfile.yml b/Taskfile.yml deleted file mode 100644 index afb5470..0000000 --- a/Taskfile.yml +++ /dev/null @@ -1,107 +0,0 @@ -version: "3" - -vars: - # Use the minimum supported Python version for compatibility checks - PYTHON_VERSION: '{{.PYTHON_VERSION | default "3.8"}}' - -tasks: - default: - silent: true - cmds: - - task -l - - venv: - desc: Create virtual environment with specified Python version - status: - - test -d .venv - cmds: - - uv venv --python {{.PYTHON_VERSION}} - - sync: - desc: Sync dependencies - cmds: - - uv sync --all-extras - - lock: - desc: Update dependency lock file - cmds: - - uv lock - - refresh: - desc: Update lock file and sync dependencies - deps: [lock, sync] - - setup: - desc: Create venv, install dependencies, install pre-commit hooks - deps: [venv, sync, pre-commit-install] - - check: - desc: Run ruff linter - cmds: - - uv run ruff check - - fix: - desc: Auto-fix lint issues with ruff - cmds: - - uv run ruff check --fix - - format: - aliases: [ft] - desc: Format code with ruff - cmds: - - uv run ruff format - - type: - desc: Run mypy type checks - cmds: - - uv run mypy . - - test: - desc: Run tests with pytest - cmds: - - uv run pytest - - all-checks: - aliases: [allc] - desc: Run all quality checks - deps: [check, type, test] - - pre-commit-install: - aliases: [pci] - desc: Install pre-commit hooks - cmds: - - uv run pre-commit install - - pre-commit-run: - aliases: [pcrun] - desc: Run all pre-commit hooks on all files - cmds: - - uv run pre-commit run --all-files - - clean: - aliases: [cl] - desc: Remove caches and temporary files - cmds: - - uv run scripts/clean.py - - dry-clean: - aliases: [dcl] - desc: Show what would be deleted (dry run) - cmds: - - uv run scripts/clean.py --dry-run - - del-venv: - aliases: [dvenv] - desc: Delete Python virtual environment (.venv) and deactivate it - silent: true - preconditions: - - sh: "[ -d .venv ]" - msg: .venv does not exist - cmds: - - deactivate || true - - cmd: cmd /c "rmdir /s /q .venv" - ignore_error: true - platforms: [windows] - - cmd: rm -rf .venv - ignore_error: true - platforms: [linux, darwin] diff --git a/examples/async_championships_custom_client.py b/examples/async_championships_custom_client.py deleted file mode 100644 index 29dc963..0000000 --- a/examples/async_championships_custom_client.py +++ /dev/null @@ -1,27 +0,0 @@ -import asyncio -import faceit - - -async def main(): - async_client = faceit.http.AsyncClient( - "YOUR_API_KEY", - max_concurrent_requests=faceit.MaxConcurrentRequests.ABSOLUTE, - # Or just "max" (=100). The limit is necessary because higher values - # may cause serious issues. The default is 30. - ) - - # You can initially pass the client to the `faceit.AsyncFaceit` - # constructor using the named argument `client`. - async with faceit.AsyncFaceit.data(client=async_client) as data: - championships = await data.championships.all_items( - faceit.GameID.CS2, - faceit.EventCategory.ONGOING, - max_items=250, # Maximum number of items to retrieve; - # default is `pages(30)` - # (i.e., 30 pages * method limit per page (10 in this case) = 300 items). - ) - print(f"Total ongoing CS2 championships: {len(championships)}") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/examples/sync_history.py b/examples/sync_history.py deleted file mode 100644 index e3959c2..0000000 --- a/examples/sync_history.py +++ /dev/null @@ -1,10 +0,0 @@ -import faceit - -with faceit.Faceit.data("YOUR_API_KEY") as data: - player = data.players.get("s1mple") - # Returns an `ItemPage` collection (fully-featured iterable) - matches = data.players.all_history(player.id, faceit.GameID.CS2) - print(f"Total CS2 matches for s1mple: {len(matches)}") - # Example: find a match by attribute - some_match = matches.find("id", "some_match_id") - print(f"First match with the given ID: {some_match or 'No match found'}") diff --git a/examples/sync_top200_without_cm.py b/examples/sync_top200_without_cm.py deleted file mode 100644 index 31b476d..0000000 --- a/examples/sync_top200_without_cm.py +++ /dev/null @@ -1,21 +0,0 @@ -import faceit - -data = faceit.Faceit.data("YOUR_API_KEY") - -cs2_rankings = data.raw_rankings.all_unbounded( - faceit.GameID.CS2, - faceit.Region.EUROPE, - # pages (`int`): explicit page count, not item count. - # Used to separate pages from items in logic. - # Total items = limit * pages (pages = `math.ceil(max_items / limit)`). - max_items=faceit.pages(2), # equivalent to `max_items=200` -) - -for player in cs2_rankings: - print( - f"{player['position']} place:\n" - f"Nickname: {player['nickname']}\n" - f"Country: {player['country']}\n" - f"Skill Level: {player['game_skill_level']}\n" - f"Elo: {player['faceit_elo']}\n" - ) diff --git a/pyproject.toml b/pyproject.toml index e7ec026..8b84e74 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "faceit" -version = "0.1.5" +version = "0.2.0b1" description = "The Python wrapper for the Faceit API" readme = "README.md" requires-python = ">=3.8" @@ -20,6 +20,7 @@ classifiers = [ "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", "Typing :: Typed", ] dependencies = [ @@ -35,10 +36,6 @@ env = [ "python-decouple>=3.8", ] -[project.urls] -"Bug Tracker" = "https://github.com/zombyacoff/faceit-python/issues" -"Documentation" = "https://docs.faceit.com/docs" - [dependency-groups] dev = [ "mypy>=1.14", @@ -55,6 +52,9 @@ test = [ "pytest-asyncio>=0.21.1", ] +[project.urls] +"Documentation" = "https://docs.faceit.com/docs" + [tool.hatch.build.targets.wheel] packages = ["src/faceit"] diff --git a/scripts/clean.py b/scripts/clean.py index c6b12f6..422d5b5 100644 --- a/scripts/clean.py +++ b/scripts/clean.py @@ -1,5 +1,4 @@ import shutil -import sys from pathlib import Path PYCACHE = "__pycache__" @@ -29,6 +28,8 @@ def find_and_remove_pycache(root: Path = Path(), *, dry_run: bool = False) -> No if __name__ == "__main__": + import sys + is_dry_run = "--dry-run" in sys.argv for dir_ in DIRS: remove_dir(Path(dir_), dry_run=is_dry_run) diff --git a/src/faceit/__init__.py b/src/faceit/__init__.py index ee30b7c..4f7d5f6 100644 --- a/src/faceit/__init__.py +++ b/src/faceit/__init__.py @@ -1,25 +1,49 @@ from importlib.metadata import PackageNotFoundError, version -from .constants import EventCategory as EventCategory -from .constants import ExpandedField as ExpandedField -from .constants import GameID as GameID -from .constants import Region as Region -from .constants import SkillLevel as SkillLevel -from .exceptions import APIError as APIError -from .exceptions import DecoupleMissingError as DecoupleMissingError -from .exceptions import FaceitError as FaceitError -from .exceptions import MissingAuthTokenError as MissingAuthTokenError -from .faceit import AsyncFaceit as AsyncFaceit -from .faceit import Faceit as Faceit -from .http import EnvKey as EnvKey -from .http import MaxConcurrentRequests as MaxConcurrentRequests -from .resources import AsyncPageIterator as AsyncPageIterator -from .resources import CollectReturnFormat as CollectReturnFormat -from .resources import MaxItems as MaxItems -from .resources import MaxPages as MaxPages -from .resources import SyncPageIterator as SyncPageIterator -from .resources import TimestampPaginationConfig as TimestampPaginationConfig -from .resources import pages as pages +from .constants import EventCategory, ExpandedField, GameID, Region, SkillLevel +from .exceptions import ( + APIError, + DecoupleMissingError, + FaceitError, + MissingAuthTokenError, +) +from .faceit import AsyncFaceit, Faceit +from .http import EnvKey, MaxConcurrentRequests +from .resources import ( + AsyncDataResource, + AsyncPageIterator, + CollectReturnFormat, + MaxItems, + SyncDataResource, + SyncPageIterator, + TimestampPaginationConfig, + pages, +) + +__all__ = [ + "APIError", + "AsyncDataResource", + "AsyncFaceit", # deprecated + "AsyncPageIterator", + "CollectReturnFormat", + "DecoupleMissingError", + "EnvKey", + "EventCategory", + "ExpandedField", + "Faceit", # deprecated + "FaceitError", + "GameID", + "MaxConcurrentRequests", + "MaxItems", + "MissingAuthTokenError", + "Region", + "SkillLevel", + "SyncDataResource", + "SyncPageIterator", + "TimestampPaginationConfig", + "__version__", + "pages", +] try: __version__ = version(__package__ or __name__) diff --git a/src/faceit/constants.py b/src/faceit/constants.py index 3bccc8b..808f715 100644 --- a/src/faceit/constants.py +++ b/src/faceit/constants.py @@ -3,10 +3,10 @@ import logging import re import typing +import warnings from dataclasses import dataclass -from operator import attrgetter +from functools import total_ordering from types import MappingProxyType -from warnings import warn from pydantic import Field, validate_call from typing_extensions import Self, TypeAlias @@ -173,11 +173,11 @@ class Region(StrEnum): # NOTE: Currently includes legacy and game-specific regions (e.g., US for CS:GO). # This structure may be refactored in the future for improved consistency. EUROPE = "EU" + UNITED_STATES = "US" NORTH_AMERICA = "NA" + SOUTH_AMERICA = "SA" OCEANIA = "OCE" SOUTHEAST_ASIA = "SEA" - SOUTH_AMERICA = "SA" - UNITED_STATES = "US" @typing.final @@ -206,12 +206,11 @@ def __str__(self) -> str: return f"{self.lower}+" if self.is_open_ended else f"{self.lower}-{self.upper}" -_DEFAULT_FIRST_ELO_RANGE: typing.Final = EloRange(MIN_ELO, 800) _DEFAULT_TEN_LEVEL_LOWER: typing.Final = 2001 def _create_default_elo_tiers() -> _EloThreshold: - tier_ranges = {1: _DEFAULT_FIRST_ELO_RANGE} + tier_ranges = {1: EloRange(MIN_ELO, 800)} for level in range(2, 10): # `cast(int, ...)` tells the type checker that we know `upper` is @@ -228,8 +227,7 @@ def _create_default_elo_tiers() -> _EloThreshold: def _append_elite_tier( - elite_upper_bound: HighTierLevel, - base_tiers: _EloThreshold = _BASE_ELO_RANGES, + elite_upper_bound: HighTierLevel, base_tiers: _EloThreshold = _BASE_ELO_RANGES ) -> _EloThreshold: return { **base_tiers, @@ -237,17 +235,20 @@ def _append_elite_tier( } -CHALLENGER_CAPPED_ELO_RANGES: typing.Final[_EloThreshold] = _append_elite_tier( +CHALLENGER_CAPPED_ELO_RANGES: typing.Final = _append_elite_tier( HighTierLevel.CHALLENGER ) # Pre-generating this range configuration for future implementation needs # Exposed as a constant for both internal use and potential library consumers -OPEN_ENDED_ELO_RANGES: typing.Final[_EloThreshold] = _append_elite_tier( - HighTierLevel.ABSENT -) +OPEN_ENDED_ELO_RANGES: typing.Final = _append_elite_tier(HighTierLevel.ABSENT) del _append_elite_tier -ELO_THRESHOLDS: typing.Final[typing.Mapping[GameID, _EloThreshold]] = MappingProxyType({ +ELO_THRESHOLDS: typing.Final[ + typing.Mapping[ + GameID, + _EloThreshold, + ] +] = MappingProxyType({ GameID.CS2: { 1: EloRange(MIN_ELO, 500), 2: EloRange(501, 750), @@ -271,7 +272,8 @@ def _append_elite_tier( @typing.final -@dataclass(frozen=True) +@total_ordering +@dataclass(eq=False, frozen=True) class SkillLevel: __slots__ = ("elo_range", "game_id", "level", "name") @@ -281,7 +283,12 @@ class SkillLevel: name: str if typing.TYPE_CHECKING: - _registry: typing.ClassVar[typing.Mapping[GameID, typing.Mapping[int, Self]]] + _registry: typing.ClassVar[ + typing.Mapping[ + GameID, + typing.Mapping[int, Self], + ] + ] @property def is_highest_level(self) -> bool: @@ -322,7 +329,7 @@ def progress_percentage( self, elo: int = Field(ge=MIN_ELO), / ) -> typing.Optional[float]: if self.is_highest_level: - warn( + warnings.warn( "Cannot calculate progress percentage for highest level", UserWarning, stacklevel=4, @@ -330,7 +337,7 @@ def progress_percentage( return None if not self.contains_elo(elo): - warn(f"Elo {elo} is out of range", UserWarning, stacklevel=4) + warnings.warn(f"Elo {elo} is out of range", UserWarning, stacklevel=4) return None assert isinstance(self.elo_range.upper, int) @@ -362,11 +369,13 @@ def get_level( elo: typing.Optional[int] = Field(None, ge=MIN_ELO), ) -> typing.Optional[Self]: if game_id not in cls._registry: - warn(f"Game {game_id!r} is not supported", UserWarning, stacklevel=4) + warnings.warn( + f"Game {game_id!r} is not supported", UserWarning, stacklevel=4 + ) return None if level is not None and elo is not None: - warn( + warnings.warn( "Both 'level' and 'elo' parameters provided; 'level' takes precedence", UserWarning, stacklevel=4, @@ -395,11 +404,32 @@ def get_level( @classmethod @validate_call def get_all_levels(cls, game_id: GameID, /) -> typing.List[Self]: - return sorted(cls._registry.get(game_id, {}).values(), key=attrgetter("level")) + return sorted(cls._registry.get(game_id, {}).values()) def __int__(self) -> int: return self.level + def __eq__(self, other: object) -> bool: + # Explicitly defined for performance and clarity. + # By comparing only the core identity fields (`game_id` and `level`) + # instead of all dataclass fields, we reduce the number of operations. + if not isinstance(other, self.__class__): + return NotImplemented + return self.level == other.level and self.game_id == other.game_id + + def __lt__(self, other: Self) -> bool: + if not isinstance(other, self.__class__): + return NotImplemented + if self.game_id != other.game_id: + raise TypeError( + "Cannot compare levels from different games: " + f"'{self.game_id}' vs '{other.game_id}'" + ) + return self.level < other.level + + def __hash__(self) -> int: + return hash((self.game_id, self.level)) + @classmethod def _initialize_skill_levels_registry(cls) -> None: cls._registry = MappingProxyType({ diff --git a/src/faceit/faceit.py b/src/faceit/faceit.py index cbfdbfc..5067772 100644 --- a/src/faceit/faceit.py +++ b/src/faceit/faceit.py @@ -1,142 +1,47 @@ -from __future__ import annotations - import typing -from abc import ABC -from functools import lru_cache -from warnings import warn +import warnings -from typing_extensions import Self +from typing_extensions import deprecated -from .http import AsyncClient, EnvKey, SyncClient +from .http import AsyncClient, SyncClient from .resources import AsyncDataResource, SyncDataResource -from .types import ClientT, DataResourceT, ValidUUID - -if typing.TYPE_CHECKING: - from .http.client import BaseAPIClient - - -@lru_cache(maxsize=None) -def _get_env_key(key: str, /) -> BaseAPIClient.env: - return EnvKey(f"FACEIT_{key.upper()}") +from .types import ClientT, DataResourceT -class BaseFaceit(ABC, typing.Generic[ClientT, DataResourceT]): +class BaseFaceit(typing.Generic[ClientT, DataResourceT]): __slots__ = () if typing.TYPE_CHECKING: - _client_cls: typing.Type[ClientT] _data_cls: typing.Type[DataResourceT] - def __new__(cls) -> Self: - raise TypeError( - f"Cannot instantiate {cls.__name__} directly. " - "Please use the provided classmethods or factory methods (e.g., `data()`) " - "to obtain an instance. Direct instantiation is intentionally disabled " - "to enforce correct usage patterns." - ) - - @typing.overload - @classmethod - def data(cls) -> DataResourceT: ... - - @typing.overload - @classmethod - def data(cls, *, client: ClientT) -> DataResourceT: ... - - @typing.overload @classmethod - def data( - cls, - api_key: typing.Union[ValidUUID, BaseAPIClient.env], - **client_options: typing.Any, - ) -> DataResourceT: ... - - @classmethod - def data( - cls, - api_key: typing.Union[ValidUUID, BaseAPIClient.env, None] = None, - *, - client: typing.Optional[ClientT] = None, - **client_options: typing.Any, - ) -> DataResourceT: - return typing.cast( - "DataResourceT", - cls._data_cls( - cls._initialize_client( - api_key, client, secret_type="api_key", **client_options - ) - ), + def data(cls, *args: typing.Any, **kwargs: typing.Any) -> DataResourceT: + warnings.warn( + f"`{cls.__name__}.data()` is deprecated and will be removed in a future release. " + f"Please instantiate `{cls._data_cls.__name__}` directly.", + category=DeprecationWarning, + stacklevel=2, ) - - # TODO: The client initialization logic should be revisited when support - # for API resources beyond Data is introduced. - @classmethod - def _initialize_client( - cls, - auth: typing.Union[ValidUUID, BaseAPIClient.env, None] = None, - client: typing.Optional[ClientT] = None, - /, - *, - secret_type: str, - **client_options: typing.Any, - ) -> ClientT: - if auth is not None and client is not None: - raise ValueError(f"Provide either {secret_type!r} or 'client', not both") - - if client is None: - return typing.cast( - "ClientT", - cls._client_cls( - _get_env_key(secret_type) if auth is None else auth, - **client_options, - ), - ) - - if client_options: - warn( - "'client_options' are ignored when an existing client " - "instance is provided. Configure your client before " - "passing it to this constructor.", - UserWarning, - stacklevel=3, - ) - - return client + return typing.cast("DataResourceT", cls._data_cls(*args, **kwargs)) @typing.final +@deprecated( + "`Faceit` is deprecated and will be removed in a future release. " + "Use `SyncDataResource` instead." +) class Faceit(BaseFaceit[SyncClient, SyncDataResource]): - """ - Synchronous Faceit API interface. - - Example (Data API):: - - with Faceit.data("YOUR_API_KEY") as data: - player = data.players.get("s1mple") - assert player.nickname == "s1mple" - - This class uses a synchronous HTTP client under the hood. - """ - __slots__ = () - _client_cls = SyncClient + _data_cls = SyncDataResource @typing.final +@deprecated( + "`AsyncFaceit` is deprecated and will be removed in a future release. " + "Use `AsyncDataResource` instead." +) class AsyncFaceit(BaseFaceit[AsyncClient, AsyncDataResource]): - """ - Asynchronous Faceit API interface. - - Example (Data API):: - - async with AsyncFaceit.data("YOUR_API_KEY") as data: - player = await data.players.get("s1mple") - assert player.nickname == "s1mple" - - This class uses an asynchronous HTTP client under the hood. - """ - __slots__ = () - _client_cls = AsyncClient + _data_cls = AsyncDataResource diff --git a/src/faceit/http/__init__.py b/src/faceit/http/__init__.py index 13cdb49..28c7b0a 100644 --- a/src/faceit/http/__init__.py +++ b/src/faceit/http/__init__.py @@ -1,8 +1,14 @@ -from .client import AsyncClient as AsyncClient -from .client import MaxConcurrentRequests as MaxConcurrentRequests -from .client import SyncClient as SyncClient -from .helpers import Endpoint as Endpoint -from .helpers import RetryArgs as RetryArgs -from .helpers import SupportedMethod as SupportedMethod +from .client import AsyncClient, MaxConcurrentRequests, SyncClient +from .helpers import Endpoint, RetryArgs, SupportedMethod + +__all__ = [ + "AsyncClient", + "Endpoint", + "EnvKey", + "MaxConcurrentRequests", + "RetryArgs", + "SupportedMethod", + "SyncClient", +] EnvKey = SyncClient.env diff --git a/src/faceit/http/client.py b/src/faceit/http/client.py index 5d11440..c9ec86b 100644 --- a/src/faceit/http/client.py +++ b/src/faceit/http/client.py @@ -3,37 +3,28 @@ import asyncio import logging import typing +import warnings from abc import ABC from collections import UserString from functools import lru_cache from threading import Lock from time import time from types import MappingProxyType -from warnings import warn from weakref import WeakSet import httpx +import tenacity +import tenacity.asyncio from pydantic import PositiveInt, validate_call -from tenacity import ( - AsyncRetrying, - RetryCallState, - Retrying, - retry_if_exception, - stop_after_attempt, - wait_random_exponential, -) -from tenacity.asyncio import retry_if_exception as async_retry_if_exception -from typing_extensions import Self +from typing_extensions import Never, Self from faceit.constants import BASE_WIKI_URL from faceit.exceptions import APIError, DecoupleMissingError, MissingAuthTokenError from faceit.utils import ( - REDACTED_MARKER, StrEnum, create_uuid_validator, invoke_callable, locked, - noop, representation, ) @@ -47,10 +38,7 @@ ) try: - from decouple import ( # pyright: ignore[reportMissingImports] - UndefinedValueError as EnvUndefinedValueError, - ) - from decouple import config as env_config # pyright: ignore[reportMissingImports] + import decouple # pyright: ignore[reportMissingImports] except ImportError: ENV_EXTRA_INSTALLED: typing.Final = False else: @@ -68,7 +56,7 @@ _logger = logging.getLogger(__name__) _HttpxClientT = typing.TypeVar("_HttpxClientT", httpx.Client, httpx.AsyncClient) -_RetryerT = typing.TypeVar("_RetryerT", Retrying, AsyncRetrying) +_RetryerT = typing.TypeVar("_RetryerT", tenacity.Retrying, tenacity.AsyncRetrying) class MaxConcurrentRequests(StrEnum): @@ -83,30 +71,23 @@ class BaseAPIClient(ABC, typing.Generic[_HttpxClientT, _RetryerT]): __slots__ = ("_api_key", "_build_endpoint", "_retry_args", "base_url") @typing.final - class env(UserString): # noqa: N801 + class env(UserString): """String subclass representing a key to fetch from environment variables.""" __slots__ = () DEFAULT_API_KEY_ENV: typing.ClassVar = env("FACEIT_SECRET") DEFAULT_BASE_URL: typing.ClassVar = "https://open.faceit.com/data/v4" - DEFAULT_TIMEOUT: typing.ClassVar[float] = 10 + DEFAULT_TIMEOUT: typing.ClassVar = 10.0 DEFAULT_RETRY_ARGS: typing.ClassVar = RetryArgs( - stop=stop_after_attempt(3), - wait=wait_random_exponential(1, 10), - retry=retry_if_exception( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_random_exponential(1, 10), + retry=tenacity.retry_if_exception( lambda e: isinstance( e, - ( - httpx.TimeoutException, - httpx.ConnectError, - httpx.RemoteProtocolError, - ), - ) - or ( - isinstance(e, httpx.HTTPStatusError) - and is_retryable_status(e.response.status_code) + (httpx.TimeoutException, httpx.ConnectError, httpx.RemoteProtocolError), ) + or (isinstance(e, APIError) and is_retryable_status(e.status_code)) ), reraise=True, before_sleep=lambda s: _logger.warning( @@ -137,13 +118,12 @@ def __init__( ) -> None: self.base_url = base_url.rstrip("/") self._api_key_setter(api_key) - self._retry_args_setter(retry_args or {}) - + self._retry_args_setter(retry_args or RetryArgs()) self._build_endpoint = lru_cache(self._build_endpoint_unwrapped) @property def api_key(self) -> str: - return REDACTED_MARKER + return self._api_key[:4] + "..." + self._api_key[-4:] @api_key.setter def api_key(self, value: typing.Union[ValidUUID, env]) -> None: @@ -166,8 +146,6 @@ def raw_client(self) -> _HttpxClientT: @property def is_closed(self) -> bool: - # Defensive check to handle cases where client initialization failed - # Prevents AttributeError in __del__ during garbage collection return self._client.is_closed if hasattr(self, "_client") else True @property @@ -213,10 +191,9 @@ def _get_secret_from_env(key: str, /) -> str: if not ENV_EXTRA_INSTALLED: raise DecoupleMissingError try: - return typing.cast( # because `decouple` is untyped - "str", env_config(key) - ) - except EnvUndefinedValueError: + # because `decouple` is untyped + return typing.cast("str", decouple.config(key)) + except decouple.UndefinedValueError: raise MissingAuthTokenError(key) from None @staticmethod @@ -228,20 +205,18 @@ def _handle_response(response: httpx.Response, /) -> RawAPIResponse: # TODO: More specific exceptions except httpx.HTTPStatusError as e: # fmt: off - # NOTE: If the error is retryable, tenacity will retry unless attempts are exhausted. - # In that case, tenacity raises `RetryError` (not wrapped as `APIError`) for the caller. if is_retryable_status(e.response.status_code): _logger.warning( "Retryable HTTP error %s at %s: %s", e.response.status_code, e.response.url, e.response.text ) - raise - _logger.exception( - "HTTP error %s at %s: %s", - response.status_code, response.url, response.text, - ) - # fmt: on + else: + _logger.exception( + "HTTP error %s at %s: %s", + response.status_code, response.url, response.text, + ) raise APIError(response.status_code, response.text) from e + # fmt: on except (ValueError, httpx.DecodingError): _logger.exception( "Invalid JSON response from %s: %s", response.url, response.text @@ -249,7 +224,7 @@ def _handle_response(response: httpx.Response, /) -> RawAPIResponse: raise APIError(response.status_code, "Invalid JSON response") from None -class _BaseSyncClient(BaseAPIClient[httpx.Client, Retrying]): +class _BaseSyncClient(BaseAPIClient[httpx.Client, tenacity.Retrying]): __slots__ = ("_client", "_retryer") def __init__( @@ -267,7 +242,7 @@ def __init__( self._client = httpx.Client( timeout=timeout, headers=self._base_headers, **raw_client_kwargs ) - self._retryer = Retrying(**self._retry_args) # type: ignore[arg-type] + self._retryer = tenacity.Retrying(**self._retry_args) # type: ignore[arg-type] def close(self) -> None: if not self.is_closed: @@ -294,7 +269,7 @@ def __exit__(self, *_: object, **__: object) -> None: # NOTE: Logic on eliminating SSL errors was added because during tests # it was found that such errors often pop up even with a small # number of concurrent requests, probably problems on the FACEIT API side -class _BaseAsyncClient(BaseAPIClient[httpx.AsyncClient, AsyncRetrying]): +class _BaseAsyncClient(BaseAPIClient[httpx.AsyncClient, tenacity.AsyncRetrying]): __slots__ = ("__weakref__", "_client", "_retryer") _instances: typing.ClassVar[WeakSet[_BaseAsyncClient]] = WeakSet() @@ -305,7 +280,7 @@ class _BaseAsyncClient(BaseAPIClient[httpx.AsyncClient, AsyncRetrying]): _ssl_error_count: typing.ClassVar = 0 _adaptive_limit_enabled: typing.ClassVar = True _last_ssl_error_time: typing.ClassVar = time() - _recovery_check_time: typing.ClassVar[float] = 0 + _recovery_check_time: typing.ClassVar = 0.0 # Current limit value is based on empirical testing, # but requires further investigation for optimal setting @@ -322,7 +297,7 @@ class _BaseAsyncClient(BaseAPIClient[httpx.AsyncClient, AsyncRetrying]): _min_connections: typing.ClassVar = DEFAULT_MIN_CONNECTIONS _recovery_interval: typing.ClassVar = DEFAULT_RECOVERY_INTERVAL - DEFAULT_KEEPALIVE_EXPIRY: typing.ClassVar[float] = 30 + DEFAULT_KEEPALIVE_EXPIRY: typing.ClassVar = 30.0 def __init__( self, @@ -366,7 +341,7 @@ def __init__( limits=limits, **raw_client_kwargs, ) - self._retryer = AsyncRetrying(**self._retry_args) # type: ignore[arg-type] + self._retryer = tenacity.AsyncRetrying(**self._retry_args) # type: ignore[arg-type] # Initialize or update the semaphore if needed if ( @@ -389,7 +364,7 @@ async def combined_retry(exception: BaseException) -> bool: self.__class__._register_ssl_error() if is_ssl_error(exception) else await invoke_callable( - original_retry.predicate # type: ignore[attr-defined] + original_retry.predicate if isinstance(original_retry, SupportsExceptionPredicate) else original_retry, exception, @@ -400,7 +375,7 @@ async def combined_retry(exception: BaseException) -> bool: lambda _: None ) - async def ssl_before_sleep(retry_state: RetryCallState) -> None: + async def ssl_before_sleep(retry_state: tenacity.RetryCallState) -> None: if retry_state.outcome is None: return @@ -408,14 +383,14 @@ async def ssl_before_sleep(retry_state: RetryCallState) -> None: if exception is not None and is_ssl_error(exception): _logger.warning( "SSL connection error to %s", - str(retry_state.args[0] if retry_state.args else "unknown"), + retry_state.args[0] if retry_state.args else "unknown", ) await asyncio.sleep(0.5) await invoke_callable(original_before_sleep, retry_state) self._retry_args.update({ - "retry": async_retry_if_exception(combined_retry), + "retry": tenacity.asyncio.retry_if_exception(combined_retry), "before_sleep": ssl_before_sleep, }) @@ -514,11 +489,10 @@ async def _check_connection_recovery(cls) -> None: def _update_initial_max_requests( cls, value: typing.Union[MaxConcurrentRequests, PositiveInt], / ) -> int: - max_concurrent_requests = typing.cast( - "int", + max_concurrent_requests = ( cls.MAX_CONCURRENT_REQUESTS_ABSOLUTE if value == MaxConcurrentRequests.ABSOLUTE - else value, + else value ) if max_concurrent_requests > cls._initial_max_requests: cls._initial_max_requests = max_concurrent_requests @@ -526,7 +500,7 @@ def _update_initial_max_requests( return max_concurrent_requests @classmethod - def close(cls) -> typing.NoReturn: + def close(cls) -> Never: """ This method intentionally raises an error to prevent incorrect usage. @@ -546,7 +520,7 @@ async def close_all(cls) -> None: @validate_call def update_rate_limit(cls, new_limit: PositiveInt, /) -> None: if new_limit > cls.MAX_CONCURRENT_REQUESTS_ABSOLUTE: - warn( + warnings.warn( f"Request limit of {new_limit} exceeds " f"maximum allowed ({cls.MAX_CONCURRENT_REQUESTS_ABSOLUTE})", UserWarning, @@ -609,10 +583,11 @@ def configure_adaptive_limits( cls._recovery_interval, ) - def __enter__(self) -> typing.NoReturn: + def __enter__(self) -> Never: raise RuntimeError("Use 'async with' instead.") - __exit__ = noop + def __exit__(self, *_: object, **__: object) -> None: + pass async def __aenter__(self) -> Self: return self diff --git a/src/faceit/http/helpers.py b/src/faceit/http/helpers.py index 513fde1..71e3a8d 100644 --- a/src/faceit/http/helpers.py +++ b/src/faceit/http/helpers.py @@ -9,15 +9,12 @@ from faceit.utils import StrEnum, UnsupportedOperationTypeError, representation if typing.TYPE_CHECKING: - from tenacity import RetryCallState, RetryError - from tenacity.retry import retry_base - from tenacity.stop import stop_base - from tenacity.wait import wait_base + import tenacity from faceit.types import EndpointParam _RetryHook: TypeAlias = typing.Callable[ - [RetryCallState], typing.Union[typing.Awaitable[None], None] + [tenacity.RetryCallState], typing.Union[typing.Awaitable[None], None] ] @@ -26,20 +23,27 @@ class RetryArgs(typing.TypedDict, total=False): sleep: typing.Callable[ [typing.Union[int, float]], typing.Union[typing.Awaitable[None], None] ] - stop: typing.Union[stop_base, typing.Callable[[RetryCallState], bool]] + stop: typing.Union[ + tenacity.stop.stop_base, typing.Callable[[tenacity.RetryCallState], bool] + ] wait: typing.Union[ - wait_base, typing.Callable[[RetryCallState], typing.Union[float, int]] + tenacity.wait.wait_base, + typing.Callable[[tenacity.RetryCallState], typing.Union[float, int]], ] retry: typing.Union[ - retry_base, - typing.Callable[[RetryCallState], typing.Union[typing.Awaitable[bool], bool]], + tenacity.retry_base, + typing.Callable[ + [tenacity.RetryCallState], typing.Union[typing.Awaitable[bool], bool] + ], ] before: _RetryHook after: _RetryHook before_sleep: typing.Optional[_RetryHook] reraise: bool - retry_error_cls: typing.Type[RetryError] - retry_error_callback: typing.Optional[typing.Callable[[RetryCallState], typing.Any]] + retry_error_cls: typing.Type[tenacity.RetryError] + retry_error_callback: typing.Optional[ + typing.Callable[[tenacity.RetryCallState], typing.Any] + ] @typing.runtime_checkable @@ -75,25 +79,28 @@ def __str__(self) -> str: ) def __truediv__(self, other: EndpointParam) -> Self: - if isinstance(other, str): - return self.add(other) if isinstance(other, self.__class__): return self.__class__(*self.path_parts, *other.path_parts, base=self.base) - raise UnsupportedOperationTypeError( - "/", self.__class__.__name__, type(other).__name__ - ) + try: + return self.add(str(other)) + except (TypeError, ValueError): + raise UnsupportedOperationTypeError( + "/", self.__class__.__name__, type(other).__name__ + ) from None def __itruediv__(self, other: EndpointParam) -> Self: - if isinstance(other, str): - if other: - self.path_parts.append(other) - return self if isinstance(other, self.__class__): self.path_parts.extend(other.path_parts) return self - raise UnsupportedOperationTypeError( - "/=", self.__class__.__name__, type(other).__name__ - ) + try: + other_str = str(other) + except (TypeError, ValueError): + raise UnsupportedOperationTypeError( + "/=", self.__class__.__name__, type(other).__name__ + ) from None + if other_str: + self.path_parts.append(other_str) + return self def is_ssl_error(exception: BaseException, /) -> bool: diff --git a/src/faceit/models/__init__.py b/src/faceit/models/__init__.py index 9050687..55874cd 100644 --- a/src/faceit/models/__init__.py +++ b/src/faceit/models/__init__.py @@ -1,9 +1,23 @@ -from .championships import Championship as Championship -from .item_page import ItemPage as ItemPage -from .players import BanEntry as BanEntry -from .players import CS2MatchPlayerStats as CS2MatchPlayerStats -from .players import GeneralTeam as GeneralTeam -from .players import Hub as Hub -from .players import Match as Match -from .players import Player as Player -from .players import Tournament as Tournament +from .championships import Championship +from .item_page import ItemPage +from .players import ( + BanEntry, + CS2MatchPlayerStats, + GeneralTeam, + Hub, + Match, + Player, + Tournament, +) + +__all__ = [ + "BanEntry", + "CS2MatchPlayerStats", + "Championship", + "GeneralTeam", + "Hub", + "ItemPage", + "Match", + "Player", + "Tournament", +] diff --git a/src/faceit/models/custom_types/__init__.py b/src/faceit/models/custom_types/__init__.py index 84c58f0..6a003d5 100644 --- a/src/faceit/models/custom_types/__init__.py +++ b/src/faceit/models/custom_types/__init__.py @@ -1,7 +1,17 @@ -from .common import CountryCode as CountryCode -from .common import LangFormattedAnyHttpUrl as LangFormattedAnyHttpUrl -from .common import NullableList as NullableList -from .common import ResponseContainer as ResponseContainer -from .faceit_uuid import FaceitID as FaceitID -from .faceit_uuid import FaceitMatchID as FaceitMatchID -from .faceit_uuid import FaceitTeamID as FaceitTeamID +from .common import ( + CountryCode, + LangFormattedAnyHttpUrl, + NullableList, + ResponseContainer, +) +from .faceit_uuid import FaceitID, FaceitMatchID, FaceitTeamID + +__all__ = [ + "CountryCode", + "FaceitID", + "FaceitMatchID", + "FaceitTeamID", + "LangFormattedAnyHttpUrl", + "NullableList", + "ResponseContainer", +] diff --git a/src/faceit/models/custom_types/common.py b/src/faceit/models/custom_types/common.py index 5295b58..8ae0a6a 100644 --- a/src/faceit/models/custom_types/common.py +++ b/src/faceit/models/custom_types/common.py @@ -2,7 +2,13 @@ import typing -from pydantic import AfterValidator, AnyHttpUrl, GetCoreSchemaHandler, RootModel +from pydantic import ( + AfterValidator, + AnyHttpUrl, + GetCoreSchemaHandler, + RootModel, + model_validator, +) from pydantic_core import core_schema from pydantic_extra_types.country import CountryAlpha2 from typing_extensions import Annotated, TypeAlias @@ -19,7 +25,7 @@ CountryCode: TypeAlias = Annotated[ # I assume that there must be a better implementation than this. # It is necessary to study this issue in more detail. - CountryAlpha2, AfterValidator(lambda x: typing.cast("str", x).lower()) # noqa: TC008 + CountryAlpha2, AfterValidator(lambda x: x.lower()) ] @@ -50,12 +56,23 @@ def get( def __getattr__(self, name: str) -> typing.Optional[_T]: return self.root.get(name) - def __iter__(self) -> typing.Generator[typing.Tuple[str, _T], None, None]: # noqa: PYI058 - yield from self.items() + def __iter__(self) -> typing.Iterator[str]: # type: ignore[override] + yield from self.root def __getitem__(self, key: str) -> _T: return self.root[key] + @model_validator(mode="before") + def _inject_keys(cls, data: typing.Any) -> typing.Any: + return ( + { + k: {**v, "_container_key": k} if isinstance(v, dict) else v + for k, v in data.items() + } + if isinstance(data, dict) + else data + ) + @typing.final class LangFormattedAnyHttpUrl: diff --git a/src/faceit/models/item_page.py b/src/faceit/models/item_page.py index 56944af..f62e9fc 100644 --- a/src/faceit/models/item_page.py +++ b/src/faceit/models/item_page.py @@ -3,7 +3,7 @@ import typing from functools import cached_property from itertools import chain, starmap -from random import choice +from random import choice as random_choice from pydantic import BaseModel, Field, field_validator from typing_extensions import Annotated, Self, TypeAlias @@ -82,33 +82,44 @@ def find_all(self, attr: str, value: typing.Any) -> typing.List[_T]: return list(self._find_items(attr, value)) @typing.overload - def first(self) -> typing.Optional[_T]: ... + def get_first(self) -> typing.Optional[_T]: ... @typing.overload - def first(self, default: _R, /) -> typing.Union[_T, _R]: ... + def get_first(self, default: _R, /) -> typing.Union[_T, _R]: ... - def first(self, default: typing.Optional[_R] = None) -> typing.Union[_T, _R, None]: + def get_first( + self, default: typing.Optional[_R] = None + ) -> typing.Union[_T, _R, None]: return self[0] if self else default @typing.overload - def last(self) -> typing.Optional[_T]: ... + def get_last(self) -> typing.Optional[_T]: ... @typing.overload - def last(self, default: _R, /) -> typing.Union[_T, _R]: ... + def get_last(self, default: _R, /) -> typing.Union[_T, _R]: ... - def last(self, default: typing.Optional[_R] = None) -> typing.Union[_T, _R, None]: + def get_last( + self, default: typing.Optional[_R] = None + ) -> typing.Union[_T, _R, None]: return self[-1] if self else default @typing.overload - def random(self) -> typing.Optional[_T]: ... + def get_random(self) -> typing.Optional[_T]: ... @typing.overload - def random(self, default: _R, /) -> typing.Union[_T, _R]: ... + def get_random(self, default: _R, /) -> typing.Union[_T, _R]: ... - def random(self, default: typing.Optional[_R] = None) -> typing.Union[_T, _R, None]: + def get_random( + self, default: typing.Optional[_R] = None + ) -> typing.Union[_T, _R, None]: # Intentionally using non-cryptographic RNG as this is for # convenience sampling rather than security-sensitive operations - return choice(self) if self else default # noqa: S311 + return random_choice(self) if self else default # noqa: S311 + + # aliases for backwards compatibility + first = get_first + last = get_last + random = get_random def map(self, func: typing.Callable[[_T], _R], /) -> ItemPage[_R]: return self.__class__._construct_without_metadata(map(func, self)) @@ -135,7 +146,7 @@ def _construct_without_metadata( # methods like `map()` that change the item type from `_T` to `_R` ) -> ItemPage[_R]: return cls.model_construct( # type: ignore[return-value] - items=items or (), + items=tuple(items or ()), _offset=UnsetValue.UNSET, _limit=UnsetValue.UNSET, _time_from=UnsetValue.UNSET, _time_to=UnsetValue.UNSET, ) @@ -148,16 +159,15 @@ def __len__(self) -> int: return len(self.items) def __reversed__(self) -> Self: - return self.with_items(tuple(reversed(self))) + return self.with_items(reversed(self.items)) def __reduce__( self, ) -> typing.Tuple[typing.Type[Self], typing.Tuple[typing.Any, ...]]: - # fmt: off - return (self.__class__, ( - self.items, self._offset, self._limit, self._time_from, self._time_to, - )) - # fmt: on + return ( + self.__class__, + (self.items, self._offset, self._limit, self._time_from, self._time_to), + ) @typing.overload def __getitem__(self, index: typing.SupportsIndex) -> _T: ... @@ -180,7 +190,7 @@ def __getitem__( f"__index__ or slices, not {type(index).__name__}" ) from e - def __contains__(self, item: typing.Any) -> bool: + def __contains__(self, item: _T) -> bool: return item in self.items def __bool__(self) -> bool: diff --git a/src/faceit/models/players/__init__.py b/src/faceit/models/players/__init__.py index d4de648..5280ded 100644 --- a/src/faceit/models/players/__init__.py +++ b/src/faceit/models/players/__init__.py @@ -1,13 +1,26 @@ -from .general import BanEntry as BanEntry -from .general import GameInfo as GameInfo -from .general import GeneralTeam as GeneralTeam -from .general import Hub as Hub -from .general import Player as Player -from .general import Tournament as Tournament -from .match import CS2MatchPlayerStats as CS2MatchPlayerStats -from .match import Match as Match -from .match import PlayerSummary as PlayerSummary -from .match import Results as Results -from .match import Score as Score -from .match import Team as Team -from .match import Teams as Teams +from .general import BanEntry, GameInfo, GeneralTeam, Hub, Player, Tournament +from .match import ( + CS2MatchPlayerStats, + Match, + PlayerSummary, + Results, + Score, + Team, + Teams, +) + +__all__ = [ + "BanEntry", + "CS2MatchPlayerStats", + "GameInfo", + "GeneralTeam", + "Hub", + "Match", + "Player", + "PlayerSummary", + "Results", + "Score", + "Team", + "Teams", + "Tournament", +] diff --git a/src/faceit/models/players/general.py b/src/faceit/models/players/general.py index aa7a0a4..54bf5cf 100644 --- a/src/faceit/models/players/general.py +++ b/src/faceit/models/players/general.py @@ -1,10 +1,11 @@ import typing from datetime import datetime +from enum import IntEnum -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, model_validator from typing_extensions import Annotated -from faceit.constants import GameID +from faceit.constants import ELO_THRESHOLDS, GameID, SkillLevel from faceit.models.custom_types import ( FaceitID, LangFormattedAnyHttpUrl, @@ -13,19 +14,53 @@ from faceit.types import RegionIdentifier, UrlOrEmpty +class MatchResult(IntEnum): + LOSE = 0 + WIN = 1 + + class GameInfo(BaseModel): region: RegionIdentifier game_player_id: str - level: Annotated[int, Field(alias="skill_level")] + level: Annotated[typing.Union[int, SkillLevel], Field(alias="skill_level")] elo: Annotated[int, Field(alias="faceit_elo")] game_player_name: str # This attribute appears to be deprecated and is no longer provided # by the API. Remove only if you have confirmed that "skill_level_label" # is not returned in any current responses. - # level_label: str = Field(alias="skill_level_label") - regions: ResponseContainer[RegionIdentifier] = ResponseContainer({}) + # level_label: Annotated[str, Field(alias="skill_level_label")] + # regions: ResponseContainer[RegionIdentifier] = ResponseContainer({}) game_profile_id: str + @model_validator(mode="before") + def _prepare_skill_level(cls, data: typing.Any) -> typing.Any: + if not isinstance(data, dict): + return data + + game_id = data.get("_container_key") + skill_lvl = data.get("skill_level") + + if ( + game_id is not None + and game_id in ELO_THRESHOLDS + and skill_lvl is not None + and skill_lvl in ELO_THRESHOLDS.values() + # Just in case; It may not be necessary at all + and not isinstance(skill_lvl, SkillLevel) + ): + resolved = SkillLevel.get_level(game_id, skill_lvl) + assert resolved is not None, ( + "`resolved` cannot be None because `game_id` was already validated " + "to be present in `ELO_THRESHOLDS`" + ) + data["skill_level"] = resolved + + return data + + +class PlayerSettings(BaseModel): + language: str + class Player(BaseModel): id: Annotated[FaceitID, Field(alias="player_id")] @@ -35,7 +70,7 @@ class Player(BaseModel): cover_image: UrlOrEmpty platforms: typing.Optional[ResponseContainer[str]] games: ResponseContainer[GameInfo] - settings: ResponseContainer[typing.Any] + settings: PlayerSettings friends_ids: typing.List[FaceitID] new_steam_id: str steam_id_64: str @@ -43,8 +78,8 @@ class Player(BaseModel): memberships: typing.List[str] faceit_url: LangFormattedAnyHttpUrl membership_type: str - cover_featured_image: str - infractions: ResponseContainer[typing.Any] + cover_featured_image: UrlOrEmpty + # infractions: ResponseContainer[typing.Any] verified: bool activated_at: datetime @@ -108,135 +143,143 @@ class Tournament(BaseModel): faceit_url: LangFormattedAnyHttpUrl -class LifetimeStats(BaseModel): - # TODO - total_1v1_wins: Annotated[str, Field(alias="Total 1v1 Wins")] - win_rate: Annotated[str, Field(alias="Win Rate %")] - total_sniper_kills: Annotated[str, Field(alias="Total Sniper Kills")] - longest_win_streak: Annotated[str, Field(alias="Longest Win Streak")] - flash_success_rate: Annotated[str, Field(alias="Flash Success Rate")] - total_flash_count: Annotated[str, Field(alias="Total Flash Count")] - utility_success_rate: Annotated[str, Field(alias="Utility Success Rate")] - total_damage: Annotated[str, Field(alias="Total Damage")] - total_1v2_count: Annotated[str, Field(alias="Total 1v2 Count")] - average_kd_ratio: Annotated[str, Field(alias="Average K/D Ratio")] - wins: str - sniper_kill_rate: Annotated[str, Field(alias="Sniper Kill Rate")] - total_rounds_with_extended_stats: Annotated[ - str, Field(alias="Total Rounds with extended stats") - ] - kd_ratio: Annotated[str, Field(alias="K/D Ratio")] - utility_damage_success_rate: Annotated[ - str, Field(alias="Utility Damage Success Rate") +class CSLifetimeStats(BaseModel): # `GameID.CS2` & `GameID.CSGO` + adr: Annotated[float, Field(0, alias="ADR")] + average_headshots_percentage: Annotated[int, Field(alias="Average Headshots %")] + average_kd_ratio: Annotated[float, Field(alias="Average K/D Ratio")] + current_win_streak: Annotated[int, Field(alias="Current Win Streak")] + enemies_flashed_per_round: Annotated[ + float, Field(0.0, alias="Enemies Flashed per Round") ] - total_entry_count: Annotated[str, Field(alias="Total Entry Count")] - average_headshots_percentage: Annotated[str, Field(alias="Average Headshots %")] - total_utility_successes: Annotated[str, Field(alias="Total Utility Successes")] - v2_win_rate: Annotated[str, Field(alias="1v2 Win Rate")] - total_kills_with_extended_stats: Annotated[ - str, Field(alias="Total Kills with extended stats") + entry_rate: Annotated[float, Field(0.0, alias="Entry Rate")] + entry_success_rate: Annotated[float, Field(0.0, alias="Entry Success Rate")] + flash_success_rate: Annotated[float, Field(0.0, alias="Flash Success Rate")] + flashes_per_round: Annotated[float, Field(0.0, alias="Flashes per Round")] + kd_ratio: Annotated[float, Field(alias="K/D Ratio")] + longest_win_streak: Annotated[int, Field(alias="Longest Win Streak")] + matches: Annotated[int, Field(alias="Matches")] + recent_results: Annotated[ + typing.List[MatchResult], Field(alias="Recent Results", max_length=5) ] - matches: str - entry_success_rate: Annotated[str, Field(alias="Entry Success Rate")] - total_utility_damage: Annotated[str, Field(alias="Total Utility Damage")] - total_entry_wins: Annotated[str, Field(alias="Total Entry Wins")] - current_win_streak: Annotated[str, Field(alias="Current Win Streak")] - utility_usage_per_round: Annotated[str, Field(alias="Utility Usage per Round")] - recent_results: Annotated[typing.List[str], Field(alias="Recent Results")] - total_1v1_count: Annotated[str, Field(alias="Total 1v1 Count")] - total_headshots_percentage: Annotated[str, Field(alias="Total Headshots %")] - enemies_flashed_per_round: Annotated[str, Field(alias="Enemies Flashed per Round")] - flashes_per_round: Annotated[str, Field(alias="Flashes per Round")] + sniper_kill_rate: Annotated[float, Field(0.0, alias="Sniper Kill Rate")] sniper_kill_rate_per_round: Annotated[ - str, Field(alias="Sniper Kill Rate per Round") + float, Field(0.0, alias="Sniper Kill Rate per Round") ] - adr: Annotated[str, Field(alias="ADR")] - v1_win_rate: Annotated[str, Field(alias="1v1 Win Rate")] - total_utility_count: Annotated[str, Field(alias="Total Utility Count")] - total_flash_successes: Annotated[str, Field(alias="Total Flash Successes")] - total_1v2_wins: Annotated[str, Field(alias="Total 1v2 Wins")] - total_matches: Annotated[str, Field(alias="Total Matches")] - entry_rate: Annotated[str, Field(alias="Entry Rate")] - total_enemies_flashed: Annotated[str, Field(alias="Total Enemies Flashed")] - utility_damage_per_round: Annotated[str, Field(alias="Utility Damage per Round")] - - -class MapStats(BaseModel): - # TODO значения по умолчанию для полей "_with_extended_stats" = None (или 0?), - # так как они отсутсвтуют для старых матчей (предположу, что появились только в районе лета 2024) - utility_success_rate: Annotated[float, Field(alias="Utility Success Rate")] - entry_success_rate: Annotated[float, Field(alias="Entry Success Rate")] - total_entry_wins: Annotated[int, Field(alias="Total Entry Wins")] - # NOTE посмотреть матчи за 2023 ? + total_1v1_count: Annotated[int, Field(0, alias="Total 1v1 Count")] + total_1v1_wins: Annotated[int, Field(0, alias="Total 1v1 Wins")] + total_1v2_count: Annotated[int, Field(0, alias="Total 1v2 Count")] + total_1v2_wins: Annotated[int, Field(0, alias="Total 1v2 Wins")] + total_damage: Annotated[int, Field(0, alias="Total Damage")] + total_enemies_flashed: Annotated[int, Field(0, alias="Total Enemies Flashed")] + total_entry_count: Annotated[int, Field(0, alias="Total Entry Count")] + total_entry_wins: Annotated[int, Field(0, alias="Total Entry Wins")] + total_flash_count: Annotated[int, Field(0, alias="Total Flash Count")] + total_flash_successes: Annotated[int, Field(0, alias="Total Flash Successes")] + total_headshots_percentage: Annotated[int, Field(alias="Total Headshots %")] + total_kills_with_extended_stats: Annotated[ + int, Field(0, alias="Total Kills with extended stats") + ] + total_matches: Annotated[int, Field(0, alias="Total Matches")] total_rounds_with_extended_stats: Annotated[ - int, Field(alias="Total Rounds with extended stats") + int, Field(0, alias="Total Rounds with extended stats") ] + total_sniper_kills: Annotated[int, Field(0, alias="Total Sniper Kills")] + total_utility_count: Annotated[int, Field(0, alias="Total Utility Count")] + total_utility_damage: Annotated[int, Field(0, alias="Total Utility Damage")] + total_utility_successes: Annotated[int, Field(0, alias="Total Utility Successes")] + utility_damage_per_round: Annotated[ + float, Field(0.0, alias="Utility Damage per Round") + ] + utility_damage_success_rate: Annotated[ + float, Field(0.0, alias="Utility Damage Success Rate") + ] + utility_success_rate: Annotated[float, Field(0.0, alias="Utility Success Rate")] + utility_usage_per_round: Annotated[ + float, Field(0.0, alias="Utility Usage per Round") + ] + v1_win_rate: Annotated[float, Field(0.0, alias="1v1 Win Rate")] + v2_win_rate: Annotated[float, Field(0.0, alias="1v2 Win Rate")] + win_rate: Annotated[int, Field(alias="Win Rate %")] # in percentage + wins: Annotated[int, Field(alias="Wins")] + + +class CSMapStats(BaseModel): # `GameID.CS2` & `GameID.CSGO` + # TODO: Преобразование в проценты (*100) таких полей, как "v2_win_rate", "v1_win_rate", ... ? + adr: Annotated[float, Field(0.0, alias="ADR")] + assists: Annotated[int, Field(alias="Assists")] + average_assists: Annotated[float, Field(alias="Average Assists")] + average_deaths: Annotated[float, Field(alias="Average Deaths")] + average_headshots_percentage: Annotated[float, Field(alias="Average Headshots %")] + average_kd_ratio: Annotated[float, Field(alias="Average K/D Ratio")] + average_kills: Annotated[float, Field(alias="Average Kills")] + average_kr_ratio: Annotated[float, Field(alias="Average K/R Ratio")] + average_mvps: Annotated[float, Field(alias="Average MVPs")] + average_penta_kills: Annotated[float, Field(alias="Average Penta Kills")] + average_quadro_kills: Annotated[float, Field(alias="Average Quadro Kills")] + average_triple_kills: Annotated[float, Field(alias="Average Triple Kills")] deaths: Annotated[int, Field(alias="Deaths")] - # TODO преобразование в проценты (*100) таких полей, как "v2_win_rate", "v1_win_rate", ... - v2_win_rate: Annotated[float, Field(alias="1v2 Win Rate")] - total_kills_with_extended_stats: Annotated[ - int, Field(alias="Total Kills with extended stats") + enemies_flashed_per_round: Annotated[ + float, Field(0.0, alias="Enemies Flashed per Round") ] - v1_win_rate: Annotated[float, Field(alias="1v1 Win Rate")] - total_enemies_flashed: Annotated[int, Field(alias="Total Enemies Flashed")] + entry_rate: Annotated[float, Field(0.0, alias="Entry Rate")] + entry_success_rate: Annotated[float, Field(0.0, alias="Entry Success Rate")] + flash_success_rate: Annotated[float, Field(0.0, alias="Flash Success Rate")] + flashes_per_round: Annotated[float, Field(0.0, alias="Flashes per Round")] + headshots: Annotated[int, Field(alias="Headshots")] + headshots_per_match: Annotated[float, Field(alias="Headshots per Match")] + kd_ratio: Annotated[float, Field(alias="K/D Ratio")] + kills: Annotated[int, Field(alias="Kills")] + kr_ratio: Annotated[float, Field(alias="K/R Ratio")] + matches: Annotated[int, Field(alias="Matches")] mvps: Annotated[int, Field(alias="MVPs")] + penta_kills: Annotated[int, Field(alias="Penta Kills")] + quadro_kills: Annotated[int, Field(alias="Quadro Kills")] rounds: Annotated[int, Field(alias="Rounds")] - average_assists: Annotated[float, Field(alias="Average Assists")] - sniper_kill_rate: Annotated[float, Field(alias="Sniper Kill Rate")] - - # TODO корректную типизацию полей - average_triple_kills: Annotated[str, Field(alias="Average Triple Kills")] - average_quadro_kills: Annotated[str, Field(alias="Average Quadro Kills")] - penta_kills: Annotated[str, Field(alias="Penta Kills")] - total_flash_successes: Annotated[str, Field(alias="Total Flash Successes")] - average_headshots_percentage: Annotated[str, Field(alias="Average Headshots %")] - average_kd_ratio: Annotated[str, Field(alias="Average K/D Ratio")] - enemies_flashed_per_round: Annotated[str, Field(alias="Enemies Flashed per Round")] - total_flash_count: Annotated[str, Field(alias="Total Flash Count")] - quadro_kills: Annotated[str, Field(alias="Quadro Kills")] - flashes_per_round: Annotated[str, Field(alias="Flashes per Round")] - total_headshots_percentage: Annotated[str, Field(alias="Total Headshots %")] - total_1v1_wins: Annotated[str, Field(alias="Total 1v1 Wins")] - average_deaths: Annotated[str, Field(alias="Average Deaths")] - kills: Annotated[str, Field(alias="Kills")] - flash_success_rate: Annotated[str, Field(alias="Flash Success Rate")] - entry_rate: Annotated[str, Field(alias="Entry Rate")] + sniper_kill_rate: Annotated[float, Field(0.0, alias="Sniper Kill Rate")] sniper_kill_rate_per_round: Annotated[ - str, Field(alias="Sniper Kill Rate per Round") + float, Field(0.0, alias="Sniper Kill Rate per Round") + ] + total_1v1_count: Annotated[int, Field(0, alias="Total 1v1 Count")] + total_1v1_wins: Annotated[int, Field(0, alias="Total 1v1 Wins")] + total_1v2_count: Annotated[int, Field(0, alias="Total 1v2 Count")] + total_1v2_wins: Annotated[int, Field(0, alias="Total 1v2 Wins")] + total_damage: Annotated[int, Field(0, alias="Total Damage")] + total_enemies_flashed: Annotated[int, Field(0, alias="Total Enemies Flashed")] + total_entry_count: Annotated[int, Field(0, alias="Total Entry Count")] + total_entry_wins: Annotated[int, Field(0, alias="Total Entry Wins")] + total_flash_count: Annotated[int, Field(0, alias="Total Flash Count")] + total_flash_successes: Annotated[int, Field(0, alias="Total Flash Successes")] + total_headshots_percentage: Annotated[int, Field(alias="Total Headshots %")] + total_kills_with_extended_stats: Annotated[ + int, Field(0, alias="Total Kills with extended stats") + ] + total_matches: Annotated[int, Field(0, alias="Total Matches")] + total_rounds_with_extended_stats: Annotated[ + int, Field(0, alias="Total Rounds with extended stats") + ] + total_sniper_kills: Annotated[int, Field(0, alias="Total Sniper Kills")] + total_utility_count: Annotated[int, Field(0, alias="Total Utility Count")] + total_utility_damage: Annotated[int, Field(0, alias="Total Utility Damage")] + total_utility_successes: Annotated[int, Field(0, alias="Total Utility Successes")] + triple_kills: Annotated[int, Field(alias="Triple Kills")] + utility_damage_per_round: Annotated[ + float, Field(0.0, alias="Utility Damage per Round") ] - average_penta_kills: Annotated[str, Field(alias="Average Penta Kills")] - headshots_per_match: Annotated[str, Field(alias="Headshots per Match")] - adr: Annotated[str, Field(alias="ADR")] - total_damage: Annotated[str, Field(alias="Total Damage")] - total_utility_count: Annotated[str, Field(alias="Total Utility Count")] - total_entry_count: Annotated[str, Field(alias="Total Entry Count")] - wins: Annotated[str, Field(alias="Wins")] - total_sniper_kills: Annotated[str, Field(alias="Total Sniper Kills")] - total_1v2_wins: Annotated[str, Field(alias="Total 1v2 Wins")] - headshots: Annotated[str, Field(alias="Headshots")] - total_1v2_count: Annotated[str, Field(alias="Total 1v2 Count")] - utility_usage_per_round: Annotated[str, Field(alias="Utility Usage per Round")] - total_utility_damage: Annotated[str, Field(alias="Total Utility Damage")] - kd_ratio: Annotated[str, Field(alias="K/D Ratio")] - kr_ratio: Annotated[str, Field(alias="K/R Ratio")] - average_kills: Annotated[str, Field(alias="Average Kills")] - win_rate: Annotated[str, Field(alias="Win Rate %")] utility_damage_success_rate: Annotated[ - str, Field(alias="Utility Damage Success Rate") + float, Field(0.0, alias="Utility Damage Success Rate") ] - utility_damage_per_round: Annotated[str, Field(alias="Utility Damage per Round")] - total_utility_successes: Annotated[str, Field(alias="Total Utility Successes")] - matches: Annotated[str, Field(alias="Matches")] - total_matches: Annotated[str, Field(alias="Total Matches")] - average_mvps: Annotated[str, Field(alias="Average MVPs")] - assists: Annotated[str, Field(alias="Assists")] - total_1v1_count: Annotated[str, Field(alias="Total 1v1 Count")] - triple_kills: Annotated[str, Field(alias="Triple Kills")] - average_kr_ratio: Annotated[str, Field(alias="Average K/R Ratio")] - - -class MapSegment(BaseModel): - stats: MapStats + utility_success_rate: Annotated[float, Field(0.0, alias="Utility Success Rate")] + utility_usage_per_round: Annotated[ + float, Field(0.0, alias="Utility Usage per Round") + ] + v1_win_rate: Annotated[float, Field(0.0, alias="1v1 Win Rate")] + v2_win_rate: Annotated[float, Field(0.0, alias="1v2 Win Rate")] + win_rate: Annotated[int, Field(alias="Win Rate %")] + wins: Annotated[int, Field(alias="Wins")] + + +class Segment(BaseModel): # Возможно является более универсальным + stats: CSMapStats type: str mode: str name: Annotated[str, Field(alias="label")] @@ -247,8 +290,8 @@ class MapSegment(BaseModel): class PlayerStats(BaseModel): id: Annotated[FaceitID, Field(alias="player_id")] game_id: GameID - lifetime: LifetimeStats - maps: Annotated[typing.List[MapSegment], Field(alias="segments")] + lifetime: CSLifetimeStats # Относительно `game_id`; для иных игр модели делать не собираюсь + segments: typing.List[Segment] # TODO: Преобразование списка карт в словарь по "label" # Возможно лучше `GenericContainer` где атрибуты будут автоматически diff --git a/src/faceit/models/players/match.py b/src/faceit/models/players/match.py index 8504abe..5362e2f 100644 --- a/src/faceit/models/players/match.py +++ b/src/faceit/models/players/match.py @@ -15,7 +15,9 @@ _NoOpponent: TypeAlias = typing.Literal["bye"] -_RESULT_MAP: typing.Final = {"faction1": "first", "faction2": "second"} +_F1: typing.Final = "faction1" +_F2: typing.Final = "faction2" +_RESULT_MAP: typing.Final = {_F1: "first", _F2: "second"} class PlayerSummary(BaseModel): @@ -37,13 +39,13 @@ class Team(BaseModel): class Teams(BaseModel): - first: Annotated[Team, Field(alias="faction1")] - second: Annotated[Team, Field(alias="faction2")] + first: Annotated[Team, Field(alias=_F1)] + second: Annotated[Team, Field(alias=_F2)] class Score(BaseModel): - first: Annotated[int, Field(alias="faction1")] - second: Annotated[int, Field(alias="faction2")] + first: Annotated[int, Field(alias=_F1)] + second: Annotated[int, Field(alias=_F2)] class Results(BaseModel): @@ -106,7 +108,7 @@ class CS2MatchPlayerStats(AbstractMatchPlayerStats): map: Annotated[str, Field(alias="Map")] overtime_score: Annotated[int, Field(alias="Overtime score")] deaths: Annotated[int, Field(alias="Deaths")] - game: Annotated[str, Field(alias="Game")] + game: Annotated[typing.Literal[GameID.CS2], Field(alias="Game")] nickname: Annotated[str, Field(alias="Nickname")] updated_at: Annotated[datetime, Field(alias="Updated At")] second_half_score: Annotated[int, Field(alias="Second Half Score")] diff --git a/src/faceit/resources/__init__.py b/src/faceit/resources/__init__.py index f9b1c68..39cad63 100644 --- a/src/faceit/resources/__init__.py +++ b/src/faceit/resources/__init__.py @@ -1,9 +1,20 @@ -from .data import AsyncDataResource as AsyncDataResource -from .data import SyncDataResource as SyncDataResource -from .pagination import AsyncPageIterator as AsyncPageIterator -from .pagination import CollectReturnFormat as CollectReturnFormat -from .pagination import MaxItems as MaxItems -from .pagination import MaxPages as MaxPages -from .pagination import SyncPageIterator as SyncPageIterator -from .pagination import TimestampPaginationConfig as TimestampPaginationConfig -from .pagination import pages as pages +from .data import AsyncDataResource, SyncDataResource +from .pagination import ( + AsyncPageIterator, + CollectReturnFormat, + MaxItems, + SyncPageIterator, + TimestampPaginationConfig, + pages, +) + +__all__ = [ + "AsyncDataResource", + "AsyncPageIterator", + "CollectReturnFormat", + "MaxItems", + "SyncDataResource", + "SyncPageIterator", + "TimestampPaginationConfig", + "pages", +] diff --git a/src/faceit/resources/aggregator.py b/src/faceit/resources/aggregator.py new file mode 100644 index 0000000..a7f72e9 --- /dev/null +++ b/src/faceit/resources/aggregator.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +import typing +import warnings +from abc import ABC +from functools import cached_property + +from typing_extensions import Never, Self + +from faceit.http import AsyncClient, EnvKey, SyncClient +from faceit.types import ClientT, Raw, ValidUUID + +if typing.TYPE_CHECKING: + from types import TracebackType + + from faceit.http.client import BaseAPIClient + + _AggregatorT = typing.TypeVar("_AggregatorT", bound="BaseResources[typing.Any]") + + +class BaseResources(ABC, typing.Generic[ClientT]): + __slots__ = ("_client",) + + if typing.TYPE_CHECKING: + _client: ClientT + _client_cls: typing.Type[ClientT] + + def _initialize_client( + self, + auth: typing.Union[ValidUUID, BaseAPIClient.env, None] = None, + client: typing.Optional[ClientT] = None, + /, + *, + secret_type: str, + **client_options: typing.Any, + ) -> None: + if auth is not None and client is not None: + raise ValueError(f"Provide either {secret_type!r} or 'client', not both") + + if client is None: + self._client = self._client_cls( + EnvKey(f"FACEIT_{secret_type.upper()}") if auth is None else auth, + **client_options, + ) + return + + if client_options: + warnings.warn( + "'client_options' are ignored when an existing client " + "instance is provided. Configure your client before " + "passing it to this constructor.", + UserWarning, + stacklevel=3, + ) + + self._client = client + return + + @property + def client(self) -> ClientT: + return self._client + + +class SyncResources(BaseResources[SyncClient]): + __slots__ = () + + _client_cls = SyncClient + + def __enter__(self) -> Self: + self._client.__enter__() + return self + + def __exit__( + self, + typ: typing.Optional[typing.Type[BaseException]], + exc: typing.Optional[BaseException], + tb: typing.Optional[TracebackType], + ) -> None: + self._client.__exit__(typ, exc, tb) + + +class AsyncResources(BaseResources[AsyncClient]): + __slots__ = () + + _client_cls = AsyncClient + + def __enter__(self) -> Never: + self._client.__enter__() + + __exit__ = AsyncClient.__exit__ + + async def __aenter__(self) -> Self: + await self._client.__aenter__() + return self + + async def __aexit__( + self, + typ: typing.Optional[typing.Type[BaseException]], + exc: typing.Optional[BaseException], + tb: typing.Optional[TracebackType], + ) -> None: + await self._client.__aexit__(typ, exc, tb) + + +def resource_aggregator(cls: typing.Type[_AggregatorT], /) -> typing.Type[_AggregatorT]: + for name, resource_type in cls.__annotations__.items(): + + def make_property( + is_raw: bool, resource_type: typing.Type[typing.Any] + ) -> cached_property[_AggregatorT]: + return cached_property(lambda self: resource_type(self._client, is_raw)) + + prop = make_property(Raw in typing.get_args(resource_type), resource_type) + setattr(cls, name, prop) + prop.__set_name__(cls, name) + + return cls diff --git a/src/faceit/resources/base.py b/src/faceit/resources/base.py index 7df1348..0025080 100644 --- a/src/faceit/resources/base.py +++ b/src/faceit/resources/base.py @@ -2,10 +2,10 @@ import logging import typing +import warnings from abc import ABC from dataclasses import dataclass from types import MappingProxyType -from warnings import warn from pydantic import ValidationError @@ -20,13 +20,14 @@ ) from faceit.utils import StrEnum -from .pagination import AsyncPageIterator, SyncPageIterator, TimestampPaginationConfig +from .pagination import AsyncPageIterator, SyncPageIterator if typing.TYPE_CHECKING: _ResponseT = typing.TypeVar("_ResponseT", bound=RawAPIResponse) _logger = logging.getLogger(__name__) +_IsPagedT = typing.TypeVar("_IsPagedT", bound=bool) # Temporary placeholder type for unimplemented models. # Serves as a stub during development and should be replaced with # concrete models as implementation progresses. @@ -41,9 +42,9 @@ class RequestPayload(typing.TypedDict): @typing.final @dataclass(eq=False, frozen=True) -class MappedValidatorConfig(typing.Generic[_T, ModelT]): +class MappedValidatorConfig(typing.Generic[_T, ModelT, _IsPagedT]): validator_map: typing.Mapping[_T, typing.Type[ModelT]] - is_paged: bool + is_paged: _IsPagedT key_name: str = "key" @@ -69,7 +70,6 @@ class BaseResource(ABC, typing.Generic[ClientT]): _sync_page_iterator: typing.ClassVar = SyncPageIterator _async_page_iterator: typing.ClassVar = AsyncPageIterator - _timestamp_cfg: typing.ClassVar = TimestampPaginationConfig _PARAM_NAME_MAP: typing.ClassVar[typing.Mapping[str, str]] = MappingProxyType({ "start": "from", @@ -106,7 +106,7 @@ def _process_response_with_mapped_validator( self, response: RawAPIPageResponse, key: _T, - config: MappedValidatorConfig[_T, ModelT], + config: MappedValidatorConfig[_T, ModelT, typing.Literal[False]], /, ) -> typing.Union[ModelT, RawAPIPageResponse]: ... @@ -115,7 +115,7 @@ def _process_response_with_mapped_validator( self, response: RawAPIPageResponse, key: _T, - config: MappedValidatorConfig[_T, ModelT], + config: MappedValidatorConfig[_T, ModelT, typing.Literal[True]], /, ) -> typing.Union[ItemPage[ModelT], RawAPIPageResponse]: ... @@ -123,13 +123,13 @@ def _process_response_with_mapped_validator( self, response: RawAPIPageResponse, key: _T, - config: MappedValidatorConfig[_T, ModelT], + config: MappedValidatorConfig[_T, ModelT, _IsPagedT], /, ) -> typing.Union[ModelT, ItemPage[ModelT], RawAPIPageResponse]: _logger.debug("Processing response with mapped validator for key: %s", key) validator = config.validator_map.get(key) if validator is None: - warn( + warnings.warn( f"No model defined for {config.key_name} {key!r}. Consider using the raw response.", UserWarning, stacklevel=5, @@ -154,7 +154,7 @@ def _validate_response( if self._raw: return response if validator is None: - warn( + warnings.warn( "No model defined for this response. Validation and model " "parsing are unavailable. Use the raw version for explicit, " "unprocessed data.", diff --git a/src/faceit/resources/data/__init__.py b/src/faceit/resources/data/__init__.py index 229463d..c1f1739 100644 --- a/src/faceit/resources/data/__init__.py +++ b/src/faceit/resources/data/__init__.py @@ -1,38 +1,81 @@ -from typing import final # noqa: ICN003 +import typing -from faceit.resources.resource_aggregator import ( +from faceit.http.client import BaseAPIClient +from faceit.resources.aggregator import ( AsyncResources, + BaseResources, SyncResources, resource_aggregator, ) -from faceit.types import Model, Raw - -from .championships import AsyncChampionships as AsyncChampionships -from .championships import BaseChampionships as BaseChampionships -from .championships import SyncChampionships as SyncChampionships -from .leagues import AsyncLeagues as AsyncLeagues -from .leagues import BaseLeagues as BaseLeagues -from .leagues import SyncLeagues as SyncLeagues -from .matches import AsyncMatches as AsyncMatches -from .matches import BaseMatches as BaseMatches -from .matches import SyncMatches as SyncMatches -from .matchmakings import AsyncMatchmakings as AsyncMatchmakings -from .matchmakings import BaseMatchmakings as BaseMatchmakings -from .matchmakings import SyncMatchmakings as SyncMatchmakings -from .players import AsyncPlayers as AsyncPlayers -from .players import BasePlayers as BasePlayers -from .players import SyncPlayers as SyncPlayers -from .rankings import AsyncRankings as AsyncRankings -from .rankings import BaseRankings as BaseRankings -from .rankings import SyncRankings as SyncRankings -from .teams import AsyncTeams as AsyncTeams -from .teams import BaseTeams as BaseTeams -from .teams import SyncTeams as SyncTeams - - -@final +from faceit.types import ClientT, Model, Raw, ValidUUID + +from .championships import AsyncChampionships, BaseChampionships, SyncChampionships +from .leagues import AsyncLeagues, BaseLeagues, SyncLeagues +from .matches import AsyncMatches, BaseMatches, SyncMatches +from .matchmakings import AsyncMatchmakings, BaseMatchmakings, SyncMatchmakings +from .players import AsyncPlayers, BasePlayers, SyncPlayers +from .rankings import AsyncRankings, BaseRankings, SyncRankings +from .teams import AsyncTeams, BaseTeams, SyncTeams + +__all__ = [ + "AsyncChampionships", + "AsyncDataResource", + "AsyncLeagues", + "AsyncMatches", + "AsyncMatchmakings", + "AsyncPlayers", + "AsyncRankings", + "AsyncTeams", + "BaseChampionships", + "BaseLeagues", + "BaseMatches", + "BaseMatchmakings", + "BasePlayers", + "BaseRankings", + "BaseTeams", + "SyncChampionships", + "SyncDataResource", + "SyncLeagues", + "SyncMatches", + "SyncMatchmakings", + "SyncPlayers", + "SyncRankings", + "SyncTeams", +] + + +class _DataResourceMixin: + @typing.overload + def __init__(self) -> None: ... + + @typing.overload + def __init__(self, *, client: ClientT) -> None: ... + + @typing.overload + def __init__( + self, + api_key: typing.Union[ValidUUID, BaseAPIClient.env], + **client_options: typing.Any, + ) -> None: ... + + def __init__( # type: ignore[misc] + self: BaseResources[ClientT], + api_key: typing.Union[ValidUUID, BaseAPIClient.env, None] = None, + *, + client: typing.Optional[ClientT] = None, + **client_options: typing.Any, + ) -> None: + self._initialize_client( + api_key, + client, + secret_type="api_key", # noqa: S106 + **client_options, + ) + + +@typing.final @resource_aggregator -class SyncDataResource(SyncResources): +class SyncDataResource(SyncResources, _DataResourceMixin): championships: SyncChampionships[Model] raw_championships: SyncChampionships[Raw] @@ -55,9 +98,9 @@ class SyncDataResource(SyncResources): raw_teams: SyncTeams[Raw] -@final +@typing.final @resource_aggregator -class AsyncDataResource(AsyncResources): +class AsyncDataResource(AsyncResources, _DataResourceMixin): championships: AsyncChampionships[Model] raw_championships: AsyncChampionships[Raw] diff --git a/src/faceit/resources/data/championships.py b/src/faceit/resources/data/championships.py index 3de01c3..d0e24d1 100644 --- a/src/faceit/resources/data/championships.py +++ b/src/faceit/resources/data/championships.py @@ -91,7 +91,7 @@ def all_items( game: GameID, category: EventCategory = EventCategory.ALL, max_items: MaxItemsType = pages(30), - ) -> RawAPIPageResponse: ... + ) -> typing.List[RawAPIItem]: ... @typing.overload def all_items( @@ -106,7 +106,7 @@ def all_items( game: GameID, category: EventCategory = EventCategory.ALL, max_items: MaxItemsType = pages(30), - ) -> typing.Union[RawAPIPageResponse, ItemPage[Championship]]: + ) -> typing.Union[typing.List[RawAPIItem], ItemPage[Championship]]: return self.__class__._sync_page_iterator.gather_pages( self.items, game, category, max_items=max_items ) @@ -311,7 +311,7 @@ async def all_items( game: GameID, category: EventCategory = EventCategory.ALL, max_items: MaxItemsType = pages(30), - ) -> RawAPIPageResponse: ... + ) -> typing.List[RawAPIItem]: ... @typing.overload async def all_items( @@ -326,7 +326,7 @@ async def all_items( game: GameID, category: EventCategory = EventCategory.ALL, max_items: MaxItemsType = pages(30), - ) -> typing.Union[RawAPIPageResponse, ItemPage[Championship]]: + ) -> typing.Union[typing.List[RawAPIItem], ItemPage[Championship]]: return await self.__class__._async_page_iterator.gather_pages( self.items, game, category, max_items=max_items ) diff --git a/src/faceit/resources/data/matches.py b/src/faceit/resources/data/matches.py index 85108f4..baa511b 100644 --- a/src/faceit/resources/data/matches.py +++ b/src/faceit/resources/data/matches.py @@ -4,7 +4,7 @@ from abc import ABC from pydantic import AfterValidator, validate_call -from typing_extensions import Annotated, TypeAlias, deprecated +from typing_extensions import Annotated, TypeAlias from faceit.http import AsyncClient, SyncClient from faceit.models.custom_types import FaceitMatchID @@ -55,12 +55,6 @@ def get( __call__ = get - @deprecated( - "`details` is deprecated and will be removed in a future release. Use `get` instead." - ) - def details(self, match_id: typing.Any) -> typing.Any: - return self.get(match_id) - @typing.overload def stats(self: SyncMatches[Raw], match_id: _MatchID) -> RawAPIItem: ... @@ -101,12 +95,6 @@ async def get( __call__ = get - @deprecated( - "`details` is deprecated and will be removed in a future release. Use `get` instead." - ) - async def details(self, match_id: typing.Any) -> typing.Any: - return await self.get(match_id) - @typing.overload async def stats(self: AsyncMatches[Raw], match_id: _MatchID) -> RawAPIItem: ... diff --git a/src/faceit/resources/data/players.py b/src/faceit/resources/data/players.py index 85b4eb9..a28bc20 100644 --- a/src/faceit/resources/data/players.py +++ b/src/faceit/resources/data/players.py @@ -3,8 +3,8 @@ import logging import typing +import warnings from abc import ABC -from warnings import warn from pydantic import AfterValidator, Field, validate_call from typing_extensions import Annotated, TypeAlias @@ -29,7 +29,12 @@ ModelPlaceholder, RequestPayload, ) -from faceit.resources.pagination import MaxItems, MaxItemsType, pages +from faceit.resources.pagination import ( + MaxItems, + MaxItemsType, + TimestampPaginationConfig, + pages, +) from faceit.types import ( APIResponseFormatT, ClientT, @@ -62,7 +67,7 @@ class BasePlayers( __slots__ = () _matches_stats_validator_cfg: typing.ClassVar = MappedValidatorConfig[ - GameID, AbstractMatchPlayerStats + GameID, AbstractMatchPlayerStats, typing.Literal[True] ]( validator_map={ GameID.CS2: CS2MatchPlayerStats, @@ -72,10 +77,10 @@ class BasePlayers( key_name="game", ) - _matches_stats_timestamp_cfg: typing.ClassVar = BaseResource._timestamp_cfg( + _matches_stats_timestamp_cfg: typing.ClassVar = TimestampPaginationConfig( key="stats.Match Finished At", attr="match_finished_at" ) - _history_timestamp_cfg: typing.ClassVar = BaseResource._timestamp_cfg( + _history_timestamp_cfg: typing.ClassVar = TimestampPaginationConfig( key="finished_at", attr="finished_at" ) @@ -101,7 +106,7 @@ def _process_get_request( return RequestPayload(endpoint=self.__class__.PATH, params=params) if game is not None or game_player_id is not None: - warn( + warnings.warn( "When 'player_lookup_key' is provided, " "'game' and 'game_player_id' should not be specified. " "The value of 'player_lookup_key' will take precedence.", @@ -437,7 +442,7 @@ def all_hubs( @typing.overload def stats( self: SyncPlayers[Raw], player_id: PlayerID, game: GameID - ) -> RawAPIPageResponse: ... + ) -> RawAPIItem: ... @typing.overload def stats( @@ -447,11 +452,11 @@ def stats( @validate_call def stats( self, player_id: PlayerIDValidated, game: GameID - ) -> typing.Union[RawAPIPageResponse, ModelNotImplemented]: + ) -> typing.Union[RawAPIItem, ModelNotImplemented]: return self._validate_response( self._client.get( self.__class__.PATH / str(player_id) / "stats" / game, - expect_page=True, + expect_item=True, ), ModelPlaceholder, ) @@ -879,7 +884,7 @@ async def all_hubs( @typing.overload async def stats( self: AsyncPlayers[Raw], player_id: PlayerID, game: GameID - ) -> RawAPIPageResponse: ... + ) -> RawAPIItem: ... @typing.overload async def stats( @@ -889,11 +894,11 @@ async def stats( @validate_call async def stats( self, player_id: PlayerIDValidated, game: GameID - ) -> typing.Union[RawAPIPageResponse, ModelNotImplemented]: + ) -> typing.Union[RawAPIItem, ModelNotImplemented]: return self._validate_response( await self._client.get( self.__class__.PATH / str(player_id) / "stats" / game, - expect_page=True, + expect_item=True, ), ModelPlaceholder, ) diff --git a/src/faceit/resources/data/rankings.py b/src/faceit/resources/data/rankings.py index 1bbc841..7fcdf39 100644 --- a/src/faceit/resources/data/rankings.py +++ b/src/faceit/resources/data/rankings.py @@ -8,6 +8,7 @@ from faceit.constants import GameID # noqa: TC001 from faceit.http import AsyncClient, SyncClient +from faceit.models import ItemPage # noqa: TC001 from faceit.models.custom_types import CountryCode # noqa: TC001 from faceit.resources.base import BaseResource, FaceitResourcePath, ModelPlaceholder from faceit.resources.pagination import MaxItemsType, pages @@ -56,7 +57,7 @@ def unbounded( *, offset: int = Field(0, ge=0), limit: int = Field(20, ge=1, le=100), - ) -> ModelNotImplemented: ... + ) -> ItemPage[ModelNotImplemented]: ... @validate_call def unbounded( @@ -67,7 +68,7 @@ def unbounded( *, offset: int = Field(0, ge=0), limit: int = Field(20, ge=1, le=100), - ) -> typing.Union[RawAPIPageResponse, ModelNotImplemented]: + ) -> typing.Union[RawAPIPageResponse, ItemPage[ModelNotImplemented]]: return self._validate_response( self._client.get( self.__class__.PATH / "games" / game / "regions" / region, @@ -95,7 +96,7 @@ def all_unbounded( region: RegionIdentifier, country: typing.Optional[CountryCode] = None, max_items: MaxItemsType = pages(10), - ) -> ModelNotImplemented: ... + ) -> ItemPage[ModelNotImplemented]: ... @validate_call def all_unbounded( @@ -104,7 +105,7 @@ def all_unbounded( region: RegionIdentifier, country: typing.Optional[CountryCode] = None, max_items: MaxItemsType = pages(10), - ) -> typing.Union[typing.List[RawAPIItem], ModelNotImplemented]: + ) -> typing.Union[typing.List[RawAPIItem], ItemPage[ModelNotImplemented]]: return self.__class__._sync_page_iterator.gather_pages( self.unbounded, game, region, country, max_items=max_items ) @@ -178,7 +179,7 @@ async def unbounded( *, offset: int = Field(0, ge=0), limit: int = Field(20, ge=1, le=100), - ) -> ModelNotImplemented: ... + ) -> ItemPage[ModelNotImplemented]: ... @validate_call async def unbounded( @@ -189,7 +190,7 @@ async def unbounded( *, offset: int = Field(0, ge=0), limit: int = Field(20, ge=1, le=100), - ) -> typing.Union[RawAPIPageResponse, ModelNotImplemented]: + ) -> typing.Union[RawAPIPageResponse, ItemPage[ModelNotImplemented]]: return self._validate_response( await self._client.get( self.__class__.PATH / "games" / game / "regions" / region, @@ -217,7 +218,7 @@ async def all_unbounded( region: RegionIdentifier, country: typing.Optional[CountryCode] = None, max_items: MaxItemsType = pages(10), - ) -> ModelNotImplemented: ... + ) -> ItemPage[ModelNotImplemented]: ... @validate_call async def all_unbounded( @@ -226,8 +227,8 @@ async def all_unbounded( region: RegionIdentifier, country: typing.Optional[CountryCode] = None, max_items: MaxItemsType = pages(10), - ) -> typing.Union[typing.List[RawAPIItem], ModelNotImplemented]: - return self.__class__._async_page_iterator.gather_pages( + ) -> typing.Union[typing.List[RawAPIItem], ItemPage[ModelNotImplemented]]: + return await self.__class__._async_page_iterator.gather_pages( self.unbounded, game, region, country, max_items=max_items ) diff --git a/src/faceit/resources/data/teams.py b/src/faceit/resources/data/teams.py index 2578fe0..f396e70 100644 --- a/src/faceit/resources/data/teams.py +++ b/src/faceit/resources/data/teams.py @@ -5,10 +5,11 @@ from abc import ABC from pydantic import AfterValidator, Field, validate_call -from typing_extensions import Annotated, TypeAlias, deprecated +from typing_extensions import Annotated, TypeAlias from faceit.constants import GameID # noqa: TC001 from faceit.http import AsyncClient, SyncClient +from faceit.models import ItemPage # noqa: TC001 from faceit.resources.base import BaseResource, FaceitResourcePath, ModelPlaceholder from faceit.resources.pagination import MaxItemsType, pages from faceit.types import ( @@ -55,12 +56,6 @@ def get( __call__ = get - @deprecated( - "`details` is deprecated and will be removed in a future release. Use `get` instead." - ) - def details(self, team_id: typing.Any) -> typing.Any: - return self.get(team_id) - @typing.overload def stats(self: SyncTeams[Raw], team_id: _TeamID, game: GameID) -> RawAPIItem: ... @@ -97,7 +92,7 @@ def tournaments( *, offset: int = Field(0, ge=0), limit: int = Field(20, ge=1, le=100), - ) -> ModelNotImplemented: ... + ) -> ItemPage[ModelNotImplemented]: ... @validate_call def tournaments( @@ -106,7 +101,7 @@ def tournaments( *, offset: int = Field(0, ge=0), limit: int = Field(20, ge=1, le=100), - ) -> typing.Union[RawAPIPageResponse, ModelNotImplemented]: + ) -> typing.Union[RawAPIPageResponse, ItemPage[ModelNotImplemented]]: return self._validate_response( self._client.get( self.__class__.PATH / team_id / "tournaments", @@ -124,11 +119,11 @@ def all_tournaments( @typing.overload def all_tournaments( self: SyncTeams[Model], team_id: _TeamID, max_items: MaxItemsType = pages(30) - ) -> ModelNotImplemented: ... + ) -> ItemPage[ModelNotImplemented]: ... def all_tournaments( self, team_id: _TeamIDValidated, max_items: MaxItemsType = pages(30) - ) -> typing.Union[typing.List[RawAPIItem], ModelNotImplemented]: + ) -> typing.Union[typing.List[RawAPIItem], ItemPage[ModelNotImplemented]]: return self.__class__._sync_page_iterator.gather_pages( self.tournaments, team_id, max_items=max_items ) @@ -154,12 +149,6 @@ async def get( __call__ = get - @deprecated( - "`details` is deprecated and will be removed in a future release. Use `get` instead." - ) - async def details(self, team_id: typing.Any) -> typing.Any: - return await self.get(team_id) - @typing.overload async def stats( self: AsyncTeams[Raw], team_id: _TeamID, game: GameID @@ -198,7 +187,7 @@ async def tournaments( *, offset: int = Field(0, ge=0), limit: int = Field(20, ge=1, le=100), - ) -> ModelNotImplemented: ... + ) -> ItemPage[ModelNotImplemented]: ... @validate_call async def tournaments( @@ -207,7 +196,7 @@ async def tournaments( *, offset: int = Field(0, ge=0), limit: int = Field(20, ge=1, le=100), - ) -> typing.Union[RawAPIPageResponse, ModelNotImplemented]: + ) -> typing.Union[RawAPIPageResponse, ItemPage[ModelNotImplemented]]: return self._validate_response( await self._client.get( self.__class__.PATH / team_id / "tournaments", @@ -225,11 +214,11 @@ async def all_tournaments( @typing.overload async def all_tournaments( self: AsyncTeams[Model], team_id: _TeamID, max_items: MaxItemsType = pages(30) - ) -> ModelNotImplemented: ... + ) -> ItemPage[ModelNotImplemented]: ... async def all_tournaments( self, team_id: _TeamIDValidated, max_items: MaxItemsType = pages(30) - ) -> typing.Union[typing.List[RawAPIItem], ModelNotImplemented]: + ) -> typing.Union[typing.List[RawAPIItem], ItemPage[ModelNotImplemented]]: return await self.__class__._async_page_iterator.gather_pages( self.tournaments, team_id, max_items=max_items ) diff --git a/src/faceit/resources/pagination.py b/src/faceit/resources/pagination.py index a575a46..de30849 100644 --- a/src/faceit/resources/pagination.py +++ b/src/faceit/resources/pagination.py @@ -1,32 +1,30 @@ # mypy: disable-error-code="no-any-return" from __future__ import annotations +import inspect import math import typing +import warnings from abc import ABC from dataclasses import dataclass -from inspect import Parameter, signature from itertools import chain -from warnings import warn from annotated_types import Le from pydantic import PositiveInt from pydantic.fields import FieldInfo -from typing_extensions import Self, TypeAlias, deprecated +from typing_extensions import Self, TypeAlias from faceit.constants import RAW_RESPONSE_ITEMS_KEY from faceit.models import ItemPage from faceit.models.item_page import PaginationTimeRange from faceit.types import ( _T, - AsyncPaginationMethod, - AsyncUnixPaginationMethod, - BaseUnixPaginationMethod, + AsyncResourceMethodProtocol, + BaseResourceMethodProtocol, PaginationMethodT, RawAPIItem, RawAPIPageResponse, - SyncPaginationMethod, - SyncUnixPaginationMethod, + SyncResourceMethodProtocol, ) from faceit.utils import ( StrEnum, @@ -83,7 +81,7 @@ class PaginationMaxParams(typing.NamedTuple): @typing.final -class pages(int): # noqa: N801 +class pages(int): __slots__ = () @extends(int.__new__) @@ -97,14 +95,6 @@ def __new__(cls, x: typing.Any = 2, base: typing.Any = None) -> typing.Any: ) -@typing.final -@deprecated( - "`MaxPages` is deprecated and will be removed in a future release. Use `pages` instead." -) -class MaxPages(pages): # type: ignore[misc] - __slots__ = () - - @dataclass(eq=False, frozen=True) class _MethodCall(typing.Generic[PaginationMethodT]): call: PaginationMethodT @@ -130,21 +120,22 @@ def from_max_pages(cls, max_pages: MaxItemsType, /) -> Self: def _has_unix_pagination_params( - method: BaseUnixPaginationMethod[typing.Any], / + method: BaseResourceMethodProtocol[typing.Any], / ) -> bool: return all( - param in signature(method).parameters for param in _UNIX_PAGINATION_PARAMS + param in inspect.signature(method).parameters + for param in _UNIX_PAGINATION_PARAMS ) -def _get_le(param: Parameter, /) -> typing.Optional[Le]: +def _get_le(param: inspect.Parameter, /) -> typing.Optional[Le]: return next( (items for items in param.default.metadata if isinstance(items, Le)), None ) def _extract_pagination_limits( - limit_param: Parameter, offset_param: Parameter, method_name: str, / + limit_param: inspect.Parameter, offset_param: inspect.Parameter, method_name: str, / ) -> PaginationMaxParams: # Validates pagination parameters for: # 1. Development - ensures correct function signatures with clear error messages @@ -186,13 +177,17 @@ def check_pagination_support( return False limit_param, offset_param = ( - signature(func).parameters.get(arg) for arg in _PAGINATION_ARGS + inspect.signature(func).parameters.get(arg) for arg in _PAGINATION_ARGS ) if limit_param is None or offset_param is None: return False - return _extract_pagination_limits(limit_param, offset_param, func.__name__) + return _extract_pagination_limits( + limit_param, + offset_param, + typing.cast("str", getattr(func, "__name__", "")), + ) _ITERATOR_SLOTS = ( @@ -333,7 +328,7 @@ def set_max_pages(max_pages: int, /) -> None: def warn_if_exceeds_safe(max_pages: int, /) -> int: if max_pages > self.__class__.SAFE_MAX_PAGES: - warn( + warnings.warn( f"The computed number of pages ({max_pages}) exceeds the " f"recommended safe maximum ({self.__class__.SAFE_MAX_PAGES}). " "Proceed at your own risk.", @@ -394,7 +389,7 @@ def _handle_iteration_state(self, page: typing.Optional[_PageT], /) -> _PageT: @staticmethod def _remove_pagination_args(**kwargs: _T) -> typing.Dict[str, _T]: if any(kwargs.pop(arg, None) for arg in _PAGINATION_ARGS): - warn( + warnings.warn( f"Pagination parameters {_PAGINATION_ARGS} should not be " "provided by users. These parameters are managed internally " "by the pagination system.", @@ -422,7 +417,7 @@ def _validate_unix_pagination_parameter( f"Key and attribute parameters must be non-empty strings: {key}, {attr}." ) if any(kwargs.pop(arg, None) for arg in _UNIX_PAGINATION_PARAMS): - warn( + warnings.warn( "The parameters start and to will be managed automatically with Unix " "timestamp pagination. Your provided values will be ignored.", UserWarning, @@ -530,10 +525,7 @@ def _create_unix_timestamp_iterator( class _BaseSyncPageIterator( - BasePageIterator[ - typing.Union[SyncPaginationMethod[_PageT], SyncUnixPaginationMethod[_PageT]], - _PageT, - ], + BasePageIterator[SyncResourceMethodProtocol[_PageT], _PageT], typing.Iterator[_PageT], ): __slots__ = () @@ -558,10 +550,7 @@ def __next__(self) -> _PageT: class _BasyAsyncPageIterator( - BasePageIterator[ - typing.Union[AsyncPaginationMethod[_PageT], AsyncUnixPaginationMethod[_PageT]], - _PageT, - ], + BasePageIterator[AsyncResourceMethodProtocol[_PageT], _PageT], typing.AsyncIterator[_PageT], ): __slots__ = () @@ -610,14 +599,14 @@ def collect( @classmethod def unix( cls, - method: SyncUnixPaginationMethod[_PageT], + method: SyncResourceMethodProtocol[_PageT], /, *args: typing.Any, max_items: MaxItemsType = BasePageIterator.DEFAULT_MAX_ITEMS, key: str, attr: str, **kwargs: typing.Any, - ) -> typing.Generator[_PageT, None, None]: + ) -> typing.Iterator[_PageT]: cls._validate_unix_pagination_parameter(method, kwargs, key, attr) kwargs["max_items"] = max_items @@ -650,38 +639,10 @@ def unix( @classmethod def gather_pages( cls, - method: SyncPaginationMethod[ItemPage[_T]], - /, - *args: typing.Any, - max_items: MaxItemsType = BasePageIterator.DEFAULT_MAX_ITEMS, - unix: typing.Literal[False] = ..., - return_format: CollectReturnFormat = ..., - deduplicate: bool = ..., - **kwargs: typing.Any, - ) -> ItemPage[_T]: ... - - @typing.overload - @classmethod - def gather_pages( - cls, - method: SyncPaginationMethod[RawAPIPageResponse], - /, - *args: typing.Any, - max_items: MaxItemsType = BasePageIterator.DEFAULT_MAX_ITEMS, - unix: typing.Literal[False] = ..., - return_format: CollectReturnFormat = ..., - deduplicate: bool = ..., - **kwargs: typing.Any, - ) -> typing.List[RawAPIItem]: ... - - @typing.overload - @classmethod - def gather_pages( - cls, - method: SyncUnixPaginationMethod[ItemPage[_T]], + method: SyncResourceMethodProtocol[ItemPage[_T]], /, *args: typing.Any, - max_items: MaxItemsType = BasePageIterator.DEFAULT_MAX_ITEMS, + max_items: MaxItemsType = ..., unix: _OptionalTimestampPaginationConfig = ..., return_format: CollectReturnFormat = ..., deduplicate: bool = ..., @@ -692,10 +653,10 @@ def gather_pages( @classmethod def gather_pages( cls, - method: SyncUnixPaginationMethod[RawAPIPageResponse], + method: SyncResourceMethodProtocol[RawAPIPageResponse], /, *args: typing.Any, - max_items: MaxItemsType = BasePageIterator.DEFAULT_MAX_ITEMS, + max_items: MaxItemsType = ..., unix: _OptionalTimestampPaginationConfig = ..., return_format: CollectReturnFormat = ..., deduplicate: bool = ..., @@ -706,8 +667,8 @@ def gather_pages( def gather_pages( cls, method: typing.Union[ - SyncPaginationMethod[typing.Union[ItemPage[_T], RawAPIPageResponse]], - SyncUnixPaginationMethod[typing.Union[ItemPage[_T], RawAPIPageResponse]], + SyncResourceMethodProtocol[ItemPage[_T]], + SyncResourceMethodProtocol[RawAPIPageResponse], ], /, *args: typing.Any, @@ -720,15 +681,12 @@ def gather_pages( cls._validate_unix_config(unix) kwargs["max_items"] = max_items if unix is False: - casted_method = typing.cast( - "typing.Union[SyncPaginationMethod[_PageT], SyncUnixPaginationMethod[_PageT]]", - method, - ) + casted_method = typing.cast("SyncResourceMethodProtocol[_PageT]", method) # Type annotation needed as mypy can't infer that # both branches return compatible iterable iterator: typing.Iterator[typing.Any] = cls(casted_method, *args, **kwargs) else: - casted_method = typing.cast("SyncUnixPaginationMethod[_PageT]", method) + casted_method = typing.cast("SyncResourceMethodProtocol[_PageT]", method) iterator = cls.unix(casted_method, *args, **unix, **kwargs) return cls.gather_from_iterator( iterator, return_format, deduplicate=deduplicate @@ -797,14 +755,14 @@ async def collect( @classmethod async def unix( cls, - method: AsyncUnixPaginationMethod[_PageT], + method: AsyncResourceMethodProtocol[_PageT], /, *args: typing.Any, max_items: MaxItemsType = BasePageIterator.DEFAULT_MAX_ITEMS, key: str, attr: str, **kwargs: typing.Any, - ) -> typing.AsyncGenerator[_PageT, None]: + ) -> typing.AsyncIterator[_PageT]: cls._validate_unix_pagination_parameter(method, kwargs, key, attr) kwargs["max_items"] = max_items @@ -837,38 +795,10 @@ async def unix( @classmethod async def gather_pages( cls, - method: AsyncPaginationMethod[ItemPage[_T]], - /, - *args: typing.Any, - max_items: MaxItemsType = BasePageIterator.DEFAULT_MAX_ITEMS, - unix: typing.Literal[False] = ..., - return_format: CollectReturnFormat = ..., - deduplicate: bool = ..., - **kwargs: typing.Any, - ) -> ItemPage[_T]: ... - - @typing.overload - @classmethod - async def gather_pages( - cls, - method: AsyncPaginationMethod[RawAPIPageResponse], + method: AsyncResourceMethodProtocol[ItemPage[_T]], /, *args: typing.Any, - max_items: MaxItemsType = BasePageIterator.DEFAULT_MAX_ITEMS, - unix: typing.Literal[False] = ..., - return_format: CollectReturnFormat = ..., - deduplicate: bool = ..., - **kwargs: typing.Any, - ) -> typing.List[RawAPIItem]: ... - - @typing.overload - @classmethod - async def gather_pages( - cls, - method: AsyncUnixPaginationMethod[ItemPage[_T]], - /, - *args: typing.Any, - max_items: MaxItemsType = BasePageIterator.DEFAULT_MAX_ITEMS, + max_items: MaxItemsType = ..., unix: _OptionalTimestampPaginationConfig = ..., return_format: CollectReturnFormat = ..., deduplicate: bool = ..., @@ -879,10 +809,10 @@ async def gather_pages( @classmethod async def gather_pages( cls, - method: AsyncUnixPaginationMethod[RawAPIPageResponse], + method: AsyncResourceMethodProtocol[RawAPIPageResponse], /, *args: typing.Any, - max_items: MaxItemsType = BasePageIterator.DEFAULT_MAX_ITEMS, + max_items: MaxItemsType = ..., unix: _OptionalTimestampPaginationConfig = ..., return_format: CollectReturnFormat = ..., deduplicate: bool = ..., @@ -893,8 +823,8 @@ async def gather_pages( async def gather_pages( cls, method: typing.Union[ - AsyncPaginationMethod[typing.Union[ItemPage[_T], RawAPIPageResponse]], - AsyncUnixPaginationMethod[typing.Union[ItemPage[_T], RawAPIPageResponse]], + AsyncResourceMethodProtocol[ItemPage[_T]], + AsyncResourceMethodProtocol[RawAPIPageResponse], ], /, *args: typing.Any, @@ -907,15 +837,12 @@ async def gather_pages( cls._validate_unix_config(unix) kwargs["max_items"] = max_items if unix is False: - casted_method = typing.cast( - "typing.Union[AsyncPaginationMethod[_PageT], AsyncUnixPaginationMethod[_PageT]]", - method, - ) + casted_method = typing.cast("AsyncResourceMethodProtocol[_PageT]", method) iterator: typing.AsyncIterator[typing.Any] = cls( casted_method, *args, **kwargs ) else: - casted_method = typing.cast("AsyncUnixPaginationMethod[_PageT]", method) + casted_method = typing.cast("AsyncResourceMethodProtocol[_PageT]", method) iterator = cls.unix(casted_method, *args, **unix, **kwargs) return await cls.gather_from_iterator( iterator, return_format, deduplicate=deduplicate diff --git a/src/faceit/resources/resource_aggregator.py b/src/faceit/resources/resource_aggregator.py deleted file mode 100644 index f441e05..0000000 --- a/src/faceit/resources/resource_aggregator.py +++ /dev/null @@ -1,81 +0,0 @@ -from __future__ import annotations - -import typing -from abc import ABC -from dataclasses import dataclass -from functools import cached_property - -from typing_extensions import Self - -from faceit.http import AsyncClient, SyncClient -from faceit.types import ClientT -from faceit.utils import noop - -if typing.TYPE_CHECKING: - from types import TracebackType - -_AggregatorT = typing.TypeVar("_AggregatorT", bound="BaseResources[typing.Any]") - - -@dataclass(eq=False, frozen=True) -class BaseResources(ABC, typing.Generic[ClientT]): - __slots__ = ("_client",) - - _client: ClientT - - @property - def client(self) -> ClientT: - return self._client - - -class SyncResources(BaseResources[SyncClient]): - __slots__ = () - - def __enter__(self) -> Self: - self._client.__enter__() - return self - - def __exit__( - self, - typ: typing.Optional[typing.Type[BaseException]], - exc: typing.Optional[BaseException], - tb: typing.Optional[TracebackType], - ) -> None: - self._client.__exit__(typ, exc, tb) - - -class AsyncResources(BaseResources[AsyncClient]): - __slots__ = () - - def __enter__(self) -> typing.NoReturn: - self._client.__enter__() - - __exit__ = noop - - async def __aenter__(self) -> Self: - await self._client.__aenter__() - return self - - async def __aexit__( - self, - typ: typing.Optional[typing.Type[BaseException]], - exc: typing.Optional[BaseException], - tb: typing.Optional[TracebackType], - ) -> None: - await self._client.__aexit__(typ, exc, tb) - - -def resource_aggregator(cls: typing.Type[_AggregatorT], /) -> typing.Type[_AggregatorT]: - for name, resource_type in cls.__annotations__.items(): - - def make_property( - is_raw: bool, - resource_type: typing.Type[typing.Any] = resource_type, - ) -> cached_property[_AggregatorT]: - return cached_property(lambda self: resource_type(self._client, is_raw)) - - prop = make_property(name.startswith("raw_")) - setattr(cls, name, prop) - prop.__set_name__(cls, name) - - return cls diff --git a/src/faceit/types.py b/src/faceit/types.py index ba1d611..1692249 100644 --- a/src/faceit/types.py +++ b/src/faceit/types.py @@ -1,8 +1,7 @@ import typing -from threading import Lock from uuid import UUID -from pydantic import AnyHttpUrl, BaseModel, Field +from pydantic import AnyHttpUrl, BaseModel from typing_extensions import NotRequired, ParamSpec, TypeAlias from .constants import Region @@ -11,11 +10,9 @@ from .http import Endpoint from .http.client import BaseAPIClient from .resources import AsyncDataResource, SyncDataResource - from .resources.base import BaseResource _T = typing.TypeVar("_T") _R = typing.TypeVar("_R") -_T_co = typing.TypeVar("_T_co", covariant=True) _P = ParamSpec("_P") ModelT = typing.TypeVar("ModelT", bound="BaseModel") @@ -25,7 +22,9 @@ ) APIResponseFormatT = typing.TypeVar("APIResponseFormatT", "Raw", "Model") -PaginationMethodT = typing.TypeVar("PaginationMethodT", bound="BaseMethodProtocol") +PaginationMethodT = typing.TypeVar( + "PaginationMethodT", bound="BaseResourceMethodProtocol[typing.Any]" +) EmptyString: TypeAlias = typing.Literal[""] UrlOrEmpty: TypeAlias = typing.Union[AnyHttpUrl, EmptyString] @@ -58,45 +57,10 @@ RawAPIResponse: TypeAlias = typing.Union[RawAPIItem, RawAPIPageResponse] -class BaseMethodProtocol(typing.Protocol): +# fmt: off +class BaseResourceMethodProtocol(typing.Protocol[_T]): __name__: str - __self__: "BaseResource[typing.Any]" - __call__: typing.Callable[..., typing.Any] - - -class BasePaginationMethod(BaseMethodProtocol, typing.Protocol[_T_co]): - def __call__( - self, - *args: typing.Any, - offset: int = Field(...), - limit: int = Field(...), - **kwargs: typing.Any, - ) -> _T_co: ... - - -class SyncPaginationMethod(BasePaginationMethod[_T_co], typing.Protocol): ... - - -class AsyncPaginationMethod( - BasePaginationMethod[typing.Awaitable[_T_co]], typing.Protocol -): ... - - -class BaseUnixPaginationMethod(BaseMethodProtocol, typing.Protocol[_T_co]): - def __call__( - self, - *args: typing.Any, - offset: int = Field(...), - limit: int = Field(...), - start: typing.Optional[int] = None, - to: typing.Optional[int] = None, - **kwargs: typing.Any, - ) -> _T_co: ... - - -class SyncUnixPaginationMethod(BaseUnixPaginationMethod[_T_co], typing.Protocol): ... - - -class AsyncUnixPaginationMethod( - BaseUnixPaginationMethod[typing.Awaitable[_T_co]], typing.Protocol -): ... + __call__: typing.Callable[..., _T] +class SyncResourceMethodProtocol(BaseResourceMethodProtocol[_T], typing.Protocol): ... +class AsyncResourceMethodProtocol(BaseResourceMethodProtocol[typing.Awaitable[_T]], typing.Protocol): ... +# fmt: on diff --git a/src/faceit/utils.py b/src/faceit/utils.py index 8996392..e4a3ebc 100644 --- a/src/faceit/utils.py +++ b/src/faceit/utils.py @@ -1,5 +1,6 @@ from __future__ import annotations +import inspect import json import reprlib import typing @@ -8,7 +9,6 @@ from enum import Enum, IntEnum, auto from functools import reduce, wraps from hashlib import sha256 -from inspect import isawaitable, iscoroutinefunction from uuid import UUID from typing_extensions import Self, TypeIs @@ -22,12 +22,14 @@ _CallableT = typing.TypeVar("_CallableT", bound=typing.Callable[..., typing.Any]) _ClassT = typing.TypeVar("_ClassT", bound=type) -REDACTED_MARKER: typing.Final = "[REDACTED]" - _UUID_BYTES: typing.Final = 16 _UNINITIALIZED_MARKER: typing.Final = "uninitialized" +class UnsetValue(IntEnum): + UNSET = -1 + + # NOTE: Inspired by irgeek/StrEnum: # https://github.com/irgeek/StrEnum/blob/master/strenum/__init__.py#L21 # Previously depended on `StrEnum`, but only core features were needed - @@ -36,7 +38,7 @@ class StrEnum(str, Enum): _value_: str def __new__( - cls, value: typing.Union[str, auto], *args: typing.Any, **kwargs: typing.Any + cls, value: typing.Union[str, auto], *args: object, **kwargs: object ) -> Self: if isinstance(value, (str, auto)): return super().__new__(cls, value, *args, **kwargs) @@ -45,7 +47,7 @@ def __new__( ) @staticmethod - def _generate_next_value_(name: str, *_: typing.Any) -> str: + def _generate_next_value_(name: str, *_: object) -> str: return name def __str__(self) -> str: @@ -54,23 +56,10 @@ def __str__(self) -> str: class StrEnumWithAll(StrEnum): @classmethod - def all(cls) -> typing.Tuple[Self, ...]: + def get_all_values(cls) -> typing.Tuple[Self, ...]: return tuple(cls) - -class UnsetValue(IntEnum): - UNSET = -1 - - -class _Noop: - def __call__(self, *_: typing.Any, **__: typing.Any) -> Self: - return self - - def __await__(self) -> typing.Generator[None]: - yield - - -noop = _Noop() + all = get_all_values # alias for backwards compatibility def UnsupportedOperationTypeError( # noqa: N802 @@ -85,7 +74,7 @@ def locked( lock: typing.Union[SyncLock, AsyncLock], / ) -> typing.Callable[[typing.Callable[_P, _T]], typing.Callable[_P, _T]]: def decorator(func: typing.Callable[_P, _T], /) -> typing.Callable[_P, _T]: - if iscoroutinefunction(func): + if inspect.iscoroutinefunction(func): @wraps(func) async def async_wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T: @@ -106,7 +95,7 @@ def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T: def extends( _: _CallableT, / -) -> typing.Callable[[typing.Callable[..., typing.Any]], _CallableT]: +) -> typing.Callable[[typing.Callable[..., object]], _CallableT]: """ Decorator that assigns the type signature of the given function to the decorated function. Type checking is enforced only at the function boundary @@ -121,28 +110,30 @@ async def invoke_callable( *args: typing.Any, **kwargs: typing.Any, ) -> _T: - if callable(func): - result = func(*args, **kwargs) - if isawaitable(result): - result = await result - return typing.cast("_T", result) - raise TypeError( - f"Expected a callable object, got {type(func).__name__} ({func!r}). " - "Argument 'func' must be a function or object with a __call__ method." - ) + if not callable(func): + raise TypeError( + f"Expected a callable object, got {type(func).__name__} ({func!r}). " + "Argument 'func' must be a function or object with a __call__ method." + ) + result = func(*args, **kwargs) + if inspect.isawaitable(result): + result = await result + return typing.cast("_T", result) def deep_get( - dictionary: typing.Dict[str, typing.Any], + dictionary: typing.Mapping[str, typing.Any], keys: str, /, default: typing.Optional[_T] = None, -) -> typing.Union[_T, typing.Any]: - return reduce( - lambda d, k: d.get(k, default) if isinstance(d, dict) else default, - keys.split("."), - dictionary, - ) +) -> typing.Union[_T, typing.Any, None]: + current = dictionary + try: + for key in keys.split("."): + current = current[key] + except (KeyError, TypeError, AttributeError): + return default + return current def get_nested_property( @@ -152,9 +143,7 @@ def get_nested_property( return default try: return reduce( - lambda o, k: default if o is None else getattr(o, k), - path.split("."), - obj, + lambda o, k: default if o is None else getattr(o, k), path.split("."), obj ) except (AttributeError, TypeError): return default @@ -260,7 +249,9 @@ def _apply_representation( has_str = getattr(cls, "__str__", object.__str__) is not object.__str__ if use_str and not has_str: - raise TypeError(f"Class {cls.__name__} must define __str__ method") + raise TypeError( + f"Class {cls.__name__} must define '__str__' method when 'use_str=True'" + ) def build_repr(self: _ClassT) -> str: str_args = f"'{self}'" if use_str else _format_fields(self, fields, joiner=", ") @@ -291,6 +282,6 @@ def representation( ) -> typing.Union[_ClassT, typing.Callable[[_ClassT], _ClassT]]: return ( _apply_representation(fields[0], fields[1:], use_str) - if fields and isinstance(fields[0], type) + if fields and inspect.isclass(fields[0]) else lambda cls: _apply_representation(cls, fields, use_str) ) diff --git a/tests/test_custom_types.py b/tests/test_custom_types.py index 663c5af..ce507ec 100644 --- a/tests/test_custom_types.py +++ b/tests/test_custom_types.py @@ -17,19 +17,21 @@ ) langph = LangFormattedAnyHttpUrl._LANG_PLACEHOLDER -urls = [ - (f"https://example.com/{langph}/docs", "https://example.com/docs"), - (f"http://{langph}/foo/bar", "http://foo/bar"), - (f"{langph}/foo/{langph}/bar", "foo/bar"), - ("https://example.com/foo/bar", "https://example.com/foo/bar"), - ("foo/bar", "foo/bar"), - (f"foo/{langph}/bar", "foo/bar"), - (f"foo/{langph}", "foo"), - (f"{langph}/foo", "foo"), -] - - -@pytest.mark.parametrize("input_value,expected", urls) + + +@pytest.mark.parametrize( + "input_value,expected", + [ + (f"https://example.com/{langph}/docs", "https://example.com/docs"), + (f"http://{langph}/foo/bar", "http://foo/bar"), + (f"{langph}/foo/{langph}/bar", "foo/bar"), + ("https://example.com/foo/bar", "https://example.com/foo/bar"), + ("foo/bar", "foo/bar"), + (f"foo/{langph}/bar", "foo/bar"), + (f"foo/{langph}", "foo"), + (f"{langph}/foo", "foo"), + ], +) def test_validate_success(input_value, expected): if expected == "" or expected.startswith("http"): assert LangFormattedAnyHttpUrl._validate(input_value) == AnyHttpUrl(expected) diff --git a/tests/test_http_client.py b/tests/test_http_client.py index f7ad5cb..194d8e1 100644 --- a/tests/test_http_client.py +++ b/tests/test_http_client.py @@ -26,7 +26,6 @@ _BaseSyncClient, is_ssl_error, ) -from faceit.utils import REDACTED_MARKER @pytest.fixture @@ -137,7 +136,6 @@ def test_api_key_property(self, valid_uuid): client = SyncClient(valid_uuid) masked_key = client.api_key assert masked_key != valid_uuid - assert masked_key == REDACTED_MARKER client.close() # Ensure proper cleanup def test_base_headers(self, valid_uuid): @@ -197,7 +195,7 @@ def test_handle_response_http_error(self, error_response): def test_handle_response_server_error(self, server_error_response): """Test server error response handling.""" - with pytest.raises(httpx.HTTPStatusError): + with pytest.raises(APIError): BaseAPIClient._handle_response(server_error_response) def test_handle_response_invalid_json(self, invalid_json_response): @@ -643,11 +641,7 @@ def test_retry_predicate(self, server_error_response, error_response): # Should retry on protocol error assert retry_predicate(httpx.RemoteProtocolError("Protocol error")) - assert retry_predicate( - httpx.HTTPStatusError( - "Server error", request=Mock(), response=server_error_response - ) - ) + assert retry_predicate(APIError(500, server_error_response.text)) assert not retry_predicate( httpx.HTTPStatusError( diff --git a/tests/test_repr.py b/tests/test_repr.py index 435b445..c5630df 100644 --- a/tests/test_repr.py +++ b/tests/test_repr.py @@ -5,7 +5,7 @@ from faceit.utils import _UNINITIALIZED_MARKER, representation -DEFINE_STR_ERROR_MSG = "must define __str__ method" +DEFINE_STR_ERROR_MSG = "must define '__str__' method" @pytest.fixture(scope="session") diff --git a/tests/test_resource_aggregator.py b/tests/test_resource_aggregator.py deleted file mode 100644 index 7f2659c..0000000 --- a/tests/test_resource_aggregator.py +++ /dev/null @@ -1,54 +0,0 @@ -from dataclasses import dataclass - -import pytest - -from faceit.resources.resource_aggregator import ( - BaseResources, - resource_aggregator, -) - - -class DummyClient: - pass - - -@dataclass(eq=False, frozen=True) -class DummyResource: - client: DummyClient - raw: bool = False - - -@resource_aggregator -class ResourcesForTest(BaseResources): - resource1: DummyResource - raw_resource2: DummyResource - - -@pytest.fixture -def client() -> DummyClient: - return DummyClient() - - -@pytest.fixture -def resources(client: DummyClient) -> ResourcesForTest: - return ResourcesForTest(client) - - -def test_properties_created(resources: ResourcesForTest): - assert hasattr(resources, "resource1") - assert hasattr(resources, "raw_resource2") - - -def test_property_types(resources: ResourcesForTest): - assert isinstance(resources.resource1, DummyResource) - assert isinstance(resources.raw_resource2, DummyResource) - - -def test_property_raw_flag(resources: ResourcesForTest): - assert resources.resource1.raw is False - assert resources.raw_resource2.raw is True - - -def test_client_passed(resources: ResourcesForTest, client: DummyClient): - assert resources.resource1.client is client - assert resources.raw_resource2.client is client diff --git a/tests/test_skill_level.py b/tests/test_skill_level.py new file mode 100644 index 0000000..6202a0f --- /dev/null +++ b/tests/test_skill_level.py @@ -0,0 +1,58 @@ +import pytest +from faceit import SkillLevel, GameID + + +class TestSkillLevelComparison: + @pytest.fixture + def cs2_lvl1(self): + return SkillLevel.get_level(GameID.CS2, level=1) + + @pytest.fixture + def cs2_lvl2(self): + return SkillLevel.get_level(GameID.CS2, level=2) + + @pytest.fixture + def csgo_lvl1(self): + return SkillLevel.get_level(GameID.CSGO, level=1) + + def test_equality(self, cs2_lvl1): + same_lvl1 = SkillLevel.get_level(GameID.CS2, level=1) + assert cs2_lvl1 == same_lvl1 + assert cs2_lvl1 is same_lvl1 + assert hash(cs2_lvl1) == hash(same_lvl1) + + def test_inequality(self, cs2_lvl1, cs2_lvl2, csgo_lvl1): + assert cs2_lvl1 != cs2_lvl2 + assert cs2_lvl1 != csgo_lvl1 + + def test_less_than(self, cs2_lvl1, cs2_lvl2): + assert cs2_lvl1 < cs2_lvl2 + assert not (cs2_lvl2 < cs2_lvl1) + + def test_total_ordering_methods(self, cs2_lvl1, cs2_lvl2): + assert cs2_lvl2 > cs2_lvl1 + assert cs2_lvl1 <= cs2_lvl2 + assert cs2_lvl2 >= cs2_lvl1 + assert cs2_lvl1 <= cs2_lvl1 + + def test_different_games_comparison_raises_error(self, cs2_lvl1, csgo_lvl1): + with pytest.raises( + TypeError, match="Cannot compare levels from different games" + ): + _ = cs2_lvl1 < csgo_lvl1 + + with pytest.raises(TypeError): + _ = cs2_lvl1 >= csgo_lvl1 + + def test_comparison_with_other_types(self, cs2_lvl1): + assert cs2_lvl1 != "level 1" + assert cs2_lvl1 != 1 + with pytest.raises(TypeError): + assert cs2_lvl1 < 5 + + def test_sorting(self): + levels = SkillLevel.get_all_levels(GameID.CS2) + shuffled = sorted(levels, key=lambda x: x.level, reverse=True) + shuffled.sort() + assert shuffled == levels + assert all(shuffled[i] < shuffled[i + 1] for i in range(len(shuffled) - 1))