Merge pull request #4 from lolamarket/feature/fetch-db-credentials

Credentials and other ENV vars through `LolaConfig`
This commit is contained in:
pablolola 2023-01-18 17:21:33 +01:00 committed by GitHub
commit f8634360fe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 218 additions and 11 deletions

13
CHANGELOG.md Normal file
View file

@ -0,0 +1,13 @@
# Change Log
All notable changes to this project will be documented in this file.
## [Unreleased]
### Added
- `LolaConfig` can now fetch the DW credentials from S3.
- `LolaConfig` can now fetch the SSH tunnel credentials from S3.
- `LolaConfig` can now fetch the Trino credentials from S3.
- `LolaConfig` can now fetch the Prefect Host from S3.
- `build_lolaconfig` executes all of the above when called.

View file

@ -11,12 +11,7 @@ You can find below examples of how to leverage `lolafect` in your flows.
```python
from lolafect.lolaconfig import build_lolaconfig
lolaconfig = build_lolaconfig(
flow_name="some-flow",
env_s3_bucket="bucket",
kubernetes_labels=["some_label"],
kubernetes_image="the-image:latest",
)
lolaconfig = build_lolaconfig(flow_name="some-flow")
# Now you can access all the env stuff from here
lolaconfig.FLOW_NAME
@ -25,7 +20,20 @@ lolaconfig.STORAGE
lolaconfig.KUBERNETES_IMAGE
lolaconfig.KUBERNETES_LABELS
lolaconfig.SLACK_WEBHOOKS
lolaconfig.DW_CREDENTIALS
lolaconfig.TRINO_CREDENTIALS
lolaconfig.SSH_TUNNEL_CREDENTIALS
lolaconfig.PREFECT_HOST
# etc
# Your flow is different from the typical one?
# You can customize the behaviour of LolaConfig
lolaconfig = build_lolaconfig(
flow_name="some-flow",
env_s3_bucket="my-odd-bucket",
kubernetes_labels=["some-unusual-label"],
kubernetes_image="the-image:not-the-production-one",
)
```
**Send a warning message to Slack if your task fails**

View file

@ -1,4 +1,5 @@
# Fallback values used by LolaConfig when the caller does not pass its own.

# S3 bucket and object keys for the env file and the slack webhooks file.
DEFAULT_ENV_S3_BUCKET = "pdo-prefect-flows"
DEFAULT_ENV_FILE_PATH = "env/env_prd.json"
DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE = "env/slack_webhooks.json"

# Kubernetes image and agent labels.
DEFAULT_KUBERNETES_IMAGE = (
    "373245262072.dkr.ecr.eu-central-1.amazonaws.com/pdo-data-prefect:production"
)
DEFAULT_KUBERNETES_LABELS = ["k8s"]

View file

@ -1,4 +1,4 @@
from typing import List
from typing import List, Callable
from prefect.storage import S3
import boto3
@ -9,10 +9,27 @@ from lolafect.defaults import (
DEFAULT_KUBERNETES_IMAGE,
DEFAULT_KUBERNETES_LABELS,
DEFAULT_FLOWS_PATH_IN_BUCKET,
DEFAULT_ENV_FILE_PATH,
)
from lolafect.utils import S3FileReader
def _needs_env_data(method: Callable) -> Callable:
    """
    Decorator to wrap methods from LolaConfig that depend on having the env
    data file loaded in memory.

    The wrapper calls ``self.fetch_env_data`` before delegating to the wrapped
    method. An ``s3_reader`` argument given to the wrapped method, either by
    keyword or by position, is forwarded to ``fetch_env_data``; when none is
    given, ``None`` is forwarded.

    :param method: the method that needs env data.
    :return: the method, wrapped with a call to fetch the env data.
    """
    # Local imports keep this block self-contained.
    import functools
    import inspect

    signature = inspect.signature(method)

    @functools.wraps(method)  # preserve the wrapped method's name and docstring
    def wrapper(self, *args, **kwargs):
        # Map positional arguments to parameter names so that an s3_reader
        # passed positionally is not silently ignored.
        try:
            bound_arguments = signature.bind_partial(self, *args, **kwargs).arguments
        except TypeError:
            # Invalid call: let the wrapped method raise its own error below.
            bound_arguments = {}
        s3_reader = bound_arguments.get("s3_reader", kwargs.get("s3_reader"))
        self.fetch_env_data(s3_reader)
        return method(self, *args, **kwargs)

    return wrapper
