From 19b1e447ff81384aae34a94a5417b6d91eaeaab0 Mon Sep 17 00:00:00 2001
From: Pablo Martin
Date: Mon, 6 Mar 2023 17:08:54 +0100
Subject: [PATCH] Quite a bit of the flow

---
 docs/gallery/data_testing/README.md           |   5 +-
 .../gallery/data_testing/data_testing_flow.py | 126 +++++++++++++++++-
 2 files changed, 126 insertions(+), 5 deletions(-)

diff --git a/docs/gallery/data_testing/README.md b/docs/gallery/data_testing/README.md
index 2926488..a1de4b0 100644
--- a/docs/gallery/data_testing/README.md
+++ b/docs/gallery/data_testing/README.md
@@ -1,4 +1,7 @@
 # Data Testing Gallery

 In this folder, you can find a sample flow project that showcases how you can
-do data testing with Lolafect.
\ No newline at end of file
+do data testing with Lolafect.
+
+You can also take a look at our GE best practices and guidelines
+[here](https://pdofonte.atlassian.net/wiki/spaces/DATA/pages/2484797445/Usage+Guidelines+and+Best+Practices).
\ No newline at end of file
diff --git a/docs/gallery/data_testing/data_testing_flow.py b/docs/gallery/data_testing/data_testing_flow.py
index 7e5dd7f..38b771e 100644
--- a/docs/gallery/data_testing/data_testing_flow.py
+++ b/docs/gallery/data_testing/data_testing_flow.py
@@ -19,12 +19,130 @@

### IMPORTS

from prefect import Flow, task
from prefect.run_configs import KubernetesRun

# ↑↑↑ Standard Prefect imports for the flow.

from great_expectations.core.expectation_configuration import ExpectationConfiguration

# ↑↑↑ ExpectationConfiguration is the class that allows us to define a single
# expectation. We use it once for every expectation we define.

from lolafect.lolaconfig import build_lolaconfig

# ↑↑↑ Usual lolaconfig import to get all the env data.
from lolafect.connections import (
    open_ssh_tunnel_with_s3_pkey,  # ←←← We connect through an SSH tunnel
    close_ssh_tunnel,  # ←←← Which we will have to close later
)
from lolafect.data_testing import (
    run_data_test_on_mysql,
)  # ←←← The task to run a data test

### PREP

LOLACONFIG = build_lolaconfig(flow_name="018-pl-general-validations")
# ↑↑↑ Get the env from S3 and prepare everything related to it


DATA_TEST_NAME = "gallery-example-test"
# ↑↑↑ Our data test must have a name. We will need it if we want to look for
# its logs in S3.

DATA_TEST_QUERY = """
    SELECT "hi" AS some_string,
    1 AS some_number,
    NULL AS some_null
"""
# ↑↑↑ Our query defines what data we want to test. This is a silly SELECT
# with hardcoded values because this is a demo, but in a real environment you
# will most probably want a SELECT [...] FROM [...] query that fetches the
# data you want to test.

DATA_TEST_EXPECTATIONS = [
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_match_like_pattern",
        kwargs={"column": "some_string", "like_pattern": "%hi%"},
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={"column": "some_number", "min_value": 1, "max_value": 1},
    ),
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_null",
        kwargs={"column": "some_null"},
    ),
]

# ↑↑↑ Our expectations define what the data should look like. Each expectation
# is defined with a call to ExpectationConfiguration. You can check a reference
# of available expectations and how to call them here:
# https://legacy.docs.greatexpectations.io/en/latest/reference/glossary_of_expectations.html
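# ↓↓↓ Illustrative sketch, not part of the original flow: most column map
# expectations also accept a "mostly" kwarg, which lets a fraction of rows
# fail while the expectation as a whole still passes. Something like the
# configuration below could be appended to DATA_TEST_EXPECTATIONS if, say,
# only 95% of the values need to be non-null. The variable name is only an
# example.
EXAMPLE_TOLERANT_EXPECTATION = ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={"column": "some_string", "mostly": 0.95},  # ←←← passes if >=95% of rows are non-null
)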


@task
def fetch_tunnel_host_and_port(ssh_tunnel):
    # ↓↓↓ Small helper task that extracts the local bind address of the SSH
    # tunnel so we can hand it over to the MySQL connection.
    host = ssh_tunnel.local_bind_address[0]
    port = ssh_tunnel.local_bind_address[1]

    return host, port


### FLOW

with Flow(
    LOLACONFIG.FLOW_NAME_UDCS,
    storage=LOLACONFIG.STORAGE,
    run_config=KubernetesRun(
        labels=LOLACONFIG.KUBERNETES_LABELS,
        image=LOLACONFIG.KUBERNETES_IMAGE,
    ),
) as flow:

    ssh_tunnel = open_ssh_tunnel_with_s3_pkey(
        s3_bucket_name=LOLACONFIG.S3_BUCKET_NAME,
        ssh_tunnel_credentials=LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
        remote_target_host=LOLACONFIG.DW_CREDENTIALS["host"],
        remote_target_port=LOLACONFIG.DW_CREDENTIALS["port"],
    )
    # ↑↑↑ We open an SSH tunnel pointing to the DW

    # ↓↓↓ This is where we actually run the data test. The result of the test
    # gets stored in data_test_result.
    data_test_result = run_data_test_on_mysql(
        name=DATA_TEST_NAME,  # ←←← The name we set earlier
        # ↓↓↓ The credentials of the MySQL where the data lives. We pass the
        # SSH tunnel host and port instead of the real MySQL ones because we
        # want to go through the tunnel. If it were a direct connection, we
        # would simply use the real MySQL host and port.
        mysql_credentials={
            "host": fetch_tunnel_host_and_port(ssh_tunnel)[0],
            "port": fetch_tunnel_host_and_port(ssh_tunnel)[1],
            "user": LOLACONFIG.DW_CREDENTIALS["user"],
            "password": LOLACONFIG.DW_CREDENTIALS["password"],
            "db": "sandbox",  # ←←← We always need to pass a default db, but it
            # is still recommended to specify your schemas in the queries
            # regardless.
        },
        query=DATA_TEST_QUERY,  # ←←← The query we set earlier
        expectation_configurations=DATA_TEST_EXPECTATIONS,  # ←←← The expectations we set earlier
        upstream_tasks=[ssh_tunnel],  # ←←← We must wait for the tunnel to be ready
    )
    # ↑↑↑ This task takes care of everything: it connects to S3 and the DW,
    # generates all the necessary configurations, runs the actual test and
    # stores the results both in memory and in S3.
    #
    # What to do from here is up to you. You can easily check whether the test
    # passed by accessing data_test_result["success"]. If it equals True, the
    # test passed. If it equals False, at least one expectation failed.
    #
    # The following snippets are optional. You should judge whether you want to
    # do something similar in your flow based on your needs.

    tunnel_closed = close_ssh_tunnel(
        ssh_tunnel,
        upstream_tasks=[data_test_result],  # ←←← Only close the tunnel once the
        # data test has finished using it.
    )



# TODO
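### OPTIONAL: ACTING ON THE RESULT

# ↓↓↓ Illustrative sketch, not part of the original flow: one way to act on
# data_test_result["success"] is a small follow-up task that raises Prefect's
# FAIL signal whenever any expectation failed, so the whole flow run is marked
# as failed. The task name below is an assumption for this example; it is not
# part of lolafect.

from prefect.engine.signals import FAIL


@task
def check_data_test_passed(data_test_result):
    # data_test_result["success"] is True only if every expectation passed.
    if not data_test_result["success"]:
        raise FAIL(f"Data test '{DATA_TEST_NAME}' failed, check its logs in S3.")

# To use it, call `check_data_test_passed(data_test_result)` inside the flow
# block, right after the run_data_test_on_mysql call.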