Use a connection pool for job enqueueing instead of per-request connections
- Added get_job_pool() for lazy pool initialization
- Added close_job_pool() for graceful shutdown
- Hooked pool shutdown into FastAPI lifespan
- Reuses connections instead of creating new ones per enqueue
parent 7ec987c78d
commit 6b572aa81b
2 changed files with 24 additions and 4 deletions
@@ -15,6 +15,25 @@ _raw_url = os.getenv(
 )
 DATABASE_URL = _raw_url.replace("postgresql+asyncpg://", "postgresql://")
 
+# Connection pool for job enqueueing (lazy initialized)
+_pool: asyncpg.Pool | None = None
+
+
+async def get_job_pool() -> asyncpg.Pool:
+    """Get or create the connection pool for job enqueueing."""
+    global _pool
+    if _pool is None:
+        _pool = await asyncpg.create_pool(DATABASE_URL, min_size=1, max_size=5)
+    return _pool
+
+
+async def close_job_pool() -> None:
+    """Close the connection pool. Call on app shutdown."""
+    global _pool
+    if _pool is not None:
+        await _pool.close()
+        _pool = None
+
 
 async def enqueue_random_number_job(user_id: int) -> int:
     """
@@ -29,11 +48,9 @@ async def enqueue_random_number_job(user_id: int) -> int:
     Raises:
         Exception: If enqueueing fails.
     """
-    conn = await asyncpg.connect(DATABASE_URL)
-    try:
+    pool = await get_job_pool()
+    async with pool.acquire() as conn:
         queries = Queries.from_asyncpg_connection(conn)
         payload = json.dumps({"user_id": user_id}).encode()
         job_ids = await queries.enqueue(JOB_RANDOM_NUMBER, payload)
         return job_ids[0]
-    finally:
-        await conn.close()
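For reference, a minimal sketch of how the pooled enqueue path behaves from a caller's point of view, assuming the helpers above are importable from the jobs module and DATABASE_URL points at a reachable Postgres instance (the user_id values are placeholders):

import asyncio

from jobs import close_job_pool, enqueue_random_number_job, get_job_pool


async def main() -> None:
    # The first call creates the pool; later calls return the same object.
    pool_a = await get_job_pool()
    pool_b = await get_job_pool()
    assert pool_a is pool_b

    # Both enqueues acquire connections from the shared pool rather than
    # opening a new connection per call, as the old code did.
    await enqueue_random_number_job(user_id=1)
    await enqueue_random_number_job(user_id=2)

    # Mirrors what the FastAPI lifespan does on shutdown (see the next diff).
    await close_job_pool()


asyncio.run(main())

The second diff, below, wires that shutdown call into the FastAPI lifespan.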
@@ -6,6 +6,7 @@ from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
 
 from database import Base, engine
+from jobs import close_job_pool
 from routes import audit as audit_routes
 from routes import auth as auth_routes
 from routes import availability as availability_routes
@@ -27,6 +28,8 @@ async def lifespan(app: FastAPI):
     async with engine.begin() as conn:
         await conn.run_sync(Base.metadata.create_all)
     yield
+    # Cleanup on shutdown
+    await close_job_pool()
 
 
 app = FastAPI(lifespan=lifespan)
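As a usage illustration only (no route changes are part of this commit), a hypothetical endpoint that enqueues a job now borrows a connection from the shared pool on every request; the path, parameter handling, and response shape below are invented for the sketch:

from fastapi import FastAPI

from jobs import enqueue_random_number_job

# Standalone sketch; the project's real app is created with lifespan=lifespan
# so that close_job_pool() runs on shutdown, as in the diff above.
app = FastAPI()


@app.post("/jobs/random-number")  # hypothetical route, not part of this commit
async def create_random_number_job(user_id: int) -> dict[str, int]:
    # enqueue_random_number_job() now borrows a connection from the shared
    # asyncpg pool instead of opening a fresh one per request.
    job_id = await enqueue_random_number_job(user_id)
    return {"job_id": job_id}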