160 lines
5 KiB
Python
160 lines
5 KiB
Python
import logging
|
|
import os
|
|
import pathlib
|
|
from typing import List
|
|
|
|
from money.currency import Currency
|
|
from xecd_rates_client import XecdClient
|
|
|
|
from xexe.exchange_rates import ExchangeRates, add_equal_rates, add_inverse_rates
|
|
from xexe.rate_fetching import MockRateFetcher, RateFetcher, XERateFetcher
|
|
from xexe.rate_writing import CSVRateWriter, RateWriter
|
|
from xexe.utils import DateRange, generate_currency_and_dates_combinations
|
|
|
|
logger = logging.getLogger()
|
|
|
|
|
|
def run_xe_healthcheck() -> None:
|
|
"""
|
|
Try to request the account data in xe.com's API.
|
|
|
|
If certain fields about the account are returned, it means the request was
|
|
successful.
|
|
|
|
"""
|
|
logger.info("Creating client.")
|
|
xecd = XecdClient(
|
|
account_id=os.environ["XE_ACCOUNT_ID"],
|
|
api_key=os.environ["XE_API_KEY"],
|
|
)
|
|
logger.info("Requesting account info.")
|
|
|
|
try:
|
|
account_info_response = xecd.account_info()
|
|
except Exception as e:
|
|
logger.error(
|
|
"There was an exception when trying to reach xe.com. See details below."
|
|
)
|
|
logger.error(e)
|
|
raise e
|
|
|
|
contains_good_response_fields = bool(
|
|
("id" in account_info_response.keys())
|
|
and ("organization" in account_info_response.keys())
|
|
and ("package" in account_info_response.keys()),
|
|
)
|
|
if not contains_good_response_fields:
|
|
logger.error("Didn't find the fields of a good response.")
|
|
raise ConnectionError("Response from xe.com is not successful.")
|
|
|
|
logger.info("xe.com reached successfully.")
|
|
logger.info("See response below.")
|
|
logger.info(account_info_response)
|
|
|
|
|
|
def run_get_rates(
|
|
date_range: DateRange,
|
|
currencies: List[Currency],
|
|
dry_run: bool,
|
|
rates_source: str,
|
|
ignore_warnings: bool,
|
|
output: pathlib.Path,
|
|
) -> None:
|
|
logger.info("Getting rates")
|
|
|
|
process_state = GetRatesProcessState(
|
|
output=output, dry_run=dry_run, ignore_warnings=ignore_warnings
|
|
)
|
|
|
|
rates = obtain_rates_from_source(
|
|
process_state,
|
|
date_range=date_range,
|
|
currencies=currencies,
|
|
)
|
|
logger.info("Rates obtained.")
|
|
write_rates_to_output(process_state, rates)
|
|
logger.info("Rates written to output.")
|
|
|
|
|
|
def obtain_rates_from_source(
|
|
process_state, date_range: DateRange, currencies: List[Currency]
|
|
) -> ExchangeRates:
|
|
rates_fetcher = process_state.get_fetcher()
|
|
|
|
currency_and_date_combinations = generate_currency_and_dates_combinations(
|
|
date_range=date_range, currencies=currencies
|
|
)
|
|
|
|
large_api_call_planned = (
|
|
rates_fetcher.is_production_grade and len(currency_and_date_combinations) > 100
|
|
)
|
|
if large_api_call_planned and not process_state.ignore_warnings:
|
|
user_confirmation_string = "i understand"
|
|
user_response = input(
|
|
f"WARNING: you are about to execute a large call {len(currency_and_date_combinations)} to a metered API. Type '{user_confirmation_string}' to move forward: "
|
|
)
|
|
|
|
if user_response != user_confirmation_string:
|
|
raise Exception("Execution aborted.")
|
|
|
|
logger.debug(
|
|
f"We are looking for the following rate combinations: {currency_and_date_combinations}"
|
|
)
|
|
|
|
rates = ExchangeRates()
|
|
for combination in currency_and_date_combinations:
|
|
try:
|
|
rate = rates_fetcher.fetch_rate(
|
|
from_currency=combination["from_currency"],
|
|
to_currency=combination["to_currency"],
|
|
rate_date=combination["date"],
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error while fetching rates.")
|
|
logger.error(e, exc_info=True)
|
|
raise ConnectionError(f"Could not fetch rates. See logs.")
|
|
|
|
rates.add_rate(rate)
|
|
|
|
rates = add_inverse_rates(rates)
|
|
rates = add_equal_rates(rates)
|
|
|
|
return rates
|
|
|
|
|
|
def write_rates_to_output(process_state, rates):
|
|
rates_writer = process_state.get_writer()
|
|
logger.info("Attempting writing rates to output.")
|
|
rates_writer.write_rates(rates)
|
|
|
|
|
|
class GetRatesProcessState:
|
|
def __init__(self, output: str, dry_run: bool, ignore_warnings: bool) -> None:
|
|
self.writer = self._select_writer(output)
|
|
self.fetcher = self._select_fetcher(dry_run)
|
|
self.ignore_warnings = ignore_warnings
|
|
|
|
@staticmethod
|
|
def _select_writer(output: str) -> CSVRateWriter:
|
|
output_is_csv_file_path = bool(pathlib.Path(output).suffix == ".csv")
|
|
|
|
if output_is_csv_file_path:
|
|
return CSVRateWriter(output_file_path=output)
|
|
|
|
raise ValueError(f"Don't know how to handle passed output: {output}")
|
|
|
|
@staticmethod
|
|
def _select_fetcher(dry_run: bool) -> RateFetcher:
|
|
if dry_run:
|
|
logger.info("Dry-run activated. Running against MockRateFetcher.")
|
|
return MockRateFetcher()
|
|
|
|
if not dry_run:
|
|
logger.info("Real run active. Running against XE.com's API.")
|
|
return XERateFetcher()
|
|
|
|
def get_fetcher(self) -> RateFetcher:
|
|
return self.fetcher
|
|
|
|
def get_writer(self) -> RateWriter:
|
|
return self.writer
|