WIP, still not working

This commit is contained in:
Pablo Martin 2023-01-27 13:13:08 +01:00
parent 375bd77197
commit 922952bf0f
2 changed files with 48 additions and 29 deletions

View file

@ -1,4 +1,12 @@
from urllib.parse import quote_plus as urlquote
from prefect import task from prefect import task
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
DataContextConfig,
DatasourceConfig,
S3StoreBackendDefaults,
)
@task @task
@ -9,37 +17,33 @@ def run_data_test_on_mysql(
expectations, expectations,
): ):
data_context = create_in_memory_data_context(mysql_credentials) data_context = create_in_memory_data_context(mysql_credentials)
data_context = create_expectation_suite(data_context, name, expectations)
if expectation_configurations: data_context = create_checkpoint(data_context, mysql_credentials, query, name)
data_context = create_expectation_suite( results = run_checkpoint(data_context, name)
data_context, expectation_suite_name, expectation_configurations
)
data_context = create_checkpoint(
data_context, query_for_checkpoint, expectation_suite_name
)
results = run_checkpoint(data_context)
return results return results
def create_in_memory_data_context(dw_connection): def create_in_memory_data_context(
mysql_credentials,
database_name="staging",
great_expectations_bucket_name="pdo-prod-great-expectations",
):
print("Preparing DataContext.") print("Preparing DataContext.")
data_context = BaseDataContext( data_context = BaseDataContext(
project_config=DataContextConfig( project_config=DataContextConfig(
datasources={ datasources={
"dw-staging": DatasourceConfig( "HARCODED": DatasourceConfig(#f"{mysql_credentials['host']}:{mysql_credentials['port']}": DatasourceConfig(
class_name="Datasource", class_name="Datasource",
execution_engine={ execution_engine={
"class_name": "SqlAlchemyExecutionEngine", "class_name": "SqlAlchemyExecutionEngine",
"connection_string": "mysql+pymysql://%s:%s@%s:%s/staging" "connection_string": f"mysql+pymysql://%s:%s@%s:%s/{database_name}"
% ( % (
dw_connection.raw_user, mysql_credentials["user"],
urlquote(dw_connection.raw_password), urlquote(mysql_credentials["password"]),
dw_connection.host, mysql_credentials["host"],
dw_connection.port, mysql_credentials["port"],
), ),
}, },
data_connectors={ data_connectors={
@ -55,7 +59,7 @@ def create_in_memory_data_context(dw_connection):
) )
}, },
store_backend_defaults=S3StoreBackendDefaults( store_backend_defaults=S3StoreBackendDefaults(
default_bucket_name="pdo-prod-great-expectations" default_bucket_name=great_expectations_bucket_name
), ),
) )
) )
@ -83,15 +87,17 @@ def create_expectation_suite(
return data_context return data_context
def create_checkpoint(data_context, query_for_checkpoint, expectation_suite_name): def create_checkpoint(
data_context, mysql_credentials, query_for_checkpoint, expectation_suite_name
):
print("Preparing Checkpoint.") print("Preparing Checkpoint.")
checkpoint_config = { checkpoint_config = {
"name": f"{LOLACONFIG.FLOW_NAME_UDCS}_checkpoint", "name": f"{expectation_suite_name}_checkpoint",
"class_name": "Checkpoint", "class_name": "Checkpoint",
"config_version": 1, "config_version": 1,
"run_name_template": f"{LOLACONFIG.FLOW_NAME_UDCS}_checkpoint", "run_name_template": f"{expectation_suite_name}_checkpoint",
"action_list": [ "action_list": [
{ {
"name": "store_validation_result", "name": "store_validation_result",
@ -105,9 +111,9 @@ def create_checkpoint(data_context, query_for_checkpoint, expectation_suite_name
"validations": [ "validations": [
{ {
"batch_request": { "batch_request": {
"datasource_name": "dw-staging", "datasource_name": "HARCODED",
"data_connector_name": "default_runtime_data_connector_name", "data_connector_name": "default_runtime_data_connector_name",
"data_asset_name": f"{LOLACONFIG.FLOW_NAME_UDCS}_validation_query", "data_asset_name": f"{expectation_suite_name}_validation_query",
"runtime_parameters": {"query": query_for_checkpoint}, "runtime_parameters": {"query": query_for_checkpoint},
"batch_identifiers": { "batch_identifiers": {
"default_identifier_name": "default_identifier" "default_identifier_name": "default_identifier"
@ -126,13 +132,13 @@ def create_checkpoint(data_context, query_for_checkpoint, expectation_suite_name
return data_context return data_context
def run_checkpoint(the_data_context): def run_checkpoint(the_data_context, name):
print("Running Checkpoint.") print("Running Checkpoint.")
results = the_data_context.run_checkpoint( print(the_data_context)
checkpoint_name=f"{LOLACONFIG.FLOW_NAME_UDCS}_checkpoint"
) results = the_data_context.run_checkpoint(checkpoint_name=f"{name}_checkpoint")
print("Checkpoint finished.") print("Checkpoint finished.")

View file

@ -2,6 +2,7 @@ from great_expectations.core.expectation_configuration import ExpectationConfigu
from lolafect.lolaconfig import build_lolaconfig from lolafect.lolaconfig import build_lolaconfig
from lolafect.data_testing import run_data_test_on_mysql from lolafect.data_testing import run_data_test_on_mysql
from lolafect.connections import open_ssh_tunnel_with_s3_pkey
# __ __ _____ _ _ _____ _ _ _____ _ # __ __ _____ _ _ _____ _ _ _____ _
# \ \ / /\ | __ \| \ | |_ _| \ | |/ ____| | # \ \ / /\ | __ \| \ | |_ _| \ | |/ ____| |
@ -40,9 +41,21 @@ def test_validation_on_mysql_succeeds():
), ),
] ]
ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run(
s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME,
ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"],
remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"],
)
validation_result = run_data_test_on_mysql.run( validation_result = run_data_test_on_mysql.run(
name="lolafect-testing-test_validation_on_mysql_succeeds", name="lolafect-testing-test_validation_on_mysql_succeeds",
mysql_credentials=TEST_LOLACONFIG.DW_CREDENTIALS, mysql_credentials={
"host": ssh_tunnel.local_bind_address[0],
"port": ssh_tunnel.local_bind_address[1],
"user": TEST_LOLACONFIG.DW_CREDENTIALS["user"],
"password": TEST_LOLACONFIG.DW_CREDENTIALS["password"]
},
query=test_query, query=test_query,
expectations=test_expectations expectations=test_expectations
) )