2025-12-21 22:44:31 +01:00
|
|
|
"""Job definitions and enqueueing utilities using pgqueuer."""
|
|
|
|
|
|
2025-12-21 23:23:21 +01:00
|
|
|
import asyncio
|
2025-12-21 22:44:31 +01:00
|
|
|
import json
|
|
|
|
|
|
|
|
|
|
import asyncpg
|
|
|
|
|
from pgqueuer.queries import Queries
|
|
|
|
|
|
2025-12-21 23:16:29 +01:00
|
|
|
from database import ASYNCPG_DATABASE_URL
|
|
|
|
|
|
2025-12-21 22:44:31 +01:00
|
|
|
# Job type constants
|
|
|
|
|
JOB_RANDOM_NUMBER = "random_number"
|
|
|
|
|
|
2025-12-21 23:13:22 +01:00
|
|
|
# Connection pool for job enqueueing (lazy initialized)
|
|
|
|
|
_pool: asyncpg.Pool | None = None
|
2025-12-21 23:23:21 +01:00
|
|
|
_pool_lock = asyncio.Lock()
|
2025-12-21 23:13:22 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
async def get_job_pool() -> asyncpg.Pool:
|
|
|
|
|
"""Get or create the connection pool for job enqueueing."""
|
|
|
|
|
global _pool
|
2025-12-21 23:23:21 +01:00
|
|
|
if _pool is not None:
|
|
|
|
|
return _pool
|
|
|
|
|
async with _pool_lock:
|
|
|
|
|
# Double-check after acquiring lock
|
|
|
|
|
if _pool is None:
|
|
|
|
|
_pool = await asyncpg.create_pool(
|
|
|
|
|
ASYNCPG_DATABASE_URL, min_size=1, max_size=5
|
|
|
|
|
)
|
|
|
|
|
return _pool
|
2025-12-21 23:13:22 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
async def close_job_pool() -> None:
|
|
|
|
|
"""Close the connection pool. Call on app shutdown."""
|
|
|
|
|
global _pool
|
|
|
|
|
if _pool is not None:
|
|
|
|
|
await _pool.close()
|
|
|
|
|
_pool = None
|
|
|
|
|
|
2025-12-21 22:44:31 +01:00
|
|
|
|
|
|
|
|
async def enqueue_random_number_job(user_id: int) -> int:
|
|
|
|
|
"""
|
|
|
|
|
Enqueue a random number job for the given user.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
user_id: The ID of the user who triggered the job.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
The job ID.
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
Exception: If enqueueing fails.
|
|
|
|
|
"""
|
2025-12-21 23:13:22 +01:00
|
|
|
pool = await get_job_pool()
|
|
|
|
|
async with pool.acquire() as conn:
|
2025-12-21 22:44:31 +01:00
|
|
|
queries = Queries.from_asyncpg_connection(conn)
|
|
|
|
|
payload = json.dumps({"user_id": user_id}).encode()
|
|
|
|
|
job_ids = await queries.enqueue(JOB_RANDOM_NUMBER, payload)
|
|
|
|
|
return job_ids[0]
|