Quite a bit of the flow
This commit is contained in:
parent
36fffd3790
commit
19b1e447ff
2 changed files with 126 additions and 5 deletions
|
|
@ -1,4 +1,7 @@
|
||||||
# Data Testing Gallery
|
# Data Testing Gallery
|
||||||
|
|
||||||
In this folder, you can find a sample flow project that showcases how you can
|
In this folder, you can find a sample flow project that showcases how you can
|
||||||
do data testing with Lolafect.
|
do data testing with Lolafect.
|
||||||
|
|
||||||
|
You can also take a look at our GE best practices and guidelines
|
||||||
|
[here](https://pdofonte.atlassian.net/wiki/spaces/DATA/pages/2484797445/Usage+Guidelines+and+Best+Practices).
|
||||||
|
|
@ -19,12 +19,130 @@
|
||||||
|
|
||||||
### IMPORTS
|
### IMPORTS
|
||||||
|
|
||||||
# TODO
|
from prefect import Flow, task
|
||||||
|
from prefect.run_configs import KubernetesRun
|
||||||
|
|
||||||
### TASK PREP
|
# ↑↑↑ Standard prefect stuff for the flow.
|
||||||
|
|
||||||
|
from great_expectations.core.expectation_configuration import ExpectationConfiguration
|
||||||
|
|
||||||
|
# ↑↑↑ ExpectationConfiguration is the class that allows us to define a single
|
||||||
|
# expectation. We use it once for every expectation we define.
|
||||||
|
|
||||||
|
from lolafect.lolaconfig import build_lolaconfig
|
||||||
|
|
||||||
|
# ↑↑↑ Usual lolaconfig import to get all the env data.
|
||||||
|
from lolafect.connections import (
|
||||||
|
open_ssh_tunnel_with_s3_pkey, # ←←← We connect through an SSH tunnel
|
||||||
|
close_ssh_tunnel, # ←←← Which we will have to close
|
||||||
|
)
|
||||||
|
from lolafect.data_testing import (
|
||||||
|
run_data_test_on_mysql,
|
||||||
|
) # ←←← The task to run a data test
|
||||||
|
|
||||||
|
### PREP
|
||||||
|
|
||||||
|
LOLACONFIG = build_lolaconfig(flow_name="018-pl-general-validations")
|
||||||
|
# ↑↑↑ Get env from S3 and prepare everything related to it
|
||||||
|
|
||||||
|
|
||||||
|
DATA_TEST_NAME = "gallery-example-test"
|
||||||
|
# ↑↑↑ Our data test must have a name. We will need this if we want to look for
|
||||||
|
# its logs in S3.
|
||||||
|
|
||||||
|
DATA_TEST_QUERY = """
|
||||||
|
SELECT "hi" AS some_string,
|
||||||
|
1 AS some_number,
|
||||||
|
NULL as some_null
|
||||||
|
"""
|
||||||
|
# ↑↑↑ Our query defines what data do we want to test. This is a silly select
|
||||||
|
# with hardcoded values because this is a demo, but in a real environment, you
|
||||||
|
# most probably will want to have a common SELECT [...] FROM [...] query that
|
||||||
|
# fetches the data your want to test.
|
||||||
|
|
||||||
|
DATA_TEST_EXPECTATIONS = [
|
||||||
|
ExpectationConfiguration(
|
||||||
|
expectation_type="expect_column_values_to_match_like_pattern",
|
||||||
|
kwargs={"column": "some_string", "like_pattern": "%hi%"},
|
||||||
|
),
|
||||||
|
ExpectationConfiguration(
|
||||||
|
expectation_type="expect_column_values_to_be_between",
|
||||||
|
kwargs={"column": "some_number", "min_value": 1, "max_value": 1},
|
||||||
|
),
|
||||||
|
ExpectationConfiguration(
|
||||||
|
expectation_type="expect_column_values_to_be_null",
|
||||||
|
kwargs={"column": "some_null"},
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
# ↑↑↑ Our expectations define what data should be like. Each expectation is
|
||||||
|
# defined with a call to ExpectationConfiguration. You can check a reference
|
||||||
|
# of available expectations and how to call them here:
|
||||||
|
# https://legacy.docs.greatexpectations.io/en/latest/reference/glossary_of_expectations.html
|
||||||
|
|
||||||
|
|
||||||
|
@task
|
||||||
|
def fetch_tunnel_host_and_port(ssh_tunnel):
|
||||||
|
host = ssh_tunnel.local_bind_address[0]
|
||||||
|
port = ssh_tunnel.local_bind_address[1]
|
||||||
|
|
||||||
|
return host, port
|
||||||
|
|
||||||
# TODO
|
|
||||||
|
|
||||||
### FLOW
|
### FLOW
|
||||||
|
|
||||||
# TODO
|
with Flow(
|
||||||
|
LOLACONFIG.FLOW_NAME_UDCS,
|
||||||
|
storage=LOLACONFIG.STORAGE,
|
||||||
|
run_config=KubernetesRun(
|
||||||
|
labels=LOLACONFIG.KUBERNETES_LABELS,
|
||||||
|
image=LOLACONFIG.KUBERNETES_IMAGE,
|
||||||
|
),
|
||||||
|
) as flow:
|
||||||
|
|
||||||
|
ssh_tunnel = open_ssh_tunnel_with_s3_pkey(
|
||||||
|
s3_bucket_name=LOLACONFIG.S3_BUCKET_NAME,
|
||||||
|
ssh_tunnel_credentials=LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
|
||||||
|
remote_target_host=LOLACONFIG.DW_CREDENTIALS["host"],
|
||||||
|
remote_target_port=LOLACONFIG.DW_CREDENTIALS["port"],
|
||||||
|
)
|
||||||
|
# ↑↑↑ We open an SSH tunnel pointing to DW
|
||||||
|
|
||||||
|
# ↓↓↓ This is where we actually run the data test. The result of the test
|
||||||
|
# gets stored in data_test_result.
|
||||||
|
data_test_result = run_data_test_on_mysql.run(
|
||||||
|
name=DATA_TEST_NAME, # ←←← The name we set earlier
|
||||||
|
# ↓↓↓ The credentials to the MySQL where the data lives. We pass the
|
||||||
|
# ssh tunnel host and port instead of the true MySQL because we want
|
||||||
|
# to pass through the tunnel. If it was a direct connection, we would
|
||||||
|
# simply use the MySQL true host and port.
|
||||||
|
mysql_credentials={
|
||||||
|
"host": fetch_tunnel_host_and_port(ssh_tunnel)[0],
|
||||||
|
"port": fetch_tunnel_host_and_port(ssh_tunnel)[1],
|
||||||
|
"user": LOLACONFIG.DW_CREDENTIALS["user"],
|
||||||
|
"password": LOLACONFIG.DW_CREDENTIALS["password"],
|
||||||
|
"db": "sandbox", # ←←← We always need to pass a default db, but it
|
||||||
|
# is recommended to always specify your schemas
|
||||||
|
}, # in the queries regardless.
|
||||||
|
query=DATA_TEST_QUERY, # ←←← The query we set earlier
|
||||||
|
expectation_configurations=DATA_TEST_EXPECTATIONS, # ←←← Idem
|
||||||
|
upstream_tasks=[ssh_tunnel] # ←←← We must wait for the tunnel to be ready
|
||||||
|
)
|
||||||
|
|
||||||
|
# ↑↑↑ will take care of everything: connecting to S3 and DW, generate all
|
||||||
|
# the necessary configurations, run the actual test and store results both
|
||||||
|
# in memory and in S3.
|
||||||
|
#
|
||||||
|
# What to do from here is up to you. You can easily check if the test
|
||||||
|
# passed or not by accessing data_test_result["success"]. If it equals
|
||||||
|
# True, the test passed. If it equals False, at least one expectation
|
||||||
|
# failed.
|
||||||
|
#
|
||||||
|
# The following snippets are optional. You should judge if you want to do
|
||||||
|
# something similar or not in your flow based on your needs.
|
||||||
|
|
||||||
|
tunnel_closed = close_ssh_tunnel(ssh_tunnel)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# TODO
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue