Merge branch 'release/0.1.0' into 'master'

Release/0.1.0

See merge request pablomartincalvo/Drogon!3
Pablo Martin 2021-01-06 09:45:45 +00:00
commit 744a0a38d4
11 changed files with 4107 additions and 557 deletions

@@ -1,2 +0,0 @@
-# -*- coding: utf-8 -*-

capturer/capturer.py

@@ -1,15 +1,18 @@
-import sys
-sys.path.append("..")
 from time import sleep
-from bs4 import BeautifulSoup
-import re
 import datetime
-from db_layer.capturing_tasks_interface import capturing_interface
-from db_layer.capturas_interface import capturas_interface
+from db_layer.capturing_tasks_interface import CapturingTasksInterface
+from db_layer.capturas_interface import CapturasInterface
 from core.scrapping_utils import UrlAttack
 from core.config import working_hours, minimum_seconds_between_tries
+from core.throttling_utils import (
+    ThrottleManager,
+    WorkingHoursThrottlingRule,
+    CooldownThrottlingRule,
+    DynamicThrottlingRule,
+)
 from refresher.refresher import Refresher
+from core.parsing_utils import *
 from core import my_logger
 import logging
@@ -20,7 +23,35 @@ class Capturer:
     scraping and db storage.
     """

-    def __init__(self) -> None:
+    def __init__(
+        self,
+        throttling_manager: ThrottleManager,
+        capturing_tasks_interface: CapturingTasksInterface,
+        capturas_interface: CapturasInterface,
+        parsing_flow_generator: ParsingFlowGenerator,
+        url_acquisition_object: Type[UrlAttack],
+        dead_ad_checker: Callable,
+    ) -> None:
+        """
+        Receive all required objects.
+        :param throttling_manager: takes care of deciding whether a task should
+        be started
+        :param capturing_tasks_interface: interface to interact with the tasks
+        database
+        :param capturas_interface: interface to interact with the ad database
+        :param parsing_flow_generator: an object capable of generating empty
+        parsing flows to give each task a new one
+        :param url_acquisition_object: gateway to obtaining the HTML of an url
+        :param dead_ad_checker: callable capable of checking if an ad is dead
+        through its HTML
+        """
+        self._throttling_manager = throttling_manager
+        self._capturing_tasks_interface = capturing_tasks_interface
+        self._capturas_interface = capturas_interface
+        self._parsing_flow_generator = parsing_flow_generator
+        self._url_acquisition_object = url_acquisition_object
+        self._dead_ad_checker = dead_ad_checker
         self.last_try_datetime = datetime.datetime.now()

     def start(self) -> None:
@@ -33,56 +64,35 @@ class Capturer:
         logging.info("Starting capturer")
         while True:
-            if not self._in_working_hours():
-                sleep(1800)
-                logging.info("Waiting...")
-                continue
-            seconds_to_next_capture = (
-                minimum_seconds_between_tries() - self._seconds_since_last_try()
-            )
-            if seconds_to_next_capture > 0:
-                sleep(seconds_to_next_capture)
-                logging.info("Waiting...")
-            pending_task = capturing_interface.get_pending_task()
-            if not pending_task:
-                logging.info("No pending tasks.")
-                continue
-            task = CapturingTask(pending_task)
+            while not self._throttling_manager.allow_next_task(
+                last_attempt_timestamp=self.last_try_datetime
+            ):
+                sleep(10)
+                logging.info("Waiting...")
+            pending_task = self._capturing_tasks_interface.get_pending_task()
+            logging.info("Got a task")
+            task = CapturingTask(
+                pending_task,
+                capturing_interface=self._capturing_tasks_interface,
+                new_parsing_flow=self._parsing_flow_generator.get_new_flow(),
+                url_acquisition_object=self._url_acquisition_object,
+                dead_ad_checker=self._dead_ad_checker,
+            )
             self.last_try_datetime = datetime.datetime.now()
             task.capture()
-            if task.status == "Data ready":
-                ad_data = task.get_ad_data()
-            else:
+            if not task.status == "Data ready":
                 logging.warning("Something went wrong, not adding data.")
                 continue
-            capturas_interface.insert_captura(ad_data)
-            task._update_status("Captura inserted")
+            ad_data = task.get_ad_data()
+            self._capturas_interface.insert_captura(ad_data)
+            task.update_status("Captura inserted")
             logging.info("New ad inserted.")
-
-    @staticmethod
-    def _in_working_hours() -> bool:
-        """
-        Checks whether now is within the working hours of the daemon.
-        :return: True if so, false if not
-        """
-        return (
-            working_hours["start"]
-            <= datetime.datetime.now().time()
-            <= working_hours["end"]
-        )
-
-    def _seconds_since_last_try(self) -> float:
-        """
-        Computes how many seconds have passed since the last capturing attempt
-        :return: seconds since last try as integer
-        """
-        return (datetime.datetime.now() - self.last_try_datetime).total_seconds()

 class CapturingTask:
     """
@@ -92,29 +102,44 @@ class CapturingTask:

     sleep_time_failed_request = 180

-    def __init__(self, parameters) -> None:
+    def __init__(
+        self,
+        task_parameters: dict,
+        capturing_interface: CapturingTasksInterface,
+        new_parsing_flow: ParsingFlow,
+        url_acquisition_object: Type[UrlAttack],
+        dead_ad_checker: Callable,
+    ) -> None:
         """
         Initialize with task parameters and mark the task as being worked on
         in the task queue.
-        :param parameters: dict with the necessary parameters for the task
+        :param task_parameters: dict with the necessary parameters for the task
+        :param capturing_interface: interface to interact with the ad database
+        :param new_parsing_flow: an empty parsing flow
+        :param url_acquisition_object: gateway to obtaining the HTML of an url
+        :param dead_ad_checker: callable capable of checking if an ad is dead
         """
-        self.uuid = parameters["uuid"]
-        self.ad_url = parameters["ad_url"]
-        self.uuid_exploring = parameters["fk_uuid_exploring"]
-        self.status = parameters["status"]
+        self.uuid = task_parameters["uuid"]
+        self.ad_url = task_parameters["ad_url"]
+        self.uuid_exploring = task_parameters["fk_uuid_exploring"]
+        self.status = task_parameters["status"]
         self.request_failures = 1
         self.html = None
-        self._update_status("Loading")
+        self._parsing_flow = new_parsing_flow
+        self._capturing_interface = capturing_interface
+        self._url_acquisition_object = url_acquisition_object
+        self._is_dead_ad = dead_ad_checker
+        self.update_status("Loading")

-    def _update_status(self, new_status) -> None:
+    def update_status(self, new_status) -> None:
         """
         Updates the task status and persists it in the task queue.
         :param new_status: string describing the new status
         :return: None
         """
         self.status = new_status
-        capturing_interface.update_capturing_task(
+        self._capturing_interface.update_capturing_task(
             self.uuid, self.uuid_exploring, self.status, self.ad_url
         )
@@ -122,288 +147,132 @@ class CapturingTask:
         """
         Main flow of work
         """
-        self._update_status("WIP")
+        self.update_status("WIP")
         while self.request_failures < 4:
-            attack = UrlAttack(self.ad_url)
+            attack = self._url_acquisition_object(self.ad_url)
             attack.attack()
             if attack.success:
-                self.html = attack.get_text()
-                self._extract_data()
-                self._check_data()
+                logging.info("URL attack successful.")
+                self._parse_html(html=attack.get_text())
                 return
             if not attack.success:
+                logging.info("URL attack failed.")
                 try:
-                    if Refresher.dead_ad_checker(attack.get_text()):
-                        self._update_status("Dead ad")
+                    if self._is_dead_ad(attack.get_text()):
+                        self.update_status("Dead ad")
+                        logging.info("Ad was tagged as dead.")
                         return
                 except AttributeError:
                     logging.error(
                         "Something went wrong when checking if the ad is gone"
                     )
+                    logging.error(AttributeError)
-                self._update_status("Fail {}".format(self.request_failures))
+                self.update_status("Fail {}".format(self.request_failures))
                 self.request_failures += 1
                 sleep(CapturingTask.sleep_time_failed_request)
                 continue
-        self._update_status("Surrender")
+        self.update_status("Surrender")
         logging.warning(f"A task has surrendered. {self.ad_url}")
-
-    def _extract_data(self) -> None:
-        """
-        Parses the obtained html to extract the ad information.
-        :return: None
-        """
-        self.parser = AdHtmlParser(self.html)
-        self.parser.parse()
-
-    def _check_data(self) -> None:
-        """
-        Validates that all compulsory fields have been obtained and that the
-        values are within the expected. Sets the status of task accordingly.
-        :return: None
-        """
-        if self.parser.fields_missing():
-            self._update_status("Fields missing")
-            return
-        if not self.parser.all_fields_are_valid():
-            self._update_status("Invalid value fields")
-            return
-        self._update_status("Data ready")
     def get_ad_data(self) -> dict:
         """
         Returns the extracted data.
         :return: dictionary with the data of the ad.
         """
-        return self.parser.get_data()
+        return self._parsing_flow.field_values

+    def _parse_html(self, html: str) -> None:
+        """
+        Execute the complete parsing flow and report the task status depending
+        on the outcome.
+        :param html: the HTML of the ad
+        :return: None
+        """
+        self._parsing_flow.execute_flow(soup=BeautifulSoup(html, "html5lib"))
+        if not self._parsing_flow.issues:
+            self.update_status("Data ready")
+            return
+        if not self._parsing_flow.all_found_fields_are_valid:
+            self.update_status("Invalid value fields")
+            logging.warning(f"Invalid fields found in ad: {self.ad_url}")
+            logging.warning(f"{self._parsing_flow.issues}")
+            return
+        if not self._parsing_flow.all_non_optional_fields_were_found:
+            self.update_status("Fields missing")
+            logging.warning(
+                f"Couldn't scrape necessary fields: {self._parsing_flow.issues}"
+            )
+            return

-class AdHtmlParser:
-    """
-    Object for parsing, storing and validating the data of the HTML of an ad.
-    """
-
-    def __init__(self, html_string: str) -> None:
-        """
-        Initializes an instance of the parser with the HTML of an ad.
-        :param html_string: the full HTML code of the ad page
-        """
-        self.html = html_string
-        self.ad_fields = {
-            "referencia": {"found": False, "optional": False, "value": None},
-            "precio": {"found": False, "optional": False, "value": None},
-            "tamano_categorico": {"found": False, "optional": True, "value": None},
-            "m2": {"found": False, "optional": True, "value": None},
-            "tipo_anuncio": {"found": False, "optional": False, "value": None},
-            "calle": {"found": False, "optional": True, "value": None},
-            "barrio": {"found": False, "optional": False, "value": None},
-            "distrito": {"found": False, "optional": False, "value": None},
-            "ciudad": {"found": False, "optional": False, "value": None},
-            "cubierta": {"found": False, "optional": False, "value": None},
-            "puerta_auto": {"found": False, "optional": False, "value": None},
-            "ascensor": {"found": False, "optional": False, "value": None},
-            "alarma": {"found": False, "optional": False, "value": None},
-            "circuito": {"found": False, "optional": False, "value": None},
-            "personal": {"found": False, "optional": False, "value": None},
-            "telefono": {"found": False, "optional": True, "value": None},
-        }
-
-    def parse(self) -> None:
-        """
-        Parses the HTML and stores the ad data.
-        :return: None
-        """
-        soup = BeautifulSoup(self.html, "html5lib")
-        if soup.find_all("link", {"rel": "canonical"}) is not None:
-            self.ad_fields["referencia"]["value"] = re.findall(
-                r"[0-9]{5,20}", str(soup.find_all("link", {"rel": "canonical"})[0])
-            )[0]
-            self.ad_fields["referencia"]["found"] = True
-        if soup.find_all("strong", {"class": "price"}) is not None:
-            self.ad_fields["precio"]["value"] = "".join(
-                re.findall(
-                    r"[0-9]", str(soup.find_all("strong", {"class": "price"})[0])
-                )
-            )
-            self.ad_fields["precio"]["found"] = True
if soup.find("div", {"class": "info-features"}) is not None:
try:
if (
""
not in soup.find("div", {"class": "info-features"})
.find("span")
.find("span")
.text
):
self.ad_fields["tamano_categorico"]["value"] = (
soup.find("div", {"class": "info-features"})
.find("span")
.find("span")
.text
)
self.ad_fields["tamano_categorico"]["found"] = True
except:
pass
posible_m2 = [
tag.text
for tag in soup.find("div", {"class": "info-features"}).find_all("span")
]
if [posible for posible in posible_m2 if "" in posible]:
self.ad_fields["m2"]["value"] = [
"".join(re.findall(r"[0-9]+,*[0-9]*", posible))
for posible in posible_m2
if "" in posible
][0].replace(",", ".")
self.ad_fields["m2"]["found"] = True
if soup.find("title") is not None:
if "venta" in soup.find("title").text:
self.ad_fields["tipo_anuncio"]["value"] = 1
else:
self.ad_fields["tipo_anuncio"]["value"] = 2
self.ad_fields["tipo_anuncio"]["found"] = True
if len(soup.find("div", {"id": "headerMap"}).find_all("li")) > 3:
self.ad_fields["calle"]["value"] = ""
self.ad_fields["ciudad"]["value"] = (
soup.find("div", {"id": "headerMap"}).find_all("li")[-2].text.strip()
)
self.ad_fields["ciudad"]["found"] = True
self.ad_fields["distrito"]["value"] = (
soup.find("div", {"id": "headerMap"}).find_all("li")[-3].text.strip()
)
self.ad_fields["distrito"]["found"] = True
self.ad_fields["barrio"]["value"] = (
soup.find("div", {"id": "headerMap"}).find_all("li")[-4].text.strip()
)
self.ad_fields["barrio"]["found"] = True
if len(soup.find("div", {"id": "headerMap"}).find_all("li")) > 4:
self.ad_fields["calle"]["value"] = (
soup.find("div", {"id": "headerMap"}).find_all("li")[0].text.strip()
)
self.ad_fields["calle"]["found"] = True
features_lists = soup.find_all("div", {"class": "details-property_features"})
features = [
feature.text
for feature_list in features_lists
for feature in feature_list.find_all("li")
]
self.ad_fields["cubierta"]["value"] = 1 * any(
"Cubierta" in feature for feature in features
)
self.ad_fields["puerta_auto"]["value"] = 1 * any(
"Puerta" in feature for feature in features
)
self.ad_fields["ascensor"]["value"] = 1 * any(
"ascensor" in feature for feature in features
)
self.ad_fields["alarma"]["value"] = 1 * any(
"Alarma" in feature for feature in features
)
self.ad_fields["circuito"]["value"] = 1 * any(
"Cámaras" in feature for feature in features
)
self.ad_fields["personal"]["value"] = 1 * any(
"Personal" in feature for feature in features
)
self.ad_fields["cubierta"]["found"] = True
self.ad_fields["puerta_auto"]["found"] = True
self.ad_fields["ascensor"]["found"] = True
self.ad_fields["alarma"]["found"] = True
self.ad_fields["circuito"]["found"] = True
self.ad_fields["personal"]["found"] = True
if soup.find("p", {"class": "txt-bold _browserPhone icon-phone"}) is not None:
self.ad_fields["telefono"]["value"] = soup.find(
"p", {"class": "txt-bold _browserPhone icon-phone"}
).text.replace(" ", "")
self.ad_fields["telefono"]["found"] = True
def _validate(self) -> None:
"""
Checks whether the extracted values are valid against the expected
typology. Stores the results.
:return: None
"""
self.invalid_fields = []
if not re.match(r"[0-9]{4,20}", self.ad_fields["referencia"]["value"]):
self.invalid_fields.append("referencia")
if not re.match(r"[0-9]{1,20}", self.ad_fields["precio"]["value"]):
self.invalid_fields.append("precio")
possible_values_tamano = [
"2 coches o más",
"coche y moto",
"coche grande",
"coche pequeño",
"moto",
None,
]
if self.ad_fields["tamano_categorico"]["value"] not in possible_values_tamano:
self.invalid_fields.append("tamano_categorico")
if not "Barrio" in self.ad_fields["barrio"]["value"]:
self.invalid_fields.append("barrio")
if not "Distrito" in self.ad_fields["distrito"]["value"]:
self.invalid_fields.append("distrito")
if self.ad_fields["telefono"]["found"] and not re.match(
r"\s*\+?[0-9\s]*", self.ad_fields["telefono"]["value"]
):
self.invalid_fields.append("telefono")
# TODO añadir + a caracteres validos
def all_fields_are_valid(self) -> bool:
"""
Reports on whether the extracted data is valid.
:return: True if values are valid, false if not
"""
self._validate()
if self.invalid_fields:
return False
else:
return True
def fields_missing(self) -> None:
"""
Reports on whether all compulsory fields are present.
:return: True if some field is missing, false if not
"""
for key, contents in self.ad_fields.items():
if not contents["optional"] and not contents["found"]:
return True
return False
def get_data(self) -> dict:
"""
Returns the extracted data in the form of a dictionary.
:return: dictionary with the extracted data
"""
data = {}
for ad_field in self.ad_fields.keys():
data[ad_field] = self.ad_fields[ad_field]["value"]
return data
if __name__ == "__main__": if __name__ == "__main__":
capturer = Capturer()
capturing_tasks_interface = CapturingTasksInterface()
capturas_interface = CapturasInterface()
throttling_manager = ThrottleManager()
throttling_manager.add_rule(WorkingHoursThrottlingRule(working_hours)).add_rule(
CooldownThrottlingRule(minimum_seconds_between_tries),
required_argument_names=["last_attempt_timestamp"],
).add_rule(
DynamicThrottlingRule(
lambda: bool(capturing_tasks_interface.get_pending_task())
)
)
parsing_flow_generator = ParsingFlowGenerator(
ParsingFlow,
(
(ReferenciaFieldInstructions, {}),
(PrecioFieldInstructions, {}),
(TamanoCategoricoFieldInstructions, {}),
(M2FieldInstructions, {}),
(TipoAnuncioFieldInstructions, {}),
(CalleFieldInstructions, {}),
(BarrioFieldInstructions, {}),
(DistritoFieldInstructions, {}),
(CiudadFieldInstructions, {}),
(
SecondaryFeaturesFieldInstructions,
{"field_name": "cubierta", "search_keyword": "Cubierta"},
),
(
SecondaryFeaturesFieldInstructions,
{"field_name": "puerta_auto", "search_keyword": "Puerta"},
),
(
SecondaryFeaturesFieldInstructions,
{"field_name": "ascensor", "search_keyword": "ascensor"},
),
(
SecondaryFeaturesFieldInstructions,
{"field_name": "alarma", "search_keyword": "Alarma"},
),
(
SecondaryFeaturesFieldInstructions,
{"field_name": "circuito", "search_keyword": "Cámaras"},
),
(
SecondaryFeaturesFieldInstructions,
{"field_name": "personal", "search_keyword": "Personal"},
),
(TelefonoFieldInstructions, {}),
),
)
capturer = Capturer(
throttling_manager=throttling_manager,
capturing_tasks_interface=capturing_tasks_interface,
capturas_interface=capturas_interface,
parsing_flow_generator=parsing_flow_generator,
url_acquisition_object=UrlAttack,
dead_ad_checker=Refresher.dead_ad_checker,
)
capturer.start() capturer.start()

core/parsing_utils.py (new file)

@@ -0,0 +1,597 @@
from typing import Union, Iterable, Dict, Callable, Type, Tuple
import re

from bs4 import BeautifulSoup


class BaseTargetFieldInstructions:
    """
    Abstract class for all field instructions. Implements useful decorators as
    well as the main interface.
    """

    class Decorators:
        """
        Decorators to use across all field instructions.
        """

        @classmethod
        def fail_safe_scrape(cls, f: Callable) -> Callable:
            """
            Wraps a scrape action in a try-except to control any errors, and
            updates the state of the search accordingly.
            :param f: the scrape function
            :return: the wrapped function
            """

            def wrapper(self, soup: BeautifulSoup):
                try:
                    return f(self, soup)
                except Exception as e:
                    self.found = False
                    self.search_issue = e
                    return self

            return wrapper

        @classmethod
        def if_not_found_do_nothing(cls, f: Callable) -> Callable:
            """
            Wraps a function to only execute it if the field has been found in
            the html. Otherwise, do nothing.
            :param f: the function that might get executed
            :return: the wrapped function
            """

            def wrapper(self):
                if self.found:
                    return f(self)
                return self

            return wrapper

    def __init__(self) -> None:
        """
        Initialize attributes.
        """
        self.is_optional = False
        self.found = None
        self.valid = None
        self.value = None
        self.search_issue = None

    def scrape(self, soup: BeautifulSoup) -> None:
        """
        Interface for the scrape method.
        :param soup: a BeautifulSoup object for the target html
        :return: None
        """
        raise NotImplementedError()

    def validate(self) -> None:
        """
        Interface for the validate method.
        :return: None
        """
        raise NotImplementedError()


class ReferenciaFieldInstructions(BaseTargetFieldInstructions):
    """
    Instructions for field Referencia.
    """

    field_name = "referencia"

    def __init__(self) -> None:
        """
        Initialize all default parameters.
        """
        super().__init__()

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "ReferenciaFieldInstructions":
        """
        Try to find the value and store it.
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.value = re.findall(
            r"[0-9]{5,20}", str(soup.find_all("link", {"rel": "canonical"})[0])
        )[0]
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "ReferenciaFieldInstructions":
        """
        Check if the obtained value fits the expected format.
        :return: self
        """
        self.valid = False
        if re.match(r"[0-9]{4,20}", self.value):
            self.valid = True
        return self


class TamanoCategoricoFieldInstructions(BaseTargetFieldInstructions):
    field_name = "tamano_categorico"
    possible_values = [
        "2 coches o más",
        "coche y moto",
        "coche grande",
        "coche pequeño",
        "moto",
        None,
    ]

    def __init__(self):
        super().__init__()
        self.is_optional = True

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "TamanoCategoricoFieldInstructions":
        """
        Try to find the value and store it.
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.found = False
        if (
            ""
            not in soup.find("div", {"class": "info-features"})
            .find("span")
            .find("span")
            .text
        ):
            self.value = (
                soup.find("div", {"class": "info-features"})
                .find("span")
                .find("span")
                .text
            )
            self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "TamanoCategoricoFieldInstructions":
        """
        Check if the obtained value fits the expected format.
        :return: self
        """
        self.valid = False
        if self.value in TamanoCategoricoFieldInstructions.possible_values:
            self.valid = True
        return self


class PrecioFieldInstructions(BaseTargetFieldInstructions):
    field_name = "precio"

    def __init__(self):
        super().__init__()

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "PrecioFieldInstructions":
        """
        Try to find the value and store it.
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.value = "".join(
            re.findall(r"[0-9]", str(soup.find_all("strong", {"class": "price"})[0]))
        )
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "PrecioFieldInstructions":
        """
        Check if the obtained value fits the expected format.
        :return: self
        """
        self.valid = False
        if re.match(r"[0-9]{1,20}", self.value):
            self.valid = True
        return self


class M2FieldInstructions(BaseTargetFieldInstructions):
    field_name = "m2"

    def __init__(self):
        super().__init__()
        self.is_optional = True

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "M2FieldInstructions":
        """
        Try to find the value and store it.
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.found = False
        posible_m2 = [
            tag.text
            for tag in soup.find("div", {"class": "info-features"}).find_all("span")
        ]
        if [posible for posible in posible_m2 if "" in posible]:
            self.value = [
                "".join(re.findall(r"[0-9]+,*[0-9]*", posible))
                for posible in posible_m2
                if "" in posible
            ][0].replace(",", ".")
            self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "M2FieldInstructions":
        """
        Check if the obtained value fits the expected format.
        :return: self
        """
        self.valid = False
        if re.match(r"[0-9]{1,4}", self.value):
            self.valid = True
        return self


class TipoAnuncioFieldInstructions(BaseTargetFieldInstructions):
    field_name = "tipo_anuncio"

    def __init__(self):
        super().__init__()

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "TipoAnuncioFieldInstructions":
        """
        Try to find the value and store it.
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.found = False
        if "venta" in soup.find("title").text:
            self.value = 1
            self.found = True
        if "Alquiler" in soup.find("title").text:
            self.value = 2
            self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "TipoAnuncioFieldInstructions":
        """
        Check if the obtained value fits the expected format.
        :return: self
        """
        self.valid = False
        if self.value in [1, 2]:
            self.valid = True
        return self


class CalleFieldInstructions(BaseTargetFieldInstructions):
    field_name = "calle"

    def __init__(self):
        super().__init__()
        self.is_optional = True

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "CalleFieldInstructions":
        """
        Try to find the value and store it.
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.found = False
        if len(soup.find("div", {"id": "headerMap"}).find_all("li")) > 3:
            self.value = ""
            if len(soup.find("div", {"id": "headerMap"}).find_all("li")) > 4:
                self.value = (
                    soup.find("div", {"id": "headerMap"}).find_all("li")[0].text.strip()
                )
            self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "CalleFieldInstructions":
        self.valid = True
        return self


class BarrioFieldInstructions(BaseTargetFieldInstructions):
    field_name = "barrio"

    def __init__(self):
        super().__init__()

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "BarrioFieldInstructions":
        """
        Try to find the value and store it.
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.value = (
            soup.find("div", {"id": "headerMap"}).find_all("li")[-4].text.strip()
        )
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "BarrioFieldInstructions":
        self.valid = True
        return self


class DistritoFieldInstructions(BaseTargetFieldInstructions):
    field_name = "distrito"

    def __init__(self):
        super().__init__()

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "DistritoFieldInstructions":
        """
        Try to find the value and store it.
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.value = (
            soup.find("div", {"id": "headerMap"}).find_all("li")[-3].text.strip()
        )
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "DistritoFieldInstructions":
        self.valid = True
        return self


class CiudadFieldInstructions(BaseTargetFieldInstructions):
    field_name = "ciudad"

    def __init__(self):
        super().__init__()

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "CiudadFieldInstructions":
        """
        Try to find the value and store it.
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.value = (
            soup.find("div", {"id": "headerMap"}).find_all("li")[-2].text.strip()
        )
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "CiudadFieldInstructions":
        self.valid = True
        return self


class SecondaryFeaturesFieldInstructions(BaseTargetFieldInstructions):
    """
    Shared methods for secondary features found in a list in ads.
    """

    def __init__(self, field_name: str, search_keyword: str):
        super().__init__()
        self.field_name = field_name
        self._feature_keyword = search_keyword

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "SecondaryFeaturesFieldInstructions":
        """
        Try to find the value and store it.
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        return self._find_feature_with_keyword(soup=soup, keyword=self._feature_keyword)

    def _find_feature_with_keyword(
        self, soup: BeautifulSoup, keyword: str
    ) -> "SecondaryFeaturesFieldInstructions":
        """
        Checks if a feature is in the secondary list by keyword and stores the
        value if found.
        :param soup: a BeautifulSoup object for the target html
        :param keyword: the keyword for that feature
        :return: self
        """
        features_lists = soup.find_all("div", {"class": "details-property_features"})
        features = [
            feature.text
            for feature_list in features_lists
            for feature in feature_list.find_all("li")
        ]
        if not features:
            self.found = False
            return self
        self.value = 1 * any(keyword in feature for feature in features)
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "SecondaryFeaturesFieldInstructions":
        self.valid = False
        if self.value in [0, 1]:
            self.valid = True
        return self

class TelefonoFieldInstructions(BaseTargetFieldInstructions):
    field_name = "telefono"

    def __init__(self):
        """
        Initialize all default parameters.
        """
        super().__init__()
        self.is_optional = True

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "TelefonoFieldInstructions":
        """
        Try to find the value and store it.
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.value = soup.find(
            "p", {"class": "txt-bold _browserPhone icon-phone"}
        ).text.replace(" ", "")
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "TelefonoFieldInstructions":
        """
        Check if the obtained value fits the expected format.
        :return: self
        """
        self.valid = False
        if re.match(r"\s*\+?[0-9\s]*", self.value):
            self.valid = True
        return self

class ParsingFlow:
    """
    Object to gather all instructions for a job run, execute them and present
    the results.
    """

    def __init__(self) -> None:
        """
        Initialize the instruction list.
        """
        self._instructions = []

    def add_instructions(
        self,
        instructions: Union[
            BaseTargetFieldInstructions, Iterable[BaseTargetFieldInstructions]
        ],
    ) -> "ParsingFlow":
        """
        Include new instructions in the internal list.
        :param instructions: a single or iterable group of instructions
        :return: self
        """
        if isinstance(instructions, BaseTargetFieldInstructions):
            self._instructions.append(instructions)
            return self
        self._instructions.extend(instructions)
        return self

    def execute_flow(self, soup: BeautifulSoup) -> None:
        """
        Scrape and validate all fields according to instructions.
        :param soup: a BeautifulSoup object for the target html
        :return: None
        """
        for instruction in self._instructions:
            instruction.scrape(soup).validate()

    @property
    def field_values(self) -> Dict:
        """
        Return the value for all fields, or None.
        :return: a dict with the field names and values
        """
        return {field.field_name: field.value for field in self._instructions}

    @property
    def all_found_fields_are_valid(self) -> bool:
        """
        Check if all found fields are valid.
        :return: True if the fields are valid, False otherwise
        """
        relevant_fields = [
            field.valid for field in self._instructions if field.found is True
        ]
        return all(relevant_fields)

    @property
    def all_non_optional_fields_were_found(self) -> bool:
        """
        Check if all compulsory fields were found.
        :return: True if the fields were found, False otherwise
        """
        found_or_not = [
            field.found or field.is_optional for field in self._instructions
        ]
        return all(found_or_not)

    @property
    def issues(self) -> Dict[str, dict]:
        """
        Returns all identified issues during scraping and validation.
        :return: the issues, bucketed by field
        """
        issues = {}
        for field in self._instructions:
            if (field.found or field.is_optional) and (
                field.valid is True or field.valid is None
            ):
                continue
            this_field_issues = {}
            if not field.found and not field.is_optional:
                this_field_issues["found"] = "Not found"
                if field.search_issue:
                    this_field_issues["search_issue"] = field.search_issue
            if not field.valid and field.valid is not None:
                this_field_issues["validity"] = "Not valid"
                this_field_issues["value"] = field.value
            issues[field.field_name] = this_field_issues
        return issues

class ParsingFlowGenerator:
    """
    Class for creating multiple, empty flows based on a group of instructions.
    """

    def __init__(
        self,
        parsing_flow_class: Type[ParsingFlow],
        instructions_to_attach_with_params: Union[
            Tuple[Type[BaseTargetFieldInstructions], Dict],
            Tuple[Tuple[Type[BaseTargetFieldInstructions], Dict]],
        ],
    ) -> None:
        """
        Set the flow class and group of instructions to use when creating new
        instances of the flow class.
        :param parsing_flow_class: the flow class to instantiate
        :param instructions_to_attach_with_params: one or more pairs of field
        instructions class and the parameters to use when instantiating them
        """
        self._parsing_flow_class = parsing_flow_class
        if not isinstance(instructions_to_attach_with_params, tuple):
            instructions_to_attach_with_params = tuple(
                instructions_to_attach_with_params
            )
        self._instructions_to_attach_with_params = instructions_to_attach_with_params

    def get_new_flow(self) -> ParsingFlow:
        """
        Instantiate a new parsing flow with the instantiated classes attached.
        :return: the new parsing flow
        """
        new_parsing_flow = self._parsing_flow_class()
        for instruction, params in self._instructions_to_attach_with_params:
            new_parsing_flow.add_instructions(instruction(**params))
        return new_parsing_flow
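
All instruction classes share the scrape/validate contract above, so adding a field means writing one small subclass. A minimal sketch of how a flow is assembled and read — the TitleFieldInstructions class and the HTML snippet are illustrative only, not part of this commit:

    from bs4 import BeautifulSoup

    from core.parsing_utils import (
        BaseTargetFieldInstructions,
        ParsingFlow,
        ParsingFlowGenerator,
    )

    class TitleFieldInstructions(BaseTargetFieldInstructions):
        # Hypothetical field, for illustration only.
        field_name = "title"

        @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
        def scrape(self, soup: BeautifulSoup) -> "TitleFieldInstructions":
            self.value = soup.find("title").text
            self.found = True
            return self

        @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
        def validate(self) -> "TitleFieldInstructions":
            self.valid = bool(self.value)
            return self

    generator = ParsingFlowGenerator(ParsingFlow, ((TitleFieldInstructions, {}),))
    flow = generator.get_new_flow()
    flow.execute_flow(soup=BeautifulSoup("<title>Garaje en venta</title>", "html5lib"))
    print(flow.field_values)  # {'title': 'Garaje en venta'}
    print(flow.issues)        # {} -- found and valid, so no issues reported

Because the generator re-instantiates every instruction on each get_new_flow() call, flows carry no state between tasks.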

core/scrapping_utils.py

@@ -92,10 +92,13 @@ class UrlAttack:
             if self.response.ok:
                 self.success = True
-        except Exception as e:
+        except Exception:
             self.success = False
-        if random.randrange(0, 100) < UrlAttack.identity_change_probability:
+        if (
+            not self.success
+            or random.randrange(0, 100) < UrlAttack.identity_change_probability
+        ):
             self._change_identity()

     def _change_identity(self) -> None:

core/throttling_utils.py (new file)

@@ -0,0 +1,190 @@
from typing import List, Callable
import datetime


class BaseThrottlingRule:
    """
    Interface for all throttling rules.
    """

    def __call__(self, **kwargs) -> bool:
        """
        Upon calling the rule itself, the underlying check gets executed.
        :param kwargs: arguments for check
        :return: True if the check is OK, False otherwise
        """
        return self._check_rule(**kwargs)

    def _check_rule(self, **kwargs) -> bool:
        """
        Interface for internal method to check the rule.
        :param kwargs: arguments for check
        :return: True if the check is OK, False otherwise
        """
        raise NotImplementedError


class WorkingHoursThrottlingRule(BaseThrottlingRule):
    """
    Rule for checking if current time is within the defined working hours.
    """

    def __init__(self, working_hours: dict) -> None:
        """
        Set the working hours as a dict with "start" and "end" keys, which
        contain time objects.
        :param working_hours: the definition of the working hours range
        :return: None
        """
        self._working_hours = working_hours

    def _check_rule(self) -> bool:
        """
        Call underlying check method.
        :return: True if the check is OK, False otherwise
        """
        return self._inside_working_hours()

    def _inside_working_hours(self) -> bool:
        """
        Checks if the current time is between the defined window of working
        hours.
        :return: True if within range, False otherwise
        """
        return (
            self._working_hours["start"]
            <= datetime.datetime.now().time()
            <= self._working_hours["end"]
        )


class CooldownThrottlingRule(BaseThrottlingRule):
    """
    Rule for checking if a certain time period has passed since the last
    execution.
    :attribute required_arguments: the list with arguments expected to be ready
    for unpacking when checking the rule.
    """

    required_arguments = ["last_attempt_timestamp"]

    def __init__(self, cooldown_time_generator: Callable) -> None:
        """
        Set the passed cooldown timer generator.
        :param cooldown_time_generator: a callable object that returns some
        number of seconds. Can be random or static.
        """
        self._cooldown_time_generator = cooldown_time_generator
        self._current_cooldown_time = self._cooldown_time_generator()

    def _check_rule(self, **kwargs) -> bool:
        """
        Unpack argument and call underlying check method.
        :return: True if the check is OK, False otherwise
        """
        last_attempt_timestamp = kwargs["last_attempt_timestamp"]
        return self._check_if_cooldowned(last_attempt_timestamp)

    def _check_if_cooldowned(self, last_attempt_timestamp: datetime.datetime) -> bool:
        """
        Checks if the cooldown time has passed. If so, set a new one.
        :param last_attempt_timestamp: timestamp for the last time whatever
        must be throttled happened.
        :return: True if the cooldown time has passed, False otherwise
        """
        cooldown_release_timestamp = last_attempt_timestamp + datetime.timedelta(
            seconds=self._current_cooldown_time
        )
        if datetime.datetime.now() > cooldown_release_timestamp:
            self._current_cooldown_time = self._cooldown_time_generator()
            return True
        return False


class DynamicThrottlingRule(BaseThrottlingRule):
    """
    A basic interface to dynamically set any function, optionally with
    arguments, as a throttling rule.
    """

    def __init__(self, any_callable: Callable) -> None:
        """
        Sets the callable that will act as a check. Only condition is that the
        callable should return a boolean value.
        :param any_callable: the check callable object
        """
        self._some_rule = any_callable

    def _check_rule(self, **kwargs) -> bool:
        """
        Calls the dynamically set callable while passing any given arguments.
        :param kwargs: arguments for check
        :return: True if the check is OK, False otherwise
        """
        return self._some_rule(**kwargs)


class ThrottleManager:
    """
    Holds and runs all throttling rules on demand.
    """

    def __init__(self) -> None:
        """
        Initialize internal attributes.
        """
        self._rules_to_check = []
        self._rules_and_required_arguments = dict()

    def allow_next_task(self, **kwargs) -> bool:
        """
        Checks all the internal rules and returns whether all of them passed
        successfully or not.
        :param kwargs: any arguments needed by the rules
        :return: True if all rules passed positively, False otherwise
        """
        check_result = self._check_all_rules(**kwargs)
        return check_result

    def add_rule(
        self, rule: BaseThrottlingRule, required_argument_names: List[str] = None
    ) -> "ThrottleManager":
        """
        Includes a new rule to the manager together with the argument names
        that the rule call expects.
        :param rule: the rule instance
        :param required_argument_names: the required argument names to execute
        the check for that rule
        :return: the ThrottleManager instance
        """
        required_argument_names = required_argument_names or []
        self._rules_to_check.append(rule)
        self._rules_and_required_arguments[rule.__class__] = required_argument_names
        return self

    def _check_all_rules(self, **kwargs) -> bool:
        """
        Executes checks (short-circuiting) with the right arguments for each of
        them and collects results.
        :param kwargs: all passed arguments
        :return: True if all checks passed, False otherwise
        """
        checks = []
        for rule in self._rules_to_check:
            arguments_for_rule = {
                argument_name: kwargs[argument_name]
                for argument_name in self._rules_and_required_arguments[rule.__class__]
            }
            checks.append(rule(**arguments_for_rule))
            if not checks[-1]:
                return False
        return True
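
For reference, a minimal wiring sketch using the classes above (the working-hours window and the 60-second cooldown are illustrative values): the manager evaluates rules in insertion order, stops at the first failing rule, and routes keyword arguments to each rule by the names registered in add_rule.

    import datetime

    from core.throttling_utils import (
        ThrottleManager,
        WorkingHoursThrottlingRule,
        CooldownThrottlingRule,
    )

    manager = ThrottleManager()
    manager.add_rule(
        WorkingHoursThrottlingRule(
            working_hours={"start": datetime.time(8, 0), "end": datetime.time(22, 0)}
        )
    ).add_rule(
        CooldownThrottlingRule(cooldown_time_generator=lambda: 60),
        required_argument_names=["last_attempt_timestamp"],
    )

    # Both rules must pass for the next task to be allowed.
    allowed = manager.allow_next_task(
        last_attempt_timestamp=datetime.datetime.now() - datetime.timedelta(minutes=5)
    )

One caveat visible in the code: required-argument lists are keyed by rule class, so registering two rules of the same class makes the later argument list win for both.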

refresher/refresher.py

@@ -1,16 +1,18 @@
-import sys
-sys.path.append("..")
+import logging
 from time import sleep
+from core.config import refresher_delay
 from db_layer.capturas_interface import capturas_interface
 from db_layer.capturing_tasks_interface import capturing_interface
-from core.config import refresher_delay
-from core import my_logger
-import logging

 class Refresher:
-    def start(self):
+    @staticmethod
+    def start() -> None:
+        """
+        Execute main flow.
+        :return: None
+        """
         while True:
             sleep(refresher_delay)
@@ -28,12 +30,9 @@ class Refresher:
         :param html: HTML of the ad as a string.
         :return: True if it has been taken down, False if not.
         """
-        try:
-            if ":-|" in html or "El anunciante lo dio de baja" in html:
-                return True
-            else:
-                return False
-        except TypeError:
+        if "anunciante" in html and "baja" in html:
+            return True
+        else:
             return False

tests/capturer_test.py (new file)

@@ -0,0 +1,170 @@
from tests.mock_classes import (
    MockCapturingInterface,
    MockParsingFlow,
    MockUrlAttackReturnsSuccess,
    MockUrlAttackReturnsFailure,
)
from capturer.capturer import CapturingTask


def test_capturing_task_successful_task_flow():
    the_task_parameters = dict()
    the_task_parameters["uuid"] = "test_uuid"
    the_task_parameters["ad_url"] = "test_url"
    the_task_parameters["fk_uuid_exploring"] = "test_exploring_uuid"
    the_task_parameters["status"] = "Pending"
    fake_resulting_field_values = {
        "a_field": {"a_value": 1},
        "another_field": {"another_value": 2},
    }
    mock_parsing_flow = MockParsingFlow(
        mock_all_found_fields_are_valid=True,
        mock_all_non_optional_fields_were_found=True,
        mock_field_values_to_return=fake_resulting_field_values,
    )
    mock_capturing_interface = MockCapturingInterface()
    task = CapturingTask(
        task_parameters=the_task_parameters,
        capturing_interface=mock_capturing_interface,
        new_parsing_flow=mock_parsing_flow,
        url_acquisition_object=MockUrlAttackReturnsSuccess,
        dead_ad_checker=lambda x: False,
    )
    task.capture()
    final_data = task.get_ad_data()
    assert (
        len(mock_capturing_interface.tasks) == 1
        and mock_capturing_interface.tasks[the_task_parameters["uuid"]][-1].status
        == "Data ready"
        and fake_resulting_field_values == final_data
    )


def test_capturing_task_dead_ad_task_flow():
    the_task_parameters = dict()
    the_task_parameters["uuid"] = "test_uuid"
    the_task_parameters["ad_url"] = "test_url"
    the_task_parameters["fk_uuid_exploring"] = "test_exploring_uuid"
    the_task_parameters["status"] = "Pending"
    mock_parsing_flow = MockParsingFlow(
        mock_all_found_fields_are_valid=False,
        issues_to_return={"some_field": {"valid": False}},
    )
    mock_capturing_interface = MockCapturingInterface()
    task = CapturingTask(
        task_parameters=the_task_parameters,
        capturing_interface=mock_capturing_interface,
        new_parsing_flow=mock_parsing_flow,
        url_acquisition_object=MockUrlAttackReturnsFailure,
        dead_ad_checker=lambda x: True,
    )
    task.capture()
    assert (
        len(mock_capturing_interface.tasks) == 1
        and mock_capturing_interface.tasks[the_task_parameters["uuid"]][-1].status
        == "Dead ad"
    )


def test_capturing_task_invalid_fields_surrender_flow():
    the_task_parameters = dict()
    the_task_parameters["uuid"] = "test_uuid"
    the_task_parameters["ad_url"] = "test_url"
    the_task_parameters["fk_uuid_exploring"] = "test_exploring_uuid"
    the_task_parameters["status"] = "Pending"
    mock_parsing_flow = MockParsingFlow(
        mock_all_found_fields_are_valid=False,
        issues_to_return={"some_field": {"valid": False}},
    )
    mock_capturing_interface = MockCapturingInterface()
    task = CapturingTask(
        task_parameters=the_task_parameters,
        capturing_interface=mock_capturing_interface,
        new_parsing_flow=mock_parsing_flow,
        url_acquisition_object=MockUrlAttackReturnsSuccess,
        dead_ad_checker=lambda x: False,
    )
    task.capture()
    assert (
        len(mock_capturing_interface.tasks) == 1
        and mock_capturing_interface.tasks[the_task_parameters["uuid"]][-1].status
        == "Invalid value fields"
    )


def test_capturing_task_missing_fields_surrender_flow():
    the_task_parameters = dict()
    the_task_parameters["uuid"] = "test_uuid"
    the_task_parameters["ad_url"] = "test_url"
    the_task_parameters["fk_uuid_exploring"] = "test_exploring_uuid"
    the_task_parameters["status"] = "Pending"
    mock_parsing_flow = MockParsingFlow(
        mock_all_non_optional_fields_were_found=False,
        issues_to_return={"some_field": {"found": False}},
    )
    mock_capturing_interface = MockCapturingInterface()
    task = CapturingTask(
        task_parameters=the_task_parameters,
        capturing_interface=mock_capturing_interface,
        new_parsing_flow=mock_parsing_flow,
        url_acquisition_object=MockUrlAttackReturnsSuccess,
        dead_ad_checker=lambda x: False,
    )
    task.capture()
    assert (
        len(mock_capturing_interface.tasks) == 1
        and mock_capturing_interface.tasks[the_task_parameters["uuid"]][-1].status
        == "Fields missing"
    )


def test_capturing_task_unexpected_issue_surrender_flow():
    the_task_parameters = dict()
    the_task_parameters["uuid"] = "test_uuid"
    the_task_parameters["ad_url"] = "test_url"
    the_task_parameters["fk_uuid_exploring"] = "test_exploring_uuid"
    the_task_parameters["status"] = "Pending"
    mock_parsing_flow = MockParsingFlow()
    mock_capturing_interface = MockCapturingInterface()
    CapturingTask.sleep_time_failed_request = 0  # Override quite long sleep time
    task = CapturingTask(
        task_parameters=the_task_parameters,
        capturing_interface=mock_capturing_interface,
        new_parsing_flow=mock_parsing_flow,
        url_acquisition_object=MockUrlAttackReturnsFailure,
        dead_ad_checker=lambda x: False,
    )
    task.capture()
    assert (
        len(mock_capturing_interface.tasks) == 1
        and mock_capturing_interface.tasks[the_task_parameters["uuid"]][-1].status
        == "Surrender"
    )

File diff suppressed because one or more lines are too long

tests/mock_classes.py (new file)

@@ -0,0 +1,97 @@
from collections import namedtuple
from typing import Dict

from bs4 import BeautifulSoup

from db_layer.capturing_tasks_interface import CapturingTasksInterface
from core.parsing_utils import ParsingFlow
from core.scrapping_utils import UrlAttack


class MockCapturingInterface(CapturingTasksInterface):
    task_state_record = namedtuple(
        "TaskStateRecord", ["uuid", "uuid_exploring", "status", "ad_url"]
    )

    def __init__(self):
        self.tasks = {}

    def update_capturing_task(self, uuid, uuid_exploring, status, ad_url):
        if uuid not in self.tasks:
            self.tasks[uuid] = []
        self.tasks[uuid].append(
            MockCapturingInterface.task_state_record(
                uuid=uuid, uuid_exploring=uuid_exploring, status=status, ad_url=ad_url
            )
        )


class MockParsingFlow(ParsingFlow):
    def __init__(
        self,
        issues_to_return: Dict[str, dict] = None,
        mock_all_found_fields_are_valid: bool = True,
        mock_all_non_optional_fields_were_found: bool = True,
        mock_field_values_to_return: Dict[str, dict] = None,
    ):
        if issues_to_return is None:
            issues_to_return = dict()
        if mock_field_values_to_return is None:
            mock_field_values_to_return = dict()
        self._issues = issues_to_return
        self._mock_all_found_fields_are_valid = mock_all_found_fields_are_valid
        self._mock_field_values_to_return = mock_field_values_to_return
        self._mock_all_non_optional_fields_were_found = (
            mock_all_non_optional_fields_were_found
        )

    def execute_flow(self, soup: BeautifulSoup) -> None:
        pass

    @property
    def issues(self) -> Dict[str, dict]:
        return self._issues

    @property
    def all_found_fields_are_valid(self) -> bool:
        return self._mock_all_found_fields_are_valid

    @property
    def all_non_optional_fields_were_found(self) -> bool:
        return self._mock_all_non_optional_fields_were_found

    @property
    def field_values(self) -> Dict:
        return self._mock_field_values_to_return


class MockUrlAttack(UrlAttack):
    def __init__(self, url: str) -> None:
        super().__init__(url=url)

    def get_text(self) -> str:
        return "<html>this_is_a_fake_html_string</html>"


class MockUrlAttackReturnsSuccess(MockUrlAttack):
    def __init__(self, url: str) -> None:
        super().__init__(url=url)

    def attack(self) -> None:
        self.success = True
        self.has_been_attacked = True


class MockUrlAttackReturnsFailure(MockUrlAttack):
    def __init__(self, url: str) -> None:
        super().__init__(url=url)

    def attack(self) -> None:
        self.success = False
        self.has_been_attacked = True

tests/parsing_utils_test.py (new file, 2755 lines)

File diff suppressed because one or more lines are too long

tests/throttling_test.py (new file)

@@ -0,0 +1,108 @@
import datetime

from core.throttling_utils import (
    ThrottleManager,
    CooldownThrottlingRule,
    WorkingHoursThrottlingRule,
    DynamicThrottlingRule,
)


def test_working_hours_throttling_rule_checks():
    working_hours_rule = WorkingHoursThrottlingRule(
        working_hours={
            "start": (datetime.datetime.now() + datetime.timedelta(seconds=-5)).time(),
            "end": (datetime.datetime.now() + datetime.timedelta(seconds=5)).time(),
        }
    )
    assert working_hours_rule() == True


def test_working_hours_throttling_rule_does_not_check():
    working_hours_rule = WorkingHoursThrottlingRule(
        working_hours={
            "start": (datetime.datetime.now() + datetime.timedelta(hours=1)).time(),
            "end": (datetime.datetime.now() + datetime.timedelta(hours=2)).time(),
        }
    )
    assert working_hours_rule() == False


def test_cooldown_throttling_rule_checks():
    time_generator = lambda: 60
    cooldown_rule = CooldownThrottlingRule(cooldown_time_generator=time_generator)
    assert (
        cooldown_rule(
            last_attempt_timestamp=datetime.datetime.now()
            + datetime.timedelta(seconds=-120)
        )
        == True
    )


def test_cooldown_throttling_rule_does_not_check():
    time_generator = lambda: 60
    cooldown_rule = CooldownThrottlingRule(cooldown_time_generator=time_generator)
    assert cooldown_rule(last_attempt_timestamp=datetime.datetime.now()) == False


def test_dynamic_rule_checks():
    mock_check = lambda: True
    rule = DynamicThrottlingRule(any_callable=mock_check)
    assert rule() == True


def test_dynamic_rule_does_not_check():
    mock_check = lambda: False
    rule = DynamicThrottlingRule(any_callable=mock_check)
    assert rule() == False


def test_dynamic_rule_arguments_pass_properly():
    def pass_a_bool(some_bool):
        return some_bool

    rule = DynamicThrottlingRule(pass_a_bool)
    assert (rule(some_bool=True) == True) and (rule(some_bool=False) == False)


def test_throttle_manager_checks_rules():
    throttle_manager = ThrottleManager()

    def pass_a_bool(some_bool):
        return some_bool

    some_rules = [
        WorkingHoursThrottlingRule(
            working_hours={
                "start": (
                    datetime.datetime.now() + datetime.timedelta(seconds=-5)
                ).time(),
                "end": (datetime.datetime.now() + datetime.timedelta(seconds=5)).time(),
            }
        ),
        CooldownThrottlingRule(cooldown_time_generator=lambda: 0),
        DynamicThrottlingRule(any_callable=pass_a_bool),
    ]
    some_arguments = [[], ["last_attempt_timestamp"], ["some_bool"]]
    some_rules_and_arguments = zip(some_rules, some_arguments)
    for rule, arguments in some_rules_and_arguments:
        throttle_manager.add_rule(rule, required_argument_names=arguments)
    assert throttle_manager.allow_next_task(
        last_attempt_timestamp=datetime.datetime.now(), some_bool=True
    )