diff --git a/backend/database.py b/backend/database.py index 160863c..696b92e 100644 --- a/backend/database.py +++ b/backend/database.py @@ -7,6 +7,9 @@ DATABASE_URL = os.getenv( "DATABASE_URL", "postgresql+asyncpg://postgres:postgres@localhost:5432/arbret" ) +# asyncpg needs postgresql:// instead of postgresql+asyncpg:// +ASYNCPG_DATABASE_URL = DATABASE_URL.replace("postgresql+asyncpg://", "postgresql://") + engine = create_async_engine(DATABASE_URL) async_session = async_sessionmaker(engine, expire_on_commit=False) diff --git a/backend/jobs.py b/backend/jobs.py index fb787ed..3d2fe93 100644 --- a/backend/jobs.py +++ b/backend/jobs.py @@ -1,20 +1,15 @@ """Job definitions and enqueueing utilities using pgqueuer.""" import json -import os import asyncpg from pgqueuer.queries import Queries +from database import ASYNCPG_DATABASE_URL + # Job type constants JOB_RANDOM_NUMBER = "random_number" -# SQLAlchemy uses postgresql+asyncpg://, but asyncpg needs postgresql:// -_raw_url = os.getenv( - "DATABASE_URL", "postgresql+asyncpg://postgres:postgres@localhost:5432/arbret" -) -DATABASE_URL = _raw_url.replace("postgresql+asyncpg://", "postgresql://") - # Connection pool for job enqueueing (lazy initialized) _pool: asyncpg.Pool | None = None @@ -23,7 +18,7 @@ async def get_job_pool() -> asyncpg.Pool: """Get or create the connection pool for job enqueueing.""" global _pool if _pool is None: - _pool = await asyncpg.create_pool(DATABASE_URL, min_size=1, max_size=5) + _pool = await asyncpg.create_pool(ASYNCPG_DATABASE_URL, min_size=1, max_size=5) return _pool diff --git a/backend/worker.py b/backend/worker.py index 34e6d7d..afa1646 100644 --- a/backend/worker.py +++ b/backend/worker.py @@ -3,7 +3,6 @@ import asyncio import json import logging -import os import random import time @@ -12,22 +11,18 @@ from pgqueuer import Job, QueueManager from pgqueuer.db import AsyncpgDriver from pgqueuer.queries import Queries +from database import ASYNCPG_DATABASE_URL + logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) logger = logging.getLogger("worker") -# SQLAlchemy uses postgresql+asyncpg://, but asyncpg needs postgresql:// -_raw_url = os.getenv( - "DATABASE_URL", "postgresql+asyncpg://postgres:postgres@localhost:5432/arbret" -) -DATABASE_URL = _raw_url.replace("postgresql+asyncpg://", "postgresql://") - async def install_schema() -> None: """Install pgqueuer schema if not already present.""" - conn = await asyncpg.connect(DATABASE_URL) + conn = await asyncpg.connect(ASYNCPG_DATABASE_URL) try: queries = Queries.from_asyncpg_connection(conn) # Check if schema is already installed by looking for the main table @@ -103,9 +98,9 @@ async def main() -> None: logger.info("Connecting to database...") # Connection for pgqueuer - queue_conn = await asyncpg.connect(DATABASE_URL) + queue_conn = await asyncpg.connect(ASYNCPG_DATABASE_URL) # Connection pool for application data - db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=1, max_size=5) + db_pool = await asyncpg.create_pool(ASYNCPG_DATABASE_URL, min_size=1, max_size=5) try: driver = AsyncpgDriver(queue_conn)