data-xexe/xexe/processes.py

154 lines
4.7 KiB
Python
Raw Permalink Normal View History

2024-06-06 11:50:01 +02:00
import logging
2024-06-06 12:56:36 +02:00
import os
2024-06-11 13:46:17 +02:00
import pathlib
2025-05-26 15:50:17 +02:00
from typing import List, Set, Union
2024-06-06 12:56:36 +02:00
2024-06-11 13:46:17 +02:00
from money.currency import Currency
2024-06-06 12:56:36 +02:00
from xecd_rates_client import XecdClient
2024-06-06 11:50:01 +02:00
2024-06-13 15:56:57 +02:00
from xexe.constants import RATES_SOURCES
2025-05-26 15:50:17 +02:00
from xexe.currency_pair import CurrencyPair
from xexe.exchange_rates import ExchangeRates, add_equal_rates, add_inverse_rates
2024-06-13 16:32:03 +02:00
from xexe.rate_fetching import build_rate_fetcher
from xexe.rate_writing import build_rate_writer
2025-05-26 16:41:49 +02:00
from xexe.utils import DateRange, generate_pairs_and_dates_combinations
2024-06-11 13:46:17 +02:00
2024-06-06 11:50:01 +02:00
# Called without a name, so this is the root logger rather than a
# module-specific one; every function in this module logs through it.
logger = logging.getLogger()
2024-06-06 12:58:15 +02:00
def run_xe_healthcheck() -> None:
    """
    Try to request the account data in xe.com's API.

    If certain fields about the account are returned, it means the request was
    successful.

    The credentials are read from the XE_ACCOUNT_ID and XE_API_KEY environment
    variables; a missing variable surfaces as a KeyError.

    Raises:
        Exception: whatever the client raised while requesting the account
            info, re-raised unchanged after being logged.
        ConnectionError: if the response lacks any of the "id",
            "organization" or "package" fields.
    """
    logger.info("Creating client.")
    xecd = XecdClient(
        account_id=os.environ["XE_ACCOUNT_ID"],
        api_key=os.environ["XE_API_KEY"],
    )

    logger.info("Requesting account info.")
    try:
        account_info_response = xecd.account_info()
    except Exception:
        # logger.exception records the traceback together with the message, so
        # the failure details are not reduced to the exception's str().
        logger.exception(
            "There was an exception when trying to reach xe.com. See details below."
        )
        # Bare raise re-raises the active exception as-is.
        raise

    # Going through .keys() keeps the check failing loudly if the response is
    # not a mapping at all.
    response_fields = account_info_response.keys()
    required_fields = ("id", "organization", "package")
    contains_good_response_fields = all(
        field in response_fields for field in required_fields
    )
    if not contains_good_response_fields:
        logger.error("Didn't find the fields of a good response.")
        raise ConnectionError("Response from xe.com is not successful.")

    logger.info("xe.com reached successfully.")
    logger.info("See response below.")
    logger.info(account_info_response)
2024-06-06 17:39:20 +02:00
2024-06-13 16:32:03 +02:00
def run_dwh_healthcheck() -> None:
    """
    Try to connect to the DWH and perform some pre-requisites checks.

    If this doesn't pass, don't even try to fetch rates because you won't be
    able to write them into the DWH.
    """
    logger.info("Connecting to DWH and looking for schema.")
    # Only the act of building the writer matters here: presumably that is
    # where the connection and schema checks happen (verify against
    # build_rate_writer). The writer itself is not needed afterwards.
    build_rate_writer(output="dwh")
    logger.info("DWH reached successfully.")
