Merge pull request #9 from lolamarket/feature/run-ge
Run data tests on top of a MySQL query with GE
This commit is contained in:
commit
e62087e562
9 changed files with 354 additions and 2 deletions
|
|
@ -2,6 +2,15 @@
|
||||||
|
|
||||||
All notable changes to this project will be documented in this file.
|
All notable changes to this project will be documented in this file.
|
||||||
|
|
||||||
|
## [Unreleased]
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
- Started the `data_testing` module with the first task to run data tests on
|
||||||
|
MySQL: `run_data_test_on_mysql`.
|
||||||
|
- The `DW_CREDENTIALS` attribute in `LOLACONFIG` now also includes a `default_db` key.
|
||||||
|
|
||||||
|
|
||||||
## [0.3.0] - 2023-01-27
|
## [0.3.0] - 2023-01-27
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
|
|
||||||
23
README.md
23
README.md
|
|
@ -120,6 +120,29 @@ with Flow(...) as flow:
|
||||||
close_ssh_tunnel.run(tunnel=tunnel, upstream_tasks=[mysql_closed])
|
close_ssh_tunnel.run(tunnel=tunnel, upstream_tasks=[mysql_closed])
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Use Great Expectations
|
||||||
|
|
||||||
|
**Run a Great Expectations validation on a MySQL query**
|
||||||
|
|
||||||
|
```python
|
||||||
|
from lolafect.data_testing import run_data_test_on_mysql
|
||||||
|
|
||||||
|
with Flow(...) as flow:
|
||||||
|
|
||||||
|
my_query = """SELECT something FROM somewhere"""
|
||||||
|
my_expectations = [...] # A list of ExpectationConfiguration objects to validate on the result of the query
|
||||||
|
|
||||||
|
validation_results = run_data_test_on_mysql(
|
||||||
|
name="my-cool-validation",
|
||||||
|
mysql_credentials={...},
|
||||||
|
query=my_query,
|
||||||
|
expectation_configurations=my_expectations
|
||||||
|
)
|
||||||
|
|
||||||
|
if not validation_results["success"]:
|
||||||
|
print("The data is bad!!!")
|
||||||
|
```
|
||||||
|
|
||||||
### Slack
|
### Slack
|
||||||
|
|
||||||
**Send a warning message to slack if your task fails**
|
**Send a warning message to slack if your task fails**
|
||||||
|
|
|
||||||
196
lolafect/data_testing.py
Normal file
196
lolafect/data_testing.py
Normal file
|
|
@ -0,0 +1,196 @@
|
||||||
|
from typing import List
|
||||||
|
from urllib.parse import quote_plus as urlquote
|
||||||
|
|
||||||
|
import prefect
|
||||||
|
from prefect import task
|
||||||
|
from great_expectations.data_context import BaseDataContext, AbstractDataContext
|
||||||
|
from great_expectations.data_context.types.base import (
|
||||||
|
DataContextConfig,
|
||||||
|
DatasourceConfig,
|
||||||
|
S3StoreBackendDefaults,
|
||||||
|
)
|
||||||
|
from great_expectations.expectations.expectation import ExpectationConfiguration
|
||||||
|
|
||||||
|
from lolafect.defaults import DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET
|
||||||
|
|
||||||
|
|
||||||
|
@task
def run_data_test_on_mysql(
    name: str,
    mysql_credentials: dict,
    query: str,
    expectation_configurations: List[ExpectationConfiguration],
    great_expectations_s3_bucket: str = DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET,
) -> dict:
    """
    Run a Great Expectations data test on the result of a MySQL query.

    The task builds an in-memory data context for the MySQL instance, stores
    an expectation suite called `name` with the given expectations, registers
    a checkpoint called `<name>_checkpoint` that ties the query to the suite,
    and finally runs that checkpoint.

    :param name: a unique name for the data test. It is used as the name of
     the expectation suite and as the prefix of the checkpoint name.
    :param mysql_credentials: credentials for the MySQL instance. The `host`
     and `port` entries are also used to name the datasource.
    :param query: the query whose result will be tested.
    :param expectation_configurations: the expectations on the dataset.
    :param great_expectations_s3_bucket: the bucket where Great Expectations
     files live.
    :return: the result of running the checkpoint. Its `success` entry is
     logged as the validation result.
    """
    log = prefect.context.get("logger")

    log.info("Creating data context.")
    context = _create_in_memory_data_context_for_mysql(
        mysql_credentials=mysql_credentials,
        great_expectations_s3_bucket=great_expectations_s3_bucket,
    )
    log.info("Data context created.")

    log.info("Creating expectation suite.")
    context = _create_expectation_suite(
        data_context=context,
        expectation_suite_name=name,
        expectation_configurations=expectation_configurations,
    )
    log.info("Expectation suite created.")

    log.info("Creating checkpoint.")
    # The datasource is registered in the data context under "<host>:<port>".
    datasource_name = f"{mysql_credentials['host']}:{mysql_credentials['port']}"
    context = _create_checkpoint(
        data_context=context,
        datasource_name=datasource_name,
        query_for_checkpoint=query,
        expectation_suite_name=name,
    )
    log.info("Checkpoint created.")

    log.info("Running checkpoint.")
    checkpoint_result = context.run_checkpoint(f"{name}_checkpoint")
    log.info("Checkpoint finished.")
    log.info(f"Validation result: {checkpoint_result['success']}")

    return checkpoint_result
