connect and verify everything is ready

This commit is contained in:
Pablo Martin 2024-06-12 18:20:55 +02:00
parent e1bcdb1309
commit 7867e73f10
2 changed files with 101 additions and 0 deletions

View file

@ -9,6 +9,9 @@ DEFAULT_CURRENCIES = {Currency("EUR"), Currency("GBP"), Currency("USD")}
RATES_SOURCES = {"mock": MockRateFetcher, "xe": XERateFetcher} RATES_SOURCES = {"mock": MockRateFetcher, "xe": XERateFetcher}
DWH_SCHEMA = "sync_xedotcom_currency_rates"
DWH_TABLE = "exchange_rates"
@dataclass @dataclass
class PATHS: class PATHS:

View file

@ -1,8 +1,13 @@
import csv import csv
import datetime import datetime
import os
import pathlib import pathlib
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
import psycopg2
from psycopg2 import sql
from xexe.constants import DWH_SCHEMA, DWH_TABLE
from xexe.exchange_rates import ExchangeRates from xexe.exchange_rates import ExchangeRates
@ -36,3 +41,96 @@ class CSVRateWriter(RateWriter):
exported_at.isoformat(timespec="seconds"), exported_at.isoformat(timespec="seconds"),
] ]
) )
class DWHRateWriter(RateWriter):
def __init__(self) -> None:
super().__init__()
self.connection = self._create_connection()
self._verify_prerequisites()
@abstractmethod
def _create_connection(self):
host = os.environ["DWH_HOST"]
port = os.environ["DWH_PORT"]
database = os.environ["DWH_DB"]
user = os.environ["DWH_USER"]
password = os.environ["DWH_PASSWORD"]
connection = psycopg2.connect(
host=host, port=port, database=database, user=user, password=password
)
return connection
def _verify_prerequisites(self):
cursor = self.connection.cursor()
schema = DWH_SCHEMA
table = DWH_TABLE
try:
# Start a transaction
self.connection.autocommit = False
# Check if schema exists
cursor.execute(
sql.SQL(
"SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s;"
),
[schema],
)
schema_exists = cursor.fetchone() is not None
if not schema_exists:
raise Exception(f"Schema '{schema}' does not exist.")
# Check if table exists
cursor.execute(
sql.SQL(
"SELECT table_name FROM information_schema.tables WHERE table_schema = %s AND table_name = %s;"
),
[schema, table],
)
table_exists = cursor.fetchone() is not None
if table_exists:
# Check if we can write to the table
try:
cursor.execute(
sql.SQL("SELECT 1 FROM {}.{} LIMIT 1 FOR UPDATE;").format(
sql.Identifier(schema), sql.Identifier(table)
)
)
except Exception as e:
raise Exception(
f"Cannot write to the existing table '{table}' in schema '{schema}': {e}"
)
else:
# Check if we can create a new table in the schema
try:
cursor.execute(
sql.SQL("CREATE TABLE {}.{} (id SERIAL PRIMARY KEY);").format(
sql.Identifier(schema), sql.Identifier(table)
)
)
cursor.execute(
sql.SQL("DROP TABLE {}.{};").format(
sql.Identifier(schema), sql.Identifier(table)
)
)
except Exception as e:
raise Exception(
f"Cannot create a new table '{table}' in schema '{schema}': {e}"
)
# Roll back the transaction to ensure no changes are persisted
self.connection.rollback()
except Exception as e:
# Roll back any changes if there was an exception
self.connection.rollback()
raise e
finally:
cursor.close()
self.connection.autocommit = True # Reset autocommit to its default state