from typing import List, Callable import datetime class BaseThrottlingRule: """ Interface for all throttling rules. """ def __call__(self, **kwargs) -> bool: """ Upon calling the rule itself, the underlying check gets executed. :param kwargs: arguments for check :return: True if the check is OK, False otherwise """ return self._check_rule(**kwargs) def _check_rule(self, **kwargs) -> bool: """ Interface for internal method to check the rule. :param kwargs: arguments for check :return: True if the check is OK, False otherwise """ raise NotImplementedError class WorkingHoursThrottlingRule(BaseThrottlingRule): """ Rule for checking if current time is within the defined working hours. """ def __init__(self, working_hours: dict) -> None: """ Set the working hours as a dict with "start" and "end" keys, which contain time objects. :param working_hours: the definition of the working hours range :return: None """ self._working_hours = working_hours def _check_rule(self) -> bool: """ Call underyling check method. :return: True if the check is OK, False otherwise """ return self._inside_working_hours() def _inside_working_hours(self) -> bool: """ Checks if the current time is between the defined window of working hours. :return: True if within range, False otherwise """ return ( self._working_hours["start"] <= datetime.datetime.now().time() <= self._working_hours["end"] ) class CooldownThrottlingRule(BaseThrottlingRule): """ Rule for checking if a certain time period has passed since the last execution. :attribute required_arguments: the list with arguments expected to be ready for unpacking when checking the rule. """ required_arguments = ["last_attempt_timestamp"] def __init__(self, cooldown_time_generator: Callable) -> None: """ Set the passed cooldown timer generator. :param cooldown_time_generator: a callable object that returns some number of seconds. Can be random or static. """ self._cooldown_time_generator = cooldown_time_generator self._current_cooldown_time = self._cooldown_time_generator() def _check_rule(self, **kwargs) -> bool: """ Unpack argument and call underyling check method. :return: True if the check is OK, False otherwise """ last_attempt_timestamp = kwargs["last_attempt_timestamp"] return self._check_if_cooldowned(last_attempt_timestamp) def _check_if_cooldowned(self, last_attempt_timestamp: datetime) -> bool: """ Checks if the cooldown time has passed. If so, set a new one. :param last_attempt_timestamp: timestamp for the last time whatever must be throttled happened. :return: True if the cooldown time has passed, False otherwise """ cooldown_release_timestamp = last_attempt_timestamp + datetime.timedelta( seconds=self._current_cooldown_time ) if datetime.datetime.now() > cooldown_release_timestamp: self._current_cooldown_time = self._cooldown_time_generator() return True return False class DynamicThrottlingRule(BaseThrottlingRule): """ A basic interface to dynamically set any function, optionally with arguments, as a throttling rule. """ def __init__(self, any_callable: Callable) -> None: """ Sets the callable that will act as a check. Only condition is that the callable should return a boolean value. :param any_callable: the check callable object """ self._some_rule = any_callable def _check_rule(self, **kwargs) -> bool: """ Calls the dynamically set callable while passing any given arguments. :param kwargs: arguments for check :return: True if the check is OK, False otherwise """ return self._some_rule(**kwargs) class ThrottleManager: """ Holds and runs all throttling rules on demand. """ def __init__(self) -> None: """ Initialize internal attributes. """ self._rules_to_check = [] self._rules_and_required_arguments = dict() def allow_next_task(self, **kwargs) -> bool: """ Checks all the internal rules and returns whether all of them passed successfully or not. :param kwargs: any arguments needed by the rules :return: True if all rules passed positively, False otherwise """ check_result = self._check_all_rules(**kwargs) return check_result def add_rule( self, rule: BaseThrottlingRule, required_argument_names: List[str] = None ) -> "ThrottleManager": """ Includes a new rule to the manager together with the argument names that the rule call expects. :param rule: the rule instance :param required_argument_names: the required argument names to execute the check for that rule :return: the ThrottleManager instance """ required_argument_names = required_argument_names or [] self._rules_to_check.append(rule) self._rules_and_required_arguments[rule.__class__] = required_argument_names return self def _check_all_rules(self, **kwargs) -> bool: """ Executes checks (lazily) with the right arguments for each of them and collects results. :param kwargs: all passed arguments :return: True if all checks passed, False otherwise """ checks = [] for rule in self._rules_to_check: arguments_for_rule = { argument_name: kwargs[argument_name] for argument_name in self._rules_and_required_arguments[rule.__class__] } checks.append(rule(**arguments_for_rule)) if checks[-1] == False: return False return True