arbret/backend/worker.py


"""Background job worker using pgqueuer."""
import asyncio
import logging
import os
import asyncpg
from pgqueuer import Job, QueueManager
from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("worker")
# SQLAlchemy uses postgresql+asyncpg://, but asyncpg needs postgresql://
_raw_url = os.getenv(
"DATABASE_URL", "postgresql+asyncpg://postgres:postgres@localhost:5432/arbret"
)
DATABASE_URL = _raw_url.replace("postgresql+asyncpg://", "postgresql://")


async def install_schema() -> None:
    """Install the pgqueuer schema if it is not already present."""
    conn = await asyncpg.connect(DATABASE_URL)
    try:
        queries = Queries.from_asyncpg_connection(conn)
        # Check if the schema is already installed by looking for the main table
        if not await queries.has_table("pgqueuer"):
            await queries.install()
            logger.info("pgqueuer schema installed")
        else:
            logger.info("pgqueuer schema already exists")
    finally:
        await conn.close()


def register_job_handlers(qm: QueueManager) -> None:
    """Register all job handlers with the queue manager."""

    @qm.entrypoint("random_number")
    async def process_random_number(job: Job) -> None:
        """Process a random number job (placeholder - just logs for now)."""
        payload_str = job.payload.decode() if job.payload else ""
        logger.info(f"Processing random_number job {job.id}: {payload_str}")


async def main() -> None:
    """Main worker entry point."""
    logger.info("Installing pgqueuer schema...")
    await install_schema()

    logger.info("Connecting to database...")
    conn = await asyncpg.connect(DATABASE_URL)
    try:
        driver = AsyncpgDriver(conn)
        qm = QueueManager(driver)

        # Register job handlers
        register_job_handlers(qm)

        logger.info("Worker started, waiting for jobs...")
        await qm.run()
    finally:
        await conn.close()
        logger.info("Worker stopped")


if __name__ == "__main__":
    asyncio.run(main())