From 2a9483981ed5a2e454ae6f6d58e85ecdbd604343 Mon Sep 17 00:00:00 2001 From: pablo Date: Sat, 26 Dec 2020 18:54:04 +0100 Subject: [PATCH 01/13] Implemented a new throttling module to remove redundance in the project. --- core/throttling_utils.py | 192 +++++++++++++++++++++++++++++++++++++++ tests/throttling_test.py | 106 +++++++++++++++++++++ 2 files changed, 298 insertions(+) create mode 100644 core/throttling_utils.py create mode 100644 tests/throttling_test.py diff --git a/core/throttling_utils.py b/core/throttling_utils.py new file mode 100644 index 0000000..ccdf8d6 --- /dev/null +++ b/core/throttling_utils.py @@ -0,0 +1,192 @@ +from typing import List, Callable +import datetime + + +class BaseThrottlingRule: + """ + Interface for all throttling rules. + """ + + def __call__(self, **kwargs) -> bool: + """ + Upon calling the rule itself, the underlying check gets executed. + :param kwargs: arguments for check + :return: True if the check is OK, False otherwise + """ + return self._check_rule(**kwargs) + + def _check_rule(self, **kwargs) -> bool: + """ + Interface for internal method to check the rule. + :param kwargs: arguments for check + :return: True if the check is OK, False otherwise + """ + raise NotImplementedError + + +class WorkingHoursThrottlingRule(BaseThrottlingRule): + """ + Rule for checking if current time is within the defined working hours. + """ + + def __init__(self, working_hours: dict) -> None: + """ + Set the working hours as a dict with "start" and "end" keys, which + contain time objects. + :param working_hours: the definition of the working hours range + :return: None + """ + self._working_hours = working_hours + + def _check_rule(self) -> bool: + """ + Call underyling check method. + :return: True if the check is OK, False otherwise + """ + return self._inside_working_hours() + + def _inside_working_hours(self) -> bool: + """ + Checks if the current time is between the defined window of working + hours. 
+ :return: True if within range, False otherwise + """ + return ( + self._working_hours["start"] + <= datetime.datetime.now().time() + <= self._working_hours["end"] + ) + + +class CooldownThrottlingRule(BaseThrottlingRule): + """ + Rule for checking if a certain time period has passed since the last + execution. + :attribute required_arguments: the list with arguments expected to be ready + for unpacking when checking the rule. + """ + + required_arguments = ["last_attempt_timestamp"] + + def __init__(self, cooldown_time_generator: Callable) -> None: + """ + Set the passed cooldown timer generator. + :param cooldown_time_generator: a callable object that returns some + number of seconds. Can be random or static. + """ + self._cooldown_time_generator = cooldown_time_generator + self._current_cooldown_time = self._cooldown_time_generator() + + def _check_rule(self, **kwargs) -> bool: + """ + Unpack argument and call underyling check method. + :return: True if the check is OK, False otherwise + """ + last_attempt_timestamp = kwargs["last_attempt_timestamp"] + + return self._check_if_cooldowned(last_attempt_timestamp) + + def _check_if_cooldowned(self, last_attempt_timestamp: datetime) -> bool: + """ + Checks if the cooldown time has passed. If so, set a new one. + :param last_attempt_timestamp: timestamp for the last time whatever + must be throttled happened. + :return: True if the cooldown time has passed, False otherwise + """ + cooldown_release_timestamp = last_attempt_timestamp + datetime.timedelta( + seconds=self._current_cooldown_time + ) + + if datetime.datetime.now() > cooldown_release_timestamp: + self._current_cooldown_time = self._cooldown_time_generator() + return True + + return False + + +class DynamicThrottlingRule(BaseThrottlingRule): + """ + A basic interface to dynamically set any function, optionally with + arguments, as a throttling rule. 
+ """ + + def __init__(self, any_callable: Callable) -> None: + """ + Sets the callable that will act as a check. Only condition is that the + callable should return a boolean value. + :param any_callable: the check callable object + """ + self._some_rule = any_callable + + def _check_rule(self, **kwargs) -> bool: + """ + Calls the dynamically set callable while passing any given arguments. + :param kwargs: arguments for check + :return: True if the check is OK, False otherwise + """ + return self._some_rule(**kwargs) + + +class ThrottleManager: + """ + Holds and runs all throttling rules on demand. + """ + + def __init__(self) -> None: + """ + Initialize internal attributes. + """ + self._rules_to_check = [] + self._rules_and_required_arguments = dict() + + def allow_next_task(self, **kwargs) -> bool: + """ + Checks all the internal rules and returns whether all of them passed + successfully or not. + :param kwargs: any arguments needed by the rules + :return: True if all rules passed positively, False otherwise + """ + + check_results = self._check_all_rules(**kwargs) + + if not all(check_results): + return False + + return True + + def add_rule( + self, rule: BaseThrottlingRule, required_argument_names: List[str] = None + ) -> "ThrottleManager": + """ + Includes a new rule to the manager together with the argument names + that the rule call expects. + :param rule: the rule instance + :param required_argument_names: the required argument names to execute + the check for that rule + :return: the ThrottleManager instance + """ + required_argument_names = required_argument_names or [] + + self._rules_to_check.append(rule) + self._rules_and_required_arguments[rule.__class__] = required_argument_names + + return self + + def _check_all_rules(self, **kwargs) -> List[bool]: + """ + Executes all checks with the right arguments for each of them and + collects results. 
+ :param kwargs: all passed arguments + :return: the result of each individual check + """ + checks = [] + + for rule in self._rules_to_check: + arguments_for_rule = { + argument_name: kwargs[argument_name] + for argument_name in self._rules_and_required_arguments[rule.__class__] + } + checks.append(rule(**arguments_for_rule)) + continue + + return checks diff --git a/tests/throttling_test.py b/tests/throttling_test.py new file mode 100644 index 0000000..3a9f916 --- /dev/null +++ b/tests/throttling_test.py @@ -0,0 +1,106 @@ +import datetime + +from core.throttling_utils import ( + ThrottleManager, + CooldownThrottlingRule, + WorkingHoursThrottlingRule, + DynamicThrottlingRule, +) + + +def test_working_hours_throttling_rule_checks(): + working_hours_rule = WorkingHoursThrottlingRule( + working_hours={ + "start": datetime.datetime.now().time(), + "end": (datetime.datetime.now() + datetime.timedelta(hours=1)).time(), + } + ) + + assert working_hours_rule() == True + + +def test_working_hours_throttling_rule_does_not_check(): + working_hours_rule = WorkingHoursThrottlingRule( + working_hours={ + "start": (datetime.datetime.now() + datetime.timedelta(hours=1)).time(), + "end": (datetime.datetime.now() + datetime.timedelta(hours=2)).time(), + } + ) + + assert working_hours_rule() == False + + +def test_cooldown_throttling_rule_checks(): + time_generator = lambda: 60 + + cooldown_rule = CooldownThrottlingRule(cooldown_time_generator=time_generator) + + assert ( + cooldown_rule( + last_attempt_timestamp=datetime.datetime.now() + + datetime.timedelta(seconds=-120) + ) + == True + ) + + +def test_cooldown_throttling_rule_does_not_check(): + time_generator = lambda: 60 + + cooldown_rule = CooldownThrottlingRule(cooldown_time_generator=time_generator) + + assert cooldown_rule(last_attempt_timestamp=datetime.datetime.now()) == False + + +def test_dynamic_rule_checks(): + mock_check = lambda: True + + rule = DynamicThrottlingRule(any_callable=mock_check) + + assert rule() == 
True + + +def test_dynamic_rule_does_not_check(): + mock_check = lambda: False + + rule = DynamicThrottlingRule(any_callable=mock_check) + + assert rule() == False + + +def test_dynamic_rule_arguments_pass_properly(): + def pass_a_bool(some_bool): + return some_bool + + rule = DynamicThrottlingRule(pass_a_bool) + + assert (rule(some_bool=True) == True) and (rule(some_bool=False) == False) + + +def test_throttle_manager_checks_rules(): + throttle_manager = ThrottleManager() + + def pass_a_bool(some_bool): + return some_bool + + some_rules = [ + WorkingHoursThrottlingRule( + working_hours={ + "start": datetime.datetime.now().time(), + "end": (datetime.datetime.now() + datetime.timedelta(hours=1)).time(), + } + ), + CooldownThrottlingRule(cooldown_time_generator=lambda: 0), + DynamicThrottlingRule(any_callable=pass_a_bool), + ] + + some_arguments = [[], ["last_attempt_timestamp"], ["some_bool"]] + + some_rules_and_arguments = zip(some_rules, some_arguments) + + for rule, arguments in some_rules_and_arguments: + throttle_manager.add_rule(rule, required_argument_names=arguments) + + assert throttle_manager.allow_next_task( + last_attempt_timestamp=datetime.datetime.now(), some_bool=True + ) From d136144a4e2c685dc33435b063f95c4722c85b69 Mon Sep 17 00:00:00 2001 From: pablo Date: Sat, 26 Dec 2020 20:25:56 +0100 Subject: [PATCH 02/13] Throttling checks are now lazy. 
--- core/throttling_utils.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/core/throttling_utils.py b/core/throttling_utils.py index ccdf8d6..5b02bf3 100644 --- a/core/throttling_utils.py +++ b/core/throttling_utils.py @@ -147,12 +147,9 @@ class ThrottleManager: :return: True if all rules passed positively, False otherwise """ - check_results = self._check_all_rules(**kwargs) + check_result = self._check_all_rules(**kwargs) - if not all(check_results): - return False - - return True + return check_result def add_rule( self, rule: BaseThrottlingRule, required_argument_names: List[str] = None @@ -172,12 +169,12 @@ class ThrottleManager: return self - def _check_all_rules(self, **kwargs) -> List[bool]: + def _check_all_rules(self, **kwargs) -> bool: """ - Executes all checks with the right arguments for each of them and + Executes checks (lazily) with the right arguments for each of them and collects results. :param kwargs: all passed arguments - :return: the result of each individual check + :return: True if all checks passed, False otherwise """ checks = [] @@ -187,6 +184,7 @@ class ThrottleManager: for argument_name in self._rules_and_required_arguments[rule.__class__] } checks.append(rule(**arguments_for_rule)) - continue + if checks[-1] == False: + return False - return checks + return True From 3f9a6d8e537462397136f411012dc6f532e9a7db Mon Sep 17 00:00:00 2001 From: pablo Date: Sun, 27 Dec 2020 12:35:02 +0100 Subject: [PATCH 03/13] Integrated throttling in capturer. 
--- capturer/capturer.py | 60 ++++++++++++++++++-------------------------- 1 file changed, 25 insertions(+), 35 deletions(-) diff --git a/capturer/capturer.py b/capturer/capturer.py index 3acda04..3b25056 100644 --- a/capturer/capturer.py +++ b/capturer/capturer.py @@ -5,12 +5,18 @@ from time import sleep from bs4 import BeautifulSoup import re import datetime + from db_layer.capturing_tasks_interface import capturing_interface from db_layer.capturas_interface import capturas_interface from core.scrapping_utils import UrlAttack from core.config import working_hours, minimum_seconds_between_tries +from core.throttling_utils import ( + ThrottleManager, + WorkingHoursThrottlingRule, + CooldownThrottlingRule, + DynamicThrottlingRule, +) from refresher.refresher import Refresher -from core import my_logger import logging @@ -20,7 +26,8 @@ class Capturer: scraping and db storage. """ - def __init__(self) -> None: + def __init__(self, throttling_manager: ThrottleManager) -> None: + self._throttling_manager = throttling_manager self.last_try_datetime = datetime.datetime.now() def start(self) -> None: @@ -33,22 +40,15 @@ class Capturer: logging.info("Starting capturer") while True: - if not self._in_working_hours(): - sleep(1800) - logging.info("Waiting...") - continue - - seconds_to_next_capture = ( - minimum_seconds_between_tries() - self._seconds_since_last_try() - ) - if seconds_to_next_capture > 0: - sleep(seconds_to_next_capture) + while not self._throttling_manager.allow_next_task( + last_attempt_timestamp=self.last_try_datetime + ): + sleep(10) logging.info("Waiting...") pending_task = capturing_interface.get_pending_task() - if not pending_task: - logging.info("No pending tasks.") - continue + + logging.info("Got a task") task = CapturingTask(pending_task) self.last_try_datetime = datetime.datetime.now() @@ -64,25 +64,6 @@ class Capturer: task._update_status("Captura inserted") logging.info("New ad inserted.") - @staticmethod - def _in_working_hours() -> bool: - 
""" - Checks whether now is within the working hours of the daemon. - :return: True if so, false if not - """ - return ( - working_hours["start"] - <= datetime.datetime.now().time() - <= working_hours["end"] - ) - - def _seconds_since_last_try(self) -> float: - """ - Computes how many seconds have passed since the last capturing attempt - :return: seconds since last try as integer - """ - return (datetime.datetime.now() - self.last_try_datetime).total_seconds() - class CapturingTask: """ @@ -405,5 +386,14 @@ class AdHtmlParser: if __name__ == "__main__": - capturer = Capturer() + + throttling_manager = ThrottleManager() + throttling_manager.add_rule(WorkingHoursThrottlingRule(working_hours)).add_rule( + CooldownThrottlingRule(minimum_seconds_between_tries), + required_argument_names=["last_attempt_timestamp"], + ).add_rule( + DynamicThrottlingRule(lambda: bool(capturing_interface.get_pending_task())) + ) + + capturer = Capturer(throttling_manager=throttling_manager) capturer.start() From 3b79ba06d828996d428cd7b9246a5a8df0eb5314 Mon Sep 17 00:00:00 2001 From: pablo Date: Tue, 29 Dec 2020 17:38:17 +0100 Subject: [PATCH 04/13] Created parsing_utils module to refactor HTML parsing and validation actions. --- core/parsing_utils.py | 547 +++++++ tests/parsing_utils_test.py | 2726 +++++++++++++++++++++++++++++++++++ 2 files changed, 3273 insertions(+) create mode 100644 core/parsing_utils.py create mode 100644 tests/parsing_utils_test.py diff --git a/core/parsing_utils.py b/core/parsing_utils.py new file mode 100644 index 0000000..0cd7259 --- /dev/null +++ b/core/parsing_utils.py @@ -0,0 +1,547 @@ +from typing import Union, Iterable, Dict, Callable +import re + +from bs4 import BeautifulSoup + + +class BaseTargetFieldInstructions: + """ + Abstract class for all field instructions. Implements useful decorators as + well as the main interface. + """ + + class Decorators: + """ + Decorators to use across all field instructions. 
+ """ + + @classmethod + def fail_safe_scrape(cls, f: Callable) -> Callable: + """ + Wraps a scrape action in a try-except to control any errors, and + updates the state of the search accordingly. + :param f: the scrape function + :return: the wrapped function + """ + + def wrapper(self, soup: BeautifulSoup): + try: + return f(self, soup) + except Exception as e: + self.found = False + self.search_issue = e + return self + + return wrapper + + @classmethod + def if_not_found_do_nothing(cls, f: Callable) -> Callable: + """ + Wraps a function to only execute it if the field has been found in + the html. Otherwise, do nothing. + :param f: the function that might get executed + :return: the wrapped function + """ + + def wrapper(self): + if self.found: + return f(self) + return self + + return wrapper + + def __init__(self) -> None: + """ + Initialize attributes. + """ + self.is_optional = False + self.found = None + self.valid = None + self.value = None + self.search_issue = None + + def scrape(self, soup: BeautifulSoup) -> None: + """ + Interface for the scrape method. + :param soup: a BeautifulSoup object for the target html + :return: None + """ + raise NotImplementedError() + + def validate(self) -> None: + """ + Interface for the validate method. + :return: None + """ + raise NotImplementedError() + + +class ReferenciaFieldInstructions(BaseTargetFieldInstructions): + """ + Instructions for field Referencia. + """ + + field_name = "referencia" + + def __init__(self) -> None: + """ + Initialize all default parameters. + """ + super().__init__() + + @BaseTargetFieldInstructions.Decorators.fail_safe_scrape + def scrape(self, soup: BeautifulSoup) -> "ReferenciaFieldInstructions": + """ + Try to find the value and store it. 
+ :param soup: a BeautifulSoup object for the target html + :return: self + """ + self.value = re.findall( + r"[0-9]{5,20}", str(soup.find_all("link", {"rel": "canonical"})[0]) + )[0] + self.found = True + return self + + @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing + def validate(self) -> "ReferenciaFieldInstructions": + """ + Check if the obtained value fits the expected format. + :return: self + """ + self.valid = False + if re.match(r"[0-9]{4,20}", self.value): + self.valid = True + return self + + +class TamanoCategoricoFieldInstructions(BaseTargetFieldInstructions): + + field_name = "tamano_categorico" + possible_values = [ + "2 coches o más", + "coche y moto", + "coche grande", + "coche pequeño", + "moto", + None, + ] + + def __init__(self): + super().__init__() + self.is_optional = True + + @BaseTargetFieldInstructions.Decorators.fail_safe_scrape + def scrape(self, soup: BeautifulSoup) -> "TamanoCategoricoFieldInstructions": + """ + Try to find the value and store it. + :param soup: a BeautifulSoup object for the target html + :return: self + """ + self.found = False + if ( + "m²" + not in soup.find("div", {"class": "info-features"}) + .find("span") + .find("span") + .text + ): + self.value = ( + soup.find("div", {"class": "info-features"}) + .find("span") + .find("span") + .text + ) + self.found = True + + return self + + @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing + def validate(self) -> "TamanoCategoricoFieldInstructions": + """ + Check if the obtained value fits the expected format. 
+ :return: self + """ + self.valid = False + if self.value in TamanoCategoricoFieldInstructions.possible_values: + self.valid = True + + return self + + +class PrecioFieldInstructions(BaseTargetFieldInstructions): + + field_name = "precio" + + def __init__(self): + super().__init__() + + @BaseTargetFieldInstructions.Decorators.fail_safe_scrape + def scrape(self, soup: BeautifulSoup) -> "PrecioFieldInstructions": + """ + Try to find the value and store it. + :param soup: a BeautifulSoup object for the target html + :return: self + """ + self.value = "".join( + re.findall(r"[0-9]", str(soup.find_all("strong", {"class": "price"})[0])) + ) + self.found = True + return self + + @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing + def validate(self) -> "PrecioFieldInstructions": + """ + Check if the obtained value fits the expected format. + :return: self + """ + self.valid = False + if re.match(r"[0-9]{1,20}", self.value): + self.valid = True + + return self + + +class M2FieldInstructions(BaseTargetFieldInstructions): + field_name = "m2" + + def __init__(self): + super().__init__() + self.is_optional = True + + @BaseTargetFieldInstructions.Decorators.fail_safe_scrape + def scrape(self, soup: BeautifulSoup) -> "M2FieldInstructions": + """ + Try to find the value and store it. + :param soup: a BeautifulSoup object for the target html + :return: self + """ + self.found = False + posible_m2 = [ + tag.text + for tag in soup.find("div", {"class": "info-features"}).find_all("span") + ] + if [posible for posible in posible_m2 if "m²" in posible]: + self.value = [ + "".join(re.findall(r"[0-9]+,*[0-9]*", posible)) + for posible in posible_m2 + if "m²" in posible + ][0].replace(",", ".") + self.found = True + return self + + @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing + def validate(self) -> "M2FieldInstructions": + """ + Check if the obtained value fits the expected format. 
+ :return: self + """ + self.valid = False + if re.match(r"[0-9]{1,4}", self.value): + self.valid = True + return self + + +class TipoAnuncioFieldInstructions(BaseTargetFieldInstructions): + field_name = "tipo_anuncio" + + def __init__(self): + super().__init__() + + @BaseTargetFieldInstructions.Decorators.fail_safe_scrape + def scrape(self, soup: BeautifulSoup) -> "TipoAnuncioFieldInstructions": + """ + Try to find the value and store it. + :param soup: a BeautifulSoup object for the target html + :return: self + """ + self.found = False + if "venta" in soup.find("title").text: + self.value = 1 + self.found = True + if "alquiler" in soup.find("title").text: + self.value = 2 + self.found = True + + return self + + @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing + def validate(self) -> "TipoAnuncioFieldInstructions": + """ + Check if the obtained value fits the expected format. + :return: self + """ + self.valid = False + if self.value in [1, 2]: + self.valid = True + return self + + +class CalleFieldInstructions(BaseTargetFieldInstructions): + field_name = "calle" + + def __init__(self): + super().__init__() + self.is_optional = True + + @BaseTargetFieldInstructions.Decorators.fail_safe_scrape + def scrape(self, soup: BeautifulSoup) -> "CalleFieldInstructions": + """ + Try to find the value and store it. 
+ :param soup: a BeautifulSoup object for the target html + :return: self + """ + self.found = False + if len(soup.find("div", {"id": "headerMap"}).find_all("li")) > 3: + self.value = "" + if len(soup.find("div", {"id": "headerMap"}).find_all("li")) > 4: + self.value = ( + soup.find("div", {"id": "headerMap"}).find_all("li")[0].text.strip() + ) + self.found = True + + return self + + @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing + def validate(self) -> "CalleFieldInstructions": + self.valid = True + return self + + +class BarrioFieldInstructions(BaseTargetFieldInstructions): + field_name = "barrio" + + def __init__(self): + super().__init__() + + @BaseTargetFieldInstructions.Decorators.fail_safe_scrape + def scrape(self, soup: BeautifulSoup) -> "BarrioFieldInstructions": + """ + Try to find the value and store it. + :param soup: a BeautifulSoup object for the target html + :return: self + """ + self.value = ( + soup.find("div", {"id": "headerMap"}).find_all("li")[-4].text.strip() + ) + self.found = True + return self + + @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing + def validate(self) -> "BarrioFieldInstructions": + self.valid = True + return self + + +class DistritoFieldInstructions(BaseTargetFieldInstructions): + field_name = "distrito" + + def __init__(self): + super().__init__() + + @BaseTargetFieldInstructions.Decorators.fail_safe_scrape + def scrape(self, soup: BeautifulSoup) -> "DistritoFieldInstructions": + """ + Try to find the value and store it. 
+ :param soup: a BeautifulSoup object for the target html + :return: self + """ + self.value = ( + soup.find("div", {"id": "headerMap"}).find_all("li")[-3].text.strip() + ) + self.found = True + return self + + @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing + def validate(self) -> "DistritoFieldInstructions": + self.valid = True + return self + + +class CiudadFieldInstructions(BaseTargetFieldInstructions): + field_name = "ciudad" + + def __init__(self): + super().__init__() + + @BaseTargetFieldInstructions.Decorators.fail_safe_scrape + def scrape(self, soup: BeautifulSoup) -> "CiudadFieldInstructions": + """ + Try to find the value and store it. + :param soup: a BeautifulSoup object for the target html + :return: self + """ + self.value = ( + soup.find("div", {"id": "headerMap"}).find_all("li")[-2].text.strip() + ) + self.found = True + return self + + @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing + def validate(self) -> "CiudadFieldInstructions": + self.valid = True + return self + + +class SecondaryFeaturesFieldInstructions(BaseTargetFieldInstructions): + """ + Shared methods for secondary features found in a list in ads. + """ + + def __init__(self, field_name: str, search_keyword: str): + super().__init__() + self.field_name = field_name + self._feature_keyword = search_keyword + + @BaseTargetFieldInstructions.Decorators.fail_safe_scrape + def scrape(self, soup: BeautifulSoup) -> "SecondaryFeaturesFieldInstructions": + """ + Try to find the value and store it. + :param soup: a BeautifulSoup object for the target html + :return: self + """ + return self._find_feature_with_keyword(soup=soup, keyword=self._feature_keyword) + + def _find_feature_with_keyword( + self, soup: BeautifulSoup, keyword: str + ) -> "SecondaryFeaturesFieldInstructions": + """ + Checks if a feature is in the secondary list by keyword and stores the + value if found. 
+ :param soup: a BeautifulSoup object for the target html + :param keyword: the keyword for that feature + :return: self + """ + features_lists = soup.find_all("div", {"class": "details-property_features"}) + features = [ + feature.text + for feature_list in features_lists + for feature in feature_list.find_all("li") + ] + if not features: + self.found = False + return self + self.value = 1 * any(keyword in feature for feature in features) + self.found = True + return self + + @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing + def validate(self) -> "SecondaryFeaturesFieldInstructions": + self.valid = False + if self.value in [0, 1]: + self.valid = True + return self + + +class TelefonoFieldInstructions(BaseTargetFieldInstructions): + field_name = "telefono" + + def __init__(self): + """ + Check if the obtained value fits the expected format. + :return: self + """ + super().__init__() + self.is_optional = True + + @BaseTargetFieldInstructions.Decorators.fail_safe_scrape + def scrape(self, soup: BeautifulSoup) -> "TelefonoFieldInstructions": + self.value = soup.find( + "p", {"class": "txt-bold _browserPhone icon-phone"} + ).text.replace(" ", "") + self.found = True + return self + + @BaseTargetFieldInstructions.Decorators.if_not_found_do_nothing + def validate(self) -> "TelefonoFieldInstructions": + self.valid = False + if re.match(r"\s*\+?[0-9\s]*", self.value): + self.valid = True + return self + + +class ParsingFlow: + """ + Object to gather all instructions for a job run, execute them and present + the results. + """ + + def __init__(self) -> None: + """ + Initialize the instruction list. + """ + self._instructions = [] + + def add_instructions( + self, + instructions: Union[ + BaseTargetFieldInstructions, Iterable[BaseTargetFieldInstructions] + ], + ): + """ + Include new instructions to the internal list. 
+ :param instructions: a single or iterable group of instructions + :return: self + """ + if isinstance(instructions, BaseTargetFieldInstructions): + self._instructions.append(instructions) + return self + self._instructions.extend(instructions) + + return self + + def execute_flow(self, soup: BeautifulSoup) -> None: + """ + Scraped and validate all fields according to instructions. + :param soup: a BeautifulSoup object for the target html + :return: None + """ + for instruction in self._instructions: + instruction.scrape(soup).validate() + + @property + def all_found_fields_are_valid(self) -> bool: + """ + Check if all found fields are valid. + :return: True if the fields are valid, False otherwise + """ + relevant_fields = [ + field.valid for field in self._instructions if field.found == True + ] + + return all(relevant_fields) + + @property + def all_non_optional_fields_were_found(self) -> bool: + """ + Check if all compulsory fields were found. + :return: True if the fields were found, False otherwise + """ + found_or_not = [ + field.found or field.is_optional for field in self._instructions + ] + + return all(found_or_not) + + @property + def issues(self) -> Dict[str, dict]: + """ + Returns all identified issues during scraping and validation. 
+ :return: the issues, bucketed by field + """ + issues = {} + + for field in self._instructions: + if (field.found or field.is_optional) and field.valid: + continue + this_field_issues = {} + if not field.found: + this_field_issues["found"] = "Not found" + if field.search_issue: + this_field_issues["search_issue"] = field.search_issue + if not field.valid: + this_field_issues["validity"] = "Not valid" + this_field_issues["value"] = field.value + + issues[field.field_name] = this_field_issues + + return issues diff --git a/tests/parsing_utils_test.py b/tests/parsing_utils_test.py new file mode 100644 index 0000000..533b4ba --- /dev/null +++ b/tests/parsing_utils_test.py @@ -0,0 +1,2726 @@ +import pytest + +from bs4 import BeautifulSoup + +from core.parsing_utils import ( + ParsingFlow, + ReferenciaFieldInstructions, + PrecioFieldInstructions, + TamanoCategoricoFieldInstructions, + M2FieldInstructions, + TipoAnuncioFieldInstructions, + CalleFieldInstructions, + BarrioFieldInstructions, + DistritoFieldInstructions, + CiudadFieldInstructions, + SecondaryFeaturesFieldInstructions, + TelefonoFieldInstructions, +) + + +@pytest.fixture +def real_ad_html(): + html = """ + + + + + + + + + + + + + + + Garaje en venta en Passatge de Simó, 10, La Sagrada Família, Barcelona — idealista

