# -*- coding: utf-8 -*- import sys sys.path.append("..") import mysql.connector from core.alerts import alert_master from core.config import current_db_parameters anuncios_db_parameters = {"database": "anuncios", **current_db_parameters} tasks_db_parameters = {"database": "tasks", **current_db_parameters} class DatabaseWrapper: def __init__(self, connection_parameters): self.host = connection_parameters["host"] self.database = connection_parameters["database"] self.user = connection_parameters["user"] self.password = connection_parameters["password"] self.connection = None self.ping() def connect(self): self.connection = mysql.connector.connect( host=self.host, database=self.database, user=self.user, password=self.password, autocommit=False, ) def disconnect(self): if self.connection.is_connected(): self.connection.disconnect() def ping(self): self.connect() self.disconnect() def query(self, query_statement, query_parameters=None, dictionary=False): self.connect() if self.connection.is_connected(): try: execution_cursor = self.connection.cursor( dictionary=dictionary, buffered=True ) execution_cursor.execute(query_statement, query_parameters) self.connection.commit() self.disconnect() return execution_cursor except Exception as e: alert_master( "SQL ERROR", """Se ha producido un error ejecutando la siguiente query: {}. Con los siguientes parametros: {} {} """.format( query_statement, query_parameters, e ), ) else: raise Exception("Could not connect to the database.") def query_dict(self, query_statement, query_parameters=None): return self.query(query_statement, query_parameters, dictionary=True) def get_anunciosdb(): return DatabaseWrapper(anuncios_db_parameters) def get_tasksdb(): return DatabaseWrapper(tasks_db_parameters)