Unlike JOB_RANDOM_NUMBER which is used for external job enqueueing, JOB_FETCH_BITCOIN_PRICE is only used internally by the scheduler. Move it to worker.py to clarify it's not part of the public job API.
59 lines
1.5 KiB
Python
59 lines
1.5 KiB
Python
"""Job definitions and enqueueing utilities using pgqueuer."""
|
|
|
|
import asyncio
|
|
import json
|
|
|
|
import asyncpg
|
|
from pgqueuer.queries import Queries
|
|
|
|
from database import ASYNCPG_DATABASE_URL
|
|
|
|
# Job type constants
|
|
JOB_RANDOM_NUMBER = "random_number"
|
|
|
|
# Connection pool for job enqueueing (lazy initialized)
|
|
_pool: asyncpg.Pool | None = None
|
|
_pool_lock = asyncio.Lock()
|
|
|
|
|
|
async def get_job_pool() -> asyncpg.Pool:
|
|
"""Get or create the connection pool for job enqueueing."""
|
|
global _pool
|
|
if _pool is not None:
|
|
return _pool
|
|
async with _pool_lock:
|
|
# Double-check after acquiring lock
|
|
if _pool is None:
|
|
_pool = await asyncpg.create_pool(
|
|
ASYNCPG_DATABASE_URL, min_size=1, max_size=5
|
|
)
|
|
return _pool
|
|
|
|
|
|
async def close_job_pool() -> None:
|
|
"""Close the connection pool. Call on app shutdown."""
|
|
global _pool
|
|
if _pool is not None:
|
|
await _pool.close()
|
|
_pool = None
|
|
|
|
|
|
async def enqueue_random_number_job(user_id: int) -> int:
|
|
"""
|
|
Enqueue a random number job for the given user.
|
|
|
|
Args:
|
|
user_id: The ID of the user who triggered the job.
|
|
|
|
Returns:
|
|
The job ID.
|
|
|
|
Raises:
|
|
Exception: If enqueueing fails.
|
|
"""
|
|
pool = await get_job_pool()
|
|
async with pool.acquire() as conn:
|
|
queries = Queries.from_asyncpg_connection(conn)
|
|
payload = json.dumps({"user_id": user_id}).encode()
|
|
job_ids = await queries.enqueue(JOB_RANDOM_NUMBER, payload)
|
|
return job_ids[0]
|