From 7962bd11e693001e1044768c592258b0d15297ae Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 19 Jan 2023 17:27:39 +0100 Subject: [PATCH 01/15] Added trino client do dependencies. --- requirements-dev.txt | 3 ++- setup.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/requirements-dev.txt b/requirements-dev.txt index 40f6a2a..f92f5bc 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -2,4 +2,5 @@ prefect==1.2.2 requests==2.28.1 boto3==1.26.40 pytest==7.2.0 -httpretty==1.1.4 \ No newline at end of file +httpretty==1.1.4 +trino==0.321.0 \ No newline at end of file diff --git a/setup.py b/setup.py index 1b6c89e..ec38921 100644 --- a/setup.py +++ b/setup.py @@ -23,5 +23,5 @@ setup( package_dir={"lolafect": "lolafect"}, include_package_data=True, python_requires=">=3.7", - install_requires=["prefect==1.2.2", "requests==2.28.1", "boto3==1.26.40"], + install_requires=["prefect==1.2.2", "requests==2.28.1", "boto3==1.26.40", "trino==0.321.0"], ) From caa69d85cdd69f27781a043e9a6144a7644659a9 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 19 Jan 2023 17:42:57 +0100 Subject: [PATCH 02/15] Add trino task. --- lolafect/connections.py | 35 +++++++++++++++++++++++++++++++++++ lolafect/defaults.py | 1 + 2 files changed, 36 insertions(+) create mode 100644 lolafect/connections.py diff --git a/lolafect/connections.py b/lolafect/connections.py new file mode 100644 index 0000000..fc0854a --- /dev/null +++ b/lolafect/connections.py @@ -0,0 +1,35 @@ +import datetime + +from prefect import task +from trino.auth import BasicAuthentication +import trino + +from lolafect.defaults import DEFAULT_TRINO_HTTP_SCHEME + + +@task(log_stdout=True, max_retries=3, retry_delay=datetime.timedelta(minutes=10)) +def connect_to_trino( + trino_credentials: dict, http_schema: str = DEFAULT_TRINO_HTTP_SCHEME +) -> trino.dbapi.Connection: + """ + Open a connection to the specified trino instance and return it. + + + :param trino_credentials: a dict with the host, port, user and password. + :param http_schema: which http schema to use in the connection. + :return: + """ + print("Connecting to Trino.") + connection = trino.dbapi.connect( + host=trino_credentials["host"], + port=trino_credentials["port"], + user=trino_credentials["user"], + http_scheme=http_schema, + auth=BasicAuthentication( + trino_credentials["user"], + trino_credentials["password"], + ), + ) + print("Connected to Trino.") + + return connection diff --git a/lolafect/defaults.py b/lolafect/defaults.py index feb66e4..1d4af84 100644 --- a/lolafect/defaults.py +++ b/lolafect/defaults.py @@ -4,3 +4,4 @@ DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE = "env/slack_webhooks.json" DEFAULT_KUBERNETES_IMAGE = "373245262072.dkr.ecr.eu-central-1.amazonaws.com/pdo-data-prefect:production" DEFAULT_KUBERNETES_LABELS = ["k8s"] DEFAULT_FLOWS_PATH_IN_BUCKET = "flows/" +DEFAULT_TRINO_HTTP_SCHEME = "https" From 28831bea8c996635bcd425d2f3036e4248958b7e Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 19 Jan 2023 17:45:57 +0100 Subject: [PATCH 03/15] Improve logging. --- lolafect/connections.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/lolafect/connections.py b/lolafect/connections.py index fc0854a..c2fec0c 100644 --- a/lolafect/connections.py +++ b/lolafect/connections.py @@ -1,5 +1,6 @@ import datetime +import prefect from prefect import task from trino.auth import BasicAuthentication import trino @@ -14,12 +15,15 @@ def connect_to_trino( """ Open a connection to the specified trino instance and return it. - :param trino_credentials: a dict with the host, port, user and password. :param http_schema: which http schema to use in the connection. :return: """ - print("Connecting to Trino.") + logger = prefect.context.get("logger") + logger.info( + f"Connecting to Trino at {trino_credentials['host']}:{trino_credentials['port']}." + ) + connection = trino.dbapi.connect( host=trino_credentials["host"], port=trino_credentials["port"], @@ -30,6 +34,8 @@ def connect_to_trino( trino_credentials["password"], ), ) - print("Connected to Trino.") + logger.info( + f"Connected to Trino at {trino_credentials['host']}:{trino_credentials['port']}." + ) return connection From 1e0faf702f3f32d851f6eca33018a88fb79ff536 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Fri, 20 Jan 2023 15:01:07 +0100 Subject: [PATCH 04/15] Added integration test for connecting to Trino --- tests/test_integration/test_connections.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 tests/test_integration/test_connections.py diff --git a/tests/test_integration/test_connections.py b/tests/test_integration/test_connections.py new file mode 100644 index 0000000..5b79a46 --- /dev/null +++ b/tests/test_integration/test_connections.py @@ -0,0 +1,18 @@ +import trino + +from lolafect.lolaconfig import build_lolaconfig +from lolafect.connections import connect_to_trino + +# This testing suite requires: +# - The calling shell to have permission in AWS +# - The calling shell to be within the Mercadão network +TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") + + +def test_that_trino_connector_works_properly(): + + connection = connect_to_trino.run(TEST_LOLACONFIG.TRINO_CREDENTIALS) + + connection.cursor().execute("SELECT 1") + + assert type(connection) == trino.dbapi.Connection From a15b2f4a861fbbd5c0a79ed4c9a9af353cc62c52 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Fri, 20 Jan 2023 15:53:45 +0100 Subject: [PATCH 05/15] Add close connection, update tests. --- lolafect/connections.py | 11 +++++++++++ tests/test_integration/test_connections.py | 8 ++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/lolafect/connections.py b/lolafect/connections.py index c2fec0c..eb14237 100644 --- a/lolafect/connections.py +++ b/lolafect/connections.py @@ -2,6 +2,7 @@ import datetime import prefect from prefect import task +from prefect.triggers import all_finished from trino.auth import BasicAuthentication import trino @@ -39,3 +40,13 @@ def connect_to_trino( ) return connection + + +@task(trigger=all_finished) +def close_trino_connection(trino_connection: trino.dbapi.Connection): + logger = prefect.context.get("logger") + if isinstance(trino_connection, trino.dbapi.Connection): + trino_connection.close() + logger.info("Trino connection closed successfully.") + return + logger.info("No connection received.") diff --git a/tests/test_integration/test_connections.py b/tests/test_integration/test_connections.py index 5b79a46..bbf8ddd 100644 --- a/tests/test_integration/test_connections.py +++ b/tests/test_integration/test_connections.py @@ -1,7 +1,7 @@ import trino from lolafect.lolaconfig import build_lolaconfig -from lolafect.connections import connect_to_trino +from lolafect.connections import connect_to_trino, close_trino_connection # This testing suite requires: # - The calling shell to have permission in AWS @@ -9,10 +9,10 @@ from lolafect.connections import connect_to_trino TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") -def test_that_trino_connector_works_properly(): +def test_that_trino_connect_and_disconnect_works_properly(): - connection = connect_to_trino.run(TEST_LOLACONFIG.TRINO_CREDENTIALS) + connection = connect_to_trino.run(trino_credentials=TEST_LOLACONFIG.TRINO_CREDENTIALS) connection.cursor().execute("SELECT 1") - assert type(connection) == trino.dbapi.Connection + close_trino_connection.run(trino_connection=connection) \ No newline at end of file From 26d1f10a003edf1df88296043c38223e47057f66 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 12:46:56 +0100 Subject: [PATCH 06/15] Improved comments --- tests/test_integration/test_connections.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/test_integration/test_connections.py b/tests/test_integration/test_connections.py index bbf8ddd..cd3c083 100644 --- a/tests/test_integration/test_connections.py +++ b/tests/test_integration/test_connections.py @@ -1,11 +1,18 @@ -import trino - from lolafect.lolaconfig import build_lolaconfig from lolafect.connections import connect_to_trino, close_trino_connection +#__ __ _____ _ _ _____ _ _ _____ _ +#\ \ / /\ | __ \| \ | |_ _| \ | |/ ____| | +# \ \ /\ / / \ | |__) | \| | | | | \| | | __| | +# \ \/ \/ / /\ \ | _ /| . ` | | | | . ` | | |_ | | +# \ /\ / ____ \| | \ \| |\ |_| |_| |\ | |__| |_| +# \/ \/_/ \_\_| \_\_| \_|_____|_| \_|\_____(_) # This testing suite requires: # - The calling shell to have permission in AWS # - The calling shell to be within the Mercadão network +# - Do not use this tests as part of CI/CD pipelines since they are not idempotent and +# rely on many external. Instead, use them manually to check yourself that things are +# working properly. TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") From 910373f19ce611c47b47bc79e31cf8d2a4100134 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 14:06:19 +0100 Subject: [PATCH 07/15] Docstrings and typing. --- lolafect/connections.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/lolafect/connections.py b/lolafect/connections.py index eb14237..40c450f 100644 --- a/lolafect/connections.py +++ b/lolafect/connections.py @@ -43,7 +43,14 @@ def connect_to_trino( @task(trigger=all_finished) -def close_trino_connection(trino_connection: trino.dbapi.Connection): +def close_trino_connection(trino_connection: trino.dbapi.Connection) -> None: + """ + Close a Trino connection, or do nothing if what has been passed is not a + Trino connection. + + :param trino_connection: a trino connection. + :return: None + """ logger = prefect.context.get("logger") if isinstance(trino_connection, trino.dbapi.Connection): trino_connection.close() From 73eae76739ab5194d92fcc6d751f61f89a771185 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 14:06:48 +0100 Subject: [PATCH 08/15] Formatting --- tests/test_integration/test_connections.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/test_integration/test_connections.py b/tests/test_integration/test_connections.py index cd3c083..5156f52 100644 --- a/tests/test_integration/test_connections.py +++ b/tests/test_integration/test_connections.py @@ -1,8 +1,8 @@ from lolafect.lolaconfig import build_lolaconfig from lolafect.connections import connect_to_trino, close_trino_connection -#__ __ _____ _ _ _____ _ _ _____ _ -#\ \ / /\ | __ \| \ | |_ _| \ | |/ ____| | +# __ __ _____ _ _ _____ _ _ _____ _ +# \ \ / /\ | __ \| \ | |_ _| \ | |/ ____| | # \ \ /\ / / \ | |__) | \| | | | | \| | | __| | # \ \/ \/ / /\ \ | _ /| . ` | | | | . ` | | |_ | | # \ /\ / ____ \| | \ \| |\ |_| |_| |\ | |__| |_| @@ -18,8 +18,10 @@ TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") def test_that_trino_connect_and_disconnect_works_properly(): - connection = connect_to_trino.run(trino_credentials=TEST_LOLACONFIG.TRINO_CREDENTIALS) + connection = connect_to_trino.run( + trino_credentials=TEST_LOLACONFIG.TRINO_CREDENTIALS + ) connection.cursor().execute("SELECT 1") - close_trino_connection.run(trino_connection=connection) \ No newline at end of file + close_trino_connection.run(trino_connection=connection) From aabdbe0508be192065496f25aacd04fed0950675 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 14:07:22 +0100 Subject: [PATCH 09/15] Fixed comments --- tests/test_integration/test_connections.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_integration/test_connections.py b/tests/test_integration/test_connections.py index 5156f52..b8e32be 100644 --- a/tests/test_integration/test_connections.py +++ b/tests/test_integration/test_connections.py @@ -11,8 +11,8 @@ from lolafect.connections import connect_to_trino, close_trino_connection # - The calling shell to have permission in AWS # - The calling shell to be within the Mercadão network # - Do not use this tests as part of CI/CD pipelines since they are not idempotent and -# rely on many external. Instead, use them manually to check yourself that things are -# working properly. +# rely external resources. Instead, use them manually to check yourself that things +# are working properly. TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") From d9b7c9a9e25a527cbc4b3acc2dc9e7eda0a5ab61 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 14:07:35 +0100 Subject: [PATCH 10/15] Fixed comments --- tests/test_integration/test_connections.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_integration/test_connections.py b/tests/test_integration/test_connections.py index b8e32be..74944e0 100644 --- a/tests/test_integration/test_connections.py +++ b/tests/test_integration/test_connections.py @@ -3,10 +3,10 @@ from lolafect.connections import connect_to_trino, close_trino_connection # __ __ _____ _ _ _____ _ _ _____ _ # \ \ / /\ | __ \| \ | |_ _| \ | |/ ____| | -# \ \ /\ / / \ | |__) | \| | | | | \| | | __| | -# \ \/ \/ / /\ \ | _ /| . ` | | | | . ` | | |_ | | -# \ /\ / ____ \| | \ \| |\ |_| |_| |\ | |__| |_| -# \/ \/_/ \_\_| \_\_| \_|_____|_| \_|\_____(_) +# \ \ /\ / / \ | |__) | \| | | | | \| | | __| | +# \ \/ \/ / /\ \ | _ /| . ` | | | | . ` | | |_ | | +# \ /\ / ____ \| | \ \| |\ |_| |_| |\ | |__| |_| +# \/ \/_/ \_\_| \_\_| \_|_____|_| \_|\_____(_) # This testing suite requires: # - The calling shell to have permission in AWS # - The calling shell to be within the Mercadão network From 738c4515181f42583d34325dfef1878e5e547cc4 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 14:08:21 +0100 Subject: [PATCH 11/15] Update changelog --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 78124e9..32fb418 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ All notable changes to this project will be documented in this file. +## [Unreleased] + +### Added + +- Added Trino connection capabilities in `connections` module. + + ## [0.2.0] - 2023-01-19 ### Added From 6cefb7ed03ce3e5cd6bfd1f371ccd5407232b7a3 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 14:27:17 +0100 Subject: [PATCH 12/15] Update readme with connection example --- README.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/README.md b/README.md index e7b7994..d04f3c8 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,19 @@ lolaconfig = build_lolaconfig( ) ``` +**Connect to a Trino server** + +```python +from lolafect.connections import connect_to_trino, close_trino_connection + + connection = connect_to_trino.run( + trino_credentials=my_trino_credentials #You can probably try to fetch this from lolaconfig.TRINO_CREDENTIALS + ) + connection.cursor().execute("SELECT 1") + close_trino_connection.run(trino_connection=connection) +``` + + **Send a warning message to slack if your tasks fails** ```python From bc524358ff6b8911c2054e6fadb792c1932029ab Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 14:28:08 +0100 Subject: [PATCH 13/15] Moved unit tests to folder --- tests/{ => test_unit}/test_lolaconfig.py | 0 tests/{ => test_unit}/test_slack.py | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename tests/{ => test_unit}/test_lolaconfig.py (100%) rename tests/{ => test_unit}/test_slack.py (100%) diff --git a/tests/test_lolaconfig.py b/tests/test_unit/test_lolaconfig.py similarity index 100% rename from tests/test_lolaconfig.py rename to tests/test_unit/test_lolaconfig.py diff --git a/tests/test_slack.py b/tests/test_unit/test_slack.py similarity index 100% rename from tests/test_slack.py rename to tests/test_unit/test_slack.py From e355e771b0308855049bdccf80ac6fa26955064e Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 14:35:37 +0100 Subject: [PATCH 14/15] Updated docs on how to run tests. --- README.md | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index d04f3c8..581b955 100644 --- a/README.md +++ b/README.md @@ -70,12 +70,30 @@ with Flow(...) as flow: ## How to test +There are two test suites: unit tests and integration tests. Integration tests are prepared to plug to some of our +AWS resources, hence they are not fully reliable since they require specific credentials and permissions. The +recommended policy is: + +- Use the unit tests in any CI process you want. +- Use the unit tests frequently as you code. +- Do not use the integration tests in CI processes. +- Use the integration tests as milestone checks when finishing feature branches. +- Make sure to ensure integration tests are working before making a new release. + +When building new tests, please keep this philosophy in mind. + + IDE-agnostic: 1. Set up a virtual environment which contains both `lolafect` and the dependencies listed in `requirements-dev.txt`. -2. Run: `pytests tests` +2. Run: + - For all tests: `pytests tests` + - Only unit tests: `pytest tests/test_unit` + - Only integration tests: `pytest tests/test_integration` In Pycharm: - If you configure `pytest` as the project test runner, Pycharm will most probably autodetect the test - folder and allow you to run the test suite within the IDE. \ No newline at end of file + folder and allow you to run the test suite within the IDE. However, Pycharm has troubles running the integration + tests since the shell it runs from does not have the AWS credentials. Hence, for now we recommend you to only use + the Pycharm integrated test runner for the unit tests. You can easily set up a Run Configuration for that. \ No newline at end of file From ee489edd6193859c0bae96dac7a5cbb5fc2f4020 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 14:37:12 +0100 Subject: [PATCH 15/15] Improved code example --- README.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 581b955..9160eb3 100644 --- a/README.md +++ b/README.md @@ -41,11 +41,15 @@ lolaconfig = build_lolaconfig( ```python from lolafect.connections import connect_to_trino, close_trino_connection +with Flow(...) as flow: connection = connect_to_trino.run( trino_credentials=my_trino_credentials #You can probably try to fetch this from lolaconfig.TRINO_CREDENTIALS ) - connection.cursor().execute("SELECT 1") - close_trino_connection.run(trino_connection=connection) + task_result = some_trino_related_task(trino_connection=connection) + close_trino_connection.run( + trino_connection=connection, + upstream_tasks=[task_result] + ) ```