# arbret/backend/worker.py

"""Worker process for processing async jobs using pgqueuer."""
import asyncio
import logging
import os
import asyncpg
from pgqueuer import AsyncpgDriver, PgQueuer, Queries
# Worker-process logging: INFO level to stdout/stderr via basicConfig.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The app configures SQLAlchemy with a "postgresql+asyncpg://" URL, but this
# worker talks to Postgres through asyncpg directly, which only accepts the
# plain "postgresql://" scheme — so rewrite the scheme before use.
_DEFAULT_URL = "postgresql+asyncpg://postgres:postgres@localhost:5432/arbret"
DATABASE_URL = os.getenv("DATABASE_URL", _DEFAULT_URL).replace(
    "postgresql+asyncpg://", "postgresql://"
)
async def main() -> None:
    """Run the worker: connect, install the pgqueuer schema, and consume jobs.

    Opens a dedicated asyncpg connection, installs the pgqueuer schema if it
    is not already present, registers the job handlers, then blocks on the
    pgqueuer run loop until interrupted. The connection is closed on every
    exit path.
    """
    logger.info("Starting worker...")
    # Connect to the database independently of the web app's pool.
    conn = await asyncpg.connect(DATABASE_URL)
    # Everything after a successful connect runs under try/finally so the
    # connection is closed even if schema installation fails. (Previously a
    # non-"already exists" install error leaked the connection, because the
    # close lived in a later, narrower try/finally.)
    try:
        driver = AsyncpgDriver(conn)
        # Install pgqueuer schema (creates tables if they don't exist).
        queries = Queries.from_asyncpg_connection(conn)
        try:
            await queries.install()
            logger.info("pgqueuer schema installed")
        except Exception as e:
            # install() is not idempotent: rerunning it raises an
            # "... already exists" error, which is expected and harmless.
            if "already exists" in str(e).lower():
                logger.info("pgqueuer schema already exists")
            else:
                raise
        # Initialize pgqueuer over the raw asyncpg connection.
        pgq = PgQueuer(connection=driver)

        # Register job handlers using the entrypoint decorator.
        @pgq.entrypoint("dummy")  # type: ignore[type-var]
        async def dummy_job_handler(payload: dict) -> None:
            """Dummy job handler for testing."""
            # Lazy %-formatting: the message is only built if INFO is enabled.
            logger.info("Processing dummy job with payload: %s", payload)

        logger.info("Worker started, waiting for jobs...")
        # Start consuming jobs; blocks until cancelled or interrupted.
        try:
            await pgq.run()
        except KeyboardInterrupt:
            logger.info("Worker shutting down...")
    finally:
        await conn.close()
if __name__ == "__main__":
    # Entry point: drive the async worker loop to completion on a fresh
    # event loop; asyncio.run handles loop creation and teardown.
    asyncio.run(main())