@task
def fail(exception, message):
    """Make this task run fail by raising an exception with a custom message.

    Args:
        exception: Callable (typically an exception class such as
            ``ValueError``) that is called with ``message`` to build the
            error to raise.
        message: Text handed to ``exception`` when building the error.

    Raises:
        Whatever ``exception(message)`` produces; this task never returns.
    """
    error = exception(message)
    raise error
@task
def quarantine_failed_data(connection, query_to_get_quarantine_data):
    """Store the rows returned by a query in a brand new quarantine table.

    A table named ``quarantine.<flow name>_<YYYYmmdd_HHMMSS>`` is created
    from the result of ``query_to_get_quarantine_data`` with a single
    ``CREATE TABLE ... AS`` statement. Each run creates a new table; the flow
    name and the current time make the name informative and, at one-second
    resolution, unique.

    Args:
        connection: Open database connection exposing ``cursor()``.
        query_to_get_quarantine_data: SQL ``SELECT`` text whose result set is
            the data to quarantine. It is interpolated verbatim into the
            statement, so it must come from trusted code, never from
            external input.

    Raises:
        Any error raised by the database driver while creating the table.
    """
    # Imported locally so the task does not rely on a module-level
    # ``import datetime`` that this view of the file does not show.
    import datetime

    # The timestamp has second resolution: two runs of the same flow inside
    # the same second would target the same table name.
    run_timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    # NOTE(review): FLOW_NAME_UDCS is presumably the flow name in a form that
    # is valid inside a SQL identifier; confirm against lolaconfig.
    quarantine_table = f"quarantine.{LOLACONFIG.FLOW_NAME_UDCS}_{run_timestamp}"

    cursor = connection.cursor()
    try:
        # Identifiers cannot be sent as bound parameters, hence the f-string.
        cursor.execute(
            f"""
            CREATE TABLE {quarantine_table} AS
            {query_to_get_quarantine_data}
            """
        )
    finally:
        # Always release the cursor, even when the statement fails.
        cursor.close()
You can do so like this: + + with case(data_test_result["success"], False): + send_slack_message( + LOLACONFIG.SLACK_WEBHOOKS["data-team-alerts-testing"], # ←←← A webhook URL + "Uh oh, the demo flow failed.", # ←←← Your warning message + ) + + ### QUARANTINE THE TESTED DATA + # Another common practice is to store the data that doesn't pass a test. + # This provides a lot of benefits that are discussed in our best practices + # docs in Confluence. Here is an example of how you can quarantine the data + # that made your test fail: + + with case(data_test_result["success"], False): + dw_connection = connect_to_mysql( + mysql_credentials=LOLACONFIG.DW_CREDENTIALS, + overriding_host_and_port=fetch_tunnel_host_and_port(ssh_tunnel), + ) + # ↑↑↑ We connect to DW, and since we are using the SSH tunnel, we + # override DW's host and port and instead use the listening ones from + # the tunnel. + + quarantined = quarantine_failed_data(dw_connection, DATA_TEST_QUERY) + # ↑↑↑ We call the quarantine_failed_data task. You can review the + # actions of this task in the definition that appears earlier in this + # file. + + mysql_closed = close_mysql_connection(dw_connection) + tunnel_closed = close_ssh_tunnel(ssh_tunnel, upstream_tasks=[mysql_closed])