data-xexe/xexe/processes.py

156 lines
4.9 KiB
Python
Raw Normal View History

2024-06-06 11:50:01 +02:00
import logging
2024-06-06 12:56:36 +02:00
import os
2024-06-11 13:46:17 +02:00
import pathlib
from typing import List
2024-06-06 12:56:36 +02:00
2024-06-11 13:46:17 +02:00
from money.currency import Currency
2024-06-06 12:56:36 +02:00
from xecd_rates_client import XecdClient
2024-06-06 11:50:01 +02:00
2024-06-11 20:33:35 +02:00
from xexe.exchange_rates import ExchangeRates
from xexe.rate_fetching import MockRateFetcher, RateFetcher, XERateFetcher
from xexe.rate_writing import CSVRateWriter, RateWriter
from xexe.utils import DateRange, generate_currency_and_dates_combinations
2024-06-11 13:46:17 +02:00
2024-06-06 11:50:01 +02:00
logger = logging.getLogger()
2024-06-06 12:58:15 +02:00
def run_xe_healthcheck() -> None:
"""
Try to request the account data in xe.com's API.
If certain fields about the account are returned, it means the request was
successful.
"""
2024-06-06 12:56:36 +02:00
logger.info("Creating client.")
xecd = XecdClient(
account_id=os.environ["XE_ACCOUNT_ID"],
api_key=os.environ["XE_API_KEY"],
)
logger.info("Requesting account info.")
try:
account_info_response = xecd.account_info()
except Exception as e:
logger.error(
"There was an exception when trying to reach xe.com. See details below."
)
logger.error(e)
raise e
contains_good_response_fields = bool(
("id" in account_info_response.keys())
and ("organization" in account_info_response.keys())
and ("package" in account_info_response.keys()),
)
if not contains_good_response_fields:
2024-06-06 12:58:15 +02:00
logger.error("Didn't find the fields of a good response.")
2024-06-06 12:56:36 +02:00
raise ConnectionError("Response from xe.com is not successful.")
logger.info("xe.com reached successfully.")
logger.info("See response below.")
logger.info(account_info_response)
2024-06-06 17:39:20 +02:00
2024-06-11 13:46:17 +02:00
def run_get_rates(
date_range: DateRange,
currencies: List[Currency],
dry_run: bool,
2024-06-11 21:10:07 +02:00
ignore_warnings: bool,
2024-06-11 13:46:17 +02:00
output: pathlib.Path,
) -> None:
2024-06-06 17:39:20 +02:00
logger.info("Getting rates")
2024-06-11 13:52:31 +02:00
2024-06-11 21:10:07 +02:00
process_state = GetRatesProcessState(
output=output, dry_run=dry_run, ignore_warnings=ignore_warnings
)
2024-06-11 15:29:30 +02:00
2024-06-11 20:33:35 +02:00
rates = obtain_rates_from_source(
2024-06-11 21:10:07 +02:00
process_state,
date_range=date_range,
currencies=currencies,
2024-06-11 20:33:35 +02:00
)
2024-06-11 21:10:07 +02:00
logger.info("Rates obtained.")
2024-06-11 20:33:35 +02:00
write_rates_to_output(process_state, rates)
2024-06-11 21:10:07 +02:00
logger.info("Rates written to output.")
2024-06-11 15:29:30 +02:00
2024-06-11 20:33:35 +02:00
def obtain_rates_from_source(
process_state, date_range: DateRange, currencies: List[Currency]
) -> ExchangeRates:
rates_fetcher = process_state.get_fetcher()
2024-06-11 15:29:30 +02:00
2024-06-11 20:33:35 +02:00
currency_and_date_combinations = generate_currency_and_dates_combinations(
date_range=date_range, currencies=currencies
)
2024-06-11 21:10:07 +02:00
large_api_call_planned = (
rates_fetcher.is_production_grade and len(currency_and_date_combinations) > 100
)
if large_api_call_planned and not process_state.ignore_warnings:
user_confirmation_string = "i understand"
user_response = input(
f"WARNING: you are about to execute a large call {len(currency_and_date_combinations)} to a metered API. Type '{user_confirmation_string}' to move forward."
)
if user_response != user_confirmation_string:
raise Exception("Execution aborted.")
logger.debug(
f"We are looking for the following rate combinations: {currency_and_date_combinations}"
)
2024-06-11 20:33:35 +02:00
rates = ExchangeRates()
for combination in currency_and_date_combinations:
try:
rate = rates_fetcher.fetch_rate(
from_currency=combination["from_currency"],
to_currency=combination["to_currency"],
rate_date=combination["date"],
)
except Exception as e:
logger.error(f"Error while fetching rates.")
logger.error(e, exc_info=True)
raise ConnectionError(f"Could not fetch rates. See logs.")
rates.add_rate(rate)
return rates
def write_rates_to_output(process_state, rates):
rates_writer = process_state.get_writer()
2024-06-11 21:10:07 +02:00
logger.info("Attempting writing rates to output.")
2024-06-11 20:33:35 +02:00
rates_writer.write_rates(rates)
2024-06-11 15:29:30 +02:00
class GetRatesProcessState:
2024-06-11 21:10:07 +02:00
def __init__(self, output: str, dry_run: bool, ignore_warnings: bool) -> None:
2024-06-11 20:33:35 +02:00
self.writer = self._select_writer(output)
self.fetcher = self._select_fetcher(dry_run)
2024-06-11 21:10:07 +02:00
self.ignore_warnings = ignore_warnings
2024-06-11 15:29:30 +02:00
@staticmethod
2024-06-11 20:33:35 +02:00
def _select_writer(output: str) -> CSVRateWriter:
2024-06-11 15:29:30 +02:00
output_is_csv_file_path = bool(pathlib.Path(output).suffix == ".csv")
if output_is_csv_file_path:
2024-06-11 20:33:35 +02:00
return CSVRateWriter(output_file_path=output)
2024-06-11 15:29:30 +02:00
raise ValueError(f"Don't know how to handle passed output: {output}")
@staticmethod
2024-06-11 21:10:07 +02:00
def _select_fetcher(dry_run: bool) -> RateFetcher:
2024-06-11 15:29:30 +02:00
if dry_run:
2024-06-11 21:10:07 +02:00
logger.info("Dry-run activated. Running against MockRateFetcher.")
return MockRateFetcher()
2024-06-11 15:29:30 +02:00
if not dry_run:
2024-06-11 21:10:07 +02:00
logger.info("Real run active. Running against XE.com's API.")
return XERateFetcher()
2024-06-11 20:33:35 +02:00
def get_fetcher(self) -> RateFetcher:
return self.fetcher
def get_writer(self) -> RateWriter:
return self.writer