Refactors

This commit is contained in:
Pablo Martin 2023-02-02 16:39:22 +01:00
parent 2f62b34543
commit a7b1f55ff1
2 changed files with 40 additions and 33 deletions

View file

@ -1,36 +1,62 @@
from urllib.parse import quote_plus as urlquote from urllib.parse import quote_plus as urlquote
import prefect
from prefect import task from prefect import task
from great_expectations.data_context import BaseDataContext from great_expectations.data_context import BaseDataContext, AbstractDataContext
from great_expectations.data_context.types.base import ( from great_expectations.data_context.types.base import (
DataContextConfig, DataContextConfig,
DatasourceConfig, DatasourceConfig,
S3StoreBackendDefaults, S3StoreBackendDefaults,
) )
from lolafect.defaults import DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET
@task @task
def run_data_test_on_mysql( def run_data_test_on_mysql(
name, name: str,
mysql_credentials, mysql_credentials: dict,
query, query: str,
expectations, expectations: dict,
): great_expectations_s3_bucket: str = DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET,
data_context = create_in_memory_data_context(mysql_credentials) ) -> dict:
"""
Validate a query and an expectation suite against a given MySQL server.
:param name: a name for the data test.
:param mysql_credentials: credentials for the MySQL instance.
:param query: the query to test against.
:param expectations: the expectations on the dataset.
:param great_expectations_s3_bucket: the bucket where Great Expectations
files live.
:return: the result of the data test.
"""
logger = prefect.context.get("logger")
logger.info("Creating data context.")
data_context = create_in_memory_data_context(
mysql_credentials, great_expectations_s3_bucket
)
logger.info("Data context created.")
logger.info("Creating expectation suite.")
data_context = create_expectation_suite(data_context, name, expectations) data_context = create_expectation_suite(data_context, name, expectations)
logger.info("Expectation suite created.")
logger.info("Creating checkpoint.")
data_context = create_checkpoint(data_context, mysql_credentials, query, name) data_context = create_checkpoint(data_context, mysql_credentials, query, name)
results = run_checkpoint(data_context, name) logger.info("Checkpoint created.")
logger.info("Running checkpoint.")
results = data_context.run_checkpoint(f"{name}_checkpoint")
logger.info("Checkpoint finished.")
logger.info(f"Validation result: {results['success']}")
return results return results
def create_in_memory_data_context( def create_in_memory_data_context(
mysql_credentials: dict, mysql_credentials: dict,
database_name="information_schema", great_expectations_s3_bucket: str,
great_expectations_bucket_name="pdo-prod-great-expectations", ) -> AbstractDataContext:
):
print("Preparing DataContext.")
data_context = BaseDataContext( data_context = BaseDataContext(
project_config=DataContextConfig( project_config=DataContextConfig(
datasources={ datasources={
@ -59,11 +85,10 @@ def create_in_memory_data_context(
) )
}, },
store_backend_defaults=S3StoreBackendDefaults( store_backend_defaults=S3StoreBackendDefaults(
default_bucket_name=great_expectations_bucket_name default_bucket_name=great_expectations_s3_bucket
), ),
) )
) )
print("DataContext is ready.")
return data_context return data_context
@ -71,7 +96,6 @@ def create_in_memory_data_context(
def create_expectation_suite( def create_expectation_suite(
data_context, expectation_suite_name, expectation_configurations data_context, expectation_suite_name, expectation_configurations
): ):
print("Preparing Expectation Suite.")
suite = data_context.create_expectation_suite( suite = data_context.create_expectation_suite(
expectation_suite_name, expectation_suite_name,
overwrite_existing=True, overwrite_existing=True,
@ -82,8 +106,6 @@ def create_expectation_suite(
data_context.save_expectation_suite(suite) data_context.save_expectation_suite(suite)
print("Expectation Suite was stored.")
return data_context return data_context
@ -91,8 +113,6 @@ def create_checkpoint(
data_context, mysql_credentials, query_for_checkpoint, expectation_suite_name data_context, mysql_credentials, query_for_checkpoint, expectation_suite_name
): ):
print("Preparing Checkpoint.")
checkpoint_config = { checkpoint_config = {
"name": f"{expectation_suite_name}_checkpoint", "name": f"{expectation_suite_name}_checkpoint",
"class_name": "Checkpoint", "class_name": "Checkpoint",
@ -127,19 +147,5 @@ def create_checkpoint(
# The checkpoint gets persisted. Now it can be called again in a different session. # The checkpoint gets persisted. Now it can be called again in a different session.
data_context.add_checkpoint(**checkpoint_config) data_context.add_checkpoint(**checkpoint_config)
print("Checkpoint was stored.")
return data_context return data_context
def run_checkpoint(the_data_context, name):
print("Running Checkpoint.")
print(the_data_context)
results = the_data_context.run_checkpoint(checkpoint_name=f"{name}_checkpoint")
print("Checkpoint finished.")
return results

View file

@ -5,3 +5,4 @@ DEFAULT_KUBERNETES_IMAGE = "373245262072.dkr.ecr.eu-central-1.amazonaws.com/pdo-
DEFAULT_KUBERNETES_LABELS = ["k8s"] DEFAULT_KUBERNETES_LABELS = ["k8s"]
DEFAULT_FLOWS_PATH_IN_BUCKET = "flows/" DEFAULT_FLOWS_PATH_IN_BUCKET = "flows/"
DEFAULT_TRINO_HTTP_SCHEME = "https" DEFAULT_TRINO_HTTP_SCHEME = "https"
DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET = "pdo-prod-great-expectations"