More bits and pieces.

This commit is contained in:
Pablo Martin 2023-03-07 17:09:46 +01:00
parent c30e2778e8
commit ea5edc5cda

View file

@ -2,6 +2,17 @@
# This is an example flow to showcase data testing.
# There are several ways you can use this:
# 1. If you simply want to copy paste useful recipes... the flow is yours.
# 2. If you want to learn, I would advice:
# - Skim through the whole script.
# - Now read the flow block carefully and refer to other parts of the
# script when needed.
# - Try to run the flow as-is. It should succeed.
# - Try to intentionally break the data test by changing the data or the
# expectations.
### HOW TO RUN
# The flow is packed with comments to guide you through what's happening.
# The flow is also runnable. To run it:
# - Make a virtual environment with the requirements.txt that live in the same
@ -19,6 +30,8 @@
### IMPORTS
import datetime
from prefect import Flow, task, case
from prefect.run_configs import KubernetesRun
@ -185,11 +198,10 @@ with Flow(
#
# Nevertheless, it might be the case that you want your flow to fail if
# the data test didn't pass. To do so, you can use a simple helper task and
# a case, just like this (uncomment the lines if you want to cause the
# failure):
# a case, just like this:
# with case(data_test_result["success"], False):
# fail(ValueError, "Woops, the test didn't pass.")
with case(data_test_result["success"], False):
fail(ValueError, "Woops, the test didn't pass.")
### SENDING A SLACK WARNING
# You might also want to send a slack message to a channel if the test
@ -207,15 +219,16 @@ with Flow(
# docs in Confluence. Here is an example of you can quarantine the data
# that made your test fail:
with case(data_test_result["success"], False):
dw_connection = connect_to_mysql(
mysql_credentials=LOLACONFIG.DW_CREDENTIALS,
overriding_host_and_port=fetch_tunnel_host_and_port(ssh_tunnel),
)
# ↑↑↑ We connect to DW, and since we are using the SSH tunnel, we
# override DWs host and port and instead use the listening ones from
# the tunnel.
dw_connection = connect_to_mysql(
mysql_credentials=LOLACONFIG.DW_CREDENTIALS,
overriding_host_and_port=fetch_tunnel_host_and_port(ssh_tunnel),
upstream_tasks=[data_test_result],
)
# ↑↑↑ We connect to DW, and since we are using the SSH tunnel, we
# override DWs host and port and instead use the listening ones from
# the tunnel.
with case(data_test_result["success"], False):
quarantined = quarantine_failed_data(dw_connection, DATA_TEST_QUERY)
# ↑↑↑ We call the quarantine_failed_data task. You can review the
# actions of this task in the definition that appears earlier in this