|
||||||
|
|
||||||
|
|
||||||
|
def _create_in_memory_data_context_for_mysql(
    mysql_credentials: dict,
    great_expectations_s3_bucket: str,
) -> AbstractDataContext:
    """
    Create a DataContext without a YAML config file and specify a MySQL
    datasource.

    The datasource is registered under the name `<host>:<port>` and is given
    two data connectors: a runtime one (used for ad-hoc queries) and an
    inferred SQL one. Store backends default to the given S3 bucket.

    :param mysql_credentials: the creds to the mysql where the query will be
     executed. The `user`, `password`, `host`, `port` and `db` entries are
     read.
    :param great_expectations_s3_bucket: the name of the bucket where Great
     Expectations files will be stored.
    :return: the data context.
    """
    datasource_name = f"{mysql_credentials['host']}:{mysql_credentials['port']}"

    # Build the URL in a single formatting pass. Interpolating the db name
    # with an f-string and then applying the `%` operator on the result would
    # break whenever the db name contains a `%` character.
    # The password is URL-quoted so that special characters survive parsing.
    # NOTE(review): the user is interpolated unquoted, as before; confirm
    # that usernames never contain URL-reserved characters.
    connection_string = (
        "mysql+pymysql://"
        f"{mysql_credentials['user']}:{urlquote(mysql_credentials['password'])}"
        f"@{mysql_credentials['host']}:{mysql_credentials['port']}"
        f"/{mysql_credentials['db']}"
    )

    data_context = BaseDataContext(
        project_config=DataContextConfig(
            datasources={
                datasource_name: DatasourceConfig(
                    class_name="Datasource",
                    execution_engine={
                        "class_name": "SqlAlchemyExecutionEngine",
                        "connection_string": connection_string,
                    },
                    data_connectors={
                        "default_runtime_data_connector_name": {
                            "class_name": "RuntimeDataConnector",
                            "batch_identifiers": ["default_identifier_name"],
                        },
                        "default_inferred_data_connector_name": {
                            "class_name": "InferredAssetSqlDataConnector",
                            "name": "whole_table",
                        },
                    },
                )
            },
            store_backend_defaults=S3StoreBackendDefaults(
                default_bucket_name=great_expectations_s3_bucket
            ),
        )
    )

    return data_context
|
||||||
|
|
||||||
|
|
||||||
|
def _create_expectation_suite(
    data_context: AbstractDataContext,
    expectation_suite_name: str,
    expectation_configurations: List[ExpectationConfiguration],
) -> AbstractDataContext:
    """
    Build an expectation suite from the given expectations and save it in the
    data context.

    An existing suite with the same name is overwritten.

    :param data_context: the current data context.
    :param expectation_suite_name: the name to give to the new expectation
     suite.
    :param expectation_configurations: the configs of the expectations to
     include in the expectation suite.
    :return: the same data context, now containing the new suite.
    """
    new_suite = data_context.create_expectation_suite(
        expectation_suite_name, overwrite_existing=True
    )

    for config in expectation_configurations:
        new_suite.add_expectation(expectation_configuration=config)

    # Persist the populated suite through the context's store.
    data_context.save_expectation_suite(new_suite)
    return data_context
