finish branch

This commit is contained in:
counterweight 2025-12-19 00:12:43 +01:00
parent 66bc4c5a45
commit 40ca82bb45
Signed by: counterweight
GPG key ID: 883EDBAA726BD96C
11 changed files with 139 additions and 128 deletions

View file

@ -1,12 +1,17 @@
from datetime import datetime, UTC
from enum import Enum as PyEnum
from typing import List, Set
from typing import TypedDict
from sqlalchemy import Integer, String, Float, DateTime, ForeignKey, Table, Column, Enum, select
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.ext.asyncio import AsyncSession
from database import Base
class RoleConfig(TypedDict):
description: str
permissions: list["Permission"]
class Permission(str, PyEnum):
"""All available permissions in the system."""
# Counter permissions
@ -20,15 +25,19 @@ class Permission(str, PyEnum):
VIEW_AUDIT = "view_audit"
# Role name constants
ROLE_ADMIN = "admin"
ROLE_REGULAR = "regular"
# Role definitions with their permissions
ROLE_DEFINITIONS = {
"admin": {
ROLE_DEFINITIONS: dict[str, RoleConfig] = {
ROLE_ADMIN: {
"description": "Administrator with audit access",
"permissions": [
Permission.VIEW_AUDIT,
],
},
"regular": {
ROLE_REGULAR: {
"description": "Regular user with counter and sum access",
"permissions": [
Permission.VIEW_COUNTER,
@ -65,24 +74,20 @@ class Role(Base):
description: Mapped[str] = mapped_column(String(255), nullable=True)
# Relationship to users
users: Mapped[List["User"]] = relationship(
users: Mapped[list["User"]] = relationship(
"User",
secondary=user_roles,
back_populates="roles",
)
async def get_permissions(self, db: AsyncSession) -> Set[Permission]:
async def get_permissions(self, db: AsyncSession) -> set[Permission]:
"""Get all permissions for this role."""
result = await db.execute(
select(role_permissions.c.permission).where(role_permissions.c.role_id == self.id)
)
return {row[0] for row in result.fetchall()}
async def add_permission(self, db: AsyncSession, permission: Permission) -> None:
"""Add a permission to this role."""
await db.execute(role_permissions.insert().values(role_id=self.id, permission=permission))
async def set_permissions(self, db: AsyncSession, permissions: List[Permission]) -> None:
async def set_permissions(self, db: AsyncSession, permissions: list[Permission]) -> None:
"""Set all permissions for this role (replaces existing)."""
await db.execute(role_permissions.delete().where(role_permissions.c.role_id == self.id))
for perm in permissions:
@ -97,20 +102,21 @@ class User(Base):
hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
# Relationship to roles
roles: Mapped[List[Role]] = relationship(
roles: Mapped[list[Role]] = relationship(
"Role",
secondary=user_roles,
back_populates="users",
lazy="selectin",
)
async def get_permissions(self, db: AsyncSession) -> Set[Permission]:
"""Get all permissions from all roles."""
permissions: Set[Permission] = set()
for role in self.roles:
role_perms = await role.get_permissions(db)
permissions.update(role_perms)
return permissions
async def get_permissions(self, db: AsyncSession) -> set[Permission]:
"""Get all permissions from all roles in a single query."""
result = await db.execute(
select(role_permissions.c.permission)
.join(user_roles, role_permissions.c.role_id == user_roles.c.role_id)
.where(user_roles.c.user_id == self.id)
)
return {row[0] for row in result.fetchall()}
async def has_permission(self, db: AsyncSession, permission: Permission) -> bool:
"""Check if user has a specific permission through any of their roles."""
@ -118,7 +124,7 @@ class User(Base):
return permission in permissions
@property
def role_names(self) -> List[str]:
def role_names(self) -> list[str]:
"""Get list of role names for API responses."""
return [role.name for role in self.roles]