Compare commits

...
Sign in to create a new pull request.

7 commits

Author SHA1 Message Date
Pablo Martin
d9c131dcc6 Unfinished work 2023-04-20 15:36:16 +02:00
Pablo Martin
5066fe4382 Use single quotes so that query works both in MySQL and Trino. 2023-03-30 13:58:15 +02:00
Pablo Martin
19211eee73 Naive duplication of MySQL elements but for Trino. Not working yet. 2023-03-29 17:28:16 +02:00
Pablo Martin
5b97864e8d Signature for new task 2023-03-29 17:18:41 +02:00
Pablo Martin
814a376e02 Add entry in readme showcasing new trino feature 2023-03-29 17:18:14 +02:00
Pablo Martin
b51c810a39 Add tests for trino feature 2023-03-29 17:18:00 +02:00
Pablo Martin
c3507cccec Refactor shared elements to follow DRY 2023-03-29 17:17:44 +02:00
3 changed files with 237 additions and 79 deletions

View file

@ -143,6 +143,28 @@ with Flow(...) as flow:
print("The data is bad!!!") print("The data is bad!!!")
``` ```
**Run a Great Expectations validation on a Trino query**
```python
from lolafect.data_testing import run_data_test_on_trino
with Flow(...) as flow:
my_query = """SELECT something FROM somewhere"""
my_expectations = {...} # A bunch of things you want to validate on the result of the query
validation_results = run_data_test_on_trino(
name="my-cool-validation",
trino_credentials={...},
query=my_query,
expectation_configurations=my_expectations
)
if not validation_results["success"]:
print("The data is bad!!!")
```
### Slack ### Slack
**Send a warning message to Slack if your task fails** **Send a warning message to Slack if your task fails**

View file