Garaje en venta en Passatge de Simó, 10

La Sagrada Família, Barcelona Ver mapa
17.900
Plaza para coche pequeño 9,15 m²
Nota personal

Comentario del anunciante

Publicidad

Características básicas

  • Plaza para coche pequeño
  • 9,15 m²
  • Cubierta

Extras

  • Puerta automática de garaje
  • Personal de seguridad

Anuncio actualizado hace 7 días

¿Hay algún error en este anuncio?

Infórmanos para corregirlo y ayudar a otros usuarios.

Cuéntanos qué error has visto
Precio del inmueble: 17.900 €
Impuestos y gastos: -
Precio + gastos: -
%
%
Tu cuota mensual: 3.120
Analizar mi caso Estos resultados son orientativos, calculados con los números que has introducido. Condiciones generales.

Ubicación

  • Passatge de Simó, 10
  • Barrio La Sagrada Família
  • Distrito Eixample
  • Barcelona
  • Área de Barcelona, Barcelona

Estadísticas

Anuncio actualizado el 20 de diciembre

Ver número de visitas y contactos de este anuncio

Publicidad

+ """ + return html + + +@pytest.fixture +def unrelated_html(): + html = """ + + + + + +Primer on Python Decorators – Real Python + + + + + + + + + + + + + + + + + + + + + + + + +
+
+
+
+Python Decorators +
+

Primer on Python Decorators

+

+by Geir Arne Hjelle + + +intermediate +python +

+
+ +Tweet +Share +Email + +
+
+

+
+ + +
+

Watch Now This tutorial has a related video course created by the Real Python team. Watch it together with the written tutorial to deepen your understanding: Python Decorators 101

+
+

In this tutorial on decorators, we’ll look at what they are and how to create and use them. Decorators provide a simple syntax for calling higher-order functions.

+

By definition, a decorator is a function that takes another function and extends the behavior of the latter function without explicitly modifying it.

+

This sounds confusing, but it’s really not, especially after you’ve seen a few examples of how decorators work. You can find all the examples from this article here.

+ + + +

Updates:

+
    +
  • 08/22/2018: Major update adding more examples and more advanced decorators
  • +
  • 01/12/2016: Updated examples to Python 3 (v3.5.1) syntax and added a new example
  • +
  • 11/01/2015: Added a brief explanation on the functools.wraps() decorator
  • +
+

Functions

+

Before you can understand decorators, you must first understand how functions work. For our purposes, a function returns a value based on the given arguments. Here is a very simple example:

+
>>>
>>> def add_one(number):
+...     return number + 1
+
+>>> add_one(2)
+3
+
+

In general, functions in Python may also have side effects rather than just turning an input into an output. The print() function is a basic example of this: it returns None while having the side effect of outputting something to the console. However, to understand decorators, it is enough to think about functions as something that turns given arguments into a value.

+ +

First-Class Objects

+

In Python, functions are first-class objects. This means that functions can be passed around and used as arguments, just like any other object (string, int, float, list, and so on). Consider the following three functions:

+
def say_hello(name):
+    return f"Hello {name}"
+
+def be_awesome(name):
+    return f"Yo {name}, together we are the awesomest!"
+
+def greet_bob(greeter_func):
+    return greeter_func("Bob")
+
+

Here, say_hello() and be_awesome() are regular functions that expect a name given as a string. The greet_bob() function, however, expects a function as its argument. We can, for instance, pass it the say_hello() or the be_awesome() function:

+
>>>
>>> greet_bob(say_hello)
+'Hello Bob'
+
+>>> greet_bob(be_awesome)
+'Yo Bob, together we are the awesomest!'
+
+

Note that greet_bob(say_hello) refers to two functions, but in different ways: greet_bob() and say_hello. The say_hello function is named without parentheses. This means that only a reference to the function is passed. The function is not executed. The greet_bob() function, on the other hand, is written with parentheses, so it will be called as usual.

+

Inner Functions

+

It’s possible to define functions inside other functions. Such functions are called inner functions. Here’s an example of a function with two inner functions:

+
def parent():
+    print("Printing from the parent() function")
+
+    def first_child():
+        print("Printing from the first_child() function")
+
+    def second_child():
+        print("Printing from the second_child() function")
+
+    second_child()
+    first_child()
+
+

What happens when you call the parent() function? Think about this for a minute. The output will be as follows:

+
>>>
>>> parent()
+Printing from the parent() function
+Printing from the second_child() function
+Printing from the first_child() function
+
+

Note that the order in which the inner functions are defined does not matter. Like with any other functions, the printing only happens when the inner functions are executed.

+

Furthermore, the inner functions are not defined until the parent function is called. They are locally scoped to parent(): they only exist inside the parent() function as local variables. Try calling first_child(). You should get an error:

+
Traceback (most recent call last):
+  File "<stdin>", line 1, in <module>
+NameError: name 'first_child' is not defined
+
+

Whenever you call parent(), the inner functions first_child() and second_child() are also called. But because of their local scope, they aren’t available outside of the parent() function.

+

Returning Functions From Functions

+

Python also allows you to use functions as return values. The following example returns one of the inner functions from the outer parent() function:

+
def parent(num):
+    def first_child():
+        return "Hi, I am Emma"
+
+    def second_child():
+        return "Call me Liam"
+
+    if num == 1:
+        return first_child
+    else:
+        return second_child
+
+

Note that you are returning first_child without the parentheses. Recall that this means that you are returning a reference to the function first_child. In contrast, first_child() with parentheses refers to the result of evaluating the function. This can be seen in the following example:

+
>>>
>>> first = parent(1)
+>>> second = parent(2)
+
+>>> first
+<function parent.<locals>.first_child at 0x7f599f1e2e18>
+
+>>> second
+<function parent.<locals>.second_child at 0x7f599dad5268>
+
+

