168 lines
5.1 KiB
Python
168 lines
5.1 KiB
Python
import logging
|
|
import os
|
|
import pathlib
|
|
from typing import List, Set, Union
|
|
|
|
from money.currency import Currency
|
|
from xecd_rates_client import XecdClient
|
|
|
|
from xexe.constants import RATES_SOURCES
|
|
from xexe.currency_pair import CurrencyPair
|
|
from xexe.exchange_rates import ExchangeRates, add_equal_rates, add_inverse_rates
|
|
from xexe.rate_fetching import build_rate_fetcher
|
|
from xexe.rate_writing import build_rate_writer
|
|
from xexe.utils import (
|
|
DateRange,
|
|
generate_currency_and_dates_combinations,
|
|
generate_pairs_and_dates_combinations,
|
|
)
|
|
|
|
# Module-wide logger. getLogger() is called without a name, so this is the
# root logger rather than a logger named after this module.
logger = logging.getLogger()
|
|
|
|
|
|
def run_xe_healthcheck() -> None:
    """
    Check that xe.com's API answers an account info request.

    The client is built from the XE_ACCOUNT_ID and XE_API_KEY environment
    variables. The response counts as good only when it carries the "id",
    "organization" and "package" fields.

    Raises:
        KeyError: if one of the two environment variables is not set.
        ConnectionError: if the response lacks any of the expected fields.
        Exception: anything raised by the account info request is logged and
            re-raised as is.
    """
    expected_fields = ("id", "organization", "package")

    logger.info("Creating client.")
    account_id = os.environ["XE_ACCOUNT_ID"]
    api_key = os.environ["XE_API_KEY"]
    client = XecdClient(account_id=account_id, api_key=api_key)
    logger.info("Requesting account info.")

    try:
        response = client.account_info()
    except Exception as error:
        logger.error(
            "There was an exception when trying to reach xe.com. See details below."
        )
        logger.error(error)
        raise error

    # Guard clause: bail out as soon as one expected field is missing.
    if not all(field in response.keys() for field in expected_fields):
        logger.error("Didn't find the fields of a good response.")
        raise ConnectionError("Response from xe.com is not successful.")

    logger.info("xe.com reached successfully.")
    logger.info("See response below.")
    logger.info(response)
|
|
|
|
|
|
def run_dwh_healthcheck() -> None:
    """
    Try to connect to the DWH and perform some pre-requisites checks.

    If this doesn't pass, don't even try to fetch rates because you won't be
    able to write them into the DWH.

    The check consists solely of building the "dwh" rate writer; any
    exception raised while building it propagates to the caller.
    NOTE(review): presumably the writer connects to the DWH and looks for the
    schema on construction -- confirm against build_rate_writer.
    """
    logger.info("Connecting to DWH and looking for schema.")
    # Only the construction matters here, so the writer itself is not kept.
    build_rate_writer(output="dwh")
    logger.info("DWH reached successfully.")
|
|
|
|
|
|
def run_get_rates(
    date_range: DateRange,
    dry_run: bool,
    rates_source: str,
    ignore_warnings: bool,
    output: pathlib.Path,
    currencies: Union[Set[Currency], None] = None,
    pairs: Union[Set[CurrencyPair], None] = None,
) -> None:
    """
    Obtain rates from the given source and write them to the output.

    Args:
        date_range: dates for which rates are requested.
        dry_run: when true, the rates are still obtained from the source but
            nothing is written to the output.
        rates_source: name of the source to obtain the rates from.
        ignore_warnings: forwarded to obtain_rates_from_source.
        output: destination handed over to write_rates_to_output.
        currencies: currencies to obtain rates for.
        pairs: currency pairs to obtain rates for.

    Exceptions raised while obtaining or writing the rates are not handled
    here and propagate to the caller.
    """
    logger.info("Getting rates")

    rates = obtain_rates_from_source(
        rates_source=rates_source,
        date_range=date_range,
        pairs=pairs,
        currencies=currencies,
        ignore_warnings=ignore_warnings,
    )
    logger.info("Rates obtained.")

    # Dry run stops after fetching: only the write step is skipped.
    if dry_run:
        logger.info("Dry run mode active. Not writing rates to output.")
        return

    write_rates_to_output(rates, output)
    logger.info("Rates written to output.")
