diff --git a/lolafect/data_testing.py b/lolafect/data_testing.py index 4a2e1d5..4e4f45e 100644 --- a/lolafect/data_testing.py +++ b/lolafect/data_testing.py @@ -1,36 +1,62 @@ from urllib.parse import quote_plus as urlquote +import prefect from prefect import task -from great_expectations.data_context import BaseDataContext +from great_expectations.data_context import BaseDataContext, AbstractDataContext from great_expectations.data_context.types.base import ( DataContextConfig, DatasourceConfig, S3StoreBackendDefaults, ) +from lolafect.defaults import DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET + @task def run_data_test_on_mysql( - name, - mysql_credentials, - query, - expectations, -): - data_context = create_in_memory_data_context(mysql_credentials) + name: str, + mysql_credentials: dict, + query: str, + expectations: dict, + great_expectations_s3_bucket: str = DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET, +) -> dict: + """ + Validate a query and an expectation suite against a given MySQL server. + + :param name: a name for the data test. + :param mysql_credentials: credentials for the MySQL instance. + :param query: the query to test against. + :param expectations: the expectations on the dataset. + :param great_expectations_s3_bucket: the bucket where Great Expectations + files live. + :return: the result of the data test. + """ + logger = prefect.context.get("logger") + + logger.info("Creating data context.") + data_context = create_in_memory_data_context( + mysql_credentials, great_expectations_s3_bucket + ) + logger.info("Data context created.") + logger.info("Creating expectation suite.") data_context = create_expectation_suite(data_context, name, expectations) + logger.info("Expectation suite created.") + logger.info("Creating checkpoint.") data_context = create_checkpoint(data_context, mysql_credentials, query, name) - results = run_checkpoint(data_context, name) + logger.info("Checkpoint created.") + logger.info("Running checkpoint.") + results = data_context.run_checkpoint(f"{name}_checkpoint") + logger.info("Checkpoint finished.") + logger.info(f"Validation result: {results['success']}") return results def create_in_memory_data_context( mysql_credentials: dict, - database_name="information_schema", - great_expectations_bucket_name="pdo-prod-great-expectations", -): + great_expectations_s3_bucket: str, +) -> AbstractDataContext: - print("Preparing DataContext.") data_context = BaseDataContext( project_config=DataContextConfig( datasources={ @@ -59,11 +85,10 @@ def create_in_memory_data_context( ) }, store_backend_defaults=S3StoreBackendDefaults( - default_bucket_name=great_expectations_bucket_name + default_bucket_name=great_expectations_s3_bucket ), ) ) - print("DataContext is ready.") return data_context @@ -71,7 +96,6 @@ def create_in_memory_data_context( def create_expectation_suite( data_context, expectation_suite_name, expectation_configurations ): - print("Preparing Expectation Suite.") suite = data_context.create_expectation_suite( expectation_suite_name, overwrite_existing=True, @@ -82,8 +106,6 @@ def create_expectation_suite( data_context.save_expectation_suite(suite) - print("Expectation Suite was stored.") - return data_context @@ -91,8 +113,6 @@ def create_checkpoint( data_context, mysql_credentials, query_for_checkpoint, expectation_suite_name ): - print("Preparing Checkpoint.") - checkpoint_config = { "name": f"{expectation_suite_name}_checkpoint", "class_name": "Checkpoint", @@ -127,19 +147,5 @@ def create_checkpoint( # The checkpoint gets persisted. Now it can be called again in a different session. data_context.add_checkpoint(**checkpoint_config) - print("Checkpoint was stored.") - return data_context - -def run_checkpoint(the_data_context, name): - - print("Running Checkpoint.") - - print(the_data_context) - - results = the_data_context.run_checkpoint(checkpoint_name=f"{name}_checkpoint") - - print("Checkpoint finished.") - - return results diff --git a/lolafect/defaults.py b/lolafect/defaults.py index 1d4af84..439a075 100644 --- a/lolafect/defaults.py +++ b/lolafect/defaults.py @@ -5,3 +5,4 @@ DEFAULT_KUBERNETES_IMAGE = "373245262072.dkr.ecr.eu-central-1.amazonaws.com/pdo- DEFAULT_KUBERNETES_LABELS = ["k8s"] DEFAULT_FLOWS_PATH_IN_BUCKET = "flows/" DEFAULT_TRINO_HTTP_SCHEME = "https" +DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET = "pdo-prod-great-expectations"