# Lolafect Lolafect is a collection of Python bits that help us build our Prefect flows. ## Quickstart You can find below examples of how to leverage `lolafect` in your flows. **_Note: the code excerpts below are simplified for brevity and won't run as-is. If you want to see perfect examples, you might want to check the tests in this repository._** ### Config **Let the `LolaConfig` object do the boilerplate env stuff for you** ```python from lolafect.lolaconfig import build_lolaconfig lolaconfig = build_lolaconfig(flow_name="some-flow") # Now you can access all the env stuff from here lolaconfig.FLOW_NAME lolaconfig.FLOW_NAME_UDCS lolaconfig.STORAGE lolaconfig.KUBERNETES_IMAGE lolaconfig.KUBERNETES_LABELS lolaconfig.SLACK_WEBHOOKS lolaconfig.DW_CREDENTIALS lolaconfig.TRINO_CREDENTIALS lolaconfig.SSH_TUNNEL_CREDENTIALS lolaconfig.PREFECT_HOST # etc # Your flow is different from the typical one? # You can customize the behaviour of LolaConfig lolaconfig = build_lolaconfig( flow_name="some-flow", env_s3_bucket="my-odd-bucket", kubernetes_labels=["some-unusual-label"], kubernetes_image="the-image:not-the-production-one", ) ``` ### Connections **Connect to a Trino server** ```python from lolafect.connections import connect_to_trino, close_trino_connection with Flow(...) as flow: connection = connect_to_trino.run( trino_credentials=my_trino_credentials # You can probably try to fetch this from lolaconfig.TRINO_CREDENTIALS ) task_result = some_trino_related_task(trino_connection=connection) close_trino_connection.run( trino_connection=connection, upstream_tasks=[task_result] ) ``` **Open an SSH tunnel** ```python from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel with Flow(...) as flow: # You probably want to fetch these args from lolaconfig.SSH_CREDENTIALS and lolaconfig.DW_CREDENTIALS tunnel = open_ssh_tunnel_with_s3_pkey( s3_bucket_name="some-bucket", ssh_tunnel_credentials={...}, remote_target_host="some-host-probably-mysql", remote_target_port=12345, ) # Tunnel is now alive. tunnel.is_active == True close_ssh_tunnel(tunnel=tunnel) ``` **Connect to a MySQL instance** ```python from lolafect.connections import connect_to_mysql, close_mysql_connection with Flow(...) as flow: connection = connect_to_mysql.run( mysql_credentials={...}, # You probably want to get this from TEST_LOLACONFIG.DW_CREDENTIALS ) connection.cursor().execute("SELECT 1") close_mysql_connection.run(connection=connection) # Want to connect through an SSH tunnel? Open the tunnel normally and then # override the host and port when connecting to MySQL. from lolafect.connections import ( open_ssh_tunnel_with_s3_pkey, get_local_bind_address_from_ssh_tunnel, close_ssh_tunnel ) with Flow(...) as flow: # You probably want to fetch these args from lolaconfig.SSH_CREDENTIALS and lolaconfig.DW_CREDENTIALS tunnel = open_ssh_tunnel_with_s3_pkey( s3_bucket_name="some-bucket", ssh_tunnel_credentials={...}, remote_target_host="the-mysql-host", remote_target_port=3306, ) connection = connect_to_mysql.run( mysql_credentials={...}, # You probably want to get this from TEST_LOLACONFIG.DW_CREDENTIALS overriding_host_and_port=get_local_bind_address_from_ssh_tunnel.run( tunnel=tunnel # This will open the connection through the SSH tunnel instead of straight to MySQL ), ) connection.cursor().execute("SELECT 1") close_mysql_connection.run(connection=connection) close_ssh_tunnel.run(tunnel=tunnel) ``` ### Use Great Expectations **Run a Great Expectations validation on a MySQL query** ```python from lolafect.connections import connect_to_mysql from lolafect.data_testing import run_data_test_on_mysql with Flow(...) as flow: a_mysql_connection = connect_to_mysql(...) my_query = """SELECT something FROM somewhere""" my_expectations = {...} # A bunch of things you want to validate on the result of the query validation_results = run_validation_on_mysql( mysql_connection=a_mysql_connection, query=my_query, expectations=my_expectations ) ``` ### Slack **Send a warning message to slack if your tasks fails** ```python from prefect.triggers import any_failed from lolafect.slack import SendSlackMessageTask send_warning_message_on_any_failure = SendSlackMessageTask(trigger=any_failed) # You can generate other tasks with #different triggers. For example, you can send a message when all tasks fail, or all tasks succeed with Flow(...) as flow: crucial_task_result = some_crucial_task() send_warning_message_on_any_failure( webhook_url="the-channel-webhook", # You can probably try to fetch this from lolaconfig.SLACK_WEBHOOKS text_to_send="Watchout, the flow failed!", upstream_tasks=[crucial_task_result] ) ``` ## How to test There are two test suites: unit tests and integration tests. Integration tests are prepared to plug to some of our AWS resources, hence they are not fully reliable since they require specific credentials and permissions. The recommended policy is: - Use the unit tests in any CI process you want. - Use the unit tests frequently as you code. - Do not use the integration tests in CI processes. - Use the integration tests as milestone checks when finishing feature branches. - Make sure to ensure integration tests are working before making a new release. When building new tests, please keep this philosophy in mind. IDE-agnostic: 1. Set up a virtual environment which contains both `lolafect` and the dependencies listed in `requirements-dev.txt`. 2. Run: - For all tests: `pytests tests` - Only unit tests: `pytest tests/test_unit` - Only integration tests: `pytest tests/test_integration` In Pycharm: - If you configure `pytest` as the project test runner, Pycharm will most probably autodetect the test folder and allow you to run the test suite within the IDE. However, Pycharm has troubles running the integration tests since the shell it runs from does not have the AWS credentials. Hence, for now we recommend you to only use the Pycharm integrated test runner for the unit tests. You can easily set up a Run Configuration for that.