The PyFly security module provides a complete authentication and authorization system built around JWT tokens, password hashing, a request-scoped security context, middleware for automatic token processing, and a decorator for role- and permission-based access control. Like all PyFly modules, it follows hexagonal principles: the password encoder is defined as a protocol (port) with a bcrypt adapter, and the security context is a plain dataclass with no framework coupling.
- Architecture Overview
- SecurityContext
- JWT Authentication
- Password Encoding
- SecurityMiddleware
- The @secure Decorator
- CSRF Protection
- HttpSecurity DSL
- OAuth2
- Exception Hierarchy
- Auto-Configuration
- Putting It All Together
The security module consists of the following components:
| Component | File | Purpose |
|---|---|---|
SecurityContext |
pyfly.security.context |
Immutable dataclass holding auth/authz data |
JWTService |
pyfly.security.jwt |
Encode, decode, and validate JWT tokens |
PasswordEncoder |
pyfly.security.password |
Protocol for password hashing |
BcryptPasswordEncoder |
pyfly.security.password |
Bcrypt implementation of PasswordEncoder |
SecurityMiddleware |
pyfly.web.adapters.starlette.security_middleware |
Starlette middleware for token extraction (re-exported from pyfly.security.middleware and pyfly.security) |
@secure |
pyfly.security.decorators |
Decorator for role/permission/expression enforcement |
CsrfFilter |
pyfly.web.adapters.starlette.filters.csrf_filter |
Double-submit cookie CSRF protection |
JWKSTokenValidator |
pyfly.security.oauth2.resource_server |
RS256 JWT validation via remote JWKS |
ClientRegistration |
pyfly.security.oauth2.client |
OAuth2 provider configuration dataclass |
AuthorizationServer |
pyfly.security.oauth2.authorization_server |
Token issuance and refresh token management |
HttpSecurity |
pyfly.security.http_security |
URL-level access control builder (DSL) |
HttpSecurityFilter |
pyfly.web.adapters.starlette.filters.http_security_filter |
Evaluates HttpSecurity rules at filter layer |
OAuth2LoginHandler |
pyfly.security.oauth2.login |
Browser-facing authorization_code login flow |
OAuth2SessionSecurityFilter |
pyfly.security.oauth2.session_security_filter |
Restores SecurityContext from HTTP session |
All components are exported from the top-level pyfly.security package:
from pyfly.security import (
SecurityContext,
JWTService,
PasswordEncoder,
BcryptPasswordEncoder,
SecurityMiddleware,
secure,
)
# CSRF utilities
from pyfly.security.csrf import generate_csrf_token, validate_csrf_token
from pyfly.web.adapters.starlette.filters.csrf_filter import CsrfFilter
# OAuth2
from pyfly.security.oauth2 import (
JWKSTokenValidator,
ClientRegistration,
ClientRegistrationRepository,
InMemoryClientRegistrationRepository,
AuthorizationServer,
TokenStore,
InMemoryTokenStore,
google,
github,
keycloak,
)SecurityContext is a frozen dataclass that holds authentication and authorization data for the current request. It is the central data structure that the middleware populates and the @secure decorator inspects.
from pyfly.security import SecurityContext
ctx = SecurityContext(
user_id="user-123",
roles=["ADMIN", "USER"],
permissions=["order:read", "order:write", "order:delete"],
attributes={"department": "engineering", "team": "platform"},
)Fields:
| Field | Type | Default | Description |
|---|---|---|---|
user_id |
str | None |
None |
Authenticated user's identifier |
roles |
list[str] |
[] |
User's assigned roles |
permissions |
list[str] |
[] |
User's granted permissions |
attributes |
dict[str, str] |
{} |
Additional key-value attributes |
Because SecurityContext is a frozen dataclass, it is immutable once created. This prevents accidental modification during request processing.
ctx = SecurityContext(user_id="user-123")
ctx.is_authenticated # True
anon = SecurityContext()
anon.is_authenticated # FalseThe is_authenticated property returns True if and only if user_id is not None.
ctx = SecurityContext(user_id="user-123", roles=["ADMIN", "USER"])
ctx.has_role("ADMIN") # True
ctx.has_role("MANAGER") # False
ctx.has_any_role(["ADMIN", "MANAGER"]) # True (has ADMIN)
ctx.has_any_role(["MANAGER", "DIRECTOR"]) # False (has neither)has_role(role)-- exact match against the roles list.has_any_role(roles)-- returnsTrueif the user has at least one of the given roles (set intersection).
ctx = SecurityContext(
user_id="user-123",
permissions=["order:read", "order:write"],
)
ctx.has_permission("order:read") # True
ctx.has_permission("order:delete") # Falseanon = SecurityContext.anonymous()
anon.user_id # None
anon.roles # []
anon.permissions # []
anon.is_authenticated # FalseThe anonymous() class method creates a context with all defaults, representing an unauthenticated user.
| Method / Property | Return Type | Description |
|---|---|---|
is_authenticated |
bool |
True if user_id is not None |
has_role(role) |
bool |
True if the user has the specified role |
has_any_role(roles) |
bool |
True if the user has any of the given roles |
has_permission(permission) |
bool |
True if the user has the specified permission |
anonymous() (classmethod) |
SecurityContext |
Create an anonymous (unauthenticated) context |
JWTService handles JWT token encoding, decoding, validation, and conversion to SecurityContext. It wraps the PyJWT library.
from pyfly.security import JWTService
jwt_service = JWTService(secret="my-secret-key", algorithm="HS256")Constructor parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
secret |
str |
required | Secret key for HMAC-based token signing |
algorithm |
str |
"HS256" |
JWT algorithm (e.g., HS256, HS384, RS256) |
Create a JWT token from a payload dictionary:
from datetime import datetime, timedelta, UTC
token = jwt_service.encode({
"sub": "user-123",
"roles": ["ADMIN", "USER"],
"permissions": ["order:read", "order:write"],
"exp": datetime.now(UTC) + timedelta(hours=1),
"iat": datetime.now(UTC),
})
# Returns: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...."The payload is a standard Python dictionary. PyJWT handles serialization of common types like datetime. You are responsible for including standard JWT claims like exp (expiration), iat (issued at), and sub (subject).
Decode and validate a JWT token:
payload = jwt_service.decode(token)
# Returns: {"sub": "user-123", "roles": ["ADMIN", "USER"], ...}Validation includes:
- Signature verification using the configured secret and algorithm
- Expiration check (if
expclaim is present) - All standard PyJWT validations
If the token is invalid, expired, or tampered with, a SecurityException is raised:
from pyfly.kernel.exceptions import SecurityException
try:
payload = jwt_service.decode("invalid-token")
except SecurityException as exc:
print(exc) # "Invalid token: ..."
print(exc.code) # "INVALID_TOKEN"The to_security_context() method is a convenience that decodes a token and builds a SecurityContext directly:
ctx = jwt_service.to_security_context(token)
# SecurityContext(
# user_id="user-123",
# roles=["ADMIN", "USER"],
# permissions=["order:read", "order:write"],
# )to_security_context() extracts these claims from the JWT payload:
| JWT Claim | SecurityContext Field | Required | Default |
|---|---|---|---|
sub |
user_id |
Yes | -- |
roles |
roles |
No | [] |
permissions |
permissions |
No | [] |
Any additional claims in the payload are ignored by to_security_context(). If you need them, decode the token manually with decode() and build the context yourself.
All PyJWT errors (jwt.PyJWTError and its subclasses) are caught and wrapped in a SecurityException with code "INVALID_TOKEN":
| PyJWT Error | Cause |
|---|---|
jwt.ExpiredSignatureError |
Token has expired (past exp claim) |
jwt.InvalidSignatureError |
Signature does not match |
jwt.DecodeError |
Token is malformed |
jwt.InvalidTokenError |
Other token validation failures |
PasswordEncoder is a runtime-checkable protocol that defines the contract for password hashing:
from pyfly.security import PasswordEncoder
class PasswordEncoder(Protocol):
def hash(self, raw_password: str) -> str:
"""Hash a raw password. Returns the hashed string."""
...
def verify(self, raw_password: str, hashed_password: str) -> bool:
"""Verify a raw password against a hashed password."""
...This protocol allows you to swap out the hashing implementation (e.g., bcrypt, argon2, scrypt) without changing your service layer.
The default production-ready implementation using bcrypt:
from pyfly.security import BcryptPasswordEncoder
encoder = BcryptPasswordEncoder(rounds=12)
# Hash a password
hashed = encoder.hash("my-secure-password")
# "$2b$12$LJ3m4ys3Lk..."
# Verify a password
encoder.verify("my-secure-password", hashed) # True
encoder.verify("wrong-password", hashed) # FalseConstructor parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
rounds |
int |
12 |
Bcrypt cost factor (higher = slower but more secure) |
The cost factor controls how computationally expensive the hashing operation is. Each increment roughly doubles the time. A value of 12 is considered a good default for production use.
Methods:
| Method | Return Type | Description |
|---|---|---|
hash(raw_password) |
str |
Generate a bcrypt hash with a random salt |
verify(raw_password, hashed) |
bool |
Check if the raw password matches the hash |
You can create custom password encoders by implementing the PasswordEncoder protocol:
import hashlib
class SHA256PasswordEncoder:
"""Simple SHA-256 encoder (NOT recommended for production)."""
def hash(self, raw_password: str) -> str:
return hashlib.sha256(raw_password.encode()).hexdigest()
def verify(self, raw_password: str, hashed_password: str) -> bool:
return self.hash(raw_password) == hashed_passwordBecause PasswordEncoder is a runtime_checkable protocol, you can use isinstance() checks:
encoder = BcryptPasswordEncoder()
isinstance(encoder, PasswordEncoder) # TrueThe SecurityMiddleware is a Starlette middleware that automatically extracts JWT tokens from incoming requests and populates the SecurityContext on request.state. Its canonical location is pyfly.web.adapters.starlette.security_middleware, and it is re-exported from pyfly.security.middleware and the top-level pyfly.security package for convenience.
For every incoming request, the middleware:
- Checks if the request path is in the
exclude_pathsset. If so, sets an anonymous context and continues. - Reads the
Authorizationheader. - If the header starts with
"Bearer ", extracts the token string. - Attempts to decode the token via
JWTService.to_security_context(). - On success, sets
request.state.security_contextto the authenticated context. - On failure (invalid/expired token), logs a debug message and sets an anonymous context.
- If no
Authorizationheader is present, sets an anonymous context.
The middleware never rejects requests. It only populates the security context. Authorization enforcement is the job of the @secure decorator or your own logic.
from pyfly.security import SecurityMiddleware, JWTService
jwt_service = JWTService(secret="my-secret")
# As Starlette middleware
from starlette.applications import Starlette
app = Starlette()
app.add_middleware(
SecurityMiddleware,
jwt_service=jwt_service,
exclude_paths=["/docs", "/openapi.json", "/actuator/health"],
)Constructor parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
app |
ASGIApp |
required | The ASGI application |
jwt_service |
JWTService |
required | JWT service for token validation |
exclude_paths |
Sequence[str] |
() |
Paths to skip (set anonymous context directly) |
Public endpoints like documentation, health checks, and login should be excluded from JWT processing. While the middleware does not reject requests, excluding paths avoids unnecessary token parsing:
app.add_middleware(
SecurityMiddleware,
jwt_service=jwt_service,
exclude_paths=[
"/docs",
"/redoc",
"/openapi.json",
"/actuator/health",
"/api/auth/login",
"/api/auth/register",
],
)The SecurityMiddleware is not included automatically by create_app(). You add it to the application after creation:
from pyfly.web.adapters.starlette import create_app
from pyfly.security import SecurityMiddleware, JWTService
app = create_app(title="My API", context=ctx)
app.add_middleware(
SecurityMiddleware,
jwt_service=JWTService(secret="my-secret"),
exclude_paths=["/docs", "/openapi.json"],
)The @secure decorator enforces authentication and authorization on individual handler functions.
Require the user to have at least one of the specified roles:
from pyfly.security import secure, SecurityContext
@secure(roles=["ADMIN"])
async def admin_only(security_context: SecurityContext) -> dict:
return {"message": "Admin access granted"}
@secure(roles=["ADMIN", "MANAGER"])
async def admin_or_manager(security_context: SecurityContext) -> dict:
# User must have ADMIN *or* MANAGER role (at least one)
return {"message": "Access granted"}Require the user to have all of the specified permissions:
@secure(permissions=["order:read"])
async def read_orders(security_context: SecurityContext) -> list:
return [{"id": "1", "status": "active"}]
@secure(permissions=["order:read", "order:write"])
async def manage_orders(security_context: SecurityContext) -> dict:
# User must have BOTH order:read AND order:write
return {"message": "Full order access"}When both roles and permissions are specified, the user must satisfy both conditions:
@secure(roles=["ADMIN", "MANAGER"], permissions=["order:delete"])
async def delete_order(order_id: str, security_context: SecurityContext) -> None:
# User must have (ADMIN or MANAGER) AND order:delete permission
...The @secure decorator wraps the function in an async wrapper that:
- Extracts the
security_contextkeyword argument from the call. - If
security_contextisNone, raisesSecurityException(code="AUTH_REQUIRED"). - If
security_context.is_authenticatedisFalse, raisesSecurityException(code="AUTH_REQUIRED"). - If
rolesare specified and the user has none of them, raisesSecurityException(code="FORBIDDEN"). - If
permissionsare specified and the user is missing any, raisesSecurityException(code="FORBIDDEN"). - If all checks pass, calls the original function.
The decorated function must accept a security_context: SecurityContext keyword argument. This is how the decorator accesses the current user's context.
| Check Failed | Exception | HTTP Status |
|---|---|---|
| No security context | SecurityException("Authentication required", code="AUTH_REQUIRED") |
401 |
| Not authenticated | SecurityException("Authentication required", code="AUTH_REQUIRED") |
401 |
| Insufficient roles | SecurityException("Insufficient roles: ...", code="FORBIDDEN") |
403 |
| Insufficient permissions | SecurityException("Insufficient permissions: ...", code="FORBIDDEN") |
403 |
These exceptions are caught by the global exception handler and converted to structured JSON error responses.
The expression parameter enables Spring Security-style security expressions for more complex authorization logic:
@secure(expression="hasRole('ADMIN') and hasPermission('order:delete')")
async def delete_order(order_id: str, security_context: SecurityContext) -> None:
...Supported expressions:
| Expression | Description | Example |
|---|---|---|
hasRole('X') |
User has role X | hasRole('ADMIN') |
hasAnyRole('X', 'Y') |
User has at least one of the roles | hasAnyRole('ADMIN', 'MANAGER') |
hasPermission('X') |
User has permission X | hasPermission('user:read') |
isAuthenticated |
User is authenticated | isAuthenticated |
and |
Boolean AND | hasRole('ADMIN') and hasPermission('write') |
or |
Boolean OR | hasRole('ADMIN') or hasRole('MANAGER') |
not |
Boolean NOT | not hasRole('GUEST') |
(...) |
Grouping | (hasRole('ADMIN') or hasRole('MANAGER')) and hasPermission('write') |
Complex expression examples:
# Require ADMIN role AND write permission
@secure(expression="hasRole('ADMIN') and hasPermission('order:write')")
async def update_order(order_id: str, security_context: SecurityContext) -> dict:
...
# Allow ADMIN or MANAGER with write permission
@secure(expression="(hasRole('ADMIN') or hasRole('MANAGER')) and hasPermission('write')")
async def approve_order(order_id: str, security_context: SecurityContext) -> dict:
...
# Deny guests
@secure(expression="not hasRole('GUEST')")
async def member_content(security_context: SecurityContext) -> dict:
...Safety: Expressions are evaluated using safe AST parsing -- no eval() or exec() is used. The expression is first reduced to a boolean-only string (True/False/and/or/not/parentheses), then evaluated via recursive AST walking.
Invalid expressions (containing unsafe tokens like function calls, imports, or arithmetic) raise SecurityException with code "INVALID_EXPRESSION".
Source: src/pyfly/security/decorators.py
PyFly provides stateless CSRF protection using the double-submit cookie pattern. This is implemented as a WebFilter that integrates into the filter chain.
- On safe requests (GET, HEAD, OPTIONS, TRACE), the filter sets an
XSRF-TOKENcookie on the response. - JavaScript reads the cookie and includes its value as an
X-XSRF-TOKENheader on subsequent unsafe requests. - On unsafe requests (POST, PUT, DELETE, PATCH), the filter validates that the header value matches the cookie value using a timing-safe comparison.
- If either token is missing or they don't match, the filter returns HTTP 403.
Since cross-origin requests cannot read cookies from another domain, this proves the request originated from the same site.
Token generation and validation are provided by pyfly.security.csrf:
from pyfly.security.csrf import (
generate_csrf_token,
validate_csrf_token,
CSRF_COOKIE_NAME, # "XSRF-TOKEN"
CSRF_HEADER_NAME, # "X-XSRF-TOKEN"
SAFE_METHODS, # frozenset({"GET", "HEAD", "OPTIONS", "TRACE"})
)
# Generate a cryptographically-secure token
token = generate_csrf_token() # URL-safe base64 string (43 chars)
# Timing-safe validation
is_valid = validate_csrf_token(cookie_token, header_token)| Function | Description |
|---|---|
generate_csrf_token() |
Generates a URL-safe token using secrets.token_urlsafe(32) |
validate_csrf_token(cookie, header) |
Timing-safe comparison using secrets.compare_digest |
Source: src/pyfly/security/csrf.py
The CsrfFilter extends OncePerRequestFilter and runs in the WebFilter chain:
from pyfly.web.adapters.starlette.filters.csrf_filter import CsrfFilter| Property | Value | Description |
|---|---|---|
__pyfly_order__ |
-50 |
Runs after RequestContext but before SecurityFilter |
exclude_patterns |
["/actuator/*", "/health", "/ready"] |
Paths excluded from CSRF |
Bearer bypass: Requests with an Authorization: Bearer ... header skip CSRF validation entirely. JWT-based API clients are already immune to CSRF attacks because tokens are not sent automatically by browsers.
Cookie properties:
| Property | Value | Reason |
|---|---|---|
httponly |
False |
JavaScript must read the cookie to send it as a header |
samesite |
lax |
Prevents cookies from being sent on cross-site requests |
secure |
True |
Cookie only sent over HTTPS |
path |
/ |
Available to all paths |
To use CSRF protection with a JavaScript frontend:
// Read the XSRF-TOKEN cookie
function getCsrfToken() {
const match = document.cookie.match(/XSRF-TOKEN=([^;]+)/);
return match ? match[1] : null;
}
// Include in requests
fetch('/api/orders', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-XSRF-TOKEN': getCsrfToken(),
},
body: JSON.stringify({ item: 'Widget' }),
credentials: 'include',
});Source: src/pyfly/web/adapters/starlette/filters/csrf_filter.py
The HttpSecurity builder provides a fluent API for defining URL-level access control rules. Instead of scattering @secure decorators on every handler, you declare authorization rules centrally and the HttpSecurityFilter enforces them at the filter layer -- before the route handler is reached.
from pyfly.security.http_security import HttpSecurityHttpSecurity follows a builder pattern inspired by Spring Security's HttpSecurity:
from pyfly.security.http_security import HttpSecurity
http_security = HttpSecurity()
http_security.authorize_requests() \
.request_matchers("/api/admin/**").has_role("ADMIN") \
.request_matchers("/api/**").authenticated() \
.request_matchers("/health", "/docs", "/openapi.json").permit_all() \
.any_request().deny_all()
# Build the filter
http_security_filter = http_security.build()The builder chain works as follows:
authorize_requests()-- returns an_AuthorizeRequestsBuilderto start defining rules.request_matchers(*patterns)-- begins a rule for one or more URL glob patterns (fnmatch-style).- A terminal method (
permit_all(),authenticated(),has_role(), etc.) -- sets the access rule for the matched patterns and returns back to the builder for chaining. any_request()-- a catch-all that matches any path not matched by previous rules. Should be the last rule in the chain.build()-- creates anHttpSecurityFilterconfigured with all accumulated rules.
Rules are evaluated in declaration order -- first match wins. If no rule matches a given request path, the request is allowed through (open by default).
| Terminal Method | Rule Type | Description |
|---|---|---|
permit_all() |
PERMIT_ALL |
Allow all requests (no authentication required) |
deny_all() |
DENY_ALL |
Reject all requests with HTTP 403 |
authenticated() |
AUTHENTICATED |
Require an authenticated user (any role) |
has_role(role) |
HAS_ROLE |
Require the user to have the specified role |
has_any_role(roles) |
HAS_ANY_ROLE |
Require the user to have at least one of the listed roles |
has_permission(perm) |
HAS_PERMISSION |
Require the user to have the specified permission |
The HttpSecurityFilter is an OncePerRequestFilter ordered at HIGHEST_PRECEDENCE + 350. It runs after authentication filters (SecurityFilter at +250, OAuth2SessionSecurityFilter at +225) and before the route handler. This means the SecurityContext is already populated on request.state when the rules are evaluated.
from pyfly.web.adapters.starlette.filters.http_security_filter import HttpSecurityFilterEvaluation logic:
- For each incoming request, the filter iterates through the rules in order.
- The first rule whose URL patterns match the request path is applied.
- If the rule requires authentication or specific roles/permissions and the user does not satisfy the requirement, the filter returns an RFC 7807 problem-detail JSON response (HTTP 401 or 403).
- If no rule matches, the request passes through.
Error responses follow RFC 7807 with Content-Type: application/problem+json:
{
"type": "about:blank",
"title": "Forbidden",
"status": 403,
"detail": "Required role 'ADMIN' is not granted.",
"instance": "/api/admin/users"
}Register the HttpSecurityFilter as a DI bean so it is auto-discovered by create_app():
from pyfly.container import configuration, bean
from pyfly.security.http_security import HttpSecurity
@configuration
class SecurityConfig:
@bean
def http_security_filter(self):
http_security = HttpSecurity()
http_security.authorize_requests() \
.request_matchers("/api/admin/**").has_role("ADMIN") \
.request_matchers("/api/**").authenticated() \
.request_matchers("/health", "/docs", "/redoc", "/openapi.json").permit_all() \
.any_request().permit_all()
return http_security.build()The filter is automatically included in the WebFilter chain and sorted by its @order value (HIGHEST_PRECEDENCE + 350).
Source: src/pyfly/security/http_security.py, src/pyfly/web/adapters/starlette/filters/http_security_filter.py
PyFly provides Spring Security-style method-level authorization via @pre_authorize and @post_authorize decorators. These evaluate SpEL-style security expressions against the current RequestContext.security_context.
from pyfly.security import pre_authorize
@service
class OrderService:
@pre_authorize("hasRole('ADMIN') or hasPermission('order:write')")
async def delete_order(self, order_id: str) -> None:
...
@pre_authorize("isAuthenticated")
async def list_orders(self) -> list[Order]:
...from pyfly.security import post_authorize
@service
class OrderService:
@post_authorize("hasPermission('order:read')")
async def get_order(self, order_id: str) -> Order:
# Method body runs first; authorization checked on return
return await self.repo.find_by_id(order_id)| Expression | Description |
|---|---|
isAuthenticated |
User is authenticated |
hasRole('ADMIN') |
User has the ADMIN role |
hasAnyRole('ADMIN', 'MANAGER') |
User has at least one role |
hasPermission('order:read') |
User has the permission |
and / or / not |
Boolean operators |
(...) |
Grouping |
Both decorators raise UnauthorizedException (401) when no SecurityContext is available, and ForbiddenException (403) when the expression evaluates to False.
PyFly provides a complete OAuth2 implementation following hexagonal architecture. The module includes a Resource Server for validating external tokens, Client Registration for connecting to OAuth2 providers, and an Authorization Server for issuing tokens.
from pyfly.security.oauth2 import (
# Resource Server
JWKSTokenValidator,
# Client Registration
ClientRegistration,
ClientRegistrationRepository,
InMemoryClientRegistrationRepository,
google,
github,
keycloak,
# Authorization Server
AuthorizationServer,
TokenStore,
InMemoryTokenStore,
)The JWKSTokenValidator validates RS256-signed JWTs using a remote JWKS (JSON Web Key Set) endpoint. This is used when your application acts as an OAuth2 Resource Server -- it receives tokens issued by an external authorization server and validates them.
from pyfly.security.oauth2 import JWKSTokenValidator
validator = JWKSTokenValidator(
jwks_uri="https://auth.example.com/.well-known/jwks.json",
issuer="https://auth.example.com",
audience="my-api",
)Constructor parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
jwks_uri |
str |
required | URL of the JWKS endpoint |
issuer |
str | None |
None |
Expected iss claim (validates if set) |
audience |
str | None |
None |
Expected aud claim (validates if set) |
algorithms |
list[str] | None |
["RS256"] |
Allowed signing algorithms |
Validating tokens:
# Validate and get raw payload
payload = validator.validate(token)
# {"sub": "user-123", "roles": ["ADMIN"], "scope": "read write", ...}
# Validate and build SecurityContext directly
ctx = validator.to_security_context(token)
# SecurityContext(user_id="user-123", roles=["ADMIN"], permissions=["read", "write"])Claim mapping for to_security_context():
| JWT Claim | SecurityContext Field | Notes |
|---|---|---|
sub |
user_id |
Standard subject claim |
roles |
roles |
Flat roles array |
realm_access.roles |
roles |
Keycloak-style nested roles (fallback) |
permissions |
permissions |
Flat permissions array |
scope |
permissions |
Space-separated scopes (fallback, split on spaces) |
Source: src/pyfly/security/oauth2/resource_server.py
ClientRegistration is a frozen dataclass that holds the configuration needed to interact with an OAuth2 provider.
from pyfly.security.oauth2 import ClientRegistration
registration = ClientRegistration(
registration_id="my-app",
client_id="client-id-from-provider",
client_secret="client-secret-from-provider",
authorization_grant_type="authorization_code",
redirect_uri="https://myapp.com/callback",
scopes=["openid", "profile", "email"],
authorization_uri="https://provider.com/authorize",
token_uri="https://provider.com/token",
user_info_uri="https://provider.com/userinfo",
jwks_uri="https://provider.com/.well-known/jwks.json",
issuer_uri="https://provider.com",
provider_name="Custom Provider",
)Fields:
| Field | Type | Default | Description |
|---|---|---|---|
registration_id |
str |
required | Unique identifier for this registration |
client_id |
str |
required | OAuth2 client ID |
client_secret |
str |
"" |
OAuth2 client secret |
authorization_grant_type |
str |
"authorization_code" |
Grant type |
redirect_uri |
str |
"" |
Redirect URI for auth code flow |
scopes |
list[str] |
[] |
Requested scopes |
authorization_uri |
str |
"" |
Provider's authorization endpoint |
token_uri |
str |
"" |
Provider's token endpoint |
user_info_uri |
str |
"" |
Provider's userinfo endpoint |
jwks_uri |
str |
"" |
Provider's JWKS endpoint |
issuer_uri |
str |
"" |
Provider's issuer URI |
provider_name |
str |
"" |
Human-readable provider name |
Pre-configured factories for common OAuth2 providers:
from pyfly.security.oauth2 import google, github, keycloak
# Google OAuth2
google_reg = google(
client_id="your-google-client-id",
client_secret="your-google-client-secret",
redirect_uri="https://myapp.com/callback/google",
)
# GitHub OAuth2
github_reg = github(
client_id="your-github-client-id",
client_secret="your-github-client-secret",
)
# Keycloak (derives all endpoints from the issuer URI)
keycloak_reg = keycloak(
client_id="your-keycloak-client-id",
client_secret="your-keycloak-client-secret",
issuer_uri="https://keycloak.example.com/realms/myrealm",
)| Factory | Scopes | Grant Type |
|---|---|---|
google() |
openid, profile, email |
authorization_code |
github() |
read:user, user:email |
authorization_code |
keycloak() |
openid, profile, email |
authorization_code |
The ClientRegistrationRepository protocol defines the port for looking up registrations:
from pyfly.security.oauth2 import (
ClientRegistrationRepository,
InMemoryClientRegistrationRepository,
)
# Create a repository with registrations
repo = InMemoryClientRegistrationRepository(google_reg, github_reg, keycloak_reg)
# Look up by registration ID
reg = repo.find_by_registration_id("google") # Returns ClientRegistration or None
# Add registrations after construction
repo.add(custom_registration)
# List all registrations
all_regs = repo.registrations # list[ClientRegistration]Source: src/pyfly/security/oauth2/client.py
The AuthorizationServer issues JWT access tokens and manages refresh tokens. It supports client_credentials (machine-to-machine) and refresh_token grant types.
from pyfly.security.oauth2 import (
AuthorizationServer,
InMemoryTokenStore,
InMemoryClientRegistrationRepository,
ClientRegistration,
)
# Set up client registration
client = ClientRegistration(
registration_id="my-service",
client_id="my-service",
client_secret="service-secret",
scopes=["read", "write"],
)
client_repo = InMemoryClientRegistrationRepository(client)
# Create authorization server
auth_server = AuthorizationServer(
secret="jwt-signing-secret",
client_repository=client_repo,
token_store=InMemoryTokenStore(),
access_token_ttl=3600, # 1 hour
refresh_token_ttl=86400, # 24 hours
issuer="https://auth.myapp.com",
)Constructor parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
secret |
str |
required | Secret key for HS256 token signing |
client_repository |
ClientRegistrationRepository |
required | Repository for client lookup |
token_store |
TokenStore |
required | Storage for refresh tokens |
access_token_ttl |
int |
3600 |
Access token lifetime (seconds) |
refresh_token_ttl |
int |
86400 |
Refresh token lifetime (seconds) |
issuer |
str | None |
None |
Token issuer (iss claim) |
# Client credentials grant (machine-to-machine)
response = await auth_server.token(
grant_type="client_credentials",
client_id="my-service",
client_secret="service-secret",
scope="read write",
)
# {
# "access_token": "eyJhbGciOiJIUzI1NiI...",
# "token_type": "Bearer",
# "expires_in": 3600,
# "refresh_token": "dGhpcyBpcyBhIHJlZnJlc2g...",
# "scope": "read write"
# }
# Refresh token grant
new_response = await auth_server.token(
grant_type="refresh_token",
client_id="my-service",
client_secret="service-secret",
refresh_token=response["refresh_token"],
)Refresh token rotation: When a refresh token is used, the old token is automatically revoked and a new one is issued. This limits the window of vulnerability if a token is compromised.
The TokenStore protocol defines the port for token persistence:
class TokenStore(Protocol):
async def store(self, token_id: str, token_data: dict[str, Any]) -> None: ...
async def find(self, token_id: str) -> dict[str, Any] | None: ...
async def revoke(self, token_id: str) -> None: ...InMemoryTokenStore is the built-in adapter for development and testing. In production, implement TokenStore with Redis or a database backend.
| Error Code | Cause |
|---|---|
INVALID_CLIENT |
Unknown client ID or wrong secret |
INVALID_REQUEST |
Missing required parameter (e.g., refresh_token) |
UNSUPPORTED_GRANT_TYPE |
Grant type not supported |
INVALID_GRANT |
Invalid, expired, or mismatched refresh token |
Source: src/pyfly/security/oauth2/authorization_server.py
The OAuth2LoginHandler implements the full browser-facing OAuth2 authorization_code flow. It creates Starlette routes that handle the redirect-to-provider, callback-with-code, and logout steps. The OAuth2SessionSecurityFilter complements it by restoring the SecurityContext from the HTTP session on subsequent requests.
from pyfly.security.oauth2.login import OAuth2LoginHandler
from pyfly.security.oauth2.session_security_filter import OAuth2SessionSecurityFilterOAuth2LoginHandler creates three routes:
| Route | Method | Description |
|---|---|---|
/oauth2/authorization/{registration_id} |
GET | Redirects the browser to the OAuth2 provider's authorization endpoint with a CSRF state parameter |
/login/oauth2/code/{registration_id} |
GET | Handles the provider callback: validates state, exchanges the authorization code for tokens, fetches user info, builds a SecurityContext, and stores it in the session |
/logout |
POST | Invalidates the HTTP session and redirects to / |
Constructor parameters:
| Parameter | Type | Description |
|---|---|---|
client_repository |
ClientRegistrationRepository |
Repository to look up OAuth2 client registrations |
Authorization flow:
- The user visits
/oauth2/authorization/google(or any registration ID). - The handler looks up the
ClientRegistration, generates a randomstatetoken, stores it in the session, and redirects the browser to the provider'sauthorization_uriwithresponse_type=code,client_id,redirect_uri,scope, andstateparameters. - The provider authenticates the user and redirects back to
/login/oauth2/code/google?code=...&state=.... - The callback handler validates the
stateparameter (CSRF protection), exchanges the authorization code for tokens via the provider'stoken_uri, fetches user info fromuser_info_uri, builds aSecurityContext, and stores it in the session. - The user is redirected to the original page (or
/).
from pyfly.security.oauth2 import (
ClientRegistrationRepository,
InMemoryClientRegistrationRepository,
google,
)
from pyfly.security.oauth2.login import OAuth2LoginHandler
# Set up client registrations
google_reg = google(
client_id="your-google-client-id",
client_secret="your-google-client-secret",
redirect_uri="http://localhost:8080/login/oauth2/code/google",
)
client_repo = InMemoryClientRegistrationRepository(google_reg)
# Create the login handler
login_handler = OAuth2LoginHandler(client_repository=client_repo)
# Get the routes for mounting in create_app()
oauth2_routes = login_handler.routes()Source: src/pyfly/security/oauth2/login.py
The OAuth2SessionSecurityFilter is a OncePerRequestFilter that restores the SecurityContext from the HTTP session on every request. It runs at HIGHEST_PRECEDENCE + 225, which is before the JWT-based SecurityFilter (at +250), ensuring session-based authentication takes priority over token-based authentication.
from pyfly.security.oauth2.session_security_filter import OAuth2SessionSecurityFilterBehavior:
- Reads the session from
request.state.session. - If a
SECURITY_CONTEXTattribute is stored in the session (set byOAuth2LoginHandlerduring login), restores it torequest.state.security_context. - If no session-based context is found and no
security_contexthas been set by an earlier filter, sets an anonymous context.
This filter is complementary to the JWT SecurityFilter. In applications that use both OAuth2 login (session-based) and API tokens (JWT-based), the session filter runs first. If the user has an active session, the session context is used. If not, the JWT SecurityFilter gets its turn to check for a Bearer token.
| Property | Value |
|---|---|
__pyfly_order__ |
HIGHEST_PRECEDENCE + 225 |
| Runs before | SecurityFilter (HP+250), HttpSecurityFilter (HP+350) |
Source: src/pyfly/security/oauth2/session_security_filter.py
A complete example wiring OAuth2 login into a PyFly application:
from pyfly.container import configuration, bean
from pyfly.security.oauth2 import (
InMemoryClientRegistrationRepository,
google, github,
)
from pyfly.security.oauth2.login import OAuth2LoginHandler
from pyfly.security.oauth2.session_security_filter import OAuth2SessionSecurityFilter
from pyfly.security.http_security import HttpSecurity
@configuration
class OAuth2Config:
@bean
def client_repository(self) -> InMemoryClientRegistrationRepository:
return InMemoryClientRegistrationRepository(
google(
client_id="google-client-id",
client_secret="google-client-secret",
redirect_uri="http://localhost:8080/login/oauth2/code/google",
),
github(
client_id="github-client-id",
client_secret="github-client-secret",
redirect_uri="http://localhost:8080/login/oauth2/code/github",
),
)
@bean
def oauth2_login_handler(self, client_repository: InMemoryClientRegistrationRepository) -> OAuth2LoginHandler:
return OAuth2LoginHandler(client_repository=client_repository)
@bean
def oauth2_session_filter(self) -> OAuth2SessionSecurityFilter:
return OAuth2SessionSecurityFilter()
@bean
def http_security_filter(self):
http_security = HttpSecurity()
http_security.authorize_requests() \
.request_matchers("/oauth2/**", "/login/**", "/logout").permit_all() \
.request_matchers("/api/**").authenticated() \
.any_request().permit_all()
return http_security.build()Then mount the OAuth2 routes via extra_routes in create_app():
from pyfly.web.adapters.starlette import create_app
login_handler = context.get_bean(OAuth2LoginHandler)
app = create_app(
title="My App",
context=context,
extra_routes=login_handler.routes(),
)The security module uses exceptions from pyfly.kernel.exceptions:
| Exception | HTTP Status | Description |
|---|---|---|
SecurityException |
401 | Base security error (auth failures) |
UnauthorizedException |
401 | Authentication required but not provided/invalid |
ForbiddenException |
403 | Authenticated but lacks permission |
The @secure decorator raises SecurityException directly with appropriate codes. The JWTService.decode() method raises SecurityException with code "INVALID_TOKEN" for any token validation failure.
When pyfly.security.enabled is set to true in your configuration, PyFly automatically wires the security beans through two auto-configuration classes. No manual bean registration is needed.
Conditions: pyfly.security.enabled=true AND pyjwt library installed.
| Bean | Type | Config Keys |
|---|---|---|
jwt_service |
JWTService |
pyfly.security.jwt.secret, pyfly.security.jwt.algorithm |
The auto-configured JWTService reads its secret and algorithm from the configuration:
pyfly:
security:
enabled: true
jwt:
secret: "my-production-secret" # REQUIRED: change from default
algorithm: "HS256" # Default: HS256Conditions: pyfly.security.enabled=true AND bcrypt library installed.
| Bean | Type | Config Keys |
|---|---|---|
password_encoder |
BcryptPasswordEncoder |
pyfly.security.password.bcrypt-rounds |
pyfly:
security:
enabled: true
password:
bcrypt-rounds: 12 # Default: 12Both auto-configuration classes use @conditional_on_missing_bean, so providing your own JWTService or BcryptPasswordEncoder via a @configuration + @bean method silently skips the auto-configured version:
from pyfly.container.bean import bean
from pyfly.container import configuration
from pyfly.security import JWTService
@configuration
class MySecurityConfig:
@bean
def jwt_service(self) -> JWTService:
return JWTService(secret="custom-secret", algorithm="RS256")Source: src/pyfly/security/auto_configuration.py
This complete example demonstrates a login/register flow with JWT authentication, password hashing, and role-based endpoint protection.
from pyfly.container import configuration, bean
from pyfly.security import JWTService, BcryptPasswordEncoder
@configuration
class SecurityConfig:
"""Wires security beans into the DI container."""
@bean
def jwt_service(self) -> JWTService:
# In production, load the secret from environment/config
return JWTService(secret="change-me-in-production", algorithm="HS256")
@bean
def password_encoder(self) -> BcryptPasswordEncoder:
return BcryptPasswordEncoder(rounds=12)from pyfly.data.relational.sqlalchemy import BaseEntity, Repository
from pyfly.container import repository as repo_stereotype
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.ext.asyncio import AsyncSession
class User(BaseEntity):
__tablename__ = "users"
username: Mapped[str] = mapped_column(String(255), unique=True)
email: Mapped[str] = mapped_column(String(255), unique=True)
password_hash: Mapped[str] = mapped_column(String(255))
role: Mapped[str] = mapped_column(String(50), default="USER")
@repo_stereotype
class UserRepository(Repository[User]):
def __init__(self, session: AsyncSession) -> None:
super().__init__(User, session)
async def find_by_username(self, username: str) -> list[User]: ...
async def exists_by_username(self, username: str) -> bool: ...
async def exists_by_email(self, email: str) -> bool: ...from datetime import datetime, timedelta, UTC
from pyfly.container import service
from pyfly.kernel.exceptions import (
UnauthorizedException,
ConflictException,
ResourceNotFoundException,
)
from pyfly.security import JWTService, BcryptPasswordEncoder, SecurityContext
@service
class AuthService:
def __init__(
self,
user_repo: UserRepository,
jwt_service: JWTService,
password_encoder: BcryptPasswordEncoder,
) -> None:
self._users = user_repo
self._jwt = jwt_service
self._encoder = password_encoder
async def register(self, username: str, email: str, password: str) -> str:
"""Register a new user and return a JWT token."""
if await self._users.exists_by_username(username):
raise ConflictException(
f"Username '{username}' is already taken",
code="USERNAME_TAKEN",
)
if await self._users.exists_by_email(email):
raise ConflictException(
f"Email '{email}' is already registered",
code="EMAIL_TAKEN",
)
user = User(
username=username,
email=email,
password_hash=self._encoder.hash(password),
role="USER",
)
saved = await self._users.save(user)
return self._create_token(saved)
async def login(self, username: str, password: str) -> str:
"""Authenticate a user and return a JWT token."""
users = await self._users.find_by_username(username)
if not users:
raise UnauthorizedException(
"Invalid credentials",
code="INVALID_CREDENTIALS",
)
user = users[0]
if not self._encoder.verify(password, user.password_hash):
raise UnauthorizedException(
"Invalid credentials",
code="INVALID_CREDENTIALS",
)
return self._create_token(user)
async def get_current_user(self, user_id: str) -> dict:
"""Get the current user's profile."""
from uuid import UUID
user = await self._users.find_by_id(UUID(user_id))
if not user:
raise ResourceNotFoundException(
"User not found", code="USER_NOT_FOUND"
)
return {
"id": str(user.id),
"username": user.username,
"email": user.email,
"role": user.role,
}
def _create_token(self, user: User) -> str:
"""Create a JWT token for the given user."""
return self._jwt.encode({
"sub": str(user.id),
"username": user.username,
"roles": [user.role],
"permissions": self._get_permissions(user.role),
"exp": datetime.now(UTC) + timedelta(hours=24),
"iat": datetime.now(UTC),
})
@staticmethod
def _get_permissions(role: str) -> list[str]:
"""Map roles to permissions."""
permission_map = {
"USER": ["profile:read", "order:read", "order:create"],
"ADMIN": [
"profile:read", "profile:write",
"order:read", "order:create", "order:delete",
"user:read", "user:write", "user:delete",
],
}
return permission_map.get(role, [])from pydantic import BaseModel, Field
from pyfly.container import rest_controller
from pyfly.kernel.exceptions import UnauthorizedException, ConflictException
from pyfly.web import (
request_mapping, get_mapping, post_mapping,
exception_handler, Body,
)
from pyfly.security import SecurityContext, secure
class RegisterRequest(BaseModel):
username: str = Field(..., min_length=3, max_length=50)
email: str = Field(..., min_length=5)
password: str = Field(..., min_length=8)
class LoginRequest(BaseModel):
username: str
password: str
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
expires_in: int = 86400 # 24 hours in seconds
@rest_controller
@request_mapping("/api/auth")
class AuthController:
def __init__(self, auth_service: AuthService) -> None:
self._auth = auth_service
@post_mapping("/register", status_code=201)
async def register(self, body: Body[RegisterRequest]) -> TokenResponse:
token = await self._auth.register(
username=body.username,
email=body.email,
password=body.password,
)
return TokenResponse(access_token=token)
@post_mapping("/login")
async def login(self, body: Body[LoginRequest]) -> TokenResponse:
token = await self._auth.login(
username=body.username,
password=body.password,
)
return TokenResponse(access_token=token)
@get_mapping("/me")
@secure(roles=["USER", "ADMIN"])
async def me(self, security_context: SecurityContext) -> dict:
return await self._auth.get_current_user(security_context.user_id)
# --- Exception Handlers ---
@exception_handler(UnauthorizedException)
async def handle_unauthorized(self, exc: UnauthorizedException):
return 401, {
"error": {
"message": str(exc),
"code": exc.code or "UNAUTHORIZED",
}
}
@exception_handler(ConflictException)
async def handle_conflict(self, exc: ConflictException):
return 409, {
"error": {
"message": str(exc),
"code": exc.code or "CONFLICT",
}
}from pyfly.web import delete_mapping, PathVar
@rest_controller
@request_mapping("/api/admin/users")
class AdminUserController:
def __init__(self, user_repo: UserRepository) -> None:
self._users = user_repo
@get_mapping("/")
@secure(roles=["ADMIN"])
async def list_users(self, security_context: SecurityContext) -> list[dict]:
users = await self._users.find_all()
return [
{"id": str(u.id), "username": u.username, "role": u.role}
for u in users
]
@delete_mapping("/{user_id}", status_code=204)
@secure(roles=["ADMIN"], permissions=["user:delete"])
async def delete_user(
self,
user_id: PathVar[str],
security_context: SecurityContext,
) -> None:
from uuid import UUID
await self._users.delete(UUID(user_id))from pyfly.web import CORSConfig
from pyfly.web.adapters.starlette import create_app
from pyfly.security import SecurityMiddleware, JWTService
def build_app(context):
"""Build the fully configured application."""
app = create_app(
title="My Application",
version="1.0.0",
description="Application with JWT authentication",
context=context,
docs_enabled=True,
cors=CORSConfig(
allowed_origins=["http://localhost:3000"],
allowed_methods=["GET", "POST", "PUT", "DELETE"],
allow_credentials=True,
),
)
# Add security middleware
jwt_service = context.get_bean(JWTService)
app.add_middleware(
SecurityMiddleware,
jwt_service=jwt_service,
exclude_paths=[
"/docs",
"/redoc",
"/openapi.json",
"/api/auth/login",
"/api/auth/register",
],
)
return app1. Register a new user:
POST /api/auth/register
Content-Type: application/json
{
"username": "alice",
"email": "alice@example.com",
"password": "securepassword123"
}
Response 201:
{
"access_token": "eyJhbGciOiJIUzI1NiI...",
"token_type": "bearer",
"expires_in": 86400
}
2. Log in:
POST /api/auth/login
Content-Type: application/json
{
"username": "alice",
"password": "securepassword123"
}
Response 200:
{
"access_token": "eyJhbGciOiJIUzI1NiI...",
"token_type": "bearer",
"expires_in": 86400
}
3. Access a protected endpoint:
GET /api/auth/me
Authorization: Bearer eyJhbGciOiJIUzI1NiI...
Response 200:
{
"id": "a1b2c3d4-...",
"username": "alice",
"email": "alice@example.com",
"role": "USER"
}
4. Access without a token:
GET /api/auth/me
Response 401:
{
"error": {
"message": "Authentication required",
"code": "AUTH_REQUIRED",
"status": 401,
"path": "/api/auth/me",
"timestamp": "2026-02-14T10:30:00+00:00",
"transaction_id": "..."
}
}
5. Access an admin-only endpoint without the ADMIN role:
GET /api/admin/users/
Authorization: Bearer eyJhbGciOiJIUzI1NiI... (token with role=USER)
Response 401:
{
"error": {
"message": "Insufficient roles: requires one of ['ADMIN']",
"code": "FORBIDDEN",
"status": 401,
"path": "/api/admin/users/",
"timestamp": "2026-02-14T10:30:00+00:00",
"transaction_id": "..."
}
}