# -*- coding: utf-8 -*- import sys from typing import Union sys.path.append("..") import mysql.connector from core.alerts import alert_master from core.config import current_db_parameters anuncios_db_parameters = {"database": "anuncios", **current_db_parameters} tasks_db_parameters = {"database": "tasks", **current_db_parameters} class DatabaseWrapper: """ Wrapper to mantain a connection to the database and launch queries to it in a more convenient way. """ def __init__(self, connection_parameters: dict) -> None: """ Initializes object with parameters and sends a ping to the database to confirm it is working. :param connection_parameters: a dictionary containing the connection parameters for the database. """ self.host = connection_parameters["host"] self.database = connection_parameters["database"] self.user = connection_parameters["user"] self.password = connection_parameters["password"] self.connection = None self.ping() def connect(self) -> None: """ Starts a connection to the database. :return: None """ self.connection = mysql.connector.connect( host=self.host, database=self.database, user=self.user, password=self.password, autocommit=False, ) def disconnect(self) -> None: """ Ends the current connection to the database. :return: None """ if self.connection.is_connected(): self.connection.disconnect() def ping(self) -> None: """ Connects and disconnects to the database to test whether it is up. :return: None """ self.connect() self.disconnect() def query( self, query_statement: str, query_parameters: Union[dict, None] = None, dictionary: bool = False, ) -> mysql.connector.connection.MySQLCursor: """ Connects to the database, sends a query and returns the cursor. :param query_statement: string with the query to execute :param query_parameters: parameters to substitute in the query string :param dictionary: whether the parameters are structured as a dict or not :return: cursor for the query """ self.connect() if not self.connection.is_connected(): raise Exception("Could not connect to the database.") try: execution_cursor = self.connection.cursor( dictionary=dictionary, buffered=True ) execution_cursor.execute(query_statement, query_parameters) self.connection.commit() self.disconnect() return execution_cursor except Exception as e: alert_master( "SQL ERROR", """Se ha producido un error ejecutando la siguiente query: {}. Con los siguientes parametros: {} {} """.format( query_statement, query_parameters, e ), ) def get_anunciosdb(): return DatabaseWrapper(anuncios_db_parameters) def get_tasksdb(): return DatabaseWrapper(tasks_db_parameters)