import functools
import re
from typing import Callable, Dict, Iterable, List, Type, Union

from bs4 import BeautifulSoup


class BaseTargetFieldInstructions:
    """
    Abstract class for all field instructions.

    Implements useful decorators as well as the main interface.
    """

    class Decorators:
        """Decorators to use across all field instructions."""

        @classmethod
        def fail_safe_scrape(cls, f: Callable) -> Callable:
            """
            Wrap a scrape action in a try-except to control any errors,
            and update the state of the search accordingly.

            :param f: the scrape function
            :return: the wrapped function
            """

            @functools.wraps(f)
            def wrapper(self, soup: BeautifulSoup):
                try:
                    return f(self, soup)
                # Deliberately broad: any scraping failure marks the field
                # as not found and stores the exception for reporting.
                except Exception as e:
                    self.found = False
                    self.search_issue = e
                return self

            return wrapper

        @classmethod
        def if_not_found_do_nothing(cls, f: Callable) -> Callable:
            """
            Wrap a function to only execute it if the field has been found
            in the html. Otherwise, do nothing.

            :param f: the function that might get executed
            :return: the wrapped function
            """

            @functools.wraps(f)
            def wrapper(self):
                if self.found:
                    return f(self)
                return self

            return wrapper

    def __init__(self) -> None:
        """Initialize attributes."""
        self.is_optional = False  # compulsory by default
        self.found = None  # True/False after scrape; None = not attempted
        self.valid = None  # True/False after validate; None = not attempted
        self.value = None  # the scraped value, if any
        self.search_issue = None  # exception raised during scrape, if any

    def scrape(self, soup: BeautifulSoup) -> None:
        """
        Interface for the scrape method.

        :param soup: a BeautifulSoup object for the target html
        :return: None
        """
        raise NotImplementedError()

    def validate(self) -> None:
        """
        Interface for the validate method.

        :return: None
        """
        raise NotImplementedError()


class ReferenciaFieldInstructions(BaseTargetFieldInstructions):
    """Instructions for field Referencia."""

    field_name = "referencia"

    def __init__(self) -> None:
        """Initialize all default parameters."""
        super().__init__()

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "ReferenciaFieldInstructions":
        """
        Try to find the value and store it.

        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        canonical_link = str(soup.find_all("link", {"rel": "canonical"})[0])
        self.value = re.findall(r"[0-9]{5,20}", canonical_link)[0]
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "ReferenciaFieldInstructions":
        """
        Check if the obtained value fits the expected format.

        :return: self
        """
        self.valid = False
        # NOTE(review): prefix match only; the scraped value is always 5-20
        # digits so this currently always passes — confirm whether a full
        # match (and the {4,20} vs {5,20} mismatch with scrape) is intended.
        if re.match(r"[0-9]{4,20}", self.value):
            self.valid = True
        return self


class TamanoCategoricoFieldInstructions(BaseTargetFieldInstructions):
    """Instructions for optional field tamano_categorico (categorical size)."""

    field_name = "tamano_categorico"
    possible_values = [
        "2 coches o más",
        "coche y moto",
        "coche grande",
        "coche pequeño",
        "moto",
        None,
    ]

    def __init__(self):
        """Initialize all default parameters; this field is optional."""
        super().__init__()
        self.is_optional = True

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "TamanoCategoricoFieldInstructions":
        """
        Try to find the value and store it.

        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.found = False
        # Hoisted: the same chained lookup was previously repeated twice.
        feature_text = (
            soup.find("div", {"class": "info-features"})
            .find("span")
            .find("span")
            .text
        )
        # A "m²" value means the feature is a surface, not a size category.
        if "m²" not in feature_text:
            self.value = feature_text
            self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "TamanoCategoricoFieldInstructions":
        """
        Check if the obtained value fits the expected format.

        :return: self
        """
        self.valid = False
        if self.value in TamanoCategoricoFieldInstructions.possible_values:
            self.valid = True
        return self


class PrecioFieldInstructions(BaseTargetFieldInstructions):
    """Instructions for field precio (price)."""

    field_name = "precio"

    def __init__(self):
        """Initialize all default parameters."""
        super().__init__()

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "PrecioFieldInstructions":
        """
        Try to find the value and store it.

        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        # Keep only the digits of the first price tag.
        self.value = "".join(
            re.findall(r"[0-9]", str(soup.find_all("strong", {"class": "price"})[0]))
        )
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "PrecioFieldInstructions":
        """
        Check if the obtained value fits the expected format.

        :return: self
        """
        self.valid = False
        # Requires at least one leading digit (rejects the empty string).
        if re.match(r"[0-9]{1,20}", self.value):
            self.valid = True
        return self


class M2FieldInstructions(BaseTargetFieldInstructions):
    """Instructions for optional field m2 (surface in square meters)."""

    field_name = "m2"

    def __init__(self):
        """Initialize all default parameters; this field is optional."""
        super().__init__()
        self.is_optional = True

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "M2FieldInstructions":
        """
        Try to find the value and store it.

        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.found = False
        posible_m2 = [
            tag.text
            for tag in soup.find("div", {"class": "info-features"}).find_all("span")
        ]
        if [posible for posible in posible_m2 if "m²" in posible]:
            # First span mentioning "m²", digits extracted, decimal comma
            # normalized to a dot.
            self.value = [
                "".join(re.findall(r"[0-9]+,*[0-9]*", posible))
                for posible in posible_m2
                if "m²" in posible
            ][0].replace(",", ".")
            self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "M2FieldInstructions":
        """
        Check if the obtained value fits the expected format.

        :return: self
        """
        self.valid = False
        if re.match(r"[0-9]{1,4}", self.value):
            self.valid = True
        return self


