Merge pull request #12 from lolamarket/feature/ge_example_project

Feature/ge example project
This commit is contained in:
pablolola 2023-03-27 11:58:19 +02:00 committed by GitHub
commit 885c30184f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 256 additions and 0 deletions

View file

@ -164,6 +164,13 @@ with Flow(...) as flow:
) )
``` ```
## Gallery
This repo also contains a gallery of example flows that you can user to better
understand `lolafect` or as templates to kickstart your own flows. You can
find these in `docs/gallery`.
## How to test ## How to test
There are two test suites: unit tests and integration tests. Integration tests are prepared to plug to some of our There are two test suites: unit tests and integration tests. Integration tests are prepared to plug to some of our

View file

@ -0,0 +1,7 @@
# Data Testing Gallery
In this folder, you can find a sample flow project that showcases how you can
do data testing with Lolafect.
You can also take a look at our GE best practices and guidelines
[here](https://pdofonte.atlassian.net/wiki/spaces/DATA/pages/2484797445/Usage+Guidelines+and+Best+Practices).

View file

@ -0,0 +1,238 @@
### INTRO
# This is an example flow to showcase data testing.
# There are several ways you can use this:
# 1. If you simply want to copy paste useful recipes... the flow is yours.
# 2. If you want to learn, I would advice:
# - Skim through the whole script.
# - Now read the flow block carefully and refer to other parts of the
# script when needed.
# - Try to run the flow as-is. It should succeed.
# - Try to intentionally break the data test by changing the data or the
# expectations.
### HOW TO RUN
# The flow is packed with comments to guide you through what's happening.
# The flow is also runnable. To run it:
# - Make a virtual environment with the requirements.txt that live in the same
# folder as this script.
# - Start a shell, activate the venv, login to AWS and turn on your Mercadão
# VPN.
# - In the shell, run the command: prefect run -p docs/gallery/data_testing/data_testing_flow.py
#
# Note: this flow is designed to run in your laptop. It won't work in the
# prefect server. Don't bother uploading it.
# The flow connects to DW and makes a silly check on a silly query. You can use
# it as a reference on how to set up a data test in your serious flows.
### IMPORTS
import datetime
from prefect import Flow, task, case
from prefect.run_configs import KubernetesRun
# ↑↑↑ Standard prefect stuff for the flow.
from great_expectations.core.expectation_configuration import ExpectationConfiguration
# ↑↑↑ ExpectationConfiguration is the class that allows us to define a single
# expectation. We use it once for every expectation we define.
from lolafect.lolaconfig import build_lolaconfig
# ↑↑↑ Usual lolaconfig import to get all the env data.
from lolafect.connections import (
open_ssh_tunnel_with_s3_pkey, # ←←← We connect through an SSH tunnel
close_ssh_tunnel, # ←←← Which we will have to close
connect_to_mysql, # ←←← For quarantine purposes
close_mysql_connection, # ←←← To close the previous connection
)
from lolafect.slack import SendSlackMessageTask
# ↑↑↑ The task class to send slack messages.
from lolafect.data_testing import (
run_data_test_on_mysql,
) # ←←← The task to run a data test
### PREP
LOLACONFIG = build_lolaconfig(flow_name="lolafect-gallery-data-testing-demo")
# ↑↑↑ Get env from S3 and prepare everything related to it
DATA_TEST_NAME = "gallery-example-test"
# ↑↑↑ Our data test must have a name. We will need this if we want to look for
# its logs in S3.
DATA_TEST_QUERY = """
SELECT "hi" AS some_string,
1 AS some_number,
NULL as some_null
"""
# ↑↑↑ Our query defines what data do we want to test. This is a silly select
# with hardcoded values because this is a demo, but in a real environment, you
# most probably will want to have a common SELECT [...] FROM [...] query that
# fetches the data your want to test.
DATA_TEST_EXPECTATIONS = [
ExpectationConfiguration(
expectation_type="expect_column_values_to_match_like_pattern",
kwargs={"column": "some_string", "like_pattern": "%hi%"},
),
ExpectationConfiguration(
expectation_type="expect_column_values_to_be_between",
kwargs={"column": "some_number", "min_value": 1, "max_value": 1},
),
ExpectationConfiguration(
expectation_type="expect_column_values_to_be_null",
kwargs={"column": "some_null"},
),
]
# ↑↑↑ Our expectations define what data should be like. Each expectation is
# defined with a call to ExpectationConfiguration. You can check a reference
# of available expectations and how to call them here:
# https://legacy.docs.greatexpectations.io/en/latest/reference/glossary_of_expectations.html
@task
def fetch_tunnel_host_and_port(ssh_tunnel):
host = ssh_tunnel.local_bind_address[0]
port = ssh_tunnel.local_bind_address[1]
return host, port
# ↑↑↑ A small helper function to get the host and the port where the SSH
# tunnel is listening inside this host.
@task
def fail(exception, message):
raise exception(message)
# ↑↑↑ A small helper function to cause a task failure with a custom message.
@task
def quarantine_failed_data(connection, query_to_get_quarantine_data):
cursor = connection.cursor()
cursor.execute(
f"""
CREATE TABLE quarantine.{LOLACONFIG.FLOW_NAME_UDCS}_{datetime.datetime.now().strftime("%Y%m%d_%H%M%S")} AS
{query_to_get_quarantine_data}
"""
)
# ↑↑↑ This query will store the faulty data in a quarantine schema in DW
# It creates a new table on each run, and uses the flow name and the current time
# to give the table a unique and informative name.
send_slack_message = SendSlackMessageTask()
# ↑↑↑ Simply making an instance of the task class. send_slack_message will be
# the task we use in the flow.
### FLOW
with Flow(
LOLACONFIG.FLOW_NAME_UDCS,
storage=LOLACONFIG.STORAGE,
run_config=KubernetesRun(
labels=LOLACONFIG.KUBERNETES_LABELS,
image=LOLACONFIG.KUBERNETES_IMAGE,
),
) as flow:
ssh_tunnel = open_ssh_tunnel_with_s3_pkey(
s3_bucket_name=LOLACONFIG.S3_BUCKET_NAME,
ssh_tunnel_credentials=LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
remote_target_host=LOLACONFIG.DW_CREDENTIALS["host"],
remote_target_port=LOLACONFIG.DW_CREDENTIALS["port"],
)
# ↑↑↑ We open an SSH tunnel pointing to DW
# ↓↓↓ This is where we actually run the data test. The result of the test
# gets stored in data_test_result.
data_test_result = run_data_test_on_mysql(
name=DATA_TEST_NAME, # ←←← The name we set earlier
# ↓↓↓ The credentials to the MySQL where the data lives. We pass the
# ssh tunnel host and port instead of the true MySQL because we want
# to pass through the tunnel. If it was a direct connection, we would
# simply use the MySQL true host and port.
mysql_credentials={
"host": fetch_tunnel_host_and_port(ssh_tunnel)[0],
"port": fetch_tunnel_host_and_port(ssh_tunnel)[1],
"user": LOLACONFIG.DW_CREDENTIALS["user"],
"password": LOLACONFIG.DW_CREDENTIALS["password"],
"db": "sandbox", # ←←← We always need to pass a default db, but it
# is recommended to always specify your schemas
}, # in the queries regardless.
query=DATA_TEST_QUERY, # ←←← The query we set earlier
expectation_configurations=DATA_TEST_EXPECTATIONS, # ←←← Idem
upstream_tasks=[ssh_tunnel], # ←←← We must wait for the tunnel to be ready
)
# ↑↑↑ will take care of everything: connecting to S3 and DW, generate all
# the necessary configurations, run the actual test and store results both
# in memory and in S3.
#
# What to do from here is up to you. You can easily check if the test
# passed or not by accessing data_test_result["success"]. If it equals
# True, the test passed. If it equals False, at least one expectation
# failed.
#
# The following snippets are optional. You should judge if you want to do
# something similar or not in your flow based on your needs.
### RAISING AN EXCEPTION
# When a test with run_data_test_on_mysql fails, it's important that you
# keep in mind that this will not cause a failure, in the sense of a
# prefect task failing. This is intentional: we didn't want to assume that
# a failing data test always translates into a failed flow.
#
# Nevertheless, it might be the case that you want your flow to fail if
# the data test didn't pass. To do so, you can use a simple helper task and
# a case, just like this:
with case(data_test_result["success"], False):
fail(ValueError, "Woops, the test didn't pass.")
### SENDING A SLACK WARNING
# You might also want to send a slack message to a channel if the test
# does not pass. You can do so like this:
with case(data_test_result["success"], False):
send_slack_message(
LOLACONFIG.SLACK_WEBHOOKS["data-team-alerts-testing"], # ←←← A webhook URL
"Uh oh, the demo flow failed.", # ←←← Your warning message
)
### QUARANTINE THE TESTED DATA
# Another common practice is to store the data that doesn't pass a test.
# This provides a lot of benefits that are discussed in our best practices
# docs in Confluence. Here is an example of you can quarantine the data
# that made your test fail:
dw_connection = connect_to_mysql(
mysql_credentials=LOLACONFIG.DW_CREDENTIALS,
overriding_host_and_port=fetch_tunnel_host_and_port(ssh_tunnel),
upstream_tasks=[data_test_result],
)
# ↑↑↑ We connect to DW, and since we are using the SSH tunnel, we
# override DWs host and port and instead use the listening ones from
# the tunnel.
with case(data_test_result["success"], False):
quarantined = quarantine_failed_data(dw_connection, DATA_TEST_QUERY)
# ↑↑↑ We call the quarantine_failed_data task. You can review the
# actions of this task in the definition that appears earlier in this
# file.
mysql_closed = close_mysql_connection(dw_connection)
tunnel_closed = close_ssh_tunnel(ssh_tunnel, upstream_tasks=[mysql_closed])

View file

@ -0,0 +1,4 @@
prefect==1.2.2
great_expectations==0.15.45
SQLAlchemy==1.4.46
lolafect==0.4.0