Implemented a new throttling module to remove redundance in the project.
This commit is contained in:
parent
f207dd5dda
commit
2a9483981e
2 changed files with 298 additions and 0 deletions
192
core/throttling_utils.py
Normal file
192
core/throttling_utils.py
Normal file
|
|
@ -0,0 +1,192 @@
|
||||||
|
from typing import List, Callable
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
|
||||||
|
class BaseThrottlingRule:
|
||||||
|
"""
|
||||||
|
Interface for all throttling rules.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __call__(self, **kwargs) -> bool:
|
||||||
|
"""
|
||||||
|
Upon calling the rule itself, the underlying check gets executed.
|
||||||
|
:param kwargs: arguments for check
|
||||||
|
:return: True if the check is OK, False otherwise
|
||||||
|
"""
|
||||||
|
return self._check_rule(**kwargs)
|
||||||
|
|
||||||
|
def _check_rule(self, **kwargs) -> bool:
|
||||||
|
"""
|
||||||
|
Interface for internal method to check the rule.
|
||||||
|
:param kwargs: arguments for check
|
||||||
|
:return: True if the check is OK, False otherwise
|
||||||
|
"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
class WorkingHoursThrottlingRule(BaseThrottlingRule):
|
||||||
|
"""
|
||||||
|
Rule for checking if current time is within the defined working hours.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, working_hours: dict) -> None:
|
||||||
|
"""
|
||||||
|
Set the working hours as a dict with "start" and "end" keys, which
|
||||||
|
contain time objects.
|
||||||
|
:param working_hours: the definition of the working hours range
|
||||||
|
:return: None
|
||||||
|
"""
|
||||||
|
self._working_hours = working_hours
|
||||||
|
|
||||||
|
def _check_rule(self) -> bool:
|
||||||
|
"""
|
||||||
|
Call underyling check method.
|
||||||
|
:return: True if the check is OK, False otherwise
|
||||||
|
"""
|
||||||
|
return self._inside_working_hours()
|
||||||
|
|
||||||
|
def _inside_working_hours(self) -> bool:
|
||||||
|
"""
|
||||||
|
Checks if the current time is between the defined window of working
|
||||||
|
hours.
|
||||||
|
:return: True if within range, False otherwise
|
||||||
|
"""
|
||||||
|
return (
|
||||||
|
self._working_hours["start"]
|
||||||
|
<= datetime.datetime.now().time()
|
||||||
|
<= self._working_hours["end"]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class CooldownThrottlingRule(BaseThrottlingRule):
|
||||||
|
"""
|
||||||
|
Rule for checking if a certain time period has passed since the last
|
||||||
|
execution.
|
||||||
|
:attribute required_arguments: the list with arguments expected to be ready
|
||||||
|
for unpacking when checking the rule.
|
||||||
|
"""
|
||||||
|
|
||||||
|
required_arguments = ["last_attempt_timestamp"]
|
||||||
|
|
||||||
|
def __init__(self, cooldown_time_generator: Callable) -> None:
|
||||||
|
"""
|
||||||
|
Set the passed cooldown timer generator.
|
||||||
|
:param cooldown_time_generator: a callable object that returns some
|
||||||
|
number of seconds. Can be random or static.
|
||||||
|
"""
|
||||||
|
self._cooldown_time_generator = cooldown_time_generator
|
||||||
|
self._current_cooldown_time = self._cooldown_time_generator()
|
||||||
|
|
||||||
|
def _check_rule(self, **kwargs) -> bool:
|
||||||
|
"""
|
||||||
|
Unpack argument and call underyling check method.
|
||||||
|
:return: True if the check is OK, False otherwise
|
||||||
|
"""
|
||||||
|
last_attempt_timestamp = kwargs["last_attempt_timestamp"]
|
||||||
|
|
||||||
|
return self._check_if_cooldowned(last_attempt_timestamp)
|
||||||
|
|
||||||
|
def _check_if_cooldowned(self, last_attempt_timestamp: datetime) -> bool:
|
||||||
|
"""
|
||||||
|
Checks if the cooldown time has passed. If so, set a new one.
|
||||||
|
:param last_attempt_timestamp: timestamp for the last time whatever
|
||||||
|
must be throttled happened.
|
||||||
|
:return: True if the cooldown time has passed, False otherwise
|
||||||
|
"""
|
||||||
|
cooldown_release_timestamp = last_attempt_timestamp + datetime.timedelta(
|
||||||
|
seconds=self._current_cooldown_time
|
||||||
|
)
|
||||||
|
|
||||||
|
if datetime.datetime.now() > cooldown_release_timestamp:
|
||||||
|
self._current_cooldown_time = self._cooldown_time_generator()
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class DynamicThrottlingRule(BaseThrottlingRule):
|
||||||
|
"""
|
||||||
|
A basic interface to dynamically set any function, optionally with
|
||||||
|
arguments, as a throttling rule.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, any_callable: Callable) -> None:
|
||||||
|
"""
|
||||||
|
Sets the callable that will act as a check. Only condition is that the
|
||||||
|
callable should return a boolean value.
|
||||||
|
:param any_callable: the check callable object
|
||||||
|
"""
|
||||||
|
self._some_rule = any_callable
|
||||||
|
|
||||||
|
def _check_rule(self, **kwargs) -> bool:
|
||||||
|
"""
|
||||||
|
Calls the dynamically set callable while passing any given arguments.
|
||||||
|
:param kwargs: arguments for check
|
||||||
|
:return: True if the check is OK, False otherwise
|
||||||
|
"""
|
||||||
|
return self._some_rule(**kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class ThrottleManager:
|
||||||
|
"""
|
||||||
|
Holds and runs all throttling rules on demand.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
"""
|
||||||
|
Initialize internal attributes.
|
||||||
|
"""
|
||||||
|
self._rules_to_check = []
|
||||||
|
self._rules_and_required_arguments = dict()
|
||||||
|
|
||||||
|
def allow_next_task(self, **kwargs) -> bool:
|
||||||
|
"""
|
||||||
|
Checks all the internal rules and returns whether all of them passed
|
||||||
|
successfully or not.
|
||||||
|
:param kwargs: any arguments needed by the rules
|
||||||
|
:return: True if all rules passed positively, False otherwise
|
||||||
|
"""
|
||||||
|
|
||||||
|
check_results = self._check_all_rules(**kwargs)
|
||||||
|
|
||||||
|
if not all(check_results):
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def add_rule(
|
||||||
|
self, rule: BaseThrottlingRule, required_argument_names: List[str] = None
|
||||||
|
) -> "ThrottleManager":
|
||||||
|
"""
|
||||||
|
Includes a new rule to the manager together with the argument names
|
||||||
|
that the rule call expects.
|
||||||
|
:param rule: the rule instance
|
||||||
|
:param required_argument_names: the required argument names to execute
|
||||||
|
the check for that rule
|
||||||
|
:return: the ThrottleManager instance
|
||||||
|
"""
|
||||||
|
required_argument_names = required_argument_names or []
|
||||||
|
|
||||||
|
self._rules_to_check.append(rule)
|
||||||
|
self._rules_and_required_arguments[rule.__class__] = required_argument_names
|
||||||
|
|
||||||
|
return self
|
||||||
|
|
||||||
|
def _check_all_rules(self, **kwargs) -> List[bool]:
|
||||||
|
"""
|
||||||
|
Executes all checks with the right arguments for each of them and
|
||||||
|
collects results.
|
||||||
|
:param kwargs: all passed arguments
|
||||||
|
:return: the result of each individual check
|
||||||
|
"""
|
||||||
|
checks = []
|
||||||
|
|
||||||
|
for rule in self._rules_to_check:
|
||||||
|
arguments_for_rule = {
|
||||||
|
argument_name: kwargs[argument_name]
|
||||||
|
for argument_name in self._rules_and_required_arguments[rule.__class__]
|
||||||
|
}
|
||||||
|
checks.append(rule(**arguments_for_rule))
|
||||||
|
continue
|
||||||
|
|
||||||
|
return checks
|
||||||
106
tests/throttling_test.py
Normal file
106
tests/throttling_test.py
Normal file
|
|
@ -0,0 +1,106 @@
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
from core.throttling_utils import (
|
||||||
|
ThrottleManager,
|
||||||
|
CooldownThrottlingRule,
|
||||||
|
WorkingHoursThrottlingRule,
|
||||||
|
DynamicThrottlingRule,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_working_hours_throttling_rule_checks():
|
||||||
|
working_hours_rule = WorkingHoursThrottlingRule(
|
||||||
|
working_hours={
|
||||||
|
"start": datetime.datetime.now().time(),
|
||||||
|
"end": (datetime.datetime.now() + datetime.timedelta(hours=1)).time(),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
assert working_hours_rule() == True
|
||||||
|
|
||||||
|
|
||||||
|
def test_working_hours_throttling_rule_does_not_check():
|
||||||
|
working_hours_rule = WorkingHoursThrottlingRule(
|
||||||
|
working_hours={
|
||||||
|
"start": (datetime.datetime.now() + datetime.timedelta(hours=1)).time(),
|
||||||
|
"end": (datetime.datetime.now() + datetime.timedelta(hours=2)).time(),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
assert working_hours_rule() == False
|
||||||
|
|
||||||
|
|
||||||
|
def test_cooldown_throttling_rule_checks():
|
||||||
|
time_generator = lambda: 60
|
||||||
|
|
||||||
|
cooldown_rule = CooldownThrottlingRule(cooldown_time_generator=time_generator)
|
||||||
|
|
||||||
|
assert (
|
||||||
|
cooldown_rule(
|
||||||
|
last_attempt_timestamp=datetime.datetime.now()
|
||||||
|
+ datetime.timedelta(seconds=-120)
|
||||||
|
)
|
||||||
|
== True
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_cooldown_throttling_rule_does_not_check():
|
||||||
|
time_generator = lambda: 60
|
||||||
|
|
||||||
|
cooldown_rule = CooldownThrottlingRule(cooldown_time_generator=time_generator)
|
||||||
|
|
||||||
|
assert cooldown_rule(last_attempt_timestamp=datetime.datetime.now()) == False
|
||||||
|
|
||||||
|
|
||||||
|
def test_dynamic_rule_checks():
|
||||||
|
mock_check = lambda: True
|
||||||
|
|
||||||
|
rule = DynamicThrottlingRule(any_callable=mock_check)
|
||||||
|
|
||||||
|
assert rule() == True
|
||||||
|
|
||||||
|
|
||||||
|
def test_dynamic_rule_does_not_check():
|
||||||
|
mock_check = lambda: False
|
||||||
|
|
||||||
|
rule = DynamicThrottlingRule(any_callable=mock_check)
|
||||||
|
|
||||||
|
assert rule() == False
|
||||||
|
|
||||||
|
|
||||||
|
def test_dynamic_rule_arguments_pass_properly():
|
||||||
|
def pass_a_bool(some_bool):
|
||||||
|
return some_bool
|
||||||
|
|
||||||
|
rule = DynamicThrottlingRule(pass_a_bool)
|
||||||
|
|
||||||
|
assert (rule(some_bool=True) == True) and (rule(some_bool=False) == False)
|
||||||
|
|
||||||
|
|
||||||
|
def test_throttle_manager_checks_rules():
|
||||||
|
throttle_manager = ThrottleManager()
|
||||||
|
|
||||||
|
def pass_a_bool(some_bool):
|
||||||
|
return some_bool
|
||||||
|
|
||||||
|
some_rules = [
|
||||||
|
WorkingHoursThrottlingRule(
|
||||||
|
working_hours={
|
||||||
|
"start": datetime.datetime.now().time(),
|
||||||
|
"end": (datetime.datetime.now() + datetime.timedelta(hours=1)).time(),
|
||||||
|
}
|
||||||
|
),
|
||||||
|
CooldownThrottlingRule(cooldown_time_generator=lambda: 0),
|
||||||
|
DynamicThrottlingRule(any_callable=pass_a_bool),
|
||||||
|
]
|
||||||
|
|
||||||
|
some_arguments = [[], ["last_attempt_timestamp"], ["some_bool"]]
|
||||||
|
|
||||||
|
some_rules_and_arguments = zip(some_rules, some_arguments)
|
||||||
|
|
||||||
|
for rule, arguments in some_rules_and_arguments:
|
||||||
|
throttle_manager.add_rule(rule, required_argument_names=arguments)
|
||||||
|
|
||||||
|
assert throttle_manager.allow_next_task(
|
||||||
|
last_attempt_timestamp=datetime.datetime.now(), some_bool=True
|
||||||
|
)
|
||||||
Loading…
Add table
Add a link
Reference in a new issue