# Lolafect

Lolafect is a collection of Python bits that help us build our Prefect flows.

## Quickstart

You can find below examples of how to leverage `lolafect` in your flows.

**_Note: the code excerpts below are simplified for brevity and won't run
as-is. If you want to see perfect examples, you might want to check the tests
in this repository._**

### Config

**Let the `LolaConfig` object do the boilerplate env stuff for you**

```python
from lolafect.lolaconfig import build_lolaconfig

lolaconfig = build_lolaconfig(flow_name="some-flow")

# Now you can access all the env stuff from here
lolaconfig.FLOW_NAME
lolaconfig.FLOW_NAME_UDCS
lolaconfig.STORAGE
lolaconfig.KUBERNETES_IMAGE
lolaconfig.KUBERNETES_LABELS
lolaconfig.SLACK_WEBHOOKS
lolaconfig.DW_CREDENTIALS
lolaconfig.TRINO_CREDENTIALS
lolaconfig.SSH_TUNNEL_CREDENTIALS
lolaconfig.PREFECT_HOST
# etc

# Your flow is different from the typical one?
# You can customize the behaviour of LolaConfig
lolaconfig = build_lolaconfig(
    flow_name="some-flow",
    env_s3_bucket="my-odd-bucket",
    kubernetes_labels=["some-unusual-label"],
    kubernetes_image="the-image:not-the-production-one",
)
```
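
To make the "sensible defaults, explicit overrides" idea concrete, here is a minimal, self-contained sketch of the pattern. It is not lolafect's implementation: `SketchConfig`, `build_sketch_config` and the default values are hypothetical names invented for illustration. Only the keyword names `flow_name`, `kubernetes_image` and `kubernetes_labels` mirror the example above.

```python
from dataclasses import dataclass
from typing import List, Optional

# Hypothetical defaults, invented for this sketch only.
DEFAULT_KUBERNETES_IMAGE = "default-image:latest"
DEFAULT_KUBERNETES_LABELS = ["default-label"]


@dataclass(frozen=True)
class SketchConfig:
    """Illustrative stand-in for a config object; not the real LolaConfig."""

    FLOW_NAME: str
    KUBERNETES_IMAGE: str
    KUBERNETES_LABELS: List[str]


def build_sketch_config(
    flow_name: str,
    kubernetes_image: Optional[str] = None,
    kubernetes_labels: Optional[List[str]] = None,
) -> SketchConfig:
    """Use the defaults unless the caller explicitly overrides a value."""
    if kubernetes_image is None:
        kubernetes_image = DEFAULT_KUBERNETES_IMAGE
    if kubernetes_labels is None:
        kubernetes_labels = list(DEFAULT_KUBERNETES_LABELS)
    return SketchConfig(
        FLOW_NAME=flow_name,
        KUBERNETES_IMAGE=kubernetes_image,
        KUBERNETES_LABELS=kubernetes_labels,
    )


# A typical flow only needs its name...
typical = build_sketch_config(flow_name="some-flow")
# ...while an unusual one overrides just what differs.
unusual = build_sketch_config(
    flow_name="some-flow",
    kubernetes_image="the-image:not-the-production-one",
)
```

The point of the pattern is that most flows pass only their name, and the unusual ones state exactly which piece deviates from the norm.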

### Connections

**Connect to a Trino server**

```python
from prefect import Flow

from lolafect.connections import connect_to_trino, close_trino_connection

with Flow(...) as flow:
    # Inside a Flow context, call the tasks directly (not `.run`) so that
    # Prefect registers them and honours `upstream_tasks`.
    connection = connect_to_trino(
        # You can probably fetch this from lolaconfig.TRINO_CREDENTIALS
        trino_credentials=my_trino_credentials
    )
    task_result = some_trino_related_task(trino_connection=connection)
    # Close the connection only after the task that uses it has finished
    close_trino_connection(
        trino_connection=connection,
        upstream_tasks=[task_result],
    )
```
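
The flow above closes the connection only once the task that uses it is done. Outside of a flow (say, in a quick script), the same "open, use, always close" guarantee is usually expressed with a context manager. The sketch below uses only the standard library; `FakeConnection` and `managed_connection` are hypothetical names for illustration and have nothing to do with the Trino client or with lolafect.

```python
from contextlib import contextmanager


class FakeConnection:
    """Hypothetical stand-in for a database connection (not a Trino client)."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


@contextmanager
def managed_connection(connect):
    """Open a connection with `connect` and close it when the block exits."""
    connection = connect()
    try:
        yield connection
    finally:
        # Runs whether the block finished normally or raised
        connection.close()


with managed_connection(FakeConnection) as conn:
    still_open_inside = not conn.closed  # work with the connection here
```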

**Open an SSH tunnel**

```python
from prefect import Flow

from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel

with Flow(...) as flow:
    # You probably want to fetch these args from lolaconfig.SSH_TUNNEL_CREDENTIALS and lolaconfig.DW_CREDENTIALS
    tunnel = open_ssh_tunnel_with_s3_pkey(
        s3_bucket_name="some-bucket",
        ssh_tunnel_credentials={...},
        remote_target_host="some-host-probably-mysql",
        remote_target_port=12345,
    )
    # Tunnel is now alive. tunnel.is_active == True
    close_ssh_tunnel(tunnel=tunnel)
```

**Connect to a MySQL instance**

```python
from prefect import Flow

from lolafect.connections import connect_to_mysql, close_mysql_connection

with Flow(...) as flow:
    connection = connect_to_mysql.run(
        mysql_credentials={...},  # You probably want to get this from lolaconfig.DW_CREDENTIALS
    )
    connection.cursor().execute("SELECT 1")
    close_mysql_connection.run(connection=connection)

# Want to connect through an SSH tunnel? Open the tunnel normally and then
# override the host and port when connecting to MySQL.

from lolafect.connections import (
    open_ssh_tunnel_with_s3_pkey,
    get_local_bind_address_from_ssh_tunnel,
    close_ssh_tunnel,
)

with Flow(...) as flow:
    # You probably want to fetch these args from lolaconfig.SSH_TUNNEL_CREDENTIALS and lolaconfig.DW_CREDENTIALS
    tunnel = open_ssh_tunnel_with_s3_pkey(
        s3_bucket_name="some-bucket",
        ssh_tunnel_credentials={...},
        remote_target_host="the-mysql-host",
        remote_target_port=3306,
    )

    connection = connect_to_mysql.run(
        mysql_credentials={...},  # You probably want to get this from lolaconfig.DW_CREDENTIALS
        overriding_host_and_port=get_local_bind_address_from_ssh_tunnel.run(
            tunnel=tunnel  # This will open the connection through the SSH tunnel instead of straight to MySQL
        ),
    )

    connection.cursor().execute("SELECT 1")

    close_mysql_connection.run(connection=connection)
    close_ssh_tunnel.run(tunnel=tunnel)
```
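
Why override the host and port? An SSH tunnel exposes a local address that forwards traffic to the remote target, so the client has to connect to that local address rather than to the server named in the credentials. The sketch below illustrates that decision in isolation. It is an assumption-laden illustration, not lolafect's code: `resolve_host_and_port` is a hypothetical helper, and the `"host"` and `"port"` keys of the credentials dict are guesses.

```python
from typing import Dict, Optional, Tuple


def resolve_host_and_port(
    credentials: Dict[str, object],
    overriding_host_and_port: Optional[Tuple[str, int]] = None,
) -> Tuple[str, int]:
    """Return the (host, port) pair a client should actually connect to."""
    if overriding_host_and_port is not None:
        # Connect to the local end of the tunnel instead of the real server
        return overriding_host_and_port
    return str(credentials["host"]), int(credentials["port"])


# Hypothetical credentials; the "host" and "port" keys are assumptions.
credentials = {"host": "the-mysql-host", "port": 3306}

direct = resolve_host_and_port(credentials)
tunnelled = resolve_host_and_port(credentials, ("127.0.0.1", 54321))
```

Here `direct` points at the MySQL server itself, while `tunnelled` points at the local end of the tunnel (the address and port are made up for the example).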

### Use Great Expectations

**Run a Great Expectations validation on a MySQL query**

```python
from prefect import Flow

from lolafect.data_testing import run_data_test_on_mysql

with Flow(...) as flow:

    my_query = """SELECT something FROM somewhere"""
    my_expectations = {...}  # A bunch of things you want to validate on the result of the query

    validation_results = run_data_test_on_mysql(
        name="my-cool-validation",
        mysql_credentials={...},
        query=my_query,
        expectations=my_expectations,
    )
```

### Slack

**Send a warning message to Slack if your tasks fail**

```python
from prefect import Flow
from prefect.triggers import any_failed

from lolafect.slack import SendSlackMessageTask

# You can generate other tasks with different triggers. For example, you can
# send a message when all tasks fail, or when all tasks succeed.
send_warning_message_on_any_failure = SendSlackMessageTask(trigger=any_failed)

with Flow(...) as flow:
    crucial_task_result = some_crucial_task()

    send_warning_message_on_any_failure(
        webhook_url="the-channel-webhook",  # You can probably try to fetch this from lolaconfig.SLACK_WEBHOOKS
        text_to_send="Watch out, the flow failed!",
        upstream_tasks=[crucial_task_result],
    )
```
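
For context, a Slack incoming webhook is just an HTTP endpoint that accepts a POST with a JSON body containing a `text` field. The sketch below builds such a request with the standard library only, without sending it. It is not necessarily what `SendSlackMessageTask` does internally; `build_slack_request` is a hypothetical helper and the URL is a placeholder.

```python
import json
import urllib.request


def build_slack_request(webhook_url: str, text_to_send: str) -> urllib.request.Request:
    """Build (but do not send) the POST request for a Slack incoming webhook."""
    payload = json.dumps({"text": text_to_send}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder URL: a real webhook URL comes from your Slack workspace.
request = build_slack_request(
    "https://example.com/placeholder-webhook",
    "Watch out, the flow failed!",
)
# To actually send it: urllib.request.urlopen(request)
```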

## How to test

There are two test suites: unit tests and integration tests. The integration tests connect to some of our
AWS resources, so they are not fully reliable: they require specific credentials and permissions. The
recommended policy is:

- Use the unit tests in any CI process you want.
- Use the unit tests frequently as you code.
- Do not use the integration tests in CI processes.
- Use the integration tests as milestone checks when finishing feature branches.
- Make sure the integration tests pass before making a new release.

When building new tests, please keep this philosophy in mind.
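
One way to keep credential-dependent tests from failing noisily is to skip them when no credentials are present. The sketch below is a suggestion, not something this repository is known to do. It assumes credentials are provided through the standard `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables (it will not notice credentials stored in a profile file), and `aws_credentials_available` and `ExampleIntegrationTest` are hypothetical names.

```python
import os
import unittest
from typing import Mapping


def aws_credentials_available(env: Mapping[str, str]) -> bool:
    """Return True if both standard AWS credential variables are set."""
    return bool(env.get("AWS_ACCESS_KEY_ID")) and bool(env.get("AWS_SECRET_ACCESS_KEY"))


@unittest.skipUnless(
    aws_credentials_available(os.environ),
    "integration tests need AWS credentials",
)
class ExampleIntegrationTest(unittest.TestCase):
    def test_placeholder(self):
        # A real integration test would talk to an AWS resource here
        self.assertTrue(True)
```

`pytest` collects `unittest.TestCase` classes, so a guard like this works with the commands in this section.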

IDE-agnostic:

1. Set up a virtual environment which contains both `lolafect` and the dependencies listed in `requirements-dev.txt`.
2. Run:
    - For all tests: `pytest tests`
    - Only unit tests: `pytest tests/test_unit`
    - Only integration tests: `pytest tests/test_integration`

In PyCharm:

- If you configure `pytest` as the project test runner, PyCharm will most probably autodetect the test
  folder and allow you to run the test suite within the IDE. However, PyCharm has trouble running the integration
  tests, since the shell it runs them from does not have the AWS credentials. Hence, for now we recommend using
  the PyCharm integrated test runner only for the unit tests. You can easily set up a Run Configuration for that.