From 6e8b0a1c58f050d5a39036da02197abca076d19b Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 13 Jun 2024 16:26:59 +0200 Subject: [PATCH] db rate writing implemented --- xexe/rate_writing.py | 66 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 64 insertions(+), 2 deletions(-) diff --git a/xexe/rate_writing.py b/xexe/rate_writing.py index a2ac25e..29f57a3 100644 --- a/xexe/rate_writing.py +++ b/xexe/rate_writing.py @@ -53,8 +53,8 @@ class DWHRateWriter(RateWriter): self.connection = self._create_connection() self._verify_prerequisites() - @abstractmethod - def _create_connection(self): + @staticmethod + def _create_connection(): host = os.environ["DWH_HOST"] port = os.environ["DWH_PORT"] database = os.environ["DWH_DB"] @@ -134,6 +134,68 @@ class DWHRateWriter(RateWriter): cursor.close() self.connection.autocommit = True # Reset autocommit to its default state + def _create_rates_table_if_not_exists(self): + cursor = self.connection.cursor() + + create_table_query = sql.SQL( + """ + CREATE TABLE IF NOT EXISTS {}.{} ( + from_currency CHAR(3) NOT NULL, + to_currency CHAR(3) NOT NULL, + rate DECIMAL(19, 4) NOT NULL, + rate_date_utc DATE NOT NULL, + exported_at_utc TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (from_currency, to_currency, rate_date_utc) + ); + """ + ).format(sql.Identifier(DWH_SCHEMA), sql.Identifier(DWH_TABLE)) + + try: + cursor.execute(create_table_query) + self.connection.commit() + except Exception as e: + self.connection.rollback() + raise Exception( + f"Failed to create table '{DWH_TABLE}' in schema '{DWH_SCHEMA}': {e}" + ) + finally: + cursor.close() + + def write_rates(self, rates: ExchangeRates) -> None: + self._create_rates_table_if_not_exists() + + cursor = self.connection.cursor() + insert_query = sql.SQL( + """ + INSERT INTO {}.{} (from_currency, to_currency, rate, rate_date_utc, exported_at_utc) + VALUES (%s, %s, %s, %s, CURRENT_TIMESTAMP) + ON CONFLICT (from_currency, to_currency, rate_date_utc) + DO UPDATE SET rate = EXCLUDED.rate, exported_at_utc = EXCLUDED.exported_at_utc; + """ + ).format(sql.Identifier(DWH_SCHEMA), sql.Identifier(DWH_TABLE)) + + self.connection.autocommit = False + try: + for rate in rates: + cursor.execute( + insert_query, + ( + rate.from_currency.value, + rate.to_currency.value, + rate.amount, + rate.rate_date, + ), + ) + self.connection.commit() + except Exception as e: + self.connection.rollback() + raise Exception( + f"Failed to write rates to table '{DWH_TABLE}' in schema '{DWH_SCHEMA}': {e}" + ) + finally: + cursor.close() + self.connection.autocommit = True + def build_rate_writer(output: str) -> RateWriter: output_is_csv_file_path = bool(pathlib.Path(output).suffix == ".csv")