223 lines
7.1 KiB
Python
223 lines
7.1 KiB
Python
import functools
import inspect
from typing import Callable, List, Optional

import boto3
from prefect.storage import S3

from lolafect.defaults import (
    DEFAULT_ENV_S3_BUCKET,
    DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE,
    DEFAULT_KUBERNETES_IMAGE,
    DEFAULT_KUBERNETES_LABELS,
    DEFAULT_FLOWS_PATH_IN_BUCKET,
    DEFAULT_ENV_FILE_PATH,
)
from lolafect.utils import S3FileReader
|
|
|
|
|
|
def _needs_env_data(method: Callable) -> Callable:
    """
    Decorator to wrap methods from LolaConfig that depend on having the env
    data file loaded in memory.

    Before the wrapped method runs, ``self.fetch_env_data`` is called. If the
    call provides an ``s3_reader`` argument, either positionally or by keyword,
    that reader is forwarded to ``fetch_env_data``. Otherwise ``None`` is
    forwarded and the instance's default reader is used.

    :param method: the method that needs env data.
    :return: the method, wrapped with a call to fetch the env data. The
        wrapper keeps the original method's name and docstring.
    """
    # Resolve the signature once, at decoration time, so each call only has
    # to bind its arguments.
    signature = inspect.signature(method)

    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        # Many methods that use this decorator accept an optional s3_reader
        # that should be used instead of the default one. Binding the call
        # arguments against the method's signature finds it whether it was
        # passed positionally or by keyword. The kwargs fallback covers
        # methods that take it through **kwargs.
        bound = signature.bind_partial(self, *args, **kwargs)
        s3_reader = bound.arguments.get("s3_reader", kwargs.get("s3_reader"))
        self.fetch_env_data(s3_reader)
        return method(self, *args, **kwargs)

    return wrapper
|
|
|
|
|
|
class LolaConfig:
    """
    A global-ish container for configurations required in pretty much all flows.

    Static settings (S3 bucket, file paths, kubernetes settings, prefect
    storage) are resolved in ``__init__``. Values read from S3 (env data,
    credentials, slack webhooks, prefect host) start out as ``None`` and are
    populated by the corresponding ``fetch_*`` methods.
    """

    def __init__(
        self,
        flow_name: str,
        env_s3_bucket: Optional[str] = None,
        env_file_path: Optional[str] = None,
        kubernetes_labels: Optional[List] = None,
        kubernetes_image: Optional[str] = None,
        slack_webhooks_file: Optional[str] = None,
    ):
        """
        Init and set defaults where no value was passed.

        :param flow_name: the name of the flow.
        :param env_s3_bucket: the name of the S3 bucket where env vars should be
        searched.
        :param env_file_path: the path to the environment file.
        :param kubernetes_labels: labels to be passed to the kubernetes agent.
        :param kubernetes_image: image to use when running through the kubernetes agent.
        :param slack_webhooks_file: path to the slack webhooks file within the env
        bucket.
        """
        self.FLOW_NAME = flow_name
        # Variant of the flow name with dashes replaced by underscores.
        self.FLOW_NAME_UDCS = flow_name.replace("-", "_")
        self.S3_BUCKET_NAME = (
            DEFAULT_ENV_S3_BUCKET if env_s3_bucket is None else env_s3_bucket
        )
        self.ENV_FILE_PATH = (
            DEFAULT_ENV_FILE_PATH if env_file_path is None else env_file_path
        )
        self.SLACK_WEBHOOKS_FILE = (
            DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE
            if slack_webhooks_file is None
            else slack_webhooks_file
        )
        self.SLACK_WEBHOOKS = None
        # Prefect storage pointing at this flow's script inside the env bucket.
        self.STORAGE = S3(
            bucket=self.S3_BUCKET_NAME,
            key=DEFAULT_FLOWS_PATH_IN_BUCKET + self.FLOW_NAME + ".py",
            stored_as_script=True,
        )
        self.KUBERNETES_LABELS = (
            DEFAULT_KUBERNETES_LABELS
            if kubernetes_labels is None
            else kubernetes_labels
        )
        self.KUBERNETES_IMAGE = (
            DEFAULT_KUBERNETES_IMAGE if kubernetes_image is None else kubernetes_image
        )

        # Populated lazily by the fetch_* methods.
        self.ENV_DATA = None
        self.TRINO_CREDENTIALS = None
        self.SSH_TUNNEL_CREDENTIALS = None
        self.DW_CREDENTIALS = None
        # Initialised like the other S3-derived values so that reading it
        # before fetch_prefect_host() gives None instead of AttributeError.
        self.PREFECT_HOST = None

        self._s3_reader = S3FileReader(s3_client=boto3.client("s3"))

    def _resolve_s3_reader(self, s3_reader=None):
        """
        Pick the S3 reader to use for a fetch.

        :param s3_reader: an optional reader overriding the default one.
        :return: ``s3_reader`` if it is not None, else the instance's default
        reader.
        """
        return self._s3_reader if s3_reader is None else s3_reader

    # NOTE(review): this method does not read ENV_DATA itself, but the
    # decorator still loads the env file before it runs.
    @_needs_env_data
    def fetch_slack_webhooks(self, s3_reader=None) -> None:
        """
        Read the slack webhooks file from S3 and store the webhooks in memory.

        :param s3_reader: a client to fetch files from S3; the default reader
        is used when None.
        :return: None
        """
        reader = self._resolve_s3_reader(s3_reader)
        self.SLACK_WEBHOOKS = reader.read_json_from_s3_file(
            bucket=self.S3_BUCKET_NAME, key=self.SLACK_WEBHOOKS_FILE
        )

    @_needs_env_data
    def fetch_trino_credentials(self, s3_reader=None) -> None:
        """
        Read the env file from S3 and store the trino credentials in memory.

        :param s3_reader: an optional client to fetch files from S3. It is not
        used in this method body; it is only picked up by the
        ``_needs_env_data`` decorator to load the env data.
        :return: None
        :raises KeyError: if a required trino key is missing from the env data.
        """
        self.TRINO_CREDENTIALS = {
            "host": self.ENV_DATA["trino_host"],
            "user": self.ENV_DATA["trino_user"],
            "password": self.ENV_DATA["trino_pass"],
            "port": self.ENV_DATA["trino_port"],
        }

    @_needs_env_data
    def fetch_ssh_tunnel_credentials(self, s3_reader=None) -> None:
        """
        Read the env file from S3 and store the SSH tunnel credentials.

        :param s3_reader: an optional client to fetch files from S3. It is not
        used in this method body; it is only picked up by the
        ``_needs_env_data`` decorator to load the env data.
        :return: None
        :raises KeyError: if a required SSH key is missing from the env data.
        """
        self.SSH_TUNNEL_CREDENTIALS = {
            "path_to_ssh_pkey": self.ENV_DATA["pt_ssh_pkey_path"],
            "ssh_pkey_password": self.ENV_DATA["pt_ssh_pkey_passphrase"],
            "ssh_username": self.ENV_DATA["pt_ssh_username"],
            "ssh_port": self.ENV_DATA["pt_ssh_jumphost_port"],
            "ssh_jumphost": self.ENV_DATA["pt_ssh_jumphost"],
        }

    @_needs_env_data
    def fetch_dw_credentials(self, s3_reader=None) -> None:
        """
        Read the env file from S3 and store the DW credentials.

        :param s3_reader: an optional client to fetch files from S3. It is not
        used in this method body; it is only picked up by the
        ``_needs_env_data`` decorator to load the env data.
        :return: None
        :raises KeyError: if a required DW key is missing from the env data.
        """
        self.DW_CREDENTIALS = {
            "host": self.ENV_DATA["datadw_host"],
            "user": self.ENV_DATA["datadw_user"],
            "password": self.ENV_DATA["datadw_pass"],
            "port": self.ENV_DATA["datadw_port"],
            "default_db": self.ENV_DATA["datadw_default_db"],
        }

    @_needs_env_data
    def fetch_prefect_host(self, s3_reader=None) -> None:
        """
        Read the env file from S3 and store the prefect_host.

        :param s3_reader: an optional client to fetch files from S3. It is not
        used in this method body; it is only picked up by the
        ``_needs_env_data`` decorator to load the env data.
        :return: None
        :raises KeyError: if "prefect_host" is missing from the env data.
        """
        self.PREFECT_HOST = self.ENV_DATA["prefect_host"]

    def fetch_env_data(self, s3_reader=None) -> None:
        """
        Read the env file from S3 with the default or a passed s3_reader and
        store the contents in the object.

        The result is cached: once ENV_DATA is loaded, later calls return
        immediately, even if a different s3_reader is passed.

        :param s3_reader: an optional s3_reader to use instead of the default
        one.
        :return: None
        """
        if self.ENV_DATA is not None:
            return

        reader = self._resolve_s3_reader(s3_reader)
        self.ENV_DATA = reader.read_json_from_s3_file(
            bucket=self.S3_BUCKET_NAME, key=self.ENV_FILE_PATH
        )
|
|
|
|
|
def build_lolaconfig(
    flow_name: str,
    env_s3_bucket: Optional[str] = None,
    kubernetes_labels: Optional[List] = None,
    kubernetes_image: Optional[str] = None,
    env_file_path: Optional[str] = None,
    slack_webhooks_file: Optional[str] = None,
) -> LolaConfig:
    """
    Build a LolaConfig instance from the passed params and eagerly fetch all
    S3-backed values (slack webhooks, trino, SSH tunnel and DW credentials,
    prefect host).

    :param flow_name: the name of the flow.
    :param env_s3_bucket: the name of the S3 bucket where env vars should be
    searched.
    :param kubernetes_labels: labels to be passed to the kubernetes agent.
    :param kubernetes_image: image to use when running through the kubernetes agent.
    :param env_file_path: the path to the environment file. LolaConfig's
    default is used when None.
    :param slack_webhooks_file: path to the slack webhooks file within the env
    bucket. LolaConfig's default is used when None.
    :return: a ready to use LolaConfig instance.
    """
    # env_file_path and slack_webhooks_file are trailing keyword params so
    # existing positional callers are unaffected. Passing None for them is
    # equivalent to not passing them at all.
    lolaconfig = LolaConfig(
        flow_name=flow_name,
        env_s3_bucket=env_s3_bucket,
        env_file_path=env_file_path,
        kubernetes_labels=kubernetes_labels,
        kubernetes_image=kubernetes_image,
        slack_webhooks_file=slack_webhooks_file,
    )

    # Each call reads from S3 (directly or through the cached env data).
    lolaconfig.fetch_slack_webhooks()
    lolaconfig.fetch_trino_credentials()
    lolaconfig.fetch_ssh_tunnel_credentials()
    lolaconfig.fetch_dw_credentials()
    lolaconfig.fetch_prefect_host()

    return lolaconfig
|