class TipoAnuncioFieldInstructions(BaseTargetFieldInstructions):
    """Instructions for field tipo_anuncio (1 = sale, 2 = rental)."""

    field_name = "tipo_anuncio"

    def __init__(self):
        """Initialize all default parameters."""
        super().__init__()

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "TipoAnuncioFieldInstructions":
        """
        Try to find the value and store it.

        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.found = False
        if "venta" in soup.find("title").text:
            self.value = 1
            self.found = True
        # "alquiler" wins if both keywords appear in the title.
        if "alquiler" in soup.find("title").text:
            self.value = 2
            self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "TipoAnuncioFieldInstructions":
        """
        Check if the obtained value fits the expected format.

        :return: self
        """
        self.valid = False
        if self.value in [1, 2]:
            self.valid = True
        return self


class CalleFieldInstructions(BaseTargetFieldInstructions):
    """Instructions for optional field calle (street)."""

    field_name = "calle"

    def __init__(self):
        """Initialize all default parameters; this field is optional."""
        super().__init__()
        self.is_optional = True

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "CalleFieldInstructions":
        """
        Try to find the value and store it.

        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.found = False
        # Hoisted: the headerMap <li> list was previously looked up three times.
        location_items = soup.find("div", {"id": "headerMap"}).find_all("li")
        if len(location_items) > 3:
            # Address block present but without an explicit street.
            self.value = ""
        if len(location_items) > 4:
            self.value = location_items[0].text.strip()
            # NOTE(review): found is set only when a real street is extracted;
            # with exactly 4 items the value stays "" and found stays False —
            # confirm against the original layout.
            self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "CalleFieldInstructions":
        """
        Accept any found street value.

        :return: self
        """
        self.valid = True
        return self


class BarrioFieldInstructions(BaseTargetFieldInstructions):
    """Instructions for field barrio (neighborhood)."""

    field_name = "barrio"

    def __init__(self):
        """Initialize all default parameters."""
        super().__init__()

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "BarrioFieldInstructions":
        """
        Try to find the value and store it.

        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.value = (
            soup.find("div", {"id": "headerMap"}).find_all("li")[-4].text.strip()
        )
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "BarrioFieldInstructions":
        """
        Accept any found neighborhood value.

        :return: self
        """
        self.valid = True
        return self


class DistritoFieldInstructions(BaseTargetFieldInstructions):
    """Instructions for field distrito (district)."""

    field_name = "distrito"

    def __init__(self):
        """Initialize all default parameters."""
        super().__init__()

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "DistritoFieldInstructions":
        """
        Try to find the value and store it.

        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.value = (
            soup.find("div", {"id": "headerMap"}).find_all("li")[-3].text.strip()
        )
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "DistritoFieldInstructions":
        """
        Accept any found district value.

        :return: self
        """
        self.valid = True
        return self


class CiudadFieldInstructions(BaseTargetFieldInstructions):
    """Instructions for field ciudad (city)."""

    field_name = "ciudad"

    def __init__(self):
        """Initialize all default parameters."""
        super().__init__()

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "CiudadFieldInstructions":
        """
        Try to find the value and store it.

        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.value = (
            soup.find("div", {"id": "headerMap"}).find_all("li")[-2].text.strip()
        )
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "CiudadFieldInstructions":
        """
        Accept any found city value.

        :return: self
        """
        self.valid = True
        return self


class SecondaryFeaturesFieldInstructions(BaseTargetFieldInstructions):
    """Shared methods for secondary features found in a list in ads."""

    def __init__(self, field_name: str, search_keyword: str):
        """
        Initialize all default parameters.

        :param field_name: the name to report this field under
        :param search_keyword: the keyword identifying the feature
        """
        super().__init__()
        self.field_name = field_name
        self._feature_keyword = search_keyword

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "SecondaryFeaturesFieldInstructions":
        """
        Try to find the value and store it.

        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        return self._find_feature_with_keyword(soup=soup, keyword=self._feature_keyword)

    def _find_feature_with_keyword(
        self, soup: BeautifulSoup, keyword: str
    ) -> "SecondaryFeaturesFieldInstructions":
        """
        Check if a feature is in the secondary list by keyword and store the
        value if found.

        :param soup: a BeautifulSoup object for the target html
        :param keyword: the keyword for that feature
        :return: self
        """
        features_lists = soup.find_all("div", {"class": "details-property_features"})
        features = [
            feature.text
            for feature_list in features_lists
            for feature in feature_list.find_all("li")
        ]
        if not features:
            self.found = False
            return self
        # 1 if any feature mentions the keyword, 0 otherwise.
        self.value = int(any(keyword in feature for feature in features))
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "SecondaryFeaturesFieldInstructions":
        """
        Check if the obtained value fits the expected format.

        :return: self
        """
        self.valid = False
        if self.value in [0, 1]:
            self.valid = True
        return self


