Merge branch 'refactor/capturer_improved' into 'integration'

Refactor/capturer improved

See merge request pablomartincalvo/Drogon!1
Pablo Martin · 2021-01-04 21:23:16 +00:00 · commit 6122f74e99
9 changed files with 4089 additions and 540 deletions

capturer/capturer.py

@@ -1,16 +1,18 @@
import sys
sys.path.append("..")
from time import sleep
from bs4 import BeautifulSoup
import re
import datetime
from db_layer.capturing_tasks_interface import capturing_interface
from db_layer.capturas_interface import capturas_interface
from db_layer.capturing_tasks_interface import CapturingTasksInterface
from db_layer.capturas_interface import CapturasInterface
from core.scrapping_utils import UrlAttack
from core.config import working_hours, minimum_seconds_between_tries
from core.throttling_utils import (
ThrottleManager,
WorkingHoursThrottlingRule,
CooldownThrottlingRule,
DynamicThrottlingRule,
)
from refresher.refresher import Refresher
from core import my_logger
from core.parsing_utils import *
import logging
@@ -20,7 +22,35 @@ class Capturer:
scraping and db storage.
"""
def __init__(self) -> None:
def __init__(
self,
throttling_manager: ThrottleManager,
capturing_tasks_interface: CapturingTasksInterface,
capturas_interface: CapturasInterface,
parsing_flow_generator: ParsingFlowGenerator,
url_acquisition_object: Type[UrlAttack],
dead_ad_checker: Callable,
) -> None:
"""
Receive all required objects.
:param throttling_manager: takes care of deciding whether a task should
be started
:param capturing_tasks_interface: interface to interact with the tasks
database
:param capturas_interface: interface to interact with the ad database
:param parsing_flow_generator: an object capable of generating empty
parsing flows to give each task a new one
:param url_acquisition_object: gateway to obtaining the HTML of a URL
:param dead_ad_checker: callable capable of checking if an ad is dead
through its HTML
"""
self._throttling_manager = throttling_manager
self._capturing_tasks_interface = capturing_tasks_interface
self._capturas_interface = capturas_interface
self._parsing_flow_generator = parsing_flow_generator
self._url_acquisition_object = url_acquisition_object
self._dead_ad_checker = dead_ad_checker
self.last_try_datetime = datetime.datetime.now()
def start(self) -> None:
@@ -33,24 +63,23 @@ class Capturer:
logging.info("Starting capturer")
while True:
if not self._in_working_hours():
sleep(1800)
while not self._throttling_manager.allow_next_task(
last_attempt_timestamp=self.last_try_datetime
):
sleep(10)
logging.info("Waiting...")
continue
seconds_to_next_capture = (
minimum_seconds_between_tries() - self._seconds_since_last_try()
pending_task = self._capturing_tasks_interface.get_pending_task()
logging.info("Got a task")
task = CapturingTask(
pending_task,
capturing_interface=self._capturing_tasks_interface,
new_parsing_flow=self._parsing_flow_generator.get_new_flow(),
url_acquisition_object=self._url_acquisition_object,
dead_ad_checker=self._dead_ad_checker,
)
if seconds_to_next_capture > 0:
sleep(seconds_to_next_capture)
logging.info("Waiting...")
pending_task = capturing_interface.get_pending_task()
if not pending_task:
logging.info("No pending tasks.")
continue
task = CapturingTask(pending_task)
self.last_try_datetime = datetime.datetime.now()
task.capture()
@@ -60,29 +89,10 @@ class Capturer:
logging.warning("Something went wrong, not adding data.")
continue
capturas_interface.insert_captura(ad_data)
task._update_status("Captura inserted")
self._capturas_interface.insert_captura(ad_data)
task.update_status("Captura inserted")
logging.info("New ad inserted.")
@staticmethod
def _in_working_hours() -> bool:
"""
Checks whether now is within the working hours of the daemon.
:return: True if so, false if not
"""
return (
working_hours["start"]
<= datetime.datetime.now().time()
<= working_hours["end"]
)
def _seconds_since_last_try(self) -> float:
"""
Computes how many seconds have passed since the last capturing attempt
:return: seconds since last try as integer
"""
return (datetime.datetime.now() - self.last_try_datetime).total_seconds()
class CapturingTask:
"""
@@ -92,29 +102,44 @@ class CapturingTask:
sleep_time_failed_request = 180
def __init__(self, parameters) -> None:
def __init__(
self,
task_parameters: dict,
capturing_interface: CapturingTasksInterface,
new_parsing_flow: ParsingFlow,
url_acquisition_object: Type[UrlAttack],
dead_ad_checker: Callable,
) -> None:
"""
Initialize with task parameters and mark the task as being worked on
in the task queue.
:param parameters: dict with the necessary parameters for the task
:param task_parameters: dict with the necessary parameters for the task
:param capturing_interface: interface to interact with the ad database
:param new_parsing_flow: an empty parsing flow
:param url_acquisition_object: gateway to obtaining the HTML of a URL
:param dead_ad_checker: callable capable of checking if an ad is dead
"""
self.uuid = parameters["uuid"]
self.ad_url = parameters["ad_url"]
self.uuid_exploring = parameters["fk_uuid_exploring"]
self.status = parameters["status"]
self.uuid = task_parameters["uuid"]
self.ad_url = task_parameters["ad_url"]
self.uuid_exploring = task_parameters["fk_uuid_exploring"]
self.status = task_parameters["status"]
self.request_failures = 1
self.html = None
self._parsing_flow = new_parsing_flow
self._capturing_interface = capturing_interface
self._url_acquisition_object = url_acquisition_object
self._is_dead_ad = dead_ad_checker
self._update_status("Loading")
self.update_status("Loading")
def _update_status(self, new_status) -> None:
def update_status(self, new_status) -> None:
"""
Updates the task status and persists it in the task queue.
:param new_status: string describing the new status
:return: None
"""
self.status = new_status
capturing_interface.update_capturing_task(
self._capturing_interface.update_capturing_task(
self.uuid, self.uuid_exploring, self.status, self.ad_url
)
@@ -122,288 +147,128 @@ class CapturingTask:
"""
Main flow of work
"""
self._update_status("WIP")
self.update_status("WIP")
while self.request_failures < 4:
attack = UrlAttack(self.ad_url)
attack = self._url_acquisition_object(self.ad_url)
attack.attack()
if attack.success:
self.html = attack.get_text()
self._extract_data()
self._check_data()
self._parse_html(html=attack.get_text())
return
if not attack.success:
try:
if Refresher.dead_ad_checker(attack.get_text()):
self._update_status("Dead ad")
if self._is_dead_ad(attack.get_text()):
self.update_status("Dead ad")
return
except AttributeError:
logging.error(
"Something went wrong when checking if the ad is gone"
)
self._update_status("Fail {}".format(self.request_failures))
self.update_status("Fail {}".format(self.request_failures))
self.request_failures += 1
sleep(CapturingTask.sleep_time_failed_request)
continue
self._update_status("Surrender")
self.update_status("Surrender")
logging.warning(f"A task has surrendered. {self.ad_url}")
def _extract_data(self) -> None:
"""
Parses the obtained html to extract the ad information.
:return: None
"""
self.parser = AdHtmlParser(self.html)
self.parser.parse()
def _check_data(self) -> None:
"""
Validates that all compulsory fields have been obtained and that the
values are within the expected ranges. Sets the status of the task accordingly.
:return: None
"""
if self.parser.fields_missing():
self._update_status("Fields missing")
return
if not self.parser.all_fields_are_valid():
self._update_status("Invalid value fields")
return
self._update_status("Data ready")
def get_ad_data(self) -> dict:
"""
Returns the extracted data.
:return: dictionary with the data of the ad.
"""
return self.parser.get_data()
return self._parsing_flow.field_values
class AdHtmlParser:
def _parse_html(self, html: str) -> None:
"""
Object for parsing, storing and validating the data of the HTML of an ad.
"""
def __init__(self, html_string: str) -> None:
"""
Initializes an instance of the parser with the HTML of an ad.
:param html_string: the full HTML code of the ad page
"""
self.html = html_string
self.ad_fields = {
"referencia": {"found": False, "optional": False, "value": None},
"precio": {"found": False, "optional": False, "value": None},
"tamano_categorico": {"found": False, "optional": True, "value": None},
"m2": {"found": False, "optional": True, "value": None},
"tipo_anuncio": {"found": False, "optional": False, "value": None},
"calle": {"found": False, "optional": True, "value": None},
"barrio": {"found": False, "optional": False, "value": None},
"distrito": {"found": False, "optional": False, "value": None},
"ciudad": {"found": False, "optional": False, "value": None},
"cubierta": {"found": False, "optional": False, "value": None},
"puerta_auto": {"found": False, "optional": False, "value": None},
"ascensor": {"found": False, "optional": False, "value": None},
"alarma": {"found": False, "optional": False, "value": None},
"circuito": {"found": False, "optional": False, "value": None},
"personal": {"found": False, "optional": False, "value": None},
"telefono": {"found": False, "optional": True, "value": None},
}
def parse(self) -> None:
"""
Parses the HTML and stores the ad data.
Execute the complete parsing flow and report the task status depending
on the outcome.
:param html: the HTML of the ad
:return: None
"""
self._parsing_flow.execute_flow(soup=BeautifulSoup(html, "html5lib"))
soup = BeautifulSoup(self.html, "html5lib")
if not self._parsing_flow.issues:
self.update_status("Data ready")
return
if soup.find_all("link", {"rel": "canonical"}) is not None:
self.ad_fields["referencia"]["value"] = re.findall(
r"[0-9]{5,20}", str(soup.find_all("link", {"rel": "canonical"})[0])
)[0]
self.ad_fields["referencia"]["found"] = True
if soup.find_all("strong", {"class": "price"}) is not None:
self.ad_fields["precio"]["value"] = "".join(
re.findall(
r"[0-9]", str(soup.find_all("strong", {"class": "price"})[0])
if not self._parsing_flow.all_found_fields_are_valid:
self.update_status("Invalid value fields")
logging.warning(f"Invalid fields found in ad: {self.ad_url}")
logging.warning(f"{self._parsing_flow.issues}")
return
if not self._parsing_flow.all_non_optional_fields_were_found:
self.update_status("Fields missing")
logging.warning(
f"Couldn't scrap necessary fields: {self._parsing_flow.issues}"
)
)
self.ad_fields["precio"]["found"] = True
if soup.find("div", {"class": "info-features"}) is not None:
try:
if (
""
not in soup.find("div", {"class": "info-features"})
.find("span")
.find("span")
.text
):
self.ad_fields["tamano_categorico"]["value"] = (
soup.find("div", {"class": "info-features"})
.find("span")
.find("span")
.text
)
self.ad_fields["tamano_categorico"]["found"] = True
except:
pass
posible_m2 = [
tag.text
for tag in soup.find("div", {"class": "info-features"}).find_all("span")
]
if [posible for posible in posible_m2 if "m²" in posible]:
self.ad_fields["m2"]["value"] = [
"".join(re.findall(r"[0-9]+,*[0-9]*", posible))
for posible in posible_m2
if "" in posible
][0].replace(",", ".")
self.ad_fields["m2"]["found"] = True
if soup.find("title") is not None:
if "venta" in soup.find("title").text:
self.ad_fields["tipo_anuncio"]["value"] = 1
else:
self.ad_fields["tipo_anuncio"]["value"] = 2
self.ad_fields["tipo_anuncio"]["found"] = True
if len(soup.find("div", {"id": "headerMap"}).find_all("li")) > 3:
self.ad_fields["calle"]["value"] = ""
self.ad_fields["ciudad"]["value"] = (
soup.find("div", {"id": "headerMap"}).find_all("li")[-2].text.strip()
)
self.ad_fields["ciudad"]["found"] = True
self.ad_fields["distrito"]["value"] = (
soup.find("div", {"id": "headerMap"}).find_all("li")[-3].text.strip()
)
self.ad_fields["distrito"]["found"] = True
self.ad_fields["barrio"]["value"] = (
soup.find("div", {"id": "headerMap"}).find_all("li")[-4].text.strip()
)
self.ad_fields["barrio"]["found"] = True
if len(soup.find("div", {"id": "headerMap"}).find_all("li")) > 4:
self.ad_fields["calle"]["value"] = (
soup.find("div", {"id": "headerMap"}).find_all("li")[0].text.strip()
)
self.ad_fields["calle"]["found"] = True
features_lists = soup.find_all("div", {"class": "details-property_features"})
features = [
feature.text
for feature_list in features_lists
for feature in feature_list.find_all("li")
]
self.ad_fields["cubierta"]["value"] = 1 * any(
"Cubierta" in feature for feature in features
)
self.ad_fields["puerta_auto"]["value"] = 1 * any(
"Puerta" in feature for feature in features
)
self.ad_fields["ascensor"]["value"] = 1 * any(
"ascensor" in feature for feature in features
)
self.ad_fields["alarma"]["value"] = 1 * any(
"Alarma" in feature for feature in features
)
self.ad_fields["circuito"]["value"] = 1 * any(
"Cámaras" in feature for feature in features
)
self.ad_fields["personal"]["value"] = 1 * any(
"Personal" in feature for feature in features
)
self.ad_fields["cubierta"]["found"] = True
self.ad_fields["puerta_auto"]["found"] = True
self.ad_fields["ascensor"]["found"] = True
self.ad_fields["alarma"]["found"] = True
self.ad_fields["circuito"]["found"] = True
self.ad_fields["personal"]["found"] = True
if soup.find("p", {"class": "txt-bold _browserPhone icon-phone"}) is not None:
self.ad_fields["telefono"]["value"] = soup.find(
"p", {"class": "txt-bold _browserPhone icon-phone"}
).text.replace(" ", "")
self.ad_fields["telefono"]["found"] = True
def _validate(self) -> None:
"""
Checks whether the extracted values are valid against the expected
typology. Stores the results.
:return: None
"""
self.invalid_fields = []
if not re.match(r"[0-9]{4,20}", self.ad_fields["referencia"]["value"]):
self.invalid_fields.append("referencia")
if not re.match(r"[0-9]{1,20}", self.ad_fields["precio"]["value"]):
self.invalid_fields.append("precio")
possible_values_tamano = [
"2 coches o más",
"coche y moto",
"coche grande",
"coche pequeño",
"moto",
None,
]
if self.ad_fields["tamano_categorico"]["value"] not in possible_values_tamano:
self.invalid_fields.append("tamano_categorico")
if not "Barrio" in self.ad_fields["barrio"]["value"]:
self.invalid_fields.append("barrio")
if not "Distrito" in self.ad_fields["distrito"]["value"]:
self.invalid_fields.append("distrito")
if self.ad_fields["telefono"]["found"] and not re.match(
r"\s*\+?[0-9\s]*", self.ad_fields["telefono"]["value"]
):
self.invalid_fields.append("telefono")
# TODO: add "+" to the set of valid characters
def all_fields_are_valid(self) -> bool:
"""
Reports on whether the extracted data is valid.
:return: True if values are valid, false if not
"""
self._validate()
if self.invalid_fields:
return False
else:
return True
def fields_missing(self) -> bool:
"""
Reports on whether all compulsory fields are present.
:return: True if some field is missing, false if not
"""
for key, contents in self.ad_fields.items():
if not contents["optional"] and not contents["found"]:
return True
return False
def get_data(self) -> dict:
"""
Returns the extracted data in the form of a dictionary.
:return: dictionary with the extracted data
"""
data = {}
for ad_field in self.ad_fields.keys():
data[ad_field] = self.ad_fields[ad_field]["value"]
return data
return
if __name__ == "__main__":
capturer = Capturer()
capturing_tasks_interface = CapturingTasksInterface()
capturas_interface = CapturasInterface()
throttling_manager = ThrottleManager()
throttling_manager.add_rule(WorkingHoursThrottlingRule(working_hours)).add_rule(
CooldownThrottlingRule(minimum_seconds_between_tries),
required_argument_names=["last_attempt_timestamp"],
).add_rule(
DynamicThrottlingRule(
lambda: bool(capturing_tasks_interface.get_pending_task())
)
)
parsing_flow_generator = ParsingFlowGenerator(
ParsingFlow,
(
(ReferenciaFieldInstructions, {}),
(PrecioFieldInstructions, {}),
(TamanoCategoricoFieldInstructions, {}),
(M2FieldInstructions, {}),
(TipoAnuncioFieldInstructions, {}),
(CalleFieldInstructions, {}),
(BarrioFieldInstructions, {}),
(DistritoFieldInstructions, {}),
(CiudadFieldInstructions, {}),
(
SecondaryFeaturesFieldInstructions,
{"field_name": "cubierta", "search_keyword": "Cubierta"},
),
(
SecondaryFeaturesFieldInstructions,
{"field_name": "puerta_auto", "search_keyword": "Puerta"},
),
(
SecondaryFeaturesFieldInstructions,
{"field_name": "ascensor", "search_keyword": "ascensor"},
),
(
SecondaryFeaturesFieldInstructions,
{"field_name": "alarma", "search_keyword": "Alarma"},
),
(
SecondaryFeaturesFieldInstructions,
{"field_name": "circuito", "search_keyword": "Cámaras"},
),
(
SecondaryFeaturesFieldInstructions,
{"field_name": "personal", "search_keyword": "Personal"},
),
(TelefonoFieldInstructions, {}),
),
)
capturer = Capturer(
throttling_manager=throttling_manager,
capturing_tasks_interface=capturing_tasks_interface,
capturas_interface=capturas_interface,
parsing_flow_generator=parsing_flow_generator,
url_acquisition_object=UrlAttack,
dead_ad_checker=Refresher.dead_ad_checker,
)
capturer.start()

core/parsing_utils.py (new file, 597 lines)

@@ -0,0 +1,597 @@
from typing import Union, Iterable, Dict, Callable, Type, Tuple
import re
from bs4 import BeautifulSoup
class BaseTargetFieldInstructions:
"""
Abstract class for all field instructions. Implements useful decorators as
well as the main interface.
"""
class Decorators:
"""
Decorators to use across all field instructions.
"""
@classmethod
def fail_safe_scrape(cls, f: Callable) -> Callable:
"""
Wraps a scrape action in a try-except to catch any errors, and
updates the state of the search accordingly.
:param f: the scrape function
:return: the wrapped function
"""
def wrapper(self, soup: BeautifulSoup):
try:
return f(self, soup)
except Exception as e:
self.found = False
self.search_issue = e
return self
return wrapper
@classmethod
def if_not_found_do_nothing(cls, f: Callable) -> Callable:
"""
Wraps a function to only execute it if the field has been found in
the html. Otherwise, do nothing.
:param f: the function that might get executed
:return: the wrapped function
"""
def wrapper(self):
if self.found:
return f(self)
return self
return wrapper
def __init__(self) -> None:
"""
Initialize attributes.
"""
self.is_optional = False
self.found = None
self.valid = None
self.value = None
self.search_issue = None
def scrape(self, soup: BeautifulSoup) -> None:
"""
Interface for the scrape method.
:param soup: a BeautifulSoup object for the target html
:return: None
"""
raise NotImplementedError()
def validate(self) -> None:
"""
Interface for the validate method.
:return: None
"""
raise NotImplementedError()
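# A minimal sketch (not in the original module) of the subclass contract the
# base class defines: scrape() sets found/value, validate() sets valid, and
# both return self so they can be chained. The field name and the
# "energy-label" span class below are hypothetical, purely for illustration.
class ExampleEnergyLabelFieldInstructions(BaseTargetFieldInstructions):
    field_name = "energy_label"  # hypothetical field, not in the real schema

    def __init__(self) -> None:
        super().__init__()
        self.is_optional = True

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "ExampleEnergyLabelFieldInstructions":
        # Any exception here is caught by fail_safe_scrape and recorded in
        # self.search_issue instead of crashing the flow.
        self.value = soup.find("span", {"class": "energy-label"}).text.strip()
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "ExampleEnergyLabelFieldInstructions":
        self.valid = self.value in ["A", "B", "C", "D", "E", "F", "G"]
        return self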
class ReferenciaFieldInstructions(BaseTargetFieldInstructions):
"""
Instructions for field Referencia.
"""
field_name = "referencia"
def __init__(self) -> None:
"""
Initialize all default parameters.
"""
super().__init__()
@BaseTargetFieldInstructions.Decorators.fail_safe_scrape
def scrape(self, soup: BeautifulSoup) -> "ReferenciaFieldInstructions":
"""
Try to find the value and store it.
:param soup: a BeautifulSoup object for the target html
:return: self
"""
self.value = re.findall(
r"[0-9]{5,20}", str(soup.find_all("link", {"rel": "canonical"})[0])
)[0]
self.found = True
return self
@BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
def validate(self) -> "ReferenciaFieldInstructions":
"""
Check if the obtained value fits the expected format.
:return: self
"""
self.valid = False
if re.match(r"[0-9]{4,20}", self.value):
self.valid = True
return self
class TamanoCategoricoFieldInstructions(BaseTargetFieldInstructions):
field_name = "tamano_categorico"
possible_values = [
"2 coches o más",
"coche y moto",
"coche grande",
"coche pequeño",
"moto",
None,
]
def __init__(self):
super().__init__()
self.is_optional = True
@BaseTargetFieldInstructions.Decorators.fail_safe_scrape
def scrape(self, soup: BeautifulSoup) -> "TamanoCategoricoFieldInstructions":
"""
Try to find the value and store it.
:param soup: a BeautifulSoup object for the target html
:return: self
"""
self.found = False
if (
""
not in soup.find("div", {"class": "info-features"})
.find("span")
.find("span")
.text
):
self.value = (
soup.find("div", {"class": "info-features"})
.find("span")
.find("span")
.text
)
self.found = True
return self
@BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
def validate(self) -> "TamanoCategoricoFieldInstructions":
"""
Check if the obtained value fits the expected format.
:return: self
"""
self.valid = False
if self.value in TamanoCategoricoFieldInstructions.possible_values:
self.valid = True
return self
class PrecioFieldInstructions(BaseTargetFieldInstructions):
field_name = "precio"
def __init__(self):
super().__init__()
@BaseTargetFieldInstructions.Decorators.fail_safe_scrape
def scrape(self, soup: BeautifulSoup) -> "PrecioFieldInstructions":
"""
Try to find the value and store it.
:param soup: a BeautifulSoup object for the target html
:return: self
"""
self.value = "".join(
re.findall(r"[0-9]", str(soup.find_all("strong", {"class": "price"})[0]))
)
self.found = True
return self
@BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
def validate(self) -> "PrecioFieldInstructions":
"""
Check if the obtained value fits the expected format.
:return: self
"""
self.valid = False
if re.match(r"[0-9]{1,20}", self.value):
self.valid = True
return self
class M2FieldInstructions(BaseTargetFieldInstructions):
field_name = "m2"
def __init__(self):
super().__init__()
self.is_optional = True
@BaseTargetFieldInstructions.Decorators.fail_safe_scrape
def scrape(self, soup: BeautifulSoup) -> "M2FieldInstructions":
"""
Try to find the value and store it.
:param soup: a BeautifulSoup object for the target html
:return: self
"""
self.found = False
posible_m2 = [
tag.text
for tag in soup.find("div", {"class": "info-features"}).find_all("span")
]
if [posible for posible in posible_m2 if "m²" in posible]:
self.value = [
"".join(re.findall(r"[0-9]+,*[0-9]*", posible))
for posible in posible_m2
if "" in posible
][0].replace(",", ".")
self.found = True
return self
@BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
def validate(self) -> "M2FieldInstructions":
"""
Check if the obtained value fits the expected format.
:return: self
"""
self.valid = False
if re.match(r"[0-9]{1,4}", self.value):
self.valid = True
return self
class TipoAnuncioFieldInstructions(BaseTargetFieldInstructions):
field_name = "tipo_anuncio"
def __init__(self):
super().__init__()
@BaseTargetFieldInstructions.Decorators.fail_safe_scrape
def scrape(self, soup: BeautifulSoup) -> "TipoAnuncioFieldInstructions":
"""
Try to find the value and store it.
:param soup: a BeautifulSoup object for the target html
:return: self
"""
self.found = False
if "venta" in soup.find("title").text:
self.value = 1
self.found = True
if "Alquiler" in soup.find("title").text:
self.value = 2
self.found = True
return self
@BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
def validate(self) -> "TipoAnuncioFieldInstructions":
"""
Check if the obtained value fits the expected format.
:return: self
"""
self.valid = False
if self.value in [1, 2]:
self.valid = True
return self
class CalleFieldInstructions(BaseTargetFieldInstructions):
field_name = "calle"
def __init__(self):
super().__init__()
self.is_optional = True
@BaseTargetFieldInstructions.Decorators.fail_safe_scrape
def scrape(self, soup: BeautifulSoup) -> "CalleFieldInstructions":
"""
Try to find the value and store it.
:param soup: a BeautifulSoup object for the target html
:return: self
"""
self.found = False
if len(soup.find("div", {"id": "headerMap"}).find_all("li")) > 3:
self.value = ""
if len(soup.find("div", {"id": "headerMap"}).find_all("li")) > 4:
self.value = (
soup.find("div", {"id": "headerMap"}).find_all("li")[0].text.strip()
)
self.found = True
return self
@BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
def validate(self) -> "CalleFieldInstructions":
self.valid = True
return self
class BarrioFieldInstructions(BaseTargetFieldInstructions):
field_name = "barrio"
def __init__(self):
super().__init__()
@BaseTargetFieldInstructions.Decorators.fail_safe_scrape
def scrape(self, soup: BeautifulSoup) -> "BarrioFieldInstructions":
"""
Try to find the value and store it.
:param soup: a BeautifulSoup object for the target html
:return: self
"""
self.value = (
soup.find("div", {"id": "headerMap"}).find_all("li")[-4].text.strip()
)
self.found = True
return self
@BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
def validate(self) -> "BarrioFieldInstructions":
self.valid = True
return self
class DistritoFieldInstructions(BaseTargetFieldInstructions):
field_name = "distrito"
def __init__(self):
super().__init__()
@BaseTargetFieldInstructions.Decorators.fail_safe_scrape
def scrape(self, soup: BeautifulSoup) -> "DistritoFieldInstructions":
"""
Try to find the value and store it.
:param soup: a BeautifulSoup object for the target html
:return: self
"""
self.value = (
soup.find("div", {"id": "headerMap"}).find_all("li")[-3].text.strip()
)
self.found = True
return self
@BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
def validate(self) -> "DistritoFieldInstructions":
self.valid = True
return self
class CiudadFieldInstructions(BaseTargetFieldInstructions):
field_name = "ciudad"
def __init__(self):
super().__init__()
@BaseTargetFieldInstructions.Decorators.fail_safe_scrape
def scrape(self, soup: BeautifulSoup) -> "CiudadFieldInstructions":
"""
Try to find the value and store it.
:param soup: a BeautifulSoup object for the target html
:return: self
"""
self.value = (
soup.find("div", {"id": "headerMap"}).find_all("li")[-2].text.strip()
)
self.found = True
return self
@BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
def validate(self) -> "CiudadFieldInstructions":
self.valid = True
return self
class SecondaryFeaturesFieldInstructions(BaseTargetFieldInstructions):
"""
Shared methods for secondary features found in a list in ads.
"""
def __init__(self, field_name: str, search_keyword: str):
super().__init__()
self.field_name = field_name
self._feature_keyword = search_keyword
@BaseTargetFieldInstructions.Decorators.fail_safe_scrape
def scrape(self, soup: BeautifulSoup) -> "SecondaryFeaturesFieldInstructions":
"""
Try to find the value and store it.
:param soup: a BeautifulSoup object for the target html
:return: self
"""
return self._find_feature_with_keyword(soup=soup, keyword=self._feature_keyword)
def _find_feature_with_keyword(
self, soup: BeautifulSoup, keyword: str
) -> "SecondaryFeaturesFieldInstructions":
"""
Checks if a feature is in the secondary list by keyword and stores the
value if found.
:param soup: a BeautifulSoup object for the target html
:param keyword: the keyword for that feature
:return: self
"""
features_lists = soup.find_all("div", {"class": "details-property_features"})
features = [
feature.text
for feature_list in features_lists
for feature in feature_list.find_all("li")
]
if not features:
self.found = False
return self
self.value = 1 * any(keyword in feature for feature in features)
self.found = True
return self
@BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
def validate(self) -> "SecondaryFeaturesFieldInstructions":
self.valid = False
if self.value in [0, 1]:
self.valid = True
return self
class TelefonoFieldInstructions(BaseTargetFieldInstructions):
field_name = "telefono"
def __init__(self):
"""
Initialize all default parameters.
"""
super().__init__()
self.is_optional = True
@BaseTargetFieldInstructions.Decorators.fail_safe_scrape
def scrape(self, soup: BeautifulSoup) -> "TelefonoFieldInstructions":
self.value = soup.find(
"p", {"class": "txt-bold _browserPhone icon-phone"}
).text.replace(" ", "")
self.found = True
return self
@BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
def validate(self) -> "TelefonoFieldInstructions":
self.valid = False
if re.match(r"\s*\+?[0-9\s]*", self.value):
self.valid = True
return self
class ParsingFlow:
"""
Object to gather all instructions for a job run, execute them and present
the results.
"""
def __init__(self) -> None:
"""
Initialize the instruction list.
"""
self._instructions = []
def add_instructions(
self,
instructions: Union[
BaseTargetFieldInstructions, Iterable[BaseTargetFieldInstructions]
],
):
"""
Include new instructions to the internal list.
:param instructions: a single or iterable group of instructions
:return: self
"""
if isinstance(instructions, BaseTargetFieldInstructions):
self._instructions.append(instructions)
return self
self._instructions.extend(instructions)
return self
def execute_flow(self, soup: BeautifulSoup) -> None:
"""
Scrape and validate all fields according to instructions.
:param soup: a BeautifulSoup object for the target html
:return: None
"""
for instruction in self._instructions:
instruction.scrape(soup).validate()
@property
def field_values(self) -> Dict:
"""
Return the value for each field, or None if it was not found.
:return: a dict with the field names and values
"""
return {field.field_name: field.value for field in self._instructions}
@property
def all_found_fields_are_valid(self) -> bool:
"""
Check if all found fields are valid.
:return: True if the fields are valid, False otherwise
"""
relevant_fields = [
field.valid for field in self._instructions if field.found is True
]
return all(relevant_fields)
@property
def all_non_optional_fields_were_found(self) -> bool:
"""
Check if all compulsory fields were found.
:return: True if the fields were found, False otherwise
"""
found_or_not = [
field.found or field.is_optional for field in self._instructions
]
return all(found_or_not)
@property
def issues(self) -> Dict[str, dict]:
"""
Returns all identified issues during scraping and validation.
:return: the issues, bucketed by field
"""
issues = {}
for field in self._instructions:
if (field.found or field.is_optional) and (
field.valid is True or field.valid is None
):
continue
this_field_issues = {}
if not field.found and not field.is_optional:
this_field_issues["found"] = "Not found"
if field.search_issue:
this_field_issues["search_issue"] = field.search_issue
if not field.valid and field.valid is not None:
this_field_issues["validity"] = "Not valid"
this_field_issues["value"] = field.value
issues[field.field_name] = this_field_issues
return issues
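# Illustrative sketch of the report shape this property produces (field names
# are real, values made up): a missing non-optional field contributes a
# "found" entry, and an invalid one contributes "validity" plus the value:
# {
#     "referencia": {"found": "Not found", "search_issue": IndexError("...")},
#     "precio": {"validity": "Not valid", "value": ""},
# }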
class ParsingFlowGenerator:
"""
Class for creating multiple, empty flows based on a group of instructions.
"""
def __init__(
self,
parsing_flow_class: Type[ParsingFlow],
instructions_to_attach_with_params: Union[
Tuple[Type[BaseTargetFieldInstructions], Dict],
Tuple[Tuple[Type[BaseTargetFieldInstructions], Dict]],
],
) -> None:
"""
Set the flow class and group of instructions to use when creating new
instances of the flow class.
:param parsing_flow_class: the flow class to instantiate
:param instructions_to_attach_with_params: one or more pairs of a field
instructions class and the parameters to use when instantiating it
"""
self._parsing_flow_class = parsing_flow_class
if not isinstance(instructions_to_attach_with_params, tuple):
instructions_to_attach_with_params = tuple(
instructions_to_attach_with_params
)
self._instructions_to_attach_with_params = instructions_to_attach_with_params
def get_new_flow(self) -> ParsingFlow:
"""
Instantiate a new parsing flow with the instantiated classes attached.
:return: the new parsing flow
"""
new_parsing_flow = self._parsing_flow_class()
for instruction, params in self._instructions_to_attach_with_params:
new_parsing_flow.add_instructions(instruction(**params))
return new_parsing_flow
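# A minimal usage sketch, assuming the instruction classes defined above:
# each call builds a fresh flow, so tasks never share scraped state.
#
# generator = ParsingFlowGenerator(
#     ParsingFlow,
#     ((ReferenciaFieldInstructions, {}), (PrecioFieldInstructions, {})),
# )
# flow_a = generator.get_new_flow()
# flow_b = generator.get_new_flow()
# assert flow_a is not flow_b  # every task receives its own empty flow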

core/scrapping_utils.py

@@ -92,10 +92,13 @@ class UrlAttack:
if self.response.ok:
self.success = True
except Exception as e:
except Exception:
self.success = False
if random.randrange(0, 100) < UrlAttack.identity_change_probability:
if (
not self.success
or random.randrange(0, 100) < UrlAttack.identity_change_probability
):
self._change_identity()
def _change_identity(self) -> None:

core/throttling_utils.py (new file, 190 lines)

@@ -0,0 +1,190 @@
from typing import List, Callable
import datetime
class BaseThrottlingRule:
"""
Interface for all throttling rules.
"""
def __call__(self, **kwargs) -> bool:
"""
Upon calling the rule itself, the underlying check gets executed.
:param kwargs: arguments for check
:return: True if the check is OK, False otherwise
"""
return self._check_rule(**kwargs)
def _check_rule(self, **kwargs) -> bool:
"""
Interface for internal method to check the rule.
:param kwargs: arguments for check
:return: True if the check is OK, False otherwise
"""
raise NotImplementedError
class WorkingHoursThrottlingRule(BaseThrottlingRule):
"""
Rule for checking if current time is within the defined working hours.
"""
def __init__(self, working_hours: dict) -> None:
"""
Set the working hours as a dict with "start" and "end" keys, which
contain time objects.
:param working_hours: the definition of the working hours range
:return: None
"""
self._working_hours = working_hours
def _check_rule(self) -> bool:
"""
Call the underlying check method.
:return: True if the check is OK, False otherwise
"""
return self._inside_working_hours()
def _inside_working_hours(self) -> bool:
"""
Checks if the current time is between the defined window of working
hours.
:return: True if within range, False otherwise
"""
return (
self._working_hours["start"]
<= datetime.datetime.now().time()
<= self._working_hours["end"]
)
class CooldownThrottlingRule(BaseThrottlingRule):
"""
Rule for checking if a certain time period has passed since the last
execution.
:attribute required_arguments: the list of argument names expected to be
available for unpacking when checking the rule.
"""
required_arguments = ["last_attempt_timestamp"]
def __init__(self, cooldown_time_generator: Callable) -> None:
"""
Set the passed cooldown time generator.
:param cooldown_time_generator: a callable object that returns some
number of seconds. Can be random or static.
"""
self._cooldown_time_generator = cooldown_time_generator
self._current_cooldown_time = self._cooldown_time_generator()
def _check_rule(self, **kwargs) -> bool:
"""
Unpack the argument and call the underlying check method.
:return: True if the check is OK, False otherwise
"""
last_attempt_timestamp = kwargs["last_attempt_timestamp"]
return self._check_if_cooldowned(last_attempt_timestamp)
def _check_if_cooldowned(self, last_attempt_timestamp: datetime.datetime) -> bool:
"""
Checks if the cooldown time has passed. If so, set a new one.
:param last_attempt_timestamp: timestamp for the last time whatever
must be throttled happened.
:return: True if the cooldown time has passed, False otherwise
"""
cooldown_release_timestamp = last_attempt_timestamp + datetime.timedelta(
seconds=self._current_cooldown_time
)
if datetime.datetime.now() > cooldown_release_timestamp:
self._current_cooldown_time = self._cooldown_time_generator()
return True
return False
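# A small sketch (not part of this module) of a jittered cooldown: since a
# fresh cooldown is drawn from the generator every time one elapses, a random
# callable spaces consecutive tasks by a different interval each time.
#
# import random
# jittered_rule = CooldownThrottlingRule(
#     cooldown_time_generator=lambda: random.randint(30, 90)
# )
# jittered_rule(last_attempt_timestamp=datetime.datetime.now())  # -> False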
class DynamicThrottlingRule(BaseThrottlingRule):
"""
A basic interface to dynamically set any function, optionally with
arguments, as a throttling rule.
"""
def __init__(self, any_callable: Callable) -> None:
"""
Sets the callable that will act as a check. The only condition is that
the callable must return a boolean value.
:param any_callable: the check callable object
"""
self._some_rule = any_callable
def _check_rule(self, **kwargs) -> bool:
"""
Calls the dynamically set callable while passing any given arguments.
:param kwargs: arguments for check
:return: True if the check is OK, False otherwise
"""
return self._some_rule(**kwargs)
class ThrottleManager:
"""
Holds and runs all throttling rules on demand.
"""
def __init__(self) -> None:
"""
Initialize internal attributes.
"""
self._rules_to_check = []
self._rules_and_required_arguments = dict()
def allow_next_task(self, **kwargs) -> bool:
"""
Checks all the internal rules and returns whether all of them passed
successfully or not.
:param kwargs: any arguments needed by the rules
:return: True if all rules passed positively, False otherwise
"""
check_result = self._check_all_rules(**kwargs)
return check_result
def add_rule(
self, rule: BaseThrottlingRule, required_argument_names: List[str] = None
) -> "ThrottleManager":
"""
Adds a new rule to the manager together with the argument names
that the rule call expects.
:param rule: the rule instance
:param required_argument_names: the required argument names to execute
the check for that rule
:return: the ThrottleManager instance
"""
required_argument_names = required_argument_names or []
self._rules_to_check.append(rule)
self._rules_and_required_arguments[rule.__class__] = required_argument_names
return self
def _check_all_rules(self, **kwargs) -> bool:
"""
Executes each check with its required arguments and collects the
results, stopping at the first failing rule.
:param kwargs: all passed arguments
:return: True if all checks passed, False otherwise
"""
checks = []
for rule in self._rules_to_check:
arguments_for_rule = {
argument_name: kwargs[argument_name]
for argument_name in self._rules_and_required_arguments[rule.__class__]
}
checks.append(rule(**arguments_for_rule))
if not checks[-1]:
return False
return True
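# A minimal wiring sketch: add_rule() returns the manager so calls chain, and
# allow_next_task() forwards each kwarg only to the rules that declared it.
# Note that declared argument names are keyed by rule class, so two instances
# of the same rule class would share one declaration.
#
# manager = (
#     ThrottleManager()
#     .add_rule(WorkingHoursThrottlingRule(working_hours={"start": ..., "end": ...}))
#     .add_rule(
#         CooldownThrottlingRule(lambda: 60),
#         required_argument_names=["last_attempt_timestamp"],
#     )
# )
# manager.allow_next_task(last_attempt_timestamp=datetime.datetime.now())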

tests/capturer_test.py (new file, 170 lines)

@@ -0,0 +1,170 @@
from tests.mock_classes import (
MockCapturingInterface,
MockParsingFlow,
MockUrlAttackReturnsSuccess,
MockUrlAttackReturnsFailure,
)
from capturer.capturer import CapturingTask
def test_capturing_task_successful_task_flow():
the_task_parameters = dict()
the_task_parameters["uuid"] = "test_uuid"
the_task_parameters["ad_url"] = "test_url"
the_task_parameters["fk_uuid_exploring"] = "test_exploring_uuid"
the_task_parameters["status"] = "Pending"
fake_resulting_field_values = {
"a_field": {"a_value": 1},
"another_field": {"another_value": 2},
}
mock_parsing_flow = MockParsingFlow(
mock_all_found_fields_are_valid=True,
mock_all_non_optional_fields_were_found=True,
mock_field_values_to_return=fake_resulting_field_values,
)
mock_capturing_interface = MockCapturingInterface()
task = CapturingTask(
task_parameters=the_task_parameters,
capturing_interface=mock_capturing_interface,
new_parsing_flow=mock_parsing_flow,
url_acquisition_object=MockUrlAttackReturnsSuccess,
dead_ad_checker=lambda x: False,
)
task.capture()
final_data = task.get_ad_data()
assert (
len(mock_capturing_interface.tasks) == 1
and mock_capturing_interface.tasks[the_task_parameters["uuid"]][-1].status
== "Data ready"
and fake_resulting_field_values == final_data
)
def test_capturing_task_dead_ad_task_flow():
the_task_parameters = dict()
the_task_parameters["uuid"] = "test_uuid"
the_task_parameters["ad_url"] = "test_url"
the_task_parameters["fk_uuid_exploring"] = "test_exploring_uuid"
the_task_parameters["status"] = "Pending"
mock_parsing_flow = MockParsingFlow(
mock_all_found_fields_are_valid=False,
issues_to_return={"some_field": {"valid": False}},
)
mock_capturing_interface = MockCapturingInterface()
task = CapturingTask(
task_parameters=the_task_parameters,
capturing_interface=mock_capturing_interface,
new_parsing_flow=mock_parsing_flow,
url_acquisition_object=MockUrlAttackReturnsFailure,
dead_ad_checker=lambda x: True,
)
task.capture()
assert (
len(mock_capturing_interface.tasks) == 1
and mock_capturing_interface.tasks[the_task_parameters["uuid"]][-1].status
== "Dead ad"
)
def test_capturing_task_invalid_fields_surrender_flow():
the_task_parameters = dict()
the_task_parameters["uuid"] = "test_uuid"
the_task_parameters["ad_url"] = "test_url"
the_task_parameters["fk_uuid_exploring"] = "test_exploring_uuid"
the_task_parameters["status"] = "Pending"
mock_parsing_flow = MockParsingFlow(
mock_all_found_fields_are_valid=False,
issues_to_return={"some_field": {"valid": False}},
)
mock_capturing_interface = MockCapturingInterface()
task = CapturingTask(
task_parameters=the_task_parameters,
capturing_interface=mock_capturing_interface,
new_parsing_flow=mock_parsing_flow,
url_acquisition_object=MockUrlAttackReturnsSuccess,
dead_ad_checker=lambda x: False,
)
task.capture()
assert (
len(mock_capturing_interface.tasks) == 1
and mock_capturing_interface.tasks[the_task_parameters["uuid"]][-1].status
== "Invalid value fields"
)
def test_capturing_task_missing_fields_surrender_flow():
the_task_parameters = dict()
the_task_parameters["uuid"] = "test_uuid"
the_task_parameters["ad_url"] = "test_url"
the_task_parameters["fk_uuid_exploring"] = "test_exploring_uuid"
the_task_parameters["status"] = "Pending"
mock_parsing_flow = MockParsingFlow(
mock_all_non_optional_fields_were_found=False,
issues_to_return={"some_field": {"found": False}},
)
mock_capturing_interface = MockCapturingInterface()
task = CapturingTask(
task_parameters=the_task_parameters,
capturing_interface=mock_capturing_interface,
new_parsing_flow=mock_parsing_flow,
url_acquisition_object=MockUrlAttackReturnsSuccess,
dead_ad_checker=lambda x: False,
)
task.capture()
assert (
len(mock_capturing_interface.tasks) == 1
and mock_capturing_interface.tasks[the_task_parameters["uuid"]][-1].status
== "Fields missing"
)
def test_capturing_task_unexpected_issue_surrender_flow():
the_task_parameters = dict()
the_task_parameters["uuid"] = "test_uuid"
the_task_parameters["ad_url"] = "test_url"
the_task_parameters["fk_uuid_exploring"] = "test_exploring_uuid"
the_task_parameters["status"] = "Pending"
mock_parsing_flow = MockParsingFlow()
mock_capturing_interface = MockCapturingInterface()
CapturingTask.sleep_time_failed_request = 0  # Override the long retry sleep time
task = CapturingTask(
task_parameters=the_task_parameters,
capturing_interface=mock_capturing_interface,
new_parsing_flow=mock_parsing_flow,
url_acquisition_object=MockUrlAttackReturnsFailure,
dead_ad_checker=lambda x: False,
)
task.capture()
assert (
len(mock_capturing_interface.tasks) == 1
and mock_capturing_interface.tasks[the_task_parameters["uuid"]][-1].status
== "Surrender"
)

File diff suppressed because one or more lines are too long

tests/mock_classes.py (new file, 97 lines)

@@ -0,0 +1,97 @@
from collections import namedtuple
from typing import Dict
from bs4 import BeautifulSoup
from db_layer.capturing_tasks_interface import CapturingTasksInterface
from core.parsing_utils import ParsingFlow
from core.scrapping_utils import UrlAttack
class MockCapturingInterface(CapturingTasksInterface):
task_state_record = namedtuple(
"TaskStateRecord", ["uuid", "uuid_exploring", "status", "ad_url"]
)
def __init__(self):
self.tasks = {}
def update_capturing_task(self, uuid, uuid_exploring, status, ad_url):
if uuid not in self.tasks:
self.tasks[uuid] = []
self.tasks[uuid].append(
MockCapturingInterface.task_state_record(
uuid=uuid, uuid_exploring=uuid_exploring, status=status, ad_url=ad_url
)
)
class MockParsingFlow(ParsingFlow):
def __init__(
self,
issues_to_return: Dict[str, dict] = None,
mock_all_found_fields_are_valid: bool = True,
mock_all_non_optional_fields_were_found: bool = True,
mock_field_values_to_return: Dict[str, dict] = None,
):
# Rebinding a loop variable would leave the originals untouched, so apply
# the empty-dict default to each argument directly.
if issues_to_return is None:
issues_to_return = dict()
if mock_field_values_to_return is None:
mock_field_values_to_return = dict()
self._issues = issues_to_return
self._mock_all_found_fields_are_valid = mock_all_found_fields_are_valid
self._mock_field_values_to_return = mock_field_values_to_return
self._mock_all_non_optional_fields_were_found = (
mock_all_non_optional_fields_were_found
)
def execute_flow(self, soup: BeautifulSoup) -> None:
pass
@property
def issues(self) -> Dict[str, dict]:
return self._issues
@property
def all_found_fields_are_valid(self) -> bool:
return self._mock_all_found_fields_are_valid
@property
def all_non_optional_fields_were_found(self) -> bool:
return self._mock_all_non_optional_fields_were_found
@property
def field_values(self) -> Dict:
return self._mock_field_values_to_return
class MockUrlAttack(UrlAttack):
def __init__(self, url: str) -> None:
super().__init__(url=url)
def get_text(self) -> str:
return "<html>this_is_a_fake_html_string</html>"
class MockUrlAttackReturnsSuccess(MockUrlAttack):
def __init__(self, url: str) -> None:
super().__init__(url=url)
def attack(self) -> None:
self.success = True
self.has_been_attacked = True
class MockUrlAttackReturnsFailure(MockUrlAttack):
def __init__(self, url: str) -> None:
super().__init__(url=url)
def attack(self) -> None:
self.success = False
self.has_been_attacked = True

tests/parsing_utils_test.py (new file, 2755 lines)

File diff suppressed because one or more lines are too long

tests/throttling_test.py (new file, 108 lines)

@@ -0,0 +1,108 @@
import datetime
from core.throttling_utils import (
ThrottleManager,
CooldownThrottlingRule,
WorkingHoursThrottlingRule,
DynamicThrottlingRule,
)
def test_working_hours_throttling_rule_checks():
working_hours_rule = WorkingHoursThrottlingRule(
working_hours={
"start": (datetime.datetime.now() + datetime.timedelta(seconds=-5)).time(),
"end": (datetime.datetime.now() + datetime.timedelta(seconds=5)).time(),
}
)
assert working_hours_rule() == True
def test_working_hours_throttling_rule_does_not_check():
working_hours_rule = WorkingHoursThrottlingRule(
working_hours={
"start": (datetime.datetime.now() + datetime.timedelta(hours=1)).time(),
"end": (datetime.datetime.now() + datetime.timedelta(hours=2)).time(),
}
)
assert working_hours_rule() == False
def test_cooldown_throttling_rule_checks():
time_generator = lambda: 60
cooldown_rule = CooldownThrottlingRule(cooldown_time_generator=time_generator)
assert (
cooldown_rule(
last_attempt_timestamp=datetime.datetime.now()
+ datetime.timedelta(seconds=-120)
)
== True
)
def test_cooldown_throttling_rule_does_not_check():
time_generator = lambda: 60
cooldown_rule = CooldownThrottlingRule(cooldown_time_generator=time_generator)
assert cooldown_rule(last_attempt_timestamp=datetime.datetime.now()) == False
def test_dynamic_rule_checks():
mock_check = lambda: True
rule = DynamicThrottlingRule(any_callable=mock_check)
assert rule() == True
def test_dynamic_rule_does_not_check():
mock_check = lambda: False
rule = DynamicThrottlingRule(any_callable=mock_check)
assert rule() == False
def test_dynamic_rule_arguments_pass_properly():
def pass_a_bool(some_bool):
return some_bool
rule = DynamicThrottlingRule(pass_a_bool)
assert (rule(some_bool=True) == True) and (rule(some_bool=False) == False)
def test_throttle_manager_checks_rules():
throttle_manager = ThrottleManager()
def pass_a_bool(some_bool):
return some_bool
some_rules = [
WorkingHoursThrottlingRule(
working_hours={
"start": (
datetime.datetime.now() + datetime.timedelta(seconds=-5)
).time(),
"end": (datetime.datetime.now() + datetime.timedelta(seconds=5)).time(),
}
),
CooldownThrottlingRule(cooldown_time_generator=lambda: 0),
DynamicThrottlingRule(any_callable=pass_a_bool),
]
some_arguments = [[], ["last_attempt_timestamp"], ["some_bool"]]
some_rules_and_arguments = zip(some_rules, some_arguments)
for rule, arguments in some_rules_and_arguments:
throttle_manager.add_rule(rule, required_argument_names=arguments)
assert throttle_manager.allow_next_task(
last_attempt_timestamp=datetime.datetime.now(), some_bool=True
)