data-xexe/xexe/processes.py
2024-06-12 17:59:34 +02:00

154 lines
4.8 KiB
Python

import logging
import os
import pathlib
from typing import List
from money.currency import Currency
from xecd_rates_client import XecdClient
from xexe.constants import RATES_SOURCES
from xexe.exchange_rates import ExchangeRates, add_equal_rates, add_inverse_rates
from xexe.rate_fetching import MockRateFetcher, RateFetcher, XERateFetcher
from xexe.rate_writing import CSVRateWriter, RateWriter
from xexe.utils import DateRange, generate_currency_and_dates_combinations
# Module-wide logger. getLogger() is called without a name, so this is the
# root logger rather than a logger scoped to this module.
logger = logging.getLogger()
def run_xe_healthcheck() -> None:
    """
    Check that xe.com's API answers with the configured credentials.

    A client is built from the XE_ACCOUNT_ID and XE_API_KEY environment
    variables and the account info is requested. The check only passes when
    the response contains the "id", "organization" and "package" fields.

    Raises:
        KeyError: if one of the two environment variables is not set.
        ConnectionError: if the response lacks any of the expected fields.
        Exception: anything raised while requesting the account info is
            logged and then re-raised.
    """
    logger.info("Creating client.")
    account_id = os.environ["XE_ACCOUNT_ID"]
    api_key = os.environ["XE_API_KEY"]
    xe_client = XecdClient(account_id=account_id, api_key=api_key)

    logger.info("Requesting account info.")
    try:
        account_info = xe_client.account_info()
    except Exception as error:
        logger.error(
            "There was an exception when trying to reach xe.com. See details below."
        )
        logger.error(error)
        raise error

    # A healthy response is recognised purely by the presence of these fields.
    expected_fields = ("id", "organization", "package")
    returned_fields = account_info.keys()
    response_looks_good = all(
        expected_field in returned_fields for expected_field in expected_fields
    )

    if not response_looks_good:
        logger.error("Didn't find the fields of a good response.")
        raise ConnectionError("Response from xe.com is not successful.")

    logger.info("xe.com reached successfully.")
    logger.info("See response below.")
    logger.info(account_info)
def run_get_rates(
    date_range: DateRange,
    currencies: List[Currency],
    dry_run: bool,
    rates_source: str,
    ignore_warnings: bool,
    output: pathlib.Path,
) -> None:
    """
    Obtain rates from the chosen source and, unless dry-running, write them
    to the output.

    Args:
        date_range: dates to obtain rates for.
        currencies: currencies to obtain rates for.
        dry_run: when true, rates are obtained but nothing is written.
        rates_source: key used to pick the rate fetcher.
        ignore_warnings: when true, the confirmation prompt for large calls
            to a production-grade source is skipped.
        output: where the rates should be written.
    """
    logger.info("Getting rates")

    # The state (and with it the writer) is built before fetching, so an
    # output that cannot be handled fails before any rate is requested.
    state = GetRatesProcessState(output=output, ignore_warnings=ignore_warnings)

    fetched_rates = obtain_rates_from_source(
        state,
        rates_source=rates_source,
        date_range=date_range,
        currencies=currencies,
    )
    logger.info("Rates obtained.")

    if not dry_run:
        write_rates_to_output(state, fetched_rates)
        logger.info("Rates written to output.")
        return

    logger.info("Dry run mode active. Not writing rates to output.")
def obtain_rates_from_source(
    process_state, rates_source: str, date_range: DateRange, currencies: List[Currency]
) -> ExchangeRates:
    """
    Fetch one rate per currency/date combination from the given source.

    When the fetcher is production grade and more than 100 combinations are
    planned, the user is asked on stdin to confirm the call, unless
    process_state.ignore_warnings is set.

    Args:
        process_state: object exposing an ``ignore_warnings`` attribute.
        rates_source: key used to build the rate fetcher.
        date_range: dates to fetch rates for.
        currencies: currencies to fetch rates for.

    Returns:
        The fetched rates after being passed through add_inverse_rates and
        add_equal_rates (presumably adding reversed and same-currency
        pairs; confirm against xexe.exchange_rates).

    Raises:
        Exception: if the user does not type the confirmation string.
        ConnectionError: if fetching any single rate fails. The original
            error is logged and chained as the cause.
    """
    rates_fetcher = build_rate_fetcher(rates_source)

    currency_and_date_combinations = generate_currency_and_dates_combinations(
        date_range=date_range, currencies=currencies
    )

    large_api_call_planned = (
        rates_fetcher.is_production_grade and len(currency_and_date_combinations) > 100
    )

    if large_api_call_planned and not process_state.ignore_warnings:
        user_confirmation_string = "i understand"
        user_response = input(
            f"WARNING: you are about to execute a large call {len(currency_and_date_combinations)} to a metered API. Type '{user_confirmation_string}' to move forward: "
        )
        if user_response != user_confirmation_string:
            raise Exception("Execution aborted.")

    # Lazy %-formatting: the (possibly long) list is only rendered when
    # DEBUG logging is actually enabled.
    logger.debug(
        "We are looking for the following rate combinations: %s",
        currency_and_date_combinations,
    )

    rates = ExchangeRates()
    for combination in currency_and_date_combinations:
        try:
            rate = rates_fetcher.fetch_rate(
                from_currency=combination["from_currency"],
                to_currency=combination["to_currency"],
                rate_date=combination["date"],
            )
        except Exception as e:
            logger.error("Error while fetching rates.")
            logger.error(e, exc_info=True)
            # Chain explicitly so the traceback shows the fetch error as the
            # direct cause of the ConnectionError.
            raise ConnectionError("Could not fetch rates. See logs.") from e

        rates.add_rate(rate)

    rates = add_inverse_rates(rates)
    rates = add_equal_rates(rates)

    return rates
def write_rates_to_output(process_state, rates):
    """
    Write the given rates with the writer held by the process state.

    Args:
        process_state: object whose ``get_writer()`` returns the writer.
        rates: rates handed to the writer's ``write_rates``.
    """
    writer = process_state.get_writer()

    logger.info("Attempting writing rates to output.")
    writer.write_rates(rates)
def build_rate_fetcher(rates_source: str):
    """
    Instantiate the rate fetcher registered under ``rates_source``.

    The entry found in RATES_SOURCES is called without arguments and its
    result returned. An unknown ``rates_source`` surfaces as whatever the
    RATES_SOURCES lookup raises.
    """
    fetcher_factory = RATES_SOURCES[rates_source]
    return fetcher_factory()
class GetRatesProcessState:
    """
    State shared across the steps of a get-rates run: the writer chosen for
    the requested output and the ignore-warnings flag.
    """

    def __init__(self, output: str, ignore_warnings: bool) -> None:
        """
        Pick the writer for ``output`` and store the flag.

        Raises:
            ValueError: if no writer can handle ``output``.
        """
        # NOTE(review): annotated as str, but run_get_rates passes a
        # pathlib.Path; both work because the value goes through pathlib.Path.
        self.writer = self._select_writer(output)
        self.ignore_warnings = ignore_warnings

    @staticmethod
    def _select_writer(output: str) -> CSVRateWriter:
        """
        Return a CSV writer when ``output`` ends in ".csv".

        Only the exact, case-sensitive ".csv" suffix is recognised; any
        other output raises ValueError.
        """
        output_suffix = pathlib.Path(output).suffix
        if output_suffix != ".csv":
            raise ValueError(f"Don't know how to handle passed output: {output}")

        return CSVRateWriter(output_file_path=output)

    def get_writer(self) -> RateWriter:
        """Return the writer selected at construction time."""
        return self.writer