From e675056e3f1aa51d73fb28ab977f8ff98b228fa3 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 26 Jan 2023 18:09:08 +0100 Subject: [PATCH] Bring over code from existing GE flows --- lolafect/data_testing.py | 139 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 lolafect/data_testing.py diff --git a/lolafect/data_testing.py b/lolafect/data_testing.py new file mode 100644 index 0000000..7247dc4 --- /dev/null +++ b/lolafect/data_testing.py @@ -0,0 +1,139 @@ +from prefect import task + + +@task +def run_data_test_on_mysql( + name, + mysql_credentials, + query, + expectations, +): + data_context = create_in_memory_data_context(mysql_credentials) + + if expectation_configurations: + data_context = create_expectation_suite( + data_context, expectation_suite_name, expectation_configurations + ) + + data_context = create_checkpoint( + data_context, query_for_checkpoint, expectation_suite_name + ) + + results = run_checkpoint(data_context) + + return results + + +def create_in_memory_data_context(dw_connection): + + print("Preparing DataContext.") + data_context = BaseDataContext( + project_config=DataContextConfig( + datasources={ + "dw-staging": DatasourceConfig( + class_name="Datasource", + execution_engine={ + "class_name": "SqlAlchemyExecutionEngine", + "connection_string": "mysql+pymysql://%s:%s@%s:%s/staging" + % ( + dw_connection.raw_user, + urlquote(dw_connection.raw_password), + dw_connection.host, + dw_connection.port, + ), + }, + data_connectors={ + "default_runtime_data_connector_name": { + "class_name": "RuntimeDataConnector", + "batch_identifiers": ["default_identifier_name"], + }, + "default_inferred_data_connector_name": { + "class_name": "InferredAssetSqlDataConnector", + "name": "whole_table", + }, + }, + ) + }, + store_backend_defaults=S3StoreBackendDefaults( + default_bucket_name="pdo-prod-great-expectations" + ), + ) + ) + print("DataContext is ready.") + + return data_context + + +def create_expectation_suite( + data_context, expectation_suite_name, expectation_configurations +): + print("Preparing Expectation Suite.") + suite = data_context.create_expectation_suite( + expectation_suite_name, + overwrite_existing=True, + ) + + for expectation_configuration in expectation_configurations: + suite.add_expectation(expectation_configuration=expectation_configuration) + + data_context.save_expectation_suite(suite) + + print("Expectation Suite was stored.") + + return data_context + + +def create_checkpoint(data_context, query_for_checkpoint, expectation_suite_name): + + print("Preparing Checkpoint.") + + checkpoint_config = { + "name": f"{LOLACONFIG.FLOW_NAME_UDCS}_checkpoint", + "class_name": "Checkpoint", + "config_version": 1, + "run_name_template": f"{LOLACONFIG.FLOW_NAME_UDCS}_checkpoint", + "action_list": [ + { + "name": "store_validation_result", + "action": {"class_name": "StoreValidationResultAction"}, + }, + { + "name": "store_evaluation_params", + "action": {"class_name": "StoreEvaluationParametersAction"}, + }, + ], + "validations": [ + { + "batch_request": { + "datasource_name": "dw-staging", + "data_connector_name": "default_runtime_data_connector_name", + "data_asset_name": f"{LOLACONFIG.FLOW_NAME_UDCS}_validation_query", + "runtime_parameters": {"query": query_for_checkpoint}, + "batch_identifiers": { + "default_identifier_name": "default_identifier" + }, + }, + "expectation_suite_name": expectation_suite_name, + } + ], + } + + # The checkpoint gets persisted. Now it can be called again in a different session. + data_context.add_checkpoint(**checkpoint_config) + + print("Checkpoint was stored.") + + return data_context + + +def run_checkpoint(the_data_context): + + print("Running Checkpoint.") + + results = the_data_context.run_checkpoint( + checkpoint_name=f"{LOLACONFIG.FLOW_NAME_UDCS}_checkpoint" + ) + + print("Checkpoint finished.") + + return results \ No newline at end of file