Compare commits
No commits in common. "d9c131dcc6266d5022dfca7cdabaffbed468fac5" and "885c30184fc6c1de721a169f505322385d1d1dde" have entirely different histories.
d9c131dcc6
...
885c30184f
3 changed files with 82 additions and 240 deletions
22
README.md
22
README.md
|
|
@ -143,28 +143,6 @@ with Flow(...) as flow:
|
||||||
print("The data is bad!!!")
|
print("The data is bad!!!")
|
||||||
```
|
```
|
||||||
|
|
||||||
**Run a Great Expectations validation on a Trino query**
|
|
||||||
|
|
||||||
```python
|
|
||||||
from lolafect.data_testing import run_data_test_on_trino
|
|
||||||
|
|
||||||
with Flow(...) as flow:
|
|
||||||
|
|
||||||
my_query = """SELECT something FROM somewhere"""
|
|
||||||
my_expectations = {...} # A bunch of things you want to validate on the result of the query
|
|
||||||
|
|
||||||
validation_results = run_data_test_on_trino(
|
|
||||||
name="my-cool-validation",
|
|
||||||
trino_credentials={...},
|
|
||||||
query=my_query,
|
|
||||||
expectations=my_expectations
|
|
||||||
)
|
|
||||||
|
|
||||||
if not validation_results["success"]:
|
|
||||||
print("The data is bad!!!")
|
|
||||||
```
|
|
||||||
|
|
||||||
|
|
||||||
### Slack
|
### Slack
|
||||||
|
|
||||||
**Send a warning message to slack if your tasks fails**
|
**Send a warning message to slack if your tasks fails**
|
||||||
|
|
|
||||||
|
|
@ -60,51 +60,6 @@ def run_data_test_on_mysql(
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
@task()
|
|
||||||
def run_data_test_on_trino(
|
|
||||||
name: str,
|
|
||||||
trino_credentials: dict,
|
|
||||||
query: str,
|
|
||||||
expectation_configurations: List[ExpectationConfiguration],
|
|
||||||
great_expectations_s3_bucket: str = DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET,
|
|
||||||
) -> dict:
|
|
||||||
"""
|
|
||||||
Validate a query and an expectation suite against a given Trino server.
|
|
||||||
|
|
||||||
:param name: a unique name for the data test.
|
|
||||||
:param trino_credentials: credentials for the Trino cluster.
|
|
||||||
:param query: the query to test against.
|
|
||||||
:param expectation_configurations: the expectations on the dataset.
|
|
||||||
:param great_expectations_s3_bucket: the bucket where Great Expectations
|
|
||||||
files live.
|
|
||||||
:return: the result of the data test.
|
|
||||||
"""
|
|
||||||
logger = prefect.context.get("logger")
|
|
||||||
|
|
||||||
logger.info("Creating data context.")
|
|
||||||
data_context = _create_in_memory_data_context_for_trino(
|
|
||||||
trino_credentials, great_expectations_s3_bucket
|
|
||||||
)
|
|
||||||
logger.info("Data context created.")
|
|
||||||
logger.info("Creating expectation suite.")
|
|
||||||
data_context = _create_expectation_suite(
|
|
||||||
data_context, name, expectation_configurations
|
|
||||||
)
|
|
||||||
logger.info("Expectation suite created.")
|
|
||||||
logger.info("Creating checkpoint.")
|
|
||||||
data_context = _create_checkpoint(
|
|
||||||
data_context,
|
|
||||||
f"{trino_credentials['host']}:{trino_credentials['port']}",
|
|
||||||
query,
|
|
||||||
name,
|
|
||||||
)
|
|
||||||
logger.info("Checkpoint created.")
|
|
||||||
logger.info("Running checkpoint.")
|
|
||||||
results = data_context.run_checkpoint(f"{name}_checkpoint")
|
|
||||||
logger.info("Checkpoint finished.")
|
|
||||||
logger.info(f"Validation result: {results['success']}")
|
|
||||||
|
|
||||||
return results
|
|
||||||
|
|
||||||
def _create_in_memory_data_context_for_mysql(
|
def _create_in_memory_data_context_for_mysql(
|
||||||
mysql_credentials: dict,
|
mysql_credentials: dict,
|
||||||
|
|
@ -156,58 +111,6 @@ def _create_in_memory_data_context_for_mysql(
|
||||||
|
|
||||||
return data_context
|
return data_context
|
||||||
|
|
||||||
def _create_in_memory_data_context_for_trino(
|
|
||||||
trino_credentials: dict,
|
|
||||||
great_expectations_s3_bucket: str,
|
|
||||||
) -> AbstractDataContext:
|
|
||||||
"""
|
|
||||||
Create a DataContext without a YAML config file and specify a Trino
|
|
||||||
datasource.
|
|
||||||
|
|
||||||
:param trino_credentials: the creds to the mysql where the query will be
|
|
||||||
executed.
|
|
||||||
:param great_expectations_s3_bucket: the name of the bucket where Great
|
|
||||||
Exepctations files while be stored.
|
|
||||||
:return: the data context.
|
|
||||||
"""
|
|
||||||
|
|
||||||
data_context = BaseDataContext(
|
|
||||||
project_config=DataContextConfig(
|
|
||||||
datasources={
|
|
||||||
f"{trino_credentials['host']}:{trino_credentials['port']}": DatasourceConfig(
|
|
||||||
class_name="Datasource",
|
|
||||||
execution_engine={
|
|
||||||
"class_name": "SqlAlchemyExecutionEngine",
|
|
||||||
"connection_string": f"trino://%s:%s@%s:%s/%s/%s"
|
|
||||||
% (
|
|
||||||
trino_credentials["user"],
|
|
||||||
urlquote(trino_credentials["password"]),
|
|
||||||
trino_credentials["host"],
|
|
||||||
trino_credentials["port"],
|
|
||||||
trino_credentials["catalog"],
|
|
||||||
trino_credentials["schema"],
|
|
||||||
),
|
|
||||||
},
|
|
||||||
data_connectors={
|
|
||||||
"default_runtime_data_connector_name": {
|
|
||||||
"class_name": "RuntimeDataConnector",
|
|
||||||
"batch_identifiers": ["default_identifier_name"],
|
|
||||||
},
|
|
||||||
"default_inferred_data_connector_name": {
|
|
||||||
"class_name": "InferredAssetSqlDataConnector",
|
|
||||||
"name": "whole_table",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
)
|
|
||||||
},
|
|
||||||
store_backend_defaults=S3StoreBackendDefaults(
|
|
||||||
default_bucket_name=great_expectations_s3_bucket
|
|
||||||
),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
return data_context
|
|
||||||
|
|
||||||
|
|
||||||
def _create_expectation_suite(
|
def _create_expectation_suite(
|
||||||
data_context: AbstractDataContext,
|
data_context: AbstractDataContext,
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
from great_expectations.core.expectation_configuration import ExpectationConfiguration
|
from great_expectations.core.expectation_configuration import ExpectationConfiguration
|
||||||
|
|
||||||
from lolafect.lolaconfig import build_lolaconfig
|
from lolafect.lolaconfig import build_lolaconfig
|
||||||
from lolafect.data_testing import run_data_test_on_mysql, run_data_test_on_trino
|
from lolafect.data_testing import run_data_test_on_mysql
|
||||||
from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel
|
from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel
|
||||||
|
|
||||||
# __ __ _____ _ _ _____ _ _ _____ _
|
# __ __ _____ _ _ _____ _ _ _____ _
|
||||||
|
|
@ -18,139 +18,100 @@ from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel
|
||||||
# are working properly.
|
# are working properly.
|
||||||
|
|
||||||
TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite")
|
TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite")
|
||||||
#1 AS a_one,
|
|
||||||
# 'lol' AS a_string,
|
|
||||||
# NULL AS a_null
|
|
||||||
TEST_QUERY = """
|
|
||||||
SELECT *
|
|
||||||
from app_lm_mysql_pl.comprea.market
|
|
||||||
where id = 1
|
|
||||||
"""
|
|
||||||
|
|
||||||
TEST_EXPECTATIONS_THAT_FIT_DATA = [
|
|
||||||
ExpectationConfiguration(
|
|
||||||
expectation_type="expect_column_values_to_be_between",
|
|
||||||
kwargs={"column": "a_one", "min_value": 1, "max_value": 1},
|
|
||||||
),
|
|
||||||
ExpectationConfiguration(
|
|
||||||
expectation_type="expect_column_values_to_match_like_pattern",
|
|
||||||
kwargs={"column": "a_string", "like_pattern": "%lol%"},
|
|
||||||
),
|
|
||||||
ExpectationConfiguration(
|
|
||||||
expectation_type="expect_column_values_to_be_null",
|
|
||||||
kwargs={"column": "a_null"},
|
|
||||||
),
|
|
||||||
]
|
|
||||||
|
|
||||||
TEST_EXPECTATIONS_THAT_DONT_FIT_DATA = [
|
def test_validation_on_mysql_succeeds():
|
||||||
ExpectationConfiguration(
|
|
||||||
expectation_type="expect_column_values_to_be_between",
|
|
||||||
kwargs={"column": "a_one", "min_value": 2, "max_value": 2},
|
|
||||||
),
|
|
||||||
ExpectationConfiguration(
|
|
||||||
expectation_type="expect_column_values_to_match_like_pattern",
|
|
||||||
kwargs={"column": "a_string", "like_pattern": "%xD%"},
|
|
||||||
),
|
|
||||||
ExpectationConfiguration(
|
|
||||||
expectation_type="expect_column_values_to_not_be_null",
|
|
||||||
kwargs={"column": "a_null"},
|
|
||||||
),
|
|
||||||
]
|
|
||||||
|
|
||||||
#
|
test_query = """
|
||||||
# def test_validation_on_mysql_succeeds():
|
SELECT 1 AS a_one,
|
||||||
# ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run(
|
"lol" AS a_string,
|
||||||
# s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME,
|
NULL AS a_null
|
||||||
# ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
|
"""
|
||||||
# remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"],
|
test_expectations = [
|
||||||
# remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"],
|
ExpectationConfiguration(
|
||||||
# )
|
expectation_type="expect_column_values_to_be_between",
|
||||||
#
|
kwargs={"column": "a_one", "min_value": 1, "max_value": 1},
|
||||||
# validation_result = run_data_test_on_mysql.run(
|
),
|
||||||
# name="lolafect-testing-test_validation_on_mysql_succeeds",
|
ExpectationConfiguration(
|
||||||
# mysql_credentials={
|
expectation_type="expect_column_values_to_match_like_pattern",
|
||||||
# "host": ssh_tunnel.local_bind_address[0],
|
kwargs={"column": "a_string", "like_pattern": "%lol%"},
|
||||||
# "port": ssh_tunnel.local_bind_address[1],
|
),
|
||||||
# "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"],
|
ExpectationConfiguration(
|
||||||
# "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"],
|
expectation_type="expect_column_values_to_be_null",
|
||||||
# "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"],
|
kwargs={"column": "a_null"},
|
||||||
# },
|
),
|
||||||
# query=TEST_QUERY,
|
]
|
||||||
# expectation_configurations=TEST_EXPECTATIONS_THAT_FIT_DATA,
|
|
||||||
# )
|
|
||||||
#
|
|
||||||
# closed_tunnel = close_ssh_tunnel.run(ssh_tunnel)
|
|
||||||
#
|
|
||||||
# data_test_passed = validation_result["success"] == True
|
|
||||||
#
|
|
||||||
# assert data_test_passed
|
|
||||||
#
|
|
||||||
#
|
|
||||||
# def test_validation_on_mysql_fails():
|
|
||||||
# ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run(
|
|
||||||
# s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME,
|
|
||||||
# ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
|
|
||||||
# remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"],
|
|
||||||
# remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"],
|
|
||||||
# )
|
|
||||||
#
|
|
||||||
# validation_result = run_data_test_on_mysql.run(
|
|
||||||
# name="lolafect-testing-test_validation_on_mysql_fails",
|
|
||||||
# mysql_credentials={
|
|
||||||
# "host": ssh_tunnel.local_bind_address[0],
|
|
||||||
# "port": ssh_tunnel.local_bind_address[1],
|
|
||||||
# "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"],
|
|
||||||
# "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"],
|
|
||||||
# "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"],
|
|
||||||
# },
|
|
||||||
# query=TEST_QUERY,
|
|
||||||
# expectation_configurations=TEST_EXPECTATIONS_THAT_DONT_FIT_DATA,
|
|
||||||
# )
|
|
||||||
#
|
|
||||||
# closed_tunnel = close_ssh_tunnel.run(ssh_tunnel)
|
|
||||||
#
|
|
||||||
# data_test_failed = validation_result["success"] == False
|
|
||||||
#
|
|
||||||
# assert data_test_failed
|
|
||||||
#
|
|
||||||
|
|
||||||
def test_validation_on_trino_succeeds():
|
ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run(
|
||||||
validation_result = run_data_test_on_trino.run(
|
s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME,
|
||||||
name="lolafect-testing-test_validation_on_mysql_fails",
|
ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
|
||||||
trino_credentials={
|
remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"],
|
||||||
"host": TEST_LOLACONFIG.TRINO_CREDENTIALS["host"],
|
remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"],
|
||||||
"port": TEST_LOLACONFIG.TRINO_CREDENTIALS["port"],
|
|
||||||
"user": TEST_LOLACONFIG.TRINO_CREDENTIALS["user"],
|
|
||||||
"password": TEST_LOLACONFIG.TRINO_CREDENTIALS["password"],
|
|
||||||
"catalog": "data_dw",
|
|
||||||
"schema": "sandbox"
|
|
||||||
},
|
|
||||||
query=TEST_QUERY,
|
|
||||||
expectation_configurations=TEST_EXPECTATIONS_THAT_FIT_DATA,
|
|
||||||
)
|
)
|
||||||
print("###############\n" * 20)
|
|
||||||
print(validation_result)
|
validation_result = run_data_test_on_mysql.run(
|
||||||
|
name="lolafect-testing-test_validation_on_mysql_succeeds",
|
||||||
|
mysql_credentials={
|
||||||
|
"host": ssh_tunnel.local_bind_address[0],
|
||||||
|
"port": ssh_tunnel.local_bind_address[1],
|
||||||
|
"user": TEST_LOLACONFIG.DW_CREDENTIALS["user"],
|
||||||
|
"password": TEST_LOLACONFIG.DW_CREDENTIALS["password"],
|
||||||
|
"db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"],
|
||||||
|
},
|
||||||
|
query=test_query,
|
||||||
|
expectation_configurations=test_expectations,
|
||||||
|
)
|
||||||
|
|
||||||
|
closed_tunnel = close_ssh_tunnel.run(ssh_tunnel)
|
||||||
|
|
||||||
data_test_passed = validation_result["success"] == True
|
data_test_passed = validation_result["success"] == True
|
||||||
|
|
||||||
assert data_test_passed
|
assert data_test_passed
|
||||||
|
|
||||||
|
|
||||||
def test_validation_on_trino_fails():
|
def test_validation_on_mysql_fails():
|
||||||
validation_result = run_data_test_on_trino.run(
|
test_query = """
|
||||||
name="lolafect-testing-test_validation_on_mysql_fails",
|
SELECT 1 AS a_one,
|
||||||
trino_credentials={
|
"lol" AS a_string,
|
||||||
"host": TEST_LOLACONFIG.TRINO_CREDENTIALS["host"],
|
NULL AS a_null
|
||||||
"port": TEST_LOLACONFIG.TRINO_CREDENTIALS["port"],
|
"""
|
||||||
"user": TEST_LOLACONFIG.TRINO_CREDENTIALS["user"],
|
test_expectations = [
|
||||||
"password": TEST_LOLACONFIG.TRINO_CREDENTIALS["password"],
|
ExpectationConfiguration(
|
||||||
"catalog": "data_dw",
|
expectation_type="expect_column_values_to_be_between",
|
||||||
"schema": "sandbox"
|
kwargs={"column": "a_one", "min_value": 2, "max_value": 2},
|
||||||
},
|
),
|
||||||
query=TEST_QUERY,
|
ExpectationConfiguration(
|
||||||
expectation_configurations=TEST_EXPECTATIONS_THAT_DONT_FIT_DATA,
|
expectation_type="expect_column_values_to_match_like_pattern",
|
||||||
|
kwargs={"column": "a_string", "like_pattern": "%xD%"},
|
||||||
|
),
|
||||||
|
ExpectationConfiguration(
|
||||||
|
expectation_type="expect_column_values_to_not_be_null",
|
||||||
|
kwargs={"column": "a_null"},
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run(
|
||||||
|
s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME,
|
||||||
|
ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
|
||||||
|
remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"],
|
||||||
|
remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
validation_result = run_data_test_on_mysql.run(
|
||||||
|
name="lolafect-testing-test_validation_on_mysql_fails",
|
||||||
|
mysql_credentials={
|
||||||
|
"host": ssh_tunnel.local_bind_address[0],
|
||||||
|
"port": ssh_tunnel.local_bind_address[1],
|
||||||
|
"user": TEST_LOLACONFIG.DW_CREDENTIALS["user"],
|
||||||
|
"password": TEST_LOLACONFIG.DW_CREDENTIALS["password"],
|
||||||
|
"db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"],
|
||||||
|
},
|
||||||
|
query=test_query,
|
||||||
|
expectation_configurations=test_expectations,
|
||||||
|
)
|
||||||
|
|
||||||
|
closed_tunnel = close_ssh_tunnel.run(ssh_tunnel)
|
||||||
|
|
||||||
data_test_failed = validation_result["success"] == False
|
data_test_failed = validation_result["success"] == False
|
||||||
|
|
||||||
assert data_test_failed
|
assert data_test_failed
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue