data-jupyter-notebooks/utils/dwh_utils.py
Oriol Roqué Paniagua 38f63afbf7 Merged PR 5371: Flagging w. random predictor + DWH connection improvements + restructure
Connecting to DWH:
* Any existing notebook (AB, Flagging & Template) now have an initial simplified block to connect to the DWH. This is done to handle the DRY, as we're going to start adding more and more experiment notebooks very soon (and we have already 4 notebooks).
* This reads from a new `utils/dwh_utils.py` in which we handle the connection and test it accordingly.
* This also requires an optional `settings.json` path configuration to avoid warnings (not errors) when reading from `dwh_utils`.

Flagging:
* All flagging notebooks now go within the folder `data_driven_risk_assessment`. The already existing notebook `flagging_performance_monitoring` has also been moved here.
* There's a new `experiments` folder to store the different experiments on flagging.
* A new notebook has been added containing a straight-forward baseline: a random predictor, which randomly flags as risk bookings on a test set based on the observed booking claim rate on a previous train dataset.

I confirm that all existing notebooks work well after the connection changes.

Once merged, or to review, you will need to re-install requirements.txt as I added sklearn.

Related work items: #30804
2025-06-10 05:59:12 +00:00

43 lines
No EOL
1.3 KiB
Python

import pathlib
import yaml
import pandas as pd
from sqlalchemy import create_engine
# Path to credentials YAML file
CREDS_FILEPATH = pathlib.Path.home() / ".superhog-dwh" / "credentials.yml"
def read_credentials(yaml_path=CREDS_FILEPATH, env="prd"):
with open(yaml_path, "r") as file:
credentials = yaml.safe_load(file)
return credentials["envs"][env]
def create_postgres_engine(creds: dict):
user = creds["user"]
password = creds["password"]
host = creds["host"]
port = creds["port"]
database = creds["database"]
connection_string = f"postgresql://{user}:{password}@{host}:{port}/{database}"
return create_engine(connection_string)
def query_to_dataframe(engine, query: str):
with engine.connect() as connection:
df = pd.read_sql(query, connection)
return df
# Optional test function to validate setup
def test_connection():
print(f"🔌 Testing connection using credentials at: {CREDS_FILEPATH}")
try:
creds = read_credentials()
engine = create_postgres_engine(creds)
df = query_to_dataframe(engine, "SELECT 1;")
print("✅ Connection successful.")
except Exception as e:
print("❌ Connection failed:")
print(e)
# Only run the test if this script is executed directly
if __name__ == "__main__":
test_connection()