# -*- coding: utf-8 -*-
"""Exploring daemon.

Generates random listing-page URLs, requests each page, scrapes the ad
references out of the HTML and enqueues one capturing task per new
reference, logging every state transition in the task database.
"""
import sys

sys.path.append("..")

import datetime
import logging
import re
import uuid
from random import randint
from time import sleep

import mysql.connector
from bs4 import BeautifulSoup

from core import my_logger  # imported for its side effect: logging configuration
from core.alerts import alert_master
from core.config import monthly_new_ads_target, working_hours
from core.mysql_wrapper import get_anunciosdb, get_tasksdb
from core.scrapping_utils import UrlAttack
from db_layer.capturing_tasks_interface import capturing_interface


class Explorer:
    """
    Daemon with the full flow of execution of generating a listing page url,
    requesting the page, scraping the ad references and storing logs in the
    task database
    """

    sleep_time_no_work = 60  # seconds to wait between polls when there is no work
    sleep_time_no_service = 600  # seconds to wait between DB reachability retries
    ad_types = {"1": "alquiler", "2": "venta"}

    def __init__(self) -> None:
        """
        Connect to the databases and set up retry counters.

        :raises mysql.connector.Error: if either database connection fails.
        """
        try:
            self.anunciosdb = get_anunciosdb()
            self.tasksdb = get_tasksdb()
        except mysql.connector.Error:
            # FIX: the original bare ``except`` printed a message and carried
            # on with the connection attributes unset, so the daemon crashed
            # later with an opaque AttributeError. Fail fast instead.
            logging.exception("Could not connect to anuncios DB")
            raise
        self.max_db_retries = 3
        self.db_retries = 0
        self.max_queue_retries = 3
        self.queue_retries = 0

    def start(self) -> None:
        """
        Full flow of execution. Checks whether it should capture a URL, tries
        to do so and stores the result if successful.

        :return: None
        """
        logging.info("Starting explorer")
        while True:
            if not self._is_there_work():
                print("{}: Waiting. No work".format(datetime.datetime.now()))
                sleep(Explorer.sleep_time_no_work)
                continue
            logging.info("Waiting")
            if not self._database_is_up():
                alert_master(
                    "SQL DOWN",
                    "El explorer informa de que SQL esta caida. "
                    "Actividad detenida",
                )
                raise ConnectionError("Unable to connect to database")
            current_task = ExploringTask(self._compose_listing_url())
            current_task.explore()
            logging.info("Exploring task done...")
            if current_task.status == "Referencias ready":
                # Hand every scraped reference over to the capturing queue.
                referencias = current_task.get_referencias()
                for referencia in referencias:
                    capturing_interface.create_capturing_task(
                        referencia, current_task.id
                    )
                current_task._update_status("Sent to queue")
                logging.info("The task was successful.")
            continue

    def _is_there_work(self) -> bool:
        """
        Checks whether it should try to scrap a listing page according to
        limits and cooldowns.

        :return: True if it should work, false otherwise
        """
        # Any single limit being hit (recent task cooldown, outside working
        # hours, daily reference quota, daily task quota) vetoes new work.
        if any(
            [
                self._check_if_recent_task(),
                not self._in_working_hours(),
                (
                    self._get_referencias_acquired_today()
                    >= self._get_max_referencias_for_today()
                ),
                (self._get_tasks_created_today() >= self._get_max_tasks_today()),
            ]
        ):
            return False
        return True

    def _database_is_up(self) -> bool:
        """
        Checks whether the db is reachable with some retries.

        :return: True if db is reachable, false if not
        """
        while self.db_retries <= self.max_db_retries:
            try:
                self.anunciosdb.ping()
                self.db_retries = 0  # reset the counter once we reconnect
                return True
            except mysql.connector.Error:
                # FIX: narrowed from a bare ``except`` that also swallowed
                # KeyboardInterrupt/SystemExit.
                logging.warning("Database unreachable, retrying")
                sleep(Explorer.sleep_time_no_service)
                self.db_retries = self.db_retries + 1
        return False

    @staticmethod
    def _in_working_hours() -> bool:
        """
        Checks whether now is within the working hours of the daemon.

        :return: True if so, false if not
        """
        # FIX: return annotation was ``-> None`` although a bool is returned.
        return (
            working_hours["start"]
            <= datetime.datetime.now().time()
            <= working_hours["end"]
        )

    def _get_referencias_acquired_today(self) -> int:
        """
        Queries the database to obtain the count of scraped ads in the last
        24h.

        :return: the resulting count
        """
        query_statement = """
            SELECT count(referencia) FROM primera_captura_full
            WHERE fecha_captura >= now() - INTERVAL 1 DAY;
            """
        cursor_result = self.anunciosdb.query(query_statement)
        return cursor_result.fetchone()[0]

    def _get_max_referencias_for_today(self) -> float:
        """
        Queries the database for the number of captured ads in the last 30
        days and computes the max number of ad references to obtain today.

        :return: the max number of references
        """
        query_statement = """
            SELECT count(referencia) FROM primera_captura_full
            WHERE fecha_captura >= now() - INTERVAL 30 DAY;
            """
        cursor_result = self.anunciosdb.query(query_statement)
        new_referencias_last_30 = cursor_result.fetchone()[0]
        # Positive deviation (behind target) raises today's quota; negative
        # deviation (ahead of target) lowers it.
        deviation = (
            monthly_new_ads_target - new_referencias_last_30
        ) / monthly_new_ads_target
        max_referencias = (monthly_new_ads_target / 30) * (1 + deviation)
        return max_referencias

    def _get_tasks_created_today(self) -> int:
        """
        Queries the database for the number of exploring tasks created in the
        last 24h, returns it.

        :return: number of exploring tasks created
        """
        query_statement = """
            SELECT count(uuid) FROM exploring_tasks_logs
            WHERE status = 'Attacked' AND write_time >= now() - INTERVAL 1 DAY;
            """
        cursor_result = self.tasksdb.query(query_statement)
        tasks_created_today = cursor_result.fetchone()[0]
        return tasks_created_today

    def _get_max_tasks_today(self) -> float:
        """
        Computes the current task goal

        :return: max current tasks target
        """
        return (self._get_max_referencias_for_today() / 30) * 6

    def _check_if_recent_task(self) -> int:
        """
        Queries the db for the number of tasks created in the last 10
        minutes.

        :return: the number of recently created tasks
        """
        query_statement = """
            SELECT count(uuid) FROM exploring_tasks_logs
            WHERE status = 'Attacked'
            AND write_time >= now() - INTERVAL 10 MINUTE
            """
        cursor_result = self.tasksdb.query(query_statement)
        return cursor_result.fetchone()[0]

    @staticmethod
    def _compose_listing_url() -> str:
        """
        Generates a listing page URL randomly.

        :return: the listing page URL
        """
        root = "https://www.idealista.com/"
        # FIX: renamed local from ``type`` to avoid shadowing the builtin.
        ad_type = Explorer.ad_types[str(randint(1, 2))]
        city = "barcelona"
        page_number = str(randint(1, 30))
        url = (
            root
            + ad_type
            + "-garajes/"
            + city
            + "-"
            + city
            + "/"
            + "pagina-"
            + page_number
            + ".htm"
        )
        return url


