""" Permission and Role-Based Access Control Tests These tests verify that: 1. Users can only access endpoints they have permission for 2. Users without proper roles are denied access (403) 3. Unauthenticated users are denied access (401) 4. The permission system cannot be bypassed """ import pytest from models import Permission # ============================================================================= # Role Assignment Tests # ============================================================================= class TestRoleAssignment: """Test that roles are properly assigned and returned.""" @pytest.mark.asyncio async def test_regular_user_has_correct_roles(self, client_factory, regular_user): async with client_factory.create(cookies=regular_user["cookies"]) as client: response = await client.get("/api/auth/me") assert response.status_code == 200 data = response.json() assert "regular" in data["roles"] assert "admin" not in data["roles"] @pytest.mark.asyncio async def test_admin_user_has_correct_roles(self, client_factory, admin_user): async with client_factory.create(cookies=admin_user["cookies"]) as client: response = await client.get("/api/auth/me") assert response.status_code == 200 data = response.json() assert "admin" in data["roles"] assert "regular" not in data["roles"] @pytest.mark.asyncio async def test_regular_user_has_correct_permissions(self, client_factory, regular_user): async with client_factory.create(cookies=regular_user["cookies"]) as client: response = await client.get("/api/auth/me") data = response.json() permissions = data["permissions"] # Should have counter and sum permissions assert Permission.VIEW_COUNTER.value in permissions assert Permission.INCREMENT_COUNTER.value in permissions assert Permission.USE_SUM.value in permissions # Should NOT have audit permission assert Permission.VIEW_AUDIT.value not in permissions @pytest.mark.asyncio async def test_admin_user_has_correct_permissions(self, client_factory, admin_user): async with client_factory.create(cookies=admin_user["cookies"]) as client: response = await client.get("/api/auth/me") data = response.json() permissions = data["permissions"] # Should have audit permission assert Permission.VIEW_AUDIT.value in permissions # Should NOT have counter/sum permissions assert Permission.VIEW_COUNTER.value not in permissions assert Permission.INCREMENT_COUNTER.value not in permissions assert Permission.USE_SUM.value not in permissions @pytest.mark.asyncio async def test_user_with_no_roles_has_no_permissions(self, client_factory, user_no_roles): async with client_factory.create(cookies=user_no_roles["cookies"]) as client: response = await client.get("/api/auth/me") data = response.json() assert data["roles"] == [] assert data["permissions"] == [] # ============================================================================= # Counter Endpoint Access Tests # ============================================================================= class TestCounterAccess: """Test access control for counter endpoints.""" @pytest.mark.asyncio async def test_regular_user_can_view_counter(self, client_factory, regular_user): async with client_factory.create(cookies=regular_user["cookies"]) as client: response = await client.get("/api/counter") assert response.status_code == 200 assert "value" in response.json() @pytest.mark.asyncio async def test_regular_user_can_increment_counter(self, client_factory, regular_user): async with client_factory.create(cookies=regular_user["cookies"]) as client: response = await client.post("/api/counter/increment") assert response.status_code == 200 assert "value" in response.json() @pytest.mark.asyncio async def test_admin_cannot_view_counter(self, client_factory, admin_user): """Admin users should be forbidden from counter endpoints.""" async with client_factory.create(cookies=admin_user["cookies"]) as client: response = await client.get("/api/counter") assert response.status_code == 403 assert "permission" in response.json()["detail"].lower() @pytest.mark.asyncio async def test_admin_cannot_increment_counter(self, client_factory, admin_user): """Admin users should be forbidden from incrementing counter.""" async with client_factory.create(cookies=admin_user["cookies"]) as client: response = await client.post("/api/counter/increment") assert response.status_code == 403 @pytest.mark.asyncio async def test_user_without_roles_cannot_view_counter(self, client_factory, user_no_roles): """Users with no roles should be forbidden.""" async with client_factory.create(cookies=user_no_roles["cookies"]) as client: response = await client.get("/api/counter") assert response.status_code == 403 @pytest.mark.asyncio async def test_unauthenticated_cannot_view_counter(self, client): """Unauthenticated requests should get 401.""" response = await client.get("/api/counter") assert response.status_code == 401 @pytest.mark.asyncio async def test_unauthenticated_cannot_increment_counter(self, client): """Unauthenticated requests should get 401.""" response = await client.post("/api/counter/increment") assert response.status_code == 401 # ============================================================================= # Sum Endpoint Access Tests # ============================================================================= class TestSumAccess: """Test access control for sum endpoint.""" @pytest.mark.asyncio async def test_regular_user_can_use_sum(self, client_factory, regular_user): async with client_factory.create(cookies=regular_user["cookies"]) as client: response = await client.post( "/api/sum", json={"a": 5, "b": 3}, ) assert response.status_code == 200 data = response.json() assert data["result"] == 8 @pytest.mark.asyncio async def test_admin_cannot_use_sum(self, client_factory, admin_user): """Admin users should be forbidden from sum endpoint.""" async with client_factory.create(cookies=admin_user["cookies"]) as client: response = await client.post( "/api/sum", json={"a": 5, "b": 3}, ) assert response.status_code == 403 @pytest.mark.asyncio async def test_user_without_roles_cannot_use_sum(self, client_factory, user_no_roles): async with client_factory.create(cookies=user_no_roles["cookies"]) as client: response = await client.post( "/api/sum", json={"a": 5, "b": 3}, ) assert response.status_code == 403 @pytest.mark.asyncio async def test_unauthenticated_cannot_use_sum(self, client): response = await client.post( "/api/sum", json={"a": 5, "b": 3}, ) assert response.status_code == 401 # ============================================================================= # Audit Endpoint Access Tests # ============================================================================= class TestAuditAccess: """Test access control for audit endpoints.""" @pytest.mark.asyncio async def test_admin_can_view_counter_audit(self, client_factory, admin_user): async with client_factory.create(cookies=admin_user["cookies"]) as client: response = await client.get("/api/audit/counter") assert response.status_code == 200 data = response.json() assert "records" in data assert "total" in data @pytest.mark.asyncio async def test_admin_can_view_sum_audit(self, client_factory, admin_user): async with client_factory.create(cookies=admin_user["cookies"]) as client: response = await client.get("/api/audit/sum") assert response.status_code == 200 data = response.json() assert "records" in data assert "total" in data @pytest.mark.asyncio async def test_regular_user_cannot_view_counter_audit(self, client_factory, regular_user): """Regular users should be forbidden from audit endpoints.""" async with client_factory.create(cookies=regular_user["cookies"]) as client: response = await client.get("/api/audit/counter") assert response.status_code == 403 assert "permission" in response.json()["detail"].lower() @pytest.mark.asyncio async def test_regular_user_cannot_view_sum_audit(self, client_factory, regular_user): """Regular users should be forbidden from audit endpoints.""" async with client_factory.create(cookies=regular_user["cookies"]) as client: response = await client.get("/api/audit/sum") assert response.status_code == 403 @pytest.mark.asyncio async def test_user_without_roles_cannot_view_audit(self, client_factory, user_no_roles): async with client_factory.create(cookies=user_no_roles["cookies"]) as client: response = await client.get("/api/audit/counter") assert response.status_code == 403 @pytest.mark.asyncio async def test_unauthenticated_cannot_view_counter_audit(self, client): response = await client.get("/api/audit/counter") assert response.status_code == 401 @pytest.mark.asyncio async def test_unauthenticated_cannot_view_sum_audit(self, client): response = await client.get("/api/audit/sum") assert response.status_code == 401 # ============================================================================= # Offensive Security Tests - Bypass Attempts # ============================================================================= class TestSecurityBypassAttempts: """ Offensive tests that attempt to bypass security controls. These simulate potential attack vectors. """ @pytest.mark.asyncio async def test_cannot_access_audit_with_forged_role_claim(self, client_factory, regular_user): """ Attempt to access audit by somehow claiming admin role. The server should verify roles from DB, not trust client claims. """ # Regular user tries to access audit endpoint async with client_factory.create(cookies=regular_user["cookies"]) as client: response = await client.get("/api/audit/counter") # Should be denied regardless of any manipulation attempts assert response.status_code == 403 @pytest.mark.asyncio async def test_cannot_access_counter_with_expired_session(self, client_factory): """Test that invalid/expired tokens are rejected.""" fake_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiI5OTk5IiwiZXhwIjoxfQ.invalid" async with client_factory.create(cookies={"auth_token": fake_token}) as client: response = await client.get("/api/counter") assert response.status_code == 401 @pytest.mark.asyncio async def test_cannot_access_with_tampered_token(self, client_factory, regular_user): """Test that tokens signed with wrong key are rejected.""" # Take a valid token and modify it original_token = regular_user["cookies"].get("auth_token", "") if original_token: tampered_token = original_token[:-5] + "XXXXX" async with client_factory.create(cookies={"auth_token": tampered_token}) as client: response = await client.get("/api/counter") assert response.status_code == 401 @pytest.mark.asyncio async def test_cannot_escalate_to_admin_via_registration(self, client_factory): """ Test that new registrations cannot claim admin role. New users should only get 'regular' role by default. """ from tests.helpers import unique_email, create_invite_for_registration async with client_factory.get_db_session() as db: invite_code = await create_invite_for_registration(db, unique_email("gf")) response = await client_factory.post( "/api/auth/register", json={ "email": unique_email(), "password": "password123", "invite_identifier": invite_code, }, ) assert response.status_code == 200 data = response.json() # Should only have regular role, not admin assert "admin" not in data["roles"] assert Permission.VIEW_AUDIT.value not in data["permissions"] # Try to access audit with this new user async with client_factory.create(cookies=dict(response.cookies)) as client: audit_response = await client.get("/api/audit/counter") assert audit_response.status_code == 403 @pytest.mark.asyncio async def test_deleted_user_token_is_invalid(self, client_factory): """ If a user is deleted, their token should no longer work. This tests that tokens are validated against current DB state. """ from tests.helpers import unique_email from sqlalchemy import delete from models import User email = unique_email("deleted") # Create and login user async with client_factory.get_db_session() as db: from tests.conftest import create_user_with_roles user = await create_user_with_roles(db, email, "password123", ["regular"]) user_id = user.id login_response = await client_factory.post( "/api/auth/login", json={"email": email, "password": "password123"}, ) cookies = dict(login_response.cookies) # Delete the user from DB async with client_factory.get_db_session() as db: await db.execute(delete(User).where(User.id == user_id)) await db.commit() # Try to use the old token async with client_factory.create(cookies=cookies) as client: response = await client.get("/api/auth/me") assert response.status_code == 401 @pytest.mark.asyncio async def test_role_change_reflected_immediately(self, client_factory): """ If a user's role is changed, the change should be reflected in subsequent requests (no stale permission cache). """ from tests.helpers import unique_email from sqlalchemy import select from models import User, Role email = unique_email("rolechange") # Create regular user async with client_factory.get_db_session() as db: from tests.conftest import create_user_with_roles await create_user_with_roles(db, email, "password123", ["regular"]) login_response = await client_factory.post( "/api/auth/login", json={"email": email, "password": "password123"}, ) cookies = dict(login_response.cookies) # Verify can access counter but not audit async with client_factory.create(cookies=cookies) as client: assert (await client.get("/api/counter")).status_code == 200 assert (await client.get("/api/audit/counter")).status_code == 403 # Change user's role from regular to admin async with client_factory.get_db_session() as db: result = await db.execute(select(User).where(User.email == email)) user = result.scalar_one() result = await db.execute(select(Role).where(Role.name == "admin")) admin_role = result.scalar_one() result = await db.execute(select(Role).where(Role.name == "regular")) regular_role = result.scalar_one() user.roles = [admin_role] # Remove regular, add admin await db.commit() # Now should have audit access but not counter access async with client_factory.create(cookies=cookies) as client: assert (await client.get("/api/audit/counter")).status_code == 200 assert (await client.get("/api/counter")).status_code == 403 # ============================================================================= # Audit Record Tests # ============================================================================= class TestAuditRecords: """Test that actions are properly recorded in audit logs.""" @pytest.mark.asyncio async def test_counter_increment_creates_audit_record( self, client_factory, regular_user, admin_user ): """Verify that counter increments are recorded and visible in audit.""" # Regular user increments counter async with client_factory.create(cookies=regular_user["cookies"]) as client: await client.post("/api/counter/increment") # Admin checks audit async with client_factory.create(cookies=admin_user["cookies"]) as client: response = await client.get("/api/audit/counter") assert response.status_code == 200 data = response.json() assert data["total"] >= 1 # Find record for our user records = data["records"] user_records = [r for r in records if r["user_email"] == regular_user["email"]] assert len(user_records) >= 1 @pytest.mark.asyncio async def test_sum_operation_creates_audit_record( self, client_factory, regular_user, admin_user ): """Verify that sum operations are recorded and visible in audit.""" # Regular user uses sum async with client_factory.create(cookies=regular_user["cookies"]) as client: await client.post("/api/sum", json={"a": 10, "b": 20}) # Admin checks audit async with client_factory.create(cookies=admin_user["cookies"]) as client: response = await client.get("/api/audit/sum") assert response.status_code == 200 data = response.json() assert data["total"] >= 1 # Find record with our values records = data["records"] matching = [r for r in records if r["a"] == 10 and r["b"] == 20 and r["result"] == 30] assert len(matching) >= 1