From 5eedb037ed4cef096d459234936ba6bb1a4392e7 Mon Sep 17 00:00:00 2001 From: pablomartincalvo Date: Thu, 30 Aug 2018 19:38:31 +0200 Subject: [PATCH] Correcciones en wrapper_mysql y avance en metodos de explorer. Iniciado modulo de alertas. --- .gitignore | 1 + .idea/workspace.xml | 156 +++++++++++++++++++++++--- core/alerts.py | 39 +++++++ core/mysql_wrapper.py | 30 +++-- core/scrapping_utils.py | 2 +- explorer/explorer.py | 236 +++++++++++++++++++++++++++------------- 6 files changed, 359 insertions(+), 105 deletions(-) create mode 100644 core/alerts.py diff --git a/.gitignore b/.gitignore index e69de29..f7ac2f6 100644 --- a/.gitignore +++ b/.gitignore @@ -0,0 +1 @@ +/data_backups diff --git a/.idea/workspace.xml b/.idea/workspace.xml index bcfc20d..6b869d7 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -1,7 +1,12 @@ - + + + + + + - - + + + + + + + + + + + + + + + + + + + + + + + + - - + + + + + + + + + + + + + + + + + queue_retries + + @@ -40,6 +111,10 @@ @@ -96,6 +171,9 @@ + + + @@ -106,34 +184,82 @@ + + + + + + + + + - - + - - - - + - + + + + + + + + + + + + + + + + + - - + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/core/alerts.py b/core/alerts.py new file mode 100644 index 0000000..c91cc36 --- /dev/null +++ b/core/alerts.py @@ -0,0 +1,39 @@ +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +import smtplib + + +my_adress = 'drogonalerts@gmail.com' +master_address = 'pablomartincalvo@gmail.com' + +def alert_master(header, message): + #TODO Acabar la alerta de email + + msg = MIMEMultipart() + + message = "Thank you" + + # setup the parameters of the message + password = "your_password" + msg['From'] = "your_address" + msg['To'] = "to_address" + msg['Subject'] = "Subscription" + + # add in the message body + msg.attach(MIMEText(message, 'plain')) + + # create server + server = smtplib.SMTP('smtp.gmail.com: 587') + + server.starttls() + + # Login Credentials for sending the mail + server.login(msg['From'], password) + + # send the message via the server. + server.sendmail(msg['From'], msg['To'], msg.as_string()) + + server.quit() + +print +"successfully sent email to %s:" % (msg['To']) \ No newline at end of file diff --git a/core/mysql_wrapper.py b/core/mysql_wrapper.py index 33a23eb..1fe3d97 100644 --- a/core/mysql_wrapper.py +++ b/core/mysql_wrapper.py @@ -1,10 +1,15 @@ # -*- coding: utf-8 -*- import mysql.connector -anuncios_db_parameters = {'host': '46.183.115.154', +anuncios_db_parameters = {'host': '185.166.215.170', 'database': 'anuncios', - 'user': 'pablo', - 'password': 'noesfacilvivirsinpin'} + 'user': 'drogon', + 'password': 'noesfacilvivirsindrogon'} + +tasks_db_parameters = {'host': '185.166.215.170', + 'database': 'tasks', + 'user': 'drogon', + 'password': 'noesfacilvivirsindrogon'} class DatabaseWrapper(): @@ -18,14 +23,11 @@ class DatabaseWrapper(): self.ping() def connect(self): - try: - self.connection = mysql.connector.connect(host = self.host, - database = self.database, - user = self.user, - password = self.password) - except Exception as e: - print("Could not connect to the database.") - print(e) + self.connection = mysql.connector.connect(host = self.host, + database = self.database, + user = self.user, + password = self.password, + autocommit = True) def disconnect(self): if self.connection.is_connected(): @@ -42,13 +44,19 @@ class DatabaseWrapper(): execution_cursor.execute(query_statement, query_parameters) self.disconnect() return execution_cursor + else: + raise Exception("Could not connect to the database.") def query_dict(self, query_statement, query_parameters = None): return self.query(query_statement, query_parameters, dictionary = True) + def get_anunciosdb(): return DatabaseWrapper(anuncios_db_parameters) +def get_tasksdb(): + return DatabaseWrapper(tasks_db_parameters) + \ No newline at end of file diff --git a/core/scrapping_utils.py b/core/scrapping_utils.py index b37834a..b4d6abf 100644 --- a/core/scrapping_utils.py +++ b/core/scrapping_utils.py @@ -33,4 +33,4 @@ class UrlAttack(): def get_text(self): if self.success: - return self.response.text() \ No newline at end of file + return self.response.text \ No newline at end of file diff --git a/explorer/explorer.py b/explorer/explorer.py index eddd850..d551ffe 100644 --- a/explorer/explorer.py +++ b/explorer/explorer.py @@ -2,36 +2,35 @@ import sys sys.path.append('..') import uuid -import datetime +from datetime import datetime from time import sleep from bs4 import BeautifulSoup -from core.mysql_wrapper import get_anunciosdb -from core.scrapping_utils import UrlAttack +import re +from random import randint +from core.mysql_wrapper import get_anunciosdb, get_tasksdb +from core.scrapping_utils import UrlAttack +import core.alerts class Explorer(): sleep_time_no_work = 60 sleep_time_no_service = 600 - + working_hours = {start: datetime.time(9, 0, 0), + end: datetime.time(18, 0, 0)} + monthly_capture_target = 1000 def __init__(self): try: self.anunciosdb = get_anunciosdb() - except: print("Could not connect to anuncios DB") - try: - self.task_log_db = #get_task_log_db() - except: - print("Could not connect to task log DB") - + self.max_db_retries = 3 self.db_retries = 0 self.max_queue_retries = 3 self.queue_retries = 0 def start(self): - #Arrancar el servicio while True: if not self.there_is_work(): @@ -45,16 +44,7 @@ class Explorer(): break current_task = ExploringTask(self.compose_listing_url) - if current_task.is_ready_to_explore: - current_task.explore() - else: - break - if current_task.status == 'referencias ready': - current_referencias = current_task.get_referencias() - - for referencia in current_referencias: - self.post_task_to_queue(referencia) - current_task.update_status('Sent to queue') + current_task.explore() continue @@ -64,12 +54,22 @@ class Explorer(): def stop(self): #TODO #Detener el servicio + pass def there_is_work(self): - #TODO - #Comprueba si hay trabajo por hacer - #Mirando en la bd de tasks cuantas se han hecho ultimamente, mensualmente - #etc. + """ + Funcion que agrupa las condiciones que se deben cumplir para poder trabajar + """ + if not self.in_working_hours(): + return False + + if self.get_referencias_acquired_today() >= self.get_max_referencias_for_today(): + return False + + if self.get_tasks_created_today() >= self.get_max_tasks_today(): + return False + + return True def database_is_up(self): while self.db_retries <= self.max_db_retries: @@ -82,8 +82,7 @@ class Explorer(): self.db_retries = self.db_retries + 1 return False - - + def queue_is_up(self): #TODO while self.queue_retries <= self.max_queue_retries: @@ -95,77 +94,140 @@ class Explorer(): sleep(sleep_time_no_service) self.queue_retries = self.queue_retries + 1 - return False + return False + def in_working_hours(self): + return working_hours['start'] <= datetime.now().time() <= working_hours['end'] + + def get_referencias_acquired_today(self): + """ + Cuenta cuantas nuevas referencias han aparecido en las ultimas 24 horas + """ + + query_statement = """ SELECT count(referencias) + FROM primera_captura_full + WHERE fecha_captura >= now() - INTERVAL 1 DAY; + """ + + cursor_result = self.anunciosdb.query(query_statement) + + return cursor_result.fetchone() + + def get_max_referencias_for_today(self): + """ + Calcula la cantidad objetivo para las ultimas 24 horas en base a la + diferencia con el objetivo mensual + """ + query_statement = """ SELECT count(referencias) + FROM primera_captura_full + WHERE fecha_captura >= now() - INTERVAL 30 DAY; + """ + cursor_result = self.anunciosdb.query(query_statement) + new_referencias_last_30 = cursor_result.fetchone() + + deviation = (monthly_capture_target - new_referencias_last_30) / monthly_capture_target + max_referencias = (monthly_capture_target/30) * (1 + (deviation)) + + return max_referencias + + def get_tasks_created_today(self): + """ + Mira en el task log cuantas tareas se han iniciado en las ultimas 24 horas + """ + query_statement = """ SELECT count(uuid) + FROM exploring_tasks + WHERE status = 'Attacked' + AND write_time >= now() - INTERVAL 1 DAY; + """ + cursor_result = self.tasksdb.query(query_statement) + tasks_created_today = cursor_result.fetchone() + + return tasks_created_today + + def get_max_tasks_today(self): + """ + Calcula el maximo diario de intentos en forma de tareas, en base al + maximo de capturas mas un multiplicador + """ + return (self.get_max_referencias_for_today() / 30) * 6 + def compose_listing_url(self): - #TODO - #Decide que url hay que componer y la compone + """ + Genera URLs de manera aleatoria + :return: + """ raiz = 'https://www.idealista.com/' - tipo = #Logica random + tipo = randint(1,2) ciudad = 'barcelona' - numero = #logica random - url = raiz + tipo + '-garajes/' + ciudad + '-' + ciudad + '/' + + numero = randint(1,30) + url = raiz + tipo + '-garajes/' + ciudad + '-' + ciudad + '/' + \ 'pagina-' + numero + '.htm' return url - - def post_task_to_queue(self, referencia): - #TODO - #Manda la task a la cola redis - class ExploringTask(): def __init__(self, url): + self.anunciosdb = get_anunciosdb() + self.tasksdb = get_tasksdb() self.target_url = url self.id = str(uuid.uuid4()) - self.update_status('Pending') + self._update_status('Pending') - try: - self.anunciosdb = get_anunciosdb() - except: - self.anunciosdb = None - self.update_status('Unable to connect to anuncios DB') - - try: - #TODO - #Pendiente de implementar wraper para MongoDB - #self.task_log_db = - except: - self.update_status('Unable to connect to task log DB') - #self.task_log_db = None - - def update_status(self, new_status): + def _update_status(self, new_status): self.status = new_status - self._log_in_taskdb() - - - def is_ready_to_explore(self): - if self.anunciosdb is not None and self.task_log_db is not None: - return True - else: - return False + self._log_in_tasksdb() def explore(self): - attack = UrlAttack(self.url) + attack = UrlAttack(self.target_url) attack.attack() + self._update_status('Attacked') if attack.success: + self._validate_referencias(attack.get_text()) self._extract_referencias(attack.get_text()) - if self.new_listings: - self.update_status('referencias ready') + if self.referencias: + self._update_status('Referencias ready') + self._post_tasks_to_queue() + self._update_status('Sent to Queue') + elif self.there_are_referencias: + self._update_status('Failure - No new referencias in HTML') else: - self.update_status('Failure - No listings in HTML') + self._update_status('Failure - HTML with no referencias') else: - self.update_status('Failure - Bad request') + self._update_status('Failure - Bad request') - def get_referencias(self): - return self.referencias - def _log_in_taskdb(self): - #TODO - #Funcion que grabe estado y demas en una mongodb o argo azin + def _log_in_tasksdb(self): + """ + Graba en la base de datos de tareas un registro con el UUID de la tarea, + un timestamp y el status + """ + + query_statement = """INSERT INTO exploring_tasks_logs + (uuid, write_time, status) + VALUES (%(uuid)s, NOW(), %(status)s)""" + + query_parameters = {'uuid': self.id, + 'status': self.status} + + self.tasksdb.query(query_statement, query_parameters) + + def _validate_referencias(self, html): + """ + Comprueba que las etiquetas sigan el formato de un anuncio. + Lanza una advertencia si no es así. + """ + soup = BeautifulSoup(html, 'html5lib') + ads = soup.find_all(class_ = "item") + pattern = "^[0-9]{3,20}$" + + for ad in ads: + if not re.match(pattern, ad["data-adid"]): + #TODO Levantar marron + pass + def _extract_referencias(self, html): """ @@ -173,17 +235,19 @@ class ExploringTask(): de capturas, y guarda si han aparecido listings y si hay alguno nuevo """ - soup = BeautifulSoup(self.html, 'html5lib') - ads = sopa.find_all(class_ = "item") + soup = BeautifulSoup(html, 'html5lib') + ads = soup.find_all(class_ = "item") + self.there_are_referencias = bool(ads) self.referencias = [] for ad in ads: if self._is_new_listing(ad["data-adid"]): self.referencias.append(ad["data-adid"]) - self.new_listings = bool(self.referencias) + def _is_new_listing(self, referencia): - #TODO - #Comprobar contra base de datos si la referencia existe en base de datos + """ + Comprueba si el listing ya existe en la base de datos de anuncios + """ query_statement = """SELECT count(referencia) FROM capturas WHERE referencia = %s""" @@ -195,4 +259,20 @@ class ExploringTask(): return False else: return True - + + def _post_tasks_to_queue(self): + #TODO Mandar las referencias a redis + pass + + +def testear_exploring_task(): + url = 'https://www.idealista.com/venta-garajes/barcelona-barcelona/' + task = ExploringTask(url) + task.explore() + + print(task.referencias) + + +testear_exploring_task() + +