From 6b572aa81b9e4ac2c1f87ce0dca0f6488a73d0ae Mon Sep 17 00:00:00 2001 From: counterweight Date: Sun, 21 Dec 2025 23:13:22 +0100 Subject: [PATCH] Use connection pool for job enqueueing instead of per-request - Added get_job_pool() for lazy pool initialization - Added close_job_pool() for graceful shutdown - Hooked pool shutdown into FastAPI lifespan - Reuses connections instead of creating new ones per enqueue --- backend/jobs.py | 25 +++++++++++++++++++++---- backend/main.py | 3 +++ 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/backend/jobs.py b/backend/jobs.py index e4c89bb..fb787ed 100644 --- a/backend/jobs.py +++ b/backend/jobs.py @@ -15,6 +15,25 @@ _raw_url = os.getenv( ) DATABASE_URL = _raw_url.replace("postgresql+asyncpg://", "postgresql://") +# Connection pool for job enqueueing (lazy initialized) +_pool: asyncpg.Pool | None = None + + +async def get_job_pool() -> asyncpg.Pool: + """Get or create the connection pool for job enqueueing.""" + global _pool + if _pool is None: + _pool = await asyncpg.create_pool(DATABASE_URL, min_size=1, max_size=5) + return _pool + + +async def close_job_pool() -> None: + """Close the connection pool. Call on app shutdown.""" + global _pool + if _pool is not None: + await _pool.close() + _pool = None + async def enqueue_random_number_job(user_id: int) -> int: """ @@ -29,11 +48,9 @@ async def enqueue_random_number_job(user_id: int) -> int: Raises: Exception: If enqueueing fails. """ - conn = await asyncpg.connect(DATABASE_URL) - try: + pool = await get_job_pool() + async with pool.acquire() as conn: queries = Queries.from_asyncpg_connection(conn) payload = json.dumps({"user_id": user_id}).encode() job_ids = await queries.enqueue(JOB_RANDOM_NUMBER, payload) return job_ids[0] - finally: - await conn.close() diff --git a/backend/main.py b/backend/main.py index 989f32e..d942800 100644 --- a/backend/main.py +++ b/backend/main.py @@ -6,6 +6,7 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from database import Base, engine +from jobs import close_job_pool from routes import audit as audit_routes from routes import auth as auth_routes from routes import availability as availability_routes @@ -27,6 +28,8 @@ async def lifespan(app: FastAPI): async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) yield + # Cleanup on shutdown + await close_job_pool() app = FastAPI(lifespan=lifespan)