|
|
|
|
|
|
def obtain_rates_from_source(
    rates_source: str,
    date_range: DateRange,
    ignore_warnings: bool,
    currencies: Union[Set[Currency], None] = None,
    pairs: Union[Set[CurrencyPair], None] = None,
) -> ExchangeRates:
    """
    Fetch one rate per requested combination and return them all together.

    The combinations to fetch are generated from `pairs` or, when no pairs
    are given, from `currencies`. If both are given, `pairs` takes
    precedence. Each combination is read through its "from_currency",
    "to_currency" and "date" keys. Once every rate is fetched, the result is
    passed through add_inverse_rates and add_equal_rates.

    When the fetcher is production grade and more than 100 combinations are
    planned, the user is asked on stdin to confirm the call, unless
    `ignore_warnings` is set.

    Args:
        rates_source: name of the source, resolved through RATES_SOURCES.
        date_range: dates for which rates are requested.
        ignore_warnings: skip the interactive confirmation for large calls.
        currencies: currencies to generate combinations from.
        pairs: currency pairs to generate combinations from.

    Raises:
        ValueError: if neither `currencies` nor `pairs` is provided.
        Exception: if the user does not confirm a large call.
        ConnectionError: if fetching any of the rates fails.
    """
    rates_fetcher = build_rate_fetcher(
        rates_source=rates_source, rate_sources_mapping=RATES_SOURCES
    )

    # Pairs win when both are given. This matches the previous behaviour,
    # where the pairs result overwrote the currencies one.
    if pairs:
        currency_and_date_combinations = generate_pairs_and_dates_combinations(
            date_range=date_range, pairs=pairs
        )
    elif currencies:
        currency_and_date_combinations = generate_currency_and_dates_combinations(
            date_range=date_range, currencies=currencies
        )
    else:
        # Without this guard the code below would fail with an
        # UnboundLocalError that says nothing about the actual problem.
        raise ValueError(
            "Either 'currencies' or 'pairs' must be provided to obtain rates."
        )

    large_api_call_planned = (
        rates_fetcher.is_production_grade and len(currency_and_date_combinations) > 100
    )
    if large_api_call_planned and not ignore_warnings:
        user_confirmation_string = "i understand"
        user_response = input(
            "WARNING: you are about to execute a large call "
            f"{len(currency_and_date_combinations)} to a metered API. "
            f"Type '{user_confirmation_string}' to move forward: "
        )

        if user_response != user_confirmation_string:
            raise Exception("Execution aborted.")

    logger.debug(
        f"We are looking for the following rate combinations: {currency_and_date_combinations}"
    )

    rates = ExchangeRates()
    for combination in currency_and_date_combinations:
        try:
            rate = rates_fetcher.fetch_rate(
                from_currency=combination["from_currency"],
                to_currency=combination["to_currency"],
                rate_date=combination["date"],
            )
        except Exception as e:
            logger.error("Error while fetching rates.")
            logger.error(e, exc_info=True)
            # Chain explicitly so the original failure stays attached as the
            # cause of the ConnectionError.
            raise ConnectionError("Could not fetch rates. See logs.") from e

        rates.add_rate(rate)

    rates = add_inverse_rates(rates)
    rates = add_equal_rates(rates)

    return rates
|
|
|
|
|
|
def write_rates_to_output(rates, output) -> None:
    """
    Write the given rates through the writer built for `output`.

    The writer is built first, so a failure to build it happens before the
    "attempting" log line is emitted.
    """
    writer = build_rate_writer(output)
    logger.info("Attempting writing rates to output.")
    writer.write_rates(rates)
|
|
|
|
|
|
class GetRatesProcessState:
    """
    Holder for the settings of a get-rates run.

    Currently it only records whether warnings should be ignored.
    """

    def __init__(self, ignore_warnings: bool) -> None:
        """Store the `ignore_warnings` flag on the instance."""
        self.ignore_warnings = ignore_warnings
|