### INTRO # This is an example flow to showcase data testing. # There are several ways you can use this: # 1. If you simply want to copy paste useful recipes... the flow is yours. # 2. If you want to learn, I would advice: # - Skim through the whole script. # - Now read the flow block carefully and refer to other parts of the # script when needed. # - Try to run the flow as-is. It should succeed. # - Try to intentionally break the data test by changing the data or the # expectations. ### HOW TO RUN # The flow is packed with comments to guide you through what's happening. # The flow is also runnable. To run it: # - Make a virtual environment with the requirements.txt that live in the same # folder as this script. # - Start a shell, activate the venv, login to AWS and turn on your Mercadão # VPN. # - In the shell, run the command: prefect run -p docs/gallery/data_testing/data_testing_flow.py # # Note: this flow is designed to run in your laptop. It won't work in the # prefect server. Don't bother uploading it. # The flow connects to DW and makes a silly check on a silly query. You can use # it as a reference on how to set up a data test in your serious flows. ### IMPORTS import datetime from prefect import Flow, task, case from prefect.run_configs import KubernetesRun # ↑↑↑ Standard prefect stuff for the flow. from great_expectations.core.expectation_configuration import ExpectationConfiguration # ↑↑↑ ExpectationConfiguration is the class that allows us to define a single # expectation. We use it once for every expectation we define. from lolafect.lolaconfig import build_lolaconfig # ↑↑↑ Usual lolaconfig import to get all the env data. 
from lolafect.connections import (
    open_ssh_tunnel_with_s3_pkey,  # ←←← We connect through an SSH tunnel
    close_ssh_tunnel,  # ←←← Which we will have to close
    connect_to_mysql,  # ←←← For quarantine purposes
    close_mysql_connection,  # ←←← To close the previous connection
)
from lolafect.slack import SendSlackMessageTask
# ↑↑↑ The task class to send slack messages.
from lolafect.data_testing import (
    run_data_test_on_mysql,
)  # ←←← The task to run a data test

### PREP
LOLACONFIG = build_lolaconfig(flow_name="lolafect-gallery-data-testing-demo")
# ↑↑↑ Get env from S3 and prepare everything related to it

DATA_TEST_NAME = "gallery-example-test"
# ↑↑↑ Our data test must have a name. We will need this if we want to look for
# its logs in S3.

DATA_TEST_QUERY = """
SELECT "hi" AS some_string, 1 AS some_number, NULL as some_null
"""
# ↑↑↑ Our query defines what data do we want to test. This is a silly select
# with hardcoded values because this is a demo, but in a real environment, you
# most probably will want to have a common SELECT [...] FROM [...] query that
# fetches the data you want to test.

DATA_TEST_EXPECTATIONS = [
    # some_string must contain the substring "hi" (SQL LIKE semantics).
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_match_like_pattern",
        kwargs={"column": "some_string", "like_pattern": "%hi%"},
    ),
    # some_number must be exactly 1 (min == max == 1).
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={"column": "some_number", "min_value": 1, "max_value": 1},
    ),
    # some_null must be NULL.
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_null",
        kwargs={"column": "some_null"},
    ),
]
# ↑↑↑ Our expectations define what data should be like. Each expectation is
# defined with a call to ExpectationConfiguration. You can check a reference
# of available expectations and how to call them here:
# https://legacy.docs.greatexpectations.io/en/latest/reference/glossary_of_expectations.html


@task
def fetch_tunnel_host_and_port(ssh_tunnel):
    """Return the (host, port) pair where the SSH tunnel listens locally.

    ssh_tunnel is expected to expose a ``local_bind_address`` attribute
    shaped like ``(host, port)`` — presumably an sshtunnel forwarder object;
    TODO confirm against open_ssh_tunnel_with_s3_pkey's return type.
    """
    host = ssh_tunnel.local_bind_address[0]
    port = ssh_tunnel.local_bind_address[1]
    return host, port
# ↑↑↑ A small helper task to get the host and the port where the SSH
# tunnel is listening inside this host.


@task
def fail(exception, message):
    """Raise ``exception(message)`` to force a prefect task failure."""
    raise exception(message)
# ↑↑↑ A small helper task to cause a task failure with a custom message.


@task
def quarantine_failed_data(connection, query_to_get_quarantine_data):
    """Snapshot the tested data into a uniquely named quarantine table.

    Runs CREATE TABLE ... AS <query> on the given MySQL connection. The table
    name combines the flow name with the current timestamp, so each run
    produces a new, informatively named table in the ``quarantine`` schema.

    NOTE(review): the cursor is never closed and there is no explicit commit —
    MySQL DDL implicitly commits, presumably that is what is relied on here;
    confirm before reusing this pattern with non-DDL statements.
    """
    cursor = connection.cursor()
    cursor.execute(
        f"""
        CREATE TABLE quarantine.{LOLACONFIG.FLOW_NAME_UDCS}_{datetime.datetime.now().strftime("%Y%m%d_%H%M%S")} AS
        {query_to_get_quarantine_data}
        """
    )
# ↑↑↑ This query will store the faulty data in a quarantine schema in DW.
# It creates a new table on each run, and uses the flow name and the current
# time to give the table a unique and informative name.


