Refactors, fix tests.

This commit is contained in:
Pablo Martin 2023-02-02 17:18:33 +01:00
parent 875a667c2b
commit 52fd24ae5e
2 changed files with 43 additions and 15 deletions

View file

@ -36,15 +36,22 @@ def run_data_test_on_mysql(
logger = prefect.context.get("logger")
logger.info("Creating data context.")
data_context = _create_in_memory_data_context(
data_context = _create_in_memory_data_context_for_mysql(
mysql_credentials, great_expectations_s3_bucket
)
logger.info("Data context created.")
logger.info("Creating expectation suite.")
data_context = _create_expectation_suite(data_context, name, expectation_configurations)
data_context = _create_expectation_suite(
data_context, name, expectation_configurations
)
logger.info("Expectation suite created.")
logger.info("Creating checkpoint.")
data_context = _create_checkpoint(data_context, mysql_credentials, query, name)
data_context = _create_checkpoint(
data_context,
f"{mysql_credentials['host']}:{mysql_credentials['port']}",
query,
name,
)
logger.info("Checkpoint created.")
logger.info("Running checkpoint.")
results = data_context.run_checkpoint(f"{name}_checkpoint")
@ -54,12 +61,13 @@ def run_data_test_on_mysql(
return results
def _create_in_memory_data_context(
def _create_in_memory_data_context_for_mysql(
mysql_credentials: dict,
great_expectations_s3_bucket: str,
) -> AbstractDataContext:
"""
Create a DataContext without a YAML config file.
Create a DataContext without a YAML config file and specify a MySQL
datasource.
:param mysql_credentials: the creds to the mysql where the query will be
executed.
@ -105,15 +113,20 @@ def _create_in_memory_data_context(
def _create_expectation_suite(
data_context: AbstractDataContext, expectation_suite_name: str, expectation_configurations: List[ExpectationConfiguration]
data_context: AbstractDataContext,
expectation_suite_name: str,
expectation_configurations: List[ExpectationConfiguration],
) -> AbstractDataContext:
"""
Create a new expectation suite in the data context with the passed expectations.
Create a new expectation suite in the data context with the passed
expectations.
:param data_context:
:param expectation_suite_name:
:param expectation_configurations:
:return:
:param data_context: the current data context.
:param expectation_suite_name: the name to give to the new expectation
suite.
:param expectation_configurations: the configs of the expectations to
include in the expectation suite.
:return: the same data context, now containing the new suite.
"""
suite = data_context.create_expectation_suite(
expectation_suite_name,
@ -129,8 +142,23 @@ def _create_expectation_suite(
def _create_checkpoint(
data_context, mysql_credentials, query_for_checkpoint, expectation_suite_name
):
data_context: AbstractDataContext,
datasource_name: str,
query_for_checkpoint: str,
expectation_suite_name: str,
) -> AbstractDataContext:
"""
Create a checkpoint in the given data context that combines the query and
the expectation suite.
:param data_context: the current data context.
:param datasource_name:
:param query_for_checkpoint: the query that will provide the data to test
in the checkpoint.
:param expectation_suite_name: the name of the expectation suite to test
against the data.
:return: the same data context, now containing the new checkpoint.
"""
checkpoint_config = {
"name": f"{expectation_suite_name}_checkpoint",
@ -150,7 +178,7 @@ def _create_checkpoint(
"validations": [
{
"batch_request": {
"datasource_name": f"{mysql_credentials['host']}:{mysql_credentials['port']}",
"datasource_name": datasource_name,
"data_connector_name": "default_runtime_data_connector_name",
"data_asset_name": f"{expectation_suite_name}_validation_query",
"runtime_parameters": {"query": query_for_checkpoint},
@ -163,7 +191,6 @@ def _create_checkpoint(
],
}
# The checkpoint gets persisted. Now it can be called again in a different session.
data_context.add_checkpoint(**checkpoint_config)
return data_context

View file

@ -84,6 +84,7 @@ def test_lolaconfig_fetches_dw_creds_properly():
"datadw_user": "some_user",
"datadw_pass": "some_password",
"datadw_port": "some_port",
"datadw_default_db": "some_db"
}
fake_s3_reader.read_json_from_s3_file = mock_read_json_from_s3_file