@ -60,6 +60,51 @@ def run_data_test_on_mysql(
return results return results
@task()
def run_data_test_on_trino(
    name: str,
    trino_credentials: dict,
    query: str,
    expectation_configurations: List[ExpectationConfiguration],
    great_expectations_s3_bucket: str = DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET,
) -> dict:
    """
    Run a set of expectations against the result of a query on a Trino server.

    The task builds an in-memory data context for the Trino datasource,
    registers the expectation suite and a checkpoint in it, runs the
    checkpoint named "<name>_checkpoint" and returns what the run produced.

    :param name: a unique name for the data test. It is also used to derive
     the checkpoint name.
    :param trino_credentials: credentials for the Trino cluster. The "host"
     and "port" entries are read here to name the datasource; the whole dict
     is handed to the data context builder.
    :param query: the query to test against.
    :param expectation_configurations: the expectations on the dataset.
    :param great_expectations_s3_bucket: the bucket where Great Expectations
     files live.
    :return: the result of the data test; its "success" entry is logged.
    """
    log = prefect.context.get("logger")

    log.info("Creating data context.")
    context = _create_in_memory_data_context_for_trino(
        trino_credentials, great_expectations_s3_bucket
    )
    log.info("Data context created.")

    log.info("Creating expectation suite.")
    context = _create_expectation_suite(context, name, expectation_configurations)
    log.info("Expectation suite created.")

    log.info("Creating checkpoint.")
    # The datasource is identified by "<host>:<port>", the same key the data
    # context builder registers it under.
    datasource_name = "{}:{}".format(
        trino_credentials["host"], trino_credentials["port"]
    )
    context = _create_checkpoint(context, datasource_name, query, name)
    log.info("Checkpoint created.")

    log.info("Running checkpoint.")
    checkpoint_name = f"{name}_checkpoint"
    validation = context.run_checkpoint(checkpoint_name)
    log.info("Checkpoint finished.")

    log.info(f"Validation result: {validation['success']}")
    return validation
def _create_in_memory_data_context_for_mysql( def _create_in_memory_data_context_for_mysql(
mysql_credentials: dict, mysql_credentials: dict,
@ -111,6 +156,58 @@ def _create_in_memory_data_context_for_mysql(
return data_context return data_context
def _create_in_memory_data_context_for_trino(
    trino_credentials: dict,
    great_expectations_s3_bucket: str,
) -> AbstractDataContext:
    """
    Create a DataContext without a YAML config file and specify a Trino
    datasource.

    The datasource is registered under the name "<host>:<port>" and is backed
    by a SqlAlchemyExecutionEngine pointed at
    trino://<user>:<password>@<host>:<port>/<catalog>/<schema>.

    :param trino_credentials: the creds to the Trino cluster where the query
     will be executed. The keys "host", "port", "user", "password", "catalog"
     and "schema" are read.
    :param great_expectations_s3_bucket: the name of the bucket where Great
     Expectations files will be stored.
    :return: the data context.
    :raises KeyError: if any of the credential keys listed above is missing.
    """
    datasource_name = f"{trino_credentials['host']}:{trino_credentials['port']}"

    # Both user and password are URL-quoted: either may contain characters
    # such as "@", ":" or "/" that would otherwise be read as URL delimiters.
    connection_string = "trino://%s:%s@%s:%s/%s/%s" % (
        urlquote(trino_credentials["user"]),
        urlquote(trino_credentials["password"]),
        trino_credentials["host"],
        trino_credentials["port"],
        trino_credentials["catalog"],
        trino_credentials["schema"],
    )

    trino_datasource = DatasourceConfig(
        class_name="Datasource",
        execution_engine={
            "class_name": "SqlAlchemyExecutionEngine",
            "connection_string": connection_string,
        },
        data_connectors={
            "default_runtime_data_connector_name": {
                "class_name": "RuntimeDataConnector",
                "batch_identifiers": ["default_identifier_name"],
            },
            "default_inferred_data_connector_name": {
                "class_name": "InferredAssetSqlDataConnector",
                "name": "whole_table",
            },
        },
    )

    data_context = BaseDataContext(
        project_config=DataContextConfig(
            datasources={datasource_name: trino_datasource},
            store_backend_defaults=S3StoreBackendDefaults(
                default_bucket_name=great_expectations_s3_bucket
            ),
        )
    )

    return data_context
def _create_expectation_suite( def _create_expectation_suite(
data_context: AbstractDataContext, data_context: AbstractDataContext,

View file

@ -1,7 +1,7 @@
from great_expectations.core.expectation_configuration import ExpectationConfiguration from great_expectations.core.expectation_configuration import ExpectationConfiguration
from lolafect.lolaconfig import build_lolaconfig from lolafect.lolaconfig import build_lolaconfig
from lolafect.data_testing import run_data_test_on_mysql from lolafect.data_testing import run_data_test_on_mysql, run_data_test_on_trino
from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel
# __ __ _____ _ _ _____ _ _ _____ _ # __ __ _____ _ _ _____ _ _ _____ _
@ -18,100 +18,139 @@ from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel
# are working properly. # are working properly.
TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite")
#1 AS a_one,
# 'lol' AS a_string,
# NULL AS a_null
TEST_QUERY = """
SELECT *
from app_lm_mysql_pl.comprea.market
where id = 1
"""
TEST_EXPECTATIONS_THAT_FIT_DATA = [
ExpectationConfiguration(
expectation_type="expect_column_values_to_be_between",
kwargs={"column": "a_one", "min_value": 1, "max_value": 1},
),
ExpectationConfiguration(
expectation_type="expect_column_values_to_match_like_pattern",
kwargs={"column": "a_string", "like_pattern": "%lol%"},
),
ExpectationConfiguration(
expectation_type="expect_column_values_to_be_null",
kwargs={"column": "a_null"},
),
]
def test_validation_on_mysql_succeeds(): TEST_EXPECTATIONS_THAT_DONT_FIT_DATA = [
ExpectationConfiguration(
expectation_type="expect_column_values_to_be_between",
kwargs={"column": "a_one", "min_value": 2, "max_value": 2},
),
ExpectationConfiguration(
expectation_type="expect_column_values_to_match_like_pattern",
kwargs={"column": "a_string", "like_pattern": "%xD%"},
),
ExpectationConfiguration(
expectation_type="expect_column_values_to_not_be_null",
kwargs={"column": "a_null"},
),
]
test_query = """ #
SELECT 1 AS a_one, # def test_validation_on_mysql_succeeds():
"lol" AS a_string, # ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run(
NULL AS a_null # s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME,
""" # ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
test_expectations = [ # remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"],
ExpectationConfiguration( # remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"],
expectation_type="expect_column_values_to_be_between", # )
kwargs={"column": "a_one", "min_value": 1, "max_value": 1}, #
), # validation_result = run_data_test_on_mysql.run(
ExpectationConfiguration( # name="lolafect-testing-test_validation_on_mysql_succeeds",
expectation_type="expect_column_values_to_match_like_pattern", # mysql_credentials={
kwargs={"column": "a_string", "like_pattern": "%lol%"}, # "host": ssh_tunnel.local_bind_address[0],
), # "port": ssh_tunnel.local_bind_address[1],
ExpectationConfiguration( # "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"],
expectation_type="expect_column_values_to_be_null", # "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"],
kwargs={"column": "a_null"}, # "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"],
), # },
] # query=TEST_QUERY,
# expectation_configurations=TEST_EXPECTATIONS_THAT_FIT_DATA,
# )
#
# closed_tunnel = close_ssh_tunnel.run(ssh_tunnel)
#
# data_test_passed = validation_result["success"] == True
#
# assert data_test_passed
#
#
# def test_validation_on_mysql_fails():
# ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run(
# s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME,
# ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
# remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"],
# remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"],
# )
#
# validation_result = run_data_test_on_mysql.run(
# name="lolafect-testing-test_validation_on_mysql_fails",
# mysql_credentials={
# "host": ssh_tunnel.local_bind_address[0],
# "port": ssh_tunnel.local_bind_address[1],
# "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"],
# "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"],
# "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"],
# },
# query=TEST_QUERY,
# expectation_configurations=TEST_EXPECTATIONS_THAT_DONT_FIT_DATA,
# )
#
# closed_tunnel = close_ssh_tunnel.run(ssh_tunnel)
#
# data_test_failed = validation_result["success"] == False
#
# assert data_test_failed
#
ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run( def test_validation_on_trino_succeeds():
s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, validation_result = run_data_test_on_trino.run(
ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, name="lolafect-testing-test_validation_on_mysql_fails",
remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], trino_credentials={
remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], "host": TEST_LOLACONFIG.TRINO_CREDENTIALS["host"],
) "port": TEST_LOLACONFIG.TRINO_CREDENTIALS["port"],
"user": TEST_LOLACONFIG.TRINO_CREDENTIALS["user"],
validation_result = run_data_test_on_mysql.run( "password": TEST_LOLACONFIG.TRINO_CREDENTIALS["password"],
name="lolafect-testing-test_validation_on_mysql_succeeds", "catalog": "data_dw",
mysql_credentials={ "schema": "sandbox"
"host": ssh_tunnel.local_bind_address[0],
"port": ssh_tunnel.local_bind_address[1],
"user": TEST_LOLACONFIG.DW_CREDENTIALS["user"],
"password": TEST_LOLACONFIG.DW_CREDENTIALS["password"],
"db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"],
}, },
query=test_query, query=TEST_QUERY,
expectation_configurations=test_expectations, expectation_configurations=TEST_EXPECTATIONS_THAT_FIT_DATA,
) )
print("###############\n" * 20)
closed_tunnel = close_ssh_tunnel.run(ssh_tunnel) print(validation_result)
data_test_passed = validation_result["success"] == True data_test_passed = validation_result["success"] == True
assert data_test_passed assert data_test_passed
def test_validation_on_mysql_fails(): def test_validation_on_trino_fails():
test_query = """ validation_result = run_data_test_on_trino.run(
SELECT 1 AS a_one,
"lol" AS a_string,
NULL AS a_null
"""
test_expectations = [
ExpectationConfiguration(
expectation_type="expect_column_values_to_be_between",
kwargs={"column": "a_one", "min_value": 2, "max_value": 2},
),
ExpectationConfiguration(
expectation_type="expect_column_values_to_match_like_pattern",
kwargs={"column": "a_string", "like_pattern": "%xD%"},
),
ExpectationConfiguration(
expectation_type="expect_column_values_to_not_be_null",
kwargs={"column": "a_null"},
),
]
ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run(
s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME,
ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"],
remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"],
)
validation_result = run_data_test_on_mysql.run(
name="lolafect-testing-test_validation_on_mysql_fails", name="lolafect-testing-test_validation_on_mysql_fails",
mysql_credentials={ trino_credentials={
"host": ssh_tunnel.local_bind_address[0], "host": TEST_LOLACONFIG.TRINO_CREDENTIALS["host"],
"port": ssh_tunnel.local_bind_address[1], "port": TEST_LOLACONFIG.TRINO_CREDENTIALS["port"],
"user": TEST_LOLACONFIG.DW_CREDENTIALS["user"], "user": TEST_LOLACONFIG.TRINO_CREDENTIALS["user"],
"password": TEST_LOLACONFIG.DW_CREDENTIALS["password"], "password": TEST_LOLACONFIG.TRINO_CREDENTIALS["password"],
"db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"], "catalog": "data_dw",
"schema": "sandbox"
}, },
query=test_query, query=TEST_QUERY,
expectation_configurations=test_expectations, expectation_configurations=TEST_EXPECTATIONS_THAT_DONT_FIT_DATA,
) )
closed_tunnel = close_ssh_tunnel.run(ssh_tunnel)
data_test_failed = validation_result["success"] == False data_test_failed = validation_result["success"] == False
assert data_test_failed assert data_test_failed