From 8cfddbf5d9e4d7fdc9eea70a0b9c042430e40d9b Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 26 Jan 2023 16:48:01 +0100 Subject: [PATCH 01/18] Drafted intended API for GE. --- README.md | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/README.md b/README.md index 5d4b6f4..2a6f5e2 100644 --- a/README.md +++ b/README.md @@ -119,6 +119,28 @@ with Flow(...) as flow: close_ssh_tunnel.run(tunnel=tunnel) ``` +### Use Great Expectations + +**Run a Great Expectations validation on a MySQL query** + +```python +from lolafect.connections import connect_to_mysql +from lolafect.data_testing import run_data_test_on_mysql + +with Flow(...) as flow: + + a_mysql_connection = connect_to_mysql(...) + + my_query = """SELECT something FROM somewhere""" + my_expectations = {...} # A bunch of things you want to validate on the result of the query + + validation_results = run_validation_on_mysql( + mysql_connection=a_mysql_connection, + query=my_query, + expectations=my_expectations + ) +``` + ### Slack **Send a warning message to slack if your tasks fails** From 9825ab65bd35ce4b810de7b2f0222d87ac3abcdb Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 26 Jan 2023 16:56:18 +0100 Subject: [PATCH 02/18] Connection is unnecessary, GE connects itself. --- README.md | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 2a6f5e2..0a6bd5b 100644 --- a/README.md +++ b/README.md @@ -124,18 +124,16 @@ with Flow(...) as flow: **Run a Great Expectations validation on a MySQL query** ```python -from lolafect.connections import connect_to_mysql from lolafect.data_testing import run_data_test_on_mysql with Flow(...) as flow: - - a_mysql_connection = connect_to_mysql(...) - + my_query = """SELECT something FROM somewhere""" my_expectations = {...} # A bunch of things you want to validate on the result of the query validation_results = run_validation_on_mysql( - mysql_connection=a_mysql_connection, + name="my-cool-validation", + mysql_credentials={...}, query=my_query, expectations=my_expectations ) From 5ca488dba5b4eb90bc28448b4469e7795ac23139 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 26 Jan 2023 17:36:38 +0100 Subject: [PATCH 03/18] Tests --- tests/test_integration/test_data_testing.py | 85 +++++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 tests/test_integration/test_data_testing.py diff --git a/tests/test_integration/test_data_testing.py b/tests/test_integration/test_data_testing.py new file mode 100644 index 0000000..cad50a1 --- /dev/null +++ b/tests/test_integration/test_data_testing.py @@ -0,0 +1,85 @@ +from great_expectations.core.expectation_configuration import ExpectationConfiguration + +from lolafect.lolaconfig import build_lolaconfig +from lolafect.data_testing import run_data_test_on_mysql + +# __ __ _____ _ _ _____ _ _ _____ _ +# \ \ / /\ | __ \| \ | |_ _| \ | |/ ____| | +# \ \ /\ / / \ | |__) | \| | | | | \| | | __| | +# \ \/ \/ / /\ \ | _ /| . ` | | | | . ` | | |_ | | +# \ /\ / ____ \| | \ \| |\ |_| |_| |\ | |__| |_| +# \/ \/_/ \_\_| \_\_| \_|_____|_| \_|\_____(_) +# This testing suite requires: +# - The calling shell to have permission in AWS +# - The calling shell to be within the Mercadão network +# - Do not use this tests as part of CI/CD pipelines since they are not idempotent and +# rely external resources. Instead, use them manually to check yourself that things +# are working properly. + +TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") + +def test_validation_on_mysql_succeeds(): + + test_query = """ + SELECT 1 AS a_one, + "lol" AS a_string, + NULL AS a_null + """ + test_expectations = [ + ExpectationConfiguration( + expectation_type="expect_column_values_to_be_between", + kwargs={"column": "a_one", "min_value": 1, "max_value": 1}, + ), + ExpectationConfiguration( + expectation_type="expect_column_values_to_match_like_pattern", + kwargs={"column": "a_string", "like_pattern ": "%lol%"}, + ), + ExpectationConfiguration( + expectation_type="expect_column_values_to_be_null", + kwargs={"column": "a_null"}, + ), + ] + + validation_result = run_data_test_on_mysql( + name="lolafect-testing-test_validation_on_mysql_succeeds", + mysql_credentials=TEST_LOLACONFIG.DW_CREDENTIALS, + query=test_query, + expectations=test_expectations + ) + + data_test_passed = validation_result["success"] == True + + assert data_test_passed + + +def test_validation_on_mysql_fails(): + test_query = """ + SELECT 1 AS a_one, + "lol" AS a_string, + NULL AS a_null + """ + test_expectations = [ + ExpectationConfiguration( + expectation_type="expect_column_values_to_be_between", + kwargs={"column": "a_one", "min_value": 2, "max_value": 2}, + ), + ExpectationConfiguration( + expectation_type="expect_column_values_to_match_like_pattern", + kwargs={"column": "a_string", "like_pattern ": "%xD%"}, + ), + ExpectationConfiguration( + expectation_type="expect_column_values_to_not_be_null", + kwargs={"column": "a_null"}, + ), + ] + + validation_result = run_data_test_on_mysql( + name="lolafect-testing-test_validation_on_mysql_fails", + mysql_credentials=TEST_LOLACONFIG.DW_CREDENTIALS, + query=test_query, + expectations=test_expectations + ) + + data_test_failed = validation_result["success"] == False + + assert data_test_failed From b36c0bb26db8373b917ee2bbbc9a7663007e7c82 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 26 Jan 2023 17:36:46 +0100 Subject: [PATCH 04/18] Add GE to requirements --- requirements-dev.txt | 3 ++- setup.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/requirements-dev.txt b/requirements-dev.txt index 2adb75e..323e289 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -5,4 +5,5 @@ pytest==7.2.0 httpretty==1.1.4 trino==0.321.0 sshtunnel==0.4.0 -PyMySQL==1.0.2 \ No newline at end of file +PyMySQL==1.0.2 +great_expectations==0.15.45 \ No newline at end of file diff --git a/setup.py b/setup.py index 3ac4e7f..542e071 100644 --- a/setup.py +++ b/setup.py @@ -29,6 +29,7 @@ setup( "boto3==1.26.40", "trino==0.321.0", "sshtunnel==0.4.0", - "PyMySQL==1.0.2" + "PyMySQL==1.0.2", + "great_expectations==0.15.45" ], ) From 3b67c649594ce6655a347602f6180736ec86d874 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 26 Jan 2023 17:39:04 +0100 Subject: [PATCH 05/18] Missing run since task is outside flow. --- tests/test_integration/test_data_testing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_integration/test_data_testing.py b/tests/test_integration/test_data_testing.py index cad50a1..cad3b1e 100644 --- a/tests/test_integration/test_data_testing.py +++ b/tests/test_integration/test_data_testing.py @@ -40,7 +40,7 @@ def test_validation_on_mysql_succeeds(): ), ] - validation_result = run_data_test_on_mysql( + validation_result = run_data_test_on_mysql.run( name="lolafect-testing-test_validation_on_mysql_succeeds", mysql_credentials=TEST_LOLACONFIG.DW_CREDENTIALS, query=test_query, @@ -73,7 +73,7 @@ def test_validation_on_mysql_fails(): ), ] - validation_result = run_data_test_on_mysql( + validation_result = run_data_test_on_mysql.run( name="lolafect-testing-test_validation_on_mysql_fails", mysql_credentials=TEST_LOLACONFIG.DW_CREDENTIALS, query=test_query, From e675056e3f1aa51d73fb28ab977f8ff98b228fa3 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 26 Jan 2023 18:09:08 +0100 Subject: [PATCH 06/18] Bring over code from existing GE flows --- lolafect/data_testing.py | 139 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 lolafect/data_testing.py diff --git a/lolafect/data_testing.py b/lolafect/data_testing.py new file mode 100644 index 0000000..7247dc4 --- /dev/null +++ b/lolafect/data_testing.py @@ -0,0 +1,139 @@ +from prefect import task + + +@task +def run_data_test_on_mysql( + name, + mysql_credentials, + query, + expectations, +): + data_context = create_in_memory_data_context(mysql_credentials) + + if expectation_configurations: + data_context = create_expectation_suite( + data_context, expectation_suite_name, expectation_configurations + ) + + data_context = create_checkpoint( + data_context, query_for_checkpoint, expectation_suite_name + ) + + results = run_checkpoint(data_context) + + return results + + +def create_in_memory_data_context(dw_connection): + + print("Preparing DataContext.") + data_context = BaseDataContext( + project_config=DataContextConfig( + datasources={ + "dw-staging": DatasourceConfig( + class_name="Datasource", + execution_engine={ + "class_name": "SqlAlchemyExecutionEngine", + "connection_string": "mysql+pymysql://%s:%s@%s:%s/staging" + % ( + dw_connection.raw_user, + urlquote(dw_connection.raw_password), + dw_connection.host, + dw_connection.port, + ), + }, + data_connectors={ + "default_runtime_data_connector_name": { + "class_name": "RuntimeDataConnector", + "batch_identifiers": ["default_identifier_name"], + }, + "default_inferred_data_connector_name": { + "class_name": "InferredAssetSqlDataConnector", + "name": "whole_table", + }, + }, + ) + }, + store_backend_defaults=S3StoreBackendDefaults( + default_bucket_name="pdo-prod-great-expectations" + ), + ) + ) + print("DataContext is ready.") + + return data_context + + +def create_expectation_suite( + data_context, expectation_suite_name, expectation_configurations +): + print("Preparing Expectation Suite.") + suite = data_context.create_expectation_suite( + expectation_suite_name, + overwrite_existing=True, + ) + + for expectation_configuration in expectation_configurations: + suite.add_expectation(expectation_configuration=expectation_configuration) + + data_context.save_expectation_suite(suite) + + print("Expectation Suite was stored.") + + return data_context + + +def create_checkpoint(data_context, query_for_checkpoint, expectation_suite_name): + + print("Preparing Checkpoint.") + + checkpoint_config = { + "name": f"{LOLACONFIG.FLOW_NAME_UDCS}_checkpoint", + "class_name": "Checkpoint", + "config_version": 1, + "run_name_template": f"{LOLACONFIG.FLOW_NAME_UDCS}_checkpoint", + "action_list": [ + { + "name": "store_validation_result", + "action": {"class_name": "StoreValidationResultAction"}, + }, + { + "name": "store_evaluation_params", + "action": {"class_name": "StoreEvaluationParametersAction"}, + }, + ], + "validations": [ + { + "batch_request": { + "datasource_name": "dw-staging", + "data_connector_name": "default_runtime_data_connector_name", + "data_asset_name": f"{LOLACONFIG.FLOW_NAME_UDCS}_validation_query", + "runtime_parameters": {"query": query_for_checkpoint}, + "batch_identifiers": { + "default_identifier_name": "default_identifier" + }, + }, + "expectation_suite_name": expectation_suite_name, + } + ], + } + + # The checkpoint gets persisted. Now it can be called again in a different session. + data_context.add_checkpoint(**checkpoint_config) + + print("Checkpoint was stored.") + + return data_context + + +def run_checkpoint(the_data_context): + + print("Running Checkpoint.") + + results = the_data_context.run_checkpoint( + checkpoint_name=f"{LOLACONFIG.FLOW_NAME_UDCS}_checkpoint" + ) + + print("Checkpoint finished.") + + return results \ No newline at end of file From 375bd771976e67d14bf63a3f3699099858918a1c Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 26 Jan 2023 18:09:17 +0100 Subject: [PATCH 07/18] Formatting --- lolafect/data_testing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lolafect/data_testing.py b/lolafect/data_testing.py index 7247dc4..798cde1 100644 --- a/lolafect/data_testing.py +++ b/lolafect/data_testing.py @@ -136,4 +136,4 @@ def run_checkpoint(the_data_context): print("Checkpoint finished.") - return results \ No newline at end of file + return results From 922952bf0f0e41beaa0367a7c2acf8da47080d88 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Fri, 27 Jan 2023 13:13:08 +0100 Subject: [PATCH 08/18] WIP, still not working --- lolafect/data_testing.py | 62 +++++++++++---------- tests/test_integration/test_data_testing.py | 15 ++++- 2 files changed, 48 insertions(+), 29 deletions(-) diff --git a/lolafect/data_testing.py b/lolafect/data_testing.py index 798cde1..107ceb0 100644 --- a/lolafect/data_testing.py +++ b/lolafect/data_testing.py @@ -1,4 +1,12 @@ +from urllib.parse import quote_plus as urlquote + from prefect import task +from great_expectations.data_context import BaseDataContext +from great_expectations.data_context.types.base import ( + DataContextConfig, + DatasourceConfig, + S3StoreBackendDefaults, +) @task @@ -9,37 +17,33 @@ def run_data_test_on_mysql( expectations, ): data_context = create_in_memory_data_context(mysql_credentials) - - if expectation_configurations: - data_context = create_expectation_suite( - data_context, expectation_suite_name, expectation_configurations - ) - - data_context = create_checkpoint( - data_context, query_for_checkpoint, expectation_suite_name - ) - - results = run_checkpoint(data_context) + data_context = create_expectation_suite(data_context, name, expectations) + data_context = create_checkpoint(data_context, mysql_credentials, query, name) + results = run_checkpoint(data_context, name) return results -def create_in_memory_data_context(dw_connection): +def create_in_memory_data_context( + mysql_credentials, + database_name="staging", + great_expectations_bucket_name="pdo-prod-great-expectations", +): print("Preparing DataContext.") data_context = BaseDataContext( project_config=DataContextConfig( datasources={ - "dw-staging": DatasourceConfig( + "HARCODED": DatasourceConfig(#f"{mysql_credentials['host']}:{mysql_credentials['port']}": DatasourceConfig( class_name="Datasource", execution_engine={ "class_name": "SqlAlchemyExecutionEngine", - "connection_string": "mysql+pymysql://%s:%s@%s:%s/staging" + "connection_string": f"mysql+pymysql://%s:%s@%s:%s/{database_name}" % ( - dw_connection.raw_user, - urlquote(dw_connection.raw_password), - dw_connection.host, - dw_connection.port, + mysql_credentials["user"], + urlquote(mysql_credentials["password"]), + mysql_credentials["host"], + mysql_credentials["port"], ), }, data_connectors={ @@ -55,7 +59,7 @@ def create_in_memory_data_context(dw_connection): ) }, store_backend_defaults=S3StoreBackendDefaults( - default_bucket_name="pdo-prod-great-expectations" + default_bucket_name=great_expectations_bucket_name ), ) ) @@ -83,15 +87,17 @@ def create_expectation_suite( return data_context -def create_checkpoint(data_context, query_for_checkpoint, expectation_suite_name): +def create_checkpoint( + data_context, mysql_credentials, query_for_checkpoint, expectation_suite_name +): print("Preparing Checkpoint.") checkpoint_config = { - "name": f"{LOLACONFIG.FLOW_NAME_UDCS}_checkpoint", + "name": f"{expectation_suite_name}_checkpoint", "class_name": "Checkpoint", "config_version": 1, - "run_name_template": f"{LOLACONFIG.FLOW_NAME_UDCS}_checkpoint", + "run_name_template": f"{expectation_suite_name}_checkpoint", "action_list": [ { "name": "store_validation_result", @@ -105,9 +111,9 @@ def create_checkpoint(data_context, query_for_checkpoint, expectation_suite_name "validations": [ { "batch_request": { - "datasource_name": "dw-staging", + "datasource_name": "HARCODED", "data_connector_name": "default_runtime_data_connector_name", - "data_asset_name": f"{LOLACONFIG.FLOW_NAME_UDCS}_validation_query", + "data_asset_name": f"{expectation_suite_name}_validation_query", "runtime_parameters": {"query": query_for_checkpoint}, "batch_identifiers": { "default_identifier_name": "default_identifier" @@ -126,13 +132,13 @@ def create_checkpoint(data_context, query_for_checkpoint, expectation_suite_name return data_context -def run_checkpoint(the_data_context): +def run_checkpoint(the_data_context, name): print("Running Checkpoint.") - results = the_data_context.run_checkpoint( - checkpoint_name=f"{LOLACONFIG.FLOW_NAME_UDCS}_checkpoint" - ) + print(the_data_context) + + results = the_data_context.run_checkpoint(checkpoint_name=f"{name}_checkpoint") print("Checkpoint finished.") diff --git a/tests/test_integration/test_data_testing.py b/tests/test_integration/test_data_testing.py index cad3b1e..1c8dc7b 100644 --- a/tests/test_integration/test_data_testing.py +++ b/tests/test_integration/test_data_testing.py @@ -2,6 +2,7 @@ from great_expectations.core.expectation_configuration import ExpectationConfigu from lolafect.lolaconfig import build_lolaconfig from lolafect.data_testing import run_data_test_on_mysql +from lolafect.connections import open_ssh_tunnel_with_s3_pkey # __ __ _____ _ _ _____ _ _ _____ _ # \ \ / /\ | __ \| \ | |_ _| \ | |/ ____| | @@ -40,9 +41,21 @@ def test_validation_on_mysql_succeeds(): ), ] + ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run( + s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, + ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, + remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], + remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], + ) + validation_result = run_data_test_on_mysql.run( name="lolafect-testing-test_validation_on_mysql_succeeds", - mysql_credentials=TEST_LOLACONFIG.DW_CREDENTIALS, + mysql_credentials={ + "host": ssh_tunnel.local_bind_address[0], + "port": ssh_tunnel.local_bind_address[1], + "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"], + "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"] + }, query=test_query, expectations=test_expectations ) From 3dfac07cb58897dbd4abf3fd6d48e27fc818d401 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 2 Feb 2023 14:49:25 +0100 Subject: [PATCH 09/18] Tests passing --- tests/test_integration/test_data_testing.py | 27 +++++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/tests/test_integration/test_data_testing.py b/tests/test_integration/test_data_testing.py index 1c8dc7b..3e8a6f3 100644 --- a/tests/test_integration/test_data_testing.py +++ b/tests/test_integration/test_data_testing.py @@ -2,7 +2,7 @@ from great_expectations.core.expectation_configuration import ExpectationConfigu from lolafect.lolaconfig import build_lolaconfig from lolafect.data_testing import run_data_test_on_mysql -from lolafect.connections import open_ssh_tunnel_with_s3_pkey +from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel # __ __ _____ _ _ _____ _ _ _____ _ # \ \ / /\ | __ \| \ | |_ _| \ | |/ ____| | @@ -33,7 +33,7 @@ def test_validation_on_mysql_succeeds(): ), ExpectationConfiguration( expectation_type="expect_column_values_to_match_like_pattern", - kwargs={"column": "a_string", "like_pattern ": "%lol%"}, + kwargs={"column": "a_string", "like_pattern": "%lol%"}, ), ExpectationConfiguration( expectation_type="expect_column_values_to_be_null", @@ -60,6 +60,8 @@ def test_validation_on_mysql_succeeds(): expectations=test_expectations ) + closed_tunnel = close_ssh_tunnel.run(ssh_tunnel) + data_test_passed = validation_result["success"] == True assert data_test_passed @@ -70,7 +72,7 @@ def test_validation_on_mysql_fails(): SELECT 1 AS a_one, "lol" AS a_string, NULL AS a_null - """ + """ test_expectations = [ ExpectationConfiguration( expectation_type="expect_column_values_to_be_between", @@ -78,7 +80,7 @@ def test_validation_on_mysql_fails(): ), ExpectationConfiguration( expectation_type="expect_column_values_to_match_like_pattern", - kwargs={"column": "a_string", "like_pattern ": "%xD%"}, + kwargs={"column": "a_string", "like_pattern": "%xD%"}, ), ExpectationConfiguration( expectation_type="expect_column_values_to_not_be_null", @@ -86,13 +88,28 @@ def test_validation_on_mysql_fails(): ), ] + ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run( + s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, + ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, + remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], + remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], + ) + + validation_result = run_data_test_on_mysql.run( name="lolafect-testing-test_validation_on_mysql_fails", - mysql_credentials=TEST_LOLACONFIG.DW_CREDENTIALS, + mysql_credentials={ + "host": ssh_tunnel.local_bind_address[0], + "port": ssh_tunnel.local_bind_address[1], + "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"], + "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"] + }, query=test_query, expectations=test_expectations ) + closed_tunnel = close_ssh_tunnel.run(ssh_tunnel) + data_test_failed = validation_result["success"] == False assert data_test_failed From 2f62b34543914540a46b338bacb8d9865647c065 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 2 Feb 2023 15:05:53 +0100 Subject: [PATCH 10/18] DB now comes from config --- lolafect/data_testing.py | 8 ++++---- lolafect/lolaconfig.py | 1 + tests/test_integration/test_data_testing.py | 12 +++++++----- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/lolafect/data_testing.py b/lolafect/data_testing.py index 107ceb0..4a2e1d5 100644 --- a/lolafect/data_testing.py +++ b/lolafect/data_testing.py @@ -25,8 +25,8 @@ def run_data_test_on_mysql( def create_in_memory_data_context( - mysql_credentials, - database_name="staging", + mysql_credentials: dict, + database_name="information_schema", great_expectations_bucket_name="pdo-prod-great-expectations", ): @@ -34,11 +34,11 @@ def create_in_memory_data_context( data_context = BaseDataContext( project_config=DataContextConfig( datasources={ - "HARCODED": DatasourceConfig(#f"{mysql_credentials['host']}:{mysql_credentials['port']}": DatasourceConfig( + "HARCODED": DatasourceConfig( # f"{mysql_credentials['host']}:{mysql_credentials['port']}": DatasourceConfig( class_name="Datasource", execution_engine={ "class_name": "SqlAlchemyExecutionEngine", - "connection_string": f"mysql+pymysql://%s:%s@%s:%s/{database_name}" + "connection_string": f"mysql+pymysql://%s:%s@%s:%s/{mysql_credentials['db']}" % ( mysql_credentials["user"], urlquote(mysql_credentials["password"]), diff --git a/lolafect/lolaconfig.py b/lolafect/lolaconfig.py index 3cb05fe..36fef09 100644 --- a/lolafect/lolaconfig.py +++ b/lolafect/lolaconfig.py @@ -157,6 +157,7 @@ class LolaConfig: "user": self.ENV_DATA["datadw_user"], "password": self.ENV_DATA["datadw_pass"], "port": self.ENV_DATA["datadw_port"], + "default_db": self.ENV_DATA["datadw_default_db"] } @_needs_env_data diff --git a/tests/test_integration/test_data_testing.py b/tests/test_integration/test_data_testing.py index 3e8a6f3..b8219d0 100644 --- a/tests/test_integration/test_data_testing.py +++ b/tests/test_integration/test_data_testing.py @@ -19,6 +19,7 @@ from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") + def test_validation_on_mysql_succeeds(): test_query = """ @@ -54,10 +55,11 @@ def test_validation_on_mysql_succeeds(): "host": ssh_tunnel.local_bind_address[0], "port": ssh_tunnel.local_bind_address[1], "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"], - "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"] + "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"], + "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"] }, query=test_query, - expectations=test_expectations + expectations=test_expectations, ) closed_tunnel = close_ssh_tunnel.run(ssh_tunnel) @@ -95,17 +97,17 @@ def test_validation_on_mysql_fails(): remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], ) - validation_result = run_data_test_on_mysql.run( name="lolafect-testing-test_validation_on_mysql_fails", mysql_credentials={ "host": ssh_tunnel.local_bind_address[0], "port": ssh_tunnel.local_bind_address[1], "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"], - "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"] + "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"], + "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"] }, query=test_query, - expectations=test_expectations + expectations=test_expectations, ) closed_tunnel = close_ssh_tunnel.run(ssh_tunnel) From a7b1f55ff19f95e6eb9c7313b2946ad04e0fdefa Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 2 Feb 2023 16:39:22 +0100 Subject: [PATCH 11/18] Refactors --- lolafect/data_testing.py | 72 ++++++++++++++++++++++------------------ lolafect/defaults.py | 1 + 2 files changed, 40 insertions(+), 33 deletions(-) diff --git a/lolafect/data_testing.py b/lolafect/data_testing.py index 4a2e1d5..4e4f45e 100644 --- a/lolafect/data_testing.py +++ b/lolafect/data_testing.py @@ -1,36 +1,62 @@ from urllib.parse import quote_plus as urlquote +import prefect from prefect import task -from great_expectations.data_context import BaseDataContext +from great_expectations.data_context import BaseDataContext, AbstractDataContext from great_expectations.data_context.types.base import ( DataContextConfig, DatasourceConfig, S3StoreBackendDefaults, ) +from lolafect.defaults import DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET + @task def run_data_test_on_mysql( - name, - mysql_credentials, - query, - expectations, -): - data_context = create_in_memory_data_context(mysql_credentials) + name: str, + mysql_credentials: dict, + query: str, + expectations: dict, + great_expectations_s3_bucket: str = DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET, +) -> dict: + """ + Validate a query and an expectation suite against a given MySQL server. + + :param name: a name for the data test. + :param mysql_credentials: credentials for the MySQL instance. + :param query: the query to test against. + :param expectations: the expectations on the dataset. + :param great_expectations_s3_bucket: the bucket where Great Expectations + files live. + :return: the result of the data test. + """ + logger = prefect.context.get("logger") + + logger.info("Creating data context.") + data_context = create_in_memory_data_context( + mysql_credentials, great_expectations_s3_bucket + ) + logger.info("Data context created.") + logger.info("Creating expectation suite.") data_context = create_expectation_suite(data_context, name, expectations) + logger.info("Expectation suite created.") + logger.info("Creating checkpoint.") data_context = create_checkpoint(data_context, mysql_credentials, query, name) - results = run_checkpoint(data_context, name) + logger.info("Checkpoint created.") + logger.info("Running checkpoint.") + results = data_context.run_checkpoint(f"{name}_checkpoint") + logger.info("Checkpoint finished.") + logger.info(f"Validation result: {results['success']}") return results def create_in_memory_data_context( mysql_credentials: dict, - database_name="information_schema", - great_expectations_bucket_name="pdo-prod-great-expectations", -): + great_expectations_s3_bucket: str, +) -> AbstractDataContext: - print("Preparing DataContext.") data_context = BaseDataContext( project_config=DataContextConfig( datasources={ @@ -59,11 +85,10 @@ def create_in_memory_data_context( ) }, store_backend_defaults=S3StoreBackendDefaults( - default_bucket_name=great_expectations_bucket_name + default_bucket_name=great_expectations_s3_bucket ), ) ) - print("DataContext is ready.") return data_context @@ -71,7 +96,6 @@ def create_in_memory_data_context( def create_expectation_suite( data_context, expectation_suite_name, expectation_configurations ): - print("Preparing Expectation Suite.") suite = data_context.create_expectation_suite( expectation_suite_name, overwrite_existing=True, @@ -82,8 +106,6 @@ def create_expectation_suite( data_context.save_expectation_suite(suite) - print("Expectation Suite was stored.") - return data_context @@ -91,8 +113,6 @@ def create_checkpoint( data_context, mysql_credentials, query_for_checkpoint, expectation_suite_name ): - print("Preparing Checkpoint.") - checkpoint_config = { "name": f"{expectation_suite_name}_checkpoint", "class_name": "Checkpoint", @@ -127,19 +147,5 @@ def create_checkpoint( # The checkpoint gets persisted. Now it can be called again in a different session. data_context.add_checkpoint(**checkpoint_config) - print("Checkpoint was stored.") - return data_context - -def run_checkpoint(the_data_context, name): - - print("Running Checkpoint.") - - print(the_data_context) - - results = the_data_context.run_checkpoint(checkpoint_name=f"{name}_checkpoint") - - print("Checkpoint finished.") - - return results diff --git a/lolafect/defaults.py b/lolafect/defaults.py index 1d4af84..439a075 100644 --- a/lolafect/defaults.py +++ b/lolafect/defaults.py @@ -5,3 +5,4 @@ DEFAULT_KUBERNETES_IMAGE = "373245262072.dkr.ecr.eu-central-1.amazonaws.com/pdo- DEFAULT_KUBERNETES_LABELS = ["k8s"] DEFAULT_FLOWS_PATH_IN_BUCKET = "flows/" DEFAULT_TRINO_HTTP_SCHEME = "https" +DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET = "pdo-prod-great-expectations" From df3d9f0c7e8162fe4994471c1156ee8a775b8a30 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 2 Feb 2023 16:42:05 +0100 Subject: [PATCH 12/18] Remove hardcoding --- lolafect/data_testing.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lolafect/data_testing.py b/lolafect/data_testing.py index 4e4f45e..1d0dc11 100644 --- a/lolafect/data_testing.py +++ b/lolafect/data_testing.py @@ -23,7 +23,7 @@ def run_data_test_on_mysql( """ Validate a query and an expectation suite against a given MySQL server. - :param name: a name for the data test. + :param name: a unique name for the data test. :param mysql_credentials: credentials for the MySQL instance. :param query: the query to test against. :param expectations: the expectations on the dataset. @@ -60,7 +60,7 @@ def create_in_memory_data_context( data_context = BaseDataContext( project_config=DataContextConfig( datasources={ - "HARCODED": DatasourceConfig( # f"{mysql_credentials['host']}:{mysql_credentials['port']}": DatasourceConfig( + f"{mysql_credentials['host']}:{mysql_credentials['port']}": DatasourceConfig( class_name="Datasource", execution_engine={ "class_name": "SqlAlchemyExecutionEngine", @@ -131,7 +131,7 @@ def create_checkpoint( "validations": [ { "batch_request": { - "datasource_name": "HARCODED", + "datasource_name": f"{mysql_credentials['host']}:{mysql_credentials['port']}", "data_connector_name": "default_runtime_data_connector_name", "data_asset_name": f"{expectation_suite_name}_validation_query", "runtime_parameters": {"query": query_for_checkpoint}, From 875a667c2bd18ececd203e1cbb726238685a0e41 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 2 Feb 2023 16:57:07 +0100 Subject: [PATCH 13/18] More refactor --- lolafect/data_testing.py | 40 +++++++++++++++------ tests/test_integration/test_data_testing.py | 4 +-- 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/lolafect/data_testing.py b/lolafect/data_testing.py index 1d0dc11..dda959a 100644 --- a/lolafect/data_testing.py +++ b/lolafect/data_testing.py @@ -1,3 +1,4 @@ +from typing import List from urllib.parse import quote_plus as urlquote import prefect @@ -8,6 +9,7 @@ from great_expectations.data_context.types.base import ( DatasourceConfig, S3StoreBackendDefaults, ) +from great_expectations.expectations.expectation import ExpectationConfiguration from lolafect.defaults import DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET @@ -17,7 +19,7 @@ def run_data_test_on_mysql( name: str, mysql_credentials: dict, query: str, - expectations: dict, + expectation_configurations: List[ExpectationConfiguration], great_expectations_s3_bucket: str = DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET, ) -> dict: """ @@ -26,7 +28,7 @@ def run_data_test_on_mysql( :param name: a unique name for the data test. :param mysql_credentials: credentials for the MySQL instance. :param query: the query to test against. - :param expectations: the expectations on the dataset. + :param expectation_configurations: the expectations on the dataset. :param great_expectations_s3_bucket: the bucket where Great Expectations files live. :return: the result of the data test. @@ -34,15 +36,15 @@ def run_data_test_on_mysql( logger = prefect.context.get("logger") logger.info("Creating data context.") - data_context = create_in_memory_data_context( + data_context = _create_in_memory_data_context( mysql_credentials, great_expectations_s3_bucket ) logger.info("Data context created.") logger.info("Creating expectation suite.") - data_context = create_expectation_suite(data_context, name, expectations) + data_context = _create_expectation_suite(data_context, name, expectation_configurations) logger.info("Expectation suite created.") logger.info("Creating checkpoint.") - data_context = create_checkpoint(data_context, mysql_credentials, query, name) + data_context = _create_checkpoint(data_context, mysql_credentials, query, name) logger.info("Checkpoint created.") logger.info("Running checkpoint.") results = data_context.run_checkpoint(f"{name}_checkpoint") @@ -52,10 +54,19 @@ def run_data_test_on_mysql( return results -def create_in_memory_data_context( +def _create_in_memory_data_context( mysql_credentials: dict, great_expectations_s3_bucket: str, ) -> AbstractDataContext: + """ + Create a DataContext without a YAML config file. + + :param mysql_credentials: the creds to the mysql where the query will be + executed. + :param great_expectations_s3_bucket: the name of the bucket where Great + Exepctations files while be stored. + :return: the data context. + """ data_context = BaseDataContext( project_config=DataContextConfig( @@ -93,9 +104,17 @@ def create_in_memory_data_context( return data_context -def create_expectation_suite( - data_context, expectation_suite_name, expectation_configurations -): +def _create_expectation_suite( + data_context: AbstractDataContext, expectation_suite_name: str, expectation_configurations: List[ExpectationConfiguration] +) -> AbstractDataContext: + """ + Create a new expectation suite in the data context with the passed expectations. + + :param data_context: + :param expectation_suite_name: + :param expectation_configurations: + :return: + """ suite = data_context.create_expectation_suite( expectation_suite_name, overwrite_existing=True, @@ -109,7 +128,7 @@ def create_expectation_suite( return data_context -def create_checkpoint( +def _create_checkpoint( data_context, mysql_credentials, query_for_checkpoint, expectation_suite_name ): @@ -148,4 +167,3 @@ def create_checkpoint( data_context.add_checkpoint(**checkpoint_config) return data_context - diff --git a/tests/test_integration/test_data_testing.py b/tests/test_integration/test_data_testing.py index b8219d0..0556735 100644 --- a/tests/test_integration/test_data_testing.py +++ b/tests/test_integration/test_data_testing.py @@ -59,7 +59,7 @@ def test_validation_on_mysql_succeeds(): "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"] }, query=test_query, - expectations=test_expectations, + expectation_configurations=test_expectations, ) closed_tunnel = close_ssh_tunnel.run(ssh_tunnel) @@ -107,7 +107,7 @@ def test_validation_on_mysql_fails(): "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"] }, query=test_query, - expectations=test_expectations, + expectation_configurations=test_expectations, ) closed_tunnel = close_ssh_tunnel.run(ssh_tunnel) From 52fd24ae5efb10f15116fd059d6862915afd8f1c Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 2 Feb 2023 17:18:33 +0100 Subject: [PATCH 14/18] Refactors, fix tests. --- lolafect/data_testing.py | 57 ++++++++++++++++++++++-------- tests/test_unit/test_lolaconfig.py | 1 + 2 files changed, 43 insertions(+), 15 deletions(-) diff --git a/lolafect/data_testing.py b/lolafect/data_testing.py index dda959a..4216d4b 100644 --- a/lolafect/data_testing.py +++ b/lolafect/data_testing.py @@ -36,15 +36,22 @@ def run_data_test_on_mysql( logger = prefect.context.get("logger") logger.info("Creating data context.") - data_context = _create_in_memory_data_context( + data_context = _create_in_memory_data_context_for_mysql( mysql_credentials, great_expectations_s3_bucket ) logger.info("Data context created.") logger.info("Creating expectation suite.") - data_context = _create_expectation_suite(data_context, name, expectation_configurations) + data_context = _create_expectation_suite( + data_context, name, expectation_configurations + ) logger.info("Expectation suite created.") logger.info("Creating checkpoint.") - data_context = _create_checkpoint(data_context, mysql_credentials, query, name) + data_context = _create_checkpoint( + data_context, + f"{mysql_credentials['host']}:{mysql_credentials['port']}", + query, + name, + ) logger.info("Checkpoint created.") logger.info("Running checkpoint.") results = data_context.run_checkpoint(f"{name}_checkpoint") @@ -54,12 +61,13 @@ def run_data_test_on_mysql( return results -def _create_in_memory_data_context( +def _create_in_memory_data_context_for_mysql( mysql_credentials: dict, great_expectations_s3_bucket: str, ) -> AbstractDataContext: """ - Create a DataContext without a YAML config file. + Create a DataContext without a YAML config file and specify a MySQL + datasource. :param mysql_credentials: the creds to the mysql where the query will be executed. @@ -105,15 +113,20 @@ def _create_in_memory_data_context( def _create_expectation_suite( - data_context: AbstractDataContext, expectation_suite_name: str, expectation_configurations: List[ExpectationConfiguration] + data_context: AbstractDataContext, + expectation_suite_name: str, + expectation_configurations: List[ExpectationConfiguration], ) -> AbstractDataContext: """ - Create a new expectation suite in the data context with the passed expectations. + Create a new expectation suite in the data context with the passed + expectations. - :param data_context: - :param expectation_suite_name: - :param expectation_configurations: - :return: + :param data_context: the current data context. + :param expectation_suite_name: the name to give to the new expectation + suite. + :param expectation_configurations: the configs of the expectations to + include in the expectation suite. + :return: the same data context, now containing the new suite. """ suite = data_context.create_expectation_suite( expectation_suite_name, @@ -129,8 +142,23 @@ def _create_expectation_suite( def _create_checkpoint( - data_context, mysql_credentials, query_for_checkpoint, expectation_suite_name -): + data_context: AbstractDataContext, + datasource_name: str, + query_for_checkpoint: str, + expectation_suite_name: str, +) -> AbstractDataContext: + """ + Create a checkpoint in the given data context that combines the query and + the expectation suite. + + :param data_context: the current data context. + :param datasource_name: + :param query_for_checkpoint: the query that will provide the data to test + in the checkpoint. + :param expectation_suite_name: the name of the expectation suite to test + against the data. + :return: the same data context, now containing the new checkpoint. + """ checkpoint_config = { "name": f"{expectation_suite_name}_checkpoint", @@ -150,7 +178,7 @@ def _create_checkpoint( "validations": [ { "batch_request": { - "datasource_name": f"{mysql_credentials['host']}:{mysql_credentials['port']}", + "datasource_name": datasource_name, "data_connector_name": "default_runtime_data_connector_name", "data_asset_name": f"{expectation_suite_name}_validation_query", "runtime_parameters": {"query": query_for_checkpoint}, @@ -163,7 +191,6 @@ def _create_checkpoint( ], } - # The checkpoint gets persisted. Now it can be called again in a different session. data_context.add_checkpoint(**checkpoint_config) return data_context diff --git a/tests/test_unit/test_lolaconfig.py b/tests/test_unit/test_lolaconfig.py index 64b6094..991eb87 100644 --- a/tests/test_unit/test_lolaconfig.py +++ b/tests/test_unit/test_lolaconfig.py @@ -84,6 +84,7 @@ def test_lolaconfig_fetches_dw_creds_properly(): "datadw_user": "some_user", "datadw_pass": "some_password", "datadw_port": "some_port", + "datadw_default_db": "some_db" } fake_s3_reader.read_json_from_s3_file = mock_read_json_from_s3_file From ca1d3471b53788aedffc0e50f7572ca273b1b573 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 2 Feb 2023 17:20:46 +0100 Subject: [PATCH 15/18] Include SQLAlchemy in dependencies. --- requirements-dev.txt | 3 ++- setup.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/requirements-dev.txt b/requirements-dev.txt index 323e289..a873f24 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -6,4 +6,5 @@ httpretty==1.1.4 trino==0.321.0 sshtunnel==0.4.0 PyMySQL==1.0.2 -great_expectations==0.15.45 \ No newline at end of file +great_expectations==0.15.45 +SQLAlchemy==1.4.46 \ No newline at end of file diff --git a/setup.py b/setup.py index 542e071..a0f4dbd 100644 --- a/setup.py +++ b/setup.py @@ -30,6 +30,7 @@ setup( "trino==0.321.0", "sshtunnel==0.4.0", "PyMySQL==1.0.2", - "great_expectations==0.15.45" + "great_expectations==0.15.45", + "SQLAlchemy==1.4.46", ], ) From 77497f70b3c94b674a9b2af7df30fc4457d7b4ff Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 2 Feb 2023 17:21:06 +0100 Subject: [PATCH 16/18] Formatting. --- tests/test_integration/test_data_testing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_integration/test_data_testing.py b/tests/test_integration/test_data_testing.py index 0556735..8b456f2 100644 --- a/tests/test_integration/test_data_testing.py +++ b/tests/test_integration/test_data_testing.py @@ -56,7 +56,7 @@ def test_validation_on_mysql_succeeds(): "port": ssh_tunnel.local_bind_address[1], "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"], "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"], - "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"] + "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"], }, query=test_query, expectation_configurations=test_expectations, @@ -104,7 +104,7 @@ def test_validation_on_mysql_fails(): "port": ssh_tunnel.local_bind_address[1], "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"], "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"], - "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"] + "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"], }, query=test_query, expectation_configurations=test_expectations, From a0e7983a17a69969c7fb809179ea658a46722a6c Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 2 Feb 2023 17:23:15 +0100 Subject: [PATCH 17/18] Improve readme --- README.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 0a6bd5b..0486671 100644 --- a/README.md +++ b/README.md @@ -131,12 +131,15 @@ with Flow(...) as flow: my_query = """SELECT something FROM somewhere""" my_expectations = {...} # A bunch of things you want to validate on the result of the query - validation_results = run_validation_on_mysql( + validation_results = run_data_test_on_mysql( name="my-cool-validation", mysql_credentials={...}, query=my_query, expectations=my_expectations ) + + if not validation_results["success"]: + print("The data is bad!!!") ``` ### Slack From 5fdcbf9bc15ec396faa258499a81074269e71d99 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 2 Feb 2023 17:26:38 +0100 Subject: [PATCH 18/18] Update changelog --- CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 008da48..f17e4f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,15 @@ All notable changes to this project will be documented in this file. +## [Unreleased] + +### Added + +- Started the `data_testing` module with the first task to run data tests on + MySQL: `run_data_test_on_mysql`. +- The `DW_CREDENTIALS` attribute in `LOLACONFIG` now also includes a `default_db` key. + + ## [0.3.0] - 2023-01-27 ### Added