import sys

sys.path.append("..")

import datetime
import logging
import re
from time import sleep

from bs4 import BeautifulSoup

from core import my_logger
from core.config import minimum_seconds_between_tries, working_hours
from core.scrapping_utils import UrlAttack
from db_layer.capturas_interface import capturas_interface
from db_layer.capturing_tasks_interface import capturing_interface
from refresher.refresher import Refresher


class Capturer:
    """
    Daemon with the full flow of execution of individual ad requesting,
    data scraping and db storage.
    """

    def __init__(self) -> None:
        self.last_try_datetime = datetime.datetime.now()

    def start(self) -> None:
        """
        Full flow of execution. Checks whether it should capture a URL,
        tries to do so and stores the result if successful.

        :return: None
        """
        logging.info("Starting capturer")
        while True:
            if not self._in_working_hours():
                # Outside working hours: nap for half an hour and re-check.
                logging.info("Waiting...")
                sleep(1800)
                continue
            seconds_to_next_capture = (
                minimum_seconds_between_tries() - self._seconds_since_last_try()
            )
            if seconds_to_next_capture > 0:
                # Throttle so attempts are spaced at least
                # minimum_seconds_between_tries() apart.
                logging.info("Waiting...")
                sleep(seconds_to_next_capture)
            pending_task = capturing_interface.get_pending_task()
            if not pending_task:
                logging.info("No pending tasks.")
                continue
            task = CapturingTask(pending_task)
            self.last_try_datetime = datetime.datetime.now()
            task.capture()
            if task.status != "Data ready":
                logging.warning("Something went wrong, not adding data.")
                continue
            capturas_interface.insert_captura(task.get_ad_data())
            task._update_status("Captura inserted")
            logging.info("New ad inserted.")

    @staticmethod
    def _in_working_hours() -> bool:
        """
        Checks whether now is within the working hours of the daemon.

        :return: True if so, false if not
        """
        return (
            working_hours["start"]
            <= datetime.datetime.now().time()
            <= working_hours["end"]
        )

    def _seconds_since_last_try(self) -> float:
        """
        Computes how many seconds have passed since the last capturing attempt.

        :return: seconds since last try as a float
        """
        return (datetime.datetime.now() - self.last_try_datetime).total_seconds()


class CapturingTask:
    """
    Task object wrapping the process of attempting to capture an ad,
    parsing the data and sending to db.
    """

    # Seconds to wait after a failed request before retrying.
    sleep_time_failed_request = 180

    def __init__(self, parameters) -> None:
        """
        Initialize with task parameters and mark the task as being worked on
        in the task queue.

        :param parameters: dict with the necessary parameters for the task
        """
        self.uuid = parameters["uuid"]
        self.ad_url = parameters["ad_url"]
        self.uuid_exploring = parameters["fk_uuid_exploring"]
        self.status = parameters["status"]
        self.request_failures = 1
        self.html = None
        self._update_status("Loading")

    def _update_status(self, new_status) -> None:
        """
        Updates the task status and persists it in the task queue.

        :param new_status: string describing the new status
        :return: None
        """
        self.status = new_status
        capturing_interface.update_capturing_task(
            self.uuid, self.uuid_exploring, self.status, self.ad_url
        )

    def capture(self) -> None:
        """
        Main flow of work: request the ad page, retrying up to three times,
        then extract and validate its data.

        :return: None
        """
        self._update_status("WIP")
        while self.request_failures < 4:
            attack = UrlAttack(self.ad_url)
            attack.attack()
            if attack.success:
                self.html = attack.get_text()
                self._extract_data()
                self._check_data()
                return
            # Request failed: check whether the ad is simply gone before
            # counting this as a retryable failure.
            try:
                if Refresher.dead_ad_checker(attack.get_text()):
                    self._update_status("Dead ad")
                    return
            except AttributeError:
                logging.error(
                    "Something went wrong when checking if the ad is gone"
                )
            self._update_status("Fail {}".format(self.request_failures))
            self.request_failures += 1
            sleep(CapturingTask.sleep_time_failed_request)
        self._update_status("Surrender")
        logging.warning("A task has surrendered. %s", self.ad_url)

    def _extract_data(self) -> None:
        """
        Parses the obtained html to extract the ad information.

        :return: None
        """
        self.parser = AdHtmlParser(self.html)
        self.parser.parse()

    def _check_data(self) -> None:
        """
        Validates that all compulsory fields have been obtained and that the
        values are within the expected. Sets the status of task accordingly.

        :return: None
        """
        if self.parser.fields_missing():
            self._update_status("Fields missing")
            return
        if not self.parser.all_fields_are_valid():
            self._update_status("Invalid value fields")
            return
        self._update_status("Data ready")

    def get_ad_data(self) -> dict:
        """
        Returns the extracted data.

        :return: dictionary with the data of the ad.
        """
        return self.parser.get_data()