The somewhat cryptic output simply means that the first variable refers to the local first_child() function inside of parent(), while second points to second_child().

+

You can now use first and second as if they are regular functions, even though the functions they point to can’t be accessed directly:

+
>>>
>>> first()
+'Hi, I am Emma'
+
+>>> second()
+'Call me Liam'
+
+

Finally, note that in the earlier example you executed the inner functions within the parent function, for instance first_child(). However, in this last example, you did not add parentheses to the inner functions—first_child—upon returning. That way, you got a reference to each function that you could call in the future. Make sense?

+

Simple Decorators

+

Now that you’ve seen that functions are just like any other object in Python, you’re ready to move on and see the magical beast that is the Python decorator. Let’s start with an example:

+
def my_decorator(func):
+    def wrapper():
+        print("Something is happening before the function is called.")
+        func()
+        print("Something is happening after the function is called.")
+    return wrapper
+
+def say_whee():
+    print("Whee!")
+
+say_whee = my_decorator(say_whee)
+
+

Can you guess what happens when you call say_whee()? Try it:

+
>>>
>>> say_whee()
+Something is happening before the function is called.
+Whee!
+Something is happening after the function is called.
+
+

To understand what’s going on here, look back at the previous examples. We are literally just applying everything you have learned so far.

+

The so-called decoration happens at the following line:

+
say_whee = my_decorator(say_whee)
+
+

In effect, the name say_whee now points to the wrapper() inner function. Remember that you return wrapper as a function when you call my_decorator(say_whee):

+
>>>
>>> say_whee
+<function my_decorator.<locals>.wrapper at 0x7f3c5dfd42f0>
+
+

However, wrapper() has a reference to the original say_whee() as func, and calls that function between the two calls to print().

+

Put simply: decorators wrap a function, modifying its behavior.

+

Before moving on, let’s have a look at a second example. Because wrapper() is a regular Python function, the way a decorator modifies a function can change dynamically. So as not to disturb your neighbors, the following example will only run the decorated code during the day:

+
from datetime import datetime
+
+def not_during_the_night(func):
+    def wrapper():
+        if 7 <= datetime.now().hour < 22:
+            func()
+        else:
+            pass  # Hush, the neighbors are asleep
+    return wrapper
+
+def say_whee():
+    print("Whee!")
+
+say_whee = not_during_the_night(say_whee)
+
+

If you try to call say_whee() after bedtime, nothing will happen:

+
>>>
>>> say_whee()
+>>>
+
+

Syntactic Sugar!

+

The way you decorated say_whee() above is a little clunky. First of all, you end up typing the name say_whee three times. In addition, the decoration gets a bit hidden away below the definition of the function.

+

Instead, Python allows you to use decorators in a simpler way with the @ symbol, sometimes called the “pie” syntax. The following example does the exact same thing as the first decorator example:

+
def my_decorator(func):
+    def wrapper():
+        print("Something is happening before the function is called.")
+        func()
+        print("Something is happening after the function is called.")
+    return wrapper
+
+@my_decorator
+def say_whee():
+    print("Whee!")
+
+

So, @my_decorator is just an easier way of saying say_whee = my_decorator(say_whee). It’s how you apply a decorator to a function.

+

Reusing Decorators

+

Recall that a decorator is just a regular Python function. All the usual tools for easy reusability are available. Let’s move the decorator to its own module that can be used in many other functions.

+

Create a file called decorators.py with the following content:

+
def do_twice(func):
+    def wrapper_do_twice():
+        func()
+        func()
+    return wrapper_do_twice
+
+ +

You can now use this new decorator in other files by doing a regular import:

+
from decorators import do_twice
+
+@do_twice
+def say_whee():
+    print("Whee!")
+
+

When you run this example, you should see that the original say_whee() is executed twice:

+
>>>
>>> say_whee()
+Whee!
+Whee!
+
+ +

Decorating Functions With Arguments

+

Say that you have a function that accepts some arguments. Can you still decorate it? Let’s try:

+
from decorators import do_twice
+
+@do_twice
+def greet(name):
+    print(f"Hello {name}")
+
+

Unfortunately, running this code raises an error:

+
>>>
>>> greet("World")
+Traceback (most recent call last):
+  File "<stdin>", line 1, in <module>
+TypeError: wrapper_do_twice() takes 0 positional arguments but 1 was given
+
+

The problem is that the inner function wrapper_do_twice() does not take any arguments, but name="World" was passed to it. You could fix this by letting wrapper_do_twice() accept one argument, but then it would not work for the say_whee() function you created earlier.

+

The solution is to use *args and **kwargs in the inner wrapper function. Then it will accept an arbitrary number of positional and keyword arguments. Rewrite decorators.py as follows:

+
def do_twice(func):
+    def wrapper_do_twice(*args, **kwargs):
+        func(*args, **kwargs)
+        func(*args, **kwargs)
+    return wrapper_do_twice
+
+

The wrapper_do_twice() inner function now accepts any number of arguments and passes them on to the function it decorates. Now both your say_whee() and greet() examples work:

+
>>>
>>> say_whee()
+Whee!
+Whee!
+
+>>> greet("World")
+Hello World
+Hello World
+
+

Returning Values From Decorated Functions

+

What happens to the return value of decorated functions? Well, that’s up to the decorator to decide. Let’s say you decorate a simple function as follows:

+
from decorators import do_twice
+
+@do_twice
+def return_greeting(name):
+    print("Creating greeting")
+    return f"Hi {name}"
+
+

Try to use it:

+
>>>
>>> hi_adam = return_greeting("Adam")
+Creating greeting
+Creating greeting
+>>> print(hi_adam)
+None
+
+

Oops, your decorator ate the return value from the function.

+

Because wrapper_do_twice() doesn’t explicitly return a value, the call return_greeting("Adam") ended up returning None.

+

To fix this, you need to make sure the wrapper function returns the return value of the decorated function. Change your decorators.py file:

+
def do_twice(func):
+    def wrapper_do_twice(*args, **kwargs):
+        func(*args, **kwargs)
+        return func(*args, **kwargs)
+    return wrapper_do_twice
+
+

The return value from the last execution of the function is returned:

+
>>>
>>> return_greeting("Adam")
+Creating greeting
+Creating greeting
+'Hi Adam'
+
+

Who Are You, Really?

+

A great convenience when working with Python, especially in the interactive shell, is its powerful introspection ability. Introspection is the ability of an object to know about its own attributes at runtime. For instance, a function knows its own name and documentation:

+
>>>
>>> print
+<built-in function print>
+
+>>> print.__name__
+'print'
+
+>>> help(print)
+Help on built-in function print in module builtins:
+
+print(...)
+    <full help message>
+
+

The introspection works for functions you define yourself as well:

+
>>>
>>> say_whee
+<function do_twice.<locals>.wrapper_do_twice at 0x7f43700e52f0>
+
+>>> say_whee.__name__
+'wrapper_do_twice'
+
+>>> help(say_whee)
+Help on function wrapper_do_twice in module decorators:
+
+wrapper_do_twice()
+
+

However, after being decorated, say_whee() has gotten very confused about its identity. It now reports being the wrapper_do_twice() inner function inside the do_twice() decorator. Although technically true, this is not very useful information.

+

To fix this, decorators should use the @functools.wraps decorator, which will preserve information about the original function. Update decorators.py again:

+
import functools
+
+def do_twice(func):
+    @functools.wraps(func)
+    def wrapper_do_twice(*args, **kwargs):
+        func(*args, **kwargs)
+        return func(*args, **kwargs)
+    return wrapper_do_twice
+
+

You do not need to change anything about the decorated say_whee() function:

+
>>>
>>> say_whee
+<function say_whee at 0x7ff79a60f2f0>
+
+>>> say_whee.__name__
+'say_whee'
+
+>>> help(say_whee)
+Help on function say_whee in module whee:
+
+say_whee()
+
+

Much better! Now say_whee() is still itself after decoration.

+ +

A Few Real World Examples

+

Let’s look at a few more useful examples of decorators. You’ll notice that they’ll mainly follow the same pattern that you’ve learned so far:

+
import functools
+
+def decorator(func):
+    @functools.wraps(func)
+    def wrapper_decorator(*args, **kwargs):
+        # Do something before
+        value = func(*args, **kwargs)
+        # Do something after
+        return value
+    return wrapper_decorator
+
+

This formula is a good boilerplate template for building more complex decorators.

+ +

Timing Functions

+

Let’s start by creating a @timer decorator. It will measure the time a function takes to execute and print the duration to the console. Here’s the code:

+
import functools
+import time
+
+def timer(func):
+    """Print the runtime of the decorated function"""
+    @functools.wraps(func)
+    def wrapper_timer(*args, **kwargs):
+        start_time = time.perf_counter()    # 1
+        value = func(*args, **kwargs)
+        end_time = time.perf_counter()      # 2
+        run_time = end_time - start_time    # 3
+        print(f"Finished {func.__name__!r} in {run_time:.4f} secs")
+        return value
+    return wrapper_timer
+
+@timer
+def waste_some_time(num_times):
+    for _ in range(num_times):
+        sum([i**2 for i in range(10000)])
+
+

This decorator works by storing the time just before the function starts running (at the line marked # 1) and just after the function finishes (at # 2). The time the function takes is then the difference between the two (at # 3). We use the time.perf_counter() function, which does a good job of measuring time intervals. Here are some examples of timings:

+
>>>
>>> waste_some_time(1)
+Finished 'waste_some_time' in 0.0010 secs
+
+>>> waste_some_time(999)
+Finished 'waste_some_time' in 0.3260 secs
+
+

Run it yourself. Work through the code line by line. Make sure you understand how it works. Don’t worry if you don’t get it, though. Decorators are advanced beings. Try to sleep on it or make a drawing of the program flow.

+ +

Debugging Code

+

The following @debug decorator will print the arguments a function is called with as well as its return value every time the function is called:

+
import functools
+
+def debug(func):
+    """Print the function signature and return value"""
+    @functools.wraps(func)
+    def wrapper_debug(*args, **kwargs):
+        args_repr = [repr(a) for a in args]                      # 1
+        kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()]  # 2
+        signature = ", ".join(args_repr + kwargs_repr)           # 3
+        print(f"Calling {func.__name__}({signature})")
+        value = func(*args, **kwargs)
+        print(f"{func.__name__!r} returned {value!r}")           # 4
+        return value
+    return wrapper_debug
+
+

The signature is created by joining the string representations of all the arguments. The numbers in the following list correspond to the numbered comments in the code:

+
    +
  1. Create a list of the positional arguments. Use repr() to get a nice string representing each argument.
  +
  2. Create a list of the keyword arguments. The f-string formats each argument as key=value where the !r specifier means that repr() is used to represent the value.
  +
  3. The lists of positional and keyword arguments are joined together into one signature string with each argument separated by a comma.
  +
  4. The return value is printed after the function is executed.
  +
+

Let’s see how the decorator works in practice by applying it to a simple function with one position and one keyword argument:

+
@debug
+def make_greeting(name, age=None):
+    if age is None:
+        return f"Howdy {name}!"
+    else:
+        return f"Whoa {name}! {age} already, you are growing up!"
+
+

Note how the @debug decorator prints the signature and return value of the make_greeting() function:

+
>>>
>>> make_greeting("Benjamin")
+Calling make_greeting('Benjamin')
+'make_greeting' returned 'Howdy Benjamin!'
+'Howdy Benjamin!'
+
+>>> make_greeting("Richard", age=112)
+Calling make_greeting('Richard', age=112)
+'make_greeting' returned 'Whoa Richard! 112 already, you are growing up!'
+'Whoa Richard! 112 already, you are growing up!'
+
+>>> make_greeting(name="Dorrisile", age=116)
+Calling make_greeting(name='Dorrisile', age=116)
+'make_greeting' returned 'Whoa Dorrisile! 116 already, you are growing up!'
+'Whoa Dorrisile! 116 already, you are growing up!'
+
+

This example might not seem immediately useful since the @debug decorator just repeats what you just wrote. It’s more powerful when applied to small convenience functions that you don’t call directly yourself.

+

The following example calculates an approximation to the mathematical constant e:

+
import math
+from decorators import debug
+
+# Apply a decorator to a standard library function
+math.factorial = debug(math.factorial)
+
+def approximate_e(terms=18):
+    return sum(1 / math.factorial(n) for n in range(terms))
+
+

This example also shows how you can apply a decorator to a function that has already been defined. The approximation of e is based on the following series expansion:

+
Series for calculating mathematical constant e: $e = \sum_{n=0}^{\infty} \frac{1}{n!} = \frac{1}{0!} + \frac{1}{1!} + \frac{1}{2!} + \cdots$
+

When calling the approximate_e() function, you can see the @debug decorator at work:

+
>>>
>>> approximate_e(5)
+Calling factorial(0)
+'factorial' returned 1
+Calling factorial(1)
+'factorial' returned 1
+Calling factorial(2)
+'factorial' returned 2
+Calling factorial(3)
+'factorial' returned 6
+Calling factorial(4)
+'factorial' returned 24
+2.708333333333333
+
+

In this example, you get a decent approximation to the true value e = 2.718281828, adding only 5 terms.

+

Slowing Down Code

+

This next example might not seem very useful. Why would you want to slow down your Python code? Probably the most common use case is that you want to rate-limit a function that continuously checks whether a resource—like a web page—has changed. The @slow_down decorator will sleep one second before it calls the decorated function:

+
import functools
+import time
+
+def slow_down(func):
+    """Sleep 1 second before calling the function"""
+    @functools.wraps(func)
+    def wrapper_slow_down(*args, **kwargs):
+        time.sleep(1)
+        return func(*args, **kwargs)
+    return wrapper_slow_down
+
+@slow_down
+def countdown(from_number):
+    if from_number < 1:
+        print("Liftoff!")
+    else:
+        print(from_number)
+        countdown(from_number - 1)
+
+

To see the effect of the @slow_down decorator, you really need to run the example yourself:

+
>>>
>>> countdown(3)
+3
+2
+1
+Liftoff!
+
+ +

The @slow_down decorator always sleeps for one second. Later, you’ll see how to control the rate by passing an argument to the decorator.

+

Registering Plugins

+

Decorators don’t have to wrap the function they’re decorating. They can also simply register that a function exists and return it unwrapped. This can be used, for instance, to create a light-weight plug-in architecture:

+
import random
+PLUGINS = dict()
+
+def register(func):
+    """Register a function as a plug-in"""
+    PLUGINS[func.__name__] = func
+    return func
+
+@register
+def say_hello(name):
+    return f"Hello {name}"
+
+@register
+def be_awesome(name):
+    return f"Yo {name}, together we are the awesomest!"
+
+def randomly_greet(name):
+    greeter, greeter_func = random.choice(list(PLUGINS.items()))
+    print(f"Using {greeter!r}")
+    return greeter_func(name)
+
+

The @register decorator simply stores a reference to the decorated function in the global PLUGINS dict. Note that you do not have to write an inner function or use @functools.wraps in this example because you are returning the original function unmodified.

+

The randomly_greet() function randomly chooses one of the registered functions to use. Note that the PLUGINS dictionary already contains references to each function object that is registered as a plugin:

+
>>>
>>> PLUGINS
+{'say_hello': <function say_hello at 0x7f768eae6730>,
+ 'be_awesome': <function be_awesome at 0x7f768eae67b8>}
+
+>>> randomly_greet("Alice")
+Using 'say_hello'
+'Hello Alice'
+
+

The main benefit of this simple plugin architecture is that you do not need to maintain a list of which plugins exist. That list is created when the plugins register themselves. This makes it trivial to add a new plugin: just define the function and decorate it with @register.

+

If you are familiar with globals() in Python, you might see some similarities to how the plugin architecture works. globals() gives access to all global variables in the current scope, including your plugins:

+
>>>
>>> globals()
+{..., # Lots of variables not shown here.
+ 'say_hello': <function say_hello at 0x7f768eae6730>,
+ 'be_awesome': <function be_awesome at 0x7f768eae67b8>,
+ 'randomly_greet': <function randomly_greet at 0x7f768eae6840>}
+
+

Using the @register decorator, you can create your own curated list of interesting variables, effectively hand-picking some functions from globals().

+

Is the User Logged In?

+

The final example before moving on to some fancier decorators is commonly used when working with a web framework. In this example, we are using Flask to set up a /secret web page that should only be visible to users that are logged in or otherwise authenticated:

+
from flask import Flask, g, request, redirect, url_for
+import functools
+app = Flask(__name__)
+
+def login_required(func):
+    """Make sure user is logged in before proceeding"""
+    @functools.wraps(func)
+    def wrapper_login_required(*args, **kwargs):
+        if g.user is None:
+            return redirect(url_for("login", next=request.url))
+        return func(*args, **kwargs)
+    return wrapper_login_required
+
+@app.route("/secret")
+@login_required
+def secret():
+    ...
+
+

