No description
Find a file
2023-02-08 14:22:16 +01:00
lolafect Bump version 2023-02-08 14:17:51 +01:00
tests Formatting. 2023-02-02 17:21:06 +01:00
.gitignore Initial commit 2022-12-29 17:06:11 +01:00
CHANGELOG.md Update changelog 2023-02-08 14:17:45 +01:00
README.md Merge remote-tracking branch 'origin/development' into feature/run-ge 2023-02-02 17:24:16 +01:00
requirements-dev.txt Include SQLAlchemy in dependencies. 2023-02-02 17:20:46 +01:00
setup.py Include SQLAlchemy in dependencies. 2023-02-02 17:20:46 +01:00

Lolafect

Lolafect is a collection of Python bits that help us build our Prefect flows.

Quickstart

You can find below examples of how to leverage lolafect in your flows.

Note: the code excerpts below are simplified for brevity and won't run as-is. If you want to see perfect examples, you might want to check the tests in this repository.

Config

Let the LolaConfig object do the boilerplate env stuff for you

from lolafect.lolaconfig import build_lolaconfig

lolaconfig = build_lolaconfig(flow_name="some-flow")

# Now you can access all the env stuff from here
lolaconfig.FLOW_NAME
lolaconfig.FLOW_NAME_UDCS
lolaconfig.STORAGE
lolaconfig.KUBERNETES_IMAGE
lolaconfig.KUBERNETES_LABELS
lolaconfig.SLACK_WEBHOOKS
lolaconfig.DW_CREDENTIALS
lolaconfig.TRINO_CREDENTIALS
lolaconfig.SSH_TUNNEL_CREDENTIALS
lolaconfig.PREFECT_HOST
# etc

# Your flow is different from the typical one?
# You can customize the behaviour of LolaConfig
lolaconfig = build_lolaconfig(
    flow_name="some-flow",
    env_s3_bucket="my-odd-bucket",
    kubernetes_labels=["some-unusual-label"],
    kubernetes_image="the-image:not-the-production-one",
)

Connections

Connect to a Trino server

from lolafect.connections import connect_to_trino, close_trino_connection

with Flow(...) as flow:
    connection = connect_to_trino.run(
        trino_credentials=my_trino_credentials # You can probably try to fetch this from lolaconfig.TRINO_CREDENTIALS
    )
    task_result = some_trino_related_task(trino_connection=connection)
    close_trino_connection.run(
        trino_connection=connection, 
        upstream_tasks=[task_result]
    )

Open an SSH tunnel

from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel

with Flow(...) as flow:
    # You probably want to fetch these args from lolaconfig.SSH_CREDENTIALS and lolaconfig.DW_CREDENTIALS
    tunnel = open_ssh_tunnel_with_s3_pkey( 
        s3_bucket_name="some-bucket",
        ssh_tunnel_credentials={...},
        remote_target_host="some-host-probably-mysql",
        remote_target_port=12345,
    )
    task_result = some_task_that_needs_the_tunnel(tunnel=tunnel)
    # Tunnel is now alive. tunnel.is_active == True
    close_ssh_tunnel(tunnel=tunnel, upstream_tasks=[task_result])  

Connect to a MySQL instance

from lolafect.connections import connect_to_mysql, close_mysql_connection

with Flow(...) as flow:
    connection = connect_to_mysql(
        mysql_credentials={...}, # You probably want to get this from TEST_LOLACONFIG.DW_CREDENTIALS
    )
    task_result = some_task_that_needs_mysql(connection=connection)
    close_mysql_connection(connection=connection, upstream_tasks=[task_result])

# Want to connect through an SSH tunnel? Open the tunnel normally and then
# override the host and port when connecting to MySQL.

from lolafect.connections import (
    open_ssh_tunnel_with_s3_pkey, 
    get_local_bind_address_from_ssh_tunnel,
    close_ssh_tunnel
)
    
with Flow(...) as flow:
    # You probably want to fetch these args from lolaconfig.SSH_CREDENTIALS and lolaconfig.DW_CREDENTIALS
    tunnel = open_ssh_tunnel_with_s3_pkey( 
        s3_bucket_name="some-bucket",
        ssh_tunnel_credentials={...},
        remote_target_host="the-mysql-host",
        remote_target_port=3306,
    )

    connection = connect_to_mysql(
        mysql_credentials={...}, # You probably want to get this from TEST_LOLACONFIG.DW_CREDENTIALS
        overriding_host_and_port=get_local_bind_address_from_ssh_tunnel.run(
            tunnel=tunnel # This will open the connection through the SSH tunnel instead of straight to MySQL
        ),
    )

    task_result = some_task_that_needs_mysql(connection=connection)
    
    mysql_closed = close_mysql_connection(connection=connection, upstream_tasks=[task_result])
    close_ssh_tunnel.run(tunnel=tunnel, upstream_tasks=[mysql_closed])

Use Great Expectations

Run a Great Expectations validation on a MySQL query

from lolafect.data_testing import run_data_test_on_mysql

with Flow(...) as flow:
      
    my_query = """SELECT something FROM somewhere"""
    my_expectations = {...} # A bunch of things you want to validate on the result of the query
    
    validation_results = run_data_test_on_mysql(
        name="my-cool-validation",
        mysql_credentials={...},
        query=my_query,
        expectations=my_expectations
    )

    if not validation_results["success"]:
        print("The data is bad!!!")

Slack

Send a warning message to slack if your tasks fails

from prefect.triggers import any_failed
from lolafect.slack import SendSlackMessageTask

send_warning_message_on_any_failure = SendSlackMessageTask(trigger=any_failed) # You can generate other tasks with 
#different triggers. For example, you can send a message when all tasks fail, or all tasks succeed

with Flow(...) as flow:
    crucial_task_result = some_crucial_task()

    send_warning_message_on_any_failure(
        webhook_url="the-channel-webhook", # You can probably try to fetch this from lolaconfig.SLACK_WEBHOOKS
        text_to_send="Watchout, the flow failed!",
        upstream_tasks=[crucial_task_result]
    )

How to test

There are two test suites: unit tests and integration tests. Integration tests are prepared to plug to some of our AWS resources, hence they are not fully reliable since they require specific credentials and permissions. The recommended policy is:

  • Use the unit tests in any CI process you want.
  • Use the unit tests frequently as you code.
  • Do not use the integration tests in CI processes.
  • Use the integration tests as milestone checks when finishing feature branches.
  • Make sure to ensure integration tests are working before making a new release.

When building new tests, please keep this philosophy in mind.

IDE-agnostic:

  1. Set up a virtual environment which contains both lolafect and the dependencies listed in requirements-dev.txt.
  2. Run:
    • For all tests: pytests tests
    • Only unit tests: pytest tests/test_unit
    • Only integration tests: pytest tests/test_integration

In Pycharm:

  • If you configure pytest as the project test runner, Pycharm will most probably autodetect the test folder and allow you to run the test suite within the IDE. However, Pycharm has troubles running the integration tests since the shell it runs from does not have the AWS credentials. Hence, for now we recommend you to only use the Pycharm integrated test runner for the unit tests. You can easily set up a Run Configuration for that.