Completed examples in flow.
This commit is contained in:
parent
0c3243c9ce
commit
c30e2778e8
1 changed files with 82 additions and 4 deletions
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
### IMPORTS
|
||||
|
||||
from prefect import Flow, task
|
||||
from prefect import Flow, task, case
|
||||
from prefect.run_configs import KubernetesRun
|
||||
|
||||
# ↑↑↑ Standard prefect stuff for the flow.
|
||||
|
|
@ -35,14 +35,19 @@ from lolafect.lolaconfig import build_lolaconfig
|
|||
from lolafect.connections import (
|
||||
open_ssh_tunnel_with_s3_pkey, # ←←← We connect through an SSH tunnel
|
||||
close_ssh_tunnel, # ←←← Which we will have to close
|
||||
connect_to_mysql, # ←←← For quarantine purposes
|
||||
close_mysql_connection, # ←←← To close the previous connection
|
||||
)
|
||||
from lolafect.slack import SendSlackMessageTask
|
||||
|
||||
# ↑↑↑ The task class to send slack messages.
|
||||
from lolafect.data_testing import (
|
||||
run_data_test_on_mysql,
|
||||
) # ←←← The task to run a data test
|
||||
|
||||
### PREP
|
||||
|
||||
LOLACONFIG = build_lolaconfig(flow_name="018-pl-general-validations")
|
||||
LOLACONFIG = build_lolaconfig(flow_name="lolafect-gallery-data-testing-demo")
|
||||
# ↑↑↑ Get env from S3 and prepare everything related to it
|
||||
|
||||
|
||||
|
|
@ -89,6 +94,37 @@ def fetch_tunnel_host_and_port(ssh_tunnel):
|
|||
return host, port
|
||||
|
||||
|
||||
# ↑↑↑ A small helper function to get the host and the port where the SSH
|
||||
# tunnel is listening inside this host.
|
||||
|
||||
|
||||
@task
|
||||
def fail(exception, message):
|
||||
raise exception(message)
|
||||
|
||||
|
||||
# ↑↑↑ A small helper function to cause a task failure with a custom message.
|
||||
|
||||
|
||||
@task
|
||||
def quarantine_failed_data(connection, query_to_get_quarantine_data):
|
||||
cursor = connection.cursor()
|
||||
|
||||
cursor.execute(
|
||||
f"""
|
||||
CREATE TABLE quarantine.{LOLACONFIG.FLOW_NAME_UDCS}_{datetime.datetime.now().strftime("%Y%m%d_%H%M%S")} AS
|
||||
{query_to_get_quarantine_data}
|
||||
"""
|
||||
)
|
||||
# ↑↑↑ This query will store the faulty data in a quarantine schema in DW
|
||||
# It creates a new table on each run, and uses the flow name and the current time
|
||||
# to give the table a unique and informative name.
|
||||
|
||||
|
||||
send_slack_message = SendSlackMessageTask()
|
||||
# ↑↑↑ Simply making an instance of the task class. send_slack_message will be
|
||||
# the task we use in the flow.
|
||||
|
||||
### FLOW
|
||||
|
||||
with Flow(
|
||||
|
|
@ -141,7 +177,49 @@ with Flow(
|
|||
# The following snippets are optional. You should judge if you want to do
|
||||
# something similar or not in your flow based on your needs.
|
||||
|
||||
tunnel_closed = close_ssh_tunnel(ssh_tunnel, upstream_tasks=[data_test_result])
|
||||
### RAISING AN EXCEPTION
|
||||
# When a test with run_data_test_on_mysql fails, it's important that you
|
||||
# keep in mind that this will not cause a failure, in the sense of a
|
||||
# prefect task failing. This is intentional: we didn't want to assume that
|
||||
# a failing data test always translates into a failed flow.
|
||||
#
|
||||
# Nevertheless, it might be the case that you want your flow to fail if
|
||||
# the data test didn't pass. To do so, you can use a simple helper task and
|
||||
# a case, just like this (uncomment the lines if you want to cause the
|
||||
# failure):
|
||||
|
||||
# with case(data_test_result["success"], False):
|
||||
# fail(ValueError, "Woops, the test didn't pass.")
|
||||
|
||||
# TODO
|
||||
### SENDING A SLACK WARNING
|
||||
# You might also want to send a slack message to a channel if the test
|
||||
# does not pass. You can do so like this:
|
||||
|
||||
with case(data_test_result["success"], False):
|
||||
send_slack_message(
|
||||
LOLACONFIG.SLACK_WEBHOOKS["data-team-alerts-testing"], # ←←← A webhook URL
|
||||
"Uh oh, the demo flow failed.", # ←←← Your warning message
|
||||
)
|
||||
|
||||
### QUARANTINE THE TESTED DATA
|
||||
# Another common practice is to store the data that doesn't pass a test.
|
||||
# This provides a lot of benefits that are discussed in our best practices
|
||||
# docs in Confluence. Here is an example of you can quarantine the data
|
||||
# that made your test fail:
|
||||
|
||||
with case(data_test_result["success"], False):
|
||||
dw_connection = connect_to_mysql(
|
||||
mysql_credentials=LOLACONFIG.DW_CREDENTIALS,
|
||||
overriding_host_and_port=fetch_tunnel_host_and_port(ssh_tunnel),
|
||||
)
|
||||
# ↑↑↑ We connect to DW, and since we are using the SSH tunnel, we
|
||||
# override DWs host and port and instead use the listening ones from
|
||||
# the tunnel.
|
||||
|
||||
quarantined = quarantine_failed_data(dw_connection, DATA_TEST_QUERY)
|
||||
# ↑↑↑ We call the quarantine_failed_data task. You can review the
|
||||
# actions of this task in the definition that appears earlier in this
|
||||
# file.
|
||||
|
||||
mysql_closed = close_mysql_connection(dw_connection)
|
||||
tunnel_closed = close_ssh_tunnel(ssh_tunnel, upstream_tasks=[mysql_closed])
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue