From 106ab3da562f59e549cbe7e1e78bcf55cfac07a1 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 6 Mar 2023 12:21:45 +0100 Subject: [PATCH 01/19] Readme --- docs/gallery/data_testing/README.md | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 docs/gallery/data_testing/README.md diff --git a/docs/gallery/data_testing/README.md b/docs/gallery/data_testing/README.md new file mode 100644 index 0000000..2926488 --- /dev/null +++ b/docs/gallery/data_testing/README.md @@ -0,0 +1,4 @@ +# Data Testing Gallery + +In this folder, you can find a sample flow project that showcases how you can +do data testing with Lolafect. \ No newline at end of file From 03ca1d009fe8bafbd5fe9caf8d282cf2867b3118 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 6 Mar 2023 12:39:35 +0100 Subject: [PATCH 02/19] Script of gallery flow --- .../gallery/data_testing/data_testing_flow.py | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 docs/gallery/data_testing/data_testing_flow.py diff --git a/docs/gallery/data_testing/data_testing_flow.py b/docs/gallery/data_testing/data_testing_flow.py new file mode 100644 index 0000000..7e5dd7f --- /dev/null +++ b/docs/gallery/data_testing/data_testing_flow.py @@ -0,0 +1,30 @@ +### INTRO + +# This is an example flow to showcase data testing. + +# The flow is packed with comments to guide you through what's happening. +# The flow is also runnable. To run it: +# - Make a virtual environment with the requirements.txt that live in the same +# folder as this script. +# - Start a shell, activate the venv, login to AWS and turn on your Mercadão +# VPN. +# - In the shell, run the command: TODO +# +# Note: this flow is designed to run in your laptop. It won't work in the +# prefect server. Don't bother uploading it. + +# The flow connects to DW and makes a silly check on a silly query. You can use +# it as a reference on how to set up a data test in your serious flows. 
+ + +### IMPORTS + +# TODO + +### TASK PREP + +# TODO + +### FLOW + +# TODO \ No newline at end of file From 36fffd3790f517b0edbb654e5ce12ed029c582a8 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 6 Mar 2023 12:41:38 +0100 Subject: [PATCH 03/19] Reference in root readme --- README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/README.md b/README.md index f5b5876..d9cbbf1 100644 --- a/README.md +++ b/README.md @@ -164,6 +164,13 @@ with Flow(...) as flow: ) ``` +## Gallery + +This repo also contains a gallery of example flows that you can use to better +understand `lolafect` or as templates to kickstart your own flows. You can +find these in `docs/gallery`. + + ## How to test There are two test suites: unit tests and integration tests. Integration tests are prepared to plug to some of our From 19b1e447ff81384aae34a94a5417b6d91eaeaab0 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 6 Mar 2023 17:08:54 +0100 Subject: [PATCH 04/19] Quite a bit of the flow --- docs/gallery/data_testing/README.md | 5 +- .../gallery/data_testing/data_testing_flow.py | 126 +++++++++++++++++- 2 files changed, 126 insertions(+), 5 deletions(-) diff --git a/docs/gallery/data_testing/README.md b/docs/gallery/data_testing/README.md index 2926488..a1de4b0 100644 --- a/docs/gallery/data_testing/README.md +++ b/docs/gallery/data_testing/README.md @@ -1,4 +1,7 @@ # Data Testing Gallery In this folder, you can find a sample flow project that showcases how you can -do data testing with Lolafect. \ No newline at end of file +do data testing with Lolafect. + +You can also take a look at our GE best practices and guidelines +[here](https://pdofonte.atlassian.net/wiki/spaces/DATA/pages/2484797445/Usage+Guidelines+and+Best+Practices). 
\ No newline at end of file diff --git a/docs/gallery/data_testing/data_testing_flow.py b/docs/gallery/data_testing/data_testing_flow.py index 7e5dd7f..38b771e 100644 --- a/docs/gallery/data_testing/data_testing_flow.py +++ b/docs/gallery/data_testing/data_testing_flow.py @@ -19,12 +19,130 @@ ### IMPORTS -# TODO +from prefect import Flow, task +from prefect.run_configs import KubernetesRun -### TASK PREP +# ↑↑↑ Standard prefect stuff for the flow. + +from great_expectations.core.expectation_configuration import ExpectationConfiguration + +# ↑↑↑ ExpectationConfiguration is the class that allows us to define a single +# expectation. We use it once for every expectation we define. + +from lolafect.lolaconfig import build_lolaconfig + +# ↑↑↑ Usual lolaconfig import to get all the env data. +from lolafect.connections import ( + open_ssh_tunnel_with_s3_pkey, # ←←← We connect through an SSH tunnel + close_ssh_tunnel, # ←←← Which we will have to close +) +from lolafect.data_testing import ( + run_data_test_on_mysql, +) # ←←← The task to run a data test + +### PREP + +LOLACONFIG = build_lolaconfig(flow_name="018-pl-general-validations") +# ↑↑↑ Get env from S3 and prepare everything related to it + + +DATA_TEST_NAME = "gallery-example-test" +# ↑↑↑ Our data test must have a name. We will need this if we want to look for +# its logs in S3. + +DATA_TEST_QUERY = """ + SELECT "hi" AS some_string, + 1 AS some_number, + NULL as some_null +""" +# ↑↑↑ Our query defines what data do we want to test. This is a silly select +# with hardcoded values because this is a demo, but in a real environment, you +# most probably will want to have a common SELECT [...] FROM [...] query that +# fetches the data your want to test. 
+ +DATA_TEST_EXPECTATIONS = [ + ExpectationConfiguration( + expectation_type="expect_column_values_to_match_like_pattern", + kwargs={"column": "some_string", "like_pattern": "%hi%"}, + ), + ExpectationConfiguration( + expectation_type="expect_column_values_to_be_between", + kwargs={"column": "some_number", "min_value": 1, "max_value": 1}, + ), + ExpectationConfiguration( + expectation_type="expect_column_values_to_be_null", + kwargs={"column": "some_null"}, + ), +] + +# ↑↑↑ Our expectations define what data should be like. Each expectation is +# defined with a call to ExpectationConfiguration. You can check a reference +# of available expectations and how to call them here: +# https://legacy.docs.greatexpectations.io/en/latest/reference/glossary_of_expectations.html + + +@task +def fetch_tunnel_host_and_port(ssh_tunnel): + host = ssh_tunnel.local_bind_address[0] + port = ssh_tunnel.local_bind_address[1] + + return host, port -# TODO ### FLOW -# TODO \ No newline at end of file +with Flow( + LOLACONFIG.FLOW_NAME_UDCS, + storage=LOLACONFIG.STORAGE, + run_config=KubernetesRun( + labels=LOLACONFIG.KUBERNETES_LABELS, + image=LOLACONFIG.KUBERNETES_IMAGE, + ), +) as flow: + + ssh_tunnel = open_ssh_tunnel_with_s3_pkey( + s3_bucket_name=LOLACONFIG.S3_BUCKET_NAME, + ssh_tunnel_credentials=LOLACONFIG.SSH_TUNNEL_CREDENTIALS, + remote_target_host=LOLACONFIG.DW_CREDENTIALS["host"], + remote_target_port=LOLACONFIG.DW_CREDENTIALS["port"], + ) + # ↑↑↑ We open an SSH tunnel pointing to DW + + # ↓↓↓ This is where we actually run the data test. The result of the test + # gets stored in data_test_result. + data_test_result = run_data_test_on_mysql.run( + name=DATA_TEST_NAME, # ←←← The name we set earlier + # ↓↓↓ The credentials to the MySQL where the data lives. We pass the + # ssh tunnel host and port instead of the true MySQL because we want + # to pass through the tunnel. If it was a direct connection, we would + # simply use the MySQL true host and port. 
+ mysql_credentials={ + "host": fetch_tunnel_host_and_port(ssh_tunnel)[0], + "port": fetch_tunnel_host_and_port(ssh_tunnel)[1], + "user": LOLACONFIG.DW_CREDENTIALS["user"], + "password": LOLACONFIG.DW_CREDENTIALS["password"], + "db": "sandbox", # ←←← We always need to pass a default db, but it + # is recommended to always specify your schemas + }, # in the queries regardless. + query=DATA_TEST_QUERY, # ←←← The query we set earlier + expectation_configurations=DATA_TEST_EXPECTATIONS, # ←←← Idem + upstream_tasks=[ssh_tunnel] # ←←← We must wait for the tunnel to be ready + ) + + # ↑↑↑ will take care of everything: connecting to S3 and DW, generate all + # the necessary configurations, run the actual test and store results both + # in memory and in S3. + # + # What to do from here is up to you. You can easily check if the test + # passed or not by accessing data_test_result["success"]. If it equals + # True, the test passed. If it equals False, at least one expectation + # failed. + # + # The following snippets are optional. You should judge if you want to do + # something similar or not in your flow based on your needs. + + tunnel_closed = close_ssh_tunnel(ssh_tunnel) + + + +# TODO From 0c3243c9cea298571d940eac9604c9674e52412b Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 6 Mar 2023 17:28:00 +0100 Subject: [PATCH 05/19] Fixed the flow, now it runs. --- docs/gallery/data_testing/data_testing_flow.py | 7 +++---- docs/gallery/data_testing/requirements.txt | 4 ++++ 2 files changed, 7 insertions(+), 4 deletions(-) create mode 100644 docs/gallery/data_testing/requirements.txt diff --git a/docs/gallery/data_testing/data_testing_flow.py b/docs/gallery/data_testing/data_testing_flow.py index 38b771e..dd39164 100644 --- a/docs/gallery/data_testing/data_testing_flow.py +++ b/docs/gallery/data_testing/data_testing_flow.py @@ -110,7 +110,7 @@ with Flow( # ↓↓↓ This is where we actually run the data test. The result of the test # gets stored in data_test_result. 
- data_test_result = run_data_test_on_mysql.run( + data_test_result = run_data_test_on_mysql( name=DATA_TEST_NAME, # ←←← The name we set earlier # ↓↓↓ The credentials to the MySQL where the data lives. We pass the # ssh tunnel host and port instead of the true MySQL because we want @@ -126,7 +126,7 @@ with Flow( }, # in the queries regardless. query=DATA_TEST_QUERY, # ←←← The query we set earlier expectation_configurations=DATA_TEST_EXPECTATIONS, # ←←← Idem - upstream_tasks=[ssh_tunnel] # ←←← We must wait for the tunnel to be ready + upstream_tasks=[ssh_tunnel], # ←←← We must wait for the tunnel to be ready ) # ↑↑↑ will take care of everything: connecting to S3 and DW, generate all @@ -141,8 +141,7 @@ with Flow( # The following snippets are optional. You should judge if you want to do # something similar or not in your flow based on your needs. - tunnel_closed = close_ssh_tunnel(ssh_tunnel) - + tunnel_closed = close_ssh_tunnel(ssh_tunnel, upstream_tasks=[data_test_result]) # TODO diff --git a/docs/gallery/data_testing/requirements.txt b/docs/gallery/data_testing/requirements.txt new file mode 100644 index 0000000..245292b --- /dev/null +++ b/docs/gallery/data_testing/requirements.txt @@ -0,0 +1,4 @@ +prefect==1.2.2 +great_expectations==0.15.45 +SQLAlchemy==1.4.46 +lolafect==0.4.0 \ No newline at end of file From c30e2778e8e356e88161813f233f8b41bf0dacd8 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Tue, 7 Mar 2023 16:39:42 +0100 Subject: [PATCH 06/19] Completed examples in flow. 
--- .../gallery/data_testing/data_testing_flow.py | 86 ++++++++++++++++++- 1 file changed, 82 insertions(+), 4 deletions(-) diff --git a/docs/gallery/data_testing/data_testing_flow.py b/docs/gallery/data_testing/data_testing_flow.py index dd39164..3e39139 100644 --- a/docs/gallery/data_testing/data_testing_flow.py +++ b/docs/gallery/data_testing/data_testing_flow.py @@ -19,7 +19,7 @@ ### IMPORTS -from prefect import Flow, task +from prefect import Flow, task, case from prefect.run_configs import KubernetesRun # ↑↑↑ Standard prefect stuff for the flow. @@ -35,14 +35,19 @@ from lolafect.lolaconfig import build_lolaconfig from lolafect.connections import ( open_ssh_tunnel_with_s3_pkey, # ←←← We connect through an SSH tunnel close_ssh_tunnel, # ←←← Which we will have to close + connect_to_mysql, # ←←← For quarantine purposes + close_mysql_connection, # ←←← To close the previous connection ) +from lolafect.slack import SendSlackMessageTask + +# ↑↑↑ The task class to send slack messages. from lolafect.data_testing import ( run_data_test_on_mysql, ) # ←←← The task to run a data test ### PREP -LOLACONFIG = build_lolaconfig(flow_name="018-pl-general-validations") +LOLACONFIG = build_lolaconfig(flow_name="lolafect-gallery-data-testing-demo") # ↑↑↑ Get env from S3 and prepare everything related to it @@ -89,6 +94,37 @@ def fetch_tunnel_host_and_port(ssh_tunnel): return host, port +# ↑↑↑ A small helper function to get the host and the port where the SSH +# tunnel is listening inside this host. + + +@task +def fail(exception, message): + raise exception(message) + + +# ↑↑↑ A small helper function to cause a task failure with a custom message. 
+ + +@task +def quarantine_failed_data(connection, query_to_get_quarantine_data): + cursor = connection.cursor() + + cursor.execute( + f""" + CREATE TABLE quarantine.{LOLACONFIG.FLOW_NAME_UDCS}_{datetime.datetime.now().strftime("%Y%m%d_%H%M%S")} AS + {query_to_get_quarantine_data} + """ + ) + # ↑↑↑ This query will store the faulty data in a quarantine schema in DW + # It creates a new table on each run, and uses the flow name and the current time + # to give the table a unique and informative name. + + +send_slack_message = SendSlackMessageTask() +# ↑↑↑ Simply making an instance of the task class. send_slack_message will be +# the task we use in the flow. + ### FLOW with Flow( @@ -141,7 +177,49 @@ with Flow( # The following snippets are optional. You should judge if you want to do # something similar or not in your flow based on your needs. - tunnel_closed = close_ssh_tunnel(ssh_tunnel, upstream_tasks=[data_test_result]) + ### RAISING AN EXCEPTION + # When a test with run_data_test_on_mysql fails, it's important that you + # keep in mind that this will not cause a failure, in the sense of a + # prefect task failing. This is intentional: we didn't want to assume that + # a failing data test always translates into a failed flow. + # + # Nevertheless, it might be the case that you want your flow to fail if + # the data test didn't pass. To do so, you can use a simple helper task and + # a case, just like this (uncomment the lines if you want to cause the + # failure): + # with case(data_test_result["success"], False): + # fail(ValueError, "Woops, the test didn't pass.") -# TODO + ### SENDING A SLACK WARNING + # You might also want to send a slack message to a channel if the test + # does not pass. 
You can do so like this: + + with case(data_test_result["success"], False): + send_slack_message( + LOLACONFIG.SLACK_WEBHOOKS["data-team-alerts-testing"], # ←←← A webhook URL + "Uh oh, the demo flow failed.", # ←←← Your warning message + ) + + ### QUARANTINE THE TESTED DATA + # Another common practice is to store the data that doesn't pass a test. + # This provides a lot of benefits that are discussed in our best practices + # docs in Confluence. Here is an example of you can quarantine the data + # that made your test fail: + + with case(data_test_result["success"], False): + dw_connection = connect_to_mysql( + mysql_credentials=LOLACONFIG.DW_CREDENTIALS, + overriding_host_and_port=fetch_tunnel_host_and_port(ssh_tunnel), + ) + # ↑↑↑ We connect to DW, and since we are using the SSH tunnel, we + # override DWs host and port and instead use the listening ones from + # the tunnel. + + quarantined = quarantine_failed_data(dw_connection, DATA_TEST_QUERY) + # ↑↑↑ We call the quarantine_failed_data task. You can review the + # actions of this task in the definition that appears earlier in this + # file. + + mysql_closed = close_mysql_connection(dw_connection) + tunnel_closed = close_ssh_tunnel(ssh_tunnel, upstream_tasks=[mysql_closed]) From ea5edc5cda29377130c3cbf64c83813a9ba96883 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Tue, 7 Mar 2023 17:09:46 +0100 Subject: [PATCH 07/19] More bits and pieces. --- .../gallery/data_testing/data_testing_flow.py | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/docs/gallery/data_testing/data_testing_flow.py b/docs/gallery/data_testing/data_testing_flow.py index 3e39139..62134a0 100644 --- a/docs/gallery/data_testing/data_testing_flow.py +++ b/docs/gallery/data_testing/data_testing_flow.py @@ -2,6 +2,17 @@ # This is an example flow to showcase data testing. +# There are several ways you can use this: +# 1. If you simply want to copy paste useful recipes... the flow is yours. +# 2. 
If you want to learn, I would advice: +# - Skim through the whole script. +# - Now read the flow block carefully and refer to other parts of the +# script when needed. +# - Try to run the flow as-is. It should succeed. +# - Try to intentionally break the data test by changing the data or the +# expectations. + +### HOW TO RUN # The flow is packed with comments to guide you through what's happening. # The flow is also runnable. To run it: # - Make a virtual environment with the requirements.txt that live in the same @@ -19,6 +30,8 @@ ### IMPORTS +import datetime + from prefect import Flow, task, case from prefect.run_configs import KubernetesRun @@ -185,11 +198,10 @@ with Flow( # # Nevertheless, it might be the case that you want your flow to fail if # the data test didn't pass. To do so, you can use a simple helper task and - # a case, just like this (uncomment the lines if you want to cause the - # failure): + # a case, just like this: - # with case(data_test_result["success"], False): - # fail(ValueError, "Woops, the test didn't pass.") + with case(data_test_result["success"], False): + fail(ValueError, "Woops, the test didn't pass.") ### SENDING A SLACK WARNING # You might also want to send a slack message to a channel if the test @@ -207,15 +219,16 @@ with Flow( # docs in Confluence. Here is an example of you can quarantine the data # that made your test fail: - with case(data_test_result["success"], False): - dw_connection = connect_to_mysql( - mysql_credentials=LOLACONFIG.DW_CREDENTIALS, - overriding_host_and_port=fetch_tunnel_host_and_port(ssh_tunnel), - ) - # ↑↑↑ We connect to DW, and since we are using the SSH tunnel, we - # override DWs host and port and instead use the listening ones from - # the tunnel. 
+ dw_connection = connect_to_mysql( + mysql_credentials=LOLACONFIG.DW_CREDENTIALS, + overriding_host_and_port=fetch_tunnel_host_and_port(ssh_tunnel), + upstream_tasks=[data_test_result], + ) + # ↑↑↑ We connect to DW, and since we are using the SSH tunnel, we + # override DWs host and port and instead use the listening ones from + # the tunnel. + with case(data_test_result["success"], False): quarantined = quarantine_failed_data(dw_connection, DATA_TEST_QUERY) # ↑↑↑ We call the quarantine_failed_data task. You can review the # actions of this task in the definition that appears earlier in this From 088e2069bbf2e3bc6b00bd32ad5fcc4b71b70f04 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Tue, 7 Mar 2023 17:11:19 +0100 Subject: [PATCH 08/19] Finish details. --- docs/gallery/data_testing/data_testing_flow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/gallery/data_testing/data_testing_flow.py b/docs/gallery/data_testing/data_testing_flow.py index 62134a0..683237c 100644 --- a/docs/gallery/data_testing/data_testing_flow.py +++ b/docs/gallery/data_testing/data_testing_flow.py @@ -19,7 +19,7 @@ # folder as this script. # - Start a shell, activate the venv, login to AWS and turn on your Mercadão # VPN. -# - In the shell, run the command: TODO +# - In the shell, run the command: prefect run -p docs/gallery/data_testing/data_testing_flow.py # # Note: this flow is designed to run in your laptop. It won't work in the # prefect server. Don't bother uploading it. From 8f1f3d75e1b0772d1ef5f0b7ea5b570d21f37754 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Fri, 21 Apr 2023 12:22:32 +0200 Subject: [PATCH 09/19] Implemented the tasks. 
--- lolafect/utils.py | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/lolafect/utils.py b/lolafect/utils.py index 621e59d..608f27b 100644 --- a/lolafect/utils.py +++ b/lolafect/utils.py @@ -1,5 +1,8 @@ import json +from typing import Any +import prefect +from prefect import task class S3FileReader: """ @@ -22,3 +25,38 @@ class S3FileReader: .read() .decode("utf-8") ) + +@task() +def begin_sql_transaction(connection: Any) -> None: + """ + Start a SQL transaction in the passed connection. The task is agnostic to + the SQL engine being used. As long as it implements a begin() method, this + will work. + + :param connection: the connection to some database. + :return: None + """ + logger = prefect.context.get("logger") + logger.info(f"Starting SQL transaction with connection: {connection}.") + connection.begin() + + +@task() +def end_sql_transaction(connection: Any, dry_run: bool = False) -> None: + """ + Finish a SQL transaction, either by rolling it back or by committing it. + + :param connection: the connection to some database. + :param dry_run: a flag indicating if persistence is desired. If dry_run + is True, changes will be rollbacked. + :return: None + """ + logger = prefect.context.get("logger") + logger.info(f"Using connection: {connection}.") + + if dry_run: + connection.rollback() + logger.info("Dry-run mode activated. 
Rolling back the transaction.") + else: + logger.info("Committing the transaction.") + connection.commit() From 5b53b71fd87a8315195076e65cbbb65afe503a11 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Fri, 21 Apr 2023 12:37:37 +0200 Subject: [PATCH 10/19] Testing plan --- tests/test_integration/test_utils.py | 41 ++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 tests/test_integration/test_utils.py diff --git a/tests/test_integration/test_utils.py b/tests/test_integration/test_utils.py new file mode 100644 index 0000000..4b33844 --- /dev/null +++ b/tests/test_integration/test_utils.py @@ -0,0 +1,41 @@ +from lolafect.lolaconfig import build_lolaconfig + +# __ __ _____ _ _ _____ _ _ _____ _ +# \ \ / /\ | __ \| \ | |_ _| \ | |/ ____| | +# \ \ /\ / / \ | |__) | \| | | | | \| | | __| | +# \ \/ \/ / /\ \ | _ /| . ` | | | | . ` | | |_ | | +# \ /\ / ____ \| | \ \| |\ |_| |_| |\ | |__| |_| +# \/ \/_/ \_\_| \_\_| \_|_____|_| \_|\_____(_) +# This testing suite requires: +# - The calling shell to have permission in AWS +# - The calling shell to be within the Mercadão network +# - Do not use these tests as part of CI/CD pipelines since they are not idempotent and +# rely on external resources. Instead, use them manually to check yourself that things +# are working properly. 
+ + +TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") + + +def test_sql_transaction_persists_changes_properly(): + # Connect + # Create table in Sandbox + # Check that table is empty + # Start transaction + # Insert value in table + # Commit transaction + # Check that value is there + # assert that the table was initially empty and afterwards it got a record + pass + + +def test_sql_transaction_rollbacks_changes_properly(): + # Connect + # Create table in Sandbox + # Check that table is empty + # Start transaction + # Insert value in table + # Commit transaction + # Check that the table is still empty + # assert that the table was initially empty and afterwards as well + pass \ No newline at end of file From 47a3e8b1e755428a44a7277b536f1f0ea316ddab Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 24 Apr 2023 11:38:51 +0200 Subject: [PATCH 11/19] Test working --- tests/test_integration/test_utils.py | 134 +++++++++++++++++++++++---- 1 file changed, 116 insertions(+), 18 deletions(-) diff --git a/tests/test_integration/test_utils.py b/tests/test_integration/test_utils.py index 4b33844..ba6023a 100644 --- a/tests/test_integration/test_utils.py +++ b/tests/test_integration/test_utils.py @@ -1,4 +1,12 @@ from lolafect.lolaconfig import build_lolaconfig +from lolafect.connections import ( + open_ssh_tunnel_with_s3_pkey, + get_local_bind_address_from_ssh_tunnel, + close_ssh_tunnel, + connect_to_mysql, + close_mysql_connection, +) +from lolafect.utils import begin_sql_transaction, end_sql_transaction # __ __ _____ _ _ _____ _ _ _____ _ # \ \ / /\ | __ \| \ | |_ _| \ | |/ ____| | @@ -18,24 +26,114 @@ TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") def test_sql_transaction_persists_changes_properly(): - # Connect - # Create table in Sandbox - # Check that table is empty - # Start transaction - # Insert value in table - # Commit transaction - # Check that value is there - # assert that the table was initially empty and afterwards it got a 
record - pass + test_local_bind_host = "127.0.0.1" + test_local_bind_port = 12345 + + tunnel = open_ssh_tunnel_with_s3_pkey.run( + s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, + ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, + remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], + remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], + local_bind_host=test_local_bind_host, + local_bind_port=test_local_bind_port, + ) + + connection = connect_to_mysql.run( + mysql_credentials=TEST_LOLACONFIG.DW_CREDENTIALS, + overriding_host_and_port=get_local_bind_address_from_ssh_tunnel.run( + tunnel=tunnel + ), + ) + + cursor = connection.cursor() + cursor.execute(""" + CREATE TABLE sandbox.lolafect_transaction_test_table + ( + a_test_column INT + ) + """) + cursor.execute(""" + SELECT a_test_column + FROM sandbox.lolafect_transaction_test_table + """) + table_is_empty_at_first = not bool(cursor.fetchall()) # An empty tuple yields False + + begin_sql_transaction.run(connection=connection) + cursor.execute(""" + INSERT INTO sandbox.lolafect_transaction_test_table (a_test_column) + VALUES (1) + """) + end_sql_transaction.run(connection=connection, dry_run=False) + + cursor.execute(""" + SELECT a_test_column + FROM sandbox.lolafect_transaction_test_table + """) + table_has_a_record_after_transaction = bool(cursor.fetchall()) # A non-empty tuple yields True + + cursor.execute(""" + DROP TABLE sandbox.lolafect_transaction_test_table + """ + ) + + close_mysql_connection.run(connection=connection) + close_ssh_tunnel.run(tunnel=tunnel) + + assert table_is_empty_at_first and table_has_a_record_after_transaction def test_sql_transaction_rollbacks_changes_properly(): - # Connect - # Create table in Sandbox - # Check that table is empty - # Start transaction - # Insert value in table - # Commit transaction - # Check that the table is still empty - # assert that the table was initially empty and afterwards as well - pass \ No newline at end of file + 
test_local_bind_host = "127.0.0.1" + test_local_bind_port = 12345 + + tunnel = open_ssh_tunnel_with_s3_pkey.run( + s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, + ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, + remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], + remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], + local_bind_host=test_local_bind_host, + local_bind_port=test_local_bind_port, + ) + + connection = connect_to_mysql.run( + mysql_credentials=TEST_LOLACONFIG.DW_CREDENTIALS, + overriding_host_and_port=get_local_bind_address_from_ssh_tunnel.run( + tunnel=tunnel + ), + ) + + cursor = connection.cursor() + cursor.execute(""" + CREATE TABLE sandbox.lolafect_transaction_test_table + ( + a_test_column INT + ) + """) + cursor.execute(""" + SELECT a_test_column + FROM sandbox.lolafect_transaction_test_table + """) + table_is_empty_at_first = not bool(cursor.fetchall()) # An empty tuple yields False + + begin_sql_transaction.run(connection=connection) + cursor.execute(""" + INSERT INTO sandbox.lolafect_transaction_test_table (a_test_column) + VALUES (1) + """) + end_sql_transaction.run(connection=connection, dry_run=True) + + cursor.execute(""" + SELECT a_test_column + FROM sandbox.lolafect_transaction_test_table + """) + table_is_empty_after_rollback = not bool(cursor.fetchall()) # An tuple yields False + + cursor.execute(""" + DROP TABLE sandbox.lolafect_transaction_test_table + """ + ) + + close_mysql_connection.run(connection=connection) + close_ssh_tunnel.run(tunnel=tunnel) + + assert table_is_empty_at_first and table_is_empty_after_rollback From 2090ec7c916e08eeefdab22a3d5e33e59e772882 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 24 Apr 2023 13:41:15 +0200 Subject: [PATCH 12/19] Refactor setup and teardown of connection and test table into a fixture. 
--- tests/test_integration/test_utils.py | 86 +++++++++++----------------- 1 file changed, 32 insertions(+), 54 deletions(-) diff --git a/tests/test_integration/test_utils.py b/tests/test_integration/test_utils.py index ba6023a..e195626 100644 --- a/tests/test_integration/test_utils.py +++ b/tests/test_integration/test_utils.py @@ -1,3 +1,5 @@ +import pytest + from lolafect.lolaconfig import build_lolaconfig from lolafect.connections import ( open_ssh_tunnel_with_s3_pkey, @@ -25,7 +27,8 @@ from lolafect.utils import begin_sql_transaction, end_sql_transaction TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") -def test_sql_transaction_persists_changes_properly(): +@pytest.fixture +def connection_with_test_table(): test_local_bind_host = "127.0.0.1" test_local_bind_port = 12345 @@ -44,26 +47,41 @@ def test_sql_transaction_persists_changes_properly(): tunnel=tunnel ), ) - cursor = connection.cursor() cursor.execute(""" - CREATE TABLE sandbox.lolafect_transaction_test_table - ( - a_test_column INT - ) - """) + CREATE TABLE sandbox.lolafect_transaction_test_table + ( + a_test_column INT + ) + """) + + # Connection and table ready for tests + yield connection # Test happens now + # Test finished, time to remove stuff and close connection + + cursor.execute(""" + DROP TABLE sandbox.lolafect_transaction_test_table + """ + ) + close_mysql_connection.run(connection=connection) + close_ssh_tunnel.run(tunnel=tunnel) + + +def test_sql_transaction_persists_changes_properly(connection_with_test_table): + cursor = connection_with_test_table.cursor() + cursor.execute(""" SELECT a_test_column FROM sandbox.lolafect_transaction_test_table """) table_is_empty_at_first = not bool(cursor.fetchall()) # An empty tuple yields False - begin_sql_transaction.run(connection=connection) + begin_sql_transaction.run(connection=connection_with_test_table) cursor.execute(""" INSERT INTO sandbox.lolafect_transaction_test_table (a_test_column) VALUES (1) """) - 
end_sql_transaction.run(connection=connection, dry_run=False) + end_sql_transaction.run(connection=connection_with_test_table, dry_run=False) cursor.execute(""" SELECT a_test_column @@ -71,69 +89,29 @@ def test_sql_transaction_persists_changes_properly(): """) table_has_a_record_after_transaction = bool(cursor.fetchall()) # A non-empty tuple yields True - cursor.execute(""" - DROP TABLE sandbox.lolafect_transaction_test_table - """ - ) - - close_mysql_connection.run(connection=connection) - close_ssh_tunnel.run(tunnel=tunnel) - assert table_is_empty_at_first and table_has_a_record_after_transaction -def test_sql_transaction_rollbacks_changes_properly(): - test_local_bind_host = "127.0.0.1" - test_local_bind_port = 12345 +def test_sql_transaction_rollbacks_changes_properly(connection_with_test_table): + cursor = connection_with_test_table.cursor() - tunnel = open_ssh_tunnel_with_s3_pkey.run( - s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, - ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, - remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], - remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], - local_bind_host=test_local_bind_host, - local_bind_port=test_local_bind_port, - ) - - connection = connect_to_mysql.run( - mysql_credentials=TEST_LOLACONFIG.DW_CREDENTIALS, - overriding_host_and_port=get_local_bind_address_from_ssh_tunnel.run( - tunnel=tunnel - ), - ) - - cursor = connection.cursor() - cursor.execute(""" - CREATE TABLE sandbox.lolafect_transaction_test_table - ( - a_test_column INT - ) - """) cursor.execute(""" SELECT a_test_column FROM sandbox.lolafect_transaction_test_table """) table_is_empty_at_first = not bool(cursor.fetchall()) # An empty tuple yields False - begin_sql_transaction.run(connection=connection) + begin_sql_transaction.run(connection=connection_with_test_table) cursor.execute(""" INSERT INTO sandbox.lolafect_transaction_test_table (a_test_column) VALUES (1) """) - end_sql_transaction.run(connection=connection, 
dry_run=True) + end_sql_transaction.run(connection=connection_with_test_table, dry_run=True) cursor.execute(""" SELECT a_test_column FROM sandbox.lolafect_transaction_test_table """) - table_is_empty_after_rollback = not bool(cursor.fetchall()) # An tuple yields False - - cursor.execute(""" - DROP TABLE sandbox.lolafect_transaction_test_table - """ - ) - - close_mysql_connection.run(connection=connection) - close_ssh_tunnel.run(tunnel=tunnel) + table_is_empty_after_rollback = not bool(cursor.fetchall()) # A tuple yields False assert table_is_empty_at_first and table_is_empty_after_rollback From 467d70fcdc0744343a573b2e715da8de26fb6fc4 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 24 Apr 2023 13:44:11 +0200 Subject: [PATCH 13/19] Docstrings and small name improvements --- tests/test_integration/test_utils.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/tests/test_integration/test_utils.py b/tests/test_integration/test_utils.py index e195626..1ce5f85 100644 --- a/tests/test_integration/test_utils.py +++ b/tests/test_integration/test_utils.py @@ -29,6 +29,12 @@ TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") @pytest.fixture def connection_with_test_table(): + """ + Connects to DW, creates a test table in the sandbox env, and yields the + connection to the test. + + After the test, the table is dropped and the connection is closed. 
+ """ test_local_bind_host = "127.0.0.1" test_local_bind_port = 12345 @@ -87,9 +93,9 @@ def test_sql_transaction_persists_changes_properly(connection_with_test_table): SELECT a_test_column FROM sandbox.lolafect_transaction_test_table """) - table_has_a_record_after_transaction = bool(cursor.fetchall()) # A non-empty tuple yields True + table_has_a_record_after_commit = bool(cursor.fetchall()) # A non-empty tuple yields True - assert table_is_empty_at_first and table_has_a_record_after_transaction + assert table_is_empty_at_first and table_has_a_record_after_commit def test_sql_transaction_rollbacks_changes_properly(connection_with_test_table): @@ -112,6 +118,6 @@ def test_sql_transaction_rollbacks_changes_properly(connection_with_test_table): SELECT a_test_column FROM sandbox.lolafect_transaction_test_table """) - table_is_empty_after_rollback = not bool(cursor.fetchall()) # A tuple yields False + table_is_still_empty_after_rollback = not bool(cursor.fetchall()) # An empty tuple yields False - assert table_is_empty_at_first and table_is_empty_after_rollback + assert table_is_empty_at_first and table_is_still_empty_after_rollback From 9201c236af9302026aa480ef13d69a783860f316 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 24 Apr 2023 13:48:33 +0200 Subject: [PATCH 14/19] Typo. --- lolafect/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lolafect/utils.py b/lolafect/utils.py index 608f27b..2aa82a0 100644 --- a/lolafect/utils.py +++ b/lolafect/utils.py @@ -48,7 +48,7 @@ def end_sql_transaction(connection: Any, dry_run: bool = False) -> None: :param connection: the connection to some database. :param dry_run: a flag indicating if persistence is desired. If dry_run - is True, changes will be rollbacked. + is True, changes will be rolledback.
:return: None """ logger = prefect.context.get("logger") From 4ba1b4c007e54bcf09c0a71fd9a39c5a9bded9e1 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 24 Apr 2023 13:55:29 +0200 Subject: [PATCH 15/19] Improve docstrings --- lolafect/utils.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/lolafect/utils.py b/lolafect/utils.py index 2aa82a0..2b7e460 100644 --- a/lolafect/utils.py +++ b/lolafect/utils.py @@ -30,8 +30,8 @@ class S3FileReader: def begin_sql_transaction(connection: Any) -> None: """ Start a SQL transaction in the passed connection. The task is agnostic to - the SQL engine being used. As long as it implements a begin() method, this - will work. + the SQL engine being used. As long as the connection object implements a + begin() method, this task will work. :param connection: the connection to some database. :return: None @@ -45,10 +45,13 @@ def begin_sql_transaction(connection: Any) -> None: def end_sql_transaction(connection: Any, dry_run: bool = False) -> None: """ Finish a SQL transaction, either by rolling it back or by committing it. + The task is agnostic to the SQL engine being used. As long as the + connection object implements a `commit` and a `rollback` method, this task + will work. :param connection: the connection to some database. :param dry_run: a flag indicating if persistence is desired. If dry_run - is True, changes will be rolledback. + is True, changes will be rolled back. Otherwise, they will be committed. :return: None """ logger = prefect.context.get("logger") From 58261b1a692f5867d322b07089398206ebb9aba6 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 24 Apr 2023 13:56:30 +0200 Subject: [PATCH 16/19] Update changelog --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 525db12..4dc06d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ All notable changes to this project will be documented in this file.
+## [Unreleased] + +### Added + +- Added tasks `begin_sql_transaction` and `end_sql_transaction` to the `utils` module. These enable the management of SQL + transactions in flows. They also allow for dry running SQL statements. + ## [0.4.0] - 2023-02-08 ### Added From 52ad64076608787b0b52c864742b19385b276025 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 24 Apr 2023 14:22:45 +0200 Subject: [PATCH 17/19] Add example in readme. --- README.md | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/README.md b/README.md index d9cbbf1..e3abcf9 100644 --- a/README.md +++ b/README.md @@ -120,6 +120,31 @@ with Flow(...) as flow: close_ssh_tunnel.run(tunnel=tunnel, upstream_tasks=[mysql_closed]) ``` +**Use SQL transactions and dry running** + +```python +from lolafect.connections import connect_to_mysql, close_mysql_connection +from lolafect.utils import begin_sql_transaction, end_sql_transaction + +with Flow(...) as flow: + connection = connect_to_mysql( + mysql_credentials={...}, # You probably want to get this from TEST_LOLACONFIG.DW_CREDENTIALS + ) + transaction_started = begin_sql_transaction(connection) + task_result = some_task_that_needs_mysql( + connection=connection, + upstream_tasks=[transaction_started] + ) + transaction_finished = end_sql_transaction( + connection, + dry_run=False, # True means rollback, False means commit changes + upstream_tasks=[task_result] + ) + + close_mysql_connection(connection=connection, upstream_tasks=[transaction_finished]) + +``` + ### Use Great Expectations **Run a Great Expectations validation on a MySQL query** From 897145ac199746ac64b7d8f757f42d1c5d7364f0 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Wed, 26 Apr 2023 09:42:30 +0200 Subject: [PATCH 18/19] Bumped version --- lolafect/__version__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lolafect/__version__.py b/lolafect/__version__.py index 6a9beea..3d18726 100644 --- a/lolafect/__version__.py +++ b/lolafect/__version__.py
@@ -1 +1 @@ -__version__ = "0.4.0" +__version__ = "0.5.0" From 054de6ab3937a91d88551f176edb05d0884db8d9 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Wed, 26 Apr 2023 09:43:01 +0200 Subject: [PATCH 19/19] Update changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4dc06d5..30b1c5a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ All notable changes to this project will be documented in this file. -## [Unreleased] +## [0.5.0] - 2023-04-26 ### Added