send_slack_message = SendSlackMessageTask()
# ↑↑↑ Simply making an instance of the task class. send_slack_message will be
# the task we use in the flow.

### FLOW
with Flow(
    LOLACONFIG.FLOW_NAME_UDCS,
    storage=LOLACONFIG.STORAGE,
    run_config=KubernetesRun(
        labels=LOLACONFIG.KUBERNETES_LABELS,
        image=LOLACONFIG.KUBERNETES_IMAGE,
    ),
) as flow:
    ssh_tunnel = open_ssh_tunnel_with_s3_pkey(
        s3_bucket_name=LOLACONFIG.S3_BUCKET_NAME,
        ssh_tunnel_credentials=LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
        remote_target_host=LOLACONFIG.DW_CREDENTIALS["host"],
        remote_target_port=LOLACONFIG.DW_CREDENTIALS["port"],
    )
    # ↑↑↑ We open an SSH tunnel pointing to DW

    # ↓↓↓ This is where we actually run the data test. The result of the test
    # gets stored in data_test_result.
    data_test_result = run_data_test_on_mysql(
        name=DATA_TEST_NAME,  # ←←← The name we set earlier
        # ↓↓↓ The credentials to the MySQL where the data lives. We pass the
        # ssh tunnel host and port instead of the true MySQL because we want
        # to pass through the tunnel. If it was a direct connection, we would
        # simply use the MySQL true host and port.
        # NOTE(review): fetch_tunnel_host_and_port is called twice, and each
        # call (plus each [index]) adds a task to the DAG — harmless here,
        # but calling it once and indexing the result would keep the graph
        # smaller.
        mysql_credentials={
            "host": fetch_tunnel_host_and_port(ssh_tunnel)[0],
            "port": fetch_tunnel_host_and_port(ssh_tunnel)[1],
            "user": LOLACONFIG.DW_CREDENTIALS["user"],
            "password": LOLACONFIG.DW_CREDENTIALS["password"],
            "db": "sandbox",  # ←←← We always need to pass a default db, but it
            # is recommended to always specify your schemas
            # in the queries regardless.
        },
        query=DATA_TEST_QUERY,  # ←←← The query we set earlier
        expectation_configurations=DATA_TEST_EXPECTATIONS,  # ←←← Idem
        upstream_tasks=[ssh_tunnel],  # ←←← We must wait for the tunnel to be ready
    )
    # ↑↑↑ This will take care of everything: connecting to S3 and DW,
    # generating all the necessary configurations, running the actual test and
    # storing results both in memory and in S3.
    #
    # What to do from here is up to you. You can easily check if the test
    # passed or not by accessing data_test_result["success"]. If it equals
    # True, the test passed. If it equals False, at least one expectation
    # failed.
    #
    # The following snippets are optional. You should judge if you want to do
    # something similar or not in your flow based on your needs.

    ### RAISING AN EXCEPTION
    # When a test with run_data_test_on_mysql fails, it's important that you
    # keep in mind that this will not cause a failure, in the sense of a
    # prefect task failing. This is intentional: we didn't want to assume that
    # a failing data test always translates into a failed flow.
    #
    # Nevertheless, it might be the case that you want your flow to fail if
    # the data test didn't pass. To do so, you can use a simple helper task
    # and a case, just like this:
    with case(data_test_result["success"], False):
        fail(ValueError, "Woops, the test didn't pass.")

    ### SENDING A SLACK WARNING
    # You might also want to send a slack message to a channel if the test
    # does not pass. You can do so like this:
    with case(data_test_result["success"], False):
        send_slack_message(
            LOLACONFIG.SLACK_WEBHOOKS["data-team-alerts-testing"],  # ←←← A webhook URL
            "Uh oh, the demo flow failed.",  # ←←← Your warning message
        )

    ### QUARANTINE THE TESTED DATA
    # Another common practice is to store the data that doesn't pass a test.
    # This provides a lot of benefits that are discussed in our best practices
    # docs in Confluence. Here is an example of how you can quarantine the
    # data that made your test fail:
    dw_connection = connect_to_mysql(
        mysql_credentials=LOLACONFIG.DW_CREDENTIALS,
        overriding_host_and_port=fetch_tunnel_host_and_port(ssh_tunnel),
        upstream_tasks=[data_test_result],
    )
    # ↑↑↑ We connect to DW, and since we are using the SSH tunnel, we
    # override DW's host and port and instead use the listening ones from
    # the tunnel.

    with case(data_test_result["success"], False):
        quarantined = quarantine_failed_data(dw_connection, DATA_TEST_QUERY)
    # ↑↑↑ We call the quarantine_failed_data task. You can review the
    # actions of this task in the definition that appears earlier in this
    # file.

    # NOTE(review): mysql_closed only depends on dw_connection, not on
    # quarantined, so prefect is free to close the connection while the
    # quarantine task is still running. Adding quarantined as an upstream
    # would skip the close when the case is skipped (skips propagate by
    # default), so a proper fix needs an always-run trigger — confirm the
    # desired semantics before copying this pattern.
    mysql_closed = close_mysql_connection(dw_connection)
    tunnel_closed = close_ssh_tunnel(ssh_tunnel, upstream_tasks=[mysql_closed])