While this gives an idea about how to add authentication to your web framework, you should usually not write these types of decorators yourself. For Flask, you can use the Flask-Login extension instead, which adds more security and functionality.

+

Fancy Decorators

+

So far, you’ve seen how to create simple decorators. You already have a pretty good understanding of what decorators are and how they work. Feel free to take a break from this article to practice everything you’ve learned.

+

In the second part of this tutorial, we’ll explore more advanced features, including how to use the following:

+ +

Decorating Classes

+

There are two different ways you can use decorators on classes. The first one is very close to what you have already done with functions: you can decorate the methods of a class. This was one of the motivations for introducing decorators back in the day.

+

Some commonly used decorators that are even built-ins in Python are @classmethod, @staticmethod, and @property. The @classmethod and @staticmethod decorators are used to define methods inside a class namespace that are not connected to a particular instance of that class. The @property decorator is used to customize getters and setters for class attributes. Expand the box below for an example using these decorators.

+
+
+

+
+
+
+

The following definition of a Circle class uses the @classmethod, @staticmethod, and @property decorators:

+
class Circle:
+    def __init__(self, radius):
+        self._radius = radius
+
+    @property
+    def radius(self):
+        """Get value of radius"""
+        return self._radius
+
+    @radius.setter
+    def radius(self, value):
+        """Set radius, raise error if negative"""
+        if value >= 0:
+            self._radius = value
+        else:
+            raise ValueError("Radius must be positive")
+
+    @property
+    def area(self):
+        """Calculate area inside circle"""
+        return self.pi() * self.radius**2
+
+    def cylinder_volume(self, height):
+        """Calculate volume of cylinder with circle as base"""
+        return self.area * height
+
+    @classmethod
+    def unit_circle(cls):
+        """Factory method creating a circle with radius 1"""
+        return cls(1)
+
+    @staticmethod
+    def pi():
+        """Value of π, could use math.pi instead though"""
+        return 3.1415926535
+
+

In this class:

+
    +
  • .cylinder_volume() is a regular method.
  • +
  • .radius is a mutable property: it can be set to a different value. However, by defining a setter method, we can do some error testing to make sure it’s not set to a nonsensical negative number. Properties are accessed as attributes without parentheses.
  • +
  • .area is an immutable property: properties without .setter() methods can’t be changed. Even though it is defined as a method, it can be retrieved as an attribute without parentheses.
  • +
  • .unit_circle() is a class method. It’s not bound to one particular instance of Circle. Class methods are often used as factory methods that can create specific instances of the class.
  • +
  • .pi() is a static method. It’s not really dependent on the Circle class, except that it is part of its namespace. Static methods can be called on either an instance or the class.
  • +
+

The Circle class can for example be used as follows:

+
>>>
>>> c = Circle(5)
+>>> c.radius
+5
+
+>>> c.area
+78.5398163375
+
+>>> c.radius = 2
+>>> c.area
+12.566370614
+
+>>> c.area = 100
+AttributeError: can't set attribute
+
+>>> c.cylinder_volume(height=4)
+50.265482456
+
+>>> c.radius = -1
+ValueError: Radius must be positive
+
+>>> c = Circle.unit_circle()
+>>> c.radius
+1
+
+>>> c.pi()
+3.1415926535
+
+>>> Circle.pi()
+3.1415926535
+
+
+
+
+

Let’s define a class where we decorate some of its methods using the @debug and @timer decorators from earlier:

+
from decorators import debug, timer
+
+class TimeWaster:
+    @debug
+    def __init__(self, max_num):
+        self.max_num = max_num
+
+    @timer
+    def waste_time(self, num_times):
+        for _ in range(num_times):
+            sum([i**2 for i in range(self.max_num)])
+
+

Using this class, you can see the effect of the decorators:

+
>>>
>>> tw = TimeWaster(1000)
+Calling __init__(<time_waster.TimeWaster object at 0x7efccce03908>, 1000)
+'__init__' returned None
+
+>>> tw.waste_time(999)
+Finished 'waste_time' in 0.3376 secs
+
+

The other way to use decorators on classes is to decorate the whole class. This is, for example, done in the new dataclasses module in Python 3.7:

+
from dataclasses import dataclass
+
+@dataclass
+class PlayingCard:
+    rank: str
+    suit: str
+
+

The meaning of the syntax is similar to the function decorators. In the example above, you could have done the decoration by writing PlayingCard = dataclass(PlayingCard).

+

A common use of class decorators is to be a simpler alternative to some use-cases of metaclasses. In both cases, you are changing the definition of a class dynamically.

+

Writing a class decorator is very similar to writing a function decorator. The only difference is that the decorator will receive a class and not a function as an argument. In fact, all the decorators you saw above will work as class decorators. When you are using them on a class instead of a function, their effect might not be what you want. In the following example, the @timer decorator is applied to a class:

+
from decorators import timer
+
+@timer
+class TimeWaster:
+    def __init__(self, max_num):
+        self.max_num = max_num
+
+    def waste_time(self, num_times):
+        for _ in range(num_times):
+            sum([i**2 for i in range(self.max_num)])
+
+

Decorating a class does not decorate its methods. Recall that @timer is just shorthand for TimeWaster = timer(TimeWaster).

+

Here, @timer only measures the time it takes to instantiate the class:

+
>>>
>>> tw = TimeWaster(1000)
+Finished 'TimeWaster' in 0.0000 secs
+
+>>> tw.waste_time(999)
+>>>
+
+

Later, you will see an example defining a proper class decorator, namely @singleton, which ensures that there is only one instance of a class.

+

Nesting Decorators

+

You can apply several decorators to a function by stacking them on top of each other:

+
from decorators import debug, do_twice
+
+@debug
+@do_twice
+def greet(name):
+    print(f"Hello {name}")
+
+

Think about this as the decorators being executed in the order they are listed. In other words, @debug calls @do_twice, which calls greet(), or debug(do_twice(greet)):

+
>>>
>>> greet("Eva")
+Calling greet('Eva')
+Hello Eva
+Hello Eva
+'greet' returned None
+
+

Observe the difference if we change the order of @debug and @do_twice:

+
from decorators import debug, do_twice
+
+@do_twice
+@debug
+def greet(name):
+    print(f"Hello {name}")
+
+

In this case, @do_twice will be applied to @debug as well:

+
>>>
>>> greet("Eva")
+Calling greet('Eva')
+Hello Eva
+'greet' returned None
+Calling greet('Eva')
+Hello Eva
+'greet' returned None
+
+

Decorators With Arguments

+

Sometimes, it’s useful to pass arguments to your decorators. For instance, @do_twice could be extended to a @repeat(num_times) decorator. The number of times to execute the decorated function could then be given as an argument.

+

This would allow you to do something like this:

+
@repeat(num_times=4)
+def greet(name):
+    print(f"Hello {name}")
+
+
>>>
>>> greet("World")
+Hello World
+Hello World
+Hello World
+Hello World
+
+

Think about how you could achieve this.

+

So far, the name written after the @ has referred to a function object that can be called with another function. To be consistent, you then need repeat(num_times=4) to return a function object that can act as a decorator. Luckily, you already know how to return functions! In general, you want something like the following:

+
def repeat(num_times):
+    def decorator_repeat(func):
+        ...  # Create and return a wrapper function
+    return decorator_repeat
+
+

Typically, the decorator creates and returns an inner wrapper function, so writing the example out in full will give you an inner function within an inner function. While this might sound like the programming equivalent of the Inception movie, we’ll untangle it all in a moment:

+
def repeat(num_times):
+    def decorator_repeat(func):
+        @functools.wraps(func)
+        def wrapper_repeat(*args, **kwargs):
+            for _ in range(num_times):
+                value = func(*args, **kwargs)
+            return value
+        return wrapper_repeat
+    return decorator_repeat
+
+

It looks a little messy, but we have only put the same decorator pattern you have seen many times by now inside one additional def that handles the arguments to the decorator. Let’s start with the innermost function:

+
def wrapper_repeat(*args, **kwargs):
+    for _ in range(num_times):
+        value = func(*args, **kwargs)
+    return value
+
+

This wrapper_repeat() function takes arbitrary arguments and returns the value of the decorated function, func(). This wrapper function also contains the loop that calls the decorated function num_times times. This is no different from the earlier wrapper functions you have seen, except that it is using the num_times parameter that must be supplied from the outside.

+

One step out, you’ll find the decorator function:

+
def decorator_repeat(func):
+    @functools.wraps(func)
+    def wrapper_repeat(*args, **kwargs):
+        ...
+    return wrapper_repeat
+
+

Again, decorator_repeat() looks exactly like the decorator functions you have written earlier, except that it’s named differently. That’s because we reserve the base name—repeat()—for the outermost function, which is the one the user will call.

+

As you have already seen, the outermost function returns a reference to the decorator function:

+
def repeat(num_times):
+    def decorator_repeat(func):
+        ...
+    return decorator_repeat
+
+

There are a few subtle things happening in the repeat() function:

+
    +
  • Defining decorator_repeat() as an inner function means that repeat() will refer to a function object—decorator_repeat. Earlier, we used repeat without parentheses to refer to the function object. The added parentheses are necessary when defining decorators that take arguments.
  • +
  • The num_times argument is seemingly not used in repeat() itself. But by passing num_times, a closure is created where the value of num_times is stored until it will be used later by wrapper_repeat().
  • +
+

With everything set up, let’s see if the results are as expected:

+
@repeat(num_times=4)
+def greet(name):
+    print(f"Hello {name}")
+
+
>>>
>>> greet("World")
+Hello World
+Hello World
+Hello World
+Hello World
+
+

Just the result we were aiming for.

+

Both Please, But Never Mind the Bread

+

With a little bit of care, you can also define decorators that can be used both with and without arguments. Most likely, you don’t need this, but it is nice to have the flexibility.

+

As you saw in the previous section, when a decorator uses arguments, you need to add an extra outer function. The challenge is for your code to figure out if the decorator has been called with or without arguments.

+

Since the function to decorate is only passed in directly if the decorator is called without arguments, the function must be an optional argument. This means that the decorator arguments must all be specified by keyword. You can enforce this with the special * syntax, which means that all following parameters are keyword-only:

+
def name(_func=None, *, kw1=val1, kw2=val2, ...):  # 1
+    def decorator_name(func):
+        ...  # Create and return a wrapper function.
+
+    if _func is None:
+        return decorator_name                      # 2
+    else:
+        return decorator_name(_func)               # 3
+
+

Here, the _func argument acts as a marker, noting whether the decorator has been called with arguments or not:

+
    +
  1. If name has been called without arguments, the decorated function will be passed in as _func. If it has been called with arguments, then _func will be None, and some of the keyword arguments may have been changed from their default values. The * in the argument list means that the remaining arguments can’t be passed as positional arguments.
  +
  2. In this case, the decorator was called with arguments. Return a decorator function that can take a function and return its wrapped version.
  +
  3. In this case, the decorator was called without arguments. Apply the decorator to the function immediately.
  +
+

Using this boilerplate on the @repeat decorator in the previous section, you can write the following:

+
def repeat(_func=None, *, num_times=2):
    """Call the decorated function ``num_times`` times per invocation.

    Usable both bare (``@repeat``) and with keyword arguments
    (``@repeat(num_times=3)``).

    :param _func: the function to decorate when used bare; None when the
        decorator is called with arguments
    :param num_times: how many times every call is repeated
    :return: the wrapped function (bare use) or a decorator (use with
        arguments); the wrapper returns the value of the last call, or None
        if the function was never called
    """
    def decorator_repeat(func):
        @functools.wraps(func)
        def wrapper_repeat(*args, **kwargs):
            # Pre-bind the result: with num_times <= 0 the loop body never
            # runs and `return value` would raise UnboundLocalError.
            value = None
            for _ in range(num_times):
                value = func(*args, **kwargs)
            return value
        return wrapper_repeat

    if _func is None:
        # Called as @repeat(...): hand back the actual decorator.
        return decorator_repeat
    # Called as bare @repeat: decorate the function right away.
    return decorator_repeat(_func)
+
+

Compare this with the original @repeat. The only changes are the added _func parameter and the if-else at the end.

+

Recipe 9.6 of the excellent Python Cookbook shows an alternative solution using functools.partial().

+

These examples show that @repeat can now be used with or without arguments:

+
@repeat
+def say_whee():
+    print("Whee!")
+
+@repeat(num_times=3)
+def greet(name):
+    print(f"Hello {name}")
+
+

Recall that the default value of num_times is 2:

+
>>>
>>> say_whee()
+Whee!
+Whee!
+
+>>> greet("Penny")
+Hello Penny
+Hello Penny
+Hello Penny
+
+

Stateful Decorators

+

Sometimes, it’s useful to have a decorator that can keep track of state. As a simple example, we will create a decorator that counts the number of times a function is called.

+ +

In the next section, you will see how to use classes to keep state. But in simple cases, you can also get away with using function attributes:

+
import functools
+
def count_calls(func):
    """Count and announce every call made to the decorated function."""
    @functools.wraps(func)
    def wrapper_count_calls(*args, **kwargs):
        call_number = wrapper_count_calls.num_calls + 1
        wrapper_count_calls.num_calls = call_number
        print(f"Call {call_number} of {func.__name__!r}")
        return func(*args, **kwargs)

    # The running total is kept on the wrapper itself as a function attribute.
    wrapper_count_calls.num_calls = 0
    return wrapper_count_calls
+
+@count_calls
+def say_whee():
+    print("Whee!")
+
+

The state—the number of calls to the function—is stored in the function attribute .num_calls on the wrapper function. Here is the effect of using it:

+
>>>
>>> say_whee()
+Call 1 of 'say_whee'
+Whee!
+
+>>> say_whee()
+Call 2 of 'say_whee'
+Whee!
+
+>>> say_whee.num_calls
+2
+
+

Classes as Decorators

+

The typical way to maintain state is by using classes. In this section, you’ll see how to rewrite the @count_calls example from the previous section using a class as a decorator.

+

Recall that the decorator syntax @my_decorator is just an easier way of saying func = my_decorator(func). Therefore, if my_decorator is a class, it needs to take func as an argument in its .__init__() method. Furthermore, the class instance needs to be callable so that it can stand in for the decorated function.

+

For a class instance to be callable, you implement the special .__call__() method:

+
class Counter:
    """Callable object that bumps and reports an internal count."""

    def __init__(self, start=0):
        """Begin counting from ``start``."""
        self.count = start

    def __call__(self):
        """Advance the count by one and print the new value."""
        self.count += 1
        current = self.count
        print(f"Current count is {current}")
+
+

The .__call__() method is executed each time you try to call an instance of the class:

+
>>>
>>> counter = Counter()
+>>> counter()
+Current count is 1
+
+>>> counter()
+Current count is 2
+
+>>> counter.count
+2
+
+

Therefore, a typical implementation of a decorator class needs to implement .__init__() and .__call__():

+
import functools
+
class CountCalls:
    """Class-based decorator that counts calls to the wrapped function."""

    def __init__(self, func):
        """Remember ``func`` and start its call count at zero."""
        # Copy the function's metadata first so the attributes assigned
        # below always take precedence over anything copied from it.
        functools.update_wrapper(self, func)
        self.func, self.num_calls = func, 0

    def __call__(self, *args, **kwargs):
        """Record and announce the call, then delegate to the function."""
        self.num_calls += 1
        target = self.func
        print(f"Call {self.num_calls} of {target.__name__!r}")
        return target(*args, **kwargs)
+
+@CountCalls
+def say_whee():
+    print("Whee!")
+
+

The .__init__() method must store a reference to the function and can do any other necessary initialization. The .__call__() method will be called instead of the decorated function. It does essentially the same thing as the wrapper() function in our earlier examples. Note that you need to use the functools.update_wrapper() function instead of @functools.wraps.

+

This @CountCalls decorator works the same as the one in the previous section:

+
>>>
>>> say_whee()
+Call 1 of 'say_whee'
+Whee!
+
+>>> say_whee()
+Call 2 of 'say_whee'
+Whee!
+
+>>> say_whee.num_calls
+2
+
+

More Real World Examples

+

We’ve come a long way now, having figured out how to create all kinds of decorators. Let’s wrap it up by putting our newfound knowledge to work in a few more examples that might actually be useful in the real world.

+

Slowing Down Code, Revisited

+

As noted earlier, our previous implementation of @slow_down always sleeps for one second. Now you know how to add parameters to decorators, so let’s rewrite @slow_down using an optional rate argument that controls how long it sleeps:

+
import functools
+import time
+
def slow_down(_func=None, *, rate=1):
    """Pause for ``rate`` seconds before every call to the decorated function.

    Usable both bare (``@slow_down``) and with a keyword argument
    (``@slow_down(rate=2)``).
    """
    def decorator_slow_down(func):
        @functools.wraps(func)
        def wrapper_slow_down(*args, **kwargs):
            time.sleep(rate)
            result = func(*args, **kwargs)
            return result
        return wrapper_slow_down

    # No function passed in means the decorator was called with arguments,
    # so the caller still needs the decorator itself.
    return decorator_slow_down if _func is None else decorator_slow_down(_func)
