diff --git a/docs/migration.md b/docs/migration.md index 3b47f9aad..8740d08c4 100644 --- a/docs/migration.md +++ b/docs/migration.md @@ -545,6 +545,57 @@ await client.read_resource("test://resource") await client.read_resource(str(my_any_url)) ``` +### Resource templates: matching behavior changes + +Resource template matching has been rewritten with RFC 6570 support. +Several behaviors have changed: + +**Path-safety checks applied by default.** Extracted parameter values +containing `..` as a path component, a null byte, or looking like an +absolute path (`/etc/passwd`, `C:\Windows`) now cause the read to +fail — the client receives an "Unknown resource" error and template +iteration stops, so a strict template's rejection does not fall +through to a later permissive template. This is checked on the +decoded value, so `..%2Fetc`, `%2E%2E`, and `%00` are caught too. +Note that `..` is only flagged as a standalone path component, so +values like `v1.0..v2.0` or `HEAD~3..HEAD` are unaffected. + +If a parameter legitimately needs to receive absolute paths or +traversal sequences, exempt it: + +```python +from mcp.server.mcpserver import ResourceSecurity + +@mcp.resource( + "inspect://file/{+target}", + security=ResourceSecurity(exempt_params={"target"}), +) +def inspect_file(target: str) -> str: ... +``` + +**Template literals and structural delimiters match exactly.** The +previous matcher built a regex without escaping, so `.` matched any +character and simple `{var}` swallowed `?`, `#`, `&`, and `,`. Now +`data://v1.0/{id}` no longer matches `data://v1X0/42`, and +`api://{id}` no longer matches `api://foo?x=1` — use `api://{id}{?x}` +or `api://{+id}` if you need to capture a query tail. + +**Template syntax errors surface at decoration time.** Unclosed +braces, duplicate variable names, and unsupported syntax raise +`InvalidUriTemplate` when the decorator runs rather than `re.error` +on first match. 
+ +**Static URIs with Context-only handlers now error.** A non-template +URI paired with a handler that takes only a `Context` parameter +previously registered but was silently unreachable (the resource +could never be read). This now raises `ValueError` at decoration time. +Context injection for static resources is planned; until then, use a +template with at least one variable or access context through other +means. + +See [Resources](server/resources.md) for the full template syntax, +security configuration, and filesystem safety utilities. + ### Lowlevel `Server`: constructor parameters are now keyword-only All parameters after `name` are now keyword-only. If you were passing `version` or other parameters positionally, use keyword arguments instead: diff --git a/docs/server/resources.md b/docs/server/resources.md new file mode 100644 index 000000000..9aff5d94c --- /dev/null +++ b/docs/server/resources.md @@ -0,0 +1,408 @@ +# Resources + +Resources give clients read-only access to your data. Think of them as +the files, records, and reference material an LLM might need as context: +a config file, a database schema, the contents of a document, yesterday's +log output. + +Resources are different from tools. A tool is something the model +*calls* to make something happen: send an email, run a query, write a +file. A resource is something the application *reads* to understand the +world. Reading a resource should not change state or kick off expensive +work. If it does either, you probably want a tool. + +For the protocol-level details (message formats, lifecycle, pagination), +see the [MCP resources specification](https://modelcontextprotocol.io/specification/latest/server/resources). + +## A static resource + +The simplest case is a fixed URI that returns the same kind of content +every time. 
+ +```python +from mcp.server.mcpserver import MCPServer + +mcp = MCPServer("docs-server") + + +@mcp.resource("config://features") +def feature_flags() -> str: + return '{"beta_search": true, "new_editor": false}' +``` + +When a client reads `config://features`, your function runs and the +return value is sent back. Return `str` for text, `bytes` for binary +data, or anything JSON-serializable. + +The URI scheme (`config://` here) is up to you. The protocol reserves +`file://` and `https://` for their usual meanings, but custom schemes +like `config://`, `db://`, or `notes://` are encouraged. They make the +URI self-describing. + +## Resource templates + +Most interesting data is parameterized. You don't want to register a +separate resource for every user, every file, every database row. +Instead, register a template with placeholders: + +```python +@mcp.resource("tickets://{ticket_id}") +def get_ticket(ticket_id: str) -> dict[str, str]: + ticket = helpdesk.find(ticket_id) + return {"id": ticket_id, "subject": ticket.subject, "status": ticket.status} +``` + +The `{ticket_id}` in the URI maps to the `ticket_id` parameter in your +function. A client reading `tickets://TKT-1042` calls +`get_ticket("TKT-1042")`. Reading `tickets://TKT-2001` calls +`get_ticket("TKT-2001")`. One template, unlimited resources. + +### Parameter types + +Extracted values arrive as strings, but you can declare a more specific +type and the SDK will convert: + +```python +@mcp.resource("orders://{order_id}") +def get_order(order_id: int) -> dict[str, Any]: + # "12345" from the URI becomes the int 12345 + return db.orders.get(order_id) +``` + +### Multi-segment paths + +A plain `{name}` stops at the first slash. If your template is +`files://{name}`, a client reading `files://readme.txt` matches fine, +but `files://guides/intro.md` does not: the slash after `guides` ends +the match, and `intro.md` is left over. 
+ +To capture the whole path including slashes, use `{+name}`: + +```python +@mcp.resource("files://{+path}") +def read_file(path: str) -> str: + # files://readme.txt gives path = "readme.txt" + # files://guides/intro.md gives path = "guides/intro.md" + ... +``` + +Reach for `{+name}` whenever the value is hierarchical: filesystem +paths, nested object keys, URL paths you're proxying. + +### Query parameters + +Say you want clients to read `logs://api` for recent logs, but also +`logs://api?since=15m&level=error` when they need to narrow it down. +The `?since=15m&level=error` part is optional configuration, and you +don't want a separate template for every combination. + +Declare these as query parameters with `{?name}`, or list several at +once with `{?a,b,c}`: + +```python +@mcp.resource("logs://{service}{?since,level}") +def tail_logs(service: str, since: str = "1h", level: str = "info") -> str: + return log_store.query(service, since=since, min_level=level) +``` + +The path identifies *which* resource; the query tunes *how* you read +it. + +Query params are matched leniently: order doesn't matter, extras are +ignored, and omitted params fall through to your function defaults. + +### Path segments as a list + +If you want each path segment as a separate list item rather than one +string with slashes, use `{/name*}`: + +```python +@mcp.resource("tree://nodes{/path*}") +def walk_tree(path: list[str]) -> dict[str, Any]: + # tree://nodes/a/b/c gives path = ["a", "b", "c"] + node = root + for segment in path: + node = node.children[segment] + return node.to_dict() +``` + +### Template reference + +The template syntax follows [RFC 6570](https://datatracker.ietf.org/doc/html/rfc6570). 
+The most common patterns: + +| Pattern | Example input | You get | +|--------------|-----------------------|-------------------------| +| `{name}` | `alice` | `"alice"` | +| `{name}` | `docs/intro.md` | *no match* (stops at `/`) | +| `{+path}` | `docs/intro.md` | `"docs/intro.md"` | +| `{.ext}` | `.json` | `"json"` | +| `{/segment}` | `/v2` | `"v2"` | +| `{?key}` | `?key=value` | `"value"` | +| `{?a,b}` | `?a=1&b=2` | `"1"`, `"2"` | +| `{/path*}` | `/a/b/c` | `["a", "b", "c"]` | + +## Security + +Template parameters come from the client. If they flow into filesystem +or database operations, a hostile client can try path traversal +(`../../etc/passwd`) or injection attacks. + +### What the SDK checks by default + +Before your handler runs, the SDK rejects any parameter that contains a null byte, or that: + +- would escape its starting directory via `..` components +- looks like an absolute path (`/etc/passwd`, `C:\Windows`) + +The `..` check is component-based, not a substring scan. Values like +`v1.0..v2.0` or `HEAD~3..HEAD` pass because `..` is not a standalone +path segment there. + +These checks apply to the decoded value, so they catch traversal +regardless of how it was encoded in the URI (`../etc`, `..%2Fetc`, +`%2E%2E/etc`, `..%5Cetc` all get caught). + +A request that trips these checks fails immediately instead of falling through to a later template: the SDK +raises `ResourceError("Unknown resource: {uri}")`, which the client +receives as a JSON-RPC error. Your handler never sees the bad input. + +### Filesystem handlers: use safe_join + +The built-in checks stop obvious attacks but can't know your sandbox +boundary. 
For filesystem access, use `safe_join` to resolve the path +and verify it stays inside your base directory: + +```python +from mcp.shared.path_security import safe_join + +DOCS_ROOT = "/srv/app/docs" + + +@mcp.resource("files://{+path}") +def read_file(path: str) -> str: + full_path = safe_join(DOCS_ROOT, path) + return full_path.read_text() +``` + +`safe_join` catches symlink escapes, `..` sequences, and absolute-path +tricks that a simple string check would miss. If the resolved path +escapes the base, it raises `PathEscapeError`, which surfaces to the +client as a `ResourceError`. + +### When the defaults get in the way + +Sometimes the checks block legitimate values. An external-tool wrapper +might intentionally receive an absolute path, or a parameter might be a +relative reference like `../sibling` that your handler interprets +safely without touching the filesystem. Exempt that parameter: + +```python +from mcp.server.mcpserver import ResourceSecurity + + +@mcp.resource( + "inspect://file/{+target}", + security=ResourceSecurity(exempt_params={"target"}), +) +def inspect_file(target: str) -> str: + # target might be "/usr/bin/python3"; this handler is trusted + return describe_binary(target) +``` + +Or relax the policy for the whole server: + +```python +mcp = MCPServer( + resource_security=ResourceSecurity(reject_path_traversal=False), +) +``` + +The configurable checks: + +| Setting | Default | What it does | +|-------------------------|---------|-------------------------------------| +| `reject_path_traversal` | `True` | Rejects `..` sequences that escape the starting directory | +| `reject_absolute_paths` | `True` | Rejects `/foo`, `C:\foo`, UNC paths | +| `reject_null_bytes` | `True` | Rejects values containing `\x00` | +| `exempt_params` | empty | Parameter names to skip checks for | + +These checks are a heuristic pre-filter; for filesystem access, +`safe_join` remains the containment boundary. 
+ +## Errors + +If your handler can't fulfil the request, raise an exception. The SDK +turns it into an error response: + +```python +@mcp.resource("articles://{article_id}") +def get_article(article_id: str) -> str: + article = db.articles.find(article_id) + if article is None: + raise ValueError(f"No article with id {article_id}") + return article.content +``` + +## Resources on the low-level server + +If you're building on the low-level `Server`, you register handlers for +the `resources/list` and `resources/read` protocol methods directly. +There's no decorator; you return the protocol types yourself. + +### Static resources + +For fixed URIs, keep a registry and dispatch on exact match: + +```python +from typing import Any + +from mcp.server.lowlevel import Server +from mcp.types import ( + ListResourcesResult, + PaginatedRequestParams, + ReadResourceRequestParams, + ReadResourceResult, + Resource, + TextResourceContents, +) +from mcp.server.context import ServerRequestContext + +RESOURCES = { + "config://features": lambda: '{"beta_search": true}', + "status://health": lambda: check_health(), +} + + +async def on_list_resources( + ctx: ServerRequestContext[Any], params: PaginatedRequestParams | None +) -> ListResourcesResult: + return ListResourcesResult( + resources=[Resource(name=uri, uri=uri) for uri in RESOURCES] + ) + + +async def on_read_resource( + ctx: ServerRequestContext[Any], params: ReadResourceRequestParams +) -> ReadResourceResult: + if (producer := RESOURCES.get(params.uri)) is not None: + return ReadResourceResult( + contents=[TextResourceContents(uri=params.uri, text=producer())] + ) + raise ValueError(f"Unknown resource: {params.uri}") + + +server = Server( + "my-server", + on_list_resources=on_list_resources, + on_read_resource=on_read_resource, +) +``` + +The list handler tells clients what's available; the read handler +serves the content. 
Check your registry first, fall through to +templates (below) if you have any, then raise for anything else. + +### Templates + +The template engine `MCPServer` uses lives in `mcp.shared.uri_template` +and works on its own. You get the same parsing and matching; you wire +up the routing and security policy yourself. + +#### Matching requests + +Parse your templates once, then match incoming URIs against them in +your read handler: + +```python +from typing import Any + +from mcp.server.context import ServerRequestContext +from mcp.server.lowlevel import Server +from mcp.shared.uri_template import UriTemplate +from mcp.types import ReadResourceRequestParams, ReadResourceResult, TextResourceContents + +TEMPLATES = { + "files": UriTemplate.parse("files://{+path}"), + "row": UriTemplate.parse("db://{table}/{id}"), +} + + +async def on_read_resource( + ctx: ServerRequestContext[Any], params: ReadResourceRequestParams +) -> ReadResourceResult: + if (vars := TEMPLATES["files"].match(params.uri)) is not None: + content = read_file_safely(vars["path"]) + return ReadResourceResult(contents=[TextResourceContents(uri=params.uri, text=content)]) + + if (vars := TEMPLATES["row"].match(params.uri)) is not None: + row = db.get(vars["table"], int(vars["id"])) + return ReadResourceResult(contents=[TextResourceContents(uri=params.uri, text=row.to_json())]) + + raise ValueError(f"Unknown resource: {params.uri}") + + +server = Server("my-server", on_read_resource=on_read_resource) +``` + +`UriTemplate.match()` returns the extracted variables or `None`. URL +decoding happens inside `match()`; the decoded values are returned +as-is without path-safety validation. + +Values come out as strings. Convert them yourself: `int(vars["id"])`, +`Path(vars["path"])`, whatever your handler needs. + +#### Applying security checks + +The path traversal and absolute-path checks that `MCPServer` runs by +default are in `mcp.shared.path_security`. 
Call them before using an +extracted value: + +```python +from mcp.shared.path_security import contains_path_traversal, is_absolute_path, safe_join + +DOCS_ROOT = "/srv/app/docs" + + +def read_file_safely(path: str) -> str: + if contains_path_traversal(path) or is_absolute_path(path): + raise ValueError("rejected") + return safe_join(DOCS_ROOT, path).read_text() +``` + +If a parameter isn't a filesystem path (say, a git ref or a search +query), skip the checks for that value. You control the policy per +handler rather than through a config object. + +#### Listing templates + +Clients discover templates through `resources/templates/list`. Return +the protocol `ResourceTemplate` type, using the same template strings +you parsed above: + +```python +from typing import Any + +from mcp.types import ListResourceTemplatesResult, PaginatedRequestParams, ResourceTemplate + + +async def on_list_resource_templates( + ctx: ServerRequestContext[Any], params: PaginatedRequestParams | None +) -> ListResourceTemplatesResult: + return ListResourceTemplatesResult( + resource_templates=[ + ResourceTemplate(name="files", uri_template=str(TEMPLATES["files"])), + ResourceTemplate(name="row", uri_template=str(TEMPLATES["row"])), + ] + ) + + +server = Server( + "my-server", + on_read_resource=on_read_resource, + on_list_resource_templates=on_list_resource_templates, +) +``` + +`str(template)` gives back the original template string, so your list +handler and your matching logic can share one source of truth. 
diff --git a/mkdocs.yml b/mkdocs.yml index 3a555785a..7568ba28a 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -16,6 +16,8 @@ nav: - Migration Guide: migration.md - Documentation: - Concepts: concepts.md + - Server: + - Resources: server/resources.md - Low-Level Server: low-level-server.md - Authorization: authorization.md - Testing: testing.md diff --git a/src/mcp/server/mcpserver/__init__.py b/src/mcp/server/mcpserver/__init__.py index 0857e38bd..35c98a00c 100644 --- a/src/mcp/server/mcpserver/__init__.py +++ b/src/mcp/server/mcpserver/__init__.py @@ -3,7 +3,16 @@ from mcp.types import Icon from .context import Context +from .resources import DEFAULT_RESOURCE_SECURITY, ResourceSecurity from .server import MCPServer from .utilities.types import Audio, Image -__all__ = ["MCPServer", "Context", "Image", "Audio", "Icon"] +__all__ = [ + "MCPServer", + "Context", + "Image", + "Audio", + "Icon", + "ResourceSecurity", + "DEFAULT_RESOURCE_SECURITY", +] diff --git a/src/mcp/server/mcpserver/resources/__init__.py b/src/mcp/server/mcpserver/resources/__init__.py index b5805fb34..a6cdfa106 100644 --- a/src/mcp/server/mcpserver/resources/__init__.py +++ b/src/mcp/server/mcpserver/resources/__init__.py @@ -1,6 +1,11 @@ from .base import Resource from .resource_manager import ResourceManager -from .templates import ResourceTemplate +from .templates import ( + DEFAULT_RESOURCE_SECURITY, + ResourceSecurity, + ResourceSecurityError, + ResourceTemplate, +) from .types import ( BinaryResource, DirectoryResource, @@ -20,4 +25,7 @@ "DirectoryResource", "ResourceTemplate", "ResourceManager", + "ResourceSecurity", + "ResourceSecurityError", + "DEFAULT_RESOURCE_SECURITY", ] diff --git a/src/mcp/server/mcpserver/resources/resource_manager.py b/src/mcp/server/mcpserver/resources/resource_manager.py index 6bf17376d..5aaccebd3 100644 --- a/src/mcp/server/mcpserver/resources/resource_manager.py +++ b/src/mcp/server/mcpserver/resources/resource_manager.py @@ -8,7 +8,7 @@ from pydantic import 
AnyUrl from mcp.server.mcpserver.resources.base import Resource -from mcp.server.mcpserver.resources.templates import ResourceTemplate +from mcp.server.mcpserver.resources.templates import DEFAULT_RESOURCE_SECURITY, ResourceSecurity, ResourceTemplate from mcp.server.mcpserver.utilities.logging import get_logger from mcp.types import Annotations, Icon @@ -64,6 +64,7 @@ def add_template( icons: list[Icon] | None = None, annotations: Annotations | None = None, meta: dict[str, Any] | None = None, + security: ResourceSecurity = DEFAULT_RESOURCE_SECURITY, ) -> ResourceTemplate: """Add a template from a function.""" template = ResourceTemplate.from_function( @@ -76,12 +77,23 @@ def add_template( icons=icons, annotations=annotations, meta=meta, + security=security, ) self._templates[template.uri_template] = template return template async def get_resource(self, uri: AnyUrl | str, context: Context[LifespanContextT, RequestT]) -> Resource: - """Get resource by URI, checking concrete resources first, then templates.""" + """Get resource by URI, checking concrete resources first, then templates. + + Note: + Pydantic's ``AnyUrl`` normalises percent-encoding and + resolves ``..`` segments during validation, so a value + constructed as ``AnyUrl("file:///a/%2E%2E/b")`` arrives + here as ``file:///b``. The JSON-RPC protocol layer passes + raw ``str`` values and is unaffected, but internal callers + wrapping URIs in ``AnyUrl`` should be aware that security + checks see the already-normalised form. 
+ """ uri_str = str(uri) logger.debug("Getting resource", extra={"uri": uri_str}) @@ -91,7 +103,7 @@ async def get_resource(self, uri: AnyUrl | str, context: Context[LifespanContext # Then check templates for template in self._templates.values(): - if params := template.matches(uri_str): + if (params := template.matches(uri_str)) is not None: try: return await template.create_resource(uri_str, params, context=context) except Exception as e: # pragma: no cover diff --git a/src/mcp/server/mcpserver/resources/templates.py b/src/mcp/server/mcpserver/resources/templates.py index 2d612657c..ce21ce8b0 100644 --- a/src/mcp/server/mcpserver/resources/templates.py +++ b/src/mcp/server/mcpserver/resources/templates.py @@ -3,16 +3,17 @@ from __future__ import annotations import inspect -import re -from collections.abc import Callable +from collections.abc import Callable, Mapping, Set +from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any -from urllib.parse import unquote from pydantic import BaseModel, Field, validate_call from mcp.server.mcpserver.resources.types import FunctionResource, Resource from mcp.server.mcpserver.utilities.context_injection import find_context_parameter, inject_context from mcp.server.mcpserver.utilities.func_metadata import func_metadata +from mcp.shared.path_security import contains_path_traversal, is_absolute_path +from mcp.shared.uri_template import UriTemplate from mcp.types import Annotations, Icon if TYPE_CHECKING: @@ -20,6 +21,82 @@ from mcp.server.mcpserver.context import Context +@dataclass(frozen=True) +class ResourceSecurity: + """Security policy applied to extracted resource template parameters. + + These checks run after :meth:`~mcp.shared.uri_template.UriTemplate.match` + has extracted and decoded parameter values. They catch path-traversal + and absolute-path injection regardless of how the value was encoded in + the URI (literal, ``%2F``, ``%5C``, ``%2E%2E``). 
+ + Example:: + + # Opt out for a parameter that legitimately contains .. + @mcp.resource( + "git://diff/{+range}", + security=ResourceSecurity(exempt_params={"range"}), + ) + def git_diff(range: str) -> str: ... + """ + + reject_path_traversal: bool = True + """Reject values whose ``..`` components would escape the starting directory.""" + + reject_absolute_paths: bool = True + """Reject values that look like absolute filesystem paths.""" + + reject_null_bytes: bool = True + """Reject values containing NUL (``\\x00``). Null bytes defeat string + comparisons (``"..\\x00" != ".."``) and can cause truncation in C + extensions or subprocess calls.""" + + exempt_params: Set[str] = field(default_factory=frozenset[str]) + """Parameter names to skip all checks for.""" + + def validate(self, params: Mapping[str, str | list[str]]) -> str | None: + """Check all parameter values against the configured policy. + + Args: + params: Extracted template parameters. List values (from + explode variables) are checked element-wise. + + Returns: + The name of the first parameter that fails, or ``None`` if + all values pass. + """ + for name, value in params.items(): + if name in self.exempt_params: + continue + values = value if isinstance(value, list) else [value] + for v in values: + if self.reject_null_bytes and "\0" in v: + return name + if self.reject_path_traversal and contains_path_traversal(v): + return name + if self.reject_absolute_paths and is_absolute_path(v): + return name + return None + + +DEFAULT_RESOURCE_SECURITY = ResourceSecurity() +"""Secure-by-default policy: traversal, absolute paths, and null bytes rejected.""" + + +class ResourceSecurityError(ValueError): + """Raised when an extracted parameter fails :class:`ResourceSecurity` checks. + + Distinct from a simple ``None`` non-match so that template + iteration can stop at the first security rejection rather than + falling through to a later, possibly more permissive, template. 
+ """ + + def __init__(self, template: str, param: str) -> None: + super().__init__(f"Parameter {param!r} of template {template!r} failed security validation") + self.template = template + self.param = param + + class ResourceTemplate(BaseModel): """A template for dynamically creating resources.""" @@ -34,6 +111,8 @@ class ResourceTemplate(BaseModel): fn: Callable[..., Any] = Field(exclude=True) parameters: dict[str, Any] = Field(description="JSON schema for function parameters") context_kwarg: str | None = Field(None, description="Name of the kwarg that should receive context") + parsed_template: UriTemplate = Field(exclude=True, description="Parsed RFC 6570 template") + security: ResourceSecurity = Field(exclude=True, description="Path-safety policy for extracted parameters") @classmethod def from_function( @@ -48,12 +127,20 @@ def from_function( annotations: Annotations | None = None, meta: dict[str, Any] | None = None, context_kwarg: str | None = None, + security: ResourceSecurity = DEFAULT_RESOURCE_SECURITY, ) -> ResourceTemplate: - """Create a template from a function.""" + """Create a template from a function. + + Raises: + InvalidUriTemplate: If ``uri_template`` is malformed or uses + unsupported RFC 6570 features. + """ func_name = name or fn.__name__ if func_name == "": raise ValueError("You must provide a name for lambda functions") # pragma: no cover + parsed = UriTemplate.parse(uri_template) + # Find context parameter if it exists if context_kwarg is None: # pragma: no branch context_kwarg = find_context_parameter(fn) @@ -80,20 +167,35 @@ def from_function( fn=fn, parameters=parameters, context_kwarg=context_kwarg, + parsed_template=parsed, + security=security, ) - def matches(self, uri: str) -> dict[str, Any] | None: - """Check if URI matches template and extract parameters. + def matches(self, uri: str) -> dict[str, str | list[str]] | None: + """Check if a URI matches this template and extract parameters. 
+ + Delegates to :meth:`UriTemplate.match` for RFC 6570 extraction, + then applies this template's :class:`ResourceSecurity` policy + (path traversal, absolute paths, null bytes). - Extracted parameters are URL-decoded to handle percent-encoded characters. + Returns: + Extracted parameters on success, or ``None`` if the URI + doesn't match the template. + + Raises: + ResourceSecurityError: If the URI matches but an extracted + parameter fails security validation. Raising (rather + than returning ``None``) prevents the resource manager + from silently falling through to a later, possibly more + permissive, template. """ - # Convert template to regex pattern - pattern = self.uri_template.replace("{", "(?P<").replace("}", ">[^/]+)") - match = re.match(f"^{pattern}$", uri) - if match: - # URL-decode all extracted parameter values - return {key: unquote(value) for key, value in match.groupdict().items()} - return None + params = self.parsed_template.match(uri) + if params is None: + return None + failed = self.security.validate(params) + if failed is not None: + raise ResourceSecurityError(self.uri_template, failed) + return params async def create_resource( self, diff --git a/src/mcp/server/mcpserver/server.py b/src/mcp/server/mcpserver/server.py index 2a7a58117..4a5462fe9 100644 --- a/src/mcp/server/mcpserver/server.py +++ b/src/mcp/server/mcpserver/server.py @@ -5,7 +5,6 @@ import base64 import inspect import json -import re from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Sequence from contextlib import AbstractAsyncContextManager, asynccontextmanager from typing import Any, Generic, Literal, TypeVar, overload @@ -33,7 +32,13 @@ from mcp.server.mcpserver.context import Context from mcp.server.mcpserver.exceptions import ResourceError from mcp.server.mcpserver.prompts import Prompt, PromptManager -from mcp.server.mcpserver.resources import FunctionResource, Resource, ResourceManager +from mcp.server.mcpserver.resources import ( + 
DEFAULT_RESOURCE_SECURITY, + FunctionResource, + Resource, + ResourceManager, + ResourceSecurity, +) from mcp.server.mcpserver.tools import Tool, ToolManager from mcp.server.mcpserver.utilities.context_injection import find_context_parameter from mcp.server.mcpserver.utilities.logging import configure_logging, get_logger @@ -43,6 +48,7 @@ from mcp.server.streamable_http_manager import StreamableHTTPSessionManager from mcp.server.transport_security import TransportSecuritySettings from mcp.shared.exceptions import MCPError +from mcp.shared.uri_template import UriTemplate from mcp.types import ( Annotations, BlobResourceContents, @@ -144,7 +150,9 @@ def __init__( warn_on_duplicate_prompts: bool = True, lifespan: Callable[[MCPServer[LifespanResultT]], AbstractAsyncContextManager[LifespanResultT]] | None = None, auth: AuthSettings | None = None, + resource_security: ResourceSecurity = DEFAULT_RESOURCE_SECURITY, ): + self._resource_security = resource_security self.settings = Settings( debug=debug, log_level=log_level, @@ -626,6 +634,7 @@ def resource( icons: list[Icon] | None = None, annotations: Annotations | None = None, meta: dict[str, Any] | None = None, + security: ResourceSecurity | None = None, ) -> Callable[[_CallableT], _CallableT]: """Decorator to register a function as a resource. @@ -635,8 +644,9 @@ def resource( - bytes for binary content - other types will be converted to JSON - If the URI contains parameters (e.g. "resource://{param}") or the function - has parameters, it will be registered as a template resource. + If the URI contains parameters (e.g. "resource://{param}"), it is + registered as a template resource. Otherwise it is registered as a + static resource; function parameters on a static URI raise an error. Args: uri: URI for the resource (e.g. 
"resource://my-resource" or "resource://{param}") @@ -647,6 +657,9 @@ def resource( icons: Optional list of icons for the resource annotations: Optional annotations for the resource meta: Optional metadata dictionary for the resource + security: Path-safety policy for extracted template parameters. + Defaults to the server's ``resource_security`` setting. + Only applies to template resources. Example: ```python @@ -668,6 +681,13 @@ async def get_weather(city: str) -> str: data = await fetch_weather(city) return f"Weather for {city}: {data}" ``` + + Raises: + InvalidUriTemplate: If ``uri`` is not a valid RFC 6570 template. + ValueError: If URI template parameters don't match the + function's parameters. + TypeError: If the decorator is applied without being called + (``@resource`` instead of ``@resource("uri")``). """ # Check if user passed function directly instead of calling decorator if callable(uri): @@ -676,22 +696,20 @@ async def get_weather(city: str) -> str: "Did you forget to call it? Use @resource('uri') instead of @resource" ) + # Parse once, early — surfaces malformed-template errors at + # decoration time with a clear position, and gives us correct + # variable names for all RFC 6570 operators. + parsed = UriTemplate.parse(uri) + uri_params = set(parsed.variable_names) + def decorator(fn: _CallableT) -> _CallableT: - # Check if this should be a template sig = inspect.signature(fn) - has_uri_params = "{" in uri and "}" in uri - has_func_params = bool(sig.parameters) - - if has_uri_params or has_func_params: - # Check for Context parameter to exclude from validation - context_param = find_context_parameter(fn) - - # Validate that URI params match function params (excluding context) - uri_params = set(re.findall(r"{(\w+)}", uri)) - # We need to remove the context_param from the resource function if - # there is any. 
- func_params = {p for p in sig.parameters.keys() if p != context_param} + context_param = find_context_parameter(fn) + func_params = {p for p in sig.parameters.keys() if p != context_param} + # Template/static is decided purely by the URI: variables + # present means template, none means static. + if uri_params: if uri_params != func_params: raise ValueError( f"Mismatch between URI parameters {uri_params} and function parameters {func_params}" @@ -707,9 +725,24 @@ def decorator(fn: _CallableT) -> _CallableT: mime_type=mime_type, icons=icons, annotations=annotations, + security=security if security is not None else self._resource_security, meta=meta, ) else: + if func_params: + raise ValueError( + f"Resource {uri!r} has no URI template variables, but the " + f"handler declares parameters {func_params}. Add matching " + f"{{...}} variables to the URI or remove the parameters." + ) + if context_param is not None: + raise ValueError( + f"Resource {uri!r} has no URI template variables, but the " + f"handler declares a Context parameter. Context injection " + f"for static resources is not yet supported but is planned. " + f"For now, add a template variable to the URI or remove the " + f"Context parameter." + ) # Register as regular resource resource = FunctionResource.from_function( fn=fn, diff --git a/src/mcp/shared/path_security.py b/src/mcp/shared/path_security.py new file mode 100644 index 000000000..dfcd479be --- /dev/null +++ b/src/mcp/shared/path_security.py @@ -0,0 +1,158 @@ +"""Filesystem path safety primitives for resource handlers. + +These functions help MCP servers defend against path-traversal attacks +when extracted URI template parameters are used in filesystem +operations. They are standalone utilities usable from both the +high-level :class:`~mcp.server.mcpserver.MCPServer` and lowlevel server +implementations. 
+ +The canonical safe pattern:: + + from mcp.shared.path_security import safe_join + + @mcp.resource("file://docs/{+path}") + def read_doc(path: str) -> str: + return safe_join("/data/docs", path).read_text() +""" + +import string +from pathlib import Path + +__all__ = ["PathEscapeError", "contains_path_traversal", "is_absolute_path", "safe_join"] + + +class PathEscapeError(ValueError): + """Raised by :func:`safe_join` when the resolved path escapes the base.""" + + +def contains_path_traversal(value: str) -> bool: + r"""Check whether a value, treated as a relative path, escapes its origin. + + This is a **base-free** check: it does not know the sandbox root, so + it detects only whether ``..`` components would move above the + starting point. Use :func:`safe_join` when you know the root — it + additionally catches symlink escapes and absolute-path injection. + + The check is component-based: ``..`` is dangerous only as a + standalone path segment, not as a substring. Both ``/`` and ``\`` + are treated as separators. + + Example:: + + >>> contains_path_traversal("a/b/c") + False + >>> contains_path_traversal("../etc") + True + >>> contains_path_traversal("a/../../b") + True + >>> contains_path_traversal("a/../b") + False + >>> contains_path_traversal("1.0..2.0") + False + >>> contains_path_traversal("..") + True + + Args: + value: A string that may be used as a filesystem path. + + Returns: + ``True`` if the path would escape its starting directory. + """ + depth = 0 + for part in value.replace("\\", "/").split("/"): + if part == "..": + depth -= 1 + if depth < 0: + return True + elif part and part != ".": + depth += 1 + return False + + +def is_absolute_path(value: str) -> bool: + r"""Check whether a value is an absolute filesystem path. + + Absolute paths are dangerous when joined onto a base: in Python, + ``Path("/data") / "/etc/passwd"`` yields ``/etc/passwd`` — the + absolute right-hand side silently discards the base. 
+ + Detects POSIX absolute (``/foo``), Windows drive (``C:\foo``), + and Windows UNC/absolute (``\\server\share``, ``\foo``). + + Example:: + + >>> is_absolute_path("relative/path") + False + >>> is_absolute_path("/etc/passwd") + True + >>> is_absolute_path("C:\\Windows") + True + >>> is_absolute_path("") + False + + Args: + value: A string that may be used as a filesystem path. + + Returns: + ``True`` if the path is absolute on any common platform. + """ + if not value: + return False + if value[0] in ("/", "\\"): + return True + # Windows drive letter: C:, C:\, C:/. ASCII-only so that values + # like "Ω:namespace" are not falsely rejected. + if len(value) >= 2 and value[1] == ":" and value[0] in string.ascii_letters: + return True + return False + + +def safe_join(base: str | Path, *parts: str) -> Path: + """Join path components onto a base, rejecting escapes. + + Resolves the joined path and verifies it remains within ``base``. + This is the **gold-standard** check: it catches ``..`` traversal, + absolute-path injection, and symlink escapes that the base-free + checks cannot. + + Example:: + + >>> safe_join("/data/docs", "readme.txt") + PosixPath('/data/docs/readme.txt') + >>> safe_join("/data/docs", "../../../etc/passwd") + Traceback (most recent call last): + ... + PathEscapeError: ... + + Args: + base: The sandbox root. May be relative; it will be resolved. + parts: Path components to join. Each is checked for null bytes + and absolute form before joining. + + Returns: + The resolved path, guaranteed to be within ``base``. + + Raises: + PathEscapeError: If any part contains a null byte, any part is + absolute, or the resolved path is not contained within the + resolved base. + """ + base_resolved = Path(base).resolve() + + for part in parts: + # Null bytes pass through Path construction but fail at the + # syscall boundary with a cryptic error. Reject here so callers + # get a clear PathEscapeError instead. 
+ if "\0" in part: + raise PathEscapeError(f"Path component contains a null byte; refusing to join onto {base_resolved}") + # Absolute parts would silently discard everything to the left + # in Path's / operator. + if is_absolute_path(part): + raise PathEscapeError(f"Path component {part!r} is absolute; refusing to join onto {base_resolved}") + + target = base_resolved.joinpath(*parts).resolve() + + if not target.is_relative_to(base_resolved): + raise PathEscapeError(f"Path {target} escapes base {base_resolved}") + + return target diff --git a/src/mcp/shared/uri_template.py b/src/mcp/shared/uri_template.py new file mode 100644 index 000000000..7fff3aa1b --- /dev/null +++ b/src/mcp/shared/uri_template.py @@ -0,0 +1,1056 @@ +"""RFC 6570 URI Templates with bidirectional support. + +Provides both expansion (template + variables → URI) and matching +(URI → variables). RFC 6570 only specifies expansion; matching is the +inverse operation needed by MCP servers to route ``resources/read`` +requests to handlers. + +Supports Levels 1-3 fully, plus Level 4 explode modifier for path-like +operators (``{/var*}``, ``{.var*}``, ``{;var*}``). The Level 4 prefix +modifier (``{var:N}``) and query-explode (``{?var*}``) are not supported. + +Matching semantics +------------------ + +Matching is not specified by RFC 6570 (§1.4 explicitly defers to regex +languages). This implementation uses a two-ended scan that never +backtracks: match time is O(n·v) where n is URI length and v is the +number of template variables. Realistic templates have v < 10, making +this effectively linear; there is no input that produces +superpolynomial time. + +A template may contain **at most one multi-segment variable** — +``{+var}``, ``{#var}``, or an explode-modified variable (``{/var*}``, +``{.var*}``, ``{;var*}``). This variable greedily consumes whatever the +surrounding bounded variables and literals do not. 
Two such variables +in one template are inherently ambiguous (which one gets the extra +segment?) and are rejected at parse time. + +Bounded variables before the multi-segment variable match **lazily** +(first occurrence of the following literal); those after match +**greedily** (last occurrence of the preceding literal). Templates +without a multi-segment variable match greedily throughout, identical +to regex semantics. + +Reserved expansion ``{+var}`` leaves ``?`` and ``#`` unencoded, but +the scan stops at those characters so ``{+path}{?q}`` can separate path +from query. A value containing a literal ``?`` or ``#`` expands fine +but will not round-trip through ``match()``. +""" + +from __future__ import annotations + +import re +from collections.abc import Mapping, Sequence +from dataclasses import dataclass, field +from typing import Literal, TypeAlias, cast +from urllib.parse import quote, unquote + +__all__ = [ + "DEFAULT_MAX_TEMPLATE_LENGTH", + "DEFAULT_MAX_VARIABLES", + "DEFAULT_MAX_URI_LENGTH", + "InvalidUriTemplate", + "Operator", + "UriTemplate", + "Variable", +] + +Operator = Literal["", "+", "#", ".", "/", ";", "?", "&"] + +_OPERATORS: frozenset[str] = frozenset({"+", "#", ".", "/", ";", "?", "&"}) + +# RFC 6570 §2.3: varname = varchar *(["."] varchar), varchar = ALPHA / DIGIT / "_" +# Dots appear only between varchar groups — not consecutive, not trailing. +# (Percent-encoded varchars are technically allowed but unseen in practice.) +_VARNAME_RE = re.compile(r"^[A-Za-z0-9_]+(?:\.[A-Za-z0-9_]+)*$") + +DEFAULT_MAX_TEMPLATE_LENGTH = 8_192 +DEFAULT_MAX_VARIABLES = 256 +DEFAULT_MAX_URI_LENGTH = 65_536 + +# RFC 3986 reserved characters, kept unencoded by {+var} and {#var}. 
+_RESERVED = ":/?#[]@!$&'()*+,;=" + + +@dataclass(frozen=True) +class _OperatorSpec: + """Expansion behavior for a single operator (RFC 6570 §3.2, Table in §A).""" + + prefix: str + """Leading character emitted before the first variable.""" + separator: str + """Character between variables (and between exploded list items).""" + named: bool + """Emit ``name=value`` pairs (query/path-param style) rather than bare values.""" + allow_reserved: bool + """Keep reserved characters unencoded ({+var}, {#var}).""" + + +_OPERATOR_SPECS: dict[Operator, _OperatorSpec] = { + "": _OperatorSpec(prefix="", separator=",", named=False, allow_reserved=False), + "+": _OperatorSpec(prefix="", separator=",", named=False, allow_reserved=True), + "#": _OperatorSpec(prefix="#", separator=",", named=False, allow_reserved=True), + ".": _OperatorSpec(prefix=".", separator=".", named=False, allow_reserved=False), + "/": _OperatorSpec(prefix="/", separator="/", named=False, allow_reserved=False), + ";": _OperatorSpec(prefix=";", separator=";", named=True, allow_reserved=False), + "?": _OperatorSpec(prefix="?", separator="&", named=True, allow_reserved=False), + "&": _OperatorSpec(prefix="&", separator="&", named=True, allow_reserved=False), +} + +# Per-operator stop characters for the linear scan. A bounded variable's +# value ends at the first occurrence of any character in its stop set, +# mirroring the character-class boundaries a regex would use but without +# the backtracking. +_STOP_CHARS: dict[Operator, str] = { + "": "/?#&,", # simple: everything structural is pct-encoded + "+": "?#", # reserved: / allowed, stop at query/fragment + "#": "", # fragment: tail of URI, nothing stops it + ".": "./?#", # label: stop at next . 
+ "/": "/?#", # path segment: stop at next / + ";": ";/?#", # path-param value (may be empty: ;name) + "?": "&#", # query value (may be empty: ?name=) + "&": "&#", # query-cont value +} + + +class InvalidUriTemplate(ValueError): + """Raised when a URI template string is malformed or unsupported. + + Attributes: + template: The template string that failed to parse. + position: Character offset where the error was detected, or None + if the error is not tied to a specific position. + """ + + def __init__(self, message: str, *, template: str, position: int | None = None) -> None: + super().__init__(message) + self.template = template + self.position = position + + +@dataclass(frozen=True) +class Variable: + """A single variable within a URI template expression.""" + + name: str + operator: Operator + explode: bool = False + + +@dataclass +class _Expression: + """A parsed ``{...}`` expression: one operator, one or more variables.""" + + operator: Operator + variables: list[Variable] + + +_Part = str | _Expression + + +@dataclass(frozen=True) +class _Lit: + """A literal run in the flattened match-atom sequence.""" + + text: str + + +@dataclass(frozen=True) +class _Cap: + """A single-variable capture in the flattened match-atom sequence. + + ``ifemp`` marks the ``;`` operator's optional-equals quirk: ``{;id}`` + expands to ``;id=value`` or bare ``;id`` when the value is empty, so + the scan must accept both forms. + """ + + var: Variable + ifemp: bool = False + + +_Atom: TypeAlias = _Lit | _Cap + + +def _is_greedy(var: Variable) -> bool: + """Return True if this variable can span multiple path segments. + + Reserved/fragment expansion and explode variables are the only + constructs whose match range is not bounded by a single structural + delimiter. A template may contain at most one such variable. 
+ """ + return var.explode or var.operator in ("+", "#") + + +def _is_str_sequence(value: object) -> bool: + """Check if value is a non-string sequence whose items are all strings.""" + if isinstance(value, str) or not isinstance(value, Sequence): + return False + seq = cast(Sequence[object], value) + return all(isinstance(item, str) for item in seq) + + +_PCT_TRIPLET_RE = re.compile(r"%[0-9A-Fa-f]{2}") + + +def _encode(value: str, *, allow_reserved: bool) -> str: + """Percent-encode a value per RFC 6570 §3.2.1. + + Simple expansion encodes everything except unreserved characters. + Reserved expansion (``{+var}``, ``{#var}``) additionally keeps + RFC 3986 reserved characters intact and passes through existing + ``%XX`` pct-triplets unchanged (RFC 6570 §3.2.3). A bare ``%`` not + followed by two hex digits is still encoded to ``%25``. + """ + if not allow_reserved: + return quote(value, safe="") + + # Reserved expansion: walk the string, pass through triplets as-is, + # quote the gaps between them. A bare % with no triplet lands in a + # gap and gets encoded normally. + out: list[str] = [] + last = 0 + for m in _PCT_TRIPLET_RE.finditer(value): + out.append(quote(value[last : m.start()], safe=_RESERVED)) + out.append(m.group()) + last = m.end() + out.append(quote(value[last:], safe=_RESERVED)) + return "".join(out) + + +def _expand_expression(expr: _Expression, variables: Mapping[str, str | Sequence[str]]) -> str: + """Expand a single ``{...}`` expression into its URI fragment. + + Walks the expression's variables, encoding and joining defined ones + according to the operator's spec. Undefined variables are skipped + (RFC 6570 §2.3); if all are undefined, the expression contributes + nothing (no prefix is emitted). + """ + spec = _OPERATOR_SPECS[expr.operator] + rendered: list[str] = [] + + for var in expr.variables: + if var.name not in variables: + # Undefined: skip entirely, no placeholder. 
+ continue + + value = variables[var.name] + + # Explicit type guard: reject non-str scalars with a clear message + # rather than a confusing "not iterable" from the sequence branch. + if not isinstance(value, str) and not _is_str_sequence(value): + raise TypeError(f"Variable {var.name!r} must be str or a sequence of str, got {type(value).__name__}") + + if isinstance(value, str): + encoded = _encode(value, allow_reserved=spec.allow_reserved) + if spec.named: + # ; uses "name" for empty values, ?/& use "name=" (RFC §3.2.7-8) + if value == "" and expr.operator == ";": + rendered.append(var.name) + else: + rendered.append(f"{var.name}={encoded}") + else: + rendered.append(encoded) + else: + # Sequence value. + items = [_encode(v, allow_reserved=spec.allow_reserved) for v in value] + if not items: + continue + if var.explode: + # Each item gets the operator's separator; named ops repeat the key. + if spec.named: + # RFC §3.2.7 ifemp: ; omits the = for empty values. + rendered.append( + spec.separator.join( + var.name if (v == "" and expr.operator == ";") else f"{var.name}={v}" for v in items + ) + ) + else: + rendered.append(spec.separator.join(items)) + else: + # Non-explode: comma-join into a single value. + joined = ",".join(items) + rendered.append(f"{var.name}={joined}" if spec.named else joined) + + if not rendered: + return "" + return spec.prefix + spec.separator.join(rendered) + + +@dataclass(frozen=True) +class UriTemplate: + """A parsed RFC 6570 URI template. + + Construct via :meth:`parse`. Instances are immutable and hashable; + equality is based on the template string alone. 
+ """ + + template: str + _parts: list[_Part] = field(repr=False, compare=False) + _variables: list[Variable] = field(repr=False, compare=False) + _prefix: list[_Atom] = field(repr=False, compare=False) + _greedy: Variable | None = field(repr=False, compare=False) + _suffix: list[_Atom] = field(repr=False, compare=False) + _query_variables: list[Variable] = field(repr=False, compare=False) + + @staticmethod + def is_template(value: str) -> bool: + """Check whether a string contains URI template expressions. + + A cheap heuristic for distinguishing concrete URIs from templates + without the cost of full parsing. Returns ``True`` if the string + contains at least one ``{...}`` pair. + + Example:: + + >>> UriTemplate.is_template("file://docs/{name}") + True + >>> UriTemplate.is_template("file://docs/readme.txt") + False + + Note: + This does not validate the template. A ``True`` result does + not guarantee :meth:`parse` will succeed. + """ + open_i = value.find("{") + return open_i != -1 and value.find("}", open_i) != -1 + + @classmethod + def parse( + cls, + template: str, + *, + max_length: int = DEFAULT_MAX_TEMPLATE_LENGTH, + max_variables: int = DEFAULT_MAX_VARIABLES, + ) -> UriTemplate: + """Parse a URI template string. + + Args: + template: An RFC 6570 URI template. + max_length: Maximum permitted length of the template string. + Guards against resource exhaustion. + max_variables: Maximum number of variables permitted across + all expressions. Counting variables rather than + ``{...}`` expressions closes the gap where a single + ``{v0,v1,...,vN}`` expression packs arbitrarily many + variables under one expression count. + + Raises: + InvalidUriTemplate: If the template is malformed, exceeds the + size limits, or uses unsupported RFC 6570 features. 
+ """ + if len(template) > max_length: + raise InvalidUriTemplate( + f"Template exceeds maximum length of {max_length}", + template=template, + ) + + parts, variables = _parse(template, max_variables=max_variables) + + # Trailing {?...}/{&...} expressions are matched leniently via + # _parse_query rather than the scan: order-agnostic, partial, ignores + # extras. The path portion uses the linear scan. + path_parts, query_vars = _split_query_tail(parts) + atoms = _flatten(path_parts) + prefix, greedy, suffix = _partition_greedy(atoms, template) + + return cls( + template=template, + _parts=parts, + _variables=variables, + _prefix=prefix, + _greedy=greedy, + _suffix=suffix, + _query_variables=query_vars, + ) + + @property + def variables(self) -> list[Variable]: + """All variables in the template, in order of appearance.""" + return list(self._variables) + + @property + def variable_names(self) -> list[str]: + """All variable names in the template, in order of appearance.""" + return [v.name for v in self._variables] + + def expand(self, variables: Mapping[str, str | Sequence[str]]) -> str: + """Expand the template by substituting variable values. + + String values are percent-encoded according to their operator: + simple ``{var}`` encodes reserved characters; ``{+var}`` and + ``{#var}`` leave them intact. Sequence values are joined with + commas for non-explode variables, or with the operator's + separator for explode variables. + + Example:: + + >>> t = UriTemplate.parse("file://docs/{name}") + >>> t.expand({"name": "hello world.txt"}) + 'file://docs/hello%20world.txt' + + >>> t = UriTemplate.parse("file://docs/{+path}") + >>> t.expand({"path": "src/main.py"}) + 'file://docs/src/main.py' + + >>> t = UriTemplate.parse("/search{?q,lang}") + >>> t.expand({"q": "mcp", "lang": "en"}) + '/search?q=mcp&lang=en' + + >>> t = UriTemplate.parse("/files{/path*}") + >>> t.expand({"path": ["a", "b", "c"]}) + '/files/a/b/c' + + Args: + variables: Values for each template variable.
Keys must be + strings; values must be ``str`` or a sequence of ``str``. + + Returns: + The expanded URI string. + + Note: + Per RFC 6570, variables absent from the mapping are + **silently omitted**. This is the correct behavior for + optional query parameters (``{?page}`` with no page yields + no ``?page=``), but for required path segments it produces + a structurally incomplete URI. If you need all variables + present, validate before calling:: + + missing = set(t.variable_names) - variables.keys() + if missing: + raise ValueError(f"Missing: {missing}") + + Raises: + TypeError: If a value is neither ``str`` nor an iterable of + ``str``. Non-string scalars (``int``, ``None``) are not + coerced. + """ + out: list[str] = [] + for part in self._parts: + if isinstance(part, str): + out.append(part) + else: + out.append(_expand_expression(part, variables)) + return "".join(out) + + def match(self, uri: str, *, max_uri_length: int = DEFAULT_MAX_URI_LENGTH) -> dict[str, str | list[str]] | None: + """Match a concrete URI against this template and extract variables. + + This is the inverse of :meth:`expand`. The URI is matched via a + linear scan of the template and captured values are + percent-decoded. The round-trip ``match(expand({k: v})) == {k: v}`` + holds when ``v`` does not contain its operator's separator + unencoded: ``{.ext}`` with ``ext="tar.gz"`` expands to + ``.tar.gz`` but matches back as ``ext="tar"`` since the ``.`` + pattern stops at the first dot. RFC 6570 §1.4 notes this is an + inherent reversal limitation. + + Matching is structural at the URI level only: a simple ``{name}`` + will not match across a literal ``/`` in the URI (the scan stops + there), but a percent-encoded ``%2F`` that decodes to ``/`` is + accepted as part of the value. Path-safety validation belongs at + a higher layer; see :mod:`mcp.shared.path_security`. 
+ + Example:: + + >>> t = UriTemplate.parse("file://docs/{name}") + >>> t.match("file://docs/readme.txt") + {'name': 'readme.txt'} + >>> t.match("file://docs/hello%20world.txt") + {'name': 'hello world.txt'} + + >>> t = UriTemplate.parse("file://docs/{+path}") + >>> t.match("file://docs/src/main.py") + {'path': 'src/main.py'} + + >>> t = UriTemplate.parse("/files{/path*}") + >>> t.match("/files/a/b/c") + {'path': ['a', 'b', 'c']} + + **Query parameters** (``{?q,lang}`` at the end of a template) + are matched leniently: order-agnostic, partial, and unrecognized + params are ignored. Absent params are omitted from the result so + downstream function defaults can apply:: + + >>> t = UriTemplate.parse("logs://{service}{?since,level}") + >>> t.match("logs://api") + {'service': 'api'} + >>> t.match("logs://api?level=error") + {'service': 'api', 'level': 'error'} + >>> t.match("logs://api?level=error&since=5m&utm=x") + {'service': 'api', 'since': '5m', 'level': 'error'} + + Args: + uri: A concrete URI string. + max_uri_length: Maximum permitted length of the input URI. + Oversized inputs return ``None`` without scanning, + guarding against resource exhaustion. + + Returns: + A mapping from variable names to decoded values (``str`` for + scalar variables, ``list[str]`` for explode variables), or + ``None`` if the URI does not match the template or exceeds + ``max_uri_length``. + """ + if len(uri) > max_uri_length: + return None + + if self._query_variables: + # Two-phase: scan matches the path, the query is split and + # decoded manually. Query params may be partial, reordered, + # or include extras; absent params stay absent so downstream + # defaults can apply. Fragment is stripped first since the + # template's {?...} tail never describes a fragment. 
+ before_fragment, _, _ = uri.partition("#") + path, _, query = before_fragment.partition("?") + result = self._scan(path) + if result is None: + return None + if query: + parsed = _parse_query(query) + for var in self._query_variables: + if var.name in parsed: + result[var.name] = parsed[var.name] + return result + + return self._scan(uri) + + def _scan(self, uri: str) -> dict[str, str | list[str]] | None: + """Run the two-ended linear scan against the path portion of a URI.""" + n = len(uri) + + # Suffix right-to-left: literals anchor via endswith, bounded + # vars take the minimum needed (rfind for the preceding literal). + # This matches regex greedy-first semantics for templates without + # a greedy var, and minimises the suffix claim when one exists. + # When there is no greedy var the suffix IS the whole template, + # so its first atom must anchor at position 0 rather than + # searching via rfind. + anchored = self._greedy is None + suffix = _scan_suffix(self._suffix, uri, n, anchored=anchored) + if suffix is None: + return None + suffix_result, suffix_start = suffix + + if self._greedy is None: + # No greedy var: suffix scan consumed the whole template. + # It must have consumed the whole URI too. + return suffix_result if suffix_start == 0 else None + + # Prefix left-to-right: each bounded var takes the minimum + # needed (find for the following literal), leaving as much as + # possible for the greedy var in the middle. + prefix = _scan_prefix(self._prefix, uri, 0, suffix_start) + if prefix is None: + return None + prefix_result, prefix_end = prefix + + # _scan_prefix is bounded by suffix_start, so this holds by + # construction. Kept as an assertion to document the invariant. 
+ assert prefix_end <= suffix_start + + middle = uri[prefix_end:suffix_start] + greedy_value = _extract_greedy(self._greedy, middle) + if greedy_value is None: + return None + + return {**prefix_result, self._greedy.name: greedy_value, **suffix_result} + + def __str__(self) -> str: + return self.template + + +def _parse_query(query: str) -> dict[str, str]: + """Parse a query string into a name→value mapping. + + Unlike ``urllib.parse.parse_qs``, this follows RFC 3986 semantics: + ``+`` is a literal sub-delim, not a space. Form-urlencoding treats + ``+`` as space for HTML form submissions, but RFC 6570 and MCP + resource URIs follow RFC 3986 where only ``%20`` encodes a space. + + Parameter names are **not** percent-decoded. RFC 6570 expansion + never encodes variable names, so a legitimate match will always + have the name in literal form. Decoding names would let + ``%74oken=evil&token=real`` shadow the real ``token`` parameter + via first-wins. + + Duplicate keys keep the first value. Pairs without ``=`` are + treated as empty-valued. + """ + result: dict[str, str] = {} + for pair in query.split("&"): + name, _, value = pair.partition("=") + if name and name not in result: + result[name] = unquote(value) + return result + + +def _extract_greedy(var: Variable, raw: str) -> str | list[str] | None: + """Decode the greedy variable's isolated middle span. + + For scalar greedy (``{+var}``, ``{#var}``) this is a stop-char + validation and a single ``unquote``. For explode variables the span + is a run of separator-delimited segments (``/a/b/c`` or + ``;keys=a;keys=b``) that is split, validated, and decoded per item. + """ + spec = _OPERATOR_SPECS[var.operator] + stops = _STOP_CHARS[var.operator] + + if not var.explode: + if any(c in stops for c in raw): + return None + return unquote(raw) + + sep = spec.separator + if not raw: + return [] + # A non-empty explode span must begin with the separator: {/a*} + # expands to "/x/y", never "x/y". 
The scan does not consume the + # separator itself, so it must be the first character here. + if raw[0] != sep: + return None + # Segments must not contain the operator's non-separator stop + # characters (e.g. {/path*} segments may contain neither ? nor #). + body_stops = set(stops) - {sep} + if any(c in body_stops for c in raw): + return None + + segments: list[str] = [] + prefix = f"{var.name}=" + # split()[0] is always "" because raw starts with the separator; + # subsequent empties are legitimate values ({/path*} with + # ["a","","c"] expands to /a//c). + for seg in raw.split(sep)[1:]: + if spec.named: + # Named explode emits name=value per item (or bare name + # under ; with empty value). Validate the name and strip + # the prefix before decoding. + if seg.startswith(prefix): + seg = seg[len(prefix) :] + elif seg == var.name: + seg = "" + else: + return None + segments.append(unquote(seg)) + return segments + + +def _split_query_tail(parts: list[_Part]) -> tuple[list[_Part], list[Variable]]: + """Separate trailing ``?``/``&`` expressions from the path portion. + + Lenient query matching (order-agnostic, partial, ignores extras) + applies when a template ends with one or more consecutive ``?``/``&`` + expressions and the preceding path portion contains no literal + ``?``. If the path has a literal ``?`` (e.g., ``?fixed=1{&page}``), + the URI's ``?`` split won't align with the template's expression + boundary, so the strict scan is used instead. + + Returns: + A pair ``(path_parts, query_vars)``. If lenient matching does + not apply, ``query_vars`` is empty and ``path_parts`` is the + full input. + """ + split = len(parts) + for i in range(len(parts) - 1, -1, -1): + part = parts[i] + if isinstance(part, _Expression) and part.operator in ("?", "&"): + split = i + else: + break + + if split == len(parts): + return parts, [] + + # The tail must start with a {?...} expression so that expand() + # emits a ? the URI can split on. 
A standalone {&page} expands + with an & prefix, which partition("?") won't find. + first = parts[split] + assert isinstance(first, _Expression) + if first.operator != "?": + return parts, [] + + # If the path portion contains a literal ?/# or a {?...}/{#...} + # expression, lenient matching's partition("#") then partition("?") + # would strip content the path scan expects to see. Fall back to + # the strict scan. + for part in parts[:split]: + if isinstance(part, str): + if "?" in part or "#" in part: + return parts, [] + elif part.operator in ("?", "#"): + return parts, [] + + query_vars: list[Variable] = [] + for part in parts[split:]: + assert isinstance(part, _Expression) + query_vars.extend(part.variables) + + return parts[:split], query_vars + + +def _parse(template: str, *, max_variables: int) -> tuple[list[_Part], list[Variable]]: + """Split a template into an ordered sequence of literals and expressions. + + Walks the string, alternating between collecting literal runs and + parsing ``{...}`` expressions. The resulting ``parts`` sequence + preserves positional interleaving so ``match()`` and ``expand()`` can + walk it in order. + + Raises: + InvalidUriTemplate: On unclosed braces, too many variables, or + any error surfaced by :func:`_parse_expression`. + """ + parts: list[_Part] = [] + variables: list[Variable] = [] + i = 0 + n = len(template) + + while i < n: + # Find the next expression opener from the current cursor. + brace = template.find("{", i) + + if brace == -1: + # No more expressions; everything left is a trailing literal. + parts.append(template[i:]) + break + + if brace > i: + # Literal text between cursor and the brace. + parts.append(template[i:brace]) + + end = template.find("}", brace) + if end == -1: + raise InvalidUriTemplate( + f"Unclosed expression at position {brace}", + template=template, + position=brace, + ) + + # Delegate body (between braces, exclusive) to the expression parser.
def _parse_expression(template: str, body: str, pos: int) -> _Expression:
    """Turn the text between one pair of braces into an ``_Expression``.

    The text may start with a single operator character; what follows
    is a comma-separated list of variable names, each optionally
    suffixed with ``*`` to request explode.

    Args:
        template: Whole template string; only used in error reports.
        body: Text between the braces (braces themselves excluded).
        pos: Offset of the opening brace; only used in error reports.

    Raises:
        InvalidUriTemplate: If the body is empty, consists of an
            operator alone, contains a ``:`` prefix modifier, has a
            name rejected by ``_VARNAME_RE``, or combines explode with
            one of the operators ``""``, ``+``, ``#``, ``?``, ``&``.
    """
    if not body:
        raise InvalidUriTemplate(f"Empty expression at position {pos}", template=template, position=pos)

    # The membership test is what makes the cast to Operator sound.
    operator: Operator = ""
    spec_list = body
    if body[0] in _OPERATORS:
        operator = cast(Operator, body[0])
        spec_list = body[1:]
        if not spec_list:
            raise InvalidUriTemplate(
                f"Expression has operator but no variables at position {pos}",
                template=template,
                position=pos,
            )

    # What is left is "name[*]" items separated by commas.
    parsed: list[Variable] = []
    for item in spec_list.split(","):
        if ":" in item:
            raise InvalidUriTemplate(
                f"Prefix modifier {{var:N}} is not supported (in {item!r} at position {pos})",
                template=template,
                position=pos,
            )

        if item.endswith("*"):
            explode, name = True, item[:-1]
        else:
            explode, name = False, item

        if not _VARNAME_RE.match(name):
            raise InvalidUriTemplate(
                f"Invalid variable name {name!r} at position {pos}",
                template=template,
                position=pos,
            )

        # Explode needs an operator with a repeating per-item separator.
        # Simple/reserved/fragment have none, and query-style explode
        # would need order-agnostic dict matching, which is unsupported.
        if explode and operator in ("", "+", "#", "?", "&"):
            raise InvalidUriTemplate(
                f"Explode modifier on {{{operator}{name}*}} is not supported for matching",
                template=template,
                position=pos,
            )

        parsed.append(Variable(name=name, operator=operator, explode=explode))

    return _Expression(operator=operator, variables=parsed)


def _check_duplicate_variables(template: str, variables: list[Variable]) -> None:
    """Raise if any variable name occurs twice in ``variables``.

    RFC 6570 requires a repeated variable to expand to the same value
    everywhere; matching that would need backreferences with
    potentially exponential cost. Instead of silently keeping only the
    last capture, the template is rejected while parsing.

    Args:
        template: Whole template string; only used in the error report.
        variables: Variables in template order.

    Raises:
        InvalidUriTemplate: On the first name that was already seen.
    """
    known: set[str] = set()
    for variable in variables:
        name = variable.name
        if name not in known:
            known.add(name)
            continue
        raise InvalidUriTemplate(
            f"Variable {name!r} appears more than once; repeated variables are not supported",
            template=template,
        )


def _flatten(parts: list[_Part]) -> list[_Atom]:
    """Lower parsed parts into a flat list of ``_Lit`` and ``_Cap`` atoms.

    Operator prefixes, separators and ``name`` / ``name=`` markers are
    turned into literal text, and consecutive literal text is merged
    into one ``_Lit`` so that later ``find``/``rfind`` anchoring sees
    the longest literal available.

    An explode variable gets no leading literal: its capture carries
    the separator-prefixed repetitions itself (``{/a*}`` captures
    ``/x/y/z``, not ``/`` followed by ``x/y/z``).
    """
    flat: list[_Atom] = []

    def emit_literal(text: str) -> None:
        # Empty text contributes nothing; otherwise extend a trailing
        # literal or start a new one.
        if not text:
            return
        if flat and isinstance(flat[-1], _Lit):
            flat[-1] = _Lit(flat[-1].text + text)
            return
        flat.append(_Lit(text))

    for part in parts:
        if isinstance(part, str):
            emit_literal(part)
            continue

        spec = _OPERATOR_SPECS[part.operator]
        for index, var in enumerate(part.variables):
            lead = spec.prefix if index == 0 else spec.separator
            if var.explode:
                flat.append(_Cap(var))
                continue
            if not spec.named:
                emit_literal(lead)
                flat.append(_Cap(var))
                continue
            # Named operators: ";" follows ifemp (bare name when the
            # value is empty); "?" and "&" always write "name=", so the
            # equals sign belongs to the literal.
            if part.operator == ";":
                emit_literal(f"{lead}{var.name}")
                flat.append(_Cap(var, ifemp=True))
            else:
                emit_literal(f"{lead}{var.name}=")
                flat.append(_Cap(var))
    return flat


def _partition_greedy(atoms: list[_Atom], template: str) -> tuple[list[_Atom], Variable | None, list[_Atom]]:
    """Cut the atom list around its one greedy capture.

    Returns:
        ``(prefix, greedy_var, suffix)``. Without a greedy capture the
        result is ``([], None, atoms)``: everything goes to the suffix
        so the right-to-left scan (regex-greedy semantics) handles it.

    Raises:
        InvalidUriTemplate: If a second greedy capture is found. Two
            multi-segment variables in one template are inherently
            ambiguous — nothing decides which absorbs an extra segment.
    """
    split_at: int | None = None
    for index, atom in enumerate(atoms):
        if not (isinstance(atom, _Cap) and _is_greedy(atom.var)):
            continue
        if split_at is not None:
            raise InvalidUriTemplate(
                "Template contains more than one multi-segment variable "
                "({+var}, {#var}, or explode modifier); matching would be ambiguous",
                template=template,
            )
        split_at = index

    if split_at is None:
        return [], None, atoms

    pivot = atoms[split_at]
    assert isinstance(pivot, _Cap)
    before, after = atoms[:split_at], atoms[split_at + 1 :]
    return before, pivot.var, after


def _scan_suffix(
    atoms: Sequence[_Atom], uri: str, end: int, *, anchored: bool
) -> tuple[dict[str, str | list[str]], int] | None:
    """Match ``atoms`` against ``uri`` walking right-to-left from ``end``.

    A bounded variable takes the shortest span that still lets the
    literal before it match (located with ``rfind``). The effect is
    that the *first* variable in template order is the greedy one,
    like a run of greedy groups in a Python regex.

    With ``anchored`` true the atoms are the whole template (no greedy
    variable), so ``atoms[0]`` has to sit at URI offset 0 instead of at
    its rightmost occurrence.

    Returns:
        ``(captures, position)`` where ``position`` is the offset the
        scan reached, or ``None`` when the atoms do not match.
    """
    captures: dict[str, str | list[str]] = {}
    cursor = end
    # Every atom is consumed exactly once, last to first.
    for i in range(len(atoms) - 1, -1, -1):
        atom = atoms[i]
        if isinstance(atom, _Lit):
            width = len(atom.text)
            if cursor < width or uri[cursor - width : cursor] != atom.text:
                return None
            cursor -= width
            continue

        var = atom.var
        stops = _STOP_CHARS[var.operator]
        before = atoms[i - 1] if i > 0 else None

        if atom.ifemp:
            # ";name" or ";name=value"; the literal before is ";name".
            # Empty is tried first: if that literal ends at the cursor
            # the value is absent (RFC ifemp). Else "=value" is needed.
            assert isinstance(before, _Lit)
            if uri.endswith(before.text, 0, cursor):
                captures[var.name] = ""
                continue
            begin = cursor
            while begin > 0:
                ch = uri[begin - 1]
                if ch in stops or ch == "=":
                    break
                begin -= 1
            if begin == 0 or uri[begin - 1] != "=":
                return None
            captures[var.name] = unquote(uri[begin:cursor])
            cursor = begin - 1  # step over the "="
            continue

        if isinstance(before, _Cap):
            # Two captures with no literal between them: the later one
            # gets nothing and the earlier one gets the span, so the
            # stop-char scan would be wasted work here.
            captures[var.name] = ""
            continue

        # The value cannot reach left across a stop-char; walk back to
        # find how far it may extend.
        floor = cursor
        while floor > 0 and uri[floor - 1] not in stops:
            floor -= 1

        if before is None:
            begin = floor
        elif anchored and i - 1 == 0:
            # The literal before is the template's first atom, pinned
            # at offset 0. rfind would land inside the value when the
            # literal repeats there (e.g. "prefix-{id}" against
            # "prefix-prefix-123").
            begin = len(before.text)
            if begin < floor or begin > cursor:
                return None
        else:
            # Rightmost occurrence of the literal before whose end is
            # still inside the value's allowed range.
            hit = uri.rfind(before.text, 0, cursor)
            if hit == -1 or hit + len(before.text) < floor:
                return None
            begin = hit + len(before.text)

        captures[var.name] = unquote(uri[begin:cursor])
        cursor = begin
    return captures, cursor


def _scan_prefix(
    atoms: Sequence[_Atom], uri: str, start: int, limit: int
) -> tuple[dict[str, str | list[str]], int] | None:
    """Match ``atoms`` against ``uri`` walking left-to-right from ``start``.

    The scan never reads past ``limit``. A bounded variable takes the
    shortest span that still lets the literal after it match (located
    with ``find``), leaving the greedy variable as much of the URI as
    possible.

    Returns:
        ``(captures, position)`` where ``position`` is the offset the
        scan reached, or ``None`` when the atoms do not match.
    """
    captures: dict[str, str | list[str]] = {}
    cursor = start
    total = len(atoms)
    for i, atom in enumerate(atoms):
        if isinstance(atom, _Lit):
            stop_at = cursor + len(atom.text)
            if stop_at > limit or uri[cursor:stop_at] != atom.text:
                return None
            cursor = stop_at
            continue

        var = atom.var
        stops = _STOP_CHARS[var.operator]
        after = atoms[i + 1] if i + 1 < total else None

        if atom.ifemp:
            # An "=" after ";name" is optional. Any other non-delimiter
            # means the name kept going (";keys" vs ";key") and is
            # rejected — unless the template's next literal begins
            # right here, in which case the value is simply empty.
            if cursor < limit and uri[cursor] == "=":
                cursor += 1
            elif cursor < limit and uri[cursor] not in stops:
                if not (isinstance(after, _Lit) and uri.startswith(after.text, cursor)):
                    return None

        # The value ends at the first stop-char or at the limit,
        # whichever is reached first.
        ceiling = cursor
        while ceiling < limit and uri[ceiling] not in stops:
            ceiling += 1

        if isinstance(after, _Lit):
            # First occurrence of the following literal that starts
            # inside the value's allowed range.
            hit = uri.find(after.text, cursor, ceiling + len(after.text))
            if hit == -1 or hit > ceiling:
                return None
            stop_at = hit
        else:
            # Last atom, or another capture follows: run to the ceiling.
            stop_at = ceiling

        captures[var.name] = unquote(uri[cursor:stop_at])
        cursor = stop_at
    return captures, cursor
def _make(uri_template: str, security: ResourceSecurity = DEFAULT_RESOURCE_SECURITY) -> ResourceTemplate:
    """Build a ResourceTemplate whose handler is never meant to run."""

    def handler(**kwargs: Any) -> str:
        raise NotImplementedError  # these tests only exercise matches()

    return ResourceTemplate.from_function(fn=handler, uri_template=uri_template, security=security)


def test_matches_rfc6570_reserved_expansion():
    # {+path} may span "/" — something the old regex matcher could not do.
    tmpl = _make("file://docs/{+path}")
    matched = tmpl.matches("file://docs/src/main.py")
    assert matched == {"path": "src/main.py"}


def test_matches_rejects_encoded_slash_traversal():
    # UriTemplate.match() decodes %2F to "/", yielding "../../etc/passwd";
    # the traversal check in ResourceSecurity then rejects the ".." parts.
    tmpl = _make("file://docs/{name}")
    with pytest.raises(ResourceSecurityError, match="'name'"):
        tmpl.matches("file://docs/..%2F..%2Fetc%2Fpasswd")


def test_matches_rejects_path_traversal_by_default():
    tmpl = _make("file://docs/{name}")
    with pytest.raises(ResourceSecurityError):
        tmpl.matches("file://docs/..")


def test_matches_rejects_path_traversal_in_reserved_var():
    # The traversal check is semantic, not structural, so {+path} gets it too.
    tmpl = _make("file://docs/{+path}")
    with pytest.raises(ResourceSecurityError):
        tmpl.matches("file://docs/../../etc/passwd")


def test_matches_rejects_absolute_path():
    tmpl = _make("file://docs/{+path}")
    with pytest.raises(ResourceSecurityError):
        tmpl.matches("file://docs//etc/passwd")


def test_matches_allows_dotdot_as_substring():
    # ".." is only a problem when it is a whole path component.
    tmpl = _make("git://refs/{range}")
    matched = tmpl.matches("git://refs/v1.0..v2.0")
    assert matched == {"range": "v1.0..v2.0"}


def test_matches_exempt_params_skip_security():
    exempting = ResourceSecurity(exempt_params={"range"})
    tmpl = _make("git://diff/{+range}", security=exempting)
    assert tmpl.matches("git://diff/../foo") == {"range": "../foo"}


def test_matches_disabled_policy_allows_traversal():
    relaxed = ResourceSecurity(reject_path_traversal=False, reject_absolute_paths=False)
    tmpl = _make("file://docs/{name}", security=relaxed)
    assert tmpl.matches("file://docs/..") == {"name": ".."}


def test_matches_rejects_null_byte_by_default():
    # %00 decodes to \x00, which defeats string comparisons
    # ("..\x00" != "..") and can truncate in C extensions.
    tmpl = _make("file://docs/{name}")
    with pytest.raises(ResourceSecurityError):
        tmpl.matches("file://docs/key%00.txt")
    # A null byte would also slip past the component comparison used
    # by the traversal check.
    with pytest.raises(ResourceSecurityError):
        tmpl.matches("file://docs/..%00%2Fsecret")


def test_matches_null_byte_check_can_be_disabled():
    relaxed = ResourceSecurity(reject_null_bytes=False)
    tmpl = _make("file://docs/{name}", security=relaxed)
    assert tmpl.matches("file://docs/key%00.txt") == {"name": "key\x00.txt"}


def test_security_rejection_does_not_fall_through_to_next_template():
    # When a strict template rejects on security grounds, iteration
    # must stop rather than continue to a later permissive template.
    # matches() used to return None for both "no match" and "security
    # failed", which made registration order security-critical.
    strict = _make("file://docs/{name}")
    lax = _make(
        "file://docs/{+path}",
        security=ResourceSecurity(exempt_params={"path"}),
    )
    uri = "file://docs/..%2Fsecrets"
    # Structural match succeeds on the strict template, security fails.
    with pytest.raises(ResourceSecurityError) as exc:
        strict.matches(uri)
    assert exc.value.param == "name"
    # Because that raised, the resource manager never gets to the lax
    # template; confirm the lax one WOULD have accepted the URI.
    assert lax.matches(uri) == {"path": "../secrets"}


def test_matches_explode_checks_each_segment():
    tmpl = _make("api{/parts*}")
    assert tmpl.matches("api/a/b/c") == {"parts": ["a", "b", "c"]}
    # One traversal segment is enough to reject the whole match.
    with pytest.raises(ResourceSecurityError):
        tmpl.matches("api/a/../c")


def test_matches_encoded_backslash_caught_by_traversal_check():
    # %5C decodes to '\\'; the traversal check normalizes '\\' to '/'
    # and then sees the '..' components.
    tmpl = _make("file://docs/{name}")
    with pytest.raises(ResourceSecurityError):
        tmpl.matches("file://docs/..%5C..%5Csecret")


def test_matches_encoded_dots_caught_by_traversal_check():
    # %2E%2E decodes to '..', which the traversal check rejects.
    tmpl = _make("file://docs/{name}")
    with pytest.raises(ResourceSecurityError):
        tmpl.matches("file://docs/%2E%2E")


def test_matches_mixed_encoded_and_literal_slash():
    # A literal '/' ends a simple variable's scan, so this URI does
    # not match the template at all.
    tmpl = _make("file://docs/{name}")
    assert tmpl.matches("file://docs/..%2F../etc") is None


def test_matches_encoded_slash_without_traversal_allowed():
    # %2F decoding to '/' is acceptable when no traversal is involved:
    # UriTemplate accepts it and ResourceSecurity only blocks '..' and
    # absolute paths. Handlers needing a single segment should use
    # safe_join or validate explicitly.
    tmpl = _make("file://docs/{name}")
    assert tmpl.matches("file://docs/sub%2Ffile.txt") == {"name": "sub/file.txt"}


def test_matches_escapes_template_literals():
    # Regression: the old implementation treated "." as a regex wildcard.
    tmpl = _make("data://v1.0/{id}")
    assert tmpl.matches("data://v1.0/42") == {"id": "42"}
    assert tmpl.matches("data://v1X0/42") is None
async def test_resource_decorator_rfc6570_reserved_expansion():
    # Regression: the old regex-based parameter extraction could not
    # see `path` in `{+path}` and failed with a confusing mismatch error.
    server = MCPServer()

    @server.resource("file://docs/{+path}")
    def read_doc(path: str) -> str:
        raise NotImplementedError

    listed = await server.list_resource_templates()
    assert [entry.uri_template for entry in listed] == ["file://docs/{+path}"]


async def test_resource_decorator_rejects_malformed_template():
    server = MCPServer()
    with pytest.raises(InvalidUriTemplate, match="Unclosed expression"):
        server.resource("file://{name")


async def test_resource_optional_query_params_use_function_defaults():
    """Omitted {?...} query params should fall through to the
    handler's Python defaults. Partial and reordered params work."""
    server = MCPServer()

    @server.resource("logs://{service}{?since,level}")
    def tail_logs(service: str, since: str = "1h", level: str = "info") -> str:
        return f"{service}|{since}|{level}"

    async with Client(server) as client:
        # No query string: every default applies.
        reply = await client.read_resource("logs://api")
        assert isinstance(reply.contents[0], TextResourceContents)
        assert reply.contents[0].text == "api|1h|info"

        # Only one parameter given: the other keeps its default.
        reply = await client.read_resource("logs://api?since=15m")
        assert isinstance(reply.contents[0], TextResourceContents)
        assert reply.contents[0].text == "api|15m|info"

        # Both given, in the opposite order to the template.
        reply = await client.read_resource("logs://api?level=error&since=5m")
        assert isinstance(reply.contents[0], TextResourceContents)
        assert reply.contents[0].text == "api|5m|error"

        # A parameter the template does not name is ignored.
        reply = await client.read_resource("logs://api?since=2h&utm=x")
        assert isinstance(reply.contents[0], TextResourceContents)
        assert reply.contents[0].text == "api|2h|info"


async def test_resource_security_default_rejects_traversal():
    server = MCPServer()

    @server.resource("data://items/{name}")
    def get_item(name: str) -> str:
        return f"item:{name}"

    async with Client(server) as client:
        # A harmless value reaches the handler.
        reply = await client.read_resource("data://items/widget")
        assert isinstance(reply.contents[0], TextResourceContents)
        assert reply.contents[0].text == "item:widget"

        # The default policy rejects ".." as a path component.
        with pytest.raises(MCPError, match="Unknown resource"):
            await client.read_resource("data://items/..")


async def test_resource_security_per_resource_override():
    server = MCPServer()

    @server.resource(
        "git://diff/{+range}",
        security=ResourceSecurity(exempt_params={"range"}),
    )
    def git_diff(range: str) -> str:
        return f"diff:{range}"

    async with Client(server) as client:
        # "../foo" would normally be rejected, but "range" is exempt.
        reply = await client.read_resource("git://diff/../foo")
        assert isinstance(reply.contents[0], TextResourceContents)
        assert reply.contents[0].text == "diff:../foo"


async def test_resource_security_server_wide_override():
    server = MCPServer(resource_security=ResourceSecurity(reject_path_traversal=False))

    @server.resource("data://items/{name}")
    def get_item(name: str) -> str:
        return f"item:{name}"

    async with Client(server) as client:
        # The server-wide policy turns the traversal check off, so ".."
        # is accepted.
        reply = await client.read_resource("data://items/..")
        assert isinstance(reply.contents[0], TextResourceContents)
        assert reply.contents[0].text == "item:.."


async def test_static_resource_with_context_param_errors():
    """A non-template URI with a Context-only handler should error
    at decoration time with a clear message, not silently register
    an unreachable resource."""
    server = MCPServer()

    with pytest.raises(ValueError, match="Context injection for static resources is not yet supported"):

        @server.resource("weather://current")
        def current_weather(ctx: Context) -> str:
            raise NotImplementedError


async def test_static_resource_with_extra_params_errors():
    """A non-template URI with non-Context params should error at
    decoration time."""
    server = MCPServer()

    with pytest.raises(ValueError, match="has no URI template variables"):

        @server.resource("data://fixed")
        def get_data(name: str) -> str:
            raise NotImplementedError
@pytest.mark.parametrize(
    ("value", "expected"),
    [
        # Safe: no traversal
        ("a/b/c", False),
        ("readme.txt", False),
        ("", False),
        (".", False),
        ("./a/b", False),
        # Safe: .. balanced by prior descent
        ("a/../b", False),
        ("a/b/../c", False),
        ("a/b/../../c", False),
        # Unsafe: net escape
        ("..", True),
        ("../etc", True),
        ("../../etc/passwd", True),
        ("a/../../b", True),
        ("./../../etc", True),
        # .. as substring, not component — safe
        ("1.0..2.0", False),
        ("foo..bar", False),
        ("..foo", False),
        ("foo..", False),
        # Backslash separator
        ("..\\etc", True),
        ("a\\..\\..\\b", True),
        ("a\\b\\c", False),
        # Mixed separators
        ("a/..\\..\\b", True),
    ],
)
def test_contains_path_traversal(value: str, expected: bool):
    """contains_path_traversal returns the expected flag for each sample."""
    verdict = contains_path_traversal(value)
    assert verdict is expected


@pytest.mark.parametrize(
    ("value", "expected"),
    [
        # Relative
        ("relative/path", False),
        ("file.txt", False),
        ("", False),
        (".", False),
        ("..", False),
        # POSIX absolute
        ("/", True),
        ("/etc/passwd", True),
        ("/a", True),
        # Windows drive
        ("C:", True),
        ("C:\\Windows", True),
        ("c:/foo", True),
        ("Z:\\", True),
        # Windows UNC / backslash-absolute
        ("\\\\server\\share", True),
        ("\\foo", True),
        # Not a drive: digit before colon
        ("1:foo", False),
        # Colon not in position 1
        ("ab:c", False),
        # Non-ASCII letter is not a drive letter
        ("Ω:namespace", False),
        ("é:foo", False),
    ],
)
def test_is_absolute_path(value: str, expected: bool):
    """is_absolute_path returns the expected flag for each sample."""
    verdict = is_absolute_path(value)
    assert verdict is expected


def test_safe_join_simple(tmp_path: Path):
    joined = safe_join(tmp_path, "docs", "readme.txt")
    assert joined == tmp_path / "docs" / "readme.txt"


def test_safe_join_resolves_relative_base(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
    monkeypatch.chdir(tmp_path)
    joined = safe_join(".", "file.txt")
    assert joined == tmp_path / "file.txt"


def test_safe_join_rejects_dotdot_escape(tmp_path: Path):
    with pytest.raises(PathEscapeError, match="escapes base"):
        safe_join(tmp_path, "../../../etc/passwd")


def test_safe_join_rejects_balanced_then_escape(tmp_path: Path):
    with pytest.raises(PathEscapeError, match="escapes base"):
        safe_join(tmp_path, "a/../../etc")


def test_safe_join_allows_balanced_dotdot(tmp_path: Path):
    joined = safe_join(tmp_path, "a/../b")
    assert joined == tmp_path / "b"


def test_safe_join_rejects_absolute_part(tmp_path: Path):
    with pytest.raises(PathEscapeError, match="is absolute"):
        safe_join(tmp_path, "/etc/passwd")


def test_safe_join_rejects_absolute_in_later_part(tmp_path: Path):
    with pytest.raises(PathEscapeError, match="is absolute"):
        safe_join(tmp_path, "docs", "/etc/passwd")


def test_safe_join_rejects_windows_drive(tmp_path: Path):
    with pytest.raises(PathEscapeError, match="is absolute"):
        safe_join(tmp_path, "C:\\Windows\\System32")


def test_safe_join_rejects_null_byte(tmp_path: Path):
    with pytest.raises(PathEscapeError, match="null byte"):
        safe_join(tmp_path, "file\0.txt")


def test_safe_join_rejects_null_byte_in_later_part(tmp_path: Path):
    with pytest.raises(PathEscapeError, match="null byte"):
        safe_join(tmp_path, "docs", "file\0.txt")


def test_safe_join_rejects_symlink_escape(tmp_path: Path):
    # A symlink inside the sandbox that points to a sibling directory.
    outside = tmp_path / "outside"
    sandbox = tmp_path / "sandbox"
    outside.mkdir()
    sandbox.mkdir()
    (sandbox / "escape").symlink_to(outside)

    with pytest.raises(PathEscapeError, match="escapes base"):
        safe_join(sandbox, "escape", "secret.txt")


def test_safe_join_base_equals_target(tmp_path: Path):
    # Joining nothing, or just ".", gives back the base itself.
    assert safe_join(tmp_path) == tmp_path
    assert safe_join(tmp_path, ".") == tmp_path


def test_path_escape_error_is_value_error():
    with pytest.raises(ValueError):
        safe_join("/tmp", "/etc")
def test_parse_literal_only():
    parsed = UriTemplate.parse("file://docs/readme.txt")
    assert parsed.variables == []
    assert parsed.variable_names == []
    assert str(parsed) == "file://docs/readme.txt"


@pytest.mark.parametrize(
    ("value", "expected"),
    [
        ("file://docs/{name}", True),
        ("file://docs/readme.txt", False),
        ("", False),
        ("{a}", True),
        ("{", False),
        ("}", False),
        ("}{", False),
        ("prefix{+path}/suffix", True),
        ("{invalid syntax but still a template}", True),
    ],
)
def test_is_template(value: str, expected: bool):
    verdict = UriTemplate.is_template(value)
    assert verdict is expected


def test_parse_simple_variable():
    parsed = UriTemplate.parse("file://docs/{name}")
    assert parsed.variables == [Variable(name="name", operator="")]
    assert parsed.variable_names == ["name"]


@pytest.mark.parametrize(
    ("template", "operator"),
    [
        ("{+path}", "+"),
        ("{#frag}", "#"),
        ("{.ext}", "."),
        ("{/seg}", "/"),
        ("{;param}", ";"),
        ("{?q}", "?"),
        ("{&next}", "&"),
    ],
)
def test_parse_all_operators(template: str, operator: str):
    parsed = UriTemplate.parse(template)
    (only,) = parsed.variables
    assert only.operator == operator
    assert only.explode is False


def test_parse_multiple_variables_in_expression():
    parsed = UriTemplate.parse("{?q,lang,page}")
    assert parsed.variable_names == ["q", "lang", "page"]
    assert all(v.operator == "?" for v in parsed.variables)


def test_parse_multiple_expressions():
    parsed = UriTemplate.parse("db://{table}/{id}{?format}")
    assert parsed.variable_names == ["table", "id", "format"]
    operators = [v.operator for v in parsed.variables]
    assert operators == ["", "", "?"]


def test_parse_explode_modifier():
    parsed = UriTemplate.parse("/files{/path*}")
    (only,) = parsed.variables
    assert only.name == "path"
    assert only.operator == "/"
    assert only.explode is True


@pytest.mark.parametrize("template", ["{.labels*}", "{;params*}"])
def test_parse_explode_supported_operators(template: str):
    parsed = UriTemplate.parse(template)
    assert parsed.variables[0].explode is True


def test_parse_mixed_explode_and_plain():
    parsed = UriTemplate.parse("{/path*}{?q}")
    assert parsed.variables == [
        Variable(name="path", operator="/", explode=True),
        Variable(name="q", operator="?"),
    ]


def test_parse_varname_with_dots_and_underscores():
    parsed = UriTemplate.parse("{foo_bar.baz}")
    assert parsed.variable_names == ["foo_bar.baz"]


def test_parse_rejects_unclosed_expression():
    with pytest.raises(InvalidUriTemplate, match="Unclosed expression") as exc:
        UriTemplate.parse("file://{name")
    assert exc.value.position == 7
    assert exc.value.template == "file://{name"


def test_parse_rejects_empty_expression():
    with pytest.raises(InvalidUriTemplate, match="Empty expression"):
        UriTemplate.parse("file://{}")


def test_parse_rejects_operator_without_variable():
    with pytest.raises(InvalidUriTemplate, match="operator but no variables"):
        UriTemplate.parse("{+}")


@pytest.mark.parametrize(
    "name",
    [
        "-bad",
        "bad-name",
        "bad name",
        "bad/name",
        # RFC §2.3: dots only between varchars, not consecutive or trailing
        "foo..bar",
        "foo.",
    ],
)
def test_parse_rejects_invalid_varname(name: str):
    with pytest.raises(InvalidUriTemplate, match="Invalid variable name"):
        UriTemplate.parse(f"{{{name}}}")


def test_parse_accepts_dotted_varname():
    parsed = UriTemplate.parse("{a.b.c}")
    assert parsed.variable_names == ["a.b.c"]


def test_parse_rejects_empty_spec_in_list():
    with pytest.raises(InvalidUriTemplate, match="Invalid variable name"):
        UriTemplate.parse("{a,,b}")


def test_parse_rejects_prefix_modifier():
    with pytest.raises(InvalidUriTemplate, match="Prefix modifier"):
        UriTemplate.parse("{var:3}")


@pytest.mark.parametrize("template", ["{var*}", "{+var*}", "{#var*}", "{?var*}", "{&var*}"])
def test_parse_rejects_unsupported_explode(template: str):
    with pytest.raises(InvalidUriTemplate, match="Explode modifier"):
        UriTemplate.parse(template)


@pytest.mark.parametrize(
    "template",
    [
        # Two explode variables — any combination
        "{/a*}{/b*}",
        "{/a*}{.b*}",
        "{.a*}{;b*}",
        "{/a*}/x{/b*}",  # literal between doesn't help: still two greedy
        "{/a*}{b}{.c*}",  # non-explode between doesn't help either
        # {+var}/{#var} combined with explode
        "{+a}{/b*}",
        # Multi-var + expression: each var is greedy
        "{+a,b}",
        # Two {+var}/{#var} anywhere
        "{+a}/x/{+b}",
        "{+a},{+b}",
        "{#a}/x/{+b}",
        "{+a}.foo.{#b}",
    ],
)
def test_parse_rejects_multiple_multi_segment_variables(template: str):
    # With two multi-segment variables matching is inherently
    # ambiguous: nothing decides which one absorbs an extra segment.
    # The linear scan can only split the URI around one greedy slot.
    with pytest.raises(InvalidUriTemplate, match="more than one multi-segment"):
        UriTemplate.parse(template)


@pytest.mark.parametrize(
    "template",
    [
        "file://docs/{+path}",  # + at end of template
        "file://{+path}.txt",  # + followed by literal only
        "file://{+path}/edit",  # + followed by literal only
        "api/{+path}{?v,page}",  # + followed by query (handled by parse_qs)
        "api/{+path}{&next}",  # + followed by query-continuation
        "page{#section}",  # # at end
        "{a}{#b}",  # # prepends literal '#' that {a}'s stop-set includes
        "{+a}/sep/{b}",  # + with bounded vars after
        "{+a},{b}",
        # Previously rejected for adjacency; now safe under linear scan
        "{+a}{b}",  # suffix var scans back to its stop-char
        "{+a}{/b}",
        "{+a}{.b}",
        "{+a}{;b}",
        "{#a}{b}",
        "prefix/{+path}{.ext}",
        "{a}{+b}",  # prefix var scans forward to its stop-char
        "{.a}{+b}",
        "{/a}{+b}",
        "x{name}{+path}y",
    ],
)
def test_parse_allows_single_multi_segment_variable(template: str):
    # A single multi-segment variable is fine: the linear scan pins it
    # between the prefix and suffix boundaries and never backtracks,
    # so match time stays O(n) regardless of URI content.
    parsed = UriTemplate.parse(template)
    assert parsed is not None


@pytest.mark.parametrize(
    "template",
    ["{x}/{x}", "{x,x}", "{a}{b}{a}", "{+x}/foo/{x}"],
)
def test_parse_rejects_duplicate_variable_names(template: str):
    with pytest.raises(InvalidUriTemplate, match="appears more than once"):
        UriTemplate.parse(template)


def test_invalid_uri_template_is_value_error():
    with pytest.raises(ValueError):
        UriTemplate.parse("{}")


@pytest.mark.parametrize(
    "template",
    [
        "{{name}}",  # nested open: body becomes "{name"
        "{a{b}c}",  # brace inside expression
        "{{]{}}{}",  # garbage soup
        "{a,{b}",  # brace in comma list
    ],
)
def test_parse_rejects_nested_braces(template: str):
    # A nested or stray "{" inside an expression ends up in the
    # variable name and fails the varname regex; no special handling
    # is needed.
    with pytest.raises(InvalidUriTemplate, match="Invalid variable name"):
        UriTemplate.parse(template)


@pytest.mark.parametrize(
    ("template", "position"),
    [
        ("{", 0),
        ("{{", 0),
        ("file://{name", 7),
        ("{a}{", 3),
        ("}{", 1),  # stray } is literal, then unclosed {
    ],
)
def test_parse_rejects_unclosed_brace(template: str, position: int):
    with pytest.raises(InvalidUriTemplate, match="Unclosed") as exc:
        UriTemplate.parse(template)
    assert exc.value.position == position


@pytest.mark.parametrize(
    "template",
    ["}}", "}", "a}b", "{a}}{b}"],
)
def test_parse_treats_stray_close_brace_as_literal(template: str):
    # RFC 6570 §2.1 strictly excludes "}" from literals, but it is
    # accepted here for TypeScript SDK parity. A stray "}" is almost
    # always a typo; rejecting it would be more helpful but would
    # break cross-SDK behavior.
    parsed = UriTemplate.parse(template)
    assert str(parsed) == template


def test_parse_stray_close_brace_between_expressions():
    parsed = UriTemplate.parse("{a}}{b}")
    assert parsed.variable_names == ["a", "b"]


def test_parse_rejects_oversized_template():
    with pytest.raises(InvalidUriTemplate, match="maximum length"):
        UriTemplate.parse("x" * 101, max_length=100)


def test_parse_rejects_too_many_variables():
    template = "".join(f"{{v{i}}}" for i in range(11))
    with pytest.raises(InvalidUriTemplate, match="maximum of 10 variables"):
        UriTemplate.parse(template, max_variables=10)


def test_parse_counts_variables_not_expressions():
    # One {v0,v1,...} expression holds many variables inside a single
    # brace pair; counting expressions would miss that.
    template = "{" + ",".join(f"v{i}" for i in range(11)) + "}"
    with pytest.raises(InvalidUriTemplate, match="maximum of 10 variables"):
        UriTemplate.parse(template, max_variables=10)


def test_parse_custom_limits_allow_larger():
    template = "".join(f"{{v{i}}}" for i in range(20))
    parsed = UriTemplate.parse(template, max_variables=20)
    assert len(parsed.variables) == 20


def test_equality_based_on_template_string():
    first = UriTemplate.parse("file://{name}")
    same = UriTemplate.parse("file://{name}")
    other = UriTemplate.parse("file://{other}")
    assert first == same
    assert first != other
    assert hash(first) == hash(same)


def test_frozen():
    parsed = UriTemplate.parse("{x}")
    with pytest.raises(Exception):  # noqa: B017 — FrozenInstanceError
        parsed.template = "changed"  # type: ignore[misc]


@pytest.mark.parametrize(
    ("template", "variables", "expected"),
    [
        # Level 1: simple, encodes reserved chars
        ("{var}", {"var": "value"}, "value"),
        ("{var}", {"var": "hello world"}, "hello%20world"),
        ("{var}", {"var": "a/b"}, "a%2Fb"),
        ("file://docs/{name}", {"name": "readme.txt"}, "file://docs/readme.txt"),
        # Level 2: reserved expansion keeps / ? # etc.
        ("{+var}", {"var": "a/b/c"}, "a/b/c"),
        ("{+var}", {"var": "a?b#c"}, "a?b#c"),
        # RFC §3.2.3: reserved expansion passes through existing
        # pct-triplets unchanged; bare % is still encoded.
        ("{+var}", {"var": "path%2Fto"}, "path%2Fto"),
        ("{+var}", {"var": "50%"}, "50%25"),
        ("{+var}", {"var": "50%2"}, "50%252"),
        ("{+var}", {"var": "a%2Fb%20c"}, "a%2Fb%20c"),
        ("{#var}", {"var": "a%2Fb"}, "#a%2Fb"),
        # Simple expansion still encodes % unconditionally (triplet
        # preservation is reserved-only).
        ("{var}", {"var": "path%2Fto"}, "path%252Fto"),
        ("file://docs/{+path}", {"path": "src/main.py"}, "file://docs/src/main.py"),
        # Level 2: fragment
        ("{#var}", {"var": "section"}, "#section"),
        ("{#var}", {"var": "a/b"}, "#a/b"),
        # Level 3: label
        ("file{.ext}", {"ext": "txt"}, "file.txt"),
        # Level 3: path segment
        ("{/seg}", {"seg": "docs"}, "/docs"),
        # Level 3: path-style param
        ("{;id}", {"id": "42"}, ";id=42"),
        ("{;id}", {"id": ""}, ";id"),
        # Level 3: query
        ("{?q}", {"q": "search"}, "?q=search"),
        ("{?q}", {"q": ""}, "?q="),
        ("/search{?q,lang}", {"q": "mcp", "lang": "en"}, "/search?q=mcp&lang=en"),
        # Level 3: query continuation
        ("?a=1{&b}", {"b": "2"}, "?a=1&b=2"),
        # Multi-var in one expression
        ("{x,y}", {"x": "1", "y": "2"}, "1,2"),
        # {+x,y} is rejected at parse time: each var in a + expression
        # is multi-segment, and a template may only have one.
        # Sequence values, non-explode (comma-join)
        ("{/list}", {"list": ["a", "b", "c"]}, "/a,b,c"),
        ("{?list}", {"list": ["a", "b"]}, "?list=a,b"),
        # Explode: each item gets separator
        ("{/path*}", {"path": ["a", "b", "c"]}, "/a/b/c"),
        ("{.labels*}", {"labels": ["x", "y"]}, ".x.y"),
        ("{;keys*}", {"keys": ["a", "b"]}, ";keys=a;keys=b"),
        # RFC §3.2.7 ifemp: ; omits = for empty explode items
        ("{;keys*}", {"keys": ["a", "", "b"]}, ";keys=a;keys;keys=b"),
        # Undefined variables omitted
        ("{?q,page}", {"q": "x"}, "?q=x"),
        ("{a}{b}", {"a": "x"}, "x"),
        ("{?page}", {}, ""),
        # Empty sequence omitted
        ("{/path*}", {"path": []}, ""),
        # Literal-only template
        ("file://static", {}, "file://static"),
    ],
)
def test_expand(template: str, variables: dict[str, str | list[str]], expected: str):
    expanded = UriTemplate.parse(template).expand(variables)
    assert expanded == expected


def test_expand_encodes_special_chars_in_simple():
    parsed = UriTemplate.parse("{v}")
    assert parsed.expand({"v": "a&b=c"}) == "a%26b%3Dc"


def test_expand_preserves_special_chars_in_reserved():
    parsed = UriTemplate.parse("{+v}")
    assert parsed.expand({"v": "a&b=c"}) == "a&b=c"


@pytest.mark.parametrize(
    "value",
    [42, None, 3.14, {"a": "b"}, ["ok", 42], b"bytes"],
)
def test_expand_rejects_invalid_value_types(value: object):
    parsed = UriTemplate.parse("{v}")
    with pytest.raises(TypeError, match="must be str or a sequence of str"):
        parsed.expand({"v": value})  # type: ignore[dict-item]
"file.txt", {"ext": "txt"}), + # Level 3: path segment + ("api{/version}", "api/v1", {"version": "v1"}), + # Level 3: path-style param + ("item{;id}", "item;id=42", {"id": "42"}), + ("item{;id}", "item;id", {"id": ""}), + # Explode: ; emits name=value per item, match strips the prefix + ("item{;keys*}", "item;keys=a;keys=b", {"keys": ["a", "b"]}), + ("item{;keys*}", "item;keys=a;keys;keys=b", {"keys": ["a", "", "b"]}), + ("item{;keys*}", "item", {"keys": []}), + # Level 3: query. Lenient matching: partial, reordered, and + # extra params are all accepted. Absent params stay absent. + ("search{?q}", "search?q=hello", {"q": "hello"}), + ("search{?q}", "search?q=", {"q": ""}), + ("search{?q}", "search", {}), + ("search{?q,lang}", "search?q=mcp&lang=en", {"q": "mcp", "lang": "en"}), + ("search{?q,lang}", "search?lang=en&q=mcp", {"q": "mcp", "lang": "en"}), + ("search{?q,lang}", "search?q=mcp", {"q": "mcp"}), + ("search{?q,lang}", "search", {}), + ("search{?q}", "search?q=mcp&utm=x&ref=y", {"q": "mcp"}), + # URL-encoded query values are decoded + ("search{?q}", "search?q=hello%20world", {"q": "hello world"}), + # + is a literal sub-delim per RFC 3986, not a space (form-encoding) + ("search{?q}", "search?q=C++", {"q": "C++"}), + ("search{?q}", "search?q=1.0+build.5", {"q": "1.0+build.5"}), + # Fragment is stripped before query parsing + ("logs://{service}{?level}", "logs://api?level=error#section1", {"service": "api", "level": "error"}), + ("search{?q}", "search#frag", {}), + # Multiple ?/& expressions collected together + ("api{?v}{&page,limit}", "api?limit=10&v=2", {"v": "2", "limit": "10"}), + # Standalone {&var} falls through to the strict scan (expands + # with & prefix, no ? for lenient matching to split on) + ("api{&page}", "api&page=2", {"page": "2"}), + # Literal ? 
in path portion falls through to the strict scan + ("api?x{?page}", "api?x?page=2", {"page": "2"}), + # {?...} expression in path portion also falls through + ("api{?q}x{?page}", "api?q=1x?page=2", {"q": "1", "page": "2"}), + # {#...} or literal # in path portion falls through: lenient + # matching would strip the fragment before the path scan sees it + ("page{#section}{?q}", "page#intro?q=x", {"section": "intro", "q": "x"}), + ("page#lit{?q}", "page#lit?q=x", {"q": "x"}), + # Empty & segments in query are skipped + ("search{?q}", "search?&q=hello&", {"q": "hello"}), + # Duplicate query keys keep first value + ("search{?q}", "search?q=first&q=second", {"q": "first"}), + # Percent-encoded parameter names are NOT decoded: RFC 6570 + # expansion never encodes names, so an encoded name cannot be + # a legitimate match. Prevents HTTP parameter pollution. + ("api://x{?token}", "api://x?%74oken=evil&token=real", {"token": "real"}), + ("api://x{?token}", "api://x?%74oken=evil", {}), + # Level 3: query continuation with literal ? falls back to + # the strict scan (template-order, all-present required) + ("?a=1{&b}", "?a=1&b=2", {"b": "2"}), + # Explode: path segments as list + ("/files{/path*}", "/files/a/b/c", {"path": ["a", "b", "c"]}), + ("/files{/path*}", "/files", {"path": []}), + ("/files{/path*}/edit", "/files/a/b/edit", {"path": ["a", "b"]}), + # Explode: labels + ("host{.labels*}", "host.example.com", {"labels": ["example", "com"]}), + # Repeated-slash literals preserved exactly + ("///{a}////{b}////", "///x////y////", {"a": "x", "b": "y"}), + ], +) +def test_match(template: str, uri: str, expected: dict[str, str | list[str]]): + assert UriTemplate.parse(template).match(uri) == expected + + +@pytest.mark.parametrize( + ("template", "uri"), + [ + ("file://docs/{name}", "file://other/readme.txt"), + ("{a}/{b}", "foo"), + ("file{.ext}", "file"), + ("static", "different"), + # Anchoring: trailing extra component must not match. 
Guards + # against a refactor from fullmatch() to match() or search(). + ("/users/{id}", "/users/123/extra"), + ("/users/{id}/posts/{pid}", "/users/1/posts/2/extra"), + # Repeated-slash literal with wrong slash count + ("///{a}////{b}////", "//x////y////"), + # ; name boundary: {;id} must not match a longer parameter name + ("item{;id}", "item;identity=john"), + ("item{;id}", "item;ident"), + # ; explode: wrong parameter name in any segment rejects the match + ("item{;keys*}", "item;admin=true"), + ("item{;keys*}", "item;keys=a;admin=true"), + # Lenient-query branch: path portion fails to match + ("api/{name}{?q}", "wrong/path?q=x"), + # Lenient-query branch: ; explode name mismatch in path portion + ("item{;keys*}{?q}", "item;wrong=x?q=1"), + ], +) +def test_match_no_match(template: str, uri: str): + assert UriTemplate.parse(template).match(uri) is None + + +def test_match_adjacent_vars_with_prefix_names(): + # Two adjacent simple vars where one name is a prefix of the other. + # Capture positions are ordinal, so names only affect the result + # dict keys, not the scan. Adjacent unrestricted vars are inherently + # ambiguous; greedy * resolution means the first takes everything. + t = UriTemplate.parse("{var}{vara}") + assert t.match("ab") == {"var": "ab", "vara": ""} + assert t.match("abcd") == {"var": "abcd", "vara": ""} + + +def test_match_explode_preserves_empty_list_items(): + # Splitting the explode capture on its separator yields a leading + # empty item from the operator prefix; only that one is stripped. + # Subsequent empties are legitimate values from the input list. 
+ t = UriTemplate.parse("{/path*}") + assert t.match("/a//c") == {"path": ["a", "", "c"]} + assert t.match("//a") == {"path": ["", "a"]} + assert t.match("/a/") == {"path": ["a", ""]} + + t = UriTemplate.parse("host{.labels*}") + assert t.match("host.a..c") == {"labels": ["a", "", "c"]} + + +def test_match_adjacent_vars_disambiguated_by_literal(): + # A literal between vars resolves the ambiguity. + t = UriTemplate.parse("{a}-{b}") + assert t.match("foo-bar") == {"a": "foo", "b": "bar"} + + +@pytest.mark.parametrize( + ("template", "variables"), + [ + # Leading literal appears inside the value: must anchor at + # position 0, not rfind to the rightmost occurrence. + ("prefix-{id}", {"id": "prefix-123"}), + ("u{s}", {"s": "xu"}), + ("_{x}", {"x": "_"}), + ("~{v}~", {"v": "~~~"}), + # Multi-occurrence with two vars: rfind correctly picks the + # rightmost literal BETWEEN vars, first literal anchors at 0. + ("L{a}L{b}", {"a": "xLy", "b": "z"}), + # Leading literal with stop-char: earliest bound still applies. + ("api/{name}", {"name": "api"}), + ], +) +def test_match_leading_literal_appears_in_value(template: str, variables: dict[str, str]): + # Regression: the R->L scan used rfind for the preceding literal, + # which lands inside the value when the template's leading literal + # is a substring of the expanded value. The first atom must anchor + # at position 0, not search. + t = UriTemplate.parse(template) + uri = t.expand(variables) + assert t.match(uri) == variables + + +@pytest.mark.parametrize( + ("template", "uri", "expected"), + [ + # {+var} followed by a bounded var: suffix scan reads back to + # the bounded var's stop-char, greedy var gets the rest. 
+ ("{+path}{/name}", "a/b/c/readme", {"path": "a/b/c", "name": "readme"}), + ("{+path}{.ext}", "src/main.py", {"path": "src/main", "ext": "py"}), + ("prefix/{+path}{.ext}", "prefix/a/b.txt", {"path": "a/b", "ext": "txt"}), + # {+var} preceded by a bounded var: prefix scan reads forward + # to the bounded var's stop-char. + ("{/name}{+rest}", "/foo/bar/baz", {"name": "foo", "rest": "/bar/baz"}), + # Bounded vars before the greedy var match lazily (first anchor) + ("{owner}@{+path}", "alice@src/main", {"owner": "alice", "path": "src/main"}), + # Bounded vars after the greedy var match greedily (last anchor) + ("{+path}@{name}", "src@main@v1", {"path": "src@main", "name": "v1"}), + # {#frag} with a trailing bounded var + ("{#section}{/page}", "#intro/1", {"section": "intro", "page": "1"}), + ], +) +def test_match_greedy_with_adjacent_bounded_vars(template: str, uri: str, expected: dict[str, str]): + # These templates were previously rejected at parse time to avoid + # regex backtracking. The linear scan handles them in O(n). + assert UriTemplate.parse(template).match(uri) == expected + + +@pytest.mark.parametrize( + ("template", "uri"), + [ + # Adjacent bounded vars with a failing suffix: scan commits to + # one split and fails immediately, no retry. + ("{a}{b}X", "z" * 200), + ("{a}{b}{c}X", "z" * 200), + # Mid-template {?...} with greedy var and failing suffix. + ("{?a}{+b}x", "?a=" + "y" * 200), + # Chained anchors that all appear in input but suffix fails. + ("{a}L{b}L{c}L{d}M", "L" * 200), + ], +) +def test_match_no_backtracking_on_pathological_input(template: str, uri: str): + # These patterns caused O(n²) or worse backtracking under the regex + # matcher. The linear scan returns None without retrying splits. + # (Correctness check only; we benchmark separately to avoid flaky + # timing assertions in CI.) 
+ assert UriTemplate.parse(template).match(uri) is None + + +@pytest.mark.parametrize( + ("template", "uri"), + [ + # Prefix literal mismatch before a greedy var + ("file://{+path}", "http://x"), + # Prefix anchor not found: {a} needs '@' before greedy but none exists + ("{a}@{+path}", "no-at-sign-here"), + # Prefix literal doesn't fit within suffix boundary + ("foo{+a}oob", "fooob"), + # Greedy scalar contains its own stop-char ({+var} stops at ?) + ("api://{+path}", "api://foo?bar"), + # Explode span doesn't start with its separator + ("X{/path*}", "Xnoslash"), + # Explode body contains a non-separator stop-char + ("X{/path*}", "X/a?b"), + ], +) +def test_match_greedy_rejection_paths(template: str, uri: str): + assert UriTemplate.parse(template).match(uri) is None + + +@pytest.mark.parametrize( + ("template", "uri", "expected"), + [ + # ifemp in prefix before a greedy var: =value form + ("api{;key}{+rest}", "api;key=abc/xyz", {"key": "abc", "rest": "/xyz"}), + # ifemp in prefix: bare form (empty value) + ("api{;key}{+rest}", "api;key/xyz", {"key": "", "rest": "/xyz"}), + # Adjacent bounded caps in prefix: first takes to stop-char + ("{a}{b}{+rest}", "foo/bar", {"a": "foo", "b": "", "rest": "/bar"}), + ], +) +def test_match_prefix_scan_edge_cases(template: str, uri: str, expected: dict[str, str]): + assert UriTemplate.parse(template).match(uri) == expected + + +def test_match_prefix_ifemp_rejects_name_continuation(): + # {;key} before a greedy var: ;keys has no = and the 's' continues + # the name, so this is not our parameter. + t = UriTemplate.parse("api{;key}{+rest}") + assert t.match("api;keys/xyz") is None + + +def test_match_prefix_ifemp_empty_before_non_stop_literal(): + # Regression: _scan_prefix rejected the empty-value case when the + # following template literal starts with a non-stop-char. The + # name-continuation guard saw 'X' after ';key' and assumed the + # name continued, but 'X' is the template's next literal. 
+ t = UriTemplate.parse("api{;key}X{+rest}") + # Non-empty round-trips fine: + assert t.match(t.expand({"key": "abc", "rest": "/tail"})) == {"key": "abc", "rest": "/tail"} + # Empty value (ifemp → bare ;key, then X) must also round-trip: + uri = t.expand({"key": "", "rest": "/tail"}) + assert uri == "api;keyX/tail" + assert t.match(uri) == {"key": "", "rest": "/tail"} + # But an actual name continuation still rejects: + assert t.match("api;keyZX/tail") is None + + +def test_match_large_uri_against_greedy_template(): + # Large payload against a greedy template — the scan visits each + # character once for the suffix anchor and once for the greedy + # validation, so this is O(n) not O(n²). + t = UriTemplate.parse("{+path}/end") + body = "seg/" * 15000 + result = t.match(body + "end") + assert result == {"path": body[:-1]} + # And the failing case returns None without retrying splits. + assert t.match(body + "nope") is None + + +def test_match_decodes_percent_encoding(): + t = UriTemplate.parse("file://docs/{name}") + assert t.match("file://docs/hello%20world.txt") == {"name": "hello world.txt"} + + +def test_match_escapes_template_literals(): + # Regression: previous impl didn't escape . in literals, making it + # a regex wildcard. "fileXtxt" should NOT match "file.txt/{id}". + t = UriTemplate.parse("file.txt/{id}") + assert t.match("file.txt/42") == {"id": "42"} + assert t.match("fileXtxt/42") is None + + +@pytest.mark.parametrize( + ("template", "uri", "expected"), + [ + # Percent-encoded delimiters round-trip through match/expand. + # Path-safety validation belongs to ResourceSecurity, not here. 
+ ("file://docs/{name}", "file://docs/a%2Fb", {"name": "a/b"}), + ("{var}", "a%3Fb", {"var": "a?b"}), + ("{var}", "a%23b", {"var": "a#b"}), + ("{var}", "a%26b", {"var": "a&b"}), + ("file{.ext}", "file.a%2Eb", {"ext": "a.b"}), + ("api{/v}", "api/a%2Fb", {"v": "a/b"}), + ("search{?q}", "search?q=a%26b", {"q": "a&b"}), + ("{;filter}", ";filter=a%3Bb", {"filter": "a;b"}), + ], +) +def test_match_encoded_delimiters_roundtrip(template: str, uri: str, expected: dict[str, str]): + assert UriTemplate.parse(template).match(uri) == expected + + +def test_match_reserved_expansion_handles_slash(): + # {+var} allows literal / (not just encoded) + t = UriTemplate.parse("{+path}") + assert t.match("a%2Fb") == {"path": "a/b"} + assert t.match("a/b") == {"path": "a/b"} + + +def test_match_double_encoding_decoded_once(): + # %252F is %2F encoded again. Single decode gives "%2F" (a literal + # percent sign, a '2', and an 'F'). Guards against over-decoding. + t = UriTemplate.parse("file://docs/{name}") + assert t.match("file://docs/..%252Fetc") == {"name": "..%2Fetc"} + + +def test_match_rejects_oversized_uri(): + t = UriTemplate.parse("{var}") + assert t.match("x" * 100, max_uri_length=50) is None + + +def test_match_accepts_uri_within_custom_limit(): + t = UriTemplate.parse("{var}") + assert t.match("x" * 100, max_uri_length=200) == {"var": "x" * 100} + + +def test_match_default_uri_length_limit(): + t = UriTemplate.parse("{+var}") + # Just at the limit: should match + assert t.match("x" * DEFAULT_MAX_URI_LENGTH) is not None + # One over: should reject + assert t.match("x" * (DEFAULT_MAX_URI_LENGTH + 1)) is None + + +def test_match_explode_encoded_separator_in_segment(): + # An encoded separator inside a segment decodes as part of the value, + # not as a split point. The split happens at literal separators only. 
+ t = UriTemplate.parse("/files{/path*}") + assert t.match("/files/a%2Fb/c") == {"path": ["a/b", "c"]} + + +@pytest.mark.parametrize( + ("template", "variables"), + [ + ("{var}", {"var": "hello"}), + ("file://docs/{name}", {"name": "readme.txt"}), + ("file://docs/{+path}", {"path": "src/main.py"}), + ("search{?q,lang}", {"q": "mcp", "lang": "en"}), + ("file{.ext}", {"ext": "txt"}), + ("/files{/path*}", {"path": ["a", "b", "c"]}), + ("{var}", {"var": "hello world"}), + ("item{;id}", {"id": "42"}), + ("item{;id}", {"id": ""}), + # Defined-but-empty values still emit the operator prefix; match + # must accept the empty capture after it. + ("page{#section}", {"section": ""}), + ("file{.ext}", {"ext": ""}), + ("api{/v}", {"v": ""}), + ("x{name}y", {"name": ""}), + ("item{;keys*}", {"keys": ["a", "b", "c"]}), + ("item{;keys*}", {"keys": ["a", "", "b"]}), + # Empty strings in explode lists round-trip for unnamed operators + ("{/path*}", {"path": ["a", "", "c"]}), + ("{/path*}", {"path": ["", "a"]}), + ("host{.labels*}", {"labels": ["a", "", "c"]}), + # Partial query expansion round-trips: expand omits undefined + # vars, match leaves them absent from the result. + ("logs://{service}{?since,level}", {"service": "api"}), + ("logs://{service}{?since,level}", {"service": "api", "since": "1h"}), + ("logs://{service}{?since,level}", {"service": "api", "since": "1h", "level": "error"}), + ], +) +def test_roundtrip_expand_then_match(template: str, variables: dict[str, str | list[str]]): + t = UriTemplate.parse(template) + uri = t.expand(variables) + assert t.match(uri) == variables