implemented

This commit is contained in:
counterweight 2025-12-20 23:06:05 +01:00
parent a31bd8246c
commit d3638e2e69
Signed by: counterweight
GPG key ID: 883EDBAA726BD96C
18 changed files with 1643 additions and 120 deletions

View file

@ -11,11 +11,16 @@ from routes import audit as audit_routes
from routes import profile as profile_routes
from routes import invites as invites_routes
from routes import auth as auth_routes
from routes import meta as meta_routes
from validate_constants import validate_shared_constants
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Create database tables on startup."""
"""Create database tables on startup and validate constants."""
# Validate shared constants match backend definitions
validate_shared_constants()
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
@ -39,3 +44,4 @@ app.include_router(audit_routes.router)
app.include_router(profile_routes.router)
app.include_router(invites_routes.router)
app.include_router(invites_routes.admin_router)
app.include_router(meta_routes.router)

18
backend/routes/meta.py Normal file
View file

@ -0,0 +1,18 @@
"""Meta endpoints for shared constants."""
from fastapi import APIRouter
from models import Permission, InviteStatus, ROLE_ADMIN, ROLE_REGULAR
from schemas import ConstantsResponse
router = APIRouter(prefix="/api/meta", tags=["meta"])
@router.get("/constants", response_model=ConstantsResponse)
async def get_constants() -> ConstantsResponse:
"""Get shared constants for frontend/backend synchronization."""
return ConstantsResponse(
permissions=[p.value for p in Permission],
roles=[ROLE_ADMIN, ROLE_REGULAR],
invite_statuses=[s.value for s in InviteStatus],
)

View file

@ -138,3 +138,14 @@ class AdminUserResponse(BaseModel):
"""Minimal user info for admin dropdowns."""
id: int
email: str
# =============================================================================
# Meta/Constants Schemas
# =============================================================================
class ConstantsResponse(BaseModel):
"""Response model for shared constants."""
permissions: list[str]
roles: list[str]
invite_statuses: list[str]

View file

@ -0,0 +1,48 @@
"""Validate shared constants match backend definitions."""
import json
from pathlib import Path
from models import ROLE_ADMIN, ROLE_REGULAR, InviteStatus
def validate_shared_constants() -> None:
"""
Validate that shared/constants.json matches backend definitions.
Raises ValueError if there's a mismatch.
"""
constants_path = Path(__file__).parent.parent / "shared" / "constants.json"
if not constants_path.exists():
raise ValueError(f"Shared constants file not found: {constants_path}")
with open(constants_path) as f:
constants = json.load(f)
# Validate roles
expected_roles = {"ADMIN": ROLE_ADMIN, "REGULAR": ROLE_REGULAR}
if constants.get("roles") != expected_roles:
raise ValueError(
f"Role mismatch in shared/constants.json. "
f"Expected: {expected_roles}, Got: {constants.get('roles')}"
)
# Validate invite statuses
expected_statuses = {s.name: s.value for s in InviteStatus}
if constants.get("inviteStatuses") != expected_statuses:
raise ValueError(
f"Invite status mismatch in shared/constants.json. "
f"Expected: {expected_statuses}, Got: {constants.get('inviteStatuses')}"
)
# Validate validation rules exist (structure check only)
validation = constants.get("validation", {})
required_fields = ["telegram", "signal", "nostrNpub"]
for field in required_fields:
if field not in validation:
raise ValueError(f"Missing validation rules for '{field}' in shared/constants.json")
if __name__ == "__main__":
validate_shared_constants()
print("✓ Shared constants are valid")

View file

@ -1,7 +1,19 @@
"""Validation utilities for user profile fields."""
import json
from pathlib import Path
from email_validator import validate_email, EmailNotValidError
from bech32 import bech32_decode
# Load validation rules from shared constants
_constants_path = Path(__file__).parent.parent / "shared" / "constants.json"
with open(_constants_path) as f:
_constants = json.load(f)
TELEGRAM_RULES = _constants["validation"]["telegram"]
SIGNAL_RULES = _constants["validation"]["signal"]
NPUB_RULES = _constants["validation"]["nostrNpub"]
def validate_contact_email(value: str | None) -> str | None:
"""
@ -24,22 +36,25 @@ def validate_telegram(value: str | None) -> str | None:
"""
Validate Telegram handle.
Must start with @ if provided, with 1-32 characters after @.
Must start with @ if provided, with characters after @ within max length.
Returns None if valid, error message if invalid.
Empty/None values are valid (field is optional).
"""
if not value:
return None
if not value.startswith("@"):
return "Telegram handle must start with @"
prefix = TELEGRAM_RULES["mustStartWith"]
max_len = TELEGRAM_RULES["maxLengthAfterAt"]
if not value.startswith(prefix):
return f"Telegram handle must start with {prefix}"
handle = value[1:]
if not handle:
return "Telegram handle must have at least one character after @"
return f"Telegram handle must have at least one character after {prefix}"
if len(handle) > 32:
return "Telegram handle must be at most 32 characters (after @)"
if len(handle) > max_len:
return f"Telegram handle must be at most {max_len} characters (after {prefix})"
return None
@ -48,19 +63,21 @@ def validate_signal(value: str | None) -> str | None:
"""
Validate Signal username.
Any non-empty string is valid.
Any non-empty string within max length is valid.
Returns None if valid, error message if invalid.
Empty/None values are valid (field is optional).
"""
if not value:
return None
max_len = SIGNAL_RULES["maxLength"]
# Signal usernames are fairly permissive, just check it's not empty
if len(value.strip()) == 0:
return "Signal username cannot be empty"
if len(value) > 64:
return "Signal username must be at most 64 characters"
if len(value) > max_len:
return f"Signal username must be at most {max_len} characters"
return None
@ -76,8 +93,11 @@ def validate_nostr_npub(value: str | None) -> str | None:
if not value:
return None
if not value.startswith("npub1"):
return "Nostr npub must start with 'npub1'"
prefix = NPUB_RULES["prefix"]
expected_words = NPUB_RULES["bech32Words"]
if not value.startswith(prefix):
return f"Nostr npub must start with '{prefix}'"
# Decode bech32 to validate checksum
hrp, data = bech32_decode(value)
@ -90,7 +110,7 @@ def validate_nostr_npub(value: str | None) -> str | None:
# npub should decode to 32 bytes (256 bits) for a public key
# In bech32, each character encodes 5 bits, so 32 bytes = 52 characters of data
if len(data) != 52:
if len(data) != expected_words:
return "Invalid Nostr npub: incorrect length"
return None