# -*- coding: utf-8 -*-
"""Thin wrapper around mysql.connector that opens one connection per query."""
import sys
sys.path.append('..')

import mysql.connector

from core.alerts import alert_master
from core.config import current_db_parameters

# Build independent copies of the base parameters, overriding only the
# database name.  dict.update() returns None and mutates its receiver, so
# using its result here would leave both names bound to None and would also
# rewrite the shared configuration in place.
anuncios_db_parameters = dict(current_db_parameters, database='anuncios')
tasks_db_parameters = dict(current_db_parameters, database='tasks')


class DatabaseWrapper:
    """Run statements against one MySQL database, connecting per call.

    The connection settings are read once from ``connection_parameters``
    (keys ``host``, ``database``, ``user`` and ``password``).  Every call to
    :meth:`query` opens a fresh autocommit connection and closes it again
    before returning.
    """

    def __init__(self, connection_parameters):
        """Store the connection settings and verify they work.

        Args:
            connection_parameters: mapping with the keys ``host``,
                ``database``, ``user`` and ``password``.

        Raises:
            KeyError: if one of the required keys is missing.
            Exception: whatever ``mysql.connector.connect`` raises when the
                initial connectivity check in :meth:`ping` fails.
        """
        self.host = connection_parameters['host']
        self.database = connection_parameters['database']
        self.user = connection_parameters['user']
        self.password = connection_parameters['password']
        self.connection = None
        self.ping()

    def connect(self):
        """Open a new autocommit connection and keep it in ``self.connection``."""
        self.connection = mysql.connector.connect(
            host=self.host,
            database=self.database,
            user=self.user,
            password=self.password,
            autocommit=True,
        )

    def disconnect(self):
        """Close the current connection if one exists and is still alive."""
        # Guard against being called before connect() has ever run.
        if self.connection is not None and self.connection.is_connected():
            self.connection.disconnect()

    def ping(self):
        """Check connectivity by opening and immediately closing a connection."""
        self.connect()
        self.disconnect()

    def query(self, query_statement, query_parameters=None, dictionary=False):
        """Execute a statement and return the cursor that ran it.

        Args:
            query_statement: SQL text, using the connector's placeholder
                syntax for any parameters.
            query_parameters: values bound to the placeholders, or ``None``.
            dictionary: when true, rows are returned as dicts instead of
                tuples.

        Returns:
            The executed cursor, or ``None`` when execution failed (in which
            case ``alert_master`` has been called with the error details).

        Raises:
            Exception: if the connection could not be established.
        """
        self.connect()
        if not self.connection.is_connected():
            raise Exception("Could not connect to the database.")

        try:
            # A buffered cursor fetches the whole result set during
            # execute(), so the rows stay readable from the returned cursor
            # after the connection has been closed below.
            execution_cursor = self.connection.cursor(
                dictionary=dictionary, buffered=True
            )
            execution_cursor.execute(query_statement, query_parameters)
            return execution_cursor
        except Exception as e:
            # Best effort by design: report the failure and hand back None
            # instead of propagating the error to the caller.
            alert_master(
                "SQL ERROR",
                (
                    "Se ha producido un error ejecutando la siguiente query: {}. "
                    "Con los siguientes parametros: {} {} "
                ).format(query_statement, query_parameters, e),
            )
            return None
        finally:
            # Always release the connection, including on the error path.
            self.disconnect()

    def query_dict(self, query_statement, query_parameters=None):
        """Same as :meth:`query`, with rows returned as dictionaries."""
        return self.query(query_statement, query_parameters, dictionary=True)


def get_anunciosdb():
    """Return a wrapper bound to the ``anuncios`` database."""
    return DatabaseWrapper(anuncios_db_parameters)


def get_tasksdb():
    """Return a wrapper bound to the ``tasks`` database."""
    return DatabaseWrapper(tasks_db_parameters)