Add ruff linter/formatter for Python
- Add ruff as dev dependency - Configure ruff in pyproject.toml with strict 88-char line limit - Ignore B008 (FastAPI Depends pattern is standard) - Allow longer lines in tests for readability - Fix all lint issues in source files - Add Makefile targets: lint-backend, format-backend, fix-backend
This commit is contained in:
parent
69bc8413e0
commit
6c218130e9
31 changed files with 1234 additions and 876 deletions
|
|
@ -1,5 +1,5 @@
|
|||
import os
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
import bcrypt
|
||||
from fastapi import Depends, HTTPException, Request, status
|
||||
|
|
@ -8,7 +8,7 @@ from sqlalchemy import select
|
|||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from database import get_db
|
||||
from models import User, Permission
|
||||
from models import Permission, User
|
||||
from schemas import UserResponse
|
||||
|
||||
SECRET_KEY = os.environ["SECRET_KEY"] # Required - see .env.example
|
||||
|
|
@ -32,9 +32,13 @@ def get_password_hash(password: str) -> str:
|
|||
).decode("utf-8")
|
||||
|
||||
|
||||
def create_access_token(data: dict[str, str], expires_delta: timedelta | None = None) -> str:
|
||||
def create_access_token(
|
||||
data: dict[str, str],
|
||||
expires_delta: timedelta | None = None,
|
||||
) -> str:
|
||||
to_encode: dict[str, str | datetime] = dict(data)
|
||||
expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
|
||||
delta = expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
||||
expire = datetime.now(UTC) + delta
|
||||
to_encode["exp"] = expire
|
||||
encoded: str = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
||||
return encoded
|
||||
|
|
@ -60,11 +64,11 @@ async def get_current_user(
|
|||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Invalid authentication credentials",
|
||||
)
|
||||
|
||||
|
||||
token = request.cookies.get(COOKIE_NAME)
|
||||
if not token:
|
||||
raise credentials_exception
|
||||
|
||||
|
||||
try:
|
||||
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
||||
user_id_str = payload.get("sub")
|
||||
|
|
@ -72,7 +76,7 @@ async def get_current_user(
|
|||
raise credentials_exception
|
||||
user_id = int(user_id_str)
|
||||
except (JWTError, ValueError):
|
||||
raise credentials_exception
|
||||
raise credentials_exception from None
|
||||
|
||||
result = await db.execute(select(User).where(User.id == user_id))
|
||||
user = result.scalar_one_or_none()
|
||||
|
|
@ -83,27 +87,32 @@ async def get_current_user(
|
|||
|
||||
def require_permission(*required_permissions: Permission):
|
||||
"""
|
||||
Dependency factory that checks if user has ALL of the required permissions.
|
||||
|
||||
Dependency factory that checks if user has ALL required permissions.
|
||||
|
||||
Usage:
|
||||
@app.get("/api/counter")
|
||||
async def get_counter(user: User = Depends(require_permission(Permission.VIEW_COUNTER))):
|
||||
async def get_counter(
|
||||
user: User = Depends(require_permission(Permission.VIEW_COUNTER))
|
||||
):
|
||||
...
|
||||
"""
|
||||
|
||||
async def permission_checker(
|
||||
request: Request,
|
||||
db: AsyncSession = Depends(get_db),
|
||||
) -> User:
|
||||
user = await get_current_user(request, db)
|
||||
user_permissions = await user.get_permissions(db)
|
||||
|
||||
|
||||
missing = [p for p in required_permissions if p not in user_permissions]
|
||||
if missing:
|
||||
missing_str = ", ".join(p.value for p in missing)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail=f"Missing required permissions: {', '.join(p.value for p in missing)}",
|
||||
detail=f"Missing required permissions: {missing_str}",
|
||||
)
|
||||
return user
|
||||
|
||||
return permission_checker
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue