LolaConfig and defaults

This commit is contained in:
Pablo Martin 2022-12-30 15:25:56 +01:00
parent e81707f177
commit f1ed3832e5
2 changed files with 112 additions and 0 deletions

5
lolafect/defaults.py Normal file
View file

@ -0,0 +1,5 @@
DEFAULT_ENV_S3_BUCKET="pdo-prefect-flows"
DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE = "env/slack_webhooks.json"
DEFAULT_KUBERNETES_IMAGE = "373245262072.dkr.ecr.eu-central-1.amazonaws.com/pdo-data-prefect:production"
DEFAULT_KUBERNETES_LABELS = ["k8s"]
DEFAULT_FLOWS_PATH_IN_BUCKET = "flows/"

107
lolafect/lolaconfig.py Normal file
View file

@ -0,0 +1,107 @@
import json
from typing import List
from prefect.storage import S3
import boto3
from lolafect.defaults import (
DEFAULT_ENV_S3_BUCKET,
DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE,
DEFAULT_KUBERNETES_IMAGE,
DEFAULT_KUBERNETES_LABELS,
DEFAULT_FLOWS_PATH_IN_BUCKET,
)
class LolaConfig:
"""
A global-ish container for configurations required in pretty much all flows.
"""
def __init__(
self,
flow_name: str,
env_s3_bucket: str = None,
kubernetes_labels: List = None,
kubernetes_image: str = None,
slack_webhooks_file: str = None,
):
"""
Init and set defaults where no value was passed.
:param flow_name: the name of the flow.
:param env_s3_bucket: the name of the S3 bucket where env vars should be
searched.
:param kubernetes_labels: labels to be passed to the kubernetes agent.
:param kubernetes_image: image to use when running through the kubernetes agent.
:param slack_webhooks_file: path to the slack webhooks file within the env
bucket.
"""
self.FLOW_NAME = flow_name
self.FLOW_NAME_UDCS = flow_name.replace("-", "_ ")
self.S3_BUCKET_NAME = (
DEFAULT_ENV_S3_BUCKET if env_s3_bucket is None else env_s3_bucket
)
self.SLACK_WEBHOOKS_FILE = (
DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE
if slack_webhooks_file is None
else slack_webhooks_file
)
self.SLACK_WEBHOOKS = None
self.STORAGE = S3(
bucket=self.S3_BUCKET_NAME,
key=DEFAULT_FLOWS_PATH_IN_BUCKET + self.FLOW_NAME + ".py",
stored_as_script=True,
)
self.KUBERNETES_LABELS = (
DEFAULT_KUBERNETES_LABELS
if kubernetes_labels is None
else kubernetes_labels
)
self.KUBERNETES_IMAGE = (
DEFAULT_KUBERNETES_IMAGE if kubernetes_image is None else kubernetes_image
)
def fetch_slack_webhooks(self) -> None:
"""
Read the slack webhooks file from S3 and store the webhooks in memory.
:return: None
"""
self.SLACK_WEBHOOKS = json.loads(
boto3.client("s3")
.get_object(Bucket=self.S3_BUCKET_NAME, Key=self.SLACK_WEBHOOKS_FILE)[
"Body"
]
.read()
.decode("utf-8")
)
def build_lolaconfig(
flow_name: str,
env_s3_bucket: str = None,
kubernetes_labels: List = None,
kubernetes_image: str = None,
) -> LolaConfig:
"""
Build a LolaConfig instance from the passed params.
:param flow_name: the name of the flow.
:param env_s3_bucket: the name of the S3 bucket where env vars should be
searched.
:param kubernetes_labels: labels to be passed to the kubernetes agent.
:param kubernetes_image: image to use when running through the kubernetes agent.
:return: a ready to use LolaConfig instance.
"""
lolaconfig = LolaConfig(
flow_name=flow_name,
env_s3_bucket=env_s3_bucket,
kubernetes_labels=kubernetes_labels,
kubernetes_image=kubernetes_image,
)
lolaconfig.fetch_slack_webhooks()
return lolaconfig