41 lines
1.2 KiB
Python
41 lines
1.2 KiB
Python
import datetime
|
|
|
|
import prefect
|
|
from prefect import task
|
|
from trino.auth import BasicAuthentication
|
|
import trino
|
|
|
|
from lolafect.defaults import DEFAULT_TRINO_HTTP_SCHEME
|
|
|
|
|
|
@task(log_stdout=True, max_retries=3, retry_delay=datetime.timedelta(minutes=10))
|
|
def connect_to_trino(
|
|
trino_credentials: dict, http_schema: str = DEFAULT_TRINO_HTTP_SCHEME
|
|
) -> trino.dbapi.Connection:
|
|
"""
|
|
Open a connection to the specified trino instance and return it.
|
|
|
|
:param trino_credentials: a dict with the host, port, user and password.
|
|
:param http_schema: which http schema to use in the connection.
|
|
:return:
|
|
"""
|
|
logger = prefect.context.get("logger")
|
|
logger.info(
|
|
f"Connecting to Trino at {trino_credentials['host']}:{trino_credentials['port']}."
|
|
)
|
|
|
|
connection = trino.dbapi.connect(
|
|
host=trino_credentials["host"],
|
|
port=trino_credentials["port"],
|
|
user=trino_credentials["user"],
|
|
http_scheme=http_schema,
|
|
auth=BasicAuthentication(
|
|
trino_credentials["user"],
|
|
trino_credentials["password"],
|
|
),
|
|
)
|
|
logger.info(
|
|
f"Connected to Trino at {trino_credentials['host']}:{trino_credentials['port']}."
|
|
)
|
|
|
|
return connection
|