class ExploringTask:
    """
    Task object wrapping the process of attempting to capture a listing page,
    parsing the ad references and sending to db.
    """

    def __init__(self, url: str) -> None:
        """
        Initialize with task parameters and mark the task as being worked on
        in the task queue.

        :param url: string with the listing page url to be captured
        """
        self.anunciosdb = get_anunciosdb()
        self.tasksdb = get_tasksdb()
        self.target_url = url
        self.id = str(uuid.uuid4())
        self._update_status("Pending")

    def _update_status(self, new_status: str) -> None:
        """
        Updates the task status and persists it in the task queue.

        :param new_status: string describing the new status
        :return: None
        """
        self.status = new_status
        self._log_in_tasksdb()

    def explore(self) -> None:
        """
        Main flow of work.

        :return: None
        """
        attack = UrlAttack(self.target_url)
        attack.attack()
        self._update_status("Attacked")
        if not attack.success:
            self._update_status("Failure - Bad request")
            return
        # Validation only alerts the master on malformed references; it does
        # not abort extraction.
        self._validate_referencias(attack.get_text())
        self._extract_referencias(attack.get_text())
        if self.referencias:
            self._update_status("Referencias ready")
        elif self.there_are_referencias:
            self._update_status("Failure - No new referencias in HTML")
        else:
            self._update_status("Failure - HTML with no referencias")

    def _log_in_tasksdb(self) -> None:
        """
        Logs status in the task db.

        :return: None
        """
        # Parameterized insert: uuid/status are bound, never interpolated.
        query_statement = """INSERT INTO exploring_tasks_logs
            (uuid, write_time, status)
            VALUES (%(uuid)s, NOW(), %(status)s)"""
        query_parameters = {"uuid": self.id, "status": self.status}
        self.tasksdb.query(query_statement, query_parameters)

    def _validate_referencias(self, html: str) -> None:
        """
        Checks that the ad references are in the HTML code.

        :param html: string with HTML code of the listing page
        :return: None
        """
        soup = BeautifulSoup(html, "html5lib")
        ads = soup.find_all(class_="item")
        pattern = r"^[0-9]{3,20}$"
        for ad in ads:
            if not re.match(pattern, ad["data-adid"]):
                # Alert once on the first malformed reference and stop.
                alert_master(
                    "Alerta - Referencias no válidas",
                    """Una tarea de exploración ha considerado inválida una
                    referencia. El texto de la referencia era : {}
                    """.format(
                        ad["data-adid"]
                    ),
                )
                break

    def _extract_referencias(self, html: str) -> None:
        """
        Scraps the ad references out of the HTML code and stores them.

        :param html: string with HTML code of the listing page
        :return: None
        """
        soup = BeautifulSoup(html, "html5lib")
        ads = soup.find_all(class_="item")
        # Distinguish "page had no ads at all" from "no NEW ads" for explore().
        self.there_are_referencias = bool(ads)
        self.referencias = []
        for ad in ads:
            if self._is_new_listing(ad["data-adid"]):
                self.referencias.append(ad["data-adid"])

    def _is_new_listing(self, referencia: str) -> bool:
        """
        Checks if an ad reference already exists in the db.

        :param referencia: ad reference identifier to look up
        :return: True if it is new, false if not
        """
        query_statement = """SELECT count(referencia) FROM capturas
            WHERE referencia = %s"""
        query_params = (referencia,)
        cursor_result = self.anunciosdb.query(query_statement, query_params)
        result = cursor_result.fetchone()
        # New exactly when no row in ``capturas`` carries this reference.
        return result[0] == 0

    def get_referencias(self) -> list:
        """
        Gets the references.

        :return: list of ad references
        """
        return self.referencias


if __name__ == "__main__":
    explorer = Explorer()
    explorer.start()