From 922952bf0f0e41beaa0367a7c2acf8da47080d88 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Fri, 27 Jan 2023 13:13:08 +0100 Subject: [PATCH] WIP, still not working --- lolafect/data_testing.py | 62 +++++++++++---------- tests/test_integration/test_data_testing.py | 15 ++++- 2 files changed, 48 insertions(+), 29 deletions(-) diff --git a/lolafect/data_testing.py b/lolafect/data_testing.py index 798cde1..107ceb0 100644 --- a/lolafect/data_testing.py +++ b/lolafect/data_testing.py @@ -1,4 +1,12 @@ +from urllib.parse import quote_plus as urlquote + from prefect import task +from great_expectations.data_context import BaseDataContext +from great_expectations.data_context.types.base import ( + DataContextConfig, + DatasourceConfig, + S3StoreBackendDefaults, +) @task @@ -9,37 +17,33 @@ def run_data_test_on_mysql( expectations, ): data_context = create_in_memory_data_context(mysql_credentials) - - if expectation_configurations: - data_context = create_expectation_suite( - data_context, expectation_suite_name, expectation_configurations - ) - - data_context = create_checkpoint( - data_context, query_for_checkpoint, expectation_suite_name - ) - - results = run_checkpoint(data_context) + data_context = create_expectation_suite(data_context, name, expectations) + data_context = create_checkpoint(data_context, mysql_credentials, query, name) + results = run_checkpoint(data_context, name) return results -def create_in_memory_data_context(dw_connection): +def create_in_memory_data_context( + mysql_credentials, + database_name="staging", + great_expectations_bucket_name="pdo-prod-great-expectations", +): print("Preparing DataContext.") data_context = BaseDataContext( project_config=DataContextConfig( datasources={ - "dw-staging": DatasourceConfig( + "HARCODED": DatasourceConfig(#f"{mysql_credentials['host']}:{mysql_credentials['port']}": DatasourceConfig( class_name="Datasource", execution_engine={ "class_name": "SqlAlchemyExecutionEngine", - "connection_string": "mysql+pymysql://%s:%s@%s:%s/staging" + "connection_string": f"mysql+pymysql://%s:%s@%s:%s/{database_name}" % ( - dw_connection.raw_user, - urlquote(dw_connection.raw_password), - dw_connection.host, - dw_connection.port, + mysql_credentials["user"], + urlquote(mysql_credentials["password"]), + mysql_credentials["host"], + mysql_credentials["port"], ), }, data_connectors={ @@ -55,7 +59,7 @@ def create_in_memory_data_context(dw_connection): ) }, store_backend_defaults=S3StoreBackendDefaults( - default_bucket_name="pdo-prod-great-expectations" + default_bucket_name=great_expectations_bucket_name ), ) ) @@ -83,15 +87,17 @@ def create_expectation_suite( return data_context -def create_checkpoint(data_context, query_for_checkpoint, expectation_suite_name): +def create_checkpoint( + data_context, mysql_credentials, query_for_checkpoint, expectation_suite_name +): print("Preparing Checkpoint.") checkpoint_config = { - "name": f"{LOLACONFIG.FLOW_NAME_UDCS}_checkpoint", + "name": f"{expectation_suite_name}_checkpoint", "class_name": "Checkpoint", "config_version": 1, - "run_name_template": f"{LOLACONFIG.FLOW_NAME_UDCS}_checkpoint", + "run_name_template": f"{expectation_suite_name}_checkpoint", "action_list": [ { "name": "store_validation_result", @@ -105,9 +111,9 @@ def create_checkpoint(data_context, query_for_checkpoint, expectation_suite_name "validations": [ { "batch_request": { - "datasource_name": "dw-staging", + "datasource_name": "HARCODED", "data_connector_name": "default_runtime_data_connector_name", - "data_asset_name": f"{LOLACONFIG.FLOW_NAME_UDCS}_validation_query", + "data_asset_name": f"{expectation_suite_name}_validation_query", "runtime_parameters": {"query": query_for_checkpoint}, "batch_identifiers": { "default_identifier_name": "default_identifier" @@ -126,13 +132,13 @@ def create_checkpoint(data_context, query_for_checkpoint, expectation_suite_name return data_context -def run_checkpoint(the_data_context): +def run_checkpoint(the_data_context, name): print("Running Checkpoint.") - results = the_data_context.run_checkpoint( - checkpoint_name=f"{LOLACONFIG.FLOW_NAME_UDCS}_checkpoint" - ) + print(the_data_context) + + results = the_data_context.run_checkpoint(checkpoint_name=f"{name}_checkpoint") print("Checkpoint finished.") diff --git a/tests/test_integration/test_data_testing.py b/tests/test_integration/test_data_testing.py index cad3b1e..1c8dc7b 100644 --- a/tests/test_integration/test_data_testing.py +++ b/tests/test_integration/test_data_testing.py @@ -2,6 +2,7 @@ from great_expectations.core.expectation_configuration import ExpectationConfigu from lolafect.lolaconfig import build_lolaconfig from lolafect.data_testing import run_data_test_on_mysql +from lolafect.connections import open_ssh_tunnel_with_s3_pkey # __ __ _____ _ _ _____ _ _ _____ _ # \ \ / /\ | __ \| \ | |_ _| \ | |/ ____| | @@ -40,9 +41,21 @@ def test_validation_on_mysql_succeeds(): ), ] + ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run( + s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, + ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, + remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], + remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], + ) + validation_result = run_data_test_on_mysql.run( name="lolafect-testing-test_validation_on_mysql_succeeds", - mysql_credentials=TEST_LOLACONFIG.DW_CREDENTIALS, + mysql_credentials={ + "host": ssh_tunnel.local_bind_address[0], + "port": ssh_tunnel.local_bind_address[1], + "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"], + "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"] + }, query=test_query, expectations=test_expectations )