|
||||||
|
|
||||||
|
|
||||||
|
def _create_checkpoint(
    data_context: AbstractDataContext,
    datasource_name: str,
    query_for_checkpoint: str,
    expectation_suite_name: str,
) -> AbstractDataContext:
    """
    Register a checkpoint in the data context that validates the result of a
    query against an expectation suite.

    The checkpoint is named `<expectation_suite_name>_checkpoint` and stores
    both the validation result and the evaluation parameters when it runs.

    :param data_context: the current data context.
    :param datasource_name: the name of the datasource, within the data
     context, that the batch request of the checkpoint points to.
    :param query_for_checkpoint: the query that will provide the data to test
     in the checkpoint.
    :param expectation_suite_name: the name of the expectation suite to test
     against the data.
    :return: the same data context, now containing the new checkpoint.
    """
    checkpoint_name = f"{expectation_suite_name}_checkpoint"

    store_actions = [
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"},
        },
        {
            "name": "store_evaluation_params",
            "action": {"class_name": "StoreEvaluationParametersAction"},
        },
    ]

    # The query is handed over at runtime through the runtime data connector.
    query_batch_request = {
        "datasource_name": datasource_name,
        "data_connector_name": "default_runtime_data_connector_name",
        "data_asset_name": f"{expectation_suite_name}_validation_query",
        "runtime_parameters": {"query": query_for_checkpoint},
        "batch_identifiers": {"default_identifier_name": "default_identifier"},
    }

    data_context.add_checkpoint(
        name=checkpoint_name,
        class_name="Checkpoint",
        config_version=1,
        run_name_template=checkpoint_name,
        action_list=store_actions,
        validations=[
            {
                "batch_request": query_batch_request,
                "expectation_suite_name": expectation_suite_name,
            }
        ],
    )

    return data_context