+
+

We’re using the boilerplate introduced in the Both Please, But Never Mind the Bread section to make @slow_down callable both with and without arguments. The same recursive countdown() function as earlier now sleeps two seconds between each count:

+
@slow_down(rate=2)
+def countdown(from_number):
+    if from_number < 1:
+        print("Liftoff!")
+    else:
+        print(from_number)
+        countdown(from_number - 1)
+
+

As before, you must run the example yourself to see the effect of the decorator:

+
>>>
>>> countdown(3)
+3
+2
+1
+Liftoff!
+
+

Creating Singletons

+

A singleton is a class with only one instance. There are several singletons in Python that you use frequently, including None, True, and False. It is the fact that None is a singleton that allows you to compare for None using the is keyword, like you saw in the Both Please section:

+
if _func is None:
+    return decorator_name
+else:
+    return decorator_name(_func)
+
+

Using is returns True only for objects that are the exact same instance. The following @singleton decorator turns a class into a singleton by storing the first instance of the class as an attribute. Later attempts at creating an instance simply return the stored instance:

+
import functools
+
def singleton(cls):
    """Make a class a Singleton class (only one instance).

    :param cls: the class to wrap
    :return: a factory function that creates the instance on its first call
        and returns that same instance on every later call
    """
    @functools.wraps(cls)
    def wrapper_singleton(*args, **kwargs):
        # Compare against None explicitly: an instance that is falsy (its
        # class defines __bool__ or __len__) must still count as created,
        # otherwise a fresh object would be built on every call.
        if wrapper_singleton.instance is None:
            wrapper_singleton.instance = cls(*args, **kwargs)
        return wrapper_singleton.instance
    wrapper_singleton.instance = None
    return wrapper_singleton
+
+@singleton
+class TheOne:
+    pass
+
+

As you see, this class decorator follows the same template as our function decorators. The only difference is that we are using cls instead of func as the parameter name to indicate that it is meant to be a class decorator.

+

Let’s see if it works:

+
>>>
>>> first_one = TheOne()
+>>> another_one = TheOne()
+
+>>> id(first_one)
+140094218762280
+
+>>> id(another_one)
+140094218762280
+
+>>> first_one is another_one
+True
+
+

It seems clear that first_one is indeed the exact same instance as another_one.

+ +

Caching Return Values

+

Decorators can provide a nice mechanism for caching and memoization. As an example, let’s look at a recursive definition of the Fibonacci sequence:

+
from decorators import count_calls
+
+@count_calls
+def fibonacci(num):
+    if num < 2:
+        return num
+    return fibonacci(num - 1) + fibonacci(num - 2)
+
+

While the implementation is simple, its runtime performance is terrible:

+
>>>
>>> fibonacci(10)
+<Lots of output from count_calls>
+55
+
+>>> fibonacci.num_calls
+177
+
+

To calculate the tenth Fibonacci number, you should really only need to calculate the preceding Fibonacci numbers, but this implementation somehow needs a whopping 177 calculations. It gets worse quickly: 21891 calculations are needed for fibonacci(20) and almost 2.7 million calculations for the 30th number. This is because the code keeps recalculating Fibonacci numbers that are already known.

+

The usual solution is to implement Fibonacci numbers using a for loop and a lookup table. However, simple caching of the calculations will also do the trick:

+
import functools
+from decorators import count_calls
+
def cache(func):
    """Keep a cache of previous function calls.

    Results are stored in the ``cache`` dict attribute of the returned
    wrapper. All arguments must be hashable.

    :param func: the function whose results are memoized
    :return: the caching wrapper
    """
    @functools.wraps(func)
    def wrapper_cache(*args, **kwargs):
        # Keep positional and keyword arguments in separate parts of the key
        # so f(("a", 1)) and f(a=1) cannot collide; the frozenset makes the
        # key independent of the order in which keywords were written.
        cache_key = (args, frozenset(kwargs.items()))
        if cache_key not in wrapper_cache.cache:
            wrapper_cache.cache[cache_key] = func(*args, **kwargs)
        return wrapper_cache.cache[cache_key]
    wrapper_cache.cache = dict()
    return wrapper_cache
+
+@cache
+@count_calls
+def fibonacci(num):
+    if num < 2:
+        return num
+    return fibonacci(num - 1) + fibonacci(num - 2)
+
+

The cache works as a lookup table, so now fibonacci() only does the necessary calculations once:

+
>>>
>>> fibonacci(10)
+Call 1 of 'fibonacci'
+...
+Call 11 of 'fibonacci'
+55
+
+>>> fibonacci(8)
+21
+
+

Note that in the final call to fibonacci(8), no new calculations were needed, since the eighth Fibonacci number had already been calculated for fibonacci(10).

+

In the standard library, a Least Recently Used (LRU) cache is available as @functools.lru_cache.

+

This decorator has more features than the one you saw above. You should use @functools.lru_cache instead of writing your own cache decorator:

+
import functools
+
+@functools.lru_cache(maxsize=4)
+def fibonacci(num):
+    print(f"Calculating fibonacci({num})")
+    if num < 2:
+        return num
+    return fibonacci(num - 1) + fibonacci(num - 2)
+
+

The maxsize parameter specifies how many recent calls are cached. The default value is 128, but you can specify maxsize=None to cache all function calls. However, be aware that this can cause memory problems if you are caching many large objects.

+

You can use the .cache_info() method to see how the cache performs, and you can tune it if needed. In our example, we used an artificially small maxsize to see the effect of elements being removed from the cache:

+
>>>
>>> fibonacci(10)
+Calculating fibonacci(10)
+Calculating fibonacci(9)
+Calculating fibonacci(8)
+Calculating fibonacci(7)
+Calculating fibonacci(6)
+Calculating fibonacci(5)
+Calculating fibonacci(4)
+Calculating fibonacci(3)
+Calculating fibonacci(2)
+Calculating fibonacci(1)
+Calculating fibonacci(0)
+55
+
+>>> fibonacci(8)
+21
+
+>>> fibonacci(5)
+Calculating fibonacci(5)
+Calculating fibonacci(4)
+Calculating fibonacci(3)
+Calculating fibonacci(2)
+Calculating fibonacci(1)
+Calculating fibonacci(0)
+5
+
+>>> fibonacci(8)
+Calculating fibonacci(8)
+Calculating fibonacci(7)
+Calculating fibonacci(6)
+21
+
+>>> fibonacci(5)
+5
+
+>>> fibonacci.cache_info()
+CacheInfo(hits=17, misses=20, maxsize=4, currsize=4)
+
+

Adding Information About Units

+

The following example is somewhat similar to the Registering Plugins example from earlier, in that it does not really change the behavior of the decorated function. Instead, it simply adds unit as a function attribute:

+
def set_unit(unit):
    """Build a decorator that tags a function with a ``unit`` attribute."""
    def decorator_set_unit(func):
        # The function is handed back unwrapped; only the attribute is added.
        setattr(func, "unit", unit)
        return func
    return decorator_set_unit
+
+

The following example calculates the volume of a cylinder based on its radius and height in centimeters:

+
import math
+
+@set_unit("cm^3")
+def volume(radius, height):
+    return math.pi * radius**2 * height
+
+

This .unit function attribute can later be accessed when needed:

+
>>>
>>> volume(3, 5)
+141.3716694115407
+
+>>> volume.unit
+'cm^3'
+
+

Note that you could have achieved something similar using function annotations:

+
import math
+
+def volume(radius, height) -> "cm^3":
+    return math.pi * radius**2 * height
+
+

However, since annotations are used for type hints, it would be hard to combine such units as annotations with static type checking.

+

Units become even more powerful and fun when connected with a library that can convert between units. One such library is pint. With pint installed (pip install Pint), you can for instance convert the volume to cubic inches or gallons:

+
>>>
>>> import pint
+>>> ureg = pint.UnitRegistry()
+>>> vol = volume(3, 5) * ureg(volume.unit)
+
+>>> vol
+<Quantity(141.3716694115407, 'centimeter ** 3')>
+
+>>> vol.to("cubic inches")
+<Quantity(8.627028576414954, 'inch ** 3')>
+
+>>> vol.to("gallons").m  # Magnitude
+0.0373464440537444
+
+

You could also modify the decorator to return a pint Quantity directly. Such a Quantity is made by multiplying a value with the unit. In pint, units must be looked up in a UnitRegistry. The registry is stored as a function attribute to avoid cluttering the namespace:

+
def use_unit(unit):
    """Have a function return a Quantity with given unit.

    :param unit: unit expression looked up in the shared registry
    :return: a decorator whose wrapper multiplies the function's result by
        the looked-up unit
    """
    # Create the registry only once and share it between all decorated
    # functions: rebuilding it on every @use_unit(...) is wasteful, and pint
    # does not allow combining quantities that come from different registries.
    if not hasattr(use_unit, "ureg"):
        use_unit.ureg = pint.UnitRegistry()
    def decorator_use_unit(func):
        @functools.wraps(func)
        def wrapper_use_unit(*args, **kwargs):
            value = func(*args, **kwargs)
            return value * use_unit.ureg(unit)
        return wrapper_use_unit
    return decorator_use_unit
+
+@use_unit("meters per second")
+def average_speed(distance, duration):
+    return distance / duration
+
+

With the @use_unit decorator, converting units is practically effortless:

+
>>>
>>> bolt = average_speed(100, 9.58)
+>>> bolt
+<Quantity(10.438413361169102, 'meter / second')>
+
+>>> bolt.to("km per hour")
+<Quantity(37.578288100208766, 'kilometer / hour')>
+
+>>> bolt.to("mph").m  # Magnitude
+23.350065679064745
+
+

Validating JSON

+

Let’s look at one last use case. Take a quick look at the following Flask route handler:

+
@app.route("/grade", methods=["POST"])
+def update_grade():
+    json_data = request.get_json()
+    if "student_id" not in json_data:
+        abort(400)
+    # Update database
+    return "success!"
+
+

Here we ensure that the key student_id is part of the request. Although this validation works, it really does not belong in the function itself. Plus, perhaps there are other routes that use the exact same validation. So, let’s keep it DRY and abstract out any unnecessary logic with a decorator. The following @validate_json decorator will do the job:

+
from flask import Flask, request, abort
+import functools
+app = Flask(__name__)
+
def validate_json(*expected_args):                  # 1
    """Abort with 400 unless the request JSON is an object with every expected key.

    :param expected_args: names of the keys the JSON body must contain
    :return: a decorator that validates the request before calling the
        wrapped function
    """
    def decorator_validate_json(func):
        @functools.wraps(func)
        def wrapper_validate_json(*args, **kwargs):
            json_object = request.get_json()
            # A body that is not a JSON object (None, list, string, number)
            # cannot hold the keys. Reject it up front instead of letting the
            # membership test raise TypeError or match a substring of a string.
            if not isinstance(json_object, dict):
                abort(400)
            for expected_arg in expected_args:      # 2
                if expected_arg not in json_object:
                    abort(400)
            return func(*args, **kwargs)
        return wrapper_validate_json
    return decorator_validate_json
+
+

In the above code, the decorator takes a variable-length argument list (*expected_args), so we can pass in as many string arguments as necessary, each representing a key that must be present in the JSON data:

+
    +
  1. The list of keys that must be present in the JSON is given as arguments to the decorator.
  +
  2. The wrapper function validates that each expected key is present in the JSON data.
  +
+

The route handler can then focus on its real job—updating grades—as it can safely assume that JSON data are valid:

+
@app.route("/grade", methods=["POST"])
+@validate_json("student_id")
+def update_grade():
+    json_data = request.get_json()
+    # Update database.
+    return "success!"
+
+

Conclusion

+

This has been quite a journey! You started this tutorial by looking a little closer at functions, particularly how they can be defined inside other functions and passed around just like any other Python object. Then you learned about decorators and how to write them such that:

+
    +
  • They can be reused.
  • +
  • They can decorate functions with arguments and return values.
  • +
  • They can use @functools.wraps to look more like the decorated function.
  • +
+

In the second part of the tutorial, you saw more advanced decorators and learned how to:

+
    +
  • Decorate classes
  • +
  • Nest decorators
  • +
  • Add arguments to decorators
  • +
  • Keep state within decorators
  • +
  • Use classes as decorators
  • +
+

You saw that, to define a decorator, you typically define a function returning a wrapper function. The wrapper function uses *args and **kwargs to pass on arguments to the decorated function. If you want your decorator to also take arguments, you need to nest the wrapper function inside another function. In this case, you usually end up with three return statements.

+

You can find the code from this tutorial online.

+

Further Reading

+

If you are still looking for more, our book Python Tricks has a section on decorators, as does the Python Cookbook by David Beazley and Brian K. Jones.

+

For a deep dive into the historical discussion on how decorators should be implemented in Python, see PEP 318 as well as the Python Decorator Wiki. More examples of decorators can be found in the Python Decorator Library. The decorator module can simplify creating your own decorators, and its documentation contains further decorator examples.

+

Also, we’ve put together a short & sweet Python decorators cheat sheet for you:

+ +
+
+

Watch Now This tutorial has a related video course created by the Real Python team. Watch it together with the written tutorial to deepen your understanding: Python Decorators 101

+
+
+
+

🐍 Python Tricks 💌

+
+
+
+
+

Get a short & sweet Python Trick delivered to your inbox every couple of days. No spam ever. Unsubscribe any time. Curated by the Real Python team.

+
+
+Python Tricks Dictionary Merge +
+
+
+
+ + +
+ +
+ +
+
+
+
+
+
+

About Geir Arne Hjelle

+
+
+
+
+Geir Arne Hjelle +Geir Arne Hjelle +
+
+

Geir Arne is an avid Pythonista and a member of the Real Python tutorial team.

+» More about Geir Arne +
+
+
+
+
+
+
+
+

Each tutorial at Real Python is created by a team of developers so that it meets our high quality standards. The team members who worked on this tutorial are:

+
+
+
+Aldren Santos +
+
+

Aldren

+
+
+Brad Solomon +
+
+

Brad

+
+
+Dan Bader +
+
+

Dan

+
+
+
+
+Joanna Jablonski +
+
+

Joanna

+
+
+Michael Herman +
+ +
+
+
+
+
+
+
+
+

Master Real-World Python Skills With Unlimited Access to Real Python

+

+

Join us and get access to hundreds of tutorials, hands-on video courses, and a community of expert Pythonistas:

+

Level Up Your Python Skills » +

+
+

Master Real-World Python Skills
With Unlimited Access to Real Python

+

+

Join us and get access to hundreds of tutorials, hands-on video courses, and a community of expert Pythonistas:

+

Level Up Your Python Skills » +

+
+
+

What Do You Think?

+
+ +Tweet +Share +Email + +
+
+
+

Real Python Comment Policy: The most useful comments are those written with the goal of learning from or helping out other readers—after reading the whole article and all the earlier comments. Complaints and insults generally won’t make the cut here.

+
+

What’s your #1 takeaway or favorite thing you learned? How are you going to put your newfound skills to use? Leave a comment below and let us know.

+
+
+
+
+
+

Keep Learning

+
+

Related Tutorial Categories: +intermediate +python +

+

Recommended Video Course: Python Decorators 101

+
+
+ +
+ +
+
+ + + + + + + + + + + + + + + + + + + + + """ + return html + + +def test_referencia_instructions_extract_correctly(real_ad_html): + + soup = BeautifulSoup(real_ad_html, "html5lib") + + referencia_instructions = ReferenciaFieldInstructions() + + referencia_instructions.scrape(soup) + referencia_instructions.validate() + + assert ( + referencia_instructions.found == True + and referencia_instructions.valid == True + and referencia_instructions.value is not None + and referencia_instructions.search_issue is None + ) + + +def test_referencia_instructions_find_nothing_in_unrelated_html(unrelated_html): + soup = BeautifulSoup(unrelated_html, "html5lib") + + referencia_instructions = ReferenciaFieldInstructions() + + referencia_instructions.scrape(soup) + referencia_instructions.validate() + + assert ( + referencia_instructions.found == False + and referencia_instructions.valid is None + and referencia_instructions.value is None + and referencia_instructions.search_issue is not None + ) + + +def test_all_instructions_extract_correctly(real_ad_html): + soup = BeautifulSoup(real_ad_html, "html5lib") + + all_instructions = [ + ReferenciaFieldInstructions(), + PrecioFieldInstructions(), + TamanoCategoricoFieldInstructions(), + M2FieldInstructions(), + TipoAnuncioFieldInstructions(), + CalleFieldInstructions(), + BarrioFieldInstructions(), + DistritoFieldInstructions(), + CiudadFieldInstructions(), + SecondaryFeaturesFieldInstructions( + field_name="cubierta", search_keyword="Cubierta" + ), + SecondaryFeaturesFieldInstructions( + field_name="puerta_auto", search_keyword="Puerta" + ), + SecondaryFeaturesFieldInstructions( + field_name="ascensor", search_keyword="ascensor" + ), + SecondaryFeaturesFieldInstructions( + field_name="alarma", search_keyword="Alarma" + ), + SecondaryFeaturesFieldInstructions( + field_name="circuito", search_keyword="Cámaras" + ), + SecondaryFeaturesFieldInstructions( + field_name="personal", search_keyword="Personal" + ), + 
TelefonoFieldInstructions(), + ] + + for instruction in all_instructions: + instruction.scrape(soup).validate() + + assert all( + [ + instruction.found == True + and instruction.valid == True + and instruction.value is not None + and instruction.search_issue is None + for instruction in all_instructions + ] + ) + + +def test_all_instructions_fail_on_unrelated_html(unrelated_html): + soup = BeautifulSoup(unrelated_html, "html5lib") + + all_instructions = [ + ReferenciaFieldInstructions(), + PrecioFieldInstructions(), + TamanoCategoricoFieldInstructions(), + M2FieldInstructions(), + TipoAnuncioFieldInstructions(), + CalleFieldInstructions(), + BarrioFieldInstructions(), + DistritoFieldInstructions(), + CiudadFieldInstructions(), + SecondaryFeaturesFieldInstructions( + field_name="cubierta", search_keyword="Cubierta" + ), + SecondaryFeaturesFieldInstructions( + field_name="puerta_auto", search_keyword="Puerta" + ), + SecondaryFeaturesFieldInstructions( + field_name="ascensor", search_keyword="ascensor" + ), + SecondaryFeaturesFieldInstructions( + field_name="alarma", search_keyword="Alarma" + ), + SecondaryFeaturesFieldInstructions( + field_name="circuito", search_keyword="Cámaras" + ), + SecondaryFeaturesFieldInstructions( + field_name="personal", search_keyword="Personal" + ), + TelefonoFieldInstructions(), + ] + + for instruction in all_instructions: + instruction.scrape(soup).validate() + + assert all( + [ + instruction.found == False + and (instruction.valid == False or instruction.valid == None) + and instruction.value is None + for instruction in all_instructions + ] + ) + + +def test_parsing_flow_works_for_ad_html(real_ad_html): + soup = BeautifulSoup(real_ad_html, "html5lib") + + parsing_flow = ParsingFlow() + + all_instructions = [ + ReferenciaFieldInstructions(), + PrecioFieldInstructions(), + TamanoCategoricoFieldInstructions(), + M2FieldInstructions(), + TipoAnuncioFieldInstructions(), + CalleFieldInstructions(), + BarrioFieldInstructions(), + 
DistritoFieldInstructions(), + CiudadFieldInstructions(), + SecondaryFeaturesFieldInstructions( + field_name="cubierta", search_keyword="Cubierta" + ), + SecondaryFeaturesFieldInstructions( + field_name="puerta_auto", search_keyword="Puerta" + ), + SecondaryFeaturesFieldInstructions( + field_name="ascensor", search_keyword="ascensor" + ), + SecondaryFeaturesFieldInstructions( + field_name="alarma", search_keyword="Alarma" + ), + SecondaryFeaturesFieldInstructions( + field_name="circuito", search_keyword="Cámaras" + ), + SecondaryFeaturesFieldInstructions( + field_name="personal", search_keyword="Personal" + ), + TelefonoFieldInstructions(), + ] + + parsing_flow.add_instructions(all_instructions) + + parsing_flow.execute_flow(soup) + + assert ( + parsing_flow.all_non_optional_fields_were_found + and parsing_flow.all_found_fields_are_valid + ) + + +def test_parsing_flow_fails_for_unrelated_html(unrelated_html): + soup = BeautifulSoup(unrelated_html, "html5lib") + + parsing_flow = ParsingFlow() + + all_instructions = [ + ReferenciaFieldInstructions(), + PrecioFieldInstructions(), + TamanoCategoricoFieldInstructions(), + M2FieldInstructions(), + TipoAnuncioFieldInstructions(), + CalleFieldInstructions(), + BarrioFieldInstructions(), + DistritoFieldInstructions(), + CiudadFieldInstructions(), + SecondaryFeaturesFieldInstructions( + field_name="cubierta", search_keyword="Cubierta" + ), + SecondaryFeaturesFieldInstructions( + field_name="puerta_auto", search_keyword="Puerta" + ), + SecondaryFeaturesFieldInstructions( + field_name="ascensor", search_keyword="ascensor" + ), + SecondaryFeaturesFieldInstructions( + field_name="alarma", search_keyword="Alarma" + ), + SecondaryFeaturesFieldInstructions( + field_name="circuito", search_keyword="Cámaras" + ), + SecondaryFeaturesFieldInstructions( + field_name="personal", search_keyword="Personal" + ), + TelefonoFieldInstructions(), + ] + + parsing_flow.add_instructions(all_instructions) + + parsing_flow.execute_flow(soup) + + 
assert not parsing_flow.all_non_optional_fields_were_found and len( + parsing_flow.issues + ) == len(all_instructions) From cb553b5f7ef0660b634987d8c44899934e62e63d Mon Sep 17 00:00:00 2001 From: pablo Date: Tue, 29 Dec 2020 20:42:21 +0100 Subject: [PATCH 05/13] Minor fixes in parsing utils. --- core/parsing_utils.py | 8 ++++++++ tests/parsing_utils_test.py | 1 + 2 files changed, 9 insertions(+) diff --git a/core/parsing_utils.py b/core/parsing_utils.py index 0cd7259..6b466f1 100644 --- a/core/parsing_utils.py +++ b/core/parsing_utils.py @@ -498,6 +498,14 @@ class ParsingFlow: for instruction in self._instructions: instruction.scrape(soup).validate() + @property + def field_values(self) -> Dict: + """ + Return the value for all fields, or None. + :return: a dict with the field names and values + """ + return {field.field_name: field.value for field in self._instructions} + @property def all_found_fields_are_valid(self) -> bool: """ diff --git a/tests/parsing_utils_test.py b/tests/parsing_utils_test.py index 533b4ba..a0a286a 100644 --- a/tests/parsing_utils_test.py +++ b/tests/parsing_utils_test.py @@ -2678,6 +2678,7 @@ def test_parsing_flow_works_for_ad_html(real_ad_html): assert ( parsing_flow.all_non_optional_fields_were_found and parsing_flow.all_found_fields_are_valid + and len(parsing_flow.field_values) == len(all_instructions) ) From b8d4893026fd89ef4edcc9f8226553d4d29f1927 Mon Sep 17 00:00:00 2001 From: pablo Date: Thu, 31 Dec 2020 18:14:44 +0100 Subject: [PATCH 06/13] Mini syntax fix. 
--- core/parsing_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/parsing_utils.py b/core/parsing_utils.py index 6b466f1..b732cf4 100644 --- a/core/parsing_utils.py +++ b/core/parsing_utils.py @@ -513,7 +513,7 @@ class ParsingFlow: :return: True if the fields are valid, False otherwise """ relevant_fields = [ - field.valid for field in self._instructions if field.found == True + field.valid for field in self._instructions if field.found is True ] return all(relevant_fields) From 2b249063e0b7ab2e9c66a13f800538b5747ce2a7 Mon Sep 17 00:00:00 2001 From: pablo Date: Thu, 31 Dec 2020 18:28:48 +0100 Subject: [PATCH 07/13] Created a new flow generator + tests for it. --- core/parsing_utils.py | 37 ++++++++++++++++++++++++++++++++++++- tests/parsing_utils_test.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/core/parsing_utils.py b/core/parsing_utils.py index b732cf4..019e729 100644 --- a/core/parsing_utils.py +++ b/core/parsing_utils.py @@ -1,4 +1,4 @@ -from typing import Union, Iterable, Dict, Callable +from typing import Union, Iterable, Dict, Callable, Type import re from bs4 import BeautifulSoup @@ -553,3 +553,38 @@ class ParsingFlow: issues[field.field_name] = this_field_issues return issues + + +class ParsingFlowGenerator: + """ + Class for creating multiple, empty flows based on a group of instructions. + """ + + def __init__( + self, + parsing_flow_class: Type[ParsingFlow], + instructions_to_attach_with_params: Dict[ + Type[BaseTargetFieldInstructions], dict + ], + ) -> None: + """ + Set the flow class and group of instructions to use when creating new + instances of the flow class. 
+ :param parsing_flow_class: the flow class to instantiate + :param instructions_to_attach_with_params: a key-value pair of field + instructions class and the paramteres to use when instantiating them + """ + self._parsing_flow_class = parsing_flow_class + self._instructions_to_attach_with_params = instructions_to_attach_with_params + + def get_new_flow(self) -> ParsingFlow: + """ + Instantiate a new parsing flow with the instantiated classes attached. + :return: the new parsing flow + """ + new_parsing_flow = self._parsing_flow_class() + + for instruction, params in self._instructions_to_attach_with_params.items(): + new_parsing_flow.add_instructions(instruction(**params)) + + return new_parsing_flow diff --git a/tests/parsing_utils_test.py b/tests/parsing_utils_test.py index a0a286a..e456ab4 100644 --- a/tests/parsing_utils_test.py +++ b/tests/parsing_utils_test.py @@ -4,6 +4,7 @@ from bs4 import BeautifulSoup from core.parsing_utils import ( ParsingFlow, + ParsingFlowGenerator, ReferenciaFieldInstructions, PrecioFieldInstructions, TamanoCategoricoFieldInstructions, @@ -2725,3 +2726,30 @@ def test_parsing_flow_fails_for_unrelated_html(unrelated_html): assert not parsing_flow.all_non_optional_fields_were_found and len( parsing_flow.issues ) == len(all_instructions) + + +def test_parsing_flow_generator_returns_proper_flows(): + + four_instructions_with_params = { + ReferenciaFieldInstructions: {}, + PrecioFieldInstructions: {}, + TamanoCategoricoFieldInstructions: {}, + SecondaryFeaturesFieldInstructions: { + "field_name": "personal", + "search_keyword": "Personal", + }, + } + + parsing_flow_generator = ParsingFlowGenerator( + parsing_flow_class=ParsingFlow, + instructions_to_attach_with_params=four_instructions_with_params, + ) + + a_new_parsing_flow = parsing_flow_generator.get_new_flow() + + assert ( + isinstance(a_new_parsing_flow, ParsingFlow), + len(a_new_parsing_flow._instructions) == len(four_instructions_with_params), + all([field.found is None for field 
in a_new_parsing_flow._instructions]), + all([field.valid is None for field in a_new_parsing_flow._instructions]), + ) From def858ef6ae61ab84a7f03ea1adf119f2574da15 Mon Sep 17 00:00:00 2001 From: pablo Date: Thu, 31 Dec 2020 19:02:09 +0100 Subject: [PATCH 08/13] Modified input format of instructions for ParsingFlowGenerator. Previous dict wouldn't allow for more than one SecondaryFeaturesFieldInstructions class pointer. --- core/parsing_utils.py | 15 ++++++++++----- tests/parsing_utils_test.py | 18 +++++++++--------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/core/parsing_utils.py b/core/parsing_utils.py index 019e729..fd51b00 100644 --- a/core/parsing_utils.py +++ b/core/parsing_utils.py @@ -1,4 +1,4 @@ -from typing import Union, Iterable, Dict, Callable, Type +from typing import Union, Iterable, Dict, Callable, Type, Tuple import re from bs4 import BeautifulSoup @@ -563,18 +563,23 @@ class ParsingFlowGenerator: def __init__( self, parsing_flow_class: Type[ParsingFlow], - instructions_to_attach_with_params: Dict[ - Type[BaseTargetFieldInstructions], dict + instructions_to_attach_with_params: Union[ + Tuple[Type[BaseTargetFieldInstructions], Dict], + Tuple[Tuple[Type[BaseTargetFieldInstructions], Dict]], ], ) -> None: """ Set the flow class and group of instructions to use when creating new instances of the flow class. 
:param parsing_flow_class: the flow class to instantiate - :param instructions_to_attach_with_params: a key-value pair of field + :param instructions_to_attach_with_params: one or more pair of field instructions class and the paramteres to use when instantiating them """ self._parsing_flow_class = parsing_flow_class + if not isinstance(instructions_to_attach_with_params, tuple): + instructions_to_attach_with_params = tuple( + instructions_to_attach_with_params + ) self._instructions_to_attach_with_params = instructions_to_attach_with_params def get_new_flow(self) -> ParsingFlow: @@ -584,7 +589,7 @@ class ParsingFlowGenerator: """ new_parsing_flow = self._parsing_flow_class() - for instruction, params in self._instructions_to_attach_with_params.items(): + for instruction, params in self._instructions_to_attach_with_params: new_parsing_flow.add_instructions(instruction(**params)) return new_parsing_flow diff --git a/tests/parsing_utils_test.py b/tests/parsing_utils_test.py index e456ab4..de0beeb 100644 --- a/tests/parsing_utils_test.py +++ b/tests/parsing_utils_test.py @@ -2730,15 +2730,15 @@ def test_parsing_flow_fails_for_unrelated_html(unrelated_html): def test_parsing_flow_generator_returns_proper_flows(): - four_instructions_with_params = { - ReferenciaFieldInstructions: {}, - PrecioFieldInstructions: {}, - TamanoCategoricoFieldInstructions: {}, - SecondaryFeaturesFieldInstructions: { - "field_name": "personal", - "search_keyword": "Personal", - }, - } + four_instructions_with_params = ( + (ReferenciaFieldInstructions, {}), + (PrecioFieldInstructions, {}), + (TamanoCategoricoFieldInstructions, {}), + ( + SecondaryFeaturesFieldInstructions, + {"field_name": "personal", "search_keyword": "Personal"}, + ), + ) parsing_flow_generator = ParsingFlowGenerator( parsing_flow_class=ParsingFlow, From e34a34acaffba4d93a12a0a5d1d216bbd76d3207 Mon Sep 17 00:00:00 2001 From: pablo Date: Sat, 2 Jan 2021 23:49:10 +0100 Subject: [PATCH 09/13] Fix in throttling test so it doesn't 
fail around midnight. --- tests/throttling_test.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/throttling_test.py b/tests/throttling_test.py index 3a9f916..0c555af 100644 --- a/tests/throttling_test.py +++ b/tests/throttling_test.py @@ -11,8 +11,8 @@ from core.throttling_utils import ( def test_working_hours_throttling_rule_checks(): working_hours_rule = WorkingHoursThrottlingRule( working_hours={ - "start": datetime.datetime.now().time(), - "end": (datetime.datetime.now() + datetime.timedelta(hours=1)).time(), + "start": (datetime.datetime.now() + datetime.timedelta(seconds=-5)).time(), + "end": (datetime.datetime.now() + datetime.timedelta(seconds=5)).time(), } ) @@ -86,8 +86,10 @@ def test_throttle_manager_checks_rules(): some_rules = [ WorkingHoursThrottlingRule( working_hours={ - "start": datetime.datetime.now().time(), - "end": (datetime.datetime.now() + datetime.timedelta(hours=1)).time(), + "start": ( + datetime.datetime.now() + datetime.timedelta(seconds=-5) + ).time(), + "end": (datetime.datetime.now() + datetime.timedelta(seconds=5)).time(), } ), CooldownThrottlingRule(cooldown_time_generator=lambda: 0), From 007f458cd516951949827a323c295e1eeb240c7d Mon Sep 17 00:00:00 2001 From: pablo Date: Sun, 3 Jan 2021 20:05:34 +0100 Subject: [PATCH 10/13] Minor fixes. 
--- capturer/capturer.py | 168 +++++++++++++++++++++++++++++++--------- core/parsing_utils.py | 6 +- core/scrapping_utils.py | 5 +- 3 files changed, 140 insertions(+), 39 deletions(-) diff --git a/capturer/capturer.py b/capturer/capturer.py index 3b25056..7397af6 100644 --- a/capturer/capturer.py +++ b/capturer/capturer.py @@ -1,13 +1,10 @@ import sys -sys.path.append("..") from time import sleep -from bs4 import BeautifulSoup -import re import datetime -from db_layer.capturing_tasks_interface import capturing_interface -from db_layer.capturas_interface import capturas_interface +from db_layer.capturing_tasks_interface import CapturingTasksInterface +from db_layer.capturas_interface import CapturasInterface from core.scrapping_utils import UrlAttack from core.config import working_hours, minimum_seconds_between_tries from core.throttling_utils import ( @@ -17,6 +14,7 @@ from core.throttling_utils import ( DynamicThrottlingRule, ) from refresher.refresher import Refresher +from core.parsing_utils import * import logging @@ -26,8 +24,22 @@ class Capturer: scraping and db storage. 
""" - def __init__(self, throttling_manager: ThrottleManager) -> None: + def __init__( + self, + throttling_manager: ThrottleManager, + capturing_tasks_interface: CapturingTasksInterface, + capturas_interface: CapturasInterface, + parsing_flow_generator: ParsingFlowGenerator, + url_acquisition_object: Type[UrlAttack], + dead_ad_checker: Callable, + ) -> None: self._throttling_manager = throttling_manager + self._capturing_tasks_interface = capturing_tasks_interface + self._capturas_interface = capturas_interface + self._parsing_flow_generator = parsing_flow_generator + self._url_acquisition_object = url_acquisition_object + self._dead_ad_checker = dead_ad_checker + self.last_try_datetime = datetime.datetime.now() def start(self) -> None: @@ -46,11 +58,17 @@ class Capturer: sleep(10) logging.info("Waiting...") - pending_task = capturing_interface.get_pending_task() + pending_task = self._capturing_tasks_interface.get_pending_task() logging.info("Got a task") - task = CapturingTask(pending_task) + task = CapturingTask( + pending_task, + capturing_interface=self._capturing_tasks_interface, + new_parsing_flow=self._parsing_flow_generator.get_new_flow(), + url_acquisition_object=self._url_acquisition_object, + dead_ad_checker=self._dead_ad_checker, + ) self.last_try_datetime = datetime.datetime.now() task.capture() @@ -60,8 +78,8 @@ class Capturer: logging.warning("Something went wrong, not adding data.") continue - capturas_interface.insert_captura(ad_data) - task._update_status("Captura inserted") + self._capturas_interface.insert_captura(ad_data) + task.update_status("Captura inserted") logging.info("New ad inserted.") @@ -73,29 +91,40 @@ class CapturingTask: sleep_time_failed_request = 180 - def __init__(self, parameters) -> None: + def __init__( + self, + task_parameters: dict, + capturing_interface: CapturingTasksInterface, + new_parsing_flow: ParsingFlow, + url_acquisition_object: Type[UrlAttack], + dead_ad_checker: Callable, + ) -> None: """ Initialize with task 
parameters and mark the task as being worked on in the task queue. - :param parameters: dict with the necessary parameters for the task + :param task_parameters: dict with the necessary parameters for the task """ - self.uuid = parameters["uuid"] - self.ad_url = parameters["ad_url"] - self.uuid_exploring = parameters["fk_uuid_exploring"] - self.status = parameters["status"] + self.uuid = task_parameters["uuid"] + self.ad_url = task_parameters["ad_url"] + self.uuid_exploring = task_parameters["fk_uuid_exploring"] + self.status = task_parameters["status"] self.request_failures = 1 self.html = None + self._parsing_flow = new_parsing_flow + self._capturing_interface = capturing_interface + self._url_acquistion_object = url_acquisition_object + self._is_dead_ad = dead_ad_checker - self._update_status("Loading") + self.update_status("Loading") - def _update_status(self, new_status) -> None: + def update_status(self, new_status) -> None: """ Updates the task status and persists it in the task queue. 
:param new_status: string describing the new status :return: None """ self.status = new_status - capturing_interface.update_capturing_task( + self._capturing_interface.update_capturing_task( self.uuid, self.uuid_exploring, self.status, self.ad_url ) @@ -103,34 +132,32 @@ class CapturingTask: """ Main flow of work """ - self._update_status("WIP") + self.update_status("WIP") while self.request_failures < 4: - attack = UrlAttack(self.ad_url) + attack = self._url_acquistion_object(self.ad_url) attack.attack() if attack.success: - self.html = attack.get_text() - self._extract_data() - self._check_data() + self._parse_html(html=attack.get_text()) return if not attack.success: try: - if Refresher.dead_ad_checker(attack.get_text()): - self._update_status("Dead ad") + if self._is_dead_ad(attack.get_text()): + self.update_status("Dead ad") return except AttributeError: logging.error( "Something went wrong when checking if the ad is gone" ) - self._update_status("Fail {}".format(self.request_failures)) + self.update_status("Fail {}".format(self.request_failures)) self.request_failures += 1 sleep(CapturingTask.sleep_time_failed_request) continue - self._update_status("Surrender") + self.update_status("Surrender") logging.warning(f"A task has surrendered. {self.ad_url}") def _extract_data(self) -> None: @@ -148,21 +175,40 @@ class CapturingTask: :return: None """ if self.parser.fields_missing(): - self._update_status("Fields missing") + self.update_status("Fields missing") return if not self.parser.all_fields_are_valid(): - self._update_status("Invalid value fields") + self.update_status("Invalid value fields") return - self._update_status("Data ready") + self.update_status("Data ready") def get_ad_data(self) -> dict: """ Returns the extracted data. :return: dictionary with the data of the ad. 
""" - return self.parser.get_data() + return self._parsing_flow.field_values + + def _parse_html(self, html: str) -> None: + self._parsing_flow.execute_flow(soup=BeautifulSoup(html, "html5lib")) + + if not self._parsing_flow.issues: + self.update_status("Data ready") + return + + if not self._parsing_flow.all_found_fields_are_valid: + self.update_status("Invalid value fields") + logging.warning(f"Invalid fields found in ad: {self.ad_url}") + logging.warning(f"{self._parsing_flow.issues}") + return + if not self._parsing_flow.all_non_optional_fields_were_found: + self.update_status("Fields missing") + logging.warning( + f"Couldn't scrap necessary fields: {self._parsing_flow.issues}" + ) + return class AdHtmlParser: @@ -362,7 +408,7 @@ class AdHtmlParser: else: return True - def fields_missing(self) -> None: + def fields_missing(self) -> bool: """ Reports on whether all compulsory fields are present. :return: True if some field is missing, false if not @@ -387,13 +433,65 @@ class AdHtmlParser: if __name__ == "__main__": + capturing_tasks_interface = CapturingTasksInterface() + capturas_interface = CapturasInterface() + throttling_manager = ThrottleManager() throttling_manager.add_rule(WorkingHoursThrottlingRule(working_hours)).add_rule( CooldownThrottlingRule(minimum_seconds_between_tries), required_argument_names=["last_attempt_timestamp"], ).add_rule( - DynamicThrottlingRule(lambda: bool(capturing_interface.get_pending_task())) + DynamicThrottlingRule( + lambda: bool(capturing_tasks_interface.get_pending_task()) + ) ) - capturer = Capturer(throttling_manager=throttling_manager) + parsing_flow_generator = ParsingFlowGenerator( + ParsingFlow, + ( + (ReferenciaFieldInstructions, {}), + (PrecioFieldInstructions, {}), + (TamanoCategoricoFieldInstructions, {}), + (M2FieldInstructions, {}), + (TipoAnuncioFieldInstructions, {}), + (CalleFieldInstructions, {}), + (BarrioFieldInstructions, {}), + (DistritoFieldInstructions, {}), + (CiudadFieldInstructions, {}), + ( + 
SecondaryFeaturesFieldInstructions, + {"field_name": "cubierta", "search_keyword": "Cubierta"}, + ), + ( + SecondaryFeaturesFieldInstructions, + {"field_name": "puerta_auto", "search_keyword": "Puerta"}, + ), + ( + SecondaryFeaturesFieldInstructions, + {"field_name": "ascensor", "search_keyword": "ascensor"}, + ), + ( + SecondaryFeaturesFieldInstructions, + {"field_name": "alarma", "search_keyword": "Alarma"}, + ), + ( + SecondaryFeaturesFieldInstructions, + {"field_name": "circuito", "search_keyword": "Cámaras"}, + ), + ( + SecondaryFeaturesFieldInstructions, + {"field_name": "personal", "search_keyword": "Personal"}, + ), + (TelefonoFieldInstructions, {}), + ), + ) + + capturer = Capturer( + throttling_manager=throttling_manager, + capturing_tasks_interface=capturing_tasks_interface, + capturas_interface=capturas_interface, + parsing_flow_generator=parsing_flow_generator, + url_acquisition_object=UrlAttack, + dead_ad_checker=Refresher.dead_ad_checker, + ) capturer.start() diff --git a/core/parsing_utils.py b/core/parsing_utils.py index fd51b00..7d81ad5 100644 --- a/core/parsing_utils.py +++ b/core/parsing_utils.py @@ -258,7 +258,7 @@ class TipoAnuncioFieldInstructions(BaseTargetFieldInstructions): if "venta" in soup.find("title").text: self.value = 1 self.found = True - if "alquiler" in soup.find("title").text: + if "Alquiler" in soup.find("title").text: self.value = 2 self.found = True @@ -542,11 +542,11 @@ class ParsingFlow: if (field.found or field.is_optional) and field.valid: continue this_field_issues = {} - if not field.found: + if not field.found and not field.is_optional: this_field_issues["found"] = "Not found" if field.search_issue: this_field_issues["search_issue"] = field.search_issue - if not field.valid: + if not field.valid and field.valid is not None: this_field_issues["validity"] = "Not valid" this_field_issues["value"] = field.value diff --git a/core/scrapping_utils.py b/core/scrapping_utils.py index 67a47cf..6fff33b 100644 --- 
a/core/scrapping_utils.py +++ b/core/scrapping_utils.py @@ -95,7 +95,10 @@ class UrlAttack: except Exception as e: self.success = False - if random.randrange(0, 100) < UrlAttack.identity_change_probability: + if ( + not self.success + or random.randrange(0, 100) < UrlAttack.identity_change_probability + ): self._change_identity() def _change_identity(self) -> None: From cf4ce06b57dcaec225c9eb3498d1dcec2948dd22 Mon Sep 17 00:00:00 2001 From: pablo Date: Sun, 3 Jan 2021 20:06:28 +0100 Subject: [PATCH 11/13] Implemented tests for CapturingTask. A few mock classes where needed. --- tests/capturer_test.py | 170 +++++++++++++++++++++++++++++ tests/capturer_tests.py | 236 ---------------------------------------- tests/mock_classes.py | 97 +++++++++++++++++ 3 files changed, 267 insertions(+), 236 deletions(-) create mode 100644 tests/capturer_test.py delete mode 100644 tests/capturer_tests.py create mode 100644 tests/mock_classes.py diff --git a/tests/capturer_test.py b/tests/capturer_test.py new file mode 100644 index 0000000..da60050 --- /dev/null +++ b/tests/capturer_test.py @@ -0,0 +1,170 @@ +from tests.mock_classes import ( + MockCapturingInterface, + MockParsingFlow, + MockUrlAttackReturnsSuccess, + MockUrlAttackReturnsFailure, +) +from capturer.capturer import CapturingTask + + +def test_capturing_task_successful_task_flow(): + + the_task_parameters = dict() + the_task_parameters["uuid"] = "test_uuid" + the_task_parameters["ad_url"] = "test_url" + the_task_parameters["fk_uuid_exploring"] = "test_exploring_uuid" + the_task_parameters["status"] = "Pending" + + fake_resulting_field_values = { + "a_field": {"a_value": 1}, + "another_field": {"another_value": 2}, + } + mock_parsing_flow = MockParsingFlow( + mock_all_found_fields_are_valid=True, + mock_all_non_optional_fields_were_found=True, + mock_field_values_to_return=fake_resulting_field_values, + ) + + mock_capturing_interface = MockCapturingInterface() + + task = CapturingTask( + 
task_parameters=the_task_parameters, + capturing_interface=mock_capturing_interface, + new_parsing_flow=mock_parsing_flow, + url_acquisition_object=MockUrlAttackReturnsSuccess, + dead_ad_checker=lambda: False, + ) + + task.capture() + + final_data = task.get_ad_data() + + assert ( + len(mock_capturing_interface.tasks) == 1 + and mock_capturing_interface.tasks[the_task_parameters["uuid"]][-1].status + == "Data ready" + and fake_resulting_field_values == final_data + ) + + +def test_capturing_task_dead_ad_task_flow(): + the_task_parameters = dict() + the_task_parameters["uuid"] = "test_uuid" + the_task_parameters["ad_url"] = "test_url" + the_task_parameters["fk_uuid_exploring"] = "test_exploring_uuid" + the_task_parameters["status"] = "Pending" + + mock_parsing_flow = MockParsingFlow( + mock_all_found_fields_are_valid=False, + issues_to_return={"some_field": {"valid": False}}, + ) + + mock_capturing_interface = MockCapturingInterface() + + task = CapturingTask( + task_parameters=the_task_parameters, + capturing_interface=mock_capturing_interface, + new_parsing_flow=mock_parsing_flow, + url_acquisition_object=MockUrlAttackReturnsFailure, + dead_ad_checker=lambda x: True, + ) + + task.capture() + + assert ( + len(mock_capturing_interface.tasks) == 1 + and mock_capturing_interface.tasks[the_task_parameters["uuid"]][-1].status + == "Dead ad" + ) + + +def test_capturing_task_invalid_fields_surrender_flow(): + the_task_parameters = dict() + the_task_parameters["uuid"] = "test_uuid" + the_task_parameters["ad_url"] = "test_url" + the_task_parameters["fk_uuid_exploring"] = "test_exploring_uuid" + the_task_parameters["status"] = "Pending" + + mock_parsing_flow = MockParsingFlow( + mock_all_found_fields_are_valid=False, + issues_to_return={"some_field": {"valid": False}}, + ) + + mock_capturing_interface = MockCapturingInterface() + + task = CapturingTask( + task_parameters=the_task_parameters, + capturing_interface=mock_capturing_interface, + 
new_parsing_flow=mock_parsing_flow, + url_acquisition_object=MockUrlAttackReturnsSuccess, + dead_ad_checker=lambda: False, + ) + + task.capture() + + assert ( + len(mock_capturing_interface.tasks) == 1 + and mock_capturing_interface.tasks[the_task_parameters["uuid"]][-1].status + == "Invalid value fields" + ) + + +def test_capturing_task_missing_fields_surrender_flow(): + the_task_parameters = dict() + the_task_parameters["uuid"] = "test_uuid" + the_task_parameters["ad_url"] = "test_url" + the_task_parameters["fk_uuid_exploring"] = "test_exploring_uuid" + the_task_parameters["status"] = "Pending" + + mock_parsing_flow = MockParsingFlow( + mock_all_non_optional_fields_were_found=False, + issues_to_return={"some_field": {"found": False}}, + ) + + mock_capturing_interface = MockCapturingInterface() + + task = CapturingTask( + task_parameters=the_task_parameters, + capturing_interface=mock_capturing_interface, + new_parsing_flow=mock_parsing_flow, + url_acquisition_object=MockUrlAttackReturnsSuccess, + dead_ad_checker=lambda: False, + ) + + task.capture() + + assert ( + len(mock_capturing_interface.tasks) == 1 + and mock_capturing_interface.tasks[the_task_parameters["uuid"]][-1].status + == "Fields missing" + ) + + +def test_capturing_task_unexpected_issue_surrender_flow(): + the_task_parameters = dict() + the_task_parameters["uuid"] = "test_uuid" + the_task_parameters["ad_url"] = "test_url" + the_task_parameters["fk_uuid_exploring"] = "test_exploring_uuid" + the_task_parameters["status"] = "Pending" + + mock_parsing_flow = MockParsingFlow() + + mock_capturing_interface = MockCapturingInterface() + + CapturingTask.sleep_time_failed_request = 0 # Override quite long sleep time + + task = CapturingTask( + task_parameters=the_task_parameters, + capturing_interface=mock_capturing_interface, + new_parsing_flow=mock_parsing_flow, + url_acquisition_object=MockUrlAttackReturnsFailure, + dead_ad_checker=lambda x: False, + ) + + task.capture() + + assert ( + 
len(mock_capturing_interface.tasks) == 1 + and mock_capturing_interface.tasks[the_task_parameters["uuid"]][-1].status + == "Surrender" + ) diff --git a/tests/capturer_tests.py b/tests/capturer_tests.py deleted file mode 100644 index 772b1f5..0000000 --- a/tests/capturer_tests.py +++ /dev/null @@ -1,236 +0,0 @@ -# -*- coding: utf-8 -*- -import sys - -sys.path.append("..") -from capturer.capturer import CapturingTask, Capturer, AdHtmlParser -from db_layer.capturas_interface import capturas_interface - - -def test_CapturingTask(): - parameters = { - "uuid": "testie test", - "ad_url": "https://www.idealista.com/inmueble/28252032", - "fk_uuid_exploring": None, - "status": "Pending", - } - - task = CapturingTask(parameters) - - task.capture() - print(task.get_ad_data()) - capturas_interface.insert_captura(task.get_ad_data()) - - -def test_Capturer(): - capturer = Capturer() - capturer.start() - - -def test_AdHtmlParser(): - - html = """ - - - - - - - - - - - - - - - Alquiler de Garaje en calle de Balmes, 138, La Dreta de l'Eixample, Barcelona

Alquiler de Garaje en calle de Balmes, 138 La Dreta de l'Eixample, Barcelona Ver mapa

30 €/mes
1 m²

Comentario del anunciante

Características básicas

  • 1 m²

¿Hay algún error en este anuncio?

Infórmanos para corregirlo y ayudar a otros usuarios.

Cuéntanos qué error has visto

¿Cuánto vale este inmueble?

Te enviamos un informe con la estimación de precio para este inmueble y con información de la zona.

Comprar estimación de precio

Ubicación

  • Calle de Balmes, 138
  • Urb. Eixample esquerra
  • Barrio La Dreta de l'Eixample
  • Distrito Eixample
  • Barcelona
  • Área de Barcelona, Barcelona

Estadísticas

- - - - """ - - parser = AdHtmlParser(html) - - parser.parse() - parser._validate() - - -# test_AdHtmlParser() - -test_CapturingTask() - -# test_Capturer() diff --git a/tests/mock_classes.py b/tests/mock_classes.py new file mode 100644 index 0000000..57bbbcf --- /dev/null +++ b/tests/mock_classes.py @@ -0,0 +1,97 @@ +from collections import namedtuple +from typing import Dict + +from bs4 import BeautifulSoup + +from db_layer.capturing_tasks_interface import CapturingTasksInterface +from core.parsing_utils import ParsingFlow +from core.scrapping_utils import UrlAttack + + +class MockCapturingInterface(CapturingTasksInterface): + + task_state_record = namedtuple( + "TaskStateRecord", ["uuid", "uuid_exploring", "status", "ad_url"] + ) + + def __init__(self): + self.tasks = {} + + def update_capturing_task(self, uuid, uuid_exploring, status, ad_url): + if uuid not in self.tasks: + self.tasks[uuid] = [] + + self.tasks[uuid].append( + MockCapturingInterface.task_state_record( + uuid=uuid, uuid_exploring=uuid_exploring, status=status, ad_url=ad_url + ) + ) + + +class MockParsingFlow(ParsingFlow): + def __init__( + self, + issues_to_return: Dict[str, dict] = None, + mock_all_found_fields_are_valid: bool = True, + mock_all_non_optional_fields_were_found: bool = True, + mock_field_values_to_return: Dict[str, dict] = None, + ): + args_with_empty_dict_as_default = [ + issues_to_return, + mock_field_values_to_return, + ] + for arg in args_with_empty_dict_as_default: + if arg is None: + arg = dict() + + self._issues = issues_to_return + self._mock_all_found_fields_are_valid = mock_all_found_fields_are_valid + self._mock_field_values_to_return = mock_field_values_to_return + self._mock_all_non_optional_fields_were_found = ( + mock_all_non_optional_fields_were_found + ) + + def execute_flow(self, soup: BeautifulSoup) -> None: + pass + + @property + def issues(self) -> Dict[str, dict]: + return self._issues + + @property + def all_found_fields_are_valid(self) -> bool: + return 
self._mock_all_found_fields_are_valid + + @property + def all_non_optional_fields_were_found(self) -> bool: + return self._mock_all_non_optional_fields_were_found + + @property + def field_values(self) -> Dict: + return self._mock_field_values_to_return + + +class MockUrlAttack(UrlAttack): + def __init__(self, url: str) -> None: + super().__init__(url=url) + + def get_text(self) -> str: + return "this_is_a_fake_html_string" + + +class MockUrlAttackReturnsSuccess(MockUrlAttack): + def __init__(self, url: str) -> None: + super().__init__(url=url) + + def attack(self) -> None: + self.success = True + self.has_been_attacked = True + + +class MockUrlAttackReturnsFailure(MockUrlAttack): + def __init__(self, url: str) -> None: + super().__init__(url=url) + + def attack(self) -> None: + self.success = False + self.has_been_attacked = True From adf2cd26ba3082021a14c2a264fdfd774143f526 Mon Sep 17 00:00:00 2001 From: pablo Date: Mon, 4 Jan 2021 21:56:24 +0100 Subject: [PATCH 12/13] Minor fix regarding issue spotting in parsing. --- core/parsing_utils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/parsing_utils.py b/core/parsing_utils.py index 7d81ad5..b9b6b58 100644 --- a/core/parsing_utils.py +++ b/core/parsing_utils.py @@ -539,7 +539,9 @@ class ParsingFlow: issues = {} for field in self._instructions: - if (field.found or field.is_optional) and field.valid: + if (field.found or field.is_optional) and ( + field.valid is True or field.valid is None + ): continue this_field_issues = {} if not field.found and not field.is_optional: From cbf1643fb56797d93451dca0acac44bb978bcb69 Mon Sep 17 00:00:00 2001 From: pablo Date: Mon, 4 Jan 2021 22:17:40 +0100 Subject: [PATCH 13/13] Formatting, docstrings and other chores. 
--- capturer/capturer.py | 269 +++--------------------------------- core/scrapping_utils.py | 2 +- tests/parsing_utils_test.py | 16 +-- 3 files changed, 32 insertions(+), 255 deletions(-) diff --git a/capturer/capturer.py b/capturer/capturer.py index 7397af6..6971cfb 100644 --- a/capturer/capturer.py +++ b/capturer/capturer.py @@ -1,5 +1,3 @@ -import sys - from time import sleep import datetime @@ -33,6 +31,19 @@ class Capturer: url_acquisition_object: Type[UrlAttack], dead_ad_checker: Callable, ) -> None: + """ + Receive all required objects. + :param throttling_manager: takes care of deciding whether a task should + be started + :param capturing_tasks_interface: interface to interact with the tasks + database + :param capturas_interface: interface to interact with the ad database + :param parsing_flow_generator: an object capable of generating empty + parsing flows to give each task a new one + :param url_acquisition_object: gateway to obtaining the HTML of an url + :param dead_ad_checker: callable capable of checking if an ad is dead + through its HTML + """ self._throttling_manager = throttling_manager self._capturing_tasks_interface = capturing_tasks_interface self._capturas_interface = capturas_interface @@ -103,6 +114,10 @@ class CapturingTask: Initialize with task parameters and mark the task as being worked on in the task queue. :param task_parameters: dict with the necessary parameters for the task + :param capturing_interface: interface to interact with the ad database + :param new_parsing_flow: an empty parsing flow + :param url_acquisition_object: gateway to obtaining the HTML of an url + :param dead_ad_checker: callable capable of checking if an ad is dead """ self.uuid = task_parameters["uuid"] self.ad_url = task_parameters["ad_url"] @@ -160,30 +175,6 @@ class CapturingTask: self.update_status("Surrender") logging.warning(f"A task has surrendered. 
{self.ad_url}") - def _extract_data(self) -> None: - """ - Parses the obtained html to extract the ad information. - :return: None - """ - self.parser = AdHtmlParser(self.html) - self.parser.parse() - - def _check_data(self) -> None: - """ - Validates that all compulsory fields have been obtained and that the - values are within the expected. Sets the status of task accordingly. - :return: None - """ - if self.parser.fields_missing(): - self.update_status("Fields missing") - return - - if not self.parser.all_fields_are_valid(): - self.update_status("Invalid value fields") - return - - self.update_status("Data ready") - def get_ad_data(self) -> dict: """ Returns the extracted data. @@ -192,6 +183,12 @@ class CapturingTask: return self._parsing_flow.field_values def _parse_html(self, html: str) -> None: + """ + Execute the complete parsing flow and report the task status depending + on the outcome. + :param html: the HTML of the ad + :return: None + """ self._parsing_flow.execute_flow(soup=BeautifulSoup(html, "html5lib")) if not self._parsing_flow.issues: @@ -211,226 +208,6 @@ class CapturingTask: return -class AdHtmlParser: - """ - Object for parsing, storing and validating the data of the HTML of an ad. - """ - - def __init__(self, html_string: str) -> None: - """ - Initializes an instance of the parser with the HTML of an ad. 
- :param html_string: the full HTML code of the ad page - """ - self.html = html_string - - self.ad_fields = { - "referencia": {"found": False, "optional": False, "value": None}, - "precio": {"found": False, "optional": False, "value": None}, - "tamano_categorico": {"found": False, "optional": True, "value": None}, - "m2": {"found": False, "optional": True, "value": None}, - "tipo_anuncio": {"found": False, "optional": False, "value": None}, - "calle": {"found": False, "optional": True, "value": None}, - "barrio": {"found": False, "optional": False, "value": None}, - "distrito": {"found": False, "optional": False, "value": None}, - "ciudad": {"found": False, "optional": False, "value": None}, - "cubierta": {"found": False, "optional": False, "value": None}, - "puerta_auto": {"found": False, "optional": False, "value": None}, - "ascensor": {"found": False, "optional": False, "value": None}, - "alarma": {"found": False, "optional": False, "value": None}, - "circuito": {"found": False, "optional": False, "value": None}, - "personal": {"found": False, "optional": False, "value": None}, - "telefono": {"found": False, "optional": True, "value": None}, - } - - def parse(self) -> None: - """ - Parses the HTML and stores the ad data. 
- :return: None - """ - - soup = BeautifulSoup(self.html, "html5lib") - - if soup.find_all("link", {"rel": "canonical"}) is not None: - self.ad_fields["referencia"]["value"] = re.findall( - r"[0-9]{5,20}", str(soup.find_all("link", {"rel": "canonical"})[0]) - )[0] - self.ad_fields["referencia"]["found"] = True - - if soup.find_all("strong", {"class": "price"}) is not None: - self.ad_fields["precio"]["value"] = "".join( - re.findall( - r"[0-9]", str(soup.find_all("strong", {"class": "price"})[0]) - ) - ) - self.ad_fields["precio"]["found"] = True - - if soup.find("div", {"class": "info-features"}) is not None: - try: - if ( - "m²" - not in soup.find("div", {"class": "info-features"}) - .find("span") - .find("span") - .text - ): - self.ad_fields["tamano_categorico"]["value"] = ( - soup.find("div", {"class": "info-features"}) - .find("span") - .find("span") - .text - ) - self.ad_fields["tamano_categorico"]["found"] = True - except: - pass - - posible_m2 = [ - tag.text - for tag in soup.find("div", {"class": "info-features"}).find_all("span") - ] - if [posible for posible in posible_m2 if "m²" in posible]: - self.ad_fields["m2"]["value"] = [ - "".join(re.findall(r"[0-9]+,*[0-9]*", posible)) - for posible in posible_m2 - if "m²" in posible - ][0].replace(",", ".") - self.ad_fields["m2"]["found"] = True - - if soup.find("title") is not None: - if "venta" in soup.find("title").text: - self.ad_fields["tipo_anuncio"]["value"] = 1 - else: - self.ad_fields["tipo_anuncio"]["value"] = 2 - self.ad_fields["tipo_anuncio"]["found"] = True - - if len(soup.find("div", {"id": "headerMap"}).find_all("li")) > 3: - self.ad_fields["calle"]["value"] = "" - self.ad_fields["ciudad"]["value"] = ( - soup.find("div", {"id": "headerMap"}).find_all("li")[-2].text.strip() - ) - self.ad_fields["ciudad"]["found"] = True - self.ad_fields["distrito"]["value"] = ( - soup.find("div", {"id": "headerMap"}).find_all("li")[-3].text.strip() - ) - self.ad_fields["distrito"]["found"] = True - 
self.ad_fields["barrio"]["value"] = ( - soup.find("div", {"id": "headerMap"}).find_all("li")[-4].text.strip() - ) - self.ad_fields["barrio"]["found"] = True - if len(soup.find("div", {"id": "headerMap"}).find_all("li")) > 4: - self.ad_fields["calle"]["value"] = ( - soup.find("div", {"id": "headerMap"}).find_all("li")[0].text.strip() - ) - self.ad_fields["calle"]["found"] = True - - features_lists = soup.find_all("div", {"class": "details-property_features"}) - features = [ - feature.text - for feature_list in features_lists - for feature in feature_list.find_all("li") - ] - self.ad_fields["cubierta"]["value"] = 1 * any( - "Cubierta" in feature for feature in features - ) - self.ad_fields["puerta_auto"]["value"] = 1 * any( - "Puerta" in feature for feature in features - ) - self.ad_fields["ascensor"]["value"] = 1 * any( - "ascensor" in feature for feature in features - ) - self.ad_fields["alarma"]["value"] = 1 * any( - "Alarma" in feature for feature in features - ) - self.ad_fields["circuito"]["value"] = 1 * any( - "Cámaras" in feature for feature in features - ) - self.ad_fields["personal"]["value"] = 1 * any( - "Personal" in feature for feature in features - ) - - self.ad_fields["cubierta"]["found"] = True - self.ad_fields["puerta_auto"]["found"] = True - self.ad_fields["ascensor"]["found"] = True - self.ad_fields["alarma"]["found"] = True - self.ad_fields["circuito"]["found"] = True - self.ad_fields["personal"]["found"] = True - - if soup.find("p", {"class": "txt-bold _browserPhone icon-phone"}) is not None: - self.ad_fields["telefono"]["value"] = soup.find( - "p", {"class": "txt-bold _browserPhone icon-phone"} - ).text.replace(" ", "") - self.ad_fields["telefono"]["found"] = True - - def _validate(self) -> None: - """ - Checks whether the extracted values are valid against the expected - typology. Stores the results. 
- :return: None - """ - self.invalid_fields = [] - - if not re.match(r"[0-9]{4,20}", self.ad_fields["referencia"]["value"]): - self.invalid_fields.append("referencia") - - if not re.match(r"[0-9]{1,20}", self.ad_fields["precio"]["value"]): - self.invalid_fields.append("precio") - - possible_values_tamano = [ - "2 coches o más", - "coche y moto", - "coche grande", - "coche pequeño", - "moto", - None, - ] - if self.ad_fields["tamano_categorico"]["value"] not in possible_values_tamano: - self.invalid_fields.append("tamano_categorico") - - if not "Barrio" in self.ad_fields["barrio"]["value"]: - self.invalid_fields.append("barrio") - - if not "Distrito" in self.ad_fields["distrito"]["value"]: - self.invalid_fields.append("distrito") - - if self.ad_fields["telefono"]["found"] and not re.match( - r"\s*\+?[0-9\s]*", self.ad_fields["telefono"]["value"] - ): - self.invalid_fields.append("telefono") - # TODO añadir + a caracteres validos - - def all_fields_are_valid(self) -> bool: - """ - Reports on whether the extracted data is valid. - :return: True if values are valid, false if not - """ - self._validate() - if self.invalid_fields: - return False - else: - return True - - def fields_missing(self) -> bool: - """ - Reports on whether all compulsory fields are present. - :return: True if some field is missing, false if not - """ - for key, contents in self.ad_fields.items(): - if not contents["optional"] and not contents["found"]: - return True - return False - - def get_data(self) -> dict: - """ - Returns the extracted data in the form of a dictionary. 
- :return: dictionary with the extracted data - """ - data = {} - - for ad_field in self.ad_fields.keys(): - data[ad_field] = self.ad_fields[ad_field]["value"] - - return data - - if __name__ == "__main__": capturing_tasks_interface = CapturingTasksInterface() diff --git a/core/scrapping_utils.py b/core/scrapping_utils.py index 6fff33b..3ab6386 100644 --- a/core/scrapping_utils.py +++ b/core/scrapping_utils.py @@ -92,7 +92,7 @@ class UrlAttack: if self.response.ok: self.success = True - except Exception as e: + except Exception: self.success = False if ( diff --git a/tests/parsing_utils_test.py b/tests/parsing_utils_test.py index de0beeb..9d9d56d 100644 --- a/tests/parsing_utils_test.py +++ b/tests/parsing_utils_test.py @@ -2518,8 +2518,8 @@ def test_referencia_instructions_extract_correctly(real_ad_html): referencia_instructions.validate() assert ( - referencia_instructions.found == True - and referencia_instructions.valid == True + referencia_instructions.found is True + and referencia_instructions.valid is True and referencia_instructions.value is not None and referencia_instructions.search_issue is None ) @@ -2534,7 +2534,7 @@ def test_referencia_instructions_find_nothing_in_unrelated_html(unrelated_html): referencia_instructions.validate() assert ( - referencia_instructions.found == False + referencia_instructions.found is False and referencia_instructions.valid is None and referencia_instructions.value is None and referencia_instructions.search_issue is not None @@ -2580,8 +2580,8 @@ def test_all_instructions_extract_correctly(real_ad_html): assert all( [ - instruction.found == True - and instruction.valid == True + instruction.found is True + and instruction.valid is True and instruction.value is not None and instruction.search_issue is None for instruction in all_instructions @@ -2628,8 +2628,8 @@ def test_all_instructions_fail_on_unrelated_html(unrelated_html): assert all( [ - instruction.found == False - and (instruction.valid == False or 
instruction.valid == None) + instruction.found is False + and (instruction.valid is False or instruction.valid is None) and instruction.value is None for instruction in all_instructions ] @@ -2725,7 +2725,7 @@ def test_parsing_flow_fails_for_unrelated_html(unrelated_html): assert not parsing_flow.all_non_optional_fields_were_found and len( parsing_flow.issues - ) == len(all_instructions) + ) == len([field for field in all_instructions if not field.is_optional]) def test_parsing_flow_generator_returns_proper_flows():