Merge pull request #6 from lolamarket/feature/trino_connection

Feature/trino connection
This commit is contained in:
pablolola 2023-01-23 14:37:54 +01:00 committed by GitHub
commit dc732fc34d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 134 additions and 4 deletions

View file

@ -2,6 +2,13 @@
All notable changes to this project will be documented in this file.
## [Unreleased]
### Added
- Added Trino connection capabilities in `connections` module.
## [0.2.0] - 2023-01-19
### Added

View file

@ -36,6 +36,23 @@ lolaconfig = build_lolaconfig(
)
```
**Connect to a Trino server**
```python
from lolafect.connections import connect_to_trino, close_trino_connection
with Flow(...) as flow:
connection = connect_to_trino.run(
trino_credentials=my_trino_credentials #You can probably try to fetch this from lolaconfig.TRINO_CREDENTIALS
)
task_result = some_trino_related_task(trino_connection=connection)
close_trino_connection.run(
trino_connection=connection,
upstream_tasks=[task_result]
)
```
**Send a warning message to slack if your tasks fails**
```python
@ -57,12 +74,30 @@ with Flow(...) as flow:
## How to test
There are two test suites: unit tests and integration tests. Integration tests are prepared to plug to some of our
AWS resources, hence they are not fully reliable since they require specific credentials and permissions. The
recommended policy is:
- Use the unit tests in any CI process you want.
- Use the unit tests frequently as you code.
- Do not use the integration tests in CI processes.
- Use the integration tests as milestone checks when finishing feature branches.
- Make sure to ensure integration tests are working before making a new release.
When building new tests, please keep this philosophy in mind.
IDE-agnostic:
1. Set up a virtual environment which contains both `lolafect` and the dependencies listed in `requirements-dev.txt`.
2. Run: `pytests tests`
2. Run:
- For all tests: `pytests tests`
- Only unit tests: `pytest tests/test_unit`
- Only integration tests: `pytest tests/test_integration`
In Pycharm:
- If you configure `pytest` as the project test runner, Pycharm will most probably autodetect the test
folder and allow you to run the test suite within the IDE.
folder and allow you to run the test suite within the IDE. However, Pycharm has troubles running the integration
tests since the shell it runs from does not have the AWS credentials. Hence, for now we recommend you to only use
the Pycharm integrated test runner for the unit tests. You can easily set up a Run Configuration for that.

59
lolafect/connections.py Normal file
View file

@ -0,0 +1,59 @@
import datetime
import prefect
from prefect import task
from prefect.triggers import all_finished
from trino.auth import BasicAuthentication
import trino
from lolafect.defaults import DEFAULT_TRINO_HTTP_SCHEME
@task(log_stdout=True, max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def connect_to_trino(
trino_credentials: dict, http_schema: str = DEFAULT_TRINO_HTTP_SCHEME
) -> trino.dbapi.Connection:
"""
Open a connection to the specified trino instance and return it.
:param trino_credentials: a dict with the host, port, user and password.
:param http_schema: which http schema to use in the connection.
:return:
"""
logger = prefect.context.get("logger")
logger.info(
f"Connecting to Trino at {trino_credentials['host']}:{trino_credentials['port']}."
)
connection = trino.dbapi.connect(
host=trino_credentials["host"],
port=trino_credentials["port"],
user=trino_credentials["user"],
http_scheme=http_schema,
auth=BasicAuthentication(
trino_credentials["user"],
trino_credentials["password"],
),
)
logger.info(
f"Connected to Trino at {trino_credentials['host']}:{trino_credentials['port']}."
)
return connection
@task(trigger=all_finished)
def close_trino_connection(trino_connection: trino.dbapi.Connection) -> None:
"""
Close a Trino connection, or do nothing if what has been passed is not a
Trino connection.
:param trino_connection: a trino connection.
:return: None
"""
logger = prefect.context.get("logger")
if isinstance(trino_connection, trino.dbapi.Connection):
trino_connection.close()
logger.info("Trino connection closed successfully.")
return
logger.info("No connection received.")

View file

@ -4,3 +4,4 @@ DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE = "env/slack_webhooks.json"
DEFAULT_KUBERNETES_IMAGE = "373245262072.dkr.ecr.eu-central-1.amazonaws.com/pdo-data-prefect:production"
DEFAULT_KUBERNETES_LABELS = ["k8s"]
DEFAULT_FLOWS_PATH_IN_BUCKET = "flows/"
DEFAULT_TRINO_HTTP_SCHEME = "https"

View file

@ -3,3 +3,4 @@ requests==2.28.1
boto3==1.26.40
pytest==7.2.0
httpretty==1.1.4
trino==0.321.0

View file

@ -23,5 +23,5 @@ setup(
package_dir={"lolafect": "lolafect"},
include_package_data=True,
python_requires=">=3.7",
install_requires=["prefect==1.2.2", "requests==2.28.1", "boto3==1.26.40"],
install_requires=["prefect==1.2.2", "requests==2.28.1", "boto3==1.26.40", "trino==0.321.0"],
)

View file

@ -0,0 +1,27 @@
from lolafect.lolaconfig import build_lolaconfig
from lolafect.connections import connect_to_trino, close_trino_connection
# __ __ _____ _ _ _____ _ _ _____ _
# \ \ / /\ | __ \| \ | |_ _| \ | |/ ____| |
# \ \ /\ / / \ | |__) | \| | | | | \| | | __| |
# \ \/ \/ / /\ \ | _ /| . ` | | | | . ` | | |_ | |
# \ /\ / ____ \| | \ \| |\ |_| |_| |\ | |__| |_|
# \/ \/_/ \_\_| \_\_| \_|_____|_| \_|\_____(_)
# This testing suite requires:
# - The calling shell to have permission in AWS
# - The calling shell to be within the Mercadão network
# - Do not use this tests as part of CI/CD pipelines since they are not idempotent and
# rely external resources. Instead, use them manually to check yourself that things
# are working properly.
TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite")
def test_that_trino_connect_and_disconnect_works_properly():
connection = connect_to_trino.run(
trino_credentials=TEST_LOLACONFIG.TRINO_CREDENTIALS
)
connection.cursor().execute("SELECT 1")
close_trino_connection.run(trino_connection=connection)