40 lines
1 KiB
Python
40 lines
1 KiB
Python
|
|
"""Job definitions and enqueueing utilities using pgqueuer."""
|
||
|
|
|
||
|
|
import json
|
||
|
|
import os
|
||
|
|
|
||
|
|
import asyncpg
|
||
|
|
from pgqueuer.queries import Queries
|
||
|
|
|
||
|
|
# Job type constants
|
||
|
|
JOB_RANDOM_NUMBER = "random_number"
|
||
|
|
|
||
|
|
# SQLAlchemy uses postgresql+asyncpg://, but asyncpg needs postgresql://
|
||
|
|
_raw_url = os.getenv(
|
||
|
|
"DATABASE_URL", "postgresql+asyncpg://postgres:postgres@localhost:5432/arbret"
|
||
|
|
)
|
||
|
|
DATABASE_URL = _raw_url.replace("postgresql+asyncpg://", "postgresql://")
|
||
|
|
|
||
|
|
|
||
|
|
async def enqueue_random_number_job(user_id: int) -> int:
    """
    Put a random number job on the queue on behalf of a user.

    A dedicated asyncpg connection to ``DATABASE_URL`` is opened for this
    call and is closed again before returning, whether or not the enqueue
    succeeded.

    Args:
        user_id: The ID of the user who triggered the job. It is stored in
            the job payload as JSON (``{"user_id": ...}``), encoded to bytes.

    Returns:
        The ID of the enqueued job (the first ID returned by the queue).

    Raises:
        Exception: Any error raised while connecting or enqueueing is
            propagated to the caller unchanged.
    """
    connection = await asyncpg.connect(DATABASE_URL)
    try:
        queue = Queries.from_asyncpg_connection(connection)
        body = json.dumps({"user_id": user_id}).encode()
        enqueued_ids = await queue.enqueue(JOB_RANDOM_NUMBER, body)
        # A single job was submitted, so only the first ID is of interest.
        return enqueued_ids[0]
    finally:
        # Always release the connection, even when enqueueing raised.
        await connection.close()
|