Phase 1: Add pgqueuer infrastructure

- Add pgqueuer dependency to pyproject.toml
- Create worker.py with schema installation and job handler registration
- Add make worker command to Makefile
- Update make dev to run worker alongside backend/frontend
- Use has_table() check for idempotent schema installation
- Register 'random_number' job handler (placeholder that logs for now)
This commit is contained in:
counterweight 2025-12-21 22:37:04 +01:00
parent 15bae15731
commit 10c0316603
Signed by: counterweight
GPG key ID: 883EDBAA726BD96C
4 changed files with 55 additions and 44 deletions

View file

@ -11,4 +11,4 @@ Use the `TEST` variable to select specific tests:
- Backend: `make test-backend TEST="tests/test_booking.py"` or `TEST="tests/test_booking.py::TestClass::test_method"`
- Frontend: `make test-frontend TEST="app/login"` (file pattern)
- E2E: `make test-e2e TEST="auth"` (matches e2e/auth.spec.ts)
- Don't do `2>&1 | tail`. Let the output hit the console when running the tests.
- Don't use `tail`. Let the output hit the console when running the tests.

View file

@ -48,8 +48,8 @@ db-seed: db-ready
dev:
$(MAKE) db-seed
cd backend && uv run uvicorn main:app --reload & \
cd frontend && npm run dev & \
cd backend && uv run python worker.py & \
cd frontend && npm run dev & \
wait
# TEST variable can be used to select specific tests:

View file

@ -11,7 +11,7 @@ dependencies = [
"python-jose[cryptography]>=3.3.0",
"email-validator>=2.0.0",
"bech32>=1.2.0",
"pgqueuer>=0.1.0",
"pgqueuer>=0.14.0",
]
[dependency-groups]

View file

@ -1,62 +1,73 @@
"""Worker process for processing async jobs using pgqueuer."""
"""Background job worker using pgqueuer."""
import asyncio
import logging
import os
import asyncpg
from pgqueuer import AsyncpgDriver, PgQueuer, Queries
from pgqueuer import Job, QueueManager
from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("worker")
# Get DATABASE_URL and convert from SQLAlchemy format (postgresql+asyncpg://)
# to asyncpg format (postgresql://)
_raw_db_url = os.getenv(
# SQLAlchemy uses postgresql+asyncpg://, but asyncpg needs postgresql://
_raw_url = os.getenv(
"DATABASE_URL", "postgresql+asyncpg://postgres:postgres@localhost:5432/arbret"
)
DATABASE_URL = _raw_db_url.replace("postgresql+asyncpg://", "postgresql://")
DATABASE_URL = _raw_url.replace("postgresql+asyncpg://", "postgresql://")
async def main() -> None:
"""Main worker loop."""
logger.info("Starting worker...")
# Connect to database independently
async def install_schema() -> None:
"""Install pgqueuer schema if not already present."""
conn = await asyncpg.connect(DATABASE_URL)
driver = AsyncpgDriver(conn)
# Install pgqueuer schema (creates tables if they don't exist)
queries = Queries.from_asyncpg_connection(conn)
try:
await queries.install()
logger.info("pgqueuer schema installed")
except Exception as e:
# Schema might already exist, which is fine
if "already exists" in str(e).lower():
logger.info("pgqueuer schema already exists")
queries = Queries.from_asyncpg_connection(conn)
# Check if schema is already installed by looking for the main table
if not await queries.has_table("pgqueuer"):
await queries.install()
logger.info("pgqueuer schema installed")
else:
raise
# Initialize pgqueuer
pgq = PgQueuer(connection=driver)
# Register job handlers using entrypoint decorator
@pgq.entrypoint("dummy") # type: ignore[type-var]
async def dummy_job_handler(payload: dict) -> None:
"""Dummy job handler for testing."""
logger.info(f"Processing dummy job with payload: {payload}")
logger.info("Worker started, waiting for jobs...")
# Start consuming jobs
try:
await pgq.run()
except KeyboardInterrupt:
logger.info("Worker shutting down...")
logger.info("pgqueuer schema already exists")
finally:
await conn.close()
def register_job_handlers(qm: QueueManager) -> None:
"""Register all job handlers with the queue manager."""
@qm.entrypoint("random_number")
async def process_random_number(job: Job) -> None:
"""Process a random number job (placeholder - just logs for now)."""
payload_str = job.payload.decode() if job.payload else ""
logger.info(f"Processing random_number job {job.id}: {payload_str}")
async def main() -> None:
"""Main worker entry point."""
logger.info("Installing pgqueuer schema...")
await install_schema()
logger.info("Connecting to database...")
conn = await asyncpg.connect(DATABASE_URL)
try:
driver = AsyncpgDriver(conn)
qm = QueueManager(driver)
# Register job handlers
register_job_handlers(qm)
logger.info("Worker started, waiting for jobs...")
await qm.run()
finally:
await conn.close()
logger.info("Worker stopped")
if __name__ == "__main__":
asyncio.run(main())