diff --git a/CHANGELOG.md b/CHANGELOG.md index 78124e9..32fb418 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ All notable changes to this project will be documented in this file. +## [Unreleased] + +### Added + +- Added Trino connection capabilities in `connections` module. + + ## [0.2.0] - 2023-01-19 ### Added diff --git a/README.md b/README.md index e7b7994..9160eb3 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,23 @@ lolaconfig = build_lolaconfig( ) ``` +**Connect to a Trino server** + +```python +from lolafect.connections import connect_to_trino, close_trino_connection + +with Flow(...) as flow: + connection = connect_to_trino.run( + trino_credentials=my_trino_credentials #You can probably try to fetch this from lolaconfig.TRINO_CREDENTIALS + ) + task_result = some_trino_related_task(trino_connection=connection) + close_trino_connection.run( + trino_connection=connection, + upstream_tasks=[task_result] + ) +``` + + **Send a warning message to slack if your tasks fails** ```python @@ -57,12 +74,30 @@ with Flow(...) as flow: ## How to test +There are two test suites: unit tests and integration tests. Integration tests are prepared to plug to some of our +AWS resources, hence they are not fully reliable since they require specific credentials and permissions. The +recommended policy is: + +- Use the unit tests in any CI process you want. +- Use the unit tests frequently as you code. +- Do not use the integration tests in CI processes. +- Use the integration tests as milestone checks when finishing feature branches. +- Make sure to ensure integration tests are working before making a new release. + +When building new tests, please keep this philosophy in mind. + + IDE-agnostic: 1. Set up a virtual environment which contains both `lolafect` and the dependencies listed in `requirements-dev.txt`. -2. Run: `pytests tests` +2. Run: + - For all tests: `pytests tests` + - Only unit tests: `pytest tests/test_unit` + - Only integration tests: `pytest tests/test_integration` In Pycharm: - If you configure `pytest` as the project test runner, Pycharm will most probably autodetect the test - folder and allow you to run the test suite within the IDE. \ No newline at end of file + folder and allow you to run the test suite within the IDE. However, Pycharm has troubles running the integration + tests since the shell it runs from does not have the AWS credentials. Hence, for now we recommend you to only use + the Pycharm integrated test runner for the unit tests. You can easily set up a Run Configuration for that. \ No newline at end of file diff --git a/lolafect/connections.py b/lolafect/connections.py new file mode 100644 index 0000000..40c450f --- /dev/null +++ b/lolafect/connections.py @@ -0,0 +1,59 @@ +import datetime + +import prefect +from prefect import task +from prefect.triggers import all_finished +from trino.auth import BasicAuthentication +import trino + +from lolafect.defaults import DEFAULT_TRINO_HTTP_SCHEME + + +@task(log_stdout=True, max_retries=3, retry_delay=datetime.timedelta(minutes=10)) +def connect_to_trino( + trino_credentials: dict, http_schema: str = DEFAULT_TRINO_HTTP_SCHEME +) -> trino.dbapi.Connection: + """ + Open a connection to the specified trino instance and return it. + + :param trino_credentials: a dict with the host, port, user and password. + :param http_schema: which http schema to use in the connection. + :return: + """ + logger = prefect.context.get("logger") + logger.info( + f"Connecting to Trino at {trino_credentials['host']}:{trino_credentials['port']}." + ) + + connection = trino.dbapi.connect( + host=trino_credentials["host"], + port=trino_credentials["port"], + user=trino_credentials["user"], + http_scheme=http_schema, + auth=BasicAuthentication( + trino_credentials["user"], + trino_credentials["password"], + ), + ) + logger.info( + f"Connected to Trino at {trino_credentials['host']}:{trino_credentials['port']}." + ) + + return connection + + +@task(trigger=all_finished) +def close_trino_connection(trino_connection: trino.dbapi.Connection) -> None: + """ + Close a Trino connection, or do nothing if what has been passed is not a + Trino connection. + + :param trino_connection: a trino connection. + :return: None + """ + logger = prefect.context.get("logger") + if isinstance(trino_connection, trino.dbapi.Connection): + trino_connection.close() + logger.info("Trino connection closed successfully.") + return + logger.info("No connection received.") diff --git a/lolafect/defaults.py b/lolafect/defaults.py index feb66e4..1d4af84 100644 --- a/lolafect/defaults.py +++ b/lolafect/defaults.py @@ -4,3 +4,4 @@ DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE = "env/slack_webhooks.json" DEFAULT_KUBERNETES_IMAGE = "373245262072.dkr.ecr.eu-central-1.amazonaws.com/pdo-data-prefect:production" DEFAULT_KUBERNETES_LABELS = ["k8s"] DEFAULT_FLOWS_PATH_IN_BUCKET = "flows/" +DEFAULT_TRINO_HTTP_SCHEME = "https" diff --git a/requirements-dev.txt b/requirements-dev.txt index 40f6a2a..f92f5bc 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -2,4 +2,5 @@ prefect==1.2.2 requests==2.28.1 boto3==1.26.40 pytest==7.2.0 -httpretty==1.1.4 \ No newline at end of file +httpretty==1.1.4 +trino==0.321.0 \ No newline at end of file diff --git a/setup.py b/setup.py index 1b6c89e..ec38921 100644 --- a/setup.py +++ b/setup.py @@ -23,5 +23,5 @@ setup( package_dir={"lolafect": "lolafect"}, include_package_data=True, python_requires=">=3.7", - install_requires=["prefect==1.2.2", "requests==2.28.1", "boto3==1.26.40"], + install_requires=["prefect==1.2.2", "requests==2.28.1", "boto3==1.26.40", "trino==0.321.0"], ) diff --git a/tests/test_integration/test_connections.py b/tests/test_integration/test_connections.py new file mode 100644 index 0000000..74944e0 --- /dev/null +++ b/tests/test_integration/test_connections.py @@ -0,0 +1,27 @@ +from lolafect.lolaconfig import build_lolaconfig +from lolafect.connections import connect_to_trino, close_trino_connection + +# __ __ _____ _ _ _____ _ _ _____ _ +# \ \ / /\ | __ \| \ | |_ _| \ | |/ ____| | +# \ \ /\ / / \ | |__) | \| | | | | \| | | __| | +# \ \/ \/ / /\ \ | _ /| . ` | | | | . ` | | |_ | | +# \ /\ / ____ \| | \ \| |\ |_| |_| |\ | |__| |_| +# \/ \/_/ \_\_| \_\_| \_|_____|_| \_|\_____(_) +# This testing suite requires: +# - The calling shell to have permission in AWS +# - The calling shell to be within the Mercadão network +# - Do not use this tests as part of CI/CD pipelines since they are not idempotent and +# rely external resources. Instead, use them manually to check yourself that things +# are working properly. +TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") + + +def test_that_trino_connect_and_disconnect_works_properly(): + + connection = connect_to_trino.run( + trino_credentials=TEST_LOLACONFIG.TRINO_CREDENTIALS + ) + + connection.cursor().execute("SELECT 1") + + close_trino_connection.run(trino_connection=connection) diff --git a/tests/test_lolaconfig.py b/tests/test_unit/test_lolaconfig.py similarity index 100% rename from tests/test_lolaconfig.py rename to tests/test_unit/test_lolaconfig.py diff --git a/tests/test_slack.py b/tests/test_unit/test_slack.py similarity index 100% rename from tests/test_slack.py rename to tests/test_unit/test_slack.py