class LolaConfig:
"""
A global-ish container for configurations required in pretty much all flows.
@ -22,6 +39,7 @@ class LolaConfig:
self,
flow_name: str,
env_s3_bucket: str = None,
env_file_path: str = None,
kubernetes_labels: List = None,
kubernetes_image: str = None,
slack_webhooks_file: str = None,
@ -32,16 +50,20 @@ class LolaConfig:
:param flow_name: the name of the flow.
:param env_s3_bucket: the name of the S3 bucket where env vars should be
searched.
:param env_file_path: the path to the environment file.
:param kubernetes_labels: labels to be passed to the kubernetes agent.
:param kubernetes_image: image to use when running through the kubernetes agent.
:param slack_webhooks_file: path to the slack webhooks file within the env
bucket.
"""
self.FLOW_NAME = flow_name
self.FLOW_NAME_UDCS = flow_name.replace("-", "_ ")
self.FLOW_NAME_UDCS = flow_name.replace("-", "_")
self.S3_BUCKET_NAME = (
DEFAULT_ENV_S3_BUCKET if env_s3_bucket is None else env_s3_bucket
)
self.ENV_FILE_PATH = (
DEFAULT_ENV_FILE_PATH if env_file_path is None else env_file_path
)
self.SLACK_WEBHOOKS_FILE = (
DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE
if slack_webhooks_file is None
@ -62,8 +84,14 @@ class LolaConfig:
DEFAULT_KUBERNETES_IMAGE if kubernetes_image is None else kubernetes_image
)
self.ENV_DATA = None
self.TRINO_CREDENTIALS = None
self.SSH_TUNNEL_CREDENTIALS = None
self.DW_CREDENTIALS = None
self._s3_reader = S3FileReader(s3_client=boto3.client("s3"))
@_needs_env_data
def fetch_slack_webhooks(self, s3_reader=None) -> None:
"""
Read the slack webhooks file from S3 and store the webhooks in memory.
@ -79,6 +107,84 @@ class LolaConfig:
bucket=self.S3_BUCKET_NAME, key=self.SLACK_WEBHOOKS_FILE
)
@_needs_env_data
def fetch_trino_credentials(self, s3_reader=None) -> None:
    """
    Build the trino credentials from the loaded env data and keep them in
    ``self.TRINO_CREDENTIALS``.

    :param s3_reader: a client to fetch files from S3. Not used in the body
    itself; the ``_needs_env_data`` decorator reads it from the keyword
    arguments when loading the env data.
    :return: None
    """
    # credential field -> key in the env file
    env_key_by_field = {
        "host": "trino_host",
        "user": "trino_user",
        "password": "trino_pass",
        "port": "trino_port",
    }
    env_data = self.ENV_DATA
    self.TRINO_CREDENTIALS = {
        field: env_data[env_key] for field, env_key in env_key_by_field.items()
    }
@_needs_env_data
def fetch_ssh_tunnel_credentials(self, s3_reader=None) -> None:
    """
    Build the SSH tunnel credentials from the loaded env data and keep them
    in ``self.SSH_TUNNEL_CREDENTIALS``.

    :param s3_reader: a client to fetch files from S3. Not used in the body
    itself; the ``_needs_env_data`` decorator reads it from the keyword
    arguments when loading the env data.
    :return: None
    """
    # credential field -> key in the env file
    env_key_by_field = {
        "path_to_ssh_pkey": "pt_ssh_pkey_path",
        "ssh_pkey_password": "pt_ssh_pkey_passphrase",
        "ssh_username": "pt_ssh_username",
        "ssh_port": "pt_ssh_jumphost_port",
        "ssh_jumphost": "pt_ssh_jumphost",
    }
    env_data = self.ENV_DATA
    self.SSH_TUNNEL_CREDENTIALS = {
        field: env_data[env_key] for field, env_key in env_key_by_field.items()
    }
@_needs_env_data
def fetch_dw_credentials(self, s3_reader=None) -> None:
    """
    Build the DW credentials from the loaded env data and keep them in
    ``self.DW_CREDENTIALS``.

    :param s3_reader: a client to fetch files from S3. Not used in the body
    itself; the ``_needs_env_data`` decorator reads it from the keyword
    arguments when loading the env data.
    :return: None
    """
    # credential field -> key in the env file
    env_key_by_field = {
        "host": "datadw_host",
        "user": "datadw_user",
        "password": "datadw_pass",
        "port": "datadw_port",
    }
    env_data = self.ENV_DATA
    self.DW_CREDENTIALS = {
        field: env_data[env_key] for field, env_key in env_key_by_field.items()
    }
@_needs_env_data
def fetch_prefect_host(self, s3_reader=None) -> None:
    """
    Take the prefect host from the loaded env data and keep it in
    ``self.PREFECT_HOST``.

    :param s3_reader: a client to fetch files from S3. Not used in the body
    itself; the ``_needs_env_data`` decorator reads it from the keyword
    arguments when loading the env data.
    :return: None
    """
    env_data = self.ENV_DATA
    self.PREFECT_HOST = env_data["prefect_host"]
def fetch_env_data(self, s3_reader=None) -> None:
    """
    Load the env file from S3 into ``self.ENV_DATA``, unless it has already
    been loaded.

    :param s3_reader: an optional s3_reader to use instead of the default
    one held by the object.
    :return: None
    """
    # Only hit S3 the first time; later calls reuse what is in memory.
    if self.ENV_DATA is None:
        reader = self._s3_reader if s3_reader is None else s3_reader
        self.ENV_DATA = reader.read_json_from_s3_file(
            bucket=self.S3_BUCKET_NAME, key=self.ENV_FILE_PATH
        )
def build_lolaconfig(
flow_name: str,
@ -105,5 +211,9 @@ def build_lolaconfig(
)
lolaconfig.fetch_slack_webhooks()
lolaconfig.fetch_trino_credentials()
lolaconfig.fetch_ssh_tunnel_credentials()
lolaconfig.fetch_dw_credentials()
lolaconfig.fetch_prefect_host()
return lolaconfig

View file

@ -4,12 +4,10 @@ from lolafect.lolaconfig import LolaConfig
def test_lolaconfig_instantiates_with_defaults_properly():
    """LolaConfig can be built from just a flow name without raising."""
    LolaConfig(flow_name="some-flow")
def test_lolaconfig_instantiates_without_defaults_proplery():
lolaconfig = LolaConfig(
flow_name="some-flow",
env_s3_bucket="bucket",
@ -20,7 +18,6 @@ def test_lolaconfig_instantiates_without_defaults_proplery():
def test_lolaconfig_fetches_webhooks_properly():
lolaconfig = LolaConfig(flow_name="some-flow")
fake_s3_reader = SimpleNamespace()
@ -33,3 +30,81 @@ def test_lolaconfig_fetches_webhooks_properly():
lolaconfig.fetch_slack_webhooks(s3_reader=fake_s3_reader)
assert type(lolaconfig.SLACK_WEBHOOKS) is dict
def test_lolaconfig_fetches_trino_creds_properly():
    """
    fetch_trino_credentials should map the trino_* keys of the env file into
    TRINO_CREDENTIALS.
    """
    lolaconfig = LolaConfig(flow_name="some-flow")

    fake_s3_reader = SimpleNamespace()

    def mock_read_json_from_s3_file(bucket, key):
        return {
            "trino_host": "some_host",
            "trino_user": "some_user",
            "trino_pass": "some_password",
            "trino_port": "some_port",
        }

    fake_s3_reader.read_json_from_s3_file = mock_read_json_from_s3_file

    lolaconfig.fetch_trino_credentials(s3_reader=fake_s3_reader)

    # Compare the mapped values, not just the container type, so that a wrong
    # key mapping makes the test fail.
    assert lolaconfig.TRINO_CREDENTIALS == {
        "host": "some_host",
        "user": "some_user",
        "password": "some_password",
        "port": "some_port",
    }
def test_lolaconfig_fetches_ssh_tunnel_creds_properly():
    """
    fetch_ssh_tunnel_credentials should map the pt_ssh_* keys of the env file
    into SSH_TUNNEL_CREDENTIALS.
    """
    lolaconfig = LolaConfig(flow_name="some-flow")

    fake_s3_reader = SimpleNamespace()

    def mock_read_json_from_s3_file(bucket, key):
        return {
            "pt_ssh_pkey_path": "some-path",
            "pt_ssh_pkey_passphrase": "some-password",
            "pt_ssh_username": "some-username",
            "pt_ssh_jumphost_port": "some-port",
            "pt_ssh_jumphost": "some-jumphost",
        }

    fake_s3_reader.read_json_from_s3_file = mock_read_json_from_s3_file

    lolaconfig.fetch_ssh_tunnel_credentials(s3_reader=fake_s3_reader)

    # Compare the mapped values, not just the container type, so that a wrong
    # key mapping makes the test fail.
    assert lolaconfig.SSH_TUNNEL_CREDENTIALS == {
        "path_to_ssh_pkey": "some-path",
        "ssh_pkey_password": "some-password",
        "ssh_username": "some-username",
        "ssh_port": "some-port",
        "ssh_jumphost": "some-jumphost",
    }
def test_lolaconfig_fetches_dw_creds_properly():
    """
    fetch_dw_credentials should map the datadw_* keys of the env file into
    DW_CREDENTIALS.
    """
    lolaconfig = LolaConfig(flow_name="some-flow")

    fake_s3_reader = SimpleNamespace()

    def mock_read_json_from_s3_file(bucket, key):
        return {
            "datadw_host": "some_host",
            "datadw_user": "some_user",
            "datadw_pass": "some_password",
            "datadw_port": "some_port",
        }

    fake_s3_reader.read_json_from_s3_file = mock_read_json_from_s3_file

    lolaconfig.fetch_dw_credentials(s3_reader=fake_s3_reader)

    # Compare the mapped values, not just the container type, so that a wrong
    # key mapping makes the test fail.
    assert lolaconfig.DW_CREDENTIALS == {
        "host": "some_host",
        "user": "some_user",
        "password": "some_password",
        "port": "some_port",
    }
def test_lolaconfig_fetches_prefect_host_properly():
    """
    fetch_prefect_host should copy the prefect_host key of the env file into
    PREFECT_HOST.
    """
    lolaconfig = LolaConfig(flow_name="some-flow")

    fake_s3_reader = SimpleNamespace()

    def mock_read_json_from_s3_file(bucket, key):
        return {
            "prefect_host": "some_host",
        }

    fake_s3_reader.read_json_from_s3_file = mock_read_json_from_s3_file

    lolaconfig.fetch_prefect_host(s3_reader=fake_s3_reader)

    # Compare the value itself; a type-only check would accept any string.
    assert lolaconfig.PREFECT_HOST == "some_host"