camisatoshi-wordpress-reports/camisatoshi_wordpress_reports/controllers.py

194 lines
6.1 KiB
Python

from functools import partial
from pathlib import Path
import datetime
import logging
import csv
from dotenv import dotenv_values
from woocommerce import API
from camisatoshi_wordpress_reports.order import Order, Orders
from camisatoshi_wordpress_reports.constants import (
ORDER_KEYS,
UM_FIRST_AGREEMENT_PERCENTAGE,
DEFAULT_DOTENV_FILEPATH,
BBO_ROYALTY_FEE_PERCENTAGE,
BBO_SKUS,
)
from camisatoshi_wordpress_reports.report_building import (
OrderObtentionChainBuilder,
WoocomerceOrderScope,
keep_orders_containing_sku,
)
# Connection settings are loaded from a dotenv file whose path is resolved
# against the current user's home directory.
API_CONFIG = dotenv_values(
    dotenv_path=Path.home().joinpath(DEFAULT_DOTENV_FILEPATH)
)

# Single WooCommerce REST client shared by every controller in this module.
WC_API = API(
    url=API_CONFIG["URL"],
    consumer_key=API_CONFIG["CONSUMER_KEY"],
    consumer_secret=API_CONFIG["CONSUMER_SECRET"],
    version=API_CONFIG["VERSION"],
)

# The root logger is used on purpose here: handlers and levels configured by
# the application entry point apply to these messages as-is.
logger = logging.getLogger()
def check_health() -> None:
    """Verify that the configured WooCommerce API can be reached.

    Requests the API index, then logs the version configured in the dotenv
    file next to the namespace reported by the API itself.

    Raises:
        ConnectionError: If the request fails, or the response cannot be
            decoded as JSON containing a ``namespace`` entry. The original
            error is attached as ``__cause__``.
    """
    logger.info(
        f"Connecting to the configured WooCommerce at {API_CONFIG['URL']}"
    )
    try:
        api_reported_version = WC_API.get("").json()["namespace"]
    except Exception as error:
        # Catch ``Exception`` rather than using a bare ``except`` so that
        # KeyboardInterrupt / SystemExit still propagate, and chain the
        # original error so the real cause stays visible in the traceback.
        raise ConnectionError(
            "There was an issue connecting to the WooCommerce API."
        ) from error
    logger.info(f"Informed version of the API: {API_CONFIG['VERSION']}")
    logger.info(f"Version reported by the API itself: {api_reported_version}")
    logger.info("Connection successful. The API is reachable.")
def generate_um_report(
    start_date: datetime.datetime, end_date: datetime.datetime
) -> None:
    """Build the UM royalty report and write it to ``report.csv``.

    Fetches the processing/completed orders created between ``start_date``
    and ``end_date``, keeps the ones containing the SKUs covered by the
    report, checks that each has ``sats_received`` filled in, drops the
    settled ones and writes one CSV row per (SKU, order) pair.

    Args:
        start_date: Lower bound sent to the API as the ``after`` filter.
        end_date: Upper bound sent to the API as the ``before`` filter.

    Raises:
        ValueError: If any relevant order lacks a properly filled
            ``sats_received`` entry.
    """
    logger.info(f"Fetching orders between {start_date} and {end_date}.")
    raw_orders = _fetch_all_orders_between(start_date, end_date)
    orders_in_date_range = Orders(
        [
            Order.from_api_response(order_raw_data)
            for order_raw_data in raw_orders
        ]
    )
    logger.info(f"Received {len(orders_in_date_range)} orders.")

    relevant_skus = [
        "TEE-05-BBO-BLACK",
        "SUD-01-BBO-BLACK",
        "TEE-09-SIMPLY-BITCOIN",
    ]
    logger.info(f"Filtering by SKUs: {relevant_skus}")
    relevant_orders = orders_in_date_range.filter_orders_by_skus(
        skus=relevant_skus
    )
    logger.info(f"Kept {len(relevant_orders)} orders.")

    logger.info(
        "Checking if all orders have the sats_received entry filled in."
    )
    orders_without_sats_received = (
        relevant_orders.filter_orders_without_sats_received()
    )
    if orders_without_sats_received:
        logger.warning(
            f"There are {len(orders_without_sats_received)} orders without "
            "a properly filled sats_received entry."
        )
        logger.warning("See details below.")
        logger.warning(orders_without_sats_received)
        raise ValueError(
            "Not all orders have sats_received. Can't compute sats owed without that."
        )
    logger.info("Success, all orders have sats_received filled in.")

    logger.info("Removing settled orders.")
    unsettled_orders = relevant_orders.filter_unsettled_orders()
    logger.info(f"Kept {len(unsettled_orders)} unsettled orders.")
    logger.info("Order filtering finished.")
    logger.info(
        f"Relevant orders: {[order['id'] for order in unsettled_orders]}."
    )

    report = []
    for relevant_sku in relevant_skus:
        logger.debug(f"Reporting SKU {relevant_sku}")
        for order in unsettled_orders:
            if not order.contains_sku(relevant_sku):
                continue
            logger.debug(f"Reporting for order {order[ORDER_KEYS.id]}")

            # A few helper variables to make the final value easier to read.
            sats_received_for_sku = order.sats_received_for_sku(relevant_sku)
            # Multiplying by the membership bool zeroes the fee for SKUs
            # that are not BBO products.
            bbo_fee_if_applicable = BBO_ROYALTY_FEE_PERCENTAGE * (
                relevant_sku in BBO_SKUS
            )
            discount_factor_for_bbo_skus = 1 - bbo_fee_if_applicable

            # We owe UM his percentage of the sats received after
            # discounting royalties paid to BBO for the BBO products.
            sats_owed_to_um = (
                sats_received_for_sku * discount_factor_for_bbo_skus
            ) * UM_FIRST_AGREEMENT_PERCENTAGE

            report.append(
                {
                    "order_id": order[ORDER_KEYS.id],
                    "sku": relevant_sku,
                    "units_sold": order.units_of_sku(relevant_sku),
                    "eur_income": order.sales_of_sku(relevant_sku),
                    "sats_income": sats_received_for_sku,
                    "sats_owed_to_um": sats_owed_to_um,
                }
            )
    logger.info("Report generated.")
    logger.info(report)

    # Explicit column names (same order as the row dicts above) so that an
    # empty report yields a header-only CSV instead of failing on report[0].
    fieldnames = [
        "order_id",
        "sku",
        "units_sold",
        "eur_income",
        "sats_income",
        "sats_owed_to_um",
    ]
    if not report:
        logger.warning("No orders to report; writing a header-only CSV.")
    with open("report.csv", "w", newline="") as output_file:
        dict_writer = csv.DictWriter(output_file, fieldnames)
        dict_writer.writeheader()
        dict_writer.writerows(report)


