import datetime
import logging
from time import sleep
from typing import Callable, Type

from db_layer.capturing_tasks_interface import CapturingTasksInterface
from db_layer.capturas_interface import CapturasInterface
from core.scrapping_utils import UrlAttack
from core.config import working_hours, minimum_seconds_between_tries
from core.throttling_utils import (
    ThrottleManager,
    WorkingHoursThrottlingRule,
    CooldownThrottlingRule,
    DynamicThrottlingRule,
)
from refresher.refresher import Refresher
from core.parsing_utils import *
from core import my_logger


class Capturer:
    """
    Daemon running the full flow of requesting individual ads, scraping their
    data and storing it in the db.
    """

    def __init__(
        self,
        throttling_manager: ThrottleManager,
        capturing_tasks_interface: CapturingTasksInterface,
        capturas_interface: CapturasInterface,
        parsing_flow_generator: ParsingFlowGenerator,
        url_acquisition_object: Type[UrlAttack],
        dead_ad_checker: Callable,
    ) -> None:
        """
        Receive all required objects.

        :param throttling_manager: decides whether a task should be started
        :param capturing_tasks_interface: interface to interact with the tasks database
        :param capturas_interface: interface to interact with the ad database
        :param parsing_flow_generator: object capable of generating empty parsing
            flows so that each task gets a fresh one
        :param url_acquisition_object: gateway to obtaining the HTML of a URL
        :param dead_ad_checker: callable capable of checking if an ad is dead
            through its HTML
        """
        self._throttling_manager = throttling_manager
        self._capturing_tasks_interface = capturing_tasks_interface
        self._capturas_interface = capturas_interface
        self._parsing_flow_generator = parsing_flow_generator
        self._url_acquisition_object = url_acquisition_object
        self._dead_ad_checker = dead_ad_checker
        self.last_try_datetime = datetime.datetime.now()

    def start(self) -> None:
        """
        Full flow of execution. Checks whether it should capture a URL, tries to
        do so and stores the result if successful.

        :return: None
        """
        logging.info("Starting capturer")
        while True:
            # Wait until every throttling rule allows the next attempt.
            while not self._throttling_manager.allow_next_task(
                last_attempt_timestamp=self.last_try_datetime
            ):
                sleep(10)
                logging.info("Waiting...")

            pending_task = self._capturing_tasks_interface.get_pending_task()
            logging.info("Got a task")
            task = CapturingTask(
                pending_task,
                capturing_interface=self._capturing_tasks_interface,
                new_parsing_flow=self._parsing_flow_generator.get_new_flow(),
                url_acquisition_object=self._url_acquisition_object,
                dead_ad_checker=self._dead_ad_checker,
            )
            self.last_try_datetime = datetime.datetime.now()
            task.capture()

            if task.status != "Data ready":
                logging.warning("Something went wrong, not adding data.")
                continue

            ad_data = task.get_ad_data()
            self._capturas_interface.insert_captura(ad_data)
            task.update_status("Captura inserted")
            logging.info("New ad inserted.")


class CapturingTask:
    """
    Task object wrapping the process of attempting to capture an ad, parsing its
    data and sending it to the db.
    """

    sleep_time_failed_request = 180

    def __init__(
        self,
        task_parameters: dict,
        capturing_interface: CapturingTasksInterface,
        new_parsing_flow: ParsingFlow,
        url_acquisition_object: Type[UrlAttack],
        dead_ad_checker: Callable,
    ) -> None:
        """
        Initialize with task parameters and mark the task as being worked on in
        the task queue.
        :param task_parameters: dict with the necessary parameters for the task
            (an illustrative shape is sketched in the comment at the end of this module)
        :param capturing_interface: interface to interact with the tasks database
        :param new_parsing_flow: an empty parsing flow
        :param url_acquisition_object: gateway to obtaining the HTML of a URL
        :param dead_ad_checker: callable capable of checking if an ad is dead
        """
        self.uuid = task_parameters["uuid"]
        self.ad_url = task_parameters["ad_url"]
        self.uuid_exploring = task_parameters["fk_uuid_exploring"]
        self.status = task_parameters["status"]
        self.request_failures = 1
        self.html = None
        self._parsing_flow = new_parsing_flow
        self._capturing_interface = capturing_interface
        self._url_acquisition_object = url_acquisition_object
        self._is_dead_ad = dead_ad_checker
        self.update_status("Loading")

    def update_status(self, new_status) -> None:
        """
        Update the task status and persist it in the task queue.

        :param new_status: string describing the new status
        :return: None
        """
        self.status = new_status
        self._capturing_interface.update_capturing_task(
            self.uuid, self.uuid_exploring, self.status, self.ad_url
        )

    def capture(self) -> None:
        """
        Main flow of work: request the ad URL with retries, parse the HTML on
        success, or mark the task as dead or failed.
        """
        self.update_status("WIP")
        while self.request_failures < 4:
            attack = self._url_acquisition_object(self.ad_url)
            attack.attack()

            if attack.success:
                logging.info("URL attack successful.")
                self._parse_html(html=attack.get_text())
                return

            logging.info("URL attack failed.")
            try:
                # A failed request may still return HTML telling us the ad is gone.
                if self._is_dead_ad(attack.get_text()):
                    self.update_status("Dead ad")
                    return
            except AttributeError as error:
                logging.error("Something went wrong when checking if the ad is gone")
                logging.error(error)

            self.update_status(f"Fail {self.request_failures}")
            self.request_failures += 1
            sleep(CapturingTask.sleep_time_failed_request)

        self.update_status("Surrender")
        logging.warning(f"A task has surrendered. {self.ad_url}")

    def get_ad_data(self) -> dict:
        """
        Return the extracted data.

        :return: dictionary with the data of the ad.
        """
        return self._parsing_flow.field_values

    def _parse_html(self, html: str) -> None:
        """
        Execute the complete parsing flow and report the task status depending on
        the outcome.
        :param html: the HTML of the ad
        :return: None
        """
        self._parsing_flow.execute_flow(soup=BeautifulSoup(html, "html5lib"))

        if not self._parsing_flow.issues:
            self.update_status("Data ready")
            return

        if not self._parsing_flow.all_found_fields_are_valid:
            self.update_status("Invalid value fields")
            logging.warning(f"Invalid fields found in ad: {self.ad_url}")
            logging.warning(f"{self._parsing_flow.issues}")
            return

        if not self._parsing_flow.all_non_optional_fields_were_found:
            self.update_status("Fields missing")
            logging.warning(
                f"Couldn't scrape necessary fields: {self._parsing_flow.issues}"
            )
            return


if __name__ == "__main__":
    capturing_tasks_interface = CapturingTasksInterface()
    capturas_interface = CapturasInterface()

    throttling_manager = ThrottleManager()
    throttling_manager.add_rule(WorkingHoursThrottlingRule(working_hours)).add_rule(
        CooldownThrottlingRule(minimum_seconds_between_tries),
        required_argument_names=["last_attempt_timestamp"],
    ).add_rule(
        DynamicThrottlingRule(
            lambda: bool(capturing_tasks_interface.get_pending_task())
        )
    )

    parsing_flow_generator = ParsingFlowGenerator(
        ParsingFlow,
        (
            (ReferenciaFieldInstructions, {}),
            (PrecioFieldInstructions, {}),
            (TamanoCategoricoFieldInstructions, {}),
            (M2FieldInstructions, {}),
            (TipoAnuncioFieldInstructions, {}),
            (CalleFieldInstructions, {}),
            (BarrioFieldInstructions, {}),
            (DistritoFieldInstructions, {}),
            (CiudadFieldInstructions, {}),
            (
                SecondaryFeaturesFieldInstructions,
                {"field_name": "cubierta", "search_keyword": "Cubierta"},
            ),
            (
                SecondaryFeaturesFieldInstructions,
                {"field_name": "puerta_auto", "search_keyword": "Puerta"},
            ),
            (
                SecondaryFeaturesFieldInstructions,
                {"field_name": "ascensor", "search_keyword": "ascensor"},
            ),
            (
                SecondaryFeaturesFieldInstructions,
                {"field_name": "alarma", "search_keyword": "Alarma"},
            ),
            (
                SecondaryFeaturesFieldInstructions,
                {"field_name": "circuito", "search_keyword": "Cámaras"},
            ),
            (
                SecondaryFeaturesFieldInstructions,
                {"field_name": "personal", "search_keyword": "Personal"},
            ),
            (TelefonoFieldInstructions, {}),
        ),
    )

    capturer = Capturer(
        throttling_manager=throttling_manager,
        capturing_tasks_interface=capturing_tasks_interface,
        capturas_interface=capturas_interface,
        parsing_flow_generator=parsing_flow_generator,
        url_acquisition_object=UrlAttack,
        dead_ad_checker=Refresher.dead_ad_checker,
    )
    capturer.start()
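

# ---------------------------------------------------------------------------
# Illustration only, not part of the db_layer API: based on the keys read in
# CapturingTask.__init__, a pending task returned by
# CapturingTasksInterface.get_pending_task() is assumed to look roughly like
# the dict below. The values shown are hypothetical placeholders; the real
# shape and status vocabulary are defined by db_layer.capturing_tasks_interface.
#
#     pending_task = {
#         "uuid": "0b9f...",                     # task identifier
#         "ad_url": "https://example.com/ad/1",  # ad to capture
#         "fk_uuid_exploring": "7c1d...",        # exploring run that found it
#         "status": "Pending",                   # current task status
#     }
# ---------------------------------------------------------------------------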