From 52fd24ae5efb10f15116fd059d6862915afd8f1c Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 2 Feb 2023 17:18:33 +0100 Subject: [PATCH] Refactors, fix tests. --- lolafect/data_testing.py | 57 ++++++++++++++++++++++-------- tests/test_unit/test_lolaconfig.py | 1 + 2 files changed, 43 insertions(+), 15 deletions(-) diff --git a/lolafect/data_testing.py b/lolafect/data_testing.py index dda959a..4216d4b 100644 --- a/lolafect/data_testing.py +++ b/lolafect/data_testing.py @@ -36,15 +36,22 @@ def run_data_test_on_mysql( logger = prefect.context.get("logger") logger.info("Creating data context.") - data_context = _create_in_memory_data_context( + data_context = _create_in_memory_data_context_for_mysql( mysql_credentials, great_expectations_s3_bucket ) logger.info("Data context created.") logger.info("Creating expectation suite.") - data_context = _create_expectation_suite(data_context, name, expectation_configurations) + data_context = _create_expectation_suite( + data_context, name, expectation_configurations + ) logger.info("Expectation suite created.") logger.info("Creating checkpoint.") - data_context = _create_checkpoint(data_context, mysql_credentials, query, name) + data_context = _create_checkpoint( + data_context, + f"{mysql_credentials['host']}:{mysql_credentials['port']}", + query, + name, + ) logger.info("Checkpoint created.") logger.info("Running checkpoint.") results = data_context.run_checkpoint(f"{name}_checkpoint") @@ -54,12 +61,13 @@ def run_data_test_on_mysql( return results -def _create_in_memory_data_context( +def _create_in_memory_data_context_for_mysql( mysql_credentials: dict, great_expectations_s3_bucket: str, ) -> AbstractDataContext: """ - Create a DataContext without a YAML config file. + Create a DataContext without a YAML config file and specify a MySQL + datasource. :param mysql_credentials: the creds to the mysql where the query will be executed. 
@@ -105,15 +113,20 @@ def _create_in_memory_data_context( def _create_expectation_suite( - data_context: AbstractDataContext, expectation_suite_name: str, expectation_configurations: List[ExpectationConfiguration] + data_context: AbstractDataContext, + expectation_suite_name: str, + expectation_configurations: List[ExpectationConfiguration], ) -> AbstractDataContext: """ - Create a new expectation suite in the data context with the passed expectations. + Create a new expectation suite in the data context with the passed + expectations. - :param data_context: - :param expectation_suite_name: - :param expectation_configurations: - :return: + :param data_context: the current data context. + :param expectation_suite_name: the name to give to the new expectation + suite. + :param expectation_configurations: the configs of the expectations to + include in the expectation suite. + :return: the same data context, now containing the new suite. """ suite = data_context.create_expectation_suite( expectation_suite_name, @@ -129,8 +142,23 @@ def _create_expectation_suite( def _create_checkpoint( - data_context, mysql_credentials, query_for_checkpoint, expectation_suite_name -): + data_context: AbstractDataContext, + datasource_name: str, + query_for_checkpoint: str, + expectation_suite_name: str, +) -> AbstractDataContext: + """ + Create a checkpoint in the given data context that combines the query and + the expectation suite. + + :param data_context: the current data context. + :param datasource_name: the name of the datasource to run the query on. + :param query_for_checkpoint: the query that will provide the data to test + in the checkpoint. + :param expectation_suite_name: the name of the expectation suite to test + against the data. + :return: the same data context, now containing the new checkpoint. 
+ """ checkpoint_config = { "name": f"{expectation_suite_name}_checkpoint", @@ -150,7 +178,7 @@ def _create_checkpoint( "validations": [ { "batch_request": { - "datasource_name": f"{mysql_credentials['host']}:{mysql_credentials['port']}", + "datasource_name": datasource_name, "data_connector_name": "default_runtime_data_connector_name", "data_asset_name": f"{expectation_suite_name}_validation_query", "runtime_parameters": {"query": query_for_checkpoint}, @@ -163,7 +191,6 @@ def _create_checkpoint( ], } - # The checkpoint gets persisted. Now it can be called again in a different session. data_context.add_checkpoint(**checkpoint_config) return data_context diff --git a/tests/test_unit/test_lolaconfig.py b/tests/test_unit/test_lolaconfig.py index 64b6094..991eb87 100644 --- a/tests/test_unit/test_lolaconfig.py +++ b/tests/test_unit/test_lolaconfig.py @@ -84,6 +84,7 @@ def test_lolaconfig_fetches_dw_creds_properly(): "datadw_user": "some_user", "datadw_pass": "some_password", "datadw_port": "some_port", + "datadw_default_db": "some_db" } fake_s3_reader.read_json_from_s3_file = mock_read_json_from_s3_file