diff --git a/README.md b/README.md
index f5b5876..d9cbbf1 100644
--- a/README.md
+++ b/README.md
@@ -164,6 +164,13 @@ with Flow(...) as flow:
     )
 ```
 
+## Gallery
+
+This repo also contains a gallery of example flows that you can use to better
+understand `lolafect` or as templates to kickstart your own flows. You can
+find these in `docs/gallery`.
+
+
 ## How to test
 
 There are two test suites: unit tests and integration tests. Integration tests are prepared to plug to some of our
diff --git a/docs/gallery/data_testing/README.md b/docs/gallery/data_testing/README.md
new file mode 100644
index 0000000..a1de4b0
--- /dev/null
+++ b/docs/gallery/data_testing/README.md
@@ -0,0 +1,7 @@
+# Data Testing Gallery
+
+In this folder, you can find a sample flow project that showcases how you can
+do data testing with Lolafect.
+
+You can also take a look at our GE best practices and guidelines
+[here](https://pdofonte.atlassian.net/wiki/spaces/DATA/pages/2484797445/Usage+Guidelines+and+Best+Practices).
\ No newline at end of file
diff --git a/docs/gallery/data_testing/data_testing_flow.py b/docs/gallery/data_testing/data_testing_flow.py
new file mode 100644
index 0000000..683237c
--- /dev/null
+++ b/docs/gallery/data_testing/data_testing_flow.py
@@ -0,0 +1,238 @@
+### INTRO
+
+# This is an example flow to showcase data testing.
+
+# There are several ways you can use this:
+# 1. If you simply want to copy-paste useful recipes... the flow is yours.
+# 2. If you want to learn, I would advise:
+#    - Skim through the whole script.
+#    - Now read the flow block carefully and refer to other parts of the
+#      script when needed.
+#    - Try to run the flow as-is. It should succeed.
+#    - Try to intentionally break the data test by changing the data or the
+#      expectations.
+
+### HOW TO RUN
+# The flow is packed with comments to guide you through what's happening.
+# The flow is also runnable. To run it (a command-by-command sketch follows
+# right after this list):
+# - Make a virtual environment with the requirements.txt that lives in the
+#   same folder as this script.
+# - Start a shell, activate the venv, log in to AWS and turn on your Mercadão
+#   VPN.
+# - In the shell, run the command: prefect run -p docs/gallery/data_testing/data_testing_flow.py
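+#
+# As a rough sketch, those steps look something like this. This assumes a
+# POSIX shell, that you start from the repo root, and that you already know
+# how to log in to AWS and connect to the VPN; adapt as needed:
+#
+#     python -m venv .venv
+#     source .venv/bin/activate
+#     pip install -r docs/gallery/data_testing/requirements.txt
+#     prefect run -p docs/gallery/data_testing/data_testing_flow.py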
+#
+# Note: this flow is designed to run on your laptop. It won't work on the
+# Prefect server. Don't bother uploading it.
+
+# The flow connects to DW and makes a silly check on a silly query. You can use
+# it as a reference for how to set up a data test in your serious flows.
+
+
+### IMPORTS
+
+import datetime
+
+from prefect import Flow, task, case
+from prefect.run_configs import KubernetesRun
+
+# ↑↑↑ Standard Prefect stuff for the flow.
+
+from great_expectations.core.expectation_configuration import ExpectationConfiguration
+
+# ↑↑↑ ExpectationConfiguration is the class that allows us to define a single
+# expectation. We use it once for every expectation we define.
+
+from lolafect.lolaconfig import build_lolaconfig
+
+# ↑↑↑ Usual lolaconfig import to get all the env data.
+from lolafect.connections import (
+    open_ssh_tunnel_with_s3_pkey,  # ←←← We connect through an SSH tunnel
+    close_ssh_tunnel,  # ←←← Which we will have to close
+    connect_to_mysql,  # ←←← For quarantine purposes
+    close_mysql_connection,  # ←←← To close the previous connection
+)
+from lolafect.slack import SendSlackMessageTask
+
+# ↑↑↑ The task class to send Slack messages.
+from lolafect.data_testing import (
+    run_data_test_on_mysql,
+)  # ←←← The task to run a data test
+
+### PREP
+
+LOLACONFIG = build_lolaconfig(flow_name="lolafect-gallery-data-testing-demo")
+# ↑↑↑ Get env from S3 and prepare everything related to it.
+
+
+DATA_TEST_NAME = "gallery-example-test"
+# ↑↑↑ Our data test must have a name. We will need this if we want to look for
+# its logs in S3.
+
+DATA_TEST_QUERY = """
+    SELECT "hi" AS some_string,
+           1 AS some_number,
+           NULL AS some_null
+"""
+# ↑↑↑ Our query defines what data we want to test. This is a silly select
+# with hardcoded values because this is a demo, but in a real environment, you
+# will most probably want a common SELECT [...] FROM [...] query that fetches
+# the data you want to test.
+
+DATA_TEST_EXPECTATIONS = [
+    ExpectationConfiguration(
+        expectation_type="expect_column_values_to_match_like_pattern",
+        kwargs={"column": "some_string", "like_pattern": "%hi%"},
+    ),
+    ExpectationConfiguration(
+        expectation_type="expect_column_values_to_be_between",
+        kwargs={"column": "some_number", "min_value": 1, "max_value": 1},
+    ),
+    ExpectationConfiguration(
+        expectation_type="expect_column_values_to_be_null",
+        kwargs={"column": "some_null"},
+    ),
+]
+
+# ↑↑↑ Our expectations define what the data should look like. Each expectation
+# is defined with a call to ExpectationConfiguration. You can check a reference
+# of available expectations and how to call them here:
+# https://legacy.docs.greatexpectations.io/en/latest/reference/glossary_of_expectations.html
+
+
+@task
+def fetch_tunnel_host_and_port(ssh_tunnel):
+    host = ssh_tunnel.local_bind_address[0]
+    port = ssh_tunnel.local_bind_address[1]
+
+    return host, port
+
+
+# ↑↑↑ A small helper function to get the host and the port where the SSH
+# tunnel is listening on this host.
+
+
+@task
+def fail(exception, message):
+    raise exception(message)
+
+
+# ↑↑↑ A small helper function to cause a task failure with a custom message.
+
+
+@task
+def quarantine_failed_data(connection, query_to_get_quarantine_data):
+    cursor = connection.cursor()
+
+    cursor.execute(
+        f"""
+        CREATE TABLE quarantine.{LOLACONFIG.FLOW_NAME_UDCS}_{datetime.datetime.now().strftime("%Y%m%d_%H%M%S")} AS
+        {query_to_get_quarantine_data}
+        """
+    )
+    # ↑↑↑ This query stores the faulty data in a quarantine schema in DW.
+    # It creates a new table on each run, and uses the flow name and the
+    # current time to give the table a unique and informative name.
+
+
+send_slack_message = SendSlackMessageTask()
+# ↑↑↑ Simply making an instance of the task class. send_slack_message will be
+# the task we use in the flow.
+
+### FLOW
+
+with Flow(
+    LOLACONFIG.FLOW_NAME_UDCS,
+    storage=LOLACONFIG.STORAGE,
+    run_config=KubernetesRun(
+        labels=LOLACONFIG.KUBERNETES_LABELS,
+        image=LOLACONFIG.KUBERNETES_IMAGE,
+    ),
+) as flow:
+
+    ssh_tunnel = open_ssh_tunnel_with_s3_pkey(
+        s3_bucket_name=LOLACONFIG.S3_BUCKET_NAME,
+        ssh_tunnel_credentials=LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
+        remote_target_host=LOLACONFIG.DW_CREDENTIALS["host"],
+        remote_target_port=LOLACONFIG.DW_CREDENTIALS["port"],
+    )
+    # ↑↑↑ We open an SSH tunnel pointing to DW.
+
+    # ↓↓↓ This is where we actually run the data test. The result of the test
+    # gets stored in data_test_result.
+    data_test_result = run_data_test_on_mysql(
+        name=DATA_TEST_NAME,  # ←←← The name we set earlier
+        # ↓↓↓ The credentials for the MySQL where the data lives. We pass the
+        # SSH tunnel host and port instead of the true MySQL ones because we
+        # want to go through the tunnel. If it were a direct connection, we
+        # would simply use the true MySQL host and port.
+        mysql_credentials={
+            "host": fetch_tunnel_host_and_port(ssh_tunnel)[0],
+            "port": fetch_tunnel_host_and_port(ssh_tunnel)[1],
+            "user": LOLACONFIG.DW_CREDENTIALS["user"],
+            "password": LOLACONFIG.DW_CREDENTIALS["password"],
+            "db": "sandbox",  # ←←← We always need to pass a default db, but it
+            # is recommended to always specify your schemas in the queries
+            # regardless.
+        },
+        query=DATA_TEST_QUERY,  # ←←← The query we set earlier
+        expectation_configurations=DATA_TEST_EXPECTATIONS,  # ←←← Idem
+        upstream_tasks=[ssh_tunnel],  # ←←← We must wait for the tunnel to be ready
+    )
+
+    # ↑↑↑ run_data_test_on_mysql takes care of everything: connecting to S3
+    # and DW, generating all the necessary configurations, running the actual
+    # test and storing the results both in memory and in S3.
+    #
+    # What to do from here is up to you. You can easily check whether the test
+    # passed by accessing data_test_result["success"]. If it equals True, the
+    # test passed. If it equals False, at least one expectation failed.
+    #
+    # The following snippets are optional. You should judge whether you want
+    # to do something similar in your flow based on your needs.
+
+    ### RAISING AN EXCEPTION
+    # When a test run with run_data_test_on_mysql fails, keep in mind that
+    # this will not cause a failure in the sense of a Prefect task failing.
+    # This is intentional: we didn't want to assume that a failing data test
+    # always translates into a failed flow.
+    #
+    # Nevertheless, it might be the case that you want your flow to fail if
+    # the data test didn't pass. To do so, you can use a simple helper task
+    # and a case, just like this:
+
+    with case(data_test_result["success"], False):
+        fail(ValueError, "Whoops, the test didn't pass.")
+
+    ### SENDING A SLACK WARNING
+    # You might also want to send a Slack message to a channel if the test
+    # does not pass. You can do so like this:
+
+    with case(data_test_result["success"], False):
+        send_slack_message(
+            LOLACONFIG.SLACK_WEBHOOKS["data-team-alerts-testing"],  # ←←← A webhook URL
+            "Uh oh, the demo flow failed.",  # ←←← Your warning message
+        )
+
+    ### QUARANTINE THE TESTED DATA
+    # Another common practice is to store the data that doesn't pass a test.
+    # This provides a lot of benefits that are discussed in our best practices
+    # docs in Confluence. Here is an example of how you can quarantine the
+    # data that made your test fail:
+
+    dw_connection = connect_to_mysql(
+        mysql_credentials=LOLACONFIG.DW_CREDENTIALS,
+        overriding_host_and_port=fetch_tunnel_host_and_port(ssh_tunnel),
+        upstream_tasks=[data_test_result],
+    )
+    # ↑↑↑ We connect to DW, and since we are using the SSH tunnel, we
+    # override DW's host and port and instead use the listening ones from
+    # the tunnel.
+
+    with case(data_test_result["success"], False):
+        quarantined = quarantine_failed_data(dw_connection, DATA_TEST_QUERY)
+        # ↑↑↑ We call the quarantine_failed_data task. You can review what
+        # this task does in its definition earlier in this file.
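+
+    ### INSPECTING INDIVIDUAL EXPECTATION RESULTS
+    # The snippets above only look at data_test_result["success"]. If
+    # run_data_test_on_mysql hands back the full Great Expectations validation
+    # result (treat that as an assumption and confirm it against
+    # lolafect.data_testing), you could also report which expectations failed
+    # with a small helper task defined next to the other tasks above, roughly:
+    #
+    #     @task
+    #     def summarise_failed_expectations(validation_result):
+    #         failed = [
+    #             item["expectation_config"]["expectation_type"]
+    #             for item in validation_result["results"]
+    #             if not item["success"]
+    #         ]
+    #         return ", ".join(failed)
+    #
+    # and feed its output into send_slack_message inside one of the failure
+    # cases above.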
+
+    mysql_closed = close_mysql_connection(dw_connection)
+    tunnel_closed = close_ssh_tunnel(ssh_tunnel, upstream_tasks=[mysql_closed])
diff --git a/docs/gallery/data_testing/requirements.txt b/docs/gallery/data_testing/requirements.txt
new file mode 100644
index 0000000..245292b
--- /dev/null
+++ b/docs/gallery/data_testing/requirements.txt
@@ -0,0 +1,4 @@
+prefect==1.2.2
+great_expectations==0.15.45
+SQLAlchemy==1.4.46
+lolafect==0.4.0
\ No newline at end of file