data-xexe/xexe/processes.py
2025-05-26 16:41:49 +02:00

163 lines
5 KiB
Python

import logging
import os
import pathlib
from itertools import combinations
from typing import List, Set, Union
from money.currency import Currency
from xecd_rates_client import XecdClient
from xexe.constants import RATES_SOURCES
from xexe.currency_pair import CurrencyPair
from xexe.exchange_rates import ExchangeRates, add_equal_rates, add_inverse_rates
from xexe.rate_fetching import build_rate_fetcher
from xexe.rate_writing import build_rate_writer
from xexe.utils import DateRange, generate_pairs_and_dates_combinations
logger = logging.getLogger()
def run_xe_healthcheck() -> None:
    """
    Check that xe.com's API answers an account info request.

    The client is built from the XE_ACCOUNT_ID and XE_API_KEY environment
    variables. The response is considered healthy only when it contains the
    "id", "organization" and "package" keys.

    Any exception raised by the request is logged and re-raised. A response
    missing one of the expected keys raises a ConnectionError.
    """
    logger.info("Creating client.")
    # Read in the same order as the client arguments so that a missing
    # variable surfaces as a KeyError for the first one that is absent.
    account_id = os.environ["XE_ACCOUNT_ID"]
    api_key = os.environ["XE_API_KEY"]
    client = XecdClient(account_id=account_id, api_key=api_key)

    logger.info("Requesting account info.")
    try:
        response = client.account_info()
    except Exception as error:
        logger.error(
            "There was an exception when trying to reach xe.com. See details below."
        )
        logger.error(error)
        raise error

    required_fields = ("id", "organization", "package")
    returned_fields = response.keys()
    if any(field not in returned_fields for field in required_fields):
        logger.error("Didn't find the fields of a good response.")
        raise ConnectionError("Response from xe.com is not successful.")

    logger.info("xe.com reached successfully.")
    logger.info("See response below.")
    logger.info(response)
def run_dwh_healthcheck() -> None:
    """
    Try to connect to the DWH and perform some pre-requisites checks.

    If this doesn't pass, don't even try to fetch rates because you won't be
    able to write them into the DWH.

    Any exception raised while building the DWH rate writer propagates to the
    caller unchanged.
    """
    logger.info("Connecting to DWH and looking for schema.")
    # Only the side effects of building the writer matter here, so the
    # returned object is intentionally not kept.
    # NOTE(review): presumably the connection and schema checks happen inside
    # build_rate_writer - confirm against xexe.rate_writing.
    build_rate_writer(output="dwh")
    logger.info("DWH reached successfully.")
def run_get_rates(
    date_range: DateRange,
    dry_run: bool,
    rates_source: str,
    ignore_warnings: bool,
    output: pathlib.Path,
    currencies: Union[Set[Currency], None] = None,
    pairs: Union[Set[CurrencyPair], None] = None,
) -> None:
    """
    Fetch exchange rates for a date range and write them to the output.

    When `currencies` is given (and not empty), one CurrencyPair is built for
    every 2-element combination of those currencies and `pairs` is ignored.
    Otherwise `pairs` is used exactly as received.

    The pairs are combined with `date_range`, the resulting combinations are
    fetched from `rates_source`, and the rates are handed to
    `write_rates_to_output` together with `output`. With `dry_run` set, the
    rates are still fetched but nothing is written.

    `ignore_warnings` is forwarded to `obtain_rates_from_source`, where it
    disables the interactive confirmation for large calls.
    """
    logger.info("Getting rates")

    if currencies:
        # Build the pairs in one pass instead of rebinding the `pairs`
        # parameter to intermediate values of a different type.
        pairs_to_fetch = [
            CurrencyPair(from_currency=from_currency, to_currency=to_currency)
            for from_currency, to_currency in combinations(currencies, 2)
        ]
    else:
        pairs_to_fetch = pairs

    currency_and_date_combinations = generate_pairs_and_dates_combinations(
        date_range=date_range, pairs=pairs_to_fetch
    )

    rates = obtain_rates_from_source(
        rates_source=rates_source,
        date_range=date_range,
        currency_and_date_combinations=currency_and_date_combinations,
        ignore_warnings=ignore_warnings,
    )
    logger.info("Rates obtained.")

    if dry_run:
        logger.info("Dry run mode active. Not writing rates to output.")
        return

    write_rates_to_output(rates, output)
    logger.info("Rates written to output.")
def obtain_rates_from_source(
    rates_source: str,
    date_range: DateRange,
    ignore_warnings: bool,
    currency_and_date_combinations,
) -> ExchangeRates:
    """
    Fetch one rate per requested combination and return them all together.

    A rate fetcher is built for `rates_source` out of RATES_SOURCES. Each item
    of `currency_and_date_combinations` must provide the "from_currency",
    "to_currency" and "date" keys, which are passed to the fetcher. Once every
    rate is collected, inverse rates and equal rates are added through
    `add_inverse_rates` and `add_equal_rates`.

    If the fetcher is production grade and more than 100 combinations are
    requested, the user is asked on stdin to confirm, unless
    `ignore_warnings` is set. Any answer other than the expected one raises an
    Exception.

    A failure while fetching any single rate is logged and re-raised as a
    ConnectionError chained to the original exception.

    `date_range` is accepted for interface compatibility but is not used here.
    """
    rates_fetcher = build_rate_fetcher(
        rates_source=rates_source, rate_sources_mapping=RATES_SOURCES
    )

    large_api_call_planned = (
        rates_fetcher.is_production_grade and len(currency_and_date_combinations) > 100
    )
    if large_api_call_planned and not ignore_warnings:
        user_confirmation_string = "i understand"
        user_response = input(
            f"WARNING: you are about to execute a large call {len(currency_and_date_combinations)} to a metered API. Type '{user_confirmation_string}' to move forward: "
        )
        if user_response != user_confirmation_string:
            raise Exception("Execution aborted.")

    # Lazy %-style arguments: the (possibly large) collection is only rendered
    # when the debug level is actually enabled.
    logger.debug(
        "We are looking for the following rate combinations: %s",
        currency_and_date_combinations,
    )

    rates = ExchangeRates()
    for combination in currency_and_date_combinations:
        try:
            rate = rates_fetcher.fetch_rate(
                from_currency=combination["from_currency"],
                to_currency=combination["to_currency"],
                rate_date=combination["date"],
            )
        except Exception as e:
            logger.error("Error while fetching rates.")
            logger.error(e, exc_info=True)
            # Chain explicitly so the root cause stays attached to the
            # exception callers receive.
            raise ConnectionError("Could not fetch rates. See logs.") from e
        rates.add_rate(rate)

    rates = add_inverse_rates(rates)
    rates = add_equal_rates(rates)
    return rates
def write_rates_to_output(rates, output) -> None:
    """
    Build the rate writer for `output` and use it to write `rates`.

    The writer is built before the attempt is logged, so a failure to build
    it raises before any "attempting" message is emitted.
    """
    writer = build_rate_writer(output)
    logger.info("Attempting writing rates to output.")
    writer.write_rates(rates)
class GetRatesProcessState:
    """Hold the settings that apply to a single get-rates run."""

    def __init__(self, ignore_warnings: bool) -> None:
        # Stored as received; no validation or conversion is applied.
        self.ignore_warnings: bool = ignore_warnings