arbret/backend/tests/test_permissions.py

472 lines
19 KiB
Python

"""
Permission and Role-Based Access Control Tests
These tests verify that:
1. Users can only access endpoints they have permission for
2. Users without proper roles are denied access (403)
3. Unauthenticated users are denied access (401)
4. The permission system cannot be bypassed
"""
import pytest
from models import Permission
# =============================================================================
# Role Assignment Tests
# =============================================================================
class TestRoleAssignment:
"""Test that roles are properly assigned and returned."""
@pytest.mark.asyncio
async def test_regular_user_has_correct_roles(self, client_factory, regular_user):
async with client_factory.create(cookies=regular_user["cookies"]) as client:
response = await client.get("/api/auth/me")
assert response.status_code == 200
data = response.json()
assert "regular" in data["roles"]
assert "admin" not in data["roles"]
@pytest.mark.asyncio
async def test_admin_user_has_correct_roles(self, client_factory, admin_user):
async with client_factory.create(cookies=admin_user["cookies"]) as client:
response = await client.get("/api/auth/me")
assert response.status_code == 200
data = response.json()
assert "admin" in data["roles"]
assert "regular" not in data["roles"]
@pytest.mark.asyncio
async def test_regular_user_has_correct_permissions(self, client_factory, regular_user):
async with client_factory.create(cookies=regular_user["cookies"]) as client:
response = await client.get("/api/auth/me")
data = response.json()
permissions = data["permissions"]
# Should have counter and sum permissions
assert Permission.VIEW_COUNTER.value in permissions
assert Permission.INCREMENT_COUNTER.value in permissions
assert Permission.USE_SUM.value in permissions
# Should NOT have audit permission
assert Permission.VIEW_AUDIT.value not in permissions
@pytest.mark.asyncio
async def test_admin_user_has_correct_permissions(self, client_factory, admin_user):
async with client_factory.create(cookies=admin_user["cookies"]) as client:
response = await client.get("/api/auth/me")
data = response.json()
permissions = data["permissions"]
# Should have audit permission
assert Permission.VIEW_AUDIT.value in permissions
# Should NOT have counter/sum permissions
assert Permission.VIEW_COUNTER.value not in permissions
assert Permission.INCREMENT_COUNTER.value not in permissions
assert Permission.USE_SUM.value not in permissions
@pytest.mark.asyncio
async def test_user_with_no_roles_has_no_permissions(self, client_factory, user_no_roles):
async with client_factory.create(cookies=user_no_roles["cookies"]) as client:
response = await client.get("/api/auth/me")
data = response.json()
assert data["roles"] == []
assert data["permissions"] == []
# =============================================================================
# Counter Endpoint Access Tests
# =============================================================================
class TestCounterAccess:
"""Test access control for counter endpoints."""
@pytest.mark.asyncio
async def test_regular_user_can_view_counter(self, client_factory, regular_user):
async with client_factory.create(cookies=regular_user["cookies"]) as client:
response = await client.get("/api/counter")
assert response.status_code == 200
assert "value" in response.json()
@pytest.mark.asyncio
async def test_regular_user_can_increment_counter(self, client_factory, regular_user):
async with client_factory.create(cookies=regular_user["cookies"]) as client:
response = await client.post("/api/counter/increment")
assert response.status_code == 200
assert "value" in response.json()
@pytest.mark.asyncio
async def test_admin_cannot_view_counter(self, client_factory, admin_user):
"""Admin users should be forbidden from counter endpoints."""
async with client_factory.create(cookies=admin_user["cookies"]) as client:
response = await client.get("/api/counter")
assert response.status_code == 403
assert "permission" in response.json()["detail"].lower()
@pytest.mark.asyncio
async def test_admin_cannot_increment_counter(self, client_factory, admin_user):
"""Admin users should be forbidden from incrementing counter."""
async with client_factory.create(cookies=admin_user["cookies"]) as client:
response = await client.post("/api/counter/increment")
assert response.status_code == 403
@pytest.mark.asyncio
async def test_user_without_roles_cannot_view_counter(self, client_factory, user_no_roles):
"""Users with no roles should be forbidden."""
async with client_factory.create(cookies=user_no_roles["cookies"]) as client:
response = await client.get("/api/counter")
assert response.status_code == 403
@pytest.mark.asyncio
async def test_unauthenticated_cannot_view_counter(self, client):
"""Unauthenticated requests should get 401."""
response = await client.get("/api/counter")
assert response.status_code == 401
@pytest.mark.asyncio
async def test_unauthenticated_cannot_increment_counter(self, client):
"""Unauthenticated requests should get 401."""
response = await client.post("/api/counter/increment")
assert response.status_code == 401
# =============================================================================
# Sum Endpoint Access Tests
# =============================================================================
class TestSumAccess:
"""Test access control for sum endpoint."""
@pytest.mark.asyncio
async def test_regular_user_can_use_sum(self, client_factory, regular_user):
async with client_factory.create(cookies=regular_user["cookies"]) as client:
response = await client.post(
"/api/sum",
json={"a": 5, "b": 3},
)
assert response.status_code == 200
data = response.json()
assert data["result"] == 8
@pytest.mark.asyncio
async def test_admin_cannot_use_sum(self, client_factory, admin_user):
"""Admin users should be forbidden from sum endpoint."""
async with client_factory.create(cookies=admin_user["cookies"]) as client:
response = await client.post(
"/api/sum",
json={"a": 5, "b": 3},
)
assert response.status_code == 403
@pytest.mark.asyncio
async def test_user_without_roles_cannot_use_sum(self, client_factory, user_no_roles):
async with client_factory.create(cookies=user_no_roles["cookies"]) as client:
response = await client.post(
"/api/sum",
json={"a": 5, "b": 3},
)
assert response.status_code == 403
@pytest.mark.asyncio
async def test_unauthenticated_cannot_use_sum(self, client):
response = await client.post(
"/api/sum",
json={"a": 5, "b": 3},
)
assert response.status_code == 401
# =============================================================================
# Audit Endpoint Access Tests
# =============================================================================
class TestAuditAccess:
"""Test access control for audit endpoints."""
@pytest.mark.asyncio
async def test_admin_can_view_counter_audit(self, client_factory, admin_user):
async with client_factory.create(cookies=admin_user["cookies"]) as client:
response = await client.get("/api/audit/counter")
assert response.status_code == 200
data = response.json()
assert "records" in data
assert "total" in data
@pytest.mark.asyncio
async def test_admin_can_view_sum_audit(self, client_factory, admin_user):
async with client_factory.create(cookies=admin_user["cookies"]) as client:
response = await client.get("/api/audit/sum")
assert response.status_code == 200
data = response.json()
assert "records" in data
assert "total" in data
@pytest.mark.asyncio
async def test_regular_user_cannot_view_counter_audit(self, client_factory, regular_user):
"""Regular users should be forbidden from audit endpoints."""
async with client_factory.create(cookies=regular_user["cookies"]) as client:
response = await client.get("/api/audit/counter")
assert response.status_code == 403
assert "permission" in response.json()["detail"].lower()
@pytest.mark.asyncio
async def test_regular_user_cannot_view_sum_audit(self, client_factory, regular_user):
"""Regular users should be forbidden from audit endpoints."""
async with client_factory.create(cookies=regular_user["cookies"]) as client:
response = await client.get("/api/audit/sum")
assert response.status_code == 403
@pytest.mark.asyncio
async def test_user_without_roles_cannot_view_audit(self, client_factory, user_no_roles):
async with client_factory.create(cookies=user_no_roles["cookies"]) as client:
response = await client.get("/api/audit/counter")
assert response.status_code == 403
@pytest.mark.asyncio
async def test_unauthenticated_cannot_view_counter_audit(self, client):
response = await client.get("/api/audit/counter")
assert response.status_code == 401
@pytest.mark.asyncio
async def test_unauthenticated_cannot_view_sum_audit(self, client):
response = await client.get("/api/audit/sum")
assert response.status_code == 401
# =============================================================================
# Offensive Security Tests - Bypass Attempts
# =============================================================================
class TestSecurityBypassAttempts:
"""
Offensive tests that attempt to bypass security controls.
These simulate potential attack vectors.
"""
@pytest.mark.asyncio
async def test_cannot_access_audit_with_forged_role_claim(self, client_factory, regular_user):
"""
Attempt to access audit by somehow claiming admin role.
The server should verify roles from DB, not trust client claims.
"""
# Regular user tries to access audit endpoint
async with client_factory.create(cookies=regular_user["cookies"]) as client:
response = await client.get("/api/audit/counter")
# Should be denied regardless of any manipulation attempts
assert response.status_code == 403
@pytest.mark.asyncio
async def test_cannot_access_counter_with_expired_session(self, client_factory):
"""Test that invalid/expired tokens are rejected."""
fake_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiI5OTk5IiwiZXhwIjoxfQ.invalid"
async with client_factory.create(cookies={"auth_token": fake_token}) as client:
response = await client.get("/api/counter")
assert response.status_code == 401
@pytest.mark.asyncio
async def test_cannot_access_with_tampered_token(self, client_factory, regular_user):
"""Test that tokens signed with wrong key are rejected."""
# Take a valid token and modify it
original_token = regular_user["cookies"].get("auth_token", "")
if original_token:
tampered_token = original_token[:-5] + "XXXXX"
async with client_factory.create(cookies={"auth_token": tampered_token}) as client:
response = await client.get("/api/counter")
assert response.status_code == 401
@pytest.mark.asyncio
async def test_cannot_escalate_to_admin_via_registration(self, client_factory):
"""
Test that new registrations cannot claim admin role.
New users should only get 'regular' role by default.
"""
from tests.helpers import unique_email, create_invite_for_godfather
from tests.conftest import create_user_with_roles
from models import ROLE_REGULAR
async with client_factory.get_db_session() as db:
godfather = await create_user_with_roles(db, unique_email("gf"), "pass123", [ROLE_REGULAR])
invite_code = await create_invite_for_godfather(db, godfather.id)
response = await client_factory.post(
"/api/auth/register",
json={
"email": unique_email(),
"password": "password123",
"invite_identifier": invite_code,
},
)
assert response.status_code == 200
data = response.json()
# Should only have regular role, not admin
assert "admin" not in data["roles"]
assert Permission.VIEW_AUDIT.value not in data["permissions"]
# Try to access audit with this new user
async with client_factory.create(cookies=dict(response.cookies)) as client:
audit_response = await client.get("/api/audit/counter")
assert audit_response.status_code == 403
@pytest.mark.asyncio
async def test_deleted_user_token_is_invalid(self, client_factory):
"""
If a user is deleted, their token should no longer work.
This tests that tokens are validated against current DB state.
"""
from tests.helpers import unique_email
from sqlalchemy import delete
from models import User
email = unique_email("deleted")
# Create and login user
async with client_factory.get_db_session() as db:
from tests.conftest import create_user_with_roles
user = await create_user_with_roles(db, email, "password123", ["regular"])
user_id = user.id
login_response = await client_factory.post(
"/api/auth/login",
json={"email": email, "password": "password123"},
)
cookies = dict(login_response.cookies)
# Delete the user from DB
async with client_factory.get_db_session() as db:
await db.execute(delete(User).where(User.id == user_id))
await db.commit()
# Try to use the old token
async with client_factory.create(cookies=cookies) as client:
response = await client.get("/api/auth/me")
assert response.status_code == 401
@pytest.mark.asyncio
async def test_role_change_reflected_immediately(self, client_factory):
"""
If a user's role is changed, the change should be reflected
in subsequent requests (no stale permission cache).
"""
from tests.helpers import unique_email
from sqlalchemy import select
from models import User, Role
email = unique_email("rolechange")
# Create regular user
async with client_factory.get_db_session() as db:
from tests.conftest import create_user_with_roles
await create_user_with_roles(db, email, "password123", ["regular"])
login_response = await client_factory.post(
"/api/auth/login",
json={"email": email, "password": "password123"},
)
cookies = dict(login_response.cookies)
# Verify can access counter but not audit
async with client_factory.create(cookies=cookies) as client:
assert (await client.get("/api/counter")).status_code == 200
assert (await client.get("/api/audit/counter")).status_code == 403
# Change user's role from regular to admin
async with client_factory.get_db_session() as db:
result = await db.execute(select(User).where(User.email == email))
user = result.scalar_one()
result = await db.execute(select(Role).where(Role.name == "admin"))
admin_role = result.scalar_one()
result = await db.execute(select(Role).where(Role.name == "regular"))
regular_role = result.scalar_one()
user.roles = [admin_role] # Remove regular, add admin
await db.commit()
# Now should have audit access but not counter access
async with client_factory.create(cookies=cookies) as client:
assert (await client.get("/api/audit/counter")).status_code == 200
assert (await client.get("/api/counter")).status_code == 403
# =============================================================================
# Audit Record Tests
# =============================================================================
class TestAuditRecords:
"""Test that actions are properly recorded in audit logs."""
@pytest.mark.asyncio
async def test_counter_increment_creates_audit_record(
self, client_factory, regular_user, admin_user
):
"""Verify that counter increments are recorded and visible in audit."""
# Regular user increments counter
async with client_factory.create(cookies=regular_user["cookies"]) as client:
await client.post("/api/counter/increment")
# Admin checks audit
async with client_factory.create(cookies=admin_user["cookies"]) as client:
response = await client.get("/api/audit/counter")
assert response.status_code == 200
data = response.json()
assert data["total"] >= 1
# Find record for our user
records = data["records"]
user_records = [r for r in records if r["user_email"] == regular_user["email"]]
assert len(user_records) >= 1
@pytest.mark.asyncio
async def test_sum_operation_creates_audit_record(
self, client_factory, regular_user, admin_user
):
"""Verify that sum operations are recorded and visible in audit."""
# Regular user uses sum
async with client_factory.create(cookies=regular_user["cookies"]) as client:
await client.post("/api/sum", json={"a": 10, "b": 20})
# Admin checks audit
async with client_factory.create(cookies=admin_user["cookies"]) as client:
response = await client.get("/api/audit/sum")
assert response.status_code == 200
data = response.json()
assert data["total"] >= 1
# Find record with our values
records = data["records"]
matching = [r for r in records if r["a"] == 10 and r["b"] == 20 and r["result"] == 30]
assert len(matching) >= 1