Add trino task.

This commit is contained in:
Pablo Martin 2023-01-19 17:42:57 +01:00
parent 7962bd11e6
commit caa69d85cd
2 changed files with 36 additions and 0 deletions

35
lolafect/connections.py Normal file
View file

@ -0,0 +1,35 @@
import datetime
from prefect import task
from trino.auth import BasicAuthentication
import trino
from lolafect.defaults import DEFAULT_TRINO_HTTP_SCHEME
@task(log_stdout=True, max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def connect_to_trino(
trino_credentials: dict, http_schema: str = DEFAULT_TRINO_HTTP_SCHEME
) -> trino.dbapi.Connection:
"""
Open a connection to the specified trino instance and return it.
:param trino_credentials: a dict with the host, port, user and password.
:param http_schema: which http schema to use in the connection.
:return:
"""
print("Connecting to Trino.")
connection = trino.dbapi.connect(
host=trino_credentials["host"],
port=trino_credentials["port"],
user=trino_credentials["user"],
http_scheme=http_schema,
auth=BasicAuthentication(
trino_credentials["user"],
trino_credentials["password"],
),
)
print("Connected to Trino.")
return connection

View file

@ -4,3 +4,4 @@ DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE = "env/slack_webhooks.json"
DEFAULT_KUBERNETES_IMAGE = "373245262072.dkr.ecr.eu-central-1.amazonaws.com/pdo-data-prefect:production" DEFAULT_KUBERNETES_IMAGE = "373245262072.dkr.ecr.eu-central-1.amazonaws.com/pdo-data-prefect:production"
DEFAULT_KUBERNETES_LABELS = ["k8s"] DEFAULT_KUBERNETES_LABELS = ["k8s"]
DEFAULT_FLOWS_PATH_IN_BUCKET = "flows/" DEFAULT_FLOWS_PATH_IN_BUCKET = "flows/"
DEFAULT_TRINO_HTTP_SCHEME = "https"