query-performance-gauge/query_performance_gauge.py
2022-07-21 11:56:41 +02:00

93 lines
2.9 KiB
Python

import time
import traceback
from typing import Union, Callable
import trino.dbapi
from trino.dbapi import connect
from trino.auth import BasicAuthentication
import mysql.connector
def main(config: dict) -> None:
print("Starting the measuring session.")
connection = get_connection(config)
for query_config in config["queries_to_measure"]:
try:
query = TestableQuery(
name=query_config["name"], query_string=query_config["query_string"]
)
measure_query_runtime(connection, query)
except Exception as e:
print(f"""Something went wrong with query {query_config["name"]}.""")
print(f"{traceback.format_exc()}")
print("Finished the measuring session.")
class TestableQuery:
def __init__(self, name: str, query_string: str):
self.name = name
self.query_string = query_string
def measure_query_runtime(connection: trino.dbapi.Connection, query: TestableQuery):
start_time = time.time()
cur = connection.cursor()
cur.execute(query.query_string)
rows = cur.fetchall()
print(f"Query '{query.name}' took {int(time.time() - start_time)} seconds to run.")
def get_connection(config: dict) -> Union[trino.dbapi.Connection]:
connection_builder = pick_connection_builder(config["connection_details"]["engine"])
connection = connection_builder(config)
return connection
def get_possible_connection_builders() -> dict:
return {
"trino": get_connection_to_trino,
"mysql": get_connection_to_mysql,
}
def pick_connection_builder(connection_engine_name: str) -> Callable:
possible_connection_builders = get_possible_connection_builders()
try:
connection_builder = possible_connection_builders[connection_engine_name]
except KeyError:
raise ValueError(
f"Connection type {connection_engine_name} is unknown. Please review config."
)
return connection_builder
def get_connection_to_trino(config):
return connect(
host=config["connection_details"]["host"],
port=config["connection_details"]["port"],
user=config["connection_details"]["user"],
auth=BasicAuthentication(
config["connection_details"]["user"],
config["connection_details"]["password"],
),
http_scheme=config["connection_details"]["http_scheme"],
catalog=config["connection_details"]["catalog"],
schema=config["connection_details"]["schema"],
)
def get_connection_to_mysql(config) -> mysql.connector.connection.MySQLConnection:
connection = mysql.connector.connect(
host=config["connection_details"]["host"],
port=config["connection_details"]["port"],
user=config["connection_details"]["user"],
password=config["connection_details"]["password"],
database=config["connection_details"]["schema"],
)
return connection