2024-06-11 13:46:17 +02:00
def run_get_rates(
    date_range: DateRange,
    dry_run: bool,
    rates_source: str,
    ignore_warnings: bool,
    output: pathlib.Path,
    pairs: Union[Set[CurrencyPair], None] = None,
) -> None:
    """
    Obtain rates for the requested dates and pairs and write them to output.

    Args:
        date_range: passed to generate_pairs_and_dates_combinations to build
            the combinations to fetch.
        dry_run: when true, rates are still obtained from the source but
            nothing is written to the output.
        rates_source: identifier of the source to obtain the rates from.
        ignore_warnings: forwarded to obtain_rates_from_source, where it
            skips the interactive confirmation for large calls.
        output: forwarded to write_rates_to_output to select where the rates
            get written.
        pairs: passed, together with date_range, to
            generate_pairs_and_dates_combinations. Defaults to None.
    """
    logger.info("Getting rates")

    currency_and_date_combinations = generate_pairs_and_dates_combinations(
        date_range=date_range, pairs=pairs
    )

    rates = obtain_rates_from_source(
        rates_source=rates_source,
        currency_and_date_combinations=currency_and_date_combinations,
        ignore_warnings=ignore_warnings,
    )
    logger.info("Rates obtained.")

    # Note that a dry run still hits the rates source; it only skips writing.
    if dry_run:
        logger.info("Dry run mode active. Not writing rates to output.")
        return

    write_rates_to_output(rates, output)
    logger.info("Rates written to output.")
2024-06-11 15:29:30 +02:00
2024-06-11 20:33:35 +02:00
def obtain_rates_from_source(
    rates_source: str,
    ignore_warnings: bool,
    currency_and_date_combinations,
) -> ExchangeRates:
    """
    Fetch one rate per requested combination and return them all together.

    Before fetching, if the fetcher is flagged as production grade and more
    than 100 combinations are requested, the user is asked on stdin to
    confirm, unless ignore_warnings is set.

    After fetching, the collected rates are passed through add_inverse_rates
    and add_equal_rates before being returned.

    Args:
        rates_source: identifier used, together with RATES_SOURCES, to build
            the rate fetcher.
        ignore_warnings: when true, skip the interactive confirmation.
        currency_and_date_combinations: sized iterable of items that can be
            indexed by "from_currency", "to_currency" and "date".

    Raises:
        Exception: if the user does not type the confirmation string.
        ConnectionError: if fetching any single rate fails. The original
            exception is chained as the cause.
    """
    rates_fetcher = build_rate_fetcher(
        rates_source=rates_source, rate_sources_mapping=RATES_SOURCES
    )

    large_call_threshold = 100
    large_api_call_planned = (
        rates_fetcher.is_production_grade
        and len(currency_and_date_combinations) > large_call_threshold
    )
    if large_api_call_planned and not ignore_warnings:
        user_confirmation_string = "i understand"
        user_response = input(
            f"WARNING: you are about to execute a large call {len(currency_and_date_combinations)} to a metered API. Type '{user_confirmation_string}' to move forward: "
        )
        if user_response != user_confirmation_string:
            raise Exception("Execution aborted.")

    # Lazy %-formatting: the (potentially long) list is only rendered when
    # debug logging is actually enabled.
    logger.debug(
        "We are looking for the following rate combinations: %s",
        currency_and_date_combinations,
    )

    rates = ExchangeRates()
    for combination in currency_and_date_combinations:
        try:
            rate = rates_fetcher.fetch_rate(
                from_currency=combination["from_currency"],
                to_currency=combination["to_currency"],
                rate_date=combination["date"],
            )
        except Exception as e:
            # One failed fetch aborts the whole run. The traceback goes to the
            # log and the original error is kept as the explicit cause.
            logger.exception("Error while fetching rates.")
            raise ConnectionError("Could not fetch rates. See logs.") from e
        rates.add_rate(rate)

    rates = add_inverse_rates(rates)
    rates = add_equal_rates(rates)
    return rates
2024-06-13 15:56:57 +02:00
def write_rates_to_output(rates, output) -> None:
    """
    Build the rate writer for `output` and write `rates` through it.

    The writer is built before the attempt is logged, so a failure while
    building it happens before the "Attempting" message is emitted.
    """
    writer = build_rate_writer(output)
    logger.info("Attempting writing rates to output.")
    writer.write_rates(rates)
2024-06-11 15:29:30 +02:00
class GetRatesProcessState:
    """
    Holder for the state of a get-rates run.

    Currently it only carries the ignore_warnings flag.
    """

    def __init__(self, ignore_warnings: bool) -> None:
        """Store the ignore_warnings flag on the instance."""
        self.ignore_warnings: bool = ignore_warnings