Docstrings, typing, a bit of renaming and minor stuff.
This commit is contained in:
parent
347d3a969d
commit
7dba389eb5
3 changed files with 125 additions and 63 deletions
93
connections.py
Normal file
93
connections.py
Normal file
|
|
@ -0,0 +1,93 @@
|
|||
from typing import Union, Callable
|
||||
|
||||
import mysql.connector
|
||||
import trino.dbapi
|
||||
from trino.auth import BasicAuthentication
|
||||
from trino.dbapi import connect
|
||||
|
||||
|
||||
def get_connection(connection_config: dict) -> Union[trino.dbapi.Connection]:
|
||||
"""
|
||||
Pick the right way to build a connection and pass it the connection details.
|
||||
|
||||
:param connection_config: confi
|
||||
:return:
|
||||
"""
|
||||
connection_builder = pick_connection_builder(connection_config["engine"])
|
||||
connection = connection_builder(connection_config)
|
||||
return connection
|
||||
|
||||
|
||||
def get_possible_connection_builders() -> dict:
|
||||
"""
|
||||
Get the complete list of connection builders.
|
||||
|
||||
:return: a dict where the keys are the strings that identify each
|
||||
connection engine and the values are the callable function that will return
|
||||
a valid connection object.
|
||||
"""
|
||||
return {
|
||||
"trino": get_connection_to_trino,
|
||||
"mysql": get_connection_to_mysql,
|
||||
}
|
||||
|
||||
|
||||
def pick_connection_builder(connection_engine_name: str) -> Callable:
|
||||
"""
|
||||
Get a connection builder from a string name.
|
||||
|
||||
:param connection_engine_name: the string defining the connection engine.
|
||||
:return: the connection builder function.
|
||||
"""
|
||||
possible_connection_builders = get_possible_connection_builders()
|
||||
|
||||
try:
|
||||
connection_builder = possible_connection_builders[connection_engine_name]
|
||||
except KeyError:
|
||||
raise ValueError(
|
||||
f"Connection type {connection_engine_name} is unknown. Please review config."
|
||||
)
|
||||
|
||||
return connection_builder
|
||||
|
||||
|
||||
def get_connection_to_trino(connection_config: dict) -> trino.dbapi.Connection:
|
||||
"""
|
||||
Build a connection to Trino from the passed config.
|
||||
|
||||
:param connection_config: specifies host, port, etc.
|
||||
:return: the connection object
|
||||
"""
|
||||
|
||||
return connect(
|
||||
host=connection_config["host"],
|
||||
port=connection_config["port"],
|
||||
user=connection_config["user"],
|
||||
auth=BasicAuthentication(
|
||||
connection_config["user"],
|
||||
connection_config["password"],
|
||||
),
|
||||
http_scheme=connection_config["http_scheme"],
|
||||
catalog=connection_config["catalog"],
|
||||
schema=connection_config["schema"],
|
||||
)
|
||||
|
||||
|
||||
def get_connection_to_mysql(
|
||||
connection_config,
|
||||
) -> mysql.connector.connection.MySQLConnection:
|
||||
"""
|
||||
Build a connection to MySQL from the passed config.
|
||||
|
||||
:param connection_config: specifies host, port, etc.
|
||||
:return: the connection object
|
||||
"""
|
||||
connection = mysql.connector.connect(
|
||||
host=connection_config["host"],
|
||||
port=connection_config["port"],
|
||||
user=connection_config["user"],
|
||||
password=connection_config["password"],
|
||||
database=connection_config["schema"],
|
||||
)
|
||||
|
||||
return connection
|
||||
Loading…
Add table
Add a link
Reference in a new issue