drogon/core/parsing_utils.py

597 lines
18 KiB
Python

from typing import Union, Iterable, Dict, Callable, Type, Tuple
import re
from bs4 import BeautifulSoup
class BaseTargetFieldInstructions:
    """
    Abstract class for all field instructions. Implements useful decorators as
    well as the main interface.

    Every instance carries the state of one scrape/validate cycle:
    ``is_optional`` (False by default) plus ``found``, ``valid``, ``value``
    and ``search_issue``, which are all None until a subclass sets them.
    """

    class Decorators:
        """
        Decorators to use across all field instructions.
        """

        @classmethod
        def fail_safe_scrape(cls, f: Callable) -> Callable:
            """
            Wraps a scrape action in a try-except to control any errors, and
            updates the state of the search accordingly.
            :param f: the scrape function
            :return: the wrapped function, which returns whatever ``f``
            returns or, if ``f`` raised, the instance itself
            """
            import functools

            # wraps() keeps the name and docstring of the decorated method.
            @functools.wraps(f)
            def wrapper(self, soup: BeautifulSoup):
                try:
                    return f(self, soup)
                except Exception as e:
                    # Deliberately broad: any failure while scraping is
                    # recorded on the instance instead of propagating.
                    self.found = False
                    self.search_issue = e
                    return self

            return wrapper

        @classmethod
        def if_not_found_do_nothing(cls, f: Callable) -> Callable:
            """
            Wraps a function to only execute it if the field has been found in
            the html. Otherwise, do nothing.
            :param f: the function that might get executed
            :return: the wrapped function, which returns whatever ``f``
            returns or, if ``found`` is falsy, the instance itself
            """
            import functools

            @functools.wraps(f)
            def wrapper(self):
                if self.found:
                    return f(self)
                return self

            return wrapper

    def __init__(self) -> None:
        """
        Initialize attributes.
        """
        self.is_optional = False
        self.found = None
        self.valid = None
        self.value = None
        self.search_issue = None

    def scrape(self, soup: BeautifulSoup) -> "BaseTargetFieldInstructions":
        """
        Interface for the scrape method. Implementations are expected to
        return the instance so that calls can be chained.
        :param soup: a BeautifulSoup object for the target html
        :return: self (in implementations)
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()

    def validate(self) -> "BaseTargetFieldInstructions":
        """
        Interface for the validate method. Implementations are expected to
        return the instance.
        :return: self (in implementations)
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()
class ReferenciaFieldInstructions(BaseTargetFieldInstructions):
    """
    Instructions for field Referencia: a run of digits taken from the page's
    canonical link tag.
    """

    field_name = "referencia"

    def __init__(self) -> None:
        """
        Set up the default state inherited from the base class.
        """
        super().__init__()

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "ReferenciaFieldInstructions":
        """
        Store the first run of 5 to 20 digits found in the markup of the first
        ``<link rel="canonical">`` tag.
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        canonical_links = soup.find_all("link", {"rel": "canonical"})
        first_link_markup = str(canonical_links[0])
        digit_runs = re.findall(r"[0-9]{5,20}", first_link_markup)
        self.value = digit_runs[0]
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "ReferenciaFieldInstructions":
        """
        Mark the value as valid when it begins with at least four digits.
        :return: self
        """
        # Pessimistic default first, so a failure below leaves valid = False.
        self.valid = False
        leading_digits = re.match(r"[0-9]{4,20}", self.value)
        if leading_digits is not None:
            self.valid = True
        return self
class TamanoCategoricoFieldInstructions(BaseTargetFieldInstructions):
    """
    Instructions for the optional field tamano_categorico, a size category
    taken from the first entry of the "info-features" block.
    """

    field_name = "tamano_categorico"
    # Values accepted by validate().
    possible_values = [
        "2 coches o más",
        "coche y moto",
        "coche grande",
        "coche pequeño",
        "moto",
        None,
    ]

    def __init__(self):
        """
        Initialize the default state and flag the field as optional.
        """
        super().__init__()
        self.is_optional = True

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "TamanoCategoricoFieldInstructions":
        """
        Try to find the value and store it.

        The candidate is the text of the first ``<span>`` nested inside the
        first ``<span>`` of the ``info-features`` div. It is stored only when
        it is not the area entry.
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.found = False
        first_feature_text = (
            soup.find("div", {"class": "info-features"})
            .find("span")
            .find("span")
            .text
        )
        # The area entry is recognised by its "m²" unit; any other leading
        # entry is taken as the size category. An empty-string marker would
        # be contained in every text, so the field could never be found.
        if "m²" not in first_feature_text:
            self.value = first_feature_text
            self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "TamanoCategoricoFieldInstructions":
        """
        Check if the obtained value is one of ``possible_values``.
        :return: self
        """
        self.valid = False
        if self.value in TamanoCategoricoFieldInstructions.possible_values:
            self.valid = True
        return self