|
||||||
|
|
@ -7,3 +7,4 @@ DEFAULT_KUBERNETES_IMAGE = (
|
||||||
DEFAULT_KUBERNETES_LABELS = ["k8s"]
|
DEFAULT_KUBERNETES_LABELS = ["k8s"]
|
||||||
DEFAULT_FLOWS_PATH_IN_BUCKET = "flows/"
|
DEFAULT_FLOWS_PATH_IN_BUCKET = "flows/"
|
||||||
DEFAULT_TRINO_HTTP_SCHEME = "https"
|
DEFAULT_TRINO_HTTP_SCHEME = "https"
|
||||||
|
DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET = "pdo-prod-great-expectations"
|
||||||
|
|
|
||||||
|
|
@ -157,6 +157,7 @@ class LolaConfig:
|
||||||
"user": self.ENV_DATA["datadw_user"],
|
"user": self.ENV_DATA["datadw_user"],
|
||||||
"password": self.ENV_DATA["datadw_pass"],
|
"password": self.ENV_DATA["datadw_pass"],
|
||||||
"port": self.ENV_DATA["datadw_port"],
|
"port": self.ENV_DATA["datadw_port"],
|
||||||
|
"default_db": self.ENV_DATA["datadw_default_db"]
|
||||||
}
|
}
|
||||||
|
|
||||||
@_needs_env_data
|
@_needs_env_data
|
||||||
|
|
|
||||||
|
|
@ -6,3 +6,5 @@ httpretty==1.1.4
|
||||||
trino==0.321.0
|
trino==0.321.0
|
||||||
sshtunnel==0.4.0
|
sshtunnel==0.4.0
|
||||||
PyMySQL==1.0.2
|
PyMySQL==1.0.2
|
||||||
|
great_expectations==0.15.45
|
||||||
|
SQLAlchemy==1.4.46
|
||||||
4
setup.py
4
setup.py
|
|
@ -29,6 +29,8 @@ setup(
|
||||||
"boto3==1.26.40",
|
"boto3==1.26.40",
|
||||||
"trino==0.321.0",
|
"trino==0.321.0",
|
||||||
"sshtunnel==0.4.0",
|
"sshtunnel==0.4.0",
|
||||||
"PyMySQL==1.0.2"
|
"PyMySQL==1.0.2",
|
||||||
|
"great_expectations==0.15.45",
|
||||||
|
"SQLAlchemy==1.4.46",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
|
||||||
117
tests/test_integration/test_data_testing.py
Normal file
117
tests/test_integration/test_data_testing.py
Normal file
|
|
@ -0,0 +1,117 @@
|
||||||
|
from great_expectations.core.expectation_configuration import ExpectationConfiguration
|
||||||
|
|
||||||
|
from lolafect.lolaconfig import build_lolaconfig
|
||||||
|
from lolafect.data_testing import run_data_test_on_mysql
|
||||||
|
from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel
|
||||||
|
|
||||||
|
# __ __ _____ _ _ _____ _ _ _____ _
|
||||||
|
# \ \ / /\ | __ \| \ | |_ _| \ | |/ ____| |
|
||||||
|
# \ \ /\ / / \ | |__) | \| | | | | \| | | __| |
|
||||||
|
# \ \/ \/ / /\ \ | _ /| . ` | | | | . ` | | |_ | |
|
||||||
|
# \ /\ / ____ \| | \ \| |\ |_| |_| |\ | |__| |_|
|
||||||
|
# \/ \/_/ \_\_| \_\_| \_|_____|_| \_|\_____(_)
|
||||||
|
# This testing suite requires:
|
||||||
|
# - The calling shell to have permission in AWS
|
||||||
|
# - The calling shell to be within the Mercadão network
|
||||||
|
# - Do not use these tests as part of CI/CD pipelines since they are not idempotent and
|
||||||
|
# rely on external resources. Instead, use them manually to check yourself that things
|
||||||
|
# are working properly.
|
||||||
|
|
||||||
|
# Configuration shared by the tests in this module, built once at import time.
TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite")
|
||||||
|
|
||||||
|
|
||||||
|
def test_validation_on_mysql_succeeds():
    """
    Check that a data test whose expectations all hold reports success.

    The query is run through an SSH tunnel to the DW, which is closed again
    once the data test has finished.
    """
    test_query = """
    SELECT 1 AS a_one,
           "lol" AS a_string,
           NULL AS a_null
    """
    test_expectations = [
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_be_between",
            kwargs={"column": "a_one", "min_value": 1, "max_value": 1},
        ),
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_match_like_pattern",
            kwargs={"column": "a_string", "like_pattern": "%lol%"},
        ),
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_be_null",
            kwargs={"column": "a_null"},
        ),
    ]

    ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run(
        s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME,
        ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
        remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"],
        remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"],
    )

    # Close the tunnel even when the data test raises, so that a broken run
    # does not leave the tunnel open.
    try:
        validation_result = run_data_test_on_mysql.run(
            name="lolafect-testing-test_validation_on_mysql_succeeds",
            mysql_credentials={
                # Connect to the local end of the tunnel, not to the DW itself.
                "host": ssh_tunnel.local_bind_address[0],
                "port": ssh_tunnel.local_bind_address[1],
                "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"],
                "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"],
                "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"],
            },
            query=test_query,
            expectation_configurations=test_expectations,
        )
    finally:
        close_ssh_tunnel.run(ssh_tunnel)

    assert validation_result["success"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_validation_on_mysql_fails():
    """
    Check that a data test whose expectations do not hold reports failure.

    The query is run through an SSH tunnel to the DW, which is closed again
    once the data test has finished.
    """
    test_query = """
    SELECT 1 AS a_one,
           "lol" AS a_string,
           NULL AS a_null
    """
    test_expectations = [
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_be_between",
            kwargs={"column": "a_one", "min_value": 2, "max_value": 2},
        ),
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_match_like_pattern",
            kwargs={"column": "a_string", "like_pattern": "%xD%"},
        ),
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_not_be_null",
            kwargs={"column": "a_null"},
        ),
    ]

    ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run(
        s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME,
        ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
        remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"],
        remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"],
    )

    # Close the tunnel even when the data test raises, so that a broken run
    # does not leave the tunnel open.
    try:
        validation_result = run_data_test_on_mysql.run(
            name="lolafect-testing-test_validation_on_mysql_fails",
            mysql_credentials={
                # Connect to the local end of the tunnel, not to the DW itself.
                "host": ssh_tunnel.local_bind_address[0],
                "port": ssh_tunnel.local_bind_address[1],
                "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"],
                "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"],
                "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"],
            },
            query=test_query,
            expectation_configurations=test_expectations,
        )
    finally:
        close_ssh_tunnel.run(ssh_tunnel)

    assert not validation_result["success"]
|
||||||
|
|
@ -84,6 +84,7 @@ def test_lolaconfig_fetches_dw_creds_properly():
|
||||||
"datadw_user": "some_user",
|
"datadw_user": "some_user",
|
||||||
"datadw_pass": "some_password",
|
"datadw_pass": "some_password",
|
||||||
"datadw_port": "some_port",
|
"datadw_port": "some_port",
|
||||||
|
"datadw_default_db": "some_db"
|
||||||
}
|
}
|
||||||
|
|
||||||
fake_s3_reader.read_json_from_s3_file = mock_read_json_from_s3_file
|
fake_s3_reader.read_json_from_s3_file = mock_read_json_from_s3_file
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue