diff --git a/.cursor/rules/running_tests.mdc b/.cursor/rules/running_tests.mdc index 32dc2c7..515c139 100644 --- a/.cursor/rules/running_tests.mdc +++ b/.cursor/rules/running_tests.mdc @@ -11,4 +11,4 @@ Use the `TEST` variable to select specific tests: - Backend: `make test-backend TEST="tests/test_booking.py"` or `TEST="tests/test_booking.py::TestClass::test_method"` - Frontend: `make test-frontend TEST="app/login"` (file pattern) - E2E: `make test-e2e TEST="auth"` (matches e2e/auth.spec.ts) -- Don't do `2>&1 | tail`. Let the output hit the console when running the tests. +- Don't use `tail`. Let the output hit the console when running the tests. diff --git a/Makefile b/Makefile index a79d834..62d3db3 100644 --- a/Makefile +++ b/Makefile @@ -48,8 +48,8 @@ db-seed: db-ready dev: $(MAKE) db-seed cd backend && uv run uvicorn main:app --reload & \ - cd frontend && npm run dev & \ cd backend && uv run python worker.py & \ + cd frontend && npm run dev & \ wait # TEST variable can be used to select specific tests: diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 5f49358..6892b8f 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -11,7 +11,7 @@ dependencies = [ "python-jose[cryptography]>=3.3.0", "email-validator>=2.0.0", "bech32>=1.2.0", - "pgqueuer>=0.1.0", + "pgqueuer>=0.14.0", ] [dependency-groups] diff --git a/backend/worker.py b/backend/worker.py index f304871..68c2bb5 100644 --- a/backend/worker.py +++ b/backend/worker.py @@ -1,62 +1,73 @@ -"""Worker process for processing async jobs using pgqueuer.""" +"""Background job worker using pgqueuer.""" import asyncio import logging import os import asyncpg -from pgqueuer import AsyncpgDriver, PgQueuer, Queries +from pgqueuer import Job, QueueManager +from pgqueuer.db import AsyncpgDriver +from pgqueuer.queries import Queries -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger("worker") -# Get DATABASE_URL and convert from SQLAlchemy format (postgresql+asyncpg://) -# to asyncpg format (postgresql://) -_raw_db_url = os.getenv( +# SQLAlchemy uses postgresql+asyncpg://, but asyncpg needs postgresql:// +_raw_url = os.getenv( "DATABASE_URL", "postgresql+asyncpg://postgres:postgres@localhost:5432/arbret" ) -DATABASE_URL = _raw_db_url.replace("postgresql+asyncpg://", "postgresql://") +DATABASE_URL = _raw_url.replace("postgresql+asyncpg://", "postgresql://") -async def main() -> None: - """Main worker loop.""" - logger.info("Starting worker...") - - # Connect to database independently +async def install_schema() -> None: + """Install pgqueuer schema if not already present.""" conn = await asyncpg.connect(DATABASE_URL) - driver = AsyncpgDriver(conn) - - # Install pgqueuer schema (creates tables if they don't exist) - queries = Queries.from_asyncpg_connection(conn) try: - await queries.install() - logger.info("pgqueuer schema installed") - except Exception as e: - # Schema might already exist, which is fine - if "already exists" in str(e).lower(): - logger.info("pgqueuer schema already exists") + queries = Queries.from_asyncpg_connection(conn) + # Check if schema is already installed by looking for the main table + if not await queries.has_table("pgqueuer"): + await queries.install() + logger.info("pgqueuer schema installed") else: - raise - - # Initialize pgqueuer - pgq = PgQueuer(connection=driver) - - # Register job handlers using entrypoint decorator - @pgq.entrypoint("dummy") # type: ignore[type-var] - async def dummy_job_handler(payload: dict) -> None: - """Dummy job handler for testing.""" - logger.info(f"Processing dummy job with payload: {payload}") - - logger.info("Worker started, waiting for jobs...") - - # Start consuming jobs - try: - await pgq.run() - except KeyboardInterrupt: - logger.info("Worker shutting down...") + logger.info("pgqueuer schema already exists") finally: await conn.close() +def register_job_handlers(qm: QueueManager) -> None: + """Register all job handlers with the queue manager.""" + + @qm.entrypoint("random_number") + async def process_random_number(job: Job) -> None: + """Process a random number job (placeholder - just logs for now).""" + payload_str = job.payload.decode() if job.payload else "" + logger.info(f"Processing random_number job {job.id}: {payload_str}") + + +async def main() -> None: + """Main worker entry point.""" + logger.info("Installing pgqueuer schema...") + await install_schema() + + logger.info("Connecting to database...") + conn = await asyncpg.connect(DATABASE_URL) + + try: + driver = AsyncpgDriver(conn) + qm = QueueManager(driver) + + # Register job handlers + register_job_handlers(qm) + + logger.info("Worker started, waiting for jobs...") + await qm.run() + finally: + await conn.close() + logger.info("Worker stopped") + + if __name__ == "__main__": asyncio.run(main())