Bring over code from existing GE flows

Pablo Martin 2023-01-26 18:09:08 +01:00
parent 3b67c64959
commit e675056e3f

lolafect/data_testing.py (new file, 139 lines added)

@@ -0,0 +1,139 @@
from urllib.parse import quote_plus as urlquote

from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    DatasourceConfig,
    S3StoreBackendDefaults,
)
from prefect import task

# NOTE: LOLACONFIG (used for the flow/checkpoint names below) is the project's
# configuration object; its import is not part of this diff.

@task
def run_data_test_on_mysql(
    name,
    mysql_credentials,
    query,
    expectations,
):
    """Run a Great Expectations data test against the MySQL staging warehouse.

    `name` is used as the Expectation Suite name, `query` supplies the rows to
    validate, and `expectations` is a list of expectation configurations.
    """
    data_context = create_in_memory_data_context(mysql_credentials)
    if expectations:
        data_context = create_expectation_suite(data_context, name, expectations)
    data_context = create_checkpoint(data_context, query, name)
    results = run_checkpoint(data_context)
    return results
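

# The helpers below wrap the Great Expectations API: build an ephemeral,
# in-memory DataContext whose stores live in S3, register an Expectation
# Suite, persist a Checkpoint that validates a runtime SQL query, and run
# that Checkpoint.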
def create_in_memory_data_context(dw_connection):
print("Preparing DataContext.")
data_context = BaseDataContext(
project_config=DataContextConfig(
datasources={
"dw-staging": DatasourceConfig(
class_name="Datasource",
execution_engine={
"class_name": "SqlAlchemyExecutionEngine",
"connection_string": "mysql+pymysql://%s:%s@%s:%s/staging"
% (
dw_connection.raw_user,
urlquote(dw_connection.raw_password),
dw_connection.host,
dw_connection.port,
),
},
data_connectors={
"default_runtime_data_connector_name": {
"class_name": "RuntimeDataConnector",
"batch_identifiers": ["default_identifier_name"],
},
"default_inferred_data_connector_name": {
"class_name": "InferredAssetSqlDataConnector",
"name": "whole_table",
},
},
)
},
store_backend_defaults=S3StoreBackendDefaults(
default_bucket_name="pdo-prod-great-expectations"
),
)
)
print("DataContext is ready.")
    return data_context


def create_expectation_suite(
data_context, expectation_suite_name, expectation_configurations
):
print("Preparing Expectation Suite.")
suite = data_context.create_expectation_suite(
expectation_suite_name,
overwrite_existing=True,
)
for expectation_configuration in expectation_configurations:
suite.add_expectation(expectation_configuration=expectation_configuration)
data_context.save_expectation_suite(suite)
print("Expectation Suite was stored.")
    return data_context


def create_checkpoint(data_context, query_for_checkpoint, expectation_suite_name):
print("Preparing Checkpoint.")
checkpoint_config = {
"name": f"{LOLACONFIG.FLOW_NAME_UDCS}_checkpoint",
"class_name": "Checkpoint",
"config_version": 1,
"run_name_template": f"{LOLACONFIG.FLOW_NAME_UDCS}_checkpoint",
"action_list": [
{
"name": "store_validation_result",
"action": {"class_name": "StoreValidationResultAction"},
},
{
"name": "store_evaluation_params",
"action": {"class_name": "StoreEvaluationParametersAction"},
},
],
"validations": [
{
"batch_request": {
"datasource_name": "dw-staging",
"data_connector_name": "default_runtime_data_connector_name",
"data_asset_name": f"{LOLACONFIG.FLOW_NAME_UDCS}_validation_query",
"runtime_parameters": {"query": query_for_checkpoint},
"batch_identifiers": {
"default_identifier_name": "default_identifier"
},
},
"expectation_suite_name": expectation_suite_name,
}
],
}
    # The checkpoint is persisted, so it can be run again from a different session.
data_context.add_checkpoint(**checkpoint_config)
print("Checkpoint was stored.")
    return data_context


def run_checkpoint(data_context):
    print("Running Checkpoint.")
    results = data_context.run_checkpoint(
        checkpoint_name=f"{LOLACONFIG.FLOW_NAME_UDCS}_checkpoint"
    )
print("Checkpoint finished.")
return results
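

For reference, a minimal sketch of how the task could be invoked. It assumes Prefect 2-style flows (under Prefect 1 the call would sit inside a `with Flow(...):` block instead), and the flow name, suite name, query, column, and credential values are all made up; any object exposing `raw_user`, `raw_password`, `host`, and `port` works as `mysql_credentials`.

from types import SimpleNamespace

from great_expectations.core import ExpectationConfiguration
from prefect import flow

from lolafect.data_testing import run_data_test_on_mysql

# Placeholder credentials; in practice these would come from the project's
# secrets handling.
MYSQL_CREDENTIALS = SimpleNamespace(
    raw_user="reader",
    raw_password="s3cret",
    host="dw.example.internal",
    port=3306,
)

# Hypothetical expectations: a row-count check and a non-null check on a
# made-up column.
EXPECTATIONS = [
    ExpectationConfiguration(
        expectation_type="expect_table_row_count_to_be_between",
        kwargs={"min_value": 1},
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "user_id"},
    ),
]


@flow
def example_data_test():
    return run_data_test_on_mysql(
        name="example_suite",
        mysql_credentials=MYSQL_CREDENTIALS,
        query="SELECT * FROM staging.users",
        expectations=EXPECTATIONS,
    )

Validation results end up in the `pdo-prod-great-expectations` S3 bucket via the store backend defaults configured in `create_in_memory_data_context`.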