From caa69d85cdd69f27781a043e9a6144a7644659a9 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 19 Jan 2023 17:42:57 +0100 Subject: [PATCH] Add trino task. --- lolafect/connections.py | 35 +++++++++++++++++++++++++++++++++++ lolafect/defaults.py | 1 + 2 files changed, 36 insertions(+) create mode 100644 lolafect/connections.py diff --git a/lolafect/connections.py b/lolafect/connections.py new file mode 100644 index 0000000..fc0854a --- /dev/null +++ b/lolafect/connections.py @@ -0,0 +1,35 @@ +import datetime + +from prefect import task +from trino.auth import BasicAuthentication +import trino + +from lolafect.defaults import DEFAULT_TRINO_HTTP_SCHEME + + +@task(log_stdout=True, max_retries=3, retry_delay=datetime.timedelta(minutes=10)) +def connect_to_trino( + trino_credentials: dict, http_schema: str = DEFAULT_TRINO_HTTP_SCHEME +) -> trino.dbapi.Connection: + """ + Open a connection to the specified trino instance and return it. + + + :param trino_credentials: a dict with the host, port, user and password. + :param http_schema: which http schema to use in the connection. + :return: + """ + print("Connecting to Trino.") + connection = trino.dbapi.connect( + host=trino_credentials["host"], + port=trino_credentials["port"], + user=trino_credentials["user"], + http_scheme=http_schema, + auth=BasicAuthentication( + trino_credentials["user"], + trino_credentials["password"], + ), + ) + print("Connected to Trino.") + + return connection diff --git a/lolafect/defaults.py b/lolafect/defaults.py index feb66e4..1d4af84 100644 --- a/lolafect/defaults.py +++ b/lolafect/defaults.py @@ -4,3 +4,4 @@ DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE = "env/slack_webhooks.json" DEFAULT_KUBERNETES_IMAGE = "373245262072.dkr.ecr.eu-central-1.amazonaws.com/pdo-data-prefect:production" DEFAULT_KUBERNETES_LABELS = ["k8s"] DEFAULT_FLOWS_PATH_IN_BUCKET = "flows/" +DEFAULT_TRINO_HTTP_SCHEME = "https"