class PrecioFieldInstructions(BaseTargetFieldInstructions):
    """
    Instructions for field precio: the digits found in the first
    ``<strong class="price">`` tag.
    """

    field_name = "precio"

    def __init__(self):
        """
        Set up the default state inherited from the base class.
        """
        super().__init__()

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "PrecioFieldInstructions":
        """
        Concatenate every digit in the markup of the first price tag and store
        the result as a string.
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        price_tags = soup.find_all("strong", {"class": "price"})
        price_markup = str(price_tags[0])
        digits = re.findall(r"[0-9]", price_markup)
        self.value = "".join(digits)
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "PrecioFieldInstructions":
        """
        Mark the value as valid when it begins with at least one digit.
        :return: self
        """
        # Pessimistic default first, so a failure below leaves valid = False.
        self.valid = False
        leading_digits = re.match(r"[0-9]{1,20}", self.value)
        if leading_digits is not None:
            self.valid = True
        return self
class M2FieldInstructions(BaseTargetFieldInstructions):
    """
    Instructions for the optional field m2, the numeric part of the area
    entry in the "info-features" block.
    """

    field_name = "m2"

    def __init__(self):
        """
        Initialize the default state and flag the field as optional.
        """
        super().__init__()
        self.is_optional = True

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "M2FieldInstructions":
        """
        Try to find the value and store it.

        Looks at the text of every ``<span>`` in the ``info-features`` div and
        keeps the first one that mentions the area unit. Its numeric parts are
        concatenated and the decimal comma is turned into a dot.
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.found = False
        feature_texts = [
            tag.text
            for tag in soup.find("div", {"class": "info-features"}).find_all("span")
        ]
        # Only entries carrying the "m²" unit describe the area. An
        # empty-string marker would match every entry, so the first span
        # would be taken whether or not it is the area.
        area_texts = [text for text in feature_texts if "m²" in text]
        if area_texts:
            number = "".join(re.findall(r"[0-9]+,*[0-9]*", area_texts[0]))
            self.value = number.replace(",", ".")
            self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "M2FieldInstructions":
        """
        Check if the obtained value begins with at least one digit.
        :return: self
        """
        self.valid = False
        if re.match(r"[0-9]{1,4}", self.value):
            self.valid = True
        return self
class TipoAnuncioFieldInstructions(BaseTargetFieldInstructions):
    """
    Instructions for field tipo_anuncio, encoded as 1 or 2 depending on the
    keyword present in the page title.
    """

    field_name = "tipo_anuncio"

    def __init__(self):
        """
        Set up the default state inherited from the base class.
        """
        super().__init__()

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "TipoAnuncioFieldInstructions":
        """
        Inspect the ``<title>`` text: "venta" stores 1 and "Alquiler" stores 2.
        When both keywords appear, the later check wins and 2 is stored.
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.found = False
        title_text = soup.find("title").text
        # Order matters: a later match overwrites an earlier one.
        keyword_codes = (("venta", 1), ("Alquiler", 2))
        for keyword, code in keyword_codes:
            if keyword in title_text:
                self.value = code
                self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "TipoAnuncioFieldInstructions":
        """
        Mark the value as valid when it is one of the two known codes.
        :return: self
        """
        # Pessimistic default first, so a failure below leaves valid = False.
        self.valid = False
        is_known_code = self.value in [1, 2]
        if is_known_code:
            self.valid = True
        return self
class CalleFieldInstructions(BaseTargetFieldInstructions):
    """
    Instructions for the optional field calle, read from the first ``<li>`` of
    the "headerMap" div when that list is long enough.
    """

    field_name = "calle"

    def __init__(self):
        """
        Set up the default state and flag the field as optional.
        """
        super().__init__()
        self.is_optional = True

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "CalleFieldInstructions":
        """
        Count the ``<li>`` items inside the div with id "headerMap". More than
        three items stores an empty string. More than four items stores the
        stripped text of the first item and marks the field as found.
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.found = False
        location_items = soup.find("div", {"id": "headerMap"}).find_all("li")
        item_count = len(location_items)
        if item_count > 3:
            self.value = ""
        if item_count > 4:
            first_item = location_items[0]
            self.value = first_item.text.strip()
            self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "CalleFieldInstructions":
        """
        Accept any found value.
        :return: self
        """
        self.valid = True
        return self
