lolafect/README.md
2023-04-24 14:22:45 +02:00

# Lolafect
Lolafect is a collection of Python bits that help us build our Prefect flows.
## Quickstart
Below are examples of how to use `lolafect` in your flows.

**_Note: the code excerpts below are simplified for brevity and won't run
as-is. If you want to see complete, working examples, check the tests
in this repository._**
### Config
**Let the `LolaConfig` object do the boilerplate env stuff for you**
```python
from lolafect.lolaconfig import build_lolaconfig
lolaconfig = build_lolaconfig(flow_name="some-flow")
# Now you can access all the env stuff from here
lolaconfig.FLOW_NAME
lolaconfig.FLOW_NAME_UDCS
lolaconfig.STORAGE
lolaconfig.KUBERNETES_IMAGE
lolaconfig.KUBERNETES_LABELS
lolaconfig.SLACK_WEBHOOKS
lolaconfig.DW_CREDENTIALS
lolaconfig.TRINO_CREDENTIALS
lolaconfig.SSH_TUNNEL_CREDENTIALS
lolaconfig.PREFECT_HOST
# etc
# Your flow is different from the typical one?
# You can customize the behaviour of LolaConfig
lolaconfig = build_lolaconfig(
    flow_name="some-flow",
    env_s3_bucket="my-odd-bucket",
    kubernetes_labels=["some-unusual-label"],
    kubernetes_image="the-image:not-the-production-one",
)
```
### Connections
**Connect to a Trino server**
```python
from prefect import Flow

from lolafect.connections import connect_to_trino, close_trino_connection

with Flow(...) as flow:
    connection = connect_to_trino.run(
        # You can probably fetch this from lolaconfig.TRINO_CREDENTIALS
        trino_credentials=my_trino_credentials
    )
    task_result = some_trino_related_task(trino_connection=connection)
    close_trino_connection.run(
        trino_connection=connection,
        upstream_tasks=[task_result]
    )
```
**Open an SSH tunnel**
```python
from prefect import Flow

from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel

with Flow(...) as flow:
    # You probably want to fetch these args from lolaconfig.SSH_TUNNEL_CREDENTIALS and lolaconfig.DW_CREDENTIALS
    tunnel = open_ssh_tunnel_with_s3_pkey(
        s3_bucket_name="some-bucket",
        ssh_tunnel_credentials={...},
        remote_target_host="some-host-probably-mysql",
        remote_target_port=12345,
    )
    task_result = some_task_that_needs_the_tunnel(tunnel=tunnel)
    # Tunnel is now alive. tunnel.is_active == True
    close_ssh_tunnel(tunnel=tunnel, upstream_tasks=[task_result])
```
**Connect to a MySQL instance**
```python
from prefect import Flow

from lolafect.connections import connect_to_mysql, close_mysql_connection

with Flow(...) as flow:
    connection = connect_to_mysql(
        mysql_credentials={...},  # You probably want to get this from lolaconfig.DW_CREDENTIALS
    )
    task_result = some_task_that_needs_mysql(connection=connection)
    close_mysql_connection(connection=connection, upstream_tasks=[task_result])

# Want to connect through an SSH tunnel? Open the tunnel normally and then
# override the host and port when connecting to MySQL.
from lolafect.connections import (
    open_ssh_tunnel_with_s3_pkey,
    get_local_bind_address_from_ssh_tunnel,
    close_ssh_tunnel
)

with Flow(...) as flow:
    # You probably want to fetch these args from lolaconfig.SSH_TUNNEL_CREDENTIALS and lolaconfig.DW_CREDENTIALS
    tunnel = open_ssh_tunnel_with_s3_pkey(
        s3_bucket_name="some-bucket",
        ssh_tunnel_credentials={...},
        remote_target_host="the-mysql-host",
        remote_target_port=3306,
    )
    connection = connect_to_mysql(
        mysql_credentials={...},  # You probably want to get this from lolaconfig.DW_CREDENTIALS
        # This opens the connection through the SSH tunnel instead of straight to MySQL
        overriding_host_and_port=get_local_bind_address_from_ssh_tunnel.run(
            tunnel=tunnel
        ),
    )
    task_result = some_task_that_needs_mysql(connection=connection)
    mysql_closed = close_mysql_connection(connection=connection, upstream_tasks=[task_result])
    close_ssh_tunnel.run(tunnel=tunnel, upstream_tasks=[mysql_closed])
```
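To illustrate what overriding the host and port means, here is a minimal sketch of the idea in plain Python. `resolve_host_and_port` is a hypothetical helper written for this README, not a `lolafect` function, and the `"host"` and `"port"` credential keys are assumptions. It also assumes that the tunnel's local bind address is a `(host, port)` pair.

```python
from typing import Optional, Tuple


def resolve_host_and_port(
    mysql_credentials: dict,
    overriding_host_and_port: Optional[Tuple[str, int]] = None,
) -> Tuple[str, int]:
    """Hypothetical helper: pick the address the MySQL client should dial."""
    if overriding_host_and_port is not None:
        # Going through a tunnel: connect to its local end, which forwards
        # the traffic to the real MySQL host.
        return overriding_host_and_port
    # No tunnel: connect straight to the host named in the credentials.
    # The "host" and "port" keys are assumed for this illustration.
    return mysql_credentials["host"], mysql_credentials["port"]


credentials = {"host": "the-mysql-host", "port": 3306}
direct_target = resolve_host_and_port(credentials)
tunnelled_target = resolve_host_and_port(credentials, ("127.0.0.1", 54321))
```

The rest of the credentials (user, password and so on) stay the same in both cases. Only the address changes.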
**Use SQL transactions and dry running**
```python
from prefect import Flow

from lolafect.connections import connect_to_mysql, close_mysql_connection
from lolafect.utils import begin_sql_transaction, end_sql_transaction

with Flow(...) as flow:
    connection = connect_to_mysql(
        mysql_credentials={...},  # You probably want to get this from lolaconfig.DW_CREDENTIALS
    )
    transaction_started = begin_sql_transaction(connection)
    task_result = some_task_that_needs_mysql(
        connection=connection,
        upstream_tasks=[transaction_started]
    )
    transaction_finished = end_sql_transaction(
        connection,
        dry_run=False,  # True means rollback, False means commit changes
        upstream_tasks=[task_result]
    )
    close_mysql_connection(connection=connection, upstream_tasks=[transaction_finished])
```
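To make the `dry_run` flag more concrete, here is a minimal, self-contained sketch of the same commit-or-rollback idea. It uses the standard library's `sqlite3` as a stand-in for MySQL, and `end_transaction` is a hypothetical helper written for this illustration, not lolafect's `end_sql_transaction`.

```python
import sqlite3


def end_transaction(connection: sqlite3.Connection, dry_run: bool) -> None:
    """Hypothetical helper: roll back on a dry run, commit otherwise."""
    if dry_run:
        connection.rollback()
    else:
        connection.commit()


def count_rows_after_insert(dry_run: bool) -> int:
    """Insert one row inside a transaction and report how many rows survive."""
    connection = sqlite3.connect(":memory:")
    connection.execute("CREATE TABLE events (name TEXT)")
    connection.commit()  # Make sure the empty table exists before the transaction

    # sqlite3 implicitly opens a transaction before this INSERT
    connection.execute("INSERT INTO events (name) VALUES ('something')")
    end_transaction(connection, dry_run=dry_run)

    (row_count,) = connection.execute("SELECT COUNT(*) FROM events").fetchone()
    connection.close()
    return row_count
```

With `dry_run=True` the inserted row is discarded, so you can exercise a flow end to end without leaving changes behind. With `dry_run=False` the row is kept.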
### Use Great Expectations
**Run a Great Expectations validation on a MySQL query**
```python
from prefect import Flow

from lolafect.data_testing import run_data_test_on_mysql

with Flow(...) as flow:
    my_query = """SELECT something FROM somewhere"""
    my_expectations = {...}  # A bunch of things you want to validate on the result of the query
    validation_results = run_data_test_on_mysql(
        name="my-cool-validation",
        mysql_credentials={...},
        query=my_query,
        expectations=my_expectations
    )
    if not validation_results["success"]:
        print("The data is bad!!!")
```
### Slack
**Send a warning message to Slack if any of your tasks fail**
```python
from prefect import Flow
from prefect.triggers import any_failed

from lolafect.slack import SendSlackMessageTask

# You can generate other tasks with different triggers. For example, you can
# send a message when all tasks fail, or when all tasks succeed.
send_warning_message_on_any_failure = SendSlackMessageTask(trigger=any_failed)

with Flow(...) as flow:
    crucial_task_result = some_crucial_task()
    send_warning_message_on_any_failure(
        webhook_url="the-channel-webhook",  # You can probably fetch this from lolaconfig.SLACK_WEBHOOKS
        text_to_send="Watch out, the flow failed!",
        upstream_tasks=[crucial_task_result]
    )
```
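The trigger decides whether the Slack task runs, based on how its upstream tasks ended. As a rough illustration of that idea, the sketch below models three triggers in plain Python. The function names mirror triggers found in `prefect.triggers`, but the bodies are simplified stand-ins written for this README (upstream outcomes are reduced to booleans), not Prefect's implementation.

```python
from typing import Dict


def any_failed(upstream_succeeded: Dict[str, bool]) -> bool:
    """Fire when at least one upstream task failed."""
    return any(not succeeded for succeeded in upstream_succeeded.values())


def all_failed(upstream_succeeded: Dict[str, bool]) -> bool:
    """Fire only when every upstream task failed."""
    return all(not succeeded for succeeded in upstream_succeeded.values())


def all_successful(upstream_succeeded: Dict[str, bool]) -> bool:
    """Fire only when every upstream task succeeded."""
    return all(upstream_succeeded.values())


# Task name -> whether the task succeeded (made-up outcomes for illustration)
outcomes = {"extract": True, "load": False}

should_warn = any_failed(outcomes)           # One task failed, so this fires
should_celebrate = all_successful(outcomes)  # Not every task succeeded
```

In this simplified model, a warning task built with `any_failed` would run for the outcomes above, while one built with `all_failed` or `all_successful` would not.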
## Gallery
This repo also contains a gallery of example flows that you can use to better
understand `lolafect` or as templates to kickstart your own flows. You can
find these in `docs/gallery`.
## How to test
There are two test suites: unit tests and integration tests. The integration tests plug into some of our AWS
resources, so they are not fully reliable: they only work with specific credentials and permissions. The
recommended policy is:

- Use the unit tests in any CI process you want.
- Use the unit tests frequently as you code.
- Do not use the integration tests in CI processes.
- Use the integration tests as milestone checks when finishing feature branches.
- Make sure the integration tests pass before making a new release.

When building new tests, please keep this philosophy in mind.
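One way to make this policy hard to get wrong is to let a small helper pick the test paths. The sketch below is hypothetical and not part of this repository. It assumes your CI system sets a `CI` environment variable, and it reuses the `tests/test_unit` and `tests/test_integration` folders of this repo.

```python
from typing import List, Mapping

UNIT_TESTS = "tests/test_unit"
INTEGRATION_TESTS = "tests/test_integration"


def select_test_paths(
    environment: Mapping[str, str], include_integration: bool = False
) -> List[str]:
    """Always run the unit tests; allow integration tests only outside CI."""
    # Assumption: the CI system exports CI=true (or CI=1)
    running_in_ci = environment.get("CI", "").lower() in ("1", "true")
    paths = [UNIT_TESTS]
    if include_integration and not running_in_ci:
        paths.append(INTEGRATION_TESTS)
    return paths


def build_pytest_command(
    environment: Mapping[str, str], include_integration: bool = False
) -> List[str]:
    """Build the pytest invocation for the selected test folders."""
    return ["pytest", *select_test_paths(environment, include_integration)]
```

For example, `build_pytest_command(os.environ, include_integration=True)` could be passed to `subprocess.run` from a release script: on a developer machine it covers both suites, while in CI it quietly falls back to the unit tests.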
IDE-agnostic:

1. Set up a virtual environment which contains both `lolafect` and the dependencies listed in `requirements-dev.txt`.
2. Run:
   - For all tests: `pytest tests`
   - Only unit tests: `pytest tests/test_unit`
   - Only integration tests: `pytest tests/test_integration`

In PyCharm:

- If you configure `pytest` as the project test runner, PyCharm will most likely autodetect the test
  folder and let you run the test suite within the IDE. However, PyCharm has trouble running the integration
  tests because the shell it runs from does not have the AWS credentials. For now, we recommend using
  the PyCharm integrated test runner only for the unit tests. You can easily set up a Run Configuration for that.