More refactor

This commit is contained in:
Pablo Martin 2023-02-02 16:57:07 +01:00
parent df3d9f0c7e
commit 875a667c2b
2 changed files with 31 additions and 13 deletions

View file

@ -1,3 +1,4 @@
from typing import List
from urllib.parse import quote_plus as urlquote
import prefect
@ -8,6 +9,7 @@ from great_expectations.data_context.types.base import (
DatasourceConfig,
S3StoreBackendDefaults,
)
from great_expectations.expectations.expectation import ExpectationConfiguration
from lolafect.defaults import DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET
@ -17,7 +19,7 @@ def run_data_test_on_mysql(
name: str,
mysql_credentials: dict,
query: str,
expectations: dict,
expectation_configurations: List[ExpectationConfiguration],
great_expectations_s3_bucket: str = DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET,
) -> dict:
"""
@ -26,7 +28,7 @@ def run_data_test_on_mysql(
:param name: a unique name for the data test.
:param mysql_credentials: credentials for the MySQL instance.
:param query: the query to test against.
:param expectations: the expectations on the dataset.
:param expectation_configurations: the expectations on the dataset.
:param great_expectations_s3_bucket: the bucket where Great Expectations
files live.
:return: the result of the data test.
@ -34,15 +36,15 @@ def run_data_test_on_mysql(
logger = prefect.context.get("logger")
logger.info("Creating data context.")
data_context = create_in_memory_data_context(
data_context = _create_in_memory_data_context(
mysql_credentials, great_expectations_s3_bucket
)
logger.info("Data context created.")
logger.info("Creating expectation suite.")
data_context = create_expectation_suite(data_context, name, expectations)
data_context = _create_expectation_suite(data_context, name, expectation_configurations)
logger.info("Expectation suite created.")
logger.info("Creating checkpoint.")
data_context = create_checkpoint(data_context, mysql_credentials, query, name)
data_context = _create_checkpoint(data_context, mysql_credentials, query, name)
logger.info("Checkpoint created.")
logger.info("Running checkpoint.")
results = data_context.run_checkpoint(f"{name}_checkpoint")
@ -52,10 +54,19 @@ def run_data_test_on_mysql(
return results
def create_in_memory_data_context(
def _create_in_memory_data_context(
mysql_credentials: dict,
great_expectations_s3_bucket: str,
) -> AbstractDataContext:
"""
Create a DataContext without a YAML config file.
:param mysql_credentials: the creds to the mysql where the query will be
executed.
:param great_expectations_s3_bucket: the name of the bucket where Great
Expectations files will be stored.
:return: the data context.
"""
data_context = BaseDataContext(
project_config=DataContextConfig(
@ -93,9 +104,17 @@ def create_in_memory_data_context(
return data_context
def create_expectation_suite(
data_context, expectation_suite_name, expectation_configurations
):
def _create_expectation_suite(
data_context: AbstractDataContext, expectation_suite_name: str, expectation_configurations: List[ExpectationConfiguration]
) -> AbstractDataContext:
"""
Create a new expectation suite in the data context with the passed expectations.
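:param data_context: the data context in which the suite is created.
:param expectation_suite_name: the name of the new expectation suite.
:param expectation_configurations: the expectations to include in the suite.
:return: the data context.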
"""
suite = data_context.create_expectation_suite(
expectation_suite_name,
overwrite_existing=True,
@ -109,7 +128,7 @@ def create_expectation_suite(
return data_context
def create_checkpoint(
def _create_checkpoint(
data_context, mysql_credentials, query_for_checkpoint, expectation_suite_name
):
@ -148,4 +167,3 @@ def create_checkpoint(
data_context.add_checkpoint(**checkpoint_config)
return data_context

View file

@ -59,7 +59,7 @@ def test_validation_on_mysql_succeeds():
"db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"]
},
query=test_query,
expectations=test_expectations,
expectation_configurations=test_expectations,
)
closed_tunnel = close_ssh_tunnel.run(ssh_tunnel)
@ -107,7 +107,7 @@ def test_validation_on_mysql_fails():
"db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"]
},
query=test_query,
expectations=test_expectations,
expectation_configurations=test_expectations,
)
closed_tunnel = close_ssh_tunnel.run(ssh_tunnel)