commit
d0dcff8a3f
11 changed files with 375 additions and 0 deletions
60
README.md
Normal file
60
README.md
Normal file
|
|
@ -0,0 +1,60 @@
|
|||
# Lolafect
|
||||
|
||||
Lolafect is a collection of Python bits that help us build our Prefect flows.
|
||||
|
||||
## Quickstart
|
||||
|
||||
You can find below examples of how to leverage `lolafect` in your flows.
|
||||
|
||||
**Let the `LolaConfig` object do the boilerplate env stuff for you**
|
||||
|
||||
```python
|
||||
from lolafect.lolaconfig import build_lolaconfig
|
||||
|
||||
lolaconfig = build_lolaconfig(
|
||||
flow_name="some-flow",
|
||||
env_s3_bucket="bucket",
|
||||
kubernetes_labels=["some_label"],
|
||||
kubernetes_image="the-image:latest",
|
||||
)
|
||||
|
||||
# Now you can access all the env stuff from here
|
||||
lolaconfig.FLOW_NAME
|
||||
lolaconfig.FLOW_NAME_UDCS
|
||||
lolaconfig.STORAGE
|
||||
lolaconfig.KUBERNETES_IMAGE
|
||||
lolaconfig.KUBERNETES_LABELS
|
||||
lolaconfig.SLACK_WEBHOOKS
|
||||
# etc
|
||||
```
|
||||
|
||||
**Send a warning message to slack if your tasks fails**
|
||||
|
||||
```python
|
||||
from prefect.triggers import any_failed
|
||||
from lolafect.slack import SendSlackMessageTask
|
||||
|
||||
send_warning_message_on_any_failure = SendSlackMessageTask(trigger=any_failed) # You can generate other tasks with
|
||||
#different triggers. For example, you can send a message when all tasks fail, or all tasks succeed
|
||||
|
||||
with Flow(...) as flow:
|
||||
crucial_task_result = some_crucial_task()
|
||||
|
||||
send_warning_message_on_any_failure(
|
||||
webhook_url="the-channel-webhook", # You can probably try to fetch this from lolaconfig.SLACK_WEBHOOKS
|
||||
text_to_send="Watchout, the flow failed!",
|
||||
upstream_tasks=[crucial_task_result]
|
||||
)
|
||||
```
|
||||
|
||||
## How to test
|
||||
|
||||
IDE-agnostic:
|
||||
|
||||
1. Set up a virtual environment which contains both `lolafect` and the dependencies listed in `requirements-dev.txt`.
|
||||
2. Run: `pytests tests`
|
||||
|
||||
In Pycharm:
|
||||
|
||||
- If you configure `pytest` as the project test runner, Pycharm will most probably autodetect the test
|
||||
folder and allow you to run the test suite within the IDE.
|
||||
0
lolafect/__init__.py
Normal file
0
lolafect/__init__.py
Normal file
1
lolafect/__version__.py
Normal file
1
lolafect/__version__.py
Normal file
|
|
@ -0,0 +1 @@
|
|||
__version__="0.1.0"
|
||||
5
lolafect/defaults.py
Normal file
5
lolafect/defaults.py
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
DEFAULT_ENV_S3_BUCKET="pdo-prefect-flows"
|
||||
DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE = "env/slack_webhooks.json"
|
||||
DEFAULT_KUBERNETES_IMAGE = "373245262072.dkr.ecr.eu-central-1.amazonaws.com/pdo-data-prefect:production"
|
||||
DEFAULT_KUBERNETES_LABELS = ["k8s"]
|
||||
DEFAULT_FLOWS_PATH_IN_BUCKET = "flows/"
|
||||
109
lolafect/lolaconfig.py
Normal file
109
lolafect/lolaconfig.py
Normal file
|
|
@ -0,0 +1,109 @@
|
|||
from typing import List
|
||||
|
||||
from prefect.storage import S3
|
||||
import boto3
|
||||
|
||||
from lolafect.defaults import (
|
||||
DEFAULT_ENV_S3_BUCKET,
|
||||
DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE,
|
||||
DEFAULT_KUBERNETES_IMAGE,
|
||||
DEFAULT_KUBERNETES_LABELS,
|
||||
DEFAULT_FLOWS_PATH_IN_BUCKET,
|
||||
)
|
||||
from lolafect.utils import S3FileReader
|
||||
|
||||
|
||||
class LolaConfig:
|
||||
"""
|
||||
A global-ish container for configurations required in pretty much all flows.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
flow_name: str,
|
||||
env_s3_bucket: str = None,
|
||||
kubernetes_labels: List = None,
|
||||
kubernetes_image: str = None,
|
||||
slack_webhooks_file: str = None,
|
||||
):
|
||||
"""
|
||||
Init and set defaults where no value was passed.
|
||||
|
||||
:param flow_name: the name of the flow.
|
||||
:param env_s3_bucket: the name of the S3 bucket where env vars should be
|
||||
searched.
|
||||
:param kubernetes_labels: labels to be passed to the kubernetes agent.
|
||||
:param kubernetes_image: image to use when running through the kubernetes agent.
|
||||
:param slack_webhooks_file: path to the slack webhooks file within the env
|
||||
bucket.
|
||||
"""
|
||||
self.FLOW_NAME = flow_name
|
||||
self.FLOW_NAME_UDCS = flow_name.replace("-", "_ ")
|
||||
self.S3_BUCKET_NAME = (
|
||||
DEFAULT_ENV_S3_BUCKET if env_s3_bucket is None else env_s3_bucket
|
||||
)
|
||||
self.SLACK_WEBHOOKS_FILE = (
|
||||
DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE
|
||||
if slack_webhooks_file is None
|
||||
else slack_webhooks_file
|
||||
)
|
||||
self.SLACK_WEBHOOKS = None
|
||||
self.STORAGE = S3(
|
||||
bucket=self.S3_BUCKET_NAME,
|
||||
key=DEFAULT_FLOWS_PATH_IN_BUCKET + self.FLOW_NAME + ".py",
|
||||
stored_as_script=True,
|
||||
)
|
||||
self.KUBERNETES_LABELS = (
|
||||
DEFAULT_KUBERNETES_LABELS
|
||||
if kubernetes_labels is None
|
||||
else kubernetes_labels
|
||||
)
|
||||
self.KUBERNETES_IMAGE = (
|
||||
DEFAULT_KUBERNETES_IMAGE if kubernetes_image is None else kubernetes_image
|
||||
)
|
||||
|
||||
self._s3_reader = S3FileReader(s3_client=boto3.client("s3"))
|
||||
|
||||
def fetch_slack_webhooks(self, s3_reader=None) -> None:
|
||||
"""
|
||||
Read the slack webhooks file from S3 and store the webhooks in memory.
|
||||
|
||||
:param s3_reader: a client to fetch files from S3.
|
||||
:return: None
|
||||
"""
|
||||
|
||||
if s3_reader is None:
|
||||
s3_reader = self._s3_reader
|
||||
|
||||
self.SLACK_WEBHOOKS = s3_reader.read_json_from_s3_file(
|
||||
bucket=self.S3_BUCKET_NAME, key=self.SLACK_WEBHOOKS_FILE
|
||||
)
|
||||
|
||||
|
||||
def build_lolaconfig(
|
||||
flow_name: str,
|
||||
env_s3_bucket: str = None,
|
||||
kubernetes_labels: List = None,
|
||||
kubernetes_image: str = None,
|
||||
) -> LolaConfig:
|
||||
"""
|
||||
Build a LolaConfig instance from the passed params.
|
||||
|
||||
:param flow_name: the name of the flow.
|
||||
:param env_s3_bucket: the name of the S3 bucket where env vars should be
|
||||
searched.
|
||||
:param kubernetes_labels: labels to be passed to the kubernetes agent.
|
||||
:param kubernetes_image: image to use when running through the kubernetes agent.
|
||||
:return: a ready to use LolaConfig instance.
|
||||
"""
|
||||
|
||||
lolaconfig = LolaConfig(
|
||||
flow_name=flow_name,
|
||||
env_s3_bucket=env_s3_bucket,
|
||||
kubernetes_labels=kubernetes_labels,
|
||||
kubernetes_image=kubernetes_image,
|
||||
)
|
||||
|
||||
lolaconfig.fetch_slack_webhooks()
|
||||
|
||||
return lolaconfig
|
||||
44
lolafect/slack.py
Normal file
44
lolafect/slack.py
Normal file
|
|
@ -0,0 +1,44 @@
|
|||
import json
|
||||
|
||||
from prefect.core import Task
|
||||
import requests
|
||||
|
||||
|
||||
class SendSlackMessageTask(Task):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def run(self, webhook_url: str, text_to_send: str) -> None:
|
||||
"""
|
||||
Pass the details from the task to the actual function.
|
||||
|
||||
:param webhook_url: the URL of the Slack webhook that should receive the
|
||||
message.
|
||||
:param text_to_send: the text to send to the Slack channel.
|
||||
:return: None
|
||||
"""
|
||||
send_message_to_slack_channel(webhook_url, text_to_send)
|
||||
|
||||
|
||||
def send_message_to_slack_channel(webhook_url: str, text_to_send: str) -> None:
|
||||
"""
|
||||
Send an HTTP POST to a Slack webhook to deliver a message in a channel. Raise
|
||||
an error if Slack does not reply with a 200 OK status.
|
||||
|
||||
:param webhook_url: the URL of the Slack webhook that should receive the
|
||||
message.
|
||||
:param text_to_send: the text to send to the Slack channel.
|
||||
:return: None
|
||||
"""
|
||||
|
||||
slack_data = {"text": text_to_send}
|
||||
response = requests.post(
|
||||
webhook_url,
|
||||
data=json.dumps(slack_data),
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
if response.status_code != 200:
|
||||
raise ValueError(
|
||||
"Request to slack returned an error %s, the response is:\n%s"
|
||||
% (response.status_code, response.text)
|
||||
)
|
||||
24
lolafect/utils.py
Normal file
24
lolafect/utils.py
Normal file
|
|
@ -0,0 +1,24 @@
|
|||
import json
|
||||
|
||||
|
||||
class S3FileReader:
|
||||
"""
|
||||
An S3 client along with a few reading utils.
|
||||
"""
|
||||
|
||||
def __init__(self, s3_client):
|
||||
self.s3_client = s3_client
|
||||
|
||||
def read_json_from_s3_file(self, bucket: str, key: str) -> dict:
|
||||
"""
|
||||
Read a JSON file from an S3 location and return contents as a dict.
|
||||
|
||||
:param bucket: the name of the bucket where the file is stored.
|
||||
:param key: the path to the file within the bucket.
|
||||
:return: the file contents.
|
||||
"""
|
||||
return json.loads(
|
||||
self.s3_client.get_object(Bucket=bucket, Key=key)["Body"]
|
||||
.read()
|
||||
.decode("utf-8")
|
||||
)
|
||||
5
requirements-dev.txt
Normal file
5
requirements-dev.txt
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
prefect==1.2.2
|
||||
requests==2.28.1
|
||||
boto3==1.26.40
|
||||
pytest==7.2.0
|
||||
httpretty==1.1.4
|
||||
27
setup.py
Normal file
27
setup.py
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
import pathlib
|
||||
|
||||
from setuptools import setup
|
||||
|
||||
about = {}
|
||||
here = pathlib.Path(__file__).absolute()
|
||||
with open("lolafect/__version__.py", "r") as f:
|
||||
exec(f.read(), about)
|
||||
|
||||
with open("README.md", "r") as f:
|
||||
readme = f.read()
|
||||
|
||||
setup(
|
||||
name="lolafect",
|
||||
version=about["__version__"],
|
||||
description="Lolafect is a collection of Python bits that help us build our Prefect flows.",
|
||||
long_description=readme,
|
||||
long_description_content_type="text/markdown",
|
||||
author="data-team",
|
||||
author_email="data@lolamarket.com",
|
||||
url="https://github.com/lolamarket/data-lolafect",
|
||||
packages=["lolafect"],
|
||||
package_dir={"lolafect": "lolafect"},
|
||||
include_package_data=True,
|
||||
python_requires=">=3.7",
|
||||
install_requires=["prefect==1.2.2", "requests==2.28.1", "boto3==1.26.40"],
|
||||
)
|
||||
35
tests/test_lolaconfig.py
Normal file
35
tests/test_lolaconfig.py
Normal file
|
|
@ -0,0 +1,35 @@
|
|||
from types import SimpleNamespace
|
||||
|
||||
from lolafect.lolaconfig import LolaConfig
|
||||
|
||||
|
||||
def test_lolaconfig_instantiates_with_defaults_properly():
|
||||
|
||||
lolaconfig = LolaConfig(flow_name="some-flow")
|
||||
|
||||
|
||||
def test_lolaconfig_instantiates_without_defaults_proplery():
|
||||
|
||||
lolaconfig = LolaConfig(
|
||||
flow_name="some-flow",
|
||||
env_s3_bucket="bucket",
|
||||
kubernetes_labels=["some_label"],
|
||||
kubernetes_image="loliloli:latest",
|
||||
slack_webhooks_file="the_file/is/here.json",
|
||||
)
|
||||
|
||||
|
||||
def test_lolaconfig_fetches_webhooks_properly():
|
||||
|
||||
lolaconfig = LolaConfig(flow_name="some-flow")
|
||||
|
||||
fake_s3_reader = SimpleNamespace()
|
||||
|
||||
def mock_read_json_from_s3_file(bucket, key):
|
||||
return {"a-channel-name": "a-channel-url.com"}
|
||||
|
||||
fake_s3_reader.read_json_from_s3_file = mock_read_json_from_s3_file
|
||||
|
||||
lolaconfig.fetch_slack_webhooks(s3_reader=fake_s3_reader)
|
||||
|
||||
assert type(lolaconfig.SLACK_WEBHOOKS) is dict
|
||||
65
tests/test_slack.py
Normal file
65
tests/test_slack.py
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
import json
|
||||
|
||||
import httpretty
|
||||
import pytest
|
||||
|
||||
from lolafect.slack import send_message_to_slack_channel, SendSlackMessageTask
|
||||
|
||||
|
||||
@httpretty.activate(allow_net_connect=False, verbose=True)
|
||||
def test_send_message_to_slack_channel_works_properly():
|
||||
mock_webhook_url = "http://the-webhook-url.com"
|
||||
mock_message_to_channel = "Hi there!"
|
||||
httpretty.register_uri(method=httpretty.POST, uri=mock_webhook_url, status=200)
|
||||
|
||||
send_message_to_slack_channel(
|
||||
webhook_url=mock_webhook_url, text_to_send=mock_message_to_channel
|
||||
)
|
||||
|
||||
request_observed_by_server = httpretty.last_request()
|
||||
|
||||
assert (request_observed_by_server.method == "POST") and (
|
||||
json.loads(request_observed_by_server.body.decode("UTF-8"))
|
||||
== {"text": mock_message_to_channel}
|
||||
)
|
||||
|
||||
|
||||
@httpretty.activate(allow_net_connect=False, verbose=True)
|
||||
def test_send_message_to_slack_channel_response_400_raises_exception():
|
||||
mock_webhook_url = "http://the-webhook-url.com"
|
||||
mock_message_to_channel = "Hi there!"
|
||||
httpretty.register_uri(method=httpretty.POST, uri=mock_webhook_url, status=400)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
send_message_to_slack_channel(
|
||||
webhook_url=mock_webhook_url, text_to_send=mock_message_to_channel
|
||||
)
|
||||
|
||||
|
||||
@httpretty.activate(allow_net_connect=False, verbose=True)
|
||||
def test_slack_task_reaches_server_successfully():
|
||||
mock_webhook_url = "http://the-webhook-url.com"
|
||||
mock_message_to_channel = "Hi there!"
|
||||
httpretty.register_uri(method=httpretty.POST, uri=mock_webhook_url, status=200)
|
||||
|
||||
slack_task = SendSlackMessageTask()
|
||||
slack_task.run(webhook_url=mock_webhook_url, text_to_send=mock_message_to_channel)
|
||||
|
||||
request_observed_by_server = httpretty.last_request()
|
||||
assert (request_observed_by_server.method == "POST") and (
|
||||
json.loads(request_observed_by_server.body.decode("UTF-8"))
|
||||
== {"text": mock_message_to_channel}
|
||||
)
|
||||
|
||||
|
||||
@httpretty.activate(allow_net_connect=False, verbose=True)
|
||||
def test_slack_task_raises_value_error():
|
||||
mock_webhook_url = "http://the-webhook-url.com"
|
||||
mock_message_to_channel = "Hi there!"
|
||||
httpretty.register_uri(method=httpretty.POST, uri=mock_webhook_url, status=400)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
slack_task = SendSlackMessageTask()
|
||||
slack_task.run(
|
||||
webhook_url=mock_webhook_url, text_to_send=mock_message_to_channel
|
||||
)
|
||||
Loading…
Add table
Add a link
Reference in a new issue