From ea5edc5cda29377130c3cbf64c83813a9ba96883 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Tue, 7 Mar 2023 17:09:46 +0100 Subject: [PATCH] More bits and pieces. --- .../gallery/data_testing/data_testing_flow.py | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/docs/gallery/data_testing/data_testing_flow.py b/docs/gallery/data_testing/data_testing_flow.py index 3e39139..62134a0 100644 --- a/docs/gallery/data_testing/data_testing_flow.py +++ b/docs/gallery/data_testing/data_testing_flow.py @@ -2,6 +2,17 @@ # This is an example flow to showcase data testing. +# There are several ways you can use this: +# 1. If you simply want to copy paste useful recipes... the flow is yours. +# 2. If you want to learn, I would advice: +# - Skim through the whole script. +# - Now read the flow block carefully and refer to other parts of the +# script when needed. +# - Try to run the flow as-is. It should succeed. +# - Try to intentionally break the data test by changing the data or the +# expectations. + +### HOW TO RUN # The flow is packed with comments to guide you through what's happening. # The flow is also runnable. To run it: # - Make a virtual environment with the requirements.txt that live in the same @@ -19,6 +30,8 @@ ### IMPORTS +import datetime + from prefect import Flow, task, case from prefect.run_configs import KubernetesRun @@ -185,11 +198,10 @@ with Flow( # # Nevertheless, it might be the case that you want your flow to fail if # the data test didn't pass. To do so, you can use a simple helper task and - # a case, just like this (uncomment the lines if you want to cause the - # failure): + # a case, just like this: - # with case(data_test_result["success"], False): - # fail(ValueError, "Woops, the test didn't pass.") + with case(data_test_result["success"], False): + fail(ValueError, "Woops, the test didn't pass.") ### SENDING A SLACK WARNING # You might also want to send a slack message to a channel if the test @@ -207,15 +219,16 @@ with Flow( # docs in Confluence. Here is an example of you can quarantine the data # that made your test fail: - with case(data_test_result["success"], False): - dw_connection = connect_to_mysql( - mysql_credentials=LOLACONFIG.DW_CREDENTIALS, - overriding_host_and_port=fetch_tunnel_host_and_port(ssh_tunnel), - ) - # ↑↑↑ We connect to DW, and since we are using the SSH tunnel, we - # override DWs host and port and instead use the listening ones from - # the tunnel. + dw_connection = connect_to_mysql( + mysql_credentials=LOLACONFIG.DW_CREDENTIALS, + overriding_host_and_port=fetch_tunnel_host_and_port(ssh_tunnel), + upstream_tasks=[data_test_result], + ) + # ↑↑↑ We connect to DW, and since we are using the SSH tunnel, we + # override DWs host and port and instead use the listening ones from + # the tunnel. + with case(data_test_result["success"], False): quarantined = quarantine_failed_data(dw_connection, DATA_TEST_QUERY) # ↑↑↑ We call the quarantine_failed_data task. You can review the # actions of this task in the definition that appears earlier in this