From c3507cccec0fea4c46677919df06f38919d7c01e Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Wed, 29 Mar 2023 17:17:44 +0200 Subject: [PATCH 01/18] Refactor shared elements to follow DRY --- tests/test_integration/test_data_testing.py | 85 ++++++++++----------- 1 file changed, 40 insertions(+), 45 deletions(-) diff --git a/tests/test_integration/test_data_testing.py b/tests/test_integration/test_data_testing.py index 8b456f2..5894dd1 100644 --- a/tests/test_integration/test_data_testing.py +++ b/tests/test_integration/test_data_testing.py @@ -19,29 +19,44 @@ from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") +TEST_QUERY = """ +SELECT 1 AS a_one, + "lol" AS a_string, + NULL AS a_null +""" + +TEST_EXPECTATIONS_THAT_FIT_DATA = [ + ExpectationConfiguration( + expectation_type="expect_column_values_to_be_between", + kwargs={"column": "a_one", "min_value": 1, "max_value": 1}, + ), + ExpectationConfiguration( + expectation_type="expect_column_values_to_match_like_pattern", + kwargs={"column": "a_string", "like_pattern": "%lol%"}, + ), + ExpectationConfiguration( + expectation_type="expect_column_values_to_be_null", + kwargs={"column": "a_null"}, + ), +] + +TEST_EXPECTATIONS_THAT_DONT_FIT_DATA = [ + ExpectationConfiguration( + expectation_type="expect_column_values_to_be_between", + kwargs={"column": "a_one", "min_value": 2, "max_value": 2}, + ), + ExpectationConfiguration( + expectation_type="expect_column_values_to_match_like_pattern", + kwargs={"column": "a_string", "like_pattern": "%xD%"}, + ), + ExpectationConfiguration( + expectation_type="expect_column_values_to_not_be_null", + kwargs={"column": "a_null"}, + ), +] + def test_validation_on_mysql_succeeds(): - - test_query = """ - SELECT 1 AS a_one, - "lol" AS a_string, - NULL AS a_null - """ - test_expectations = [ - ExpectationConfiguration( - expectation_type="expect_column_values_to_be_between", - kwargs={"column": "a_one", "min_value": 1, "max_value": 1}, - ), - ExpectationConfiguration( - expectation_type="expect_column_values_to_match_like_pattern", - kwargs={"column": "a_string", "like_pattern": "%lol%"}, - ), - ExpectationConfiguration( - expectation_type="expect_column_values_to_be_null", - kwargs={"column": "a_null"}, - ), - ] - ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run( s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, @@ -58,8 +73,8 @@ def test_validation_on_mysql_succeeds(): "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"], "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"], }, - query=test_query, - expectation_configurations=test_expectations, + query=TEST_QUERY, + expectation_configurations=TEST_EXPECTATIONS_THAT_FIT_DATA, ) closed_tunnel = close_ssh_tunnel.run(ssh_tunnel) @@ -70,26 +85,6 @@ def test_validation_on_mysql_succeeds(): def test_validation_on_mysql_fails(): - test_query = """ - SELECT 1 AS a_one, - "lol" AS a_string, - NULL AS a_null - """ - test_expectations = [ - ExpectationConfiguration( - expectation_type="expect_column_values_to_be_between", - kwargs={"column": "a_one", "min_value": 2, "max_value": 2}, - ), - ExpectationConfiguration( - expectation_type="expect_column_values_to_match_like_pattern", - kwargs={"column": "a_string", "like_pattern": "%xD%"}, - ), - ExpectationConfiguration( - expectation_type="expect_column_values_to_not_be_null", - kwargs={"column": "a_null"}, - ), - ] - ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run( s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, @@ -106,8 +101,8 @@ def test_validation_on_mysql_fails(): "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"], "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"], }, - query=test_query, - expectation_configurations=test_expectations, + query=TEST_QUERY, + expectation_configurations=TEST_EXPECTATIONS_THAT_DONT_FIT_DATA, ) closed_tunnel = close_ssh_tunnel.run(ssh_tunnel) From b51c810a394909e74f4ec6aac6d6b9214cf255ab Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Wed, 29 Mar 2023 17:18:00 +0200 Subject: [PATCH 02/18] Add tests for trino feature --- tests/test_integration/test_data_testing.py | 40 ++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/tests/test_integration/test_data_testing.py b/tests/test_integration/test_data_testing.py index 5894dd1..919903d 100644 --- a/tests/test_integration/test_data_testing.py +++ b/tests/test_integration/test_data_testing.py @@ -1,7 +1,7 @@ from great_expectations.core.expectation_configuration import ExpectationConfiguration from lolafect.lolaconfig import build_lolaconfig -from lolafect.data_testing import run_data_test_on_mysql +from lolafect.data_testing import run_data_test_on_mysql, run_data_test_on_trino from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel # __ __ _____ _ _ _____ _ _ _____ _ @@ -110,3 +110,41 @@ def test_validation_on_mysql_fails(): data_test_failed = validation_result["success"] == False assert data_test_failed + + +def test_validation_on_trino_succeeds(): + validation_result = run_data_test_on_trino.run( + name="lolafect-testing-test_validation_on_mysql_fails", + trino_credentials={ + "host": TEST_LOLACONFIG.TRINO_CREDENTIALS["host"], + "port": TEST_LOLACONFIG.TRINO_CREDENTIALS["port"], + "user": TEST_LOLACONFIG.TRINO_CREDENTIALS["user"], + "password": TEST_LOLACONFIG.TRINO_CREDENTIALS["password"], + "db": TEST_LOLACONFIG.TRINO_CREDENTIALS["default_db"], + }, + query=TEST_QUERY, + expectation_configurations=TEST_EXPECTATIONS_THAT_FIT_DATA, + ) + + data_test_passed = validation_result["success"] == True + + assert data_test_passed + + +def test_validation_on_trino_fails(): + validation_result = run_data_test_on_trino.run( + name="lolafect-testing-test_validation_on_mysql_fails", + trino_credentials={ + "host": TEST_LOLACONFIG.TRINO_CREDENTIALS["host"], + "port": TEST_LOLACONFIG.TRINO_CREDENTIALS["port"], + "user": TEST_LOLACONFIG.TRINO_CREDENTIALS["user"], + "password": TEST_LOLACONFIG.TRINO_CREDENTIALS["password"], + "db": TEST_LOLACONFIG.TRINO_CREDENTIALS["default_db"], + }, + query=TEST_QUERY, + expectation_configurations=TEST_EXPECTATIONS_THAT_DONT_FIT_DATA, + ) + + data_test_failed = validation_result["success"] == False + + assert data_test_failed From 814a376e02818f08671075dd3532fd272bc1a709 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Wed, 29 Mar 2023 17:18:14 +0200 Subject: [PATCH 03/18] Add entry in readme showcasing new trino feature --- README.md | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/README.md b/README.md index d9cbbf1..134b28d 100644 --- a/README.md +++ b/README.md @@ -143,6 +143,28 @@ with Flow(...) as flow: print("The data is bad!!!") ``` +**Run a Great Expectations validation on a Trino query** + +```python +from lolafect.data_testing import run_data_test_on_trino + +with Flow(...) as flow: + + my_query = """SELECT something FROM somewhere""" + my_expectations = {...} # A bunch of things you want to validate on the result of the query + + validation_results = run_data_test_on_trino( + name="my-cool-validation", + trino_credentials={...}, + query=my_query, + expectations=my_expectations + ) + + if not validation_results["success"]: + print("The data is bad!!!") +``` + + ### Slack **Send a warning message to slack if your tasks fails** From 5b97864e8d3d458c39375bb3f08ed7d4e1ca088a Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Wed, 29 Mar 2023 17:18:41 +0200 Subject: [PATCH 04/18] Signature for new task --- lolafect/data_testing.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/lolafect/data_testing.py b/lolafect/data_testing.py index 4216d4b..863d6fe 100644 --- a/lolafect/data_testing.py +++ b/lolafect/data_testing.py @@ -60,6 +60,15 @@ def run_data_test_on_mysql( return results +@task() +def run_data_test_on_trino( + name: str, + trino_credentials: dict, + query: str, + expectation_configurations: List[ExpectationConfiguration], + great_expectations_s3_bucket: str = DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET, +) -> dict: + raise NotImplementedError("WIP") def _create_in_memory_data_context_for_mysql( mysql_credentials: dict, From 19211eee735575678dff3df6b336272c4bf0e0ed Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Wed, 29 Mar 2023 17:28:16 +0200 Subject: [PATCH 05/18] Naive duplication of MySQL elements but for Trino. Not working yet. --- lolafect/data_testing.py | 88 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 87 insertions(+), 1 deletion(-) diff --git a/lolafect/data_testing.py b/lolafect/data_testing.py index 863d6fe..b1fe584 100644 --- a/lolafect/data_testing.py +++ b/lolafect/data_testing.py @@ -68,7 +68,43 @@ def run_data_test_on_trino( expectation_configurations: List[ExpectationConfiguration], great_expectations_s3_bucket: str = DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET, ) -> dict: - raise NotImplementedError("WIP") + """ + Validate a query and an expectation suite against a given Trino server. + + :param name: a unique name for the data test. + :param trino_credentials: credentials for the Trino cluster. + :param query: the query to test against. + :param expectation_configurations: the expectations on the dataset. + :param great_expectations_s3_bucket: the bucket where Great Expectations + files live. + :return: the result of the data test. + """ + logger = prefect.context.get("logger") + + logger.info("Creating data context.") + data_context = _create_in_memory_data_context_for_trino( + trino_credentials, great_expectations_s3_bucket + ) + logger.info("Data context created.") + logger.info("Creating expectation suite.") + data_context = _create_expectation_suite( + data_context, name, expectation_configurations + ) + logger.info("Expectation suite created.") + logger.info("Creating checkpoint.") + data_context = _create_checkpoint( + data_context, + f"{trino_credentials['host']}:{trino_credentials['port']}", + query, + name, + ) + logger.info("Checkpoint created.") + logger.info("Running checkpoint.") + results = data_context.run_checkpoint(f"{name}_checkpoint") + logger.info("Checkpoint finished.") + logger.info(f"Validation result: {results['success']}") + + return results def _create_in_memory_data_context_for_mysql( mysql_credentials: dict, @@ -120,6 +156,56 @@ def _create_in_memory_data_context_for_mysql( return data_context +def _create_in_memory_data_context_for_trino( + trino_credentials: dict, + great_expectations_s3_bucket: str, +) -> AbstractDataContext: + """ + Create a DataContext without a YAML config file and specify a Trino + datasource. + + :param trino_credentials: the creds to the mysql where the query will be + executed. + :param great_expectations_s3_bucket: the name of the bucket where Great + Exepctations files while be stored. + :return: the data context. + """ + + data_context = BaseDataContext( + project_config=DataContextConfig( + datasources={ + f"{trino_credentials['host']}:{trino_credentials['port']}": DatasourceConfig( + class_name="Datasource", + execution_engine={ + "class_name": "SqlAlchemyExecutionEngine", + "connection_string": f"trino://%s:%s@%s:%s/{trino_credentials['db']}/{'SCHEMA_GOES_HERE'}" + % ( + trino_credentials["user"], + urlquote(trino_credentials["password"]), + trino_credentials["host"], + trino_credentials["port"], + ), + }, + data_connectors={ + "default_runtime_data_connector_name": { + "class_name": "RuntimeDataConnector", + "batch_identifiers": ["default_identifier_name"], + }, + "default_inferred_data_connector_name": { + "class_name": "InferredAssetSqlDataConnector", + "name": "whole_table", + }, + }, + ) + }, + store_backend_defaults=S3StoreBackendDefaults( + default_bucket_name=great_expectations_s3_bucket + ), + ) + ) + + return data_context + def _create_expectation_suite( data_context: AbstractDataContext, From 5066fe438221d38442f1e422b2fa45da6a0e7568 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 30 Mar 2023 13:58:15 +0200 Subject: [PATCH 06/18] Use single quotes so that query works both in MySQL and Trino. --- tests/test_integration/test_data_testing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_integration/test_data_testing.py b/tests/test_integration/test_data_testing.py index 919903d..bcd8d28 100644 --- a/tests/test_integration/test_data_testing.py +++ b/tests/test_integration/test_data_testing.py @@ -21,7 +21,7 @@ TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") TEST_QUERY = """ SELECT 1 AS a_one, - "lol" AS a_string, + 'lol' AS a_string, NULL AS a_null """ From d9c131dcc6266d5022dfca7cdabaffbed468fac5 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 20 Apr 2023 15:36:16 +0200 Subject: [PATCH 07/18] Unfinished work --- lolafect/data_testing.py | 4 +- tests/test_integration/test_data_testing.py | 130 ++++++++++---------- 2 files changed, 71 insertions(+), 63 deletions(-) diff --git a/lolafect/data_testing.py b/lolafect/data_testing.py index b1fe584..9e3e777 100644 --- a/lolafect/data_testing.py +++ b/lolafect/data_testing.py @@ -178,12 +178,14 @@ def _create_in_memory_data_context_for_trino( class_name="Datasource", execution_engine={ "class_name": "SqlAlchemyExecutionEngine", - "connection_string": f"trino://%s:%s@%s:%s/{trino_credentials['db']}/{'SCHEMA_GOES_HERE'}" + "connection_string": f"trino://%s:%s@%s:%s/%s/%s" % ( trino_credentials["user"], urlquote(trino_credentials["password"]), trino_credentials["host"], trino_credentials["port"], + trino_credentials["catalog"], + trino_credentials["schema"], ), }, data_connectors={ diff --git a/tests/test_integration/test_data_testing.py b/tests/test_integration/test_data_testing.py index bcd8d28..a6c564a 100644 --- a/tests/test_integration/test_data_testing.py +++ b/tests/test_integration/test_data_testing.py @@ -18,11 +18,13 @@ from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel # are working properly. TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") - +#1 AS a_one, +# 'lol' AS a_string, +# NULL AS a_null TEST_QUERY = """ -SELECT 1 AS a_one, - 'lol' AS a_string, - NULL AS a_null +SELECT * +from app_lm_mysql_pl.comprea.market +where id = 1 """ TEST_EXPECTATIONS_THAT_FIT_DATA = [ @@ -55,62 +57,62 @@ TEST_EXPECTATIONS_THAT_DONT_FIT_DATA = [ ), ] - -def test_validation_on_mysql_succeeds(): - ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run( - s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, - ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, - remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], - remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], - ) - - validation_result = run_data_test_on_mysql.run( - name="lolafect-testing-test_validation_on_mysql_succeeds", - mysql_credentials={ - "host": ssh_tunnel.local_bind_address[0], - "port": ssh_tunnel.local_bind_address[1], - "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"], - "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"], - "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"], - }, - query=TEST_QUERY, - expectation_configurations=TEST_EXPECTATIONS_THAT_FIT_DATA, - ) - - closed_tunnel = close_ssh_tunnel.run(ssh_tunnel) - - data_test_passed = validation_result["success"] == True - - assert data_test_passed - - -def test_validation_on_mysql_fails(): - ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run( - s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, - ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, - remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], - remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], - ) - - validation_result = run_data_test_on_mysql.run( - name="lolafect-testing-test_validation_on_mysql_fails", - mysql_credentials={ - "host": ssh_tunnel.local_bind_address[0], - "port": ssh_tunnel.local_bind_address[1], - "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"], - "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"], - "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"], - }, - query=TEST_QUERY, - expectation_configurations=TEST_EXPECTATIONS_THAT_DONT_FIT_DATA, - ) - - closed_tunnel = close_ssh_tunnel.run(ssh_tunnel) - - data_test_failed = validation_result["success"] == False - - assert data_test_failed - +# +# def test_validation_on_mysql_succeeds(): +# ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run( +# s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, +# ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, +# remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], +# remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], +# ) +# +# validation_result = run_data_test_on_mysql.run( +# name="lolafect-testing-test_validation_on_mysql_succeeds", +# mysql_credentials={ +# "host": ssh_tunnel.local_bind_address[0], +# "port": ssh_tunnel.local_bind_address[1], +# "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"], +# "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"], +# "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"], +# }, +# query=TEST_QUERY, +# expectation_configurations=TEST_EXPECTATIONS_THAT_FIT_DATA, +# ) +# +# closed_tunnel = close_ssh_tunnel.run(ssh_tunnel) +# +# data_test_passed = validation_result["success"] == True +# +# assert data_test_passed +# +# +# def test_validation_on_mysql_fails(): +# ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run( +# s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, +# ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, +# remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], +# remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], +# ) +# +# validation_result = run_data_test_on_mysql.run( +# name="lolafect-testing-test_validation_on_mysql_fails", +# mysql_credentials={ +# "host": ssh_tunnel.local_bind_address[0], +# "port": ssh_tunnel.local_bind_address[1], +# "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"], +# "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"], +# "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"], +# }, +# query=TEST_QUERY, +# expectation_configurations=TEST_EXPECTATIONS_THAT_DONT_FIT_DATA, +# ) +# +# closed_tunnel = close_ssh_tunnel.run(ssh_tunnel) +# +# data_test_failed = validation_result["success"] == False +# +# assert data_test_failed +# def test_validation_on_trino_succeeds(): validation_result = run_data_test_on_trino.run( @@ -120,11 +122,14 @@ def test_validation_on_trino_succeeds(): "port": TEST_LOLACONFIG.TRINO_CREDENTIALS["port"], "user": TEST_LOLACONFIG.TRINO_CREDENTIALS["user"], "password": TEST_LOLACONFIG.TRINO_CREDENTIALS["password"], - "db": TEST_LOLACONFIG.TRINO_CREDENTIALS["default_db"], + "catalog": "data_dw", + "schema": "sandbox" }, query=TEST_QUERY, expectation_configurations=TEST_EXPECTATIONS_THAT_FIT_DATA, ) + print("###############\n" * 20) + print(validation_result) data_test_passed = validation_result["success"] == True @@ -139,7 +144,8 @@ def test_validation_on_trino_fails(): "port": TEST_LOLACONFIG.TRINO_CREDENTIALS["port"], "user": TEST_LOLACONFIG.TRINO_CREDENTIALS["user"], "password": TEST_LOLACONFIG.TRINO_CREDENTIALS["password"], - "db": TEST_LOLACONFIG.TRINO_CREDENTIALS["default_db"], + "catalog": "data_dw", + "schema": "sandbox" }, query=TEST_QUERY, expectation_configurations=TEST_EXPECTATIONS_THAT_DONT_FIT_DATA, From 8f1f3d75e1b0772d1ef5f0b7ea5b570d21f37754 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Fri, 21 Apr 2023 12:22:32 +0200 Subject: [PATCH 08/18] Implemented the tasks. --- lolafect/utils.py | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/lolafect/utils.py b/lolafect/utils.py index 621e59d..608f27b 100644 --- a/lolafect/utils.py +++ b/lolafect/utils.py @@ -1,5 +1,8 @@ import json +from typing import Any +import prefect +from prefect import task class S3FileReader: """ @@ -22,3 +25,38 @@ class S3FileReader: .read() .decode("utf-8") ) + +@task() +def begin_sql_transaction(connection: Any) -> None: + """ + Start a SQL transaction in the passed connection. The task is agnostic to + the SQL engine being used. As long as it implements a begin() method, this + will work. + + :param connection: the connection to some database. + :return: None + """ + logger = prefect.context.get("logger") + logger.info(f"Starting SQL transaction with connection: {connection}.") + connection.begin() + + +@task() +def end_sql_transaction(connection: Any, dry_run: bool = False) -> None: + """ + Finish a SQL transaction, either by rolling it back or by committing it. + + :param connection: the connection to some database. + :param dry_run: a flag indicating if persistence is desired. If dry_run + is True, changes will be rollbacked. + :return: None + """ + logger = prefect.context.get("logger") + logger.info(f"Using connection: {connection}.") + + if dry_run: + connection.rollback() + logger.info("Dry-run mode activated. Rolling back the transaction.") + else: + logger.info("Committing the transaction.") + connection.commit() From 5b53b71fd87a8315195076e65cbbb65afe503a11 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Fri, 21 Apr 2023 12:37:37 +0200 Subject: [PATCH 09/18] Testing plan --- tests/test_integration/test_utils.py | 41 ++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 tests/test_integration/test_utils.py diff --git a/tests/test_integration/test_utils.py b/tests/test_integration/test_utils.py new file mode 100644 index 0000000..4b33844 --- /dev/null +++ b/tests/test_integration/test_utils.py @@ -0,0 +1,41 @@ +from lolafect.lolaconfig import build_lolaconfig + +# __ __ _____ _ _ _____ _ _ _____ _ +# \ \ / /\ | __ \| \ | |_ _| \ | |/ ____| | +# \ \ /\ / / \ | |__) | \| | | | | \| | | __| | +# \ \/ \/ / /\ \ | _ /| . ` | | | | . ` | | |_ | | +# \ /\ / ____ \| | \ \| |\ |_| |_| |\ | |__| |_| +# \/ \/_/ \_\_| \_\_| \_|_____|_| \_|\_____(_) +# This testing suite requires: +# - The calling shell to have permission in AWS +# - The calling shell to be within the Mercadão network +# - Do not use this tests as part of CI/CD pipelines since they are not idempotent and +# rely external resources. Instead, use them manually to check yourself that things +# are working properly. + + +TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") + + +def test_sql_transaction_persists_changes_properly(): + # Connect + # Create table in Sandbox + # Check that table is empty + # Start transaction + # Insert value in table + # Commit transaction + # Check that value is there + # assert that the table was initially empty and afterwards it got a record + pass + + +def test_sql_transaction_rollbacks_changes_properly(): + # Connect + # Create table in Sandbox + # Check that table is empty + # Start transaction + # Insert value in table + # Commit transaction + # Check that the table is still empty + # assert that the table was initially empty and afterwards as well + pass \ No newline at end of file From 47a3e8b1e755428a44a7277b536f1f0ea316ddab Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 24 Apr 2023 11:38:51 +0200 Subject: [PATCH 10/18] Test working --- tests/test_integration/test_utils.py | 134 +++++++++++++++++++++++---- 1 file changed, 116 insertions(+), 18 deletions(-) diff --git a/tests/test_integration/test_utils.py b/tests/test_integration/test_utils.py index 4b33844..ba6023a 100644 --- a/tests/test_integration/test_utils.py +++ b/tests/test_integration/test_utils.py @@ -1,4 +1,12 @@ from lolafect.lolaconfig import build_lolaconfig +from lolafect.connections import ( + open_ssh_tunnel_with_s3_pkey, + get_local_bind_address_from_ssh_tunnel, + close_ssh_tunnel, + connect_to_mysql, + close_mysql_connection, +) +from lolafect.utils import begin_sql_transaction, end_sql_transaction # __ __ _____ _ _ _____ _ _ _____ _ # \ \ / /\ | __ \| \ | |_ _| \ | |/ ____| | @@ -18,24 +26,114 @@ TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") def test_sql_transaction_persists_changes_properly(): - # Connect - # Create table in Sandbox - # Check that table is empty - # Start transaction - # Insert value in table - # Commit transaction - # Check that value is there - # assert that the table was initially empty and afterwards it got a record - pass + test_local_bind_host = "127.0.0.1" + test_local_bind_port = 12345 + + tunnel = open_ssh_tunnel_with_s3_pkey.run( + s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, + ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, + remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], + remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], + local_bind_host=test_local_bind_host, + local_bind_port=test_local_bind_port, + ) + + connection = connect_to_mysql.run( + mysql_credentials=TEST_LOLACONFIG.DW_CREDENTIALS, + overriding_host_and_port=get_local_bind_address_from_ssh_tunnel.run( + tunnel=tunnel + ), + ) + + cursor = connection.cursor() + cursor.execute(""" + CREATE TABLE sandbox.lolafect_transaction_test_table + ( + a_test_column INT + ) + """) + cursor.execute(""" + SELECT a_test_column + FROM sandbox.lolafect_transaction_test_table + """) + table_is_empty_at_first = not bool(cursor.fetchall()) # An empty tuple yields False + + begin_sql_transaction.run(connection=connection) + cursor.execute(""" + INSERT INTO sandbox.lolafect_transaction_test_table (a_test_column) + VALUES (1) + """) + end_sql_transaction.run(connection=connection, dry_run=False) + + cursor.execute(""" + SELECT a_test_column + FROM sandbox.lolafect_transaction_test_table + """) + table_has_a_record_after_transaction = bool(cursor.fetchall()) # A non-empty tuple yields True + + cursor.execute(""" + DROP TABLE sandbox.lolafect_transaction_test_table + """ + ) + + close_mysql_connection.run(connection=connection) + close_ssh_tunnel.run(tunnel=tunnel) + + assert table_is_empty_at_first and table_has_a_record_after_transaction def test_sql_transaction_rollbacks_changes_properly(): - # Connect - # Create table in Sandbox - # Check that table is empty - # Start transaction - # Insert value in table - # Commit transaction - # Check that the table is still empty - # assert that the table was initially empty and afterwards as well - pass \ No newline at end of file + test_local_bind_host = "127.0.0.1" + test_local_bind_port = 12345 + + tunnel = open_ssh_tunnel_with_s3_pkey.run( + s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, + ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, + remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], + remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], + local_bind_host=test_local_bind_host, + local_bind_port=test_local_bind_port, + ) + + connection = connect_to_mysql.run( + mysql_credentials=TEST_LOLACONFIG.DW_CREDENTIALS, + overriding_host_and_port=get_local_bind_address_from_ssh_tunnel.run( + tunnel=tunnel + ), + ) + + cursor = connection.cursor() + cursor.execute(""" + CREATE TABLE sandbox.lolafect_transaction_test_table + ( + a_test_column INT + ) + """) + cursor.execute(""" + SELECT a_test_column + FROM sandbox.lolafect_transaction_test_table + """) + table_is_empty_at_first = not bool(cursor.fetchall()) # An empty tuple yields False + + begin_sql_transaction.run(connection=connection) + cursor.execute(""" + INSERT INTO sandbox.lolafect_transaction_test_table (a_test_column) + VALUES (1) + """) + end_sql_transaction.run(connection=connection, dry_run=True) + + cursor.execute(""" + SELECT a_test_column + FROM sandbox.lolafect_transaction_test_table + """) + table_is_empty_after_rollback = not bool(cursor.fetchall()) # An tuple yields False + + cursor.execute(""" + DROP TABLE sandbox.lolafect_transaction_test_table + """ + ) + + close_mysql_connection.run(connection=connection) + close_ssh_tunnel.run(tunnel=tunnel) + + assert table_is_empty_at_first and table_is_empty_after_rollback From 2090ec7c916e08eeefdab22a3d5e33e59e772882 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 24 Apr 2023 13:41:15 +0200 Subject: [PATCH 11/18] Refactor setup and teardown of connection and test table into a fixture. --- tests/test_integration/test_utils.py | 86 +++++++++++----------------- 1 file changed, 32 insertions(+), 54 deletions(-) diff --git a/tests/test_integration/test_utils.py b/tests/test_integration/test_utils.py index ba6023a..e195626 100644 --- a/tests/test_integration/test_utils.py +++ b/tests/test_integration/test_utils.py @@ -1,3 +1,5 @@ +import pytest + from lolafect.lolaconfig import build_lolaconfig from lolafect.connections import ( open_ssh_tunnel_with_s3_pkey, @@ -25,7 +27,8 @@ from lolafect.utils import begin_sql_transaction, end_sql_transaction TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") -def test_sql_transaction_persists_changes_properly(): +@pytest.fixture +def connection_with_test_table(): test_local_bind_host = "127.0.0.1" test_local_bind_port = 12345 @@ -44,26 +47,41 @@ def test_sql_transaction_persists_changes_properly(): tunnel=tunnel ), ) - cursor = connection.cursor() cursor.execute(""" - CREATE TABLE sandbox.lolafect_transaction_test_table - ( - a_test_column INT - ) - """) + CREATE TABLE sandbox.lolafect_transaction_test_table + ( + a_test_column INT + ) + """) + + # Connection and table ready for tests + yield connection # Test happens now + # Test finished, time to remove stuff and close connection + + cursor.execute(""" + DROP TABLE sandbox.lolafect_transaction_test_table + """ + ) + close_mysql_connection.run(connection=connection) + close_ssh_tunnel.run(tunnel=tunnel) + + +def test_sql_transaction_persists_changes_properly(connection_with_test_table): + cursor = connection_with_test_table.cursor() + cursor.execute(""" SELECT a_test_column FROM sandbox.lolafect_transaction_test_table """) table_is_empty_at_first = not bool(cursor.fetchall()) # An empty tuple yields False - begin_sql_transaction.run(connection=connection) + begin_sql_transaction.run(connection=connection_with_test_table) cursor.execute(""" INSERT INTO sandbox.lolafect_transaction_test_table (a_test_column) VALUES (1) """) - end_sql_transaction.run(connection=connection, dry_run=False) + end_sql_transaction.run(connection=connection_with_test_table, dry_run=False) cursor.execute(""" SELECT a_test_column @@ -71,69 +89,29 @@ def test_sql_transaction_persists_changes_properly(): """) table_has_a_record_after_transaction = bool(cursor.fetchall()) # A non-empty tuple yields True - cursor.execute(""" - DROP TABLE sandbox.lolafect_transaction_test_table - """ - ) - - close_mysql_connection.run(connection=connection) - close_ssh_tunnel.run(tunnel=tunnel) - assert table_is_empty_at_first and table_has_a_record_after_transaction -def test_sql_transaction_rollbacks_changes_properly(): - test_local_bind_host = "127.0.0.1" - test_local_bind_port = 12345 +def test_sql_transaction_rollbacks_changes_properly(connection_with_test_table): + cursor = connection_with_test_table.cursor() - tunnel = open_ssh_tunnel_with_s3_pkey.run( - s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, - ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, - remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], - remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], - local_bind_host=test_local_bind_host, - local_bind_port=test_local_bind_port, - ) - - connection = connect_to_mysql.run( - mysql_credentials=TEST_LOLACONFIG.DW_CREDENTIALS, - overriding_host_and_port=get_local_bind_address_from_ssh_tunnel.run( - tunnel=tunnel - ), - ) - - cursor = connection.cursor() - cursor.execute(""" - CREATE TABLE sandbox.lolafect_transaction_test_table - ( - a_test_column INT - ) - """) cursor.execute(""" SELECT a_test_column FROM sandbox.lolafect_transaction_test_table """) table_is_empty_at_first = not bool(cursor.fetchall()) # An empty tuple yields False - begin_sql_transaction.run(connection=connection) + begin_sql_transaction.run(connection=connection_with_test_table) cursor.execute(""" INSERT INTO sandbox.lolafect_transaction_test_table (a_test_column) VALUES (1) """) - end_sql_transaction.run(connection=connection, dry_run=True) + end_sql_transaction.run(connection=connection_with_test_table, dry_run=True) cursor.execute(""" SELECT a_test_column FROM sandbox.lolafect_transaction_test_table """) - table_is_empty_after_rollback = not bool(cursor.fetchall()) # An tuple yields False - - cursor.execute(""" - DROP TABLE sandbox.lolafect_transaction_test_table - """ - ) - - close_mysql_connection.run(connection=connection) - close_ssh_tunnel.run(tunnel=tunnel) + table_is_empty_after_rollback = not bool(cursor.fetchall()) # A tuple yields False assert table_is_empty_at_first and table_is_empty_after_rollback From 467d70fcdc0744343a573b2e715da8de26fb6fc4 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 24 Apr 2023 13:44:11 +0200 Subject: [PATCH 12/18] Docstrings and small name improvements --- tests/test_integration/test_utils.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/tests/test_integration/test_utils.py b/tests/test_integration/test_utils.py index e195626..1ce5f85 100644 --- a/tests/test_integration/test_utils.py +++ b/tests/test_integration/test_utils.py @@ -29,6 +29,12 @@ TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") @pytest.fixture def connection_with_test_table(): + """ + Connects to DW, creates a test table in the sandbox env, and yields the + connection to the test. + + After the test, the table is dropped and the connection is closed. + """ test_local_bind_host = "127.0.0.1" test_local_bind_port = 12345 @@ -87,9 +93,9 @@ def test_sql_transaction_persists_changes_properly(connection_with_test_table): SELECT a_test_column FROM sandbox.lolafect_transaction_test_table """) - table_has_a_record_after_transaction = bool(cursor.fetchall()) # A non-empty tuple yields True + table_has_a_record_after_commit = bool(cursor.fetchall()) # A non-empty tuple yields True - assert table_is_empty_at_first and table_has_a_record_after_transaction + assert table_is_empty_at_first and table_has_a_record_after_commit def test_sql_transaction_rollbacks_changes_properly(connection_with_test_table): @@ -112,6 +118,6 @@ def test_sql_transaction_rollbacks_changes_properly(connection_with_test_table): SELECT a_test_column FROM sandbox.lolafect_transaction_test_table """) - table_is_empty_after_rollback = not bool(cursor.fetchall()) # A tuple yields False + table_is_still_empty_after_rollback = not bool(cursor.fetchall()) # A tuple yields False - assert table_is_empty_at_first and table_is_empty_after_rollback + assert table_is_empty_at_first and table_is_still_empty_after_rollback From 9201c236af9302026aa480ef13d69a783860f316 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 24 Apr 2023 13:48:33 +0200 Subject: [PATCH 13/18] Typo. --- lolafect/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lolafect/utils.py b/lolafect/utils.py index 608f27b..2aa82a0 100644 --- a/lolafect/utils.py +++ b/lolafect/utils.py @@ -48,7 +48,7 @@ def end_sql_transaction(connection: Any, dry_run: bool = False) -> None: :param connection: the connection to some database. :param dry_run: a flag indicating if persistence is desired. If dry_run - is True, changes will be rollbacked. + is True, changes will be rolledback. :return: None """ logger = prefect.context.get("logger") From 4ba1b4c007e54bcf09c0a71fd9a39c5a9bded9e1 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 24 Apr 2023 13:55:29 +0200 Subject: [PATCH 14/18] Improve docstrings --- lolafect/utils.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/lolafect/utils.py b/lolafect/utils.py index 2aa82a0..2b7e460 100644 --- a/lolafect/utils.py +++ b/lolafect/utils.py @@ -30,8 +30,8 @@ class S3FileReader: def begin_sql_transaction(connection: Any) -> None: """ Start a SQL transaction in the passed connection. The task is agnostic to - the SQL engine being used. As long as it implements a begin() method, this - will work. + the SQL engine being used. As long as the connection object implements a + begin() method, this task will work. :param connection: the connection to some database. :return: None @@ -45,10 +45,13 @@ def begin_sql_transaction(connection: Any) -> None: def end_sql_transaction(connection: Any, dry_run: bool = False) -> None: """ Finish a SQL transaction, either by rolling it back or by committing it. + The task is agnostic to the SQL engine being used. As long as the + connection object implements a `commit` and a `rollback` method, this task + will work. :param connection: the connection to some database. :param dry_run: a flag indicating if persistence is desired. If dry_run - is True, changes will be rolledback. + is True, changes will be rolledback. Otherwise, they will be committed. :return: None """ logger = prefect.context.get("logger") From 58261b1a692f5867d322b07089398206ebb9aba6 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 24 Apr 2023 13:56:30 +0200 Subject: [PATCH 15/18] Update changelog --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 525db12..4dc06d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ All notable changes to this project will be documented in this file. +## [Unreleased] + +### Added + +- Added tasks `begin_sql_transaction` and `end_sql_transaction` to the `utils`module. These enable the management of SQL + transactions in flows. It also allows for dry running SQL statements. + ## [0.4.0] - 2023-02-08 ### Added From 52ad64076608787b0b52c864742b19385b276025 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 24 Apr 2023 14:22:45 +0200 Subject: [PATCH 16/18] Add example in readme. --- README.md | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/README.md b/README.md index d9cbbf1..e3abcf9 100644 --- a/README.md +++ b/README.md @@ -120,6 +120,31 @@ with Flow(...) as flow: close_ssh_tunnel.run(tunnel=tunnel, upstream_tasks=[mysql_closed]) ``` +**Use SQL transactions and dry running** + +```python +from lolafect.connections import connect_to_mysql, close_mysql_connection +from lolafect.utils import begin_sql_transaction, end_sql_transaction + +with Flow(...) as flow: + connection = connect_to_mysql( + mysql_credentials={...}, # You probably want to get this from TEST_LOLACONFIG.DW_CREDENTIALS + ) + transaction_started = begin_sql_transaction(connection) + task_result = some_task_that_needs_mysql( + connection=connection, + upstream_task=[transaction_started] + ) + transaction_finished = end_sql_transaction( + connection, + dry_run=False, # True means rollback, False means commit changes + upstream_tasks=[task_result] + ) + + close_mysql_connection(connection=connection, upstream_tasks=[transaction_finished]) + +``` + ### Use Great Expectations **Run a Great Expectations validation on a MySQL query** From 897145ac199746ac64b7d8f757f42d1c5d7364f0 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Wed, 26 Apr 2023 09:42:30 +0200 Subject: [PATCH 17/18] Bumped version --- lolafect/__version__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lolafect/__version__.py b/lolafect/__version__.py index 6a9beea..3d18726 100644 --- a/lolafect/__version__.py +++ b/lolafect/__version__.py @@ -1 +1 @@ -__version__ = "0.4.0" +__version__ = "0.5.0" From 054de6ab3937a91d88551f176edb05d0884db8d9 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Wed, 26 Apr 2023 09:43:01 +0200 Subject: [PATCH 18/18] Update changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4dc06d5..30b1c5a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ All notable changes to this project will be documented in this file. -## [Unreleased] +## [0.5.0] - 2023-04-26 ### Added