diff --git a/README.md b/README.md new file mode 100644 index 0000000..2ec44ac --- /dev/null +++ b/README.md @@ -0,0 +1,60 @@ +# Lolafect + +Lolafect is a collection of Python bits that help us build our Prefect flows. + +## Quickstart + +You can find below examples of how to leverage `lolafect` in your flows. + +**Let the `LolaConfig` object do the boilerplate env stuff for you** + +```python +from lolafect.lolaconfig import build_lolaconfig + +lolaconfig = build_lolaconfig( + flow_name="some-flow", + env_s3_bucket="bucket", + kubernetes_labels=["some_label"], + kubernetes_image="the-image:latest", +) + +# Now you can access all the env stuff from here +lolaconfig.FLOW_NAME +lolaconfig.FLOW_NAME_UDCS +lolaconfig.STORAGE +lolaconfig.KUBERNETES_IMAGE +lolaconfig.KUBERNETES_LABELS +lolaconfig.SLACK_WEBHOOKS +# etc +``` + +**Send a warning message to slack if your tasks fails** + +```python +from prefect.triggers import any_failed +from lolafect.slack import SendSlackMessageTask + +send_warning_message_on_any_failure = SendSlackMessageTask(trigger=any_failed) # You can generate other tasks with +#different triggers. For example, you can send a message when all tasks fail, or all tasks succeed + +with Flow(...) as flow: + crucial_task_result = some_crucial_task() + + send_warning_message_on_any_failure( + webhook_url="the-channel-webhook", # You can probably try to fetch this from lolaconfig.SLACK_WEBHOOKS + text_to_send="Watchout, the flow failed!", + upstream_tasks=[crucial_task_result] + ) +``` + +## How to test + +IDE-agnostic: + +1. Set up a virtual environment which contains both `lolafect` and the dependencies listed in `requirements-dev.txt`. +2. Run: `pytests tests` + +In Pycharm: + +- If you configure `pytest` as the project test runner, Pycharm will most probably autodetect the test + folder and allow you to run the test suite within the IDE. \ No newline at end of file diff --git a/lolafect/__init__.py b/lolafect/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lolafect/__version__.py b/lolafect/__version__.py new file mode 100644 index 0000000..d82ddf9 --- /dev/null +++ b/lolafect/__version__.py @@ -0,0 +1 @@ +__version__="0.1.0" \ No newline at end of file diff --git a/lolafect/defaults.py b/lolafect/defaults.py new file mode 100644 index 0000000..e8397bc --- /dev/null +++ b/lolafect/defaults.py @@ -0,0 +1,5 @@ +DEFAULT_ENV_S3_BUCKET="pdo-prefect-flows" +DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE = "env/slack_webhooks.json" +DEFAULT_KUBERNETES_IMAGE = "373245262072.dkr.ecr.eu-central-1.amazonaws.com/pdo-data-prefect:production" +DEFAULT_KUBERNETES_LABELS = ["k8s"] +DEFAULT_FLOWS_PATH_IN_BUCKET = "flows/" diff --git a/lolafect/lolaconfig.py b/lolafect/lolaconfig.py new file mode 100644 index 0000000..e7eefda --- /dev/null +++ b/lolafect/lolaconfig.py @@ -0,0 +1,109 @@ +from typing import List + +from prefect.storage import S3 +import boto3 + +from lolafect.defaults import ( + DEFAULT_ENV_S3_BUCKET, + DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE, + DEFAULT_KUBERNETES_IMAGE, + DEFAULT_KUBERNETES_LABELS, + DEFAULT_FLOWS_PATH_IN_BUCKET, +) +from lolafect.utils import S3FileReader + + +class LolaConfig: + """ + A global-ish container for configurations required in pretty much all flows. + """ + + def __init__( + self, + flow_name: str, + env_s3_bucket: str = None, + kubernetes_labels: List = None, + kubernetes_image: str = None, + slack_webhooks_file: str = None, + ): + """ + Init and set defaults where no value was passed. + + :param flow_name: the name of the flow. + :param env_s3_bucket: the name of the S3 bucket where env vars should be + searched. + :param kubernetes_labels: labels to be passed to the kubernetes agent. + :param kubernetes_image: image to use when running through the kubernetes agent. + :param slack_webhooks_file: path to the slack webhooks file within the env + bucket. + """ + self.FLOW_NAME = flow_name + self.FLOW_NAME_UDCS = flow_name.replace("-", "_ ") + self.S3_BUCKET_NAME = ( + DEFAULT_ENV_S3_BUCKET if env_s3_bucket is None else env_s3_bucket + ) + self.SLACK_WEBHOOKS_FILE = ( + DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE + if slack_webhooks_file is None + else slack_webhooks_file + ) + self.SLACK_WEBHOOKS = None + self.STORAGE = S3( + bucket=self.S3_BUCKET_NAME, + key=DEFAULT_FLOWS_PATH_IN_BUCKET + self.FLOW_NAME + ".py", + stored_as_script=True, + ) + self.KUBERNETES_LABELS = ( + DEFAULT_KUBERNETES_LABELS + if kubernetes_labels is None + else kubernetes_labels + ) + self.KUBERNETES_IMAGE = ( + DEFAULT_KUBERNETES_IMAGE if kubernetes_image is None else kubernetes_image + ) + + self._s3_reader = S3FileReader(s3_client=boto3.client("s3")) + + def fetch_slack_webhooks(self, s3_reader=None) -> None: + """ + Read the slack webhooks file from S3 and store the webhooks in memory. + + :param s3_reader: a client to fetch files from S3. + :return: None + """ + + if s3_reader is None: + s3_reader = self._s3_reader + + self.SLACK_WEBHOOKS = s3_reader.read_json_from_s3_file( + bucket=self.S3_BUCKET_NAME, key=self.SLACK_WEBHOOKS_FILE + ) + + +def build_lolaconfig( + flow_name: str, + env_s3_bucket: str = None, + kubernetes_labels: List = None, + kubernetes_image: str = None, +) -> LolaConfig: + """ + Build a LolaConfig instance from the passed params. + + :param flow_name: the name of the flow. + :param env_s3_bucket: the name of the S3 bucket where env vars should be + searched. + :param kubernetes_labels: labels to be passed to the kubernetes agent. + :param kubernetes_image: image to use when running through the kubernetes agent. + :return: a ready to use LolaConfig instance. + """ + + lolaconfig = LolaConfig( + flow_name=flow_name, + env_s3_bucket=env_s3_bucket, + kubernetes_labels=kubernetes_labels, + kubernetes_image=kubernetes_image, + ) + + lolaconfig.fetch_slack_webhooks() + + return lolaconfig diff --git a/lolafect/slack.py b/lolafect/slack.py new file mode 100644 index 0000000..71141f9 --- /dev/null +++ b/lolafect/slack.py @@ -0,0 +1,44 @@ +import json + +from prefect.core import Task +import requests + + +class SendSlackMessageTask(Task): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def run(self, webhook_url: str, text_to_send: str) -> None: + """ + Pass the details from the task to the actual function. + + :param webhook_url: the URL of the Slack webhook that should receive the + message. + :param text_to_send: the text to send to the Slack channel. + :return: None + """ + send_message_to_slack_channel(webhook_url, text_to_send) + + +def send_message_to_slack_channel(webhook_url: str, text_to_send: str) -> None: + """ + Send an HTTP POST to a Slack webhook to deliver a message in a channel. Raise + an error if Slack does not reply with a 200 OK status. + + :param webhook_url: the URL of the Slack webhook that should receive the + message. + :param text_to_send: the text to send to the Slack channel. + :return: None + """ + + slack_data = {"text": text_to_send} + response = requests.post( + webhook_url, + data=json.dumps(slack_data), + headers={"Content-Type": "application/json"}, + ) + if response.status_code != 200: + raise ValueError( + "Request to slack returned an error %s, the response is:\n%s" + % (response.status_code, response.text) + ) diff --git a/lolafect/utils.py b/lolafect/utils.py new file mode 100644 index 0000000..621e59d --- /dev/null +++ b/lolafect/utils.py @@ -0,0 +1,24 @@ +import json + + +class S3FileReader: + """ + An S3 client along with a few reading utils. + """ + + def __init__(self, s3_client): + self.s3_client = s3_client + + def read_json_from_s3_file(self, bucket: str, key: str) -> dict: + """ + Read a JSON file from an S3 location and return contents as a dict. + + :param bucket: the name of the bucket where the file is stored. + :param key: the path to the file within the bucket. + :return: the file contents. + """ + return json.loads( + self.s3_client.get_object(Bucket=bucket, Key=key)["Body"] + .read() + .decode("utf-8") + ) diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..40f6a2a --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,5 @@ +prefect==1.2.2 +requests==2.28.1 +boto3==1.26.40 +pytest==7.2.0 +httpretty==1.1.4 \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..1b6c89e --- /dev/null +++ b/setup.py @@ -0,0 +1,27 @@ +import pathlib + +from setuptools import setup + +about = {} +here = pathlib.Path(__file__).absolute() +with open("lolafect/__version__.py", "r") as f: + exec(f.read(), about) + +with open("README.md", "r") as f: + readme = f.read() + +setup( + name="lolafect", + version=about["__version__"], + description="Lolafect is a collection of Python bits that help us build our Prefect flows.", + long_description=readme, + long_description_content_type="text/markdown", + author="data-team", + author_email="data@lolamarket.com", + url="https://github.com/lolamarket/data-lolafect", + packages=["lolafect"], + package_dir={"lolafect": "lolafect"}, + include_package_data=True, + python_requires=">=3.7", + install_requires=["prefect==1.2.2", "requests==2.28.1", "boto3==1.26.40"], +) diff --git a/tests/test_lolaconfig.py b/tests/test_lolaconfig.py new file mode 100644 index 0000000..742de77 --- /dev/null +++ b/tests/test_lolaconfig.py @@ -0,0 +1,35 @@ +from types import SimpleNamespace + +from lolafect.lolaconfig import LolaConfig + + +def test_lolaconfig_instantiates_with_defaults_properly(): + + lolaconfig = LolaConfig(flow_name="some-flow") + + +def test_lolaconfig_instantiates_without_defaults_proplery(): + + lolaconfig = LolaConfig( + flow_name="some-flow", + env_s3_bucket="bucket", + kubernetes_labels=["some_label"], + kubernetes_image="loliloli:latest", + slack_webhooks_file="the_file/is/here.json", + ) + + +def test_lolaconfig_fetches_webhooks_properly(): + + lolaconfig = LolaConfig(flow_name="some-flow") + + fake_s3_reader = SimpleNamespace() + + def mock_read_json_from_s3_file(bucket, key): + return {"a-channel-name": "a-channel-url.com"} + + fake_s3_reader.read_json_from_s3_file = mock_read_json_from_s3_file + + lolaconfig.fetch_slack_webhooks(s3_reader=fake_s3_reader) + + assert type(lolaconfig.SLACK_WEBHOOKS) is dict diff --git a/tests/test_slack.py b/tests/test_slack.py new file mode 100644 index 0000000..ac294fd --- /dev/null +++ b/tests/test_slack.py @@ -0,0 +1,65 @@ +import json + +import httpretty +import pytest + +from lolafect.slack import send_message_to_slack_channel, SendSlackMessageTask + + +@httpretty.activate(allow_net_connect=False, verbose=True) +def test_send_message_to_slack_channel_works_properly(): + mock_webhook_url = "http://the-webhook-url.com" + mock_message_to_channel = "Hi there!" + httpretty.register_uri(method=httpretty.POST, uri=mock_webhook_url, status=200) + + send_message_to_slack_channel( + webhook_url=mock_webhook_url, text_to_send=mock_message_to_channel + ) + + request_observed_by_server = httpretty.last_request() + + assert (request_observed_by_server.method == "POST") and ( + json.loads(request_observed_by_server.body.decode("UTF-8")) + == {"text": mock_message_to_channel} + ) + + +@httpretty.activate(allow_net_connect=False, verbose=True) +def test_send_message_to_slack_channel_response_400_raises_exception(): + mock_webhook_url = "http://the-webhook-url.com" + mock_message_to_channel = "Hi there!" + httpretty.register_uri(method=httpretty.POST, uri=mock_webhook_url, status=400) + + with pytest.raises(ValueError): + send_message_to_slack_channel( + webhook_url=mock_webhook_url, text_to_send=mock_message_to_channel + ) + + +@httpretty.activate(allow_net_connect=False, verbose=True) +def test_slack_task_reaches_server_successfully(): + mock_webhook_url = "http://the-webhook-url.com" + mock_message_to_channel = "Hi there!" + httpretty.register_uri(method=httpretty.POST, uri=mock_webhook_url, status=200) + + slack_task = SendSlackMessageTask() + slack_task.run(webhook_url=mock_webhook_url, text_to_send=mock_message_to_channel) + + request_observed_by_server = httpretty.last_request() + assert (request_observed_by_server.method == "POST") and ( + json.loads(request_observed_by_server.body.decode("UTF-8")) + == {"text": mock_message_to_channel} + ) + + +@httpretty.activate(allow_net_connect=False, verbose=True) +def test_slack_task_raises_value_error(): + mock_webhook_url = "http://the-webhook-url.com" + mock_message_to_channel = "Hi there!" + httpretty.register_uri(method=httpretty.POST, uri=mock_webhook_url, status=400) + + with pytest.raises(ValueError): + slack_task = SendSlackMessageTask() + slack_task.run( + webhook_url=mock_webhook_url, text_to_send=mock_message_to_channel + )