class BarrioFieldInstructions(BaseTargetFieldInstructions):
    """
    Instructions for field barrio, read from the fourth-from-last ``<li>`` of
    the "headerMap" div.
    """

    field_name = "barrio"

    def __init__(self):
        """
        Set up the default state inherited from the base class.
        """
        super().__init__()

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "BarrioFieldInstructions":
        """
        Store the stripped text of the fourth-from-last ``<li>`` inside the div
        with id "headerMap".
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        header_map = soup.find("div", {"id": "headerMap"})
        location_items = header_map.find_all("li")
        self.value = location_items[-4].text.strip()
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "BarrioFieldInstructions":
        """
        Accept any found value.
        :return: self
        """
        self.valid = True
        return self
class DistritoFieldInstructions(BaseTargetFieldInstructions):
    """
    Instructions for field distrito, read from the third-from-last ``<li>`` of
    the "headerMap" div.
    """

    field_name = "distrito"

    def __init__(self):
        """
        Set up the default state inherited from the base class.
        """
        super().__init__()

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "DistritoFieldInstructions":
        """
        Store the stripped text of the third-from-last ``<li>`` inside the div
        with id "headerMap".
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        header_map = soup.find("div", {"id": "headerMap"})
        location_items = header_map.find_all("li")
        self.value = location_items[-3].text.strip()
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "DistritoFieldInstructions":
        """
        Accept any found value.
        :return: self
        """
        self.valid = True
        return self
class CiudadFieldInstructions(BaseTargetFieldInstructions):
    """
    Instructions for field ciudad, read from the second-to-last ``<li>`` of
    the "headerMap" div.
    """

    field_name = "ciudad"

    def __init__(self):
        """
        Set up the default state inherited from the base class.
        """
        super().__init__()

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "CiudadFieldInstructions":
        """
        Store the stripped text of the second-to-last ``<li>`` inside the div
        with id "headerMap".
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        header_map = soup.find("div", {"id": "headerMap"})
        location_items = header_map.find_all("li")
        self.value = location_items[-2].text.strip()
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "CiudadFieldInstructions":
        """
        Accept any found value.
        :return: self
        """
        self.valid = True
        return self
class SecondaryFeaturesFieldInstructions(BaseTargetFieldInstructions):
    """
    Shared methods for secondary features found in a list in ads. Each
    instance looks for one keyword and stores 1 or 0 for its presence.
    """

    def __init__(self, field_name: str, search_keyword: str):
        """
        Remember which field this instance fills and the keyword to look for.
        :param field_name: name under which the value is reported
        :param search_keyword: text searched for in each feature entry
        """
        super().__init__()
        self.field_name = field_name
        self._feature_keyword = search_keyword

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "SecondaryFeaturesFieldInstructions":
        """
        Look for this instance's keyword among the listed features.
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        return self._find_feature_with_keyword(soup=soup, keyword=self._feature_keyword)

    def _find_feature_with_keyword(
        self, soup: BeautifulSoup, keyword: str
    ) -> "SecondaryFeaturesFieldInstructions":
        """
        Collect the text of every ``<li>`` inside the divs with class
        "details-property_features". With no entries the field is marked as
        not found. Otherwise the value is 1 when some entry contains the
        keyword and 0 when none does.
        :param soup: a BeautifulSoup object for the target html
        :param keyword: the keyword for that feature
        :return: self
        """
        feature_texts = []
        for container in soup.find_all("div", {"class": "details-property_features"}):
            for item in container.find_all("li"):
                feature_texts.append(item.text)

        if feature_texts:
            keyword_present = any(keyword in text for text in feature_texts)
            self.value = int(keyword_present)
            self.found = True
        else:
            self.found = False
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "SecondaryFeaturesFieldInstructions":
        """
        Mark the value as valid when it is 0 or 1.
        :return: self
        """
        # Pessimistic default first, so a failure below leaves valid = False.
        self.valid = False
        is_flag = self.value in [0, 1]
        if is_flag:
            self.valid = True
        return self
class TelefonoFieldInstructions(BaseTargetFieldInstructions):
    """
    Instructions for the optional field telefono, read from the phone
    paragraph of the ad.
    """

    field_name = "telefono"

    def __init__(self):
        """
        Initialize the default state and flag the field as optional.
        """
        super().__init__()
        self.is_optional = True

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "TelefonoFieldInstructions":
        """
        Store the text of the ``<p>`` tag whose class is
        "txt-bold _browserPhone icon-phone", with space characters removed.
        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.value = soup.find(
            "p", {"class": "txt-bold _browserPhone icon-phone"}
        ).text.replace(" ", "")
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "TelefonoFieldInstructions":
        """
        Check if the obtained value fits the expected format: optional leading
        whitespace, an optional "+", then digits that may be separated by
        whitespace.
        :return: self
        """
        self.valid = False
        # fullmatch plus a mandatory digit: a pattern made only of optional
        # parts, tried with re.match, accepts every string (even an empty
        # one) and would make this check vacuous.
        if re.fullmatch(r"\s*\+?[0-9][0-9\s]*", self.value):
            self.valid = True
        return self
class ParsingFlow:
    """
    Collects field instructions for one job run, executes them against a page
    and exposes the outcome.
    """

    def __init__(self) -> None:
        """
        Start with no instructions attached.
        """
        self._instructions = []

    def add_instructions(
        self,
        instructions: Union[
            BaseTargetFieldInstructions, Iterable[BaseTargetFieldInstructions]
        ],
    ):
        """
        Attach more instructions to the flow.
        :param instructions: one instruction object or an iterable of them
        :return: self
        """
        if isinstance(instructions, BaseTargetFieldInstructions):
            # Normalise the single-object case so both paths share extend().
            instructions = (instructions,)
        self._instructions.extend(instructions)
        return self

    def execute_flow(self, soup: BeautifulSoup) -> None:
        """
        Scrape and then validate every attached field, in insertion order.
        :param soup: a BeautifulSoup object for the target html
        :return: None
        """
        for step in self._instructions:
            scraped = step.scrape(soup)
            scraped.validate()

    @property
    def field_values(self) -> Dict:
        """
        Map every field name to its current value (None when nothing stored).
        :return: a dict with the field names and values
        """
        values = {}
        for field in self._instructions:
            values[field.field_name] = field.value
        return values

    @property
    def all_found_fields_are_valid(self) -> bool:
        """
        Tell whether every field whose ``found`` is True is also valid.
        :return: True if the fields are valid, False otherwise
        """
        return all(field.valid for field in self._instructions if field.found is True)

    @property
    def all_non_optional_fields_were_found(self) -> bool:
        """
        Tell whether every field is either found or optional.
        :return: True if the fields were found, False otherwise
        """
        for field in self._instructions:
            if not (field.found or field.is_optional):
                return False
        return True

    @property
    def issues(self) -> Dict[str, dict]:
        """
        Gather the problems seen while scraping and validating.

        Fields that are found (or optional) and whose ``valid`` is True or
        None are left out. Any other field gets a dict that may hold "found",
        "search_issue", "validity" and "value".
        :return: the issues, bucketed by field
        """
        report = {}
        for field in self._instructions:
            presence_ok = field.found or field.is_optional
            validity_ok = field.valid is None or field.valid is True
            if presence_ok and validity_ok:
                continue

            details = {}
            if not presence_ok:
                details["found"] = "Not found"
            if field.search_issue:
                details["search_issue"] = field.search_issue
            if field.valid is not None and not field.valid:
                details["validity"] = "Not valid"
                details["value"] = field.value
            report[field.field_name] = details
        return report
class ParsingFlowGenerator:
    """
    Class for creating multiple, empty flows based on a group of instructions.
    """

    def __init__(
        self,
        parsing_flow_class: Type[ParsingFlow],
        instructions_to_attach_with_params: Union[
            Tuple[Type[BaseTargetFieldInstructions], Dict],
            Tuple[Tuple[Type[BaseTargetFieldInstructions], Dict]],
        ],
    ) -> None:
        """
        Set the flow class and group of instructions to use when creating new
        instances of the flow class.
        :param parsing_flow_class: the flow class to instantiate
        :param instructions_to_attach_with_params: one or more pairs of field
        instructions class and the parameters to use when instantiating them;
        a single ``(class, params)`` pair, a tuple of pairs or any other
        iterable of pairs are all accepted
        """
        self._parsing_flow_class = parsing_flow_class

        if self._is_single_pair(instructions_to_attach_with_params):
            # A lone pair would otherwise be iterated element by element and
            # fail on unpacking in get_new_flow().
            instructions_to_attach_with_params = (instructions_to_attach_with_params,)
        elif not isinstance(instructions_to_attach_with_params, tuple):
            instructions_to_attach_with_params = tuple(
                instructions_to_attach_with_params
            )

        self._instructions_to_attach_with_params = instructions_to_attach_with_params

    @staticmethod
    def _is_single_pair(candidate) -> bool:
        """
        Tell a single ``(class, params)`` pair apart from a group of pairs.
        :param candidate: the value received by the constructor
        :return: True if candidate is a 2-tuple of a class and a dict
        """
        return (
            isinstance(candidate, tuple)
            and len(candidate) == 2
            and isinstance(candidate[0], type)
            and isinstance(candidate[1], dict)
        )

    def get_new_flow(self) -> ParsingFlow:
        """
        Instantiate a new parsing flow with the instantiated classes attached.
        Every call builds fresh instruction objects from the stored pairs.
        :return: the new parsing flow
        """
        new_parsing_flow = self._parsing_flow_class()
        for instruction, params in self._instructions_to_attach_with_params:
            new_parsing_flow.add_instructions(instruction(**params))
        return new_parsing_flow