diff --git a/README.md b/README.md index 134b28d..d9cbbf1 100644 --- a/README.md +++ b/README.md @@ -143,28 +143,6 @@ with Flow(...) as flow: print("The data is bad!!!") ``` -**Run a Great Expectations validation on a Trino query** - -```python -from lolafect.data_testing import run_data_test_on_trino - -with Flow(...) as flow: - - my_query = """SELECT something FROM somewhere""" - my_expectations = {...} # A bunch of things you want to validate on the result of the query - - validation_results = run_data_test_on_trino( - name="my-cool-validation", - trino_credentials={...}, - query=my_query, - expectations=my_expectations - ) - - if not validation_results["success"]: - print("The data is bad!!!") -``` - - ### Slack **Send a warning message to slack if your tasks fails** diff --git a/lolafect/data_testing.py b/lolafect/data_testing.py index 9e3e777..4216d4b 100644 --- a/lolafect/data_testing.py +++ b/lolafect/data_testing.py @@ -60,51 +60,6 @@ def run_data_test_on_mysql( return results -@task() -def run_data_test_on_trino( - name: str, - trino_credentials: dict, - query: str, - expectation_configurations: List[ExpectationConfiguration], - great_expectations_s3_bucket: str = DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET, -) -> dict: - """ - Validate a query and an expectation suite against a given Trino server. - - :param name: a unique name for the data test. - :param trino_credentials: credentials for the Trino cluster. - :param query: the query to test against. - :param expectation_configurations: the expectations on the dataset. - :param great_expectations_s3_bucket: the bucket where Great Expectations - files live. - :return: the result of the data test. - """ - logger = prefect.context.get("logger") - - logger.info("Creating data context.") - data_context = _create_in_memory_data_context_for_trino( - trino_credentials, great_expectations_s3_bucket - ) - logger.info("Data context created.") - logger.info("Creating expectation suite.") - data_context = _create_expectation_suite( - data_context, name, expectation_configurations - ) - logger.info("Expectation suite created.") - logger.info("Creating checkpoint.") - data_context = _create_checkpoint( - data_context, - f"{trino_credentials['host']}:{trino_credentials['port']}", - query, - name, - ) - logger.info("Checkpoint created.") - logger.info("Running checkpoint.") - results = data_context.run_checkpoint(f"{name}_checkpoint") - logger.info("Checkpoint finished.") - logger.info(f"Validation result: {results['success']}") - - return results def _create_in_memory_data_context_for_mysql( mysql_credentials: dict, @@ -156,58 +111,6 @@ def _create_in_memory_data_context_for_mysql( return data_context -def _create_in_memory_data_context_for_trino( - trino_credentials: dict, - great_expectations_s3_bucket: str, -) -> AbstractDataContext: - """ - Create a DataContext without a YAML config file and specify a Trino - datasource. - - :param trino_credentials: the creds to the mysql where the query will be - executed. - :param great_expectations_s3_bucket: the name of the bucket where Great - Exepctations files while be stored. - :return: the data context. - """ - - data_context = BaseDataContext( - project_config=DataContextConfig( - datasources={ - f"{trino_credentials['host']}:{trino_credentials['port']}": DatasourceConfig( - class_name="Datasource", - execution_engine={ - "class_name": "SqlAlchemyExecutionEngine", - "connection_string": f"trino://%s:%s@%s:%s/%s/%s" - % ( - trino_credentials["user"], - urlquote(trino_credentials["password"]), - trino_credentials["host"], - trino_credentials["port"], - trino_credentials["catalog"], - trino_credentials["schema"], - ), - }, - data_connectors={ - "default_runtime_data_connector_name": { - "class_name": "RuntimeDataConnector", - "batch_identifiers": ["default_identifier_name"], - }, - "default_inferred_data_connector_name": { - "class_name": "InferredAssetSqlDataConnector", - "name": "whole_table", - }, - }, - ) - }, - store_backend_defaults=S3StoreBackendDefaults( - default_bucket_name=great_expectations_s3_bucket - ), - ) - ) - - return data_context - def _create_expectation_suite( data_context: AbstractDataContext, diff --git a/tests/test_integration/test_data_testing.py b/tests/test_integration/test_data_testing.py index a6c564a..8b456f2 100644 --- a/tests/test_integration/test_data_testing.py +++ b/tests/test_integration/test_data_testing.py @@ -1,7 +1,7 @@ from great_expectations.core.expectation_configuration import ExpectationConfiguration from lolafect.lolaconfig import build_lolaconfig -from lolafect.data_testing import run_data_test_on_mysql, run_data_test_on_trino +from lolafect.data_testing import run_data_test_on_mysql from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel # __ __ _____ _ _ _____ _ _ _____ _ @@ -18,139 +18,100 @@ from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel # are working properly. TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") -#1 AS a_one, -# 'lol' AS a_string, -# NULL AS a_null -TEST_QUERY = """ -SELECT * -from app_lm_mysql_pl.comprea.market -where id = 1 -""" -TEST_EXPECTATIONS_THAT_FIT_DATA = [ - ExpectationConfiguration( - expectation_type="expect_column_values_to_be_between", - kwargs={"column": "a_one", "min_value": 1, "max_value": 1}, - ), - ExpectationConfiguration( - expectation_type="expect_column_values_to_match_like_pattern", - kwargs={"column": "a_string", "like_pattern": "%lol%"}, - ), - ExpectationConfiguration( - expectation_type="expect_column_values_to_be_null", - kwargs={"column": "a_null"}, - ), -] -TEST_EXPECTATIONS_THAT_DONT_FIT_DATA = [ - ExpectationConfiguration( - expectation_type="expect_column_values_to_be_between", - kwargs={"column": "a_one", "min_value": 2, "max_value": 2}, - ), - ExpectationConfiguration( - expectation_type="expect_column_values_to_match_like_pattern", - kwargs={"column": "a_string", "like_pattern": "%xD%"}, - ), - ExpectationConfiguration( - expectation_type="expect_column_values_to_not_be_null", - kwargs={"column": "a_null"}, - ), -] +def test_validation_on_mysql_succeeds(): -# -# def test_validation_on_mysql_succeeds(): -# ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run( -# s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, -# ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, -# remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], -# remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], -# ) -# -# validation_result = run_data_test_on_mysql.run( -# name="lolafect-testing-test_validation_on_mysql_succeeds", -# mysql_credentials={ -# "host": ssh_tunnel.local_bind_address[0], -# "port": ssh_tunnel.local_bind_address[1], -# "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"], -# "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"], -# "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"], -# }, -# query=TEST_QUERY, -# expectation_configurations=TEST_EXPECTATIONS_THAT_FIT_DATA, -# ) -# -# closed_tunnel = close_ssh_tunnel.run(ssh_tunnel) -# -# data_test_passed = validation_result["success"] == True -# -# assert data_test_passed -# -# -# def test_validation_on_mysql_fails(): -# ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run( -# s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, -# ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, -# remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], -# remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], -# ) -# -# validation_result = run_data_test_on_mysql.run( -# name="lolafect-testing-test_validation_on_mysql_fails", -# mysql_credentials={ -# "host": ssh_tunnel.local_bind_address[0], -# "port": ssh_tunnel.local_bind_address[1], -# "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"], -# "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"], -# "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"], -# }, -# query=TEST_QUERY, -# expectation_configurations=TEST_EXPECTATIONS_THAT_DONT_FIT_DATA, -# ) -# -# closed_tunnel = close_ssh_tunnel.run(ssh_tunnel) -# -# data_test_failed = validation_result["success"] == False -# -# assert data_test_failed -# + test_query = """ + SELECT 1 AS a_one, + "lol" AS a_string, + NULL AS a_null + """ + test_expectations = [ + ExpectationConfiguration( + expectation_type="expect_column_values_to_be_between", + kwargs={"column": "a_one", "min_value": 1, "max_value": 1}, + ), + ExpectationConfiguration( + expectation_type="expect_column_values_to_match_like_pattern", + kwargs={"column": "a_string", "like_pattern": "%lol%"}, + ), + ExpectationConfiguration( + expectation_type="expect_column_values_to_be_null", + kwargs={"column": "a_null"}, + ), + ] -def test_validation_on_trino_succeeds(): - validation_result = run_data_test_on_trino.run( - name="lolafect-testing-test_validation_on_mysql_fails", - trino_credentials={ - "host": TEST_LOLACONFIG.TRINO_CREDENTIALS["host"], - "port": TEST_LOLACONFIG.TRINO_CREDENTIALS["port"], - "user": TEST_LOLACONFIG.TRINO_CREDENTIALS["user"], - "password": TEST_LOLACONFIG.TRINO_CREDENTIALS["password"], - "catalog": "data_dw", - "schema": "sandbox" - }, - query=TEST_QUERY, - expectation_configurations=TEST_EXPECTATIONS_THAT_FIT_DATA, + ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run( + s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, + ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, + remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], + remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], ) - print("###############\n" * 20) - print(validation_result) + + validation_result = run_data_test_on_mysql.run( + name="lolafect-testing-test_validation_on_mysql_succeeds", + mysql_credentials={ + "host": ssh_tunnel.local_bind_address[0], + "port": ssh_tunnel.local_bind_address[1], + "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"], + "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"], + "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"], + }, + query=test_query, + expectation_configurations=test_expectations, + ) + + closed_tunnel = close_ssh_tunnel.run(ssh_tunnel) data_test_passed = validation_result["success"] == True assert data_test_passed -def test_validation_on_trino_fails(): - validation_result = run_data_test_on_trino.run( - name="lolafect-testing-test_validation_on_mysql_fails", - trino_credentials={ - "host": TEST_LOLACONFIG.TRINO_CREDENTIALS["host"], - "port": TEST_LOLACONFIG.TRINO_CREDENTIALS["port"], - "user": TEST_LOLACONFIG.TRINO_CREDENTIALS["user"], - "password": TEST_LOLACONFIG.TRINO_CREDENTIALS["password"], - "catalog": "data_dw", - "schema": "sandbox" - }, - query=TEST_QUERY, - expectation_configurations=TEST_EXPECTATIONS_THAT_DONT_FIT_DATA, +def test_validation_on_mysql_fails(): + test_query = """ + SELECT 1 AS a_one, + "lol" AS a_string, + NULL AS a_null + """ + test_expectations = [ + ExpectationConfiguration( + expectation_type="expect_column_values_to_be_between", + kwargs={"column": "a_one", "min_value": 2, "max_value": 2}, + ), + ExpectationConfiguration( + expectation_type="expect_column_values_to_match_like_pattern", + kwargs={"column": "a_string", "like_pattern": "%xD%"}, + ), + ExpectationConfiguration( + expectation_type="expect_column_values_to_not_be_null", + kwargs={"column": "a_null"}, + ), + ] + + ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run( + s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, + ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, + remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], + remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], ) + validation_result = run_data_test_on_mysql.run( + name="lolafect-testing-test_validation_on_mysql_fails", + mysql_credentials={ + "host": ssh_tunnel.local_bind_address[0], + "port": ssh_tunnel.local_bind_address[1], + "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"], + "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"], + "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"], + }, + query=test_query, + expectation_configurations=test_expectations, + ) + + closed_tunnel = close_ssh_tunnel.run(ssh_tunnel) + data_test_failed = validation_result["success"] == False assert data_test_failed