""" Permission and Role-Based Access Control Tests These tests verify that: 1. Users can only access endpoints they have permission for 2. Users without proper roles are denied access (403) 3. Unauthenticated users are denied access (401) 4. The permission system cannot be bypassed """ import pytest from models import Permission # ============================================================================= # Role Assignment Tests # ============================================================================= class TestRoleAssignment: """Test that roles are properly assigned and returned.""" @pytest.mark.asyncio async def test_regular_user_has_correct_roles(self, client_factory, regular_user): async with client_factory.create(cookies=regular_user["cookies"]) as client: response = await client.get("/api/auth/me") assert response.status_code == 200 data = response.json() assert "regular" in data["roles"] assert "admin" not in data["roles"] @pytest.mark.asyncio async def test_admin_user_has_correct_roles(self, client_factory, admin_user): async with client_factory.create(cookies=admin_user["cookies"]) as client: response = await client.get("/api/auth/me") assert response.status_code == 200 data = response.json() assert "admin" in data["roles"] assert "regular" not in data["roles"] @pytest.mark.asyncio async def test_regular_user_has_correct_permissions( self, client_factory, regular_user ): async with client_factory.create(cookies=regular_user["cookies"]) as client: response = await client.get("/api/auth/me") data = response.json() permissions = data["permissions"] # Should have profile and exchange permissions assert Permission.MANAGE_OWN_PROFILE.value in permissions assert Permission.CREATE_EXCHANGE.value in permissions assert Permission.VIEW_OWN_EXCHANGES.value in permissions # Should NOT have audit permission assert Permission.VIEW_AUDIT.value not in permissions @pytest.mark.asyncio async def test_admin_user_has_correct_permissions(self, client_factory, admin_user): async with client_factory.create(cookies=admin_user["cookies"]) as client: response = await client.get("/api/auth/me") data = response.json() permissions = data["permissions"] # Should have audit permission assert Permission.VIEW_AUDIT.value in permissions # Should NOT have exchange permissions (those are for regular users) assert Permission.CREATE_EXCHANGE.value not in permissions @pytest.mark.asyncio async def test_user_with_no_roles_has_no_permissions( self, client_factory, user_no_roles ): async with client_factory.create(cookies=user_no_roles["cookies"]) as client: response = await client.get("/api/auth/me") data = response.json() assert data["roles"] == [] assert data["permissions"] == [] # ============================================================================= # Audit Endpoint Access Tests # ============================================================================= class TestAuditAccess: """Test access control for audit endpoints.""" @pytest.mark.asyncio async def test_admin_can_view_price_history(self, client_factory, admin_user): async with client_factory.create(cookies=admin_user["cookies"]) as client: response = await client.get("/api/audit/price-history") assert response.status_code == 200 # Returns a list assert isinstance(response.json(), list) @pytest.mark.asyncio async def test_regular_user_cannot_view_price_history( self, client_factory, regular_user ): """Regular users should be forbidden from audit endpoints.""" async with client_factory.create(cookies=regular_user["cookies"]) as client: response = await client.get("/api/audit/price-history") assert response.status_code == 403 assert "permission" in response.json()["detail"].lower() @pytest.mark.asyncio async def test_user_without_roles_cannot_view_audit( self, client_factory, user_no_roles ): async with client_factory.create(cookies=user_no_roles["cookies"]) as client: response = await client.get("/api/audit/price-history") assert response.status_code == 403 @pytest.mark.asyncio async def test_unauthenticated_cannot_view_price_history(self, client): response = await client.get("/api/audit/price-history") assert response.status_code == 401 # ============================================================================= # Offensive Security Tests - Bypass Attempts # ============================================================================= class TestSecurityBypassAttempts: """ Offensive tests that attempt to bypass security controls. These simulate potential attack vectors. """ @pytest.mark.asyncio async def test_cannot_access_audit_with_forged_role_claim( self, client_factory, regular_user ): """ Attempt to access audit by somehow claiming admin role. The server should verify roles from DB, not trust client claims. """ # Regular user tries to access audit endpoint async with client_factory.create(cookies=regular_user["cookies"]) as client: response = await client.get("/api/audit/price-history") # Should be denied regardless of any manipulation attempts assert response.status_code == 403 @pytest.mark.asyncio async def test_cannot_access_with_expired_session(self, client_factory): """Test that invalid/expired tokens are rejected.""" fake_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiI5OTk5IiwiZXhwIjoxfQ.invalid" async with client_factory.create(cookies={"auth_token": fake_token}) as client: response = await client.get("/api/profile") assert response.status_code == 401 @pytest.mark.asyncio async def test_cannot_access_with_tampered_token( self, client_factory, regular_user ): """Test that tokens signed with wrong key are rejected.""" # Take a valid token and modify it original_token = regular_user["cookies"].get("auth_token", "") if original_token: tampered_token = original_token[:-5] + "XXXXX" async with client_factory.create( cookies={"auth_token": tampered_token} ) as client: response = await client.get("/api/profile") assert response.status_code == 401 @pytest.mark.asyncio async def test_cannot_escalate_to_admin_via_registration(self, client_factory): """ Test that new registrations cannot claim admin role. New users should only get 'regular' role by default. """ from models import ROLE_REGULAR from tests.conftest import create_user_with_roles from tests.helpers import create_invite_for_godfather, unique_email async with client_factory.get_db_session() as db: godfather = await create_user_with_roles( db, unique_email("gf"), "pass123", [ROLE_REGULAR] ) invite_code = await create_invite_for_godfather(db, godfather.id) response = await client_factory.post( "/api/auth/register", json={ "email": unique_email(), "password": "password123", "invite_identifier": invite_code, }, ) assert response.status_code == 200 data = response.json() # Should only have regular role, not admin assert "admin" not in data["roles"] assert Permission.VIEW_AUDIT.value not in data["permissions"] # Try to access audit with this new user async with client_factory.create(cookies=dict(response.cookies)) as client: audit_response = await client.get("/api/audit/price-history") assert audit_response.status_code == 403 @pytest.mark.asyncio async def test_deleted_user_token_is_invalid(self, client_factory): """ If a user is deleted, their token should no longer work. This tests that tokens are validated against current DB state. """ from sqlalchemy import delete from models import User from tests.helpers import unique_email email = unique_email("deleted") # Create and login user async with client_factory.get_db_session() as db: from tests.conftest import create_user_with_roles user = await create_user_with_roles(db, email, "password123", ["regular"]) user_id = user.id login_response = await client_factory.post( "/api/auth/login", json={"email": email, "password": "password123"}, ) cookies = dict(login_response.cookies) # Delete the user from DB async with client_factory.get_db_session() as db: await db.execute(delete(User).where(User.id == user_id)) await db.commit() # Try to use the old token async with client_factory.create(cookies=cookies) as client: response = await client.get("/api/auth/me") assert response.status_code == 401 @pytest.mark.asyncio async def test_role_change_reflected_immediately(self, client_factory): """ If a user's role is changed, the change should be reflected in subsequent requests (no stale permission cache). """ from sqlalchemy import select from models import Role, User from tests.helpers import unique_email email = unique_email("rolechange") # Create regular user async with client_factory.get_db_session() as db: from tests.conftest import create_user_with_roles await create_user_with_roles(db, email, "password123", ["regular"]) login_response = await client_factory.post( "/api/auth/login", json={"email": email, "password": "password123"}, ) cookies = dict(login_response.cookies) # Verify can access profile but not audit async with client_factory.create(cookies=cookies) as client: assert (await client.get("/api/profile")).status_code == 200 assert (await client.get("/api/audit/price-history")).status_code == 403 # Change user's role from regular to admin async with client_factory.get_db_session() as db: result = await db.execute(select(User).where(User.email == email)) user = result.scalar_one() result = await db.execute(select(Role).where(Role.name == "admin")) admin_role = result.scalar_one() user.roles = [admin_role] # Replace roles with admin only await db.commit() # Now should have audit access but not profile access (admin doesn't have MANAGE_OWN_PROFILE) async with client_factory.create(cookies=cookies) as client: assert (await client.get("/api/audit/price-history")).status_code == 200 assert (await client.get("/api/profile")).status_code == 403