class AdHtmlParser:
    """
    Object for parsing, storing and validating the data of the HTML of an ad.
    """

    def __init__(self, html_string: str) -> None:
        """
        Initializes an instance of the parser with the HTML of an ad.

        :param html_string: the full HTML code of the ad page
        """
        self.html = html_string
        # Per field: whether it was found in the HTML, whether it is optional
        # for a valid capture, and the extracted value.
        self.ad_fields = {
            "referencia": {"found": False, "optional": False, "value": None},
            "precio": {"found": False, "optional": False, "value": None},
            "tamano_categorico": {"found": False, "optional": True, "value": None},
            "m2": {"found": False, "optional": True, "value": None},
            "tipo_anuncio": {"found": False, "optional": False, "value": None},
            "calle": {"found": False, "optional": True, "value": None},
            "barrio": {"found": False, "optional": False, "value": None},
            "distrito": {"found": False, "optional": False, "value": None},
            "ciudad": {"found": False, "optional": False, "value": None},
            "cubierta": {"found": False, "optional": False, "value": None},
            "puerta_auto": {"found": False, "optional": False, "value": None},
            "ascensor": {"found": False, "optional": False, "value": None},
            "alarma": {"found": False, "optional": False, "value": None},
            "circuito": {"found": False, "optional": False, "value": None},
            "personal": {"found": False, "optional": False, "value": None},
            "telefono": {"found": False, "optional": True, "value": None},
        }

    def parse(self) -> None:
        """
        Parses the HTML and stores the ad data.

        :return: None
        """
        soup = BeautifulSoup(self.html, "html5lib")
        self._parse_referencia(soup)
        self._parse_precio(soup)
        self._parse_size(soup)
        self._parse_tipo_anuncio(soup)
        self._parse_location(soup)
        self._parse_features(soup)
        self._parse_telefono(soup)

    def _parse_referencia(self, soup) -> None:
        # Ad reference: numeric id embedded in the canonical link.
        # BUG FIX: find_all() returns a (possibly empty) list, never None, so
        # the previous `is not None` check always passed and `[0]` raised an
        # IndexError when the tag was absent. Check for a non-empty result.
        canonical_links = soup.find_all("link", {"rel": "canonical"})
        if canonical_links:
            matches = re.findall(r"[0-9]{5,20}", str(canonical_links[0]))
            if matches:
                self.ad_fields["referencia"]["value"] = matches[0]
                self.ad_fields["referencia"]["found"] = True

    def _parse_precio(self, soup) -> None:
        # Price: all digits found inside the <strong class="price"> tag.
        # Same always-true `find_all(...) is not None` bug fixed here.
        price_tags = soup.find_all("strong", {"class": "price"})
        if price_tags:
            self.ad_fields["precio"]["value"] = "".join(
                re.findall(r"[0-9]", str(price_tags[0]))
            )
            self.ad_fields["precio"]["found"] = True

    def _parse_size(self, soup) -> None:
        # Size: either a categorical label (e.g. "coche grande") or a surface
        # in m², both read from the info-features block. Both are optional.
        info_features = soup.find("div", {"class": "info-features"})
        if info_features is None:
            return
        try:
            first_span_text = info_features.find("span").find("span").text
            if "m²" not in first_span_text:
                self.ad_fields["tamano_categorico"]["value"] = first_span_text
                self.ad_fields["tamano_categorico"]["found"] = True
        except AttributeError:
            # The expected nested spans are missing; narrowed from a bare
            # except — only the chained .find()/.text can fail here.
            pass
        posible_m2 = [tag.text for tag in info_features.find_all("span")]
        m2_candidates = [posible for posible in posible_m2 if "m²" in posible]
        if m2_candidates:
            self.ad_fields["m2"]["value"] = "".join(
                re.findall(r"[0-9]+,*[0-9]*", m2_candidates[0])
            ).replace(",", ".")
            self.ad_fields["m2"]["found"] = True

    def _parse_tipo_anuncio(self, soup) -> None:
        # Ad type: 1 for sale ("venta" in the page title), 2 otherwise.
        title = soup.find("title")
        if title is not None:
            self.ad_fields["tipo_anuncio"]["value"] = (
                1 if "venta" in title.text else 2
            )
            self.ad_fields["tipo_anuncio"]["found"] = True

    def _parse_location(self, soup) -> None:
        # Location: the headerMap breadcrumb lists street (first item) and
        # barrio/distrito/city counted from the end.
        header_map = soup.find("div", {"id": "headerMap"})
        if header_map is None:
            # BUG FIX: previously an absent headerMap raised AttributeError.
            return
        items = header_map.find_all("li")
        if len(items) > 3:
            self.ad_fields["calle"]["value"] = ""
            self.ad_fields["ciudad"]["value"] = items[-2].text.strip()
            self.ad_fields["ciudad"]["found"] = True
            self.ad_fields["distrito"]["value"] = items[-3].text.strip()
            self.ad_fields["distrito"]["found"] = True
            self.ad_fields["barrio"]["value"] = items[-4].text.strip()
            self.ad_fields["barrio"]["found"] = True
        if len(items) > 4:
            self.ad_fields["calle"]["value"] = items[0].text.strip()
            self.ad_fields["calle"]["found"] = True

    def _parse_features(self, soup) -> None:
        # Boolean amenity flags (1/0) matched by keyword in the property
        # features lists. These are always "found": absence means 0.
        features_lists = soup.find_all(
            "div", {"class": "details-property_features"}
        )
        features = [
            feature.text
            for feature_list in features_lists
            for feature in feature_list.find_all("li")
        ]
        keywords = {
            "cubierta": "Cubierta",
            "puerta_auto": "Puerta",
            "ascensor": "ascensor",
            "alarma": "Alarma",
            "circuito": "Cámaras",
            "personal": "Personal",
        }
        for field, keyword in keywords.items():
            self.ad_fields[field]["value"] = 1 * any(
                keyword in feature for feature in features
            )
            self.ad_fields[field]["found"] = True

    def _parse_telefono(self, soup) -> None:
        # Advertiser phone number, with blanks stripped out.
        phone_tag = soup.find("p", {"class": "txt-bold _browserPhone icon-phone"})
        if phone_tag is not None:
            self.ad_fields["telefono"]["value"] = phone_tag.text.replace(" ", "")
            self.ad_fields["telefono"]["found"] = True

    def _validate(self) -> None:
        """
        Checks whether the extracted values are valid against the expected
        typology. Stores the results in self.invalid_fields.

        :return: None
        """
        self.invalid_fields = []
        if not re.match(r"[0-9]{4,20}", self.ad_fields["referencia"]["value"]):
            self.invalid_fields.append("referencia")
        if not re.match(r"[0-9]{1,20}", self.ad_fields["precio"]["value"]):
            self.invalid_fields.append("precio")
        possible_values_tamano = [
            "2 coches o más",
            "coche y moto",
            "coche grande",
            "coche pequeño",
            "moto",
            None,
        ]
        if self.ad_fields["tamano_categorico"]["value"] not in possible_values_tamano:
            self.invalid_fields.append("tamano_categorico")
        if "Barrio" not in self.ad_fields["barrio"]["value"]:
            self.invalid_fields.append("barrio")
        if "Distrito" not in self.ad_fields["distrito"]["value"]:
            self.invalid_fields.append("distrito")
        # NOTE(review): this pattern accepts an empty prefix, so re.match
        # succeeds on any string — the phone validation is currently a no-op.
        if self.ad_fields["telefono"]["found"] and not re.match(
            r"\s*\+?[0-9\s]*", self.ad_fields["telefono"]["value"]
        ):
            self.invalid_fields.append("telefono")
        # TODO añadir + a caracteres validos

    def all_fields_are_valid(self) -> bool:
        """
        Reports on whether the extracted data is valid.

        :return: True if values are valid, false if not
        """
        self._validate()
        return not self.invalid_fields

    def fields_missing(self) -> bool:
        """
        Reports on whether all compulsory fields are present.
        BUG FIX: the return annotation previously said None.

        :return: True if some compulsory field is missing, false if not
        """
        return any(
            not contents["optional"] and not contents["found"]
            for contents in self.ad_fields.values()
        )

    def get_data(self) -> dict:
        """
        Returns the extracted data in the form of a dictionary.

        :return: dictionary with the extracted data
        """
        return {field: info["value"] for field, info in self.ad_fields.items()}


if __name__ == "__main__":
    capturer = Capturer()
    capturer.start()