From 7867e73f108022f7968decc4bec00b6cf90fa582 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Wed, 12 Jun 2024 18:20:55 +0200 Subject: [PATCH] connect and verify everything is ready --- xexe/constants.py | 3 ++ xexe/rate_writing.py | 98 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+) diff --git a/xexe/constants.py b/xexe/constants.py index 7066779..c7ec3d4 100644 --- a/xexe/constants.py +++ b/xexe/constants.py @@ -9,6 +9,9 @@ DEFAULT_CURRENCIES = {Currency("EUR"), Currency("GBP"), Currency("USD")} RATES_SOURCES = {"mock": MockRateFetcher, "xe": XERateFetcher} +DWH_SCHEMA = "sync_xedotcom_currency_rates" +DWH_TABLE = "exchange_rates" + @dataclass class PATHS: diff --git a/xexe/rate_writing.py b/xexe/rate_writing.py index 8996a3b..5589b68 100644 --- a/xexe/rate_writing.py +++ b/xexe/rate_writing.py @@ -1,8 +1,13 @@ import csv import datetime +import os import pathlib from abc import ABC, abstractmethod +import psycopg2 +from psycopg2 import sql + +from xexe.constants import DWH_SCHEMA, DWH_TABLE from xexe.exchange_rates import ExchangeRates @@ -36,3 +41,96 @@ class CSVRateWriter(RateWriter): exported_at.isoformat(timespec="seconds"), ] ) + + +class DWHRateWriter(RateWriter): + + def __init__(self) -> None: + super().__init__() + self.connection = self._create_connection() + self._verify_prerequisites() + + @abstractmethod + def _create_connection(self): + host = os.environ["DWH_HOST"] + port = os.environ["DWH_PORT"] + database = os.environ["DWH_DB"] + user = os.environ["DWH_USER"] + password = os.environ["DWH_PASSWORD"] + + connection = psycopg2.connect( + host=host, port=port, database=database, user=user, password=password + ) + return connection + + def _verify_prerequisites(self): + cursor = self.connection.cursor() + schema = DWH_SCHEMA + table = DWH_TABLE + + try: + # Start a transaction + self.connection.autocommit = False + + # Check if schema exists + cursor.execute( + sql.SQL( + "SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s;" + ), + [schema], + ) + schema_exists = cursor.fetchone() is not None + + if not schema_exists: + raise Exception(f"Schema '{schema}' does not exist.") + + # Check if table exists + cursor.execute( + sql.SQL( + "SELECT table_name FROM information_schema.tables WHERE table_schema = %s AND table_name = %s;" + ), + [schema, table], + ) + table_exists = cursor.fetchone() is not None + + if table_exists: + # Check if we can write to the table + try: + cursor.execute( + sql.SQL("SELECT 1 FROM {}.{} LIMIT 1 FOR UPDATE;").format( + sql.Identifier(schema), sql.Identifier(table) + ) + ) + except Exception as e: + raise Exception( + f"Cannot write to the existing table '{table}' in schema '{schema}': {e}" + ) + else: + # Check if we can create a new table in the schema + try: + cursor.execute( + sql.SQL("CREATE TABLE {}.{} (id SERIAL PRIMARY KEY);").format( + sql.Identifier(schema), sql.Identifier(table) + ) + ) + cursor.execute( + sql.SQL("DROP TABLE {}.{};").format( + sql.Identifier(schema), sql.Identifier(table) + ) + ) + except Exception as e: + raise Exception( + f"Cannot create a new table '{table}' in schema '{schema}': {e}" + ) + + # Roll back the transaction to ensure no changes are persisted + self.connection.rollback() + + except Exception as e: + # Roll back any changes if there was an exception + self.connection.rollback() + raise e + + finally: + cursor.close() + self.connection.autocommit = True # Reset autocommit to its default state