diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..fecf8cc --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,13 @@ +# Change Log + +All notable changes to this project will be documented in this file. + +## [Unreleased] + +### Added + +- `LolaConfig` can now fetch the DW credentials from S3. +- `LolaConfig` can now fetch the SSH tunnel from S3. +- `LolaConfig` can now fetch the Trino credentials from S3. +- `LolaConfig` can now fetch the Prefect Host from S3. +- `build_lolaconfig` executes all of the above when called. diff --git a/README.md b/README.md index 2ec44ac..e7b7994 100644 --- a/README.md +++ b/README.md @@ -11,12 +11,7 @@ You can find below examples of how to leverage `lolafect` in your flows. ```python from lolafect.lolaconfig import build_lolaconfig -lolaconfig = build_lolaconfig( - flow_name="some-flow", - env_s3_bucket="bucket", - kubernetes_labels=["some_label"], - kubernetes_image="the-image:latest", -) +lolaconfig = build_lolaconfig(flow_name="some-flow") # Now you can access all the env stuff from here lolaconfig.FLOW_NAME @@ -25,7 +20,20 @@ lolaconfig.STORAGE lolaconfig.KUBERNETES_IMAGE lolaconfig.KUBERNETES_LABELS lolaconfig.SLACK_WEBHOOKS +lolaconfig.DW_CREDENTIALS +lolaconfig.TRINO_CREDENTIALS +lolaconfig.SSH_TUNNEL_CREDENTIALS +lolaconfig.PREFECT_HOST # etc + +# Your flow is different from the typical one? 
def _needs_env_data(method: Callable) -> Callable:
    """
    Decorator to wrap methods from LolaConfig that depend on having the env
    data file loaded in memory.

    The wrapper calls ``self.fetch_env_data`` before delegating to the wrapped
    method. When the call carries an ``s3_reader`` argument, passed either by
    keyword or by position, that same reader is handed to ``fetch_env_data``;
    otherwise ``None`` is passed. The wrapped method's name and docstring are
    preserved on the wrapper.

    :param method: the method that needs env data.
    :return: the method, wrapped with a call to fetch the env data.
    :raises TypeError: at call time, if the arguments do not fit the wrapped
        method's signature (raised before any env data is fetched).
    """
    # Local imports keep this block independent of the module import header.
    import functools
    import inspect

    # Resolved once at decoration time; reused on every call.
    signature = inspect.signature(method)

    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        # Bind the call against the wrapped signature so that an s3_reader
        # given positionally is found too, not only a keyword one.
        bound = signature.bind_partial(self, *args, **kwargs)
        # The kwargs fallback covers wrapped methods that take **kwargs and
        # therefore have no parameter literally named s3_reader.
        s3_reader = bound.arguments.get("s3_reader", kwargs.get("s3_reader"))
        self.fetch_env_data(s3_reader)
        return method(self, *args, **kwargs)

    return wrapper
@@ -22,6 +39,7 @@ class LolaConfig: self, flow_name: str, env_s3_bucket: str = None, + env_file_path: str = None, kubernetes_labels: List = None, kubernetes_image: str = None, slack_webhooks_file: str = None, @@ -32,16 +50,20 @@ class LolaConfig: :param flow_name: the name of the flow. :param env_s3_bucket: the name of the S3 bucket where env vars should be searched. + :param env_file_path: the path to the environment file. :param kubernetes_labels: labels to be passed to the kubernetes agent. :param kubernetes_image: image to use when running through the kubernetes agent. :param slack_webhooks_file: path to the slack webhooks file within the env bucket. """ self.FLOW_NAME = flow_name - self.FLOW_NAME_UDCS = flow_name.replace("-", "_ ") + self.FLOW_NAME_UDCS = flow_name.replace("-", "_") self.S3_BUCKET_NAME = ( DEFAULT_ENV_S3_BUCKET if env_s3_bucket is None else env_s3_bucket ) + self.ENV_FILE_PATH = ( + DEFAULT_ENV_FILE_PATH if env_file_path is None else env_file_path + ) self.SLACK_WEBHOOKS_FILE = ( DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE if slack_webhooks_file is None @@ -62,8 +84,14 @@ class LolaConfig: DEFAULT_KUBERNETES_IMAGE if kubernetes_image is None else kubernetes_image ) + self.ENV_DATA = None + self.TRINO_CREDENTIALS = None + self.SSH_TUNNEL_CREDENTIALS = None + self.DW_CREDENTIALS = None + self._s3_reader = S3FileReader(s3_client=boto3.client("s3")) + @_needs_env_data def fetch_slack_webhooks(self, s3_reader=None) -> None: """ Read the slack webhooks file from S3 and store the webhooks in memory. @@ -79,6 +107,84 @@ class LolaConfig: bucket=self.S3_BUCKET_NAME, key=self.SLACK_WEBHOOKS_FILE ) + @_needs_env_data + def fetch_trino_credentials(self, s3_reader=None) -> None: + """ + Read the env file from S3 and store the trino credentials in memory. + + :param s3_reader: a client to fetch files from S3. 
def fetch_env_data(self, s3_reader=None) -> None:
    """
    Load the env file from S3 into ``self.ENV_DATA``, at most once.

    If ``self.ENV_DATA`` is already populated the call does nothing, whatever
    reader is passed. Otherwise the JSON file located at
    ``self.ENV_FILE_PATH`` inside ``self.S3_BUCKET_NAME`` is read and the
    result is stored in ``self.ENV_DATA``.

    :param s3_reader: an optional s3_reader to use instead of the default
    one held in ``self._s3_reader``.
    :return: None
    """
    already_loaded = self.ENV_DATA is not None
    if not already_loaded:
        # Fall back to the instance-level reader only when none was given.
        reader = self._s3_reader if s3_reader is None else s3_reader
        self.ENV_DATA = reader.read_json_from_s3_file(
            bucket=self.S3_BUCKET_NAME,
            key=self.ENV_FILE_PATH,
        )
def test_lolaconfig_fetches_dw_creds_properly():
    """
    fetch_dw_credentials must map the ``datadw_*`` entries of the env file
    onto the ``host`` / ``user`` / ``password`` / ``port`` keys of
    ``DW_CREDENTIALS``.
    """
    lolaconfig = LolaConfig(flow_name="some-flow")

    fake_s3_reader = SimpleNamespace()

    def mock_read_json_from_s3_file(bucket, key):
        return {
            "datadw_host": "some_host",
            "datadw_user": "some_user",
            "datadw_pass": "some_password",
            "datadw_port": "some_port",
        }

    fake_s3_reader.read_json_from_s3_file = mock_read_json_from_s3_file

    lolaconfig.fetch_dw_credentials(s3_reader=fake_s3_reader)

    # Compare the full mapping rather than just the type, so a wrong or
    # missing key in the credentials dict makes the test fail.
    assert lolaconfig.DW_CREDENTIALS == {
        "host": "some_host",
        "user": "some_user",
        "password": "some_password",
        "port": "some_port",
    }