Compare commits

..

13 commits

Author SHA1 Message Date
pablolola
b3d5d26f15
Merge pull request #16 from lolamarket/release/0.5.0
Release/0.5.0
2023-04-26 17:43:35 +02:00
Pablo Martin
054de6ab39 Update changelog 2023-04-26 09:43:01 +02:00
Pablo Martin
897145ac19 Bumped version 2023-04-26 09:42:30 +02:00
pablolola
ce6c879d99
Merge pull request #15 from lolamarket/feature/sql_transactions
Feature/sql transactions
2023-04-26 09:36:39 +02:00
Pablo Martin
52ad640766 Add example in readme. 2023-04-24 14:22:45 +02:00
Pablo Martin
58261b1a69 Update changelog 2023-04-24 13:56:30 +02:00
Pablo Martin
4ba1b4c007 Improve docstrings 2023-04-24 13:55:29 +02:00
Pablo Martin
9201c236af Typo. 2023-04-24 13:48:33 +02:00
Pablo Martin
467d70fcdc Docstrings and small name improvements 2023-04-24 13:44:11 +02:00
Pablo Martin
2090ec7c91 Refactor setup and teardown of connection and test table into a fixture. 2023-04-24 13:41:15 +02:00
Pablo Martin
47a3e8b1e7 Test working 2023-04-24 11:38:51 +02:00
Pablo Martin
5b53b71fd8 Testing plan 2023-04-21 12:37:37 +02:00
Pablo Martin
8f1f3d75e1 Implemented the tasks. 2023-04-21 12:22:32 +02:00
7 changed files with 279 additions and 241 deletions

View file

@ -2,6 +2,13 @@
All notable changes to this project will be documented in this file. All notable changes to this project will be documented in this file.
## [0.5.0] - 2023-04-26
### Added
- Added tasks `begin_sql_transaction` and `end_sql_transaction` to the `utils` module. These enable the management of SQL
transactions in flows. They also allow for dry running SQL statements.
## [0.4.0] - 2023-02-08 ## [0.4.0] - 2023-02-08
### Added ### Added

View file

@ -120,6 +120,31 @@ with Flow(...) as flow:
close_ssh_tunnel.run(tunnel=tunnel, upstream_tasks=[mysql_closed]) close_ssh_tunnel.run(tunnel=tunnel, upstream_tasks=[mysql_closed])
``` ```
**Use SQL transactions and dry running**
```python
from lolafect.connections import connect_to_mysql, close_mysql_connection
from lolafect.utils import begin_sql_transaction, end_sql_transaction
with Flow(...) as flow:
connection = connect_to_mysql(
mysql_credentials={...}, # You probably want to get this from TEST_LOLACONFIG.DW_CREDENTIALS
)
transaction_started = begin_sql_transaction(connection)
task_result = some_task_that_needs_mysql(
connection=connection,
upstream_tasks=[transaction_started]
)
transaction_finished = end_sql_transaction(
connection,
dry_run=False, # True means rollback, False means commit changes
upstream_tasks=[task_result]
)
close_mysql_connection(connection=connection, upstream_tasks=[transaction_finished])
```
### Use Great Expectations ### Use Great Expectations
**Run a Great Expectations validation on a MySQL query** **Run a Great Expectations validation on a MySQL query**
@ -143,28 +168,6 @@ with Flow(...) as flow:
print("The data is bad!!!") print("The data is bad!!!")
``` ```
**Run a Great Expectations validation on a Trino query**
```python
from lolafect.data_testing import run_data_test_on_trino
with Flow(...) as flow:
my_query = """SELECT something FROM somewhere"""
my_expectations = {...} # A bunch of things you want to validate on the result of the query
validation_results = run_data_test_on_trino(
name="my-cool-validation",
trino_credentials={...},
query=my_query,
expectation_configurations=my_expectations
)
if not validation_results["success"]:
print("The data is bad!!!")
```
### Slack ### Slack
**Send a warning message to slack if your tasks fails** **Send a warning message to slack if your tasks fails**

View file

@ -1 +1 @@
__version__ = "0.4.0" __version__ = "0.5.0"

View file

@ -60,51 +60,6 @@ def run_data_test_on_mysql(
return results return results
@task()
def run_data_test_on_trino(
    name: str,
    trino_credentials: dict,
    query: str,
    expectation_configurations: List[ExpectationConfiguration],
    great_expectations_s3_bucket: str = DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET,
) -> dict:
    """
    Run a data test for a query against a given Trino server.

    The task builds a data context with
    `_create_in_memory_data_context_for_trino`, passes it through
    `_create_expectation_suite` and `_create_checkpoint`, and finally runs the
    checkpoint named `<name>_checkpoint`.

    :param name: a unique name for the data test.
    :param trino_credentials: credentials for the Trino cluster.
    :param query: the query to test against.
    :param expectation_configurations: the expectations on the dataset.
    :param great_expectations_s3_bucket: the bucket where Great Expectations
    files live.
    :return: the result of the data test.
    """
    log = prefect.context.get("logger")

    log.info("Creating data context.")
    context = _create_in_memory_data_context_for_trino(
        trino_credentials, great_expectations_s3_bucket
    )
    log.info("Data context created.")

    log.info("Creating expectation suite.")
    context = _create_expectation_suite(context, name, expectation_configurations)
    log.info("Expectation suite created.")

    log.info("Creating checkpoint.")
    # "<host>:<port>" is passed to the checkpoint helper as the datasource name.
    datasource_name = f"{trino_credentials['host']}:{trino_credentials['port']}"
    context = _create_checkpoint(context, datasource_name, query, name)
    log.info("Checkpoint created.")

    log.info("Running checkpoint.")
    checkpoint_name = f"{name}_checkpoint"
    validation = context.run_checkpoint(checkpoint_name)
    log.info("Checkpoint finished.")

    log.info(f"Validation result: {validation['success']}")
    return validation
def _create_in_memory_data_context_for_mysql( def _create_in_memory_data_context_for_mysql(
mysql_credentials: dict, mysql_credentials: dict,
@ -156,58 +111,6 @@ def _create_in_memory_data_context_for_mysql(
return data_context return data_context
def _create_in_memory_data_context_for_trino(
    trino_credentials: dict,
    great_expectations_s3_bucket: str,
) -> AbstractDataContext:
    """
    Create a DataContext without a YAML config file and specify a Trino
    datasource.

    The datasource is registered under the name `<host>:<port>` and uses a
    SqlAlchemyExecutionEngine whose connection string has the form
    `trino://<user>:<password>@<host>:<port>/<catalog>/<schema>`.

    :param trino_credentials: the creds to the Trino server where the query
    will be executed. The keys read are `user`, `password`, `host`, `port`,
    `catalog` and `schema`.
    :param great_expectations_s3_bucket: the name of the bucket where Great
    Expectations files will be stored.
    :return: the data context.
    """
    datasource_name = f"{trino_credentials['host']}:{trino_credentials['port']}"

    # Both the user and the password are escaped so that characters such as
    # "@", ":" or "/" cannot break the URL structure.
    # NOTE(review): `urlquote` is imported outside this block; it is assumed to
    # be a URL-escaping helper, as its existing use on the password suggests.
    connection_string = "trino://%s:%s@%s:%s/%s/%s" % (
        urlquote(trino_credentials["user"]),
        urlquote(trino_credentials["password"]),
        trino_credentials["host"],
        trino_credentials["port"],
        trino_credentials["catalog"],
        trino_credentials["schema"],
    )

    trino_datasource = DatasourceConfig(
        class_name="Datasource",
        execution_engine={
            "class_name": "SqlAlchemyExecutionEngine",
            "connection_string": connection_string,
        },
        data_connectors={
            "default_runtime_data_connector_name": {
                "class_name": "RuntimeDataConnector",
                "batch_identifiers": ["default_identifier_name"],
            },
            "default_inferred_data_connector_name": {
                "class_name": "InferredAssetSqlDataConnector",
                "name": "whole_table",
            },
        },
    )

    data_context = BaseDataContext(
        project_config=DataContextConfig(
            datasources={datasource_name: trino_datasource},
            store_backend_defaults=S3StoreBackendDefaults(
                default_bucket_name=great_expectations_s3_bucket
            ),
        )
    )

    return data_context
def _create_expectation_suite( def _create_expectation_suite(
data_context: AbstractDataContext, data_context: AbstractDataContext,

View file

@ -1,5 +1,8 @@
import json import json
from typing import Any
import prefect
from prefect import task
class S3FileReader: class S3FileReader:
""" """
@ -22,3 +25,41 @@ class S3FileReader:
.read() .read()
.decode("utf-8") .decode("utf-8")
) )
@task()
def begin_sql_transaction(connection: Any) -> None:
    """
    Open a SQL transaction on the given connection.

    The task does not depend on any particular SQL engine: it only requires
    the connection object to expose a `begin()` method.

    :param connection: the connection to some database.
    :return: None
    """
    start_message = f"Starting SQL transaction with connection: {connection}."
    prefect.context.get("logger").info(start_message)

    connection.begin()
@task()
def end_sql_transaction(connection: Any, dry_run: bool = False) -> None:
    """
    Finish a SQL transaction, either by rolling it back or by committing it.

    The task is agnostic to the SQL engine being used. As long as the
    connection object implements a `commit` and a `rollback` method, this task
    will work.

    :param connection: the connection to some database.
    :param dry_run: a flag indicating if persistence is desired. If dry_run
    is True, changes will be rolled back. Otherwise, they will be committed.
    :return: None
    """
    logger = prefect.context.get("logger")
    logger.info(f"Using connection: {connection}.")

    # In both branches the intent is logged *before* acting, so that the log
    # still shows what was being attempted if the database call raises.
    if dry_run:
        logger.info("Dry-run mode activated. Rolling back the transaction.")
        connection.rollback()
    else:
        logger.info("Committing the transaction.")
        connection.commit()

View file

@ -1,7 +1,7 @@
from great_expectations.core.expectation_configuration import ExpectationConfiguration from great_expectations.core.expectation_configuration import ExpectationConfiguration
from lolafect.lolaconfig import build_lolaconfig from lolafect.lolaconfig import build_lolaconfig
from lolafect.data_testing import run_data_test_on_mysql, run_data_test_on_trino from lolafect.data_testing import run_data_test_on_mysql
from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel
# __ __ _____ _ _ _____ _ _ _____ _ # __ __ _____ _ _ _____ _ _ _____ _
@ -18,139 +18,100 @@ from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel
# are working properly. # are working properly.
TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite")
#1 AS a_one,
# 'lol' AS a_string,
# NULL AS a_null
TEST_QUERY = """
SELECT *
from app_lm_mysql_pl.comprea.market
where id = 1
"""
TEST_EXPECTATIONS_THAT_FIT_DATA = [
ExpectationConfiguration(
expectation_type="expect_column_values_to_be_between",
kwargs={"column": "a_one", "min_value": 1, "max_value": 1},
),
ExpectationConfiguration(
expectation_type="expect_column_values_to_match_like_pattern",
kwargs={"column": "a_string", "like_pattern": "%lol%"},
),
ExpectationConfiguration(
expectation_type="expect_column_values_to_be_null",
kwargs={"column": "a_null"},
),
]
TEST_EXPECTATIONS_THAT_DONT_FIT_DATA = [ def test_validation_on_mysql_succeeds():
ExpectationConfiguration(
expectation_type="expect_column_values_to_be_between",
kwargs={"column": "a_one", "min_value": 2, "max_value": 2},
),
ExpectationConfiguration(
expectation_type="expect_column_values_to_match_like_pattern",
kwargs={"column": "a_string", "like_pattern": "%xD%"},
),
ExpectationConfiguration(
expectation_type="expect_column_values_to_not_be_null",
kwargs={"column": "a_null"},
),
]
# test_query = """
# def test_validation_on_mysql_succeeds(): SELECT 1 AS a_one,
# ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run( "lol" AS a_string,
# s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, NULL AS a_null
# ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, """
# remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], test_expectations = [
# remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], ExpectationConfiguration(
# ) expectation_type="expect_column_values_to_be_between",
# kwargs={"column": "a_one", "min_value": 1, "max_value": 1},
# validation_result = run_data_test_on_mysql.run( ),
# name="lolafect-testing-test_validation_on_mysql_succeeds", ExpectationConfiguration(
# mysql_credentials={ expectation_type="expect_column_values_to_match_like_pattern",
# "host": ssh_tunnel.local_bind_address[0], kwargs={"column": "a_string", "like_pattern": "%lol%"},
# "port": ssh_tunnel.local_bind_address[1], ),
# "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"], ExpectationConfiguration(
# "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"], expectation_type="expect_column_values_to_be_null",
# "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"], kwargs={"column": "a_null"},
# }, ),
# query=TEST_QUERY, ]
# expectation_configurations=TEST_EXPECTATIONS_THAT_FIT_DATA,
# )
#
# closed_tunnel = close_ssh_tunnel.run(ssh_tunnel)
#
# data_test_passed = validation_result["success"] == True
#
# assert data_test_passed
#
#
# def test_validation_on_mysql_fails():
# ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run(
# s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME,
# ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
# remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"],
# remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"],
# )
#
# validation_result = run_data_test_on_mysql.run(
# name="lolafect-testing-test_validation_on_mysql_fails",
# mysql_credentials={
# "host": ssh_tunnel.local_bind_address[0],
# "port": ssh_tunnel.local_bind_address[1],
# "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"],
# "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"],
# "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"],
# },
# query=TEST_QUERY,
# expectation_configurations=TEST_EXPECTATIONS_THAT_DONT_FIT_DATA,
# )
#
# closed_tunnel = close_ssh_tunnel.run(ssh_tunnel)
#
# data_test_failed = validation_result["success"] == False
#
# assert data_test_failed
#
def test_validation_on_trino_succeeds(): ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run(
validation_result = run_data_test_on_trino.run( s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME,
name="lolafect-testing-test_validation_on_mysql_fails", ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
trino_credentials={ remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"],
"host": TEST_LOLACONFIG.TRINO_CREDENTIALS["host"], remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"],
"port": TEST_LOLACONFIG.TRINO_CREDENTIALS["port"],
"user": TEST_LOLACONFIG.TRINO_CREDENTIALS["user"],
"password": TEST_LOLACONFIG.TRINO_CREDENTIALS["password"],
"catalog": "data_dw",
"schema": "sandbox"
},
query=TEST_QUERY,
expectation_configurations=TEST_EXPECTATIONS_THAT_FIT_DATA,
) )
print("###############\n" * 20)
print(validation_result) validation_result = run_data_test_on_mysql.run(
name="lolafect-testing-test_validation_on_mysql_succeeds",
mysql_credentials={
"host": ssh_tunnel.local_bind_address[0],
"port": ssh_tunnel.local_bind_address[1],
"user": TEST_LOLACONFIG.DW_CREDENTIALS["user"],
"password": TEST_LOLACONFIG.DW_CREDENTIALS["password"],
"db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"],
},
query=test_query,
expectation_configurations=test_expectations,
)
closed_tunnel = close_ssh_tunnel.run(ssh_tunnel)
data_test_passed = validation_result["success"] == True data_test_passed = validation_result["success"] == True
assert data_test_passed assert data_test_passed
def test_validation_on_trino_fails(): def test_validation_on_mysql_fails():
validation_result = run_data_test_on_trino.run( test_query = """
name="lolafect-testing-test_validation_on_mysql_fails", SELECT 1 AS a_one,
trino_credentials={ "lol" AS a_string,
"host": TEST_LOLACONFIG.TRINO_CREDENTIALS["host"], NULL AS a_null
"port": TEST_LOLACONFIG.TRINO_CREDENTIALS["port"], """
"user": TEST_LOLACONFIG.TRINO_CREDENTIALS["user"], test_expectations = [
"password": TEST_LOLACONFIG.TRINO_CREDENTIALS["password"], ExpectationConfiguration(
"catalog": "data_dw", expectation_type="expect_column_values_to_be_between",
"schema": "sandbox" kwargs={"column": "a_one", "min_value": 2, "max_value": 2},
}, ),
query=TEST_QUERY, ExpectationConfiguration(
expectation_configurations=TEST_EXPECTATIONS_THAT_DONT_FIT_DATA, expectation_type="expect_column_values_to_match_like_pattern",
kwargs={"column": "a_string", "like_pattern": "%xD%"},
),
ExpectationConfiguration(
expectation_type="expect_column_values_to_not_be_null",
kwargs={"column": "a_null"},
),
]
ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run(
s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME,
ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"],
remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"],
) )
validation_result = run_data_test_on_mysql.run(
name="lolafect-testing-test_validation_on_mysql_fails",
mysql_credentials={
"host": ssh_tunnel.local_bind_address[0],
"port": ssh_tunnel.local_bind_address[1],
"user": TEST_LOLACONFIG.DW_CREDENTIALS["user"],
"password": TEST_LOLACONFIG.DW_CREDENTIALS["password"],
"db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"],
},
query=test_query,
expectation_configurations=test_expectations,
)
closed_tunnel = close_ssh_tunnel.run(ssh_tunnel)
data_test_failed = validation_result["success"] == False data_test_failed = validation_result["success"] == False
assert data_test_failed assert data_test_failed

View file

@ -0,0 +1,123 @@
import pytest
from lolafect.lolaconfig import build_lolaconfig
from lolafect.connections import (
open_ssh_tunnel_with_s3_pkey,
get_local_bind_address_from_ssh_tunnel,
close_ssh_tunnel,
connect_to_mysql,
close_mysql_connection,
)
from lolafect.utils import begin_sql_transaction, end_sql_transaction
# __ __ _____ _ _ _____ _ _ _____ _
# \ \ / /\ | __ \| \ | |_ _| \ | |/ ____| |
# \ \ /\ / / \ | |__) | \| | | | | \| | | __| |
# \ \/ \/ / /\ \ | _ /| . ` | | | | . ` | | |_ | |
# \ /\ / ____ \| | \ \| |\ |_| |_| |\ | |__| |_|
# \/ \/_/ \_\_| \_\_| \_|_____|_| \_|\_____(_)
# This testing suite requires:
# - The calling shell to have permission in AWS
# - The calling shell to be within the Mercadão network
# - Do not use these tests as part of CI/CD pipelines since they are not idempotent and
# rely on external resources. Instead, use them manually to check yourself that things
# are working properly.
TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite")
@pytest.fixture
def connection_with_test_table():
    """
    Connects to DW, creates a test table in the sandbox env, and yields the
    connection to the test.

    After the test, the table is dropped, the connection is closed and the
    SSH tunnel is closed. Every cleanup step sits in a `finally` clause, so a
    failure during setup or teardown does not leave an already opened tunnel
    or connection behind.
    """
    test_local_bind_host = "127.0.0.1"
    test_local_bind_port = 12345

    create_test_table = (
        "\n"
        "CREATE TABLE sandbox.lolafect_transaction_test_table\n"
        "(\n"
        "a_test_column INT\n"
        ")\n"
    )
    drop_test_table = (
        "\n"
        "DROP TABLE sandbox.lolafect_transaction_test_table\n"
    )

    tunnel = open_ssh_tunnel_with_s3_pkey.run(
        s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME,
        ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
        remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"],
        remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"],
        local_bind_host=test_local_bind_host,
        local_bind_port=test_local_bind_port,
    )
    try:
        connection = connect_to_mysql.run(
            mysql_credentials=TEST_LOLACONFIG.DW_CREDENTIALS,
            overriding_host_and_port=get_local_bind_address_from_ssh_tunnel.run(
                tunnel=tunnel
            ),
        )
        try:
            cursor = connection.cursor()
            cursor.execute(create_test_table)
            try:
                # Connection and table ready for tests
                yield connection  # Test happens now
            finally:
                # Test finished, time to remove the test table
                cursor.execute(drop_test_table)
        finally:
            # Runs even if creating or dropping the table failed
            close_mysql_connection.run(connection=connection)
    finally:
        # Runs even if the MySQL connection could not be opened
        close_ssh_tunnel.run(tunnel=tunnel)
def test_sql_transaction_persists_changes_properly(connection_with_test_table):
    """
    Check that a row inserted inside a transaction is visible after the
    transaction is ended with dry_run=False.
    """
    select_rows = (
        "\n"
        "SELECT a_test_column\n"
        "FROM sandbox.lolafect_transaction_test_table\n"
    )
    insert_row = (
        "\n"
        "INSERT INTO sandbox.lolafect_transaction_test_table (a_test_column)\n"
        "VALUES (1)\n"
    )
    db_cursor = connection_with_test_table.cursor()

    db_cursor.execute(select_rows)
    rows_before = db_cursor.fetchall()
    table_is_empty_at_first = not rows_before  # No rows fetched means empty

    begin_sql_transaction.run(connection=connection_with_test_table)
    db_cursor.execute(insert_row)
    end_sql_transaction.run(connection=connection_with_test_table, dry_run=False)

    db_cursor.execute(select_rows)
    rows_after = db_cursor.fetchall()
    table_has_a_record_after_commit = bool(rows_after)  # Any fetched row yields True

    assert table_is_empty_at_first
    assert table_has_a_record_after_commit
def test_sql_transaction_rollbacks_changes_properly(connection_with_test_table):
    """
    Check that a row inserted inside a transaction is gone after the
    transaction is ended with dry_run=True.
    """
    select_rows = (
        "\n"
        "SELECT a_test_column\n"
        "FROM sandbox.lolafect_transaction_test_table\n"
    )
    insert_row = (
        "\n"
        "INSERT INTO sandbox.lolafect_transaction_test_table (a_test_column)\n"
        "VALUES (1)\n"
    )
    db_cursor = connection_with_test_table.cursor()

    db_cursor.execute(select_rows)
    rows_before = db_cursor.fetchall()
    table_is_empty_at_first = not rows_before  # No rows fetched means empty

    begin_sql_transaction.run(connection=connection_with_test_table)
    db_cursor.execute(insert_row)
    end_sql_transaction.run(connection=connection_with_test_table, dry_run=True)

    db_cursor.execute(select_rows)
    rows_after = db_cursor.fetchall()
    # An empty result is falsy, so "not" turns it into True
    table_is_still_empty_after_rollback = not rows_after

    assert table_is_empty_at_first
    assert table_is_still_empty_after_rollback