def _fetch_all_orders_between(start_date, end_date, per_page=100):
    """Return the raw processing/completed orders in the range, all pages."""
    raw_orders = []
    page = 1
    while True:
        page_of_orders = WC_API.get(
            endpoint="orders",
            params={
                "after": start_date.isoformat(),
                "before": end_date.isoformat(),
                "per_page": per_page,
                "page": page,
                "status": "processing,completed",
            },
        ).json()
        raw_orders.extend(page_of_orders)
        # A page shorter than ``per_page`` (including an empty one) is the
        # last page; a single request would silently truncate the report.
        if len(page_of_orders) < per_page:
            return raw_orders
        page += 1
def generate_sku_report(
    start_date: datetime.datetime, end_date: datetime.datetime, sku: str
) -> None:
    """Write a per-order sales report for a single SKU to ``report.csv``.

    Builds an order-obtention chain that fetches processing/completed
    orders between ``start_date`` and ``end_date`` and keeps those
    containing ``sku``, runs it against the shared WooCommerce client and
    writes one CSV row per resulting order.

    Args:
        start_date: Lower bound passed to the order scope as ``after``.
        end_date: Upper bound passed to the order scope as ``before``.
        sku: SKU the orders must contain and the report is about.
    """
    logger.info(f"Fetching orders between {start_date} and {end_date}.")
    report_chain_builder = OrderObtentionChainBuilder()
    report_chain_builder.add_order_fetching_step(
        wc_order_scope=WoocomerceOrderScope(
            after=start_date.isoformat(),
            before=end_date.isoformat(),
            status="processing,completed",
        )
    )
    report_chain_builder.add_order_filtering_step(
        partial(keep_orders_containing_sku, sku=sku)
    )
    report_chain = report_chain_builder.get_report_chain()
    relevant_orders = report_chain.run_chain(WC_API)

    # NOTE(review): generate_um_report calls ``units_of_sku`` /
    # ``sales_of_sku`` (singular) while this function calls the ``*_skus``
    # variants; confirm both exist on Order and accept a single SKU.
    report = [
        {
            "order_id": order[ORDER_KEYS.id],
            "sku": sku,
            "units_sold": order.units_of_skus(sku),
            "eur_income": order.sales_of_skus(sku),
        }
        for order in relevant_orders
    ]
    logger.info("Report generated.")
    logger.info(report)

    # Explicit column names (same order as the row dicts above) so that an
    # empty report yields a header-only CSV instead of failing on report[0].
    fieldnames = ["order_id", "sku", "units_sold", "eur_income"]
    if not report:
        logger.warning("No orders to report; writing a header-only CSV.")
    with open("report.csv", "w", newline="") as output_file:
        dict_writer = csv.DictWriter(output_file, fieldnames)
        dict_writer.writeheader()
        dict_writer.writerows(report)