class TelefonoFieldInstructions(BaseTargetFieldInstructions):
    """Instructions for optional field telefono (contact phone)."""

    field_name = "telefono"

    def __init__(self):
        # Fixed: the previous docstring was copy-pasted from validate.
        """Initialize all default parameters; this field is optional."""
        super().__init__()
        self.is_optional = True

    @BaseTargetFieldInstructions.Decorators.fail_safe_scrape
    def scrape(self, soup: BeautifulSoup) -> "TelefonoFieldInstructions":
        """
        Try to find the value and store it.

        :param soup: a BeautifulSoup object for the target html
        :return: self
        """
        self.value = soup.find(
            "p", {"class": "txt-bold _browserPhone icon-phone"}
        ).text.replace(" ", "")
        self.found = True
        return self

    @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing
    def validate(self) -> "TelefonoFieldInstructions":
        """
        Check if the obtained value fits the expected format.

        :return: self
        """
        self.valid = False
        # Fixed: re.match matched the empty prefix of ANY string, making the
        # check vacuous; fullmatch requires the whole value to be a phone-like
        # string (optional "+" and digits/whitespace).
        if re.fullmatch(r"\s*\+?[0-9\s]*", self.value):
            self.valid = True
        return self


class ParsingFlow:
    """
    Object to gather all instructions for a job run, execute them and present
    the results.
    """

    def __init__(self) -> None:
        """Initialize the instruction list."""
        self._instructions: List[BaseTargetFieldInstructions] = []

    def add_instructions(
        self,
        instructions: Union[
            BaseTargetFieldInstructions, Iterable[BaseTargetFieldInstructions]
        ],
    ):
        """
        Include new instructions to the internal list.

        :param instructions: a single or iterable group of instructions
        :return: self
        """
        if isinstance(instructions, BaseTargetFieldInstructions):
            self._instructions.append(instructions)
            return self
        self._instructions.extend(instructions)
        return self

    def execute_flow(self, soup: BeautifulSoup) -> None:
        """
        Scrape and validate all fields according to instructions.

        :param soup: a BeautifulSoup object for the target html
        :return: None
        """
        for instruction in self._instructions:
            instruction.scrape(soup).validate()

    @property
    def field_values(self) -> Dict:
        """
        Return the value for all fields, or None.

        :return: a dict with the field names and values
        """
        return {field.field_name: field.value for field in self._instructions}

    @property
    def all_found_fields_are_valid(self) -> bool:
        """
        Check if all found fields are valid.

        :return: True if the fields are valid, False otherwise
        """
        relevant_fields = [
            field.valid for field in self._instructions if field.found is True
        ]
        return all(relevant_fields)

    @property
    def all_non_optional_fields_were_found(self) -> bool:
        """
        Check if all compulsory fields were found.

        :return: True if the fields were found, False otherwise
        """
        found_or_not = [
            field.found or field.is_optional for field in self._instructions
        ]
        return all(found_or_not)

    @property
    def issues(self) -> Dict[str, dict]:
        """
        Return all identified issues during scraping and validation.

        :return: the issues, bucketed by field
        """
        issues = {}
        for field in self._instructions:
            # NOTE(review): an optional field that was never found has
            # valid=None and is therefore still reported here — confirm
            # whether that is intended.
            if (field.found or field.is_optional) and field.valid:
                continue
            this_field_issues = {}
            if not field.found:
                this_field_issues["found"] = "Not found"
                if field.search_issue:
                    this_field_issues["search_issue"] = field.search_issue
            if not field.valid:
                this_field_issues["validity"] = "Not valid"
                this_field_issues["value"] = field.value
            issues[field.field_name] = this_field_issues
        return issues


class ParsingFlowGenerator:
    """
    Class for creating multiple, empty flows based on a group of instructions.
    """

    def __init__(
        self,
        parsing_flow_class: Type[ParsingFlow],
        instructions_to_attach_with_params: Dict[
            Type[BaseTargetFieldInstructions], dict
        ],
    ) -> None:
        """
        Set the flow class and group of instructions to use when creating new
        instances of the flow class.

        :param parsing_flow_class: the flow class to instantiate
        :param instructions_to_attach_with_params: a key-value pair of field
            instructions class and the parameters to use when instantiating
            them
        """
        self._parsing_flow_class = parsing_flow_class
        self._instructions_to_attach_with_params = instructions_to_attach_with_params

    def get_new_flow(self) -> ParsingFlow:
        """
        Instantiate a new parsing flow with the instantiated classes attached.

        :return: the new parsing flow
        """
        new_parsing_flow = self._parsing_flow_class()
        for instruction, params in self._instructions_to_attach_with_params.items():
            new_parsing_flow.add_instructions(instruction(**params))
        return new_parsing_flow