From 7962bd11e693001e1044768c592258b0d15297ae Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 19 Jan 2023 17:27:39 +0100 Subject: [PATCH 01/35] Added trino client do dependencies. --- requirements-dev.txt | 3 ++- setup.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/requirements-dev.txt b/requirements-dev.txt index 40f6a2a..f92f5bc 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -2,4 +2,5 @@ prefect==1.2.2 requests==2.28.1 boto3==1.26.40 pytest==7.2.0 -httpretty==1.1.4 \ No newline at end of file +httpretty==1.1.4 +trino==0.321.0 \ No newline at end of file diff --git a/setup.py b/setup.py index 1b6c89e..ec38921 100644 --- a/setup.py +++ b/setup.py @@ -23,5 +23,5 @@ setup( package_dir={"lolafect": "lolafect"}, include_package_data=True, python_requires=">=3.7", - install_requires=["prefect==1.2.2", "requests==2.28.1", "boto3==1.26.40"], + install_requires=["prefect==1.2.2", "requests==2.28.1", "boto3==1.26.40", "trino==0.321.0"], ) From caa69d85cdd69f27781a043e9a6144a7644659a9 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 19 Jan 2023 17:42:57 +0100 Subject: [PATCH 02/35] Add trino task. --- lolafect/connections.py | 35 +++++++++++++++++++++++++++++++++++ lolafect/defaults.py | 1 + 2 files changed, 36 insertions(+) create mode 100644 lolafect/connections.py diff --git a/lolafect/connections.py b/lolafect/connections.py new file mode 100644 index 0000000..fc0854a --- /dev/null +++ b/lolafect/connections.py @@ -0,0 +1,35 @@ +import datetime + +from prefect import task +from trino.auth import BasicAuthentication +import trino + +from lolafect.defaults import DEFAULT_TRINO_HTTP_SCHEME + + +@task(log_stdout=True, max_retries=3, retry_delay=datetime.timedelta(minutes=10)) +def connect_to_trino( + trino_credentials: dict, http_schema: str = DEFAULT_TRINO_HTTP_SCHEME +) -> trino.dbapi.Connection: + """ + Open a connection to the specified trino instance and return it. + + + :param trino_credentials: a dict with the host, port, user and password. + :param http_schema: which http schema to use in the connection. + :return: + """ + print("Connecting to Trino.") + connection = trino.dbapi.connect( + host=trino_credentials["host"], + port=trino_credentials["port"], + user=trino_credentials["user"], + http_scheme=http_schema, + auth=BasicAuthentication( + trino_credentials["user"], + trino_credentials["password"], + ), + ) + print("Connected to Trino.") + + return connection diff --git a/lolafect/defaults.py b/lolafect/defaults.py index feb66e4..1d4af84 100644 --- a/lolafect/defaults.py +++ b/lolafect/defaults.py @@ -4,3 +4,4 @@ DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE = "env/slack_webhooks.json" DEFAULT_KUBERNETES_IMAGE = "373245262072.dkr.ecr.eu-central-1.amazonaws.com/pdo-data-prefect:production" DEFAULT_KUBERNETES_LABELS = ["k8s"] DEFAULT_FLOWS_PATH_IN_BUCKET = "flows/" +DEFAULT_TRINO_HTTP_SCHEME = "https" From 28831bea8c996635bcd425d2f3036e4248958b7e Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Thu, 19 Jan 2023 17:45:57 +0100 Subject: [PATCH 03/35] Improve logging. --- lolafect/connections.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/lolafect/connections.py b/lolafect/connections.py index fc0854a..c2fec0c 100644 --- a/lolafect/connections.py +++ b/lolafect/connections.py @@ -1,5 +1,6 @@ import datetime +import prefect from prefect import task from trino.auth import BasicAuthentication import trino @@ -14,12 +15,15 @@ def connect_to_trino( """ Open a connection to the specified trino instance and return it. - :param trino_credentials: a dict with the host, port, user and password. :param http_schema: which http schema to use in the connection. :return: """ - print("Connecting to Trino.") + logger = prefect.context.get("logger") + logger.info( + f"Connecting to Trino at {trino_credentials['host']}:{trino_credentials['port']}." + ) + connection = trino.dbapi.connect( host=trino_credentials["host"], port=trino_credentials["port"], @@ -30,6 +34,8 @@ def connect_to_trino( trino_credentials["password"], ), ) - print("Connected to Trino.") + logger.info( + f"Connected to Trino at {trino_credentials['host']}:{trino_credentials['port']}." + ) return connection From 1e0faf702f3f32d851f6eca33018a88fb79ff536 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Fri, 20 Jan 2023 15:01:07 +0100 Subject: [PATCH 04/35] Added integration test for connecting to Trino --- tests/test_integration/test_connections.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 tests/test_integration/test_connections.py diff --git a/tests/test_integration/test_connections.py b/tests/test_integration/test_connections.py new file mode 100644 index 0000000..5b79a46 --- /dev/null +++ b/tests/test_integration/test_connections.py @@ -0,0 +1,18 @@ +import trino + +from lolafect.lolaconfig import build_lolaconfig +from lolafect.connections import connect_to_trino + +# This testing suite requires: +# - The calling shell to have permission in AWS +# - The calling shell to be within the Mercadão network +TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") + + +def test_that_trino_connector_works_properly(): + + connection = connect_to_trino.run(TEST_LOLACONFIG.TRINO_CREDENTIALS) + + connection.cursor().execute("SELECT 1") + + assert type(connection) == trino.dbapi.Connection From a15b2f4a861fbbd5c0a79ed4c9a9af353cc62c52 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Fri, 20 Jan 2023 15:53:45 +0100 Subject: [PATCH 05/35] Add close connection, update tests. --- lolafect/connections.py | 11 +++++++++++ tests/test_integration/test_connections.py | 8 ++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/lolafect/connections.py b/lolafect/connections.py index c2fec0c..eb14237 100644 --- a/lolafect/connections.py +++ b/lolafect/connections.py @@ -2,6 +2,7 @@ import datetime import prefect from prefect import task +from prefect.triggers import all_finished from trino.auth import BasicAuthentication import trino @@ -39,3 +40,13 @@ def connect_to_trino( ) return connection + + +@task(trigger=all_finished) +def close_trino_connection(trino_connection: trino.dbapi.Connection): + logger = prefect.context.get("logger") + if isinstance(trino_connection, trino.dbapi.Connection): + trino_connection.close() + logger.info("Trino connection closed successfully.") + return + logger.info("No connection received.") diff --git a/tests/test_integration/test_connections.py b/tests/test_integration/test_connections.py index 5b79a46..bbf8ddd 100644 --- a/tests/test_integration/test_connections.py +++ b/tests/test_integration/test_connections.py @@ -1,7 +1,7 @@ import trino from lolafect.lolaconfig import build_lolaconfig -from lolafect.connections import connect_to_trino +from lolafect.connections import connect_to_trino, close_trino_connection # This testing suite requires: # - The calling shell to have permission in AWS @@ -9,10 +9,10 @@ from lolafect.connections import connect_to_trino TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") -def test_that_trino_connector_works_properly(): +def test_that_trino_connect_and_disconnect_works_properly(): - connection = connect_to_trino.run(TEST_LOLACONFIG.TRINO_CREDENTIALS) + connection = connect_to_trino.run(trino_credentials=TEST_LOLACONFIG.TRINO_CREDENTIALS) connection.cursor().execute("SELECT 1") - assert type(connection) == trino.dbapi.Connection + close_trino_connection.run(trino_connection=connection) \ No newline at end of file From 26d1f10a003edf1df88296043c38223e47057f66 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 12:46:56 +0100 Subject: [PATCH 06/35] Improved comments --- tests/test_integration/test_connections.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/test_integration/test_connections.py b/tests/test_integration/test_connections.py index bbf8ddd..cd3c083 100644 --- a/tests/test_integration/test_connections.py +++ b/tests/test_integration/test_connections.py @@ -1,11 +1,18 @@ -import trino - from lolafect.lolaconfig import build_lolaconfig from lolafect.connections import connect_to_trino, close_trino_connection +#__ __ _____ _ _ _____ _ _ _____ _ +#\ \ / /\ | __ \| \ | |_ _| \ | |/ ____| | +# \ \ /\ / / \ | |__) | \| | | | | \| | | __| | +# \ \/ \/ / /\ \ | _ /| . ` | | | | . ` | | |_ | | +# \ /\ / ____ \| | \ \| |\ |_| |_| |\ | |__| |_| +# \/ \/_/ \_\_| \_\_| \_|_____|_| \_|\_____(_) # This testing suite requires: # - The calling shell to have permission in AWS # - The calling shell to be within the Mercadão network +# - Do not use this tests as part of CI/CD pipelines since they are not idempotent and +# rely on many external. Instead, use them manually to check yourself that things are +# working properly. TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") From 910373f19ce611c47b47bc79e31cf8d2a4100134 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 14:06:19 +0100 Subject: [PATCH 07/35] Docstrings and typing. --- lolafect/connections.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/lolafect/connections.py b/lolafect/connections.py index eb14237..40c450f 100644 --- a/lolafect/connections.py +++ b/lolafect/connections.py @@ -43,7 +43,14 @@ def connect_to_trino( @task(trigger=all_finished) -def close_trino_connection(trino_connection: trino.dbapi.Connection): +def close_trino_connection(trino_connection: trino.dbapi.Connection) -> None: + """ + Close a Trino connection, or do nothing if what has been passed is not a + Trino connection. + + :param trino_connection: a trino connection. + :return: None + """ logger = prefect.context.get("logger") if isinstance(trino_connection, trino.dbapi.Connection): trino_connection.close() From 73eae76739ab5194d92fcc6d751f61f89a771185 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 14:06:48 +0100 Subject: [PATCH 08/35] Formatting --- tests/test_integration/test_connections.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/test_integration/test_connections.py b/tests/test_integration/test_connections.py index cd3c083..5156f52 100644 --- a/tests/test_integration/test_connections.py +++ b/tests/test_integration/test_connections.py @@ -1,8 +1,8 @@ from lolafect.lolaconfig import build_lolaconfig from lolafect.connections import connect_to_trino, close_trino_connection -#__ __ _____ _ _ _____ _ _ _____ _ -#\ \ / /\ | __ \| \ | |_ _| \ | |/ ____| | +# __ __ _____ _ _ _____ _ _ _____ _ +# \ \ / /\ | __ \| \ | |_ _| \ | |/ ____| | # \ \ /\ / / \ | |__) | \| | | | | \| | | __| | # \ \/ \/ / /\ \ | _ /| . ` | | | | . ` | | |_ | | # \ /\ / ____ \| | \ \| |\ |_| |_| |\ | |__| |_| @@ -18,8 +18,10 @@ TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") def test_that_trino_connect_and_disconnect_works_properly(): - connection = connect_to_trino.run(trino_credentials=TEST_LOLACONFIG.TRINO_CREDENTIALS) + connection = connect_to_trino.run( + trino_credentials=TEST_LOLACONFIG.TRINO_CREDENTIALS + ) connection.cursor().execute("SELECT 1") - close_trino_connection.run(trino_connection=connection) \ No newline at end of file + close_trino_connection.run(trino_connection=connection) From aabdbe0508be192065496f25aacd04fed0950675 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 14:07:22 +0100 Subject: [PATCH 09/35] Fixed comments --- tests/test_integration/test_connections.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_integration/test_connections.py b/tests/test_integration/test_connections.py index 5156f52..b8e32be 100644 --- a/tests/test_integration/test_connections.py +++ b/tests/test_integration/test_connections.py @@ -11,8 +11,8 @@ from lolafect.connections import connect_to_trino, close_trino_connection # - The calling shell to have permission in AWS # - The calling shell to be within the Mercadão network # - Do not use this tests as part of CI/CD pipelines since they are not idempotent and -# rely on many external. Instead, use them manually to check yourself that things are -# working properly. +# rely external resources. Instead, use them manually to check yourself that things +# are working properly. TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") From d9b7c9a9e25a527cbc4b3acc2dc9e7eda0a5ab61 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 14:07:35 +0100 Subject: [PATCH 10/35] Fixed comments --- tests/test_integration/test_connections.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_integration/test_connections.py b/tests/test_integration/test_connections.py index b8e32be..74944e0 100644 --- a/tests/test_integration/test_connections.py +++ b/tests/test_integration/test_connections.py @@ -3,10 +3,10 @@ from lolafect.connections import connect_to_trino, close_trino_connection # __ __ _____ _ _ _____ _ _ _____ _ # \ \ / /\ | __ \| \ | |_ _| \ | |/ ____| | -# \ \ /\ / / \ | |__) | \| | | | | \| | | __| | -# \ \/ \/ / /\ \ | _ /| . ` | | | | . ` | | |_ | | -# \ /\ / ____ \| | \ \| |\ |_| |_| |\ | |__| |_| -# \/ \/_/ \_\_| \_\_| \_|_____|_| \_|\_____(_) +# \ \ /\ / / \ | |__) | \| | | | | \| | | __| | +# \ \/ \/ / /\ \ | _ /| . ` | | | | . ` | | |_ | | +# \ /\ / ____ \| | \ \| |\ |_| |_| |\ | |__| |_| +# \/ \/_/ \_\_| \_\_| \_|_____|_| \_|\_____(_) # This testing suite requires: # - The calling shell to have permission in AWS # - The calling shell to be within the Mercadão network From 738c4515181f42583d34325dfef1878e5e547cc4 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 14:08:21 +0100 Subject: [PATCH 11/35] Update changelog --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 78124e9..32fb418 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ All notable changes to this project will be documented in this file. +## [Unreleased] + +### Added + +- Added Trino connection capabilities in `connections` module. + + ## [0.2.0] - 2023-01-19 ### Added From 6cefb7ed03ce3e5cd6bfd1f371ccd5407232b7a3 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 14:27:17 +0100 Subject: [PATCH 12/35] Update readme with connection example --- README.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/README.md b/README.md index e7b7994..d04f3c8 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,19 @@ lolaconfig = build_lolaconfig( ) ``` +**Connect to a Trino server** + +```python +from lolafect.connections import connect_to_trino, close_trino_connection + + connection = connect_to_trino.run( + trino_credentials=my_trino_credentials #You can probably try to fetch this from lolaconfig.TRINO_CREDENTIALS + ) + connection.cursor().execute("SELECT 1") + close_trino_connection.run(trino_connection=connection) +``` + + **Send a warning message to slack if your tasks fails** ```python From bc524358ff6b8911c2054e6fadb792c1932029ab Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 14:28:08 +0100 Subject: [PATCH 13/35] Moved unit tests to folder --- tests/{ => test_unit}/test_lolaconfig.py | 0 tests/{ => test_unit}/test_slack.py | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename tests/{ => test_unit}/test_lolaconfig.py (100%) rename tests/{ => test_unit}/test_slack.py (100%) diff --git a/tests/test_lolaconfig.py b/tests/test_unit/test_lolaconfig.py similarity index 100% rename from tests/test_lolaconfig.py rename to tests/test_unit/test_lolaconfig.py diff --git a/tests/test_slack.py b/tests/test_unit/test_slack.py similarity index 100% rename from tests/test_slack.py rename to tests/test_unit/test_slack.py From e355e771b0308855049bdccf80ac6fa26955064e Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 14:35:37 +0100 Subject: [PATCH 14/35] Updated docs on how to run tests. --- README.md | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index d04f3c8..581b955 100644 --- a/README.md +++ b/README.md @@ -70,12 +70,30 @@ with Flow(...) as flow: ## How to test +There are two test suites: unit tests and integration tests. Integration tests are prepared to plug to some of our +AWS resources, hence they are not fully reliable since they require specific credentials and permissions. The +recommended policy is: + +- Use the unit tests in any CI process you want. +- Use the unit tests frequently as you code. +- Do not use the integration tests in CI processes. +- Use the integration tests as milestone checks when finishing feature branches. +- Make sure to ensure integration tests are working before making a new release. + +When building new tests, please keep this philosophy in mind. + + IDE-agnostic: 1. Set up a virtual environment which contains both `lolafect` and the dependencies listed in `requirements-dev.txt`. -2. Run: `pytests tests` +2. Run: + - For all tests: `pytests tests` + - Only unit tests: `pytest tests/test_unit` + - Only integration tests: `pytest tests/test_integration` In Pycharm: - If you configure `pytest` as the project test runner, Pycharm will most probably autodetect the test - folder and allow you to run the test suite within the IDE. \ No newline at end of file + folder and allow you to run the test suite within the IDE. However, Pycharm has troubles running the integration + tests since the shell it runs from does not have the AWS credentials. Hence, for now we recommend you to only use + the Pycharm integrated test runner for the unit tests. You can easily set up a Run Configuration for that. \ No newline at end of file From ee489edd6193859c0bae96dac7a5cbb5fc2f4020 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 14:37:12 +0100 Subject: [PATCH 15/35] Improved code example --- README.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 581b955..9160eb3 100644 --- a/README.md +++ b/README.md @@ -41,11 +41,15 @@ lolaconfig = build_lolaconfig( ```python from lolafect.connections import connect_to_trino, close_trino_connection +with Flow(...) as flow: connection = connect_to_trino.run( trino_credentials=my_trino_credentials #You can probably try to fetch this from lolaconfig.TRINO_CREDENTIALS ) - connection.cursor().execute("SELECT 1") - close_trino_connection.run(trino_connection=connection) + task_result = some_trino_related_task(trino_connection=connection) + close_trino_connection.run( + trino_connection=connection, + upstream_tasks=[task_result] + ) ``` From fae5888c5264db342fe1073e072cf503082b3925 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 15:00:27 +0100 Subject: [PATCH 16/35] Added requirements to project --- requirements-dev.txt | 4 +++- setup.py | 9 ++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/requirements-dev.txt b/requirements-dev.txt index f92f5bc..2adb75e 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -3,4 +3,6 @@ requests==2.28.1 boto3==1.26.40 pytest==7.2.0 httpretty==1.1.4 -trino==0.321.0 \ No newline at end of file +trino==0.321.0 +sshtunnel==0.4.0 +PyMySQL==1.0.2 \ No newline at end of file diff --git a/setup.py b/setup.py index ec38921..3ac4e7f 100644 --- a/setup.py +++ b/setup.py @@ -23,5 +23,12 @@ setup( package_dir={"lolafect": "lolafect"}, include_package_data=True, python_requires=">=3.7", - install_requires=["prefect==1.2.2", "requests==2.28.1", "boto3==1.26.40", "trino==0.321.0"], + install_requires=[ + "prefect==1.2.2", + "requests==2.28.1", + "boto3==1.26.40", + "trino==0.321.0", + "sshtunnel==0.4.0", + "PyMySQL==1.0.2" + ], ) From 9feda795e9d7788294e5b3dd2d043a746dee1fb5 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 15:01:09 +0100 Subject: [PATCH 17/35] Simple copy from project functions --- lolafect/connections.py | 85 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/lolafect/connections.py b/lolafect/connections.py index 40c450f..250c389 100644 --- a/lolafect/connections.py +++ b/lolafect/connections.py @@ -5,6 +5,8 @@ from prefect import task from prefect.triggers import all_finished from trino.auth import BasicAuthentication import trino +import pymysql +import sshtunnel from lolafect.defaults import DEFAULT_TRINO_HTTP_SCHEME @@ -57,3 +59,86 @@ def close_trino_connection(trino_connection: trino.dbapi.Connection) -> None: logger.info("Trino connection closed successfully.") return logger.info("No connection received.") + + +@task(log_stdout=True, nout=2) +def connect_to_dw(use_ssh_tunnel): + print("Connecting to DW") + import pymysql + + mysql_host = LOLACONFIG.DW_CREDENTIALS["host"] + tunnel = None + if use_ssh_tunnel: + print("Going to open an SSH tunnel.") + from sshtunnel import SSHTunnelForwarder + + temp_file_path = "temp" + try: + boto3.client("s3").download_file( + LOLACONFIG.S3_BUCKET_NAME, + LOLACONFIG.SSH_TUNNEL_CREDENTIALS["path_to_ssh_pkey"], + temp_file_path, + ) + tunnel = SSHTunnelForwarder( + ssh_host=( + LOLACONFIG.SSH_TUNNEL_CREDENTIALS["ssh_jumphost"], + LOLACONFIG.SSH_TUNNEL_CREDENTIALS["ssh_port"], + ), + ssh_username=LOLACONFIG.SSH_TUNNEL_CREDENTIALS["ssh_username"], + ssh_pkey=temp_file_path, + remote_bind_address=( + LOLACONFIG.DW_CREDENTIALS["host"], + LOLACONFIG.DW_CREDENTIALS["port"], + ), + local_bind_address=("127.0.0.1", LOLACONFIG.DW_CREDENTIALS["port"]), + ssh_private_key_password=LOLACONFIG.SSH_TUNNEL_CREDENTIALS[ + "ssh_pkey_password" + ], + ) + except Exception as e: + raise e + finally: + # No matter what happens above, we always must delete the temp copy of the key + os.remove(temp_file_path) + + tunnel.start() + print("SSH tunnel is now open.") + mysql_host = "127.0.0.1" + + db_connection = pymysql.connect( + host=mysql_host, + port=LOLACONFIG.DW_CREDENTIALS["port"], + user=LOLACONFIG.DW_CREDENTIALS["user"], + password=LOLACONFIG.DW_CREDENTIALS["password"], + database="dw_xl", + ) + + # Customizing this attributes to retrieve them later in the flow + db_connection.raw_user = LOLACONFIG.DW_CREDENTIALS["user"] + db_connection.raw_password = LOLACONFIG.DW_CREDENTIALS["password"] + + print("Connected to DW.") + + return db_connection, tunnel + + +@task(log_stdout=True, trigger=all_finished) +def close_dw_connection(dw_connection): + import pymysql + + if isinstance(dw_connection, pymysql.Connection): + dw_connection.close() + print("DW connection closed successfully.") + return + print("No connection received.") + + +@task(log_stdout=True, trigger=all_finished) +def close_ssh_tunnel(ssh_tunnel): + from sshtunnel import SSHTunnelForwarder + + if isinstance(ssh_tunnel, SSHTunnelForwarder): + ssh_tunnel.stop() + print("SSH tunnel closed successfully.") + return + print("No connection received.") \ No newline at end of file From e4e1f423091a48b8be8b619bb66762cceb96bf40 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 15:49:54 +0100 Subject: [PATCH 18/35] Adapting references --- lolafect/connections.py | 77 +++++++++++++++++++++++------------------ 1 file changed, 43 insertions(+), 34 deletions(-) diff --git a/lolafect/connections.py b/lolafect/connections.py index 250c389..34cc490 100644 --- a/lolafect/connections.py +++ b/lolafect/connections.py @@ -1,4 +1,6 @@ import datetime +import os +from typing import Tuple import prefect from prefect import task @@ -6,7 +8,8 @@ from prefect.triggers import all_finished from trino.auth import BasicAuthentication import trino import pymysql -import sshtunnel +import boto3 +from sshtunnel import SSHTunnelForwarder from lolafect.defaults import DEFAULT_TRINO_HTTP_SCHEME @@ -62,38 +65,42 @@ def close_trino_connection(trino_connection: trino.dbapi.Connection) -> None: @task(log_stdout=True, nout=2) -def connect_to_dw(use_ssh_tunnel): - print("Connecting to DW") - import pymysql +def connect_to_dw( + mysql_credentials: dict, + use_ssh_tunnel: bool, + s3_bucket_name, + ssh_tunnel_credentials, +) -> Tuple[pymysql.Connection, SSHTunnelForwarder]: + logger = prefect.context.get("logger") + logger.info( + f"Connecting to MySQL at {mysql_credentials['host']}:{mysql_credentials['port']}." + ) - mysql_host = LOLACONFIG.DW_CREDENTIALS["host"] + mysql_host = mysql_credentials["host"] tunnel = None if use_ssh_tunnel: - print("Going to open an SSH tunnel.") - from sshtunnel import SSHTunnelForwarder + logger.info("Going to open an SSH tunnel.") temp_file_path = "temp" try: boto3.client("s3").download_file( - LOLACONFIG.S3_BUCKET_NAME, - LOLACONFIG.SSH_TUNNEL_CREDENTIALS["path_to_ssh_pkey"], + s3_bucket_name, + ssh_tunnel_credentials["path_to_ssh_pkey"], temp_file_path, ) tunnel = SSHTunnelForwarder( ssh_host=( - LOLACONFIG.SSH_TUNNEL_CREDENTIALS["ssh_jumphost"], - LOLACONFIG.SSH_TUNNEL_CREDENTIALS["ssh_port"], + ssh_tunnel_credentials["ssh_jumphost"], + ssh_tunnel_credentials["ssh_port"], ), - ssh_username=LOLACONFIG.SSH_TUNNEL_CREDENTIALS["ssh_username"], + ssh_username=ssh_tunnel_credentials["ssh_username"], ssh_pkey=temp_file_path, remote_bind_address=( - LOLACONFIG.DW_CREDENTIALS["host"], - LOLACONFIG.DW_CREDENTIALS["port"], + mysql_credentials["host"], + mysql_credentials["port"], ), - local_bind_address=("127.0.0.1", LOLACONFIG.DW_CREDENTIALS["port"]), - ssh_private_key_password=LOLACONFIG.SSH_TUNNEL_CREDENTIALS[ - "ssh_pkey_password" - ], + local_bind_address=("127.0.0.1", mysql_credentials["port"]), + ssh_private_key_password=mysql_credentials["ssh_pkey_password"], ) except Exception as e: raise e @@ -102,43 +109,45 @@ def connect_to_dw(use_ssh_tunnel): os.remove(temp_file_path) tunnel.start() - print("SSH tunnel is now open.") + logger.info( + f"SSH tunnel is now open and listening at{mysql_credentials['host']}:{mysql_credentials['port']}." + ) mysql_host = "127.0.0.1" db_connection = pymysql.connect( host=mysql_host, - port=LOLACONFIG.DW_CREDENTIALS["port"], - user=LOLACONFIG.DW_CREDENTIALS["user"], - password=LOLACONFIG.DW_CREDENTIALS["password"], + port=mysql_credentials["port"], + user=mysql_credentials["user"], + password=mysql_credentials["password"], database="dw_xl", ) # Customizing this attributes to retrieve them later in the flow - db_connection.raw_user = LOLACONFIG.DW_CREDENTIALS["user"] - db_connection.raw_password = LOLACONFIG.DW_CREDENTIALS["password"] + db_connection.raw_user = mysql_credentials["user"] + db_connection.raw_password = mysql_credentials["password"] - print("Connected to DW.") + logger.info(f"Connected to MySQL at {mysql_credentials['host']}:{mysql_credentials['port']}.") return db_connection, tunnel -@task(log_stdout=True, trigger=all_finished) -def close_dw_connection(dw_connection): - import pymysql +@task(trigger=all_finished) +def close_dw_connection(dw_connection: pymysql.Connection) -> None: + logger = prefect.context.get("logger") if isinstance(dw_connection, pymysql.Connection): dw_connection.close() - print("DW connection closed successfully.") + logger.info("DW connection closed successfully.") return - print("No connection received.") + logger.info("No connection received.") @task(log_stdout=True, trigger=all_finished) -def close_ssh_tunnel(ssh_tunnel): - from sshtunnel import SSHTunnelForwarder +def close_ssh_tunnel(ssh_tunnel: SSHTunnelForwarder) -> None: + logger = prefect.context.get("logger") if isinstance(ssh_tunnel, SSHTunnelForwarder): ssh_tunnel.stop() - print("SSH tunnel closed successfully.") + logger.info("SSH tunnel closed successfully.") return - print("No connection received.") \ No newline at end of file + logger.info("No connection received.") From f75b832903fa30475de6ff3c7c57852f376c15dc Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 17:54:17 +0100 Subject: [PATCH 19/35] Create context manager for temp download of ssh key file. Tests. --- lolafect/connections.py | 34 +++++++++++++++++-- tests/test_integration/test_connections.py | 38 +++++++++++++++++++++- 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/lolafect/connections.py b/lolafect/connections.py index 34cc490..d823db9 100644 --- a/lolafect/connections.py +++ b/lolafect/connections.py @@ -1,6 +1,7 @@ import datetime import os from typing import Tuple +from contextlib import contextmanager import prefect from prefect import task @@ -64,8 +65,37 @@ def close_trino_connection(trino_connection: trino.dbapi.Connection) -> None: logger.info("No connection received.") -@task(log_stdout=True, nout=2) -def connect_to_dw( +@contextmanager +def _temp_secret_file_from_s3( + s3_bucket_name: str, s3_file_key: str, local_temp_file_path: str +) -> str: + """ + Downloads a file from S3 and ensures that it will be deleted once the + context is exited from, even in the face of an exception. + + :param s3_bucket_name: the bucket where the file lives. + :param s3_file_key: the key of the file within the bucket. + :param local_temp_file_path: the path where the file should be stored + temporarily. + :return: the local file path. + """ + boto3.client("s3").download_file( + s3_bucket_name, + s3_file_key, + local_temp_file_path, + ) + try: + yield local_temp_file_path + except Exception as e: + raise e + finally: + # Regardless of what happens in the context manager, we always delete the temp + # copy of the private key. + os.remove(local_temp_file_path) + + +@task(nout=2) +def connect_to_mysql( mysql_credentials: dict, use_ssh_tunnel: bool, s3_bucket_name, diff --git a/tests/test_integration/test_connections.py b/tests/test_integration/test_connections.py index 74944e0..a3349a4 100644 --- a/tests/test_integration/test_connections.py +++ b/tests/test_integration/test_connections.py @@ -1,5 +1,7 @@ +import pathlib + from lolafect.lolaconfig import build_lolaconfig -from lolafect.connections import connect_to_trino, close_trino_connection +from lolafect.connections import connect_to_trino, close_trino_connection, _temp_secret_file_from_s3 # __ __ _____ _ _ _____ _ _ _____ _ # \ \ / /\ | __ \| \ | |_ _| \ | |/ ____| | @@ -25,3 +27,37 @@ def test_that_trino_connect_and_disconnect_works_properly(): connection.cursor().execute("SELECT 1") close_trino_connection.run(trino_connection=connection) + + +def test_temporal_download_of_secret_file_works_properly_in_happy_path(): + + temp_file_name = "test_temp_file" + + with _temp_secret_file_from_s3( + TEST_LOLACONFIG.S3_BUCKET_NAME, + s3_file_key="env/env_prd.json", # Not a secret file, but then again, this is a test, + local_temp_file_path=temp_file_name + ) as temp: + temp_file_found_when_in_context_manager = pathlib.Path(temp).exists() + + temp_file_missing_when_outside_context_manager = not pathlib.Path(temp_file_name).exists() + + assert temp_file_found_when_in_context_manager and temp_file_missing_when_outside_context_manager + +def test_temporal_download_of_secret_file_works_properly_even_with_exception(): + temp_file_name = "test_temp_file" + + try: + with _temp_secret_file_from_s3( + TEST_LOLACONFIG.S3_BUCKET_NAME, + s3_file_key="env/env_prd.json", # Not a secret file, but then again, this is a test, + local_temp_file_path=temp_file_name + ) as temp: + temp_file_found_when_in_context_manager = pathlib.Path(temp).exists() + raise Exception # Something nasty happens within the context manager + except: + pass # We go with the test, ignoring the forced exception + + temp_file_missing_when_outside_context_manager = not pathlib.Path(temp_file_name).exists() + + assert temp_file_found_when_in_context_manager and temp_file_missing_when_outside_context_manager \ No newline at end of file From dd07cd7959a1b8fc91915c8887f7845c0c22ab94 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 18:21:23 +0100 Subject: [PATCH 20/35] Test formatting --- tests/test_integration/test_connections.py | 31 +++++++++++++++------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/tests/test_integration/test_connections.py b/tests/test_integration/test_connections.py index a3349a4..771baa3 100644 --- a/tests/test_integration/test_connections.py +++ b/tests/test_integration/test_connections.py @@ -35,14 +35,20 @@ def test_temporal_download_of_secret_file_works_properly_in_happy_path(): with _temp_secret_file_from_s3( TEST_LOLACONFIG.S3_BUCKET_NAME, - s3_file_key="env/env_prd.json", # Not a secret file, but then again, this is a test, - local_temp_file_path=temp_file_name + s3_file_key="env/env_prd.json", # Not a secret file, but then again, this is a test, + local_temp_file_path=temp_file_name, ) as temp: temp_file_found_when_in_context_manager = pathlib.Path(temp).exists() - temp_file_missing_when_outside_context_manager = not pathlib.Path(temp_file_name).exists() + temp_file_missing_when_outside_context_manager = not pathlib.Path( + temp_file_name + ).exists() + + assert ( + temp_file_found_when_in_context_manager + and temp_file_missing_when_outside_context_manager + ) - assert temp_file_found_when_in_context_manager and temp_file_missing_when_outside_context_manager def test_temporal_download_of_secret_file_works_properly_even_with_exception(): temp_file_name = "test_temp_file" @@ -50,14 +56,19 @@ def test_temporal_download_of_secret_file_works_properly_even_with_exception(): try: with _temp_secret_file_from_s3( TEST_LOLACONFIG.S3_BUCKET_NAME, - s3_file_key="env/env_prd.json", # Not a secret file, but then again, this is a test, - local_temp_file_path=temp_file_name + s3_file_key="env/env_prd.json", # Not a secret file, but then again, this is a test, + local_temp_file_path=temp_file_name, ) as temp: temp_file_found_when_in_context_manager = pathlib.Path(temp).exists() - raise Exception # Something nasty happens within the context manager + raise Exception # Something nasty happens within the context manager except: - pass # We go with the test, ignoring the forced exception + pass # We go with the test, ignoring the forced exception - temp_file_missing_when_outside_context_manager = not pathlib.Path(temp_file_name).exists() + temp_file_missing_when_outside_context_manager = not pathlib.Path( + temp_file_name + ).exists() - assert temp_file_found_when_in_context_manager and temp_file_missing_when_outside_context_manager \ No newline at end of file + assert ( + temp_file_found_when_in_context_manager + and temp_file_missing_when_outside_context_manager + ) From 8c66a843b9c51ba3c440e6fd027d2f715ac7b00a Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 18:38:12 +0100 Subject: [PATCH 21/35] Add ssh opening functions and test. --- lolafect/connections.py | 85 ++++++++++++++++++++++ tests/test_integration/test_connections.py | 22 +++++- 2 files changed, 106 insertions(+), 1 deletion(-) diff --git a/lolafect/connections.py b/lolafect/connections.py index d823db9..a1fa4a1 100644 --- a/lolafect/connections.py +++ b/lolafect/connections.py @@ -94,6 +94,91 @@ def _temp_secret_file_from_s3( os.remove(local_temp_file_path) +@task() +def open_ssh_tunnel_with_s3_pkey( + s3_bucket_name: str, + ssh_tunnel_credentials: dict, + remote_target_host: str, + remote_target_port: int, +) -> SSHTunnelForwarder: + """ + Temporarily fetch a ssh key from S3 and then proceed to open a SSH tunnel + using it. + + :param s3_bucket_name: the bucket where the file lives. + :param ssh_tunnel_credentials: the details of the jumpthost SSH + connection. + :param remote_target_host: the remote host to tunnel to. + :param remote_target_port: the remote port to tunnel to. + :return: the tunnel, already open. + """ + logger = prefect.context.get("logger") + logger.info("Going to open an SSH tunnel.") + + temp_file_path = "temp" + local_bind_host = "127.0.0.1" + local_bind_port = 12345 + + with _temp_secret_file_from_s3( + s3_bucket_name=s3_bucket_name, + s3_file_key=ssh_tunnel_credentials["path_to_ssh_pkey"], + local_temp_file_path=temp_file_path, + ) as ssh_key_file: + tunnel = open_ssh_tunnel( + local_bind_host, + local_bind_port, + remote_target_host, + remote_target_port, + ssh_key_file, + ssh_tunnel_credentials, + ) + logger.info( + f"SSH tunnel is now open and listening at {local_bind_host}:{local_bind_port}.\n" + f"Tunnel forwards to {remote_target_host}:{remote_target_port}" + ) + + return tunnel + + +def open_ssh_tunnel( + local_bind_host: str, + local_bind_port: int, + remote_target_host: str, + remote_target_port: int, + ssh_key_file_path: str, + ssh_tunnel_credentials: dict, +) -> SSHTunnelForwarder: + """ + Configure and start an SSH tunnel. + + :param local_bind_host: the local host address to bind the tunnel to. + :param local_bind_port: the local port address to bind the tunnel to. + :param remote_target_host: the remote host to forward to. + :param remote_target_port: the remote port to forward to. + :param ssh_key_file_path: the path to the ssh key. + :param ssh_tunnel_credentials: the details of the jumpthost SSH + connection. + :return: the tunnel, already open. + """ + + tunnel = SSHTunnelForwarder( + ssh_host=( + ssh_tunnel_credentials["ssh_jumphost"], + ssh_tunnel_credentials["ssh_port"], + ), + ssh_username=ssh_tunnel_credentials["ssh_username"], + ssh_pkey=ssh_key_file_path, + remote_bind_address=( + remote_target_host, + remote_target_port, + ), + local_bind_address=(local_bind_host, local_bind_port), + ssh_private_key_password=ssh_tunnel_credentials["ssh_pkey_password"], + ) + tunnel.start() + return tunnel + + @task(nout=2) def connect_to_mysql( mysql_credentials: dict, diff --git a/tests/test_integration/test_connections.py b/tests/test_integration/test_connections.py index 771baa3..52c471a 100644 --- a/tests/test_integration/test_connections.py +++ b/tests/test_integration/test_connections.py @@ -1,7 +1,12 @@ import pathlib from lolafect.lolaconfig import build_lolaconfig -from lolafect.connections import connect_to_trino, close_trino_connection, _temp_secret_file_from_s3 +from lolafect.connections import ( + connect_to_trino, + close_trino_connection, + _temp_secret_file_from_s3, + open_ssh_tunnel_with_s3_pkey, +) # __ __ _____ _ _ _____ _ _ _____ _ # \ \ / /\ | __ \| \ | |_ _| \ | |/ ____| | @@ -72,3 +77,18 @@ def test_temporal_download_of_secret_file_works_properly_even_with_exception(): temp_file_found_when_in_context_manager and temp_file_missing_when_outside_context_manager ) + + +def test_opening_and_closing_ssh_tunnel_works_properly(): + tunnel = open_ssh_tunnel_with_s3_pkey.run( + s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, + ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, + remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], + remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], + ) + tunnel_was_active = tunnel.is_active + tunnel.close() + + tunnel_is_no_longer_active = not tunnel.is_active + + assert tunnel_was_active and tunnel_is_no_longer_active From e2e7f8fb9332941afad9ee5bfbd4ff7ec803e261 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Mon, 23 Jan 2023 18:38:52 +0100 Subject: [PATCH 22/35] Small details --- lolafect/connections.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lolafect/connections.py b/lolafect/connections.py index a1fa4a1..3e48f96 100644 --- a/lolafect/connections.py +++ b/lolafect/connections.py @@ -215,7 +215,7 @@ def connect_to_mysql( mysql_credentials["port"], ), local_bind_address=("127.0.0.1", mysql_credentials["port"]), - ssh_private_key_password=mysql_credentials["ssh_pkey_password"], + ssh_private_key_password=ssh_tunnel_credentials["ssh_pkey_password"], ) except Exception as e: raise e @@ -241,13 +241,15 @@ def connect_to_mysql( db_connection.raw_user = mysql_credentials["user"] db_connection.raw_password = mysql_credentials["password"] - logger.info(f"Connected to MySQL at {mysql_credentials['host']}:{mysql_credentials['port']}.") + logger.info( + f"Connected to MySQL at {mysql_credentials['host']}:{mysql_credentials['port']}." + ) return db_connection, tunnel @task(trigger=all_finished) -def close_dw_connection(dw_connection: pymysql.Connection) -> None: +def close_mysql_connection(dw_connection: pymysql.Connection) -> None: logger = prefect.context.get("logger") if isinstance(dw_connection, pymysql.Connection): @@ -257,7 +259,7 @@ def close_dw_connection(dw_connection: pymysql.Connection) -> None: logger.info("No connection received.") -@task(log_stdout=True, trigger=all_finished) +@task(trigger=all_finished) def close_ssh_tunnel(ssh_tunnel: SSHTunnelForwarder) -> None: logger = prefect.context.get("logger") From 3564ae99f785462d853fd10bf84f033393ff5746 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Tue, 24 Jan 2023 09:57:54 +0100 Subject: [PATCH 23/35] Splitting MySQL and SSH tunnel bits. SSH tunnel is now tested. --- lolafect/connections.py | 83 +++++++++++----------- tests/test_integration/test_connections.py | 27 ++++++- 2 files changed, 65 insertions(+), 45 deletions(-) diff --git a/lolafect/connections.py b/lolafect/connections.py index 3e48f96..3d85f17 100644 --- a/lolafect/connections.py +++ b/lolafect/connections.py @@ -100,6 +100,8 @@ def open_ssh_tunnel_with_s3_pkey( ssh_tunnel_credentials: dict, remote_target_host: str, remote_target_port: int, + local_bind_host: str = "127.0.0.1", + local_bind_port: int = 12345 ) -> SSHTunnelForwarder: """ Temporarily fetch a ssh key from S3 and then proceed to open a SSH tunnel @@ -110,14 +112,14 @@ def open_ssh_tunnel_with_s3_pkey( connection. :param remote_target_host: the remote host to tunnel to. :param remote_target_port: the remote port to tunnel to. + :param local_bind_host: the host for the local bind address. + :param local_bind_port: the port for the local bind address. :return: the tunnel, already open. """ logger = prefect.context.get("logger") logger.info("Going to open an SSH tunnel.") temp_file_path = "temp" - local_bind_host = "127.0.0.1" - local_bind_port = 12345 with _temp_secret_file_from_s3( s3_bucket_name=s3_bucket_name, @@ -179,59 +181,54 @@ def open_ssh_tunnel( return tunnel -@task(nout=2) +@task() +def get_local_bind_address_from_ssh_tunnel( + tunnel: SSHTunnelForwarder, +) -> Tuple[str, int]: + """ + A silly wrapper to be able to unpack the local bind address of a tunnel + within a Prefect flow. + + :param tunnel: an SSH tunnel. + :return: the local bind address of the SSH tunnel, as a tuple with host + and port. + """ + return tunnel.local_bind_address + + +@task() def connect_to_mysql( - mysql_credentials: dict, - use_ssh_tunnel: bool, - s3_bucket_name, - ssh_tunnel_credentials, -) -> Tuple[pymysql.Connection, SSHTunnelForwarder]: + mysql_credentials: dict, overriding_host_and_port: Tuple[str, int] = None +) -> pymysql.Connection: + """ + Create a connection to a MySQL server, optionally using a host and port + different from the ones where the MySQL server is located. + + :param mysql_credentials: a dict with the connection details to the MySQL + instance. + :param overriding_host_and_port: an optional tuple containing a different + host and port. Useful to route through an SSH tunnel. + :return: the connection to the MySQL server. + """ logger = prefect.context.get("logger") logger.info( f"Connecting to MySQL at {mysql_credentials['host']}:{mysql_credentials['port']}." ) mysql_host = mysql_credentials["host"] - tunnel = None - if use_ssh_tunnel: - logger.info("Going to open an SSH tunnel.") + mysql_port = mysql_credentials["port"] - temp_file_path = "temp" - try: - boto3.client("s3").download_file( - s3_bucket_name, - ssh_tunnel_credentials["path_to_ssh_pkey"], - temp_file_path, - ) - tunnel = SSHTunnelForwarder( - ssh_host=( - ssh_tunnel_credentials["ssh_jumphost"], - ssh_tunnel_credentials["ssh_port"], - ), - ssh_username=ssh_tunnel_credentials["ssh_username"], - ssh_pkey=temp_file_path, - remote_bind_address=( - mysql_credentials["host"], - mysql_credentials["port"], - ), - local_bind_address=("127.0.0.1", mysql_credentials["port"]), - ssh_private_key_password=ssh_tunnel_credentials["ssh_pkey_password"], - ) - except Exception as e: - raise e - finally: - # No matter what happens above, we always must delete the temp copy of the key - os.remove(temp_file_path) - - tunnel.start() + if overriding_host_and_port: + # Since there is a tunnel, we actually want to connect to the local + # address of the tunnel, and not straight into the MySQL server. + mysql_host, mysql_port = overriding_host_and_port logger.info( - f"SSH tunnel is now open and listening at{mysql_credentials['host']}:{mysql_credentials['port']}." + f"Overriding the passed MySQL host and port with {mysql_host}:{mysql_port}." ) - mysql_host = "127.0.0.1" db_connection = pymysql.connect( host=mysql_host, - port=mysql_credentials["port"], + port=mysql_port, user=mysql_credentials["user"], password=mysql_credentials["password"], database="dw_xl", @@ -245,7 +242,7 @@ def connect_to_mysql( f"Connected to MySQL at {mysql_credentials['host']}:{mysql_credentials['port']}." ) - return db_connection, tunnel + return db_connection @task(trigger=all_finished) diff --git a/tests/test_integration/test_connections.py b/tests/test_integration/test_connections.py index 52c471a..7f647c1 100644 --- a/tests/test_integration/test_connections.py +++ b/tests/test_integration/test_connections.py @@ -1,11 +1,15 @@ import pathlib +from prefect.tasks.core.function import FunctionTask + from lolafect.lolaconfig import build_lolaconfig from lolafect.connections import ( connect_to_trino, close_trino_connection, _temp_secret_file_from_s3, open_ssh_tunnel_with_s3_pkey, + get_local_bind_address_from_ssh_tunnel, + close_ssh_tunnel, ) # __ __ _____ _ _ _____ _ _ _____ _ @@ -80,15 +84,34 @@ def test_temporal_download_of_secret_file_works_properly_even_with_exception(): def test_opening_and_closing_ssh_tunnel_works_properly(): + + test_local_bind_host = "127.0.0.1" + test_local_bind_port = 12345 + tunnel = open_ssh_tunnel_with_s3_pkey.run( s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], + local_bind_host=test_local_bind_host, + local_bind_port=test_local_bind_port, ) tunnel_was_active = tunnel.is_active - tunnel.close() + + local_bind_host_matches = ( + get_local_bind_address_from_ssh_tunnel.run(tunnel)[0] == test_local_bind_host + ) + local_bind_port_matches = ( + get_local_bind_address_from_ssh_tunnel.run(tunnel)[1] == test_local_bind_port + ) + + close_ssh_tunnel.run(tunnel) tunnel_is_no_longer_active = not tunnel.is_active - assert tunnel_was_active and tunnel_is_no_longer_active + assert ( + tunnel_was_active + and tunnel_is_no_longer_active + and local_bind_host_matches + and local_bind_port_matches + ) From f4f231d15d712c47286efc49aa6e57d3ecbbb7be Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Tue, 24 Jan 2023 11:00:39 +0100 Subject: [PATCH 24/35] Test for connecting to MySQL --- lolafect/connections.py | 16 +++++----- tests/test_integration/test_connections.py | 35 ++++++++++++++++++++++ 2 files changed, 43 insertions(+), 8 deletions(-) diff --git a/lolafect/connections.py b/lolafect/connections.py index 3d85f17..450c4e4 100644 --- a/lolafect/connections.py +++ b/lolafect/connections.py @@ -101,7 +101,7 @@ def open_ssh_tunnel_with_s3_pkey( remote_target_host: str, remote_target_port: int, local_bind_host: str = "127.0.0.1", - local_bind_port: int = 12345 + local_bind_port: int = 12345, ) -> SSHTunnelForwarder: """ Temporarily fetch a ssh key from S3 and then proceed to open a SSH tunnel @@ -164,7 +164,7 @@ def open_ssh_tunnel( """ tunnel = SSHTunnelForwarder( - ssh_host=( + ssh_address_or_host=( ssh_tunnel_credentials["ssh_jumphost"], ssh_tunnel_credentials["ssh_port"], ), @@ -246,22 +246,22 @@ def connect_to_mysql( @task(trigger=all_finished) -def close_mysql_connection(dw_connection: pymysql.Connection) -> None: +def close_mysql_connection(connection: pymysql.Connection) -> None: logger = prefect.context.get("logger") - if isinstance(dw_connection, pymysql.Connection): - dw_connection.close() + if isinstance(connection, pymysql.Connection): + connection.close() logger.info("DW connection closed successfully.") return logger.info("No connection received.") @task(trigger=all_finished) -def close_ssh_tunnel(ssh_tunnel: SSHTunnelForwarder) -> None: +def close_ssh_tunnel(tunnel: SSHTunnelForwarder) -> None: logger = prefect.context.get("logger") - if isinstance(ssh_tunnel, SSHTunnelForwarder): - ssh_tunnel.stop() + if isinstance(tunnel, SSHTunnelForwarder): + tunnel.stop() logger.info("SSH tunnel closed successfully.") return logger.info("No connection received.") diff --git a/tests/test_integration/test_connections.py b/tests/test_integration/test_connections.py index 7f647c1..2d03039 100644 --- a/tests/test_integration/test_connections.py +++ b/tests/test_integration/test_connections.py @@ -10,6 +10,8 @@ from lolafect.connections import ( open_ssh_tunnel_with_s3_pkey, get_local_bind_address_from_ssh_tunnel, close_ssh_tunnel, + connect_to_mysql, + close_mysql_connection, ) # __ __ _____ _ _ _____ _ _ _____ _ @@ -115,3 +117,36 @@ def test_opening_and_closing_ssh_tunnel_works_properly(): and local_bind_host_matches and local_bind_port_matches ) + + +def test_connect_query_and_disconnect_from_mysql_with_tunnel(): + + test_local_bind_host = "127.0.0.1" + test_local_bind_port = 12345 + + tunnel = open_ssh_tunnel_with_s3_pkey.run( + s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, + ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, + remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], + remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], + local_bind_host=test_local_bind_host, + local_bind_port=test_local_bind_port, + ) + + connection = connect_to_mysql.run( + mysql_credentials=TEST_LOLACONFIG.DW_CREDENTIALS, + overriding_host_and_port=get_local_bind_address_from_ssh_tunnel.run( + tunnel=tunnel + ), + ) + + connection_was_open = connection.open + + connection.cursor().execute("SELECT 1") + + close_mysql_connection.run(connection=connection) + close_ssh_tunnel.run(tunnel=tunnel) + + connection_is_closed = not connection.open + + assert connection_was_open and connection_is_closed From 35472c172739268b3ad2c59f955983e664017dd9 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Tue, 24 Jan 2023 13:36:59 +0100 Subject: [PATCH 25/35] Refactoring and tidy up things. --- lolafect/connections.py | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/lolafect/connections.py b/lolafect/connections.py index 450c4e4..caf3fdf 100644 --- a/lolafect/connections.py +++ b/lolafect/connections.py @@ -62,7 +62,13 @@ def close_trino_connection(trino_connection: trino.dbapi.Connection) -> None: trino_connection.close() logger.info("Trino connection closed successfully.") return - logger.info("No connection received.") + logger.warning( + f"Instead of a Trino connection, a {type(trino_connection)} was received." + ) + raise DeprecationWarning( + "This method will only accept the type 'trino.dbapi.Connection' in next major release.\n" + "Please, update your code accordingly." + ) @contextmanager @@ -247,21 +253,41 @@ def connect_to_mysql( @task(trigger=all_finished) def close_mysql_connection(connection: pymysql.Connection) -> None: + """ + Close a MySQL connection, or do nothing if something different is passed. + + :param connection: a MySQL connection. + :return: None + """ logger = prefect.context.get("logger") if isinstance(connection, pymysql.Connection): connection.close() - logger.info("DW connection closed successfully.") + logger.info("MySQL connection closed successfully.") return - logger.info("No connection received.") + logger.warning(f"Instead of a MySQL connection, a {type(connection)} was received.") + raise DeprecationWarning( + "This method will only accept the type 'pymysql.Connection' in next major release.\n" + "Please, update your code accordingly." + ) @task(trigger=all_finished) def close_ssh_tunnel(tunnel: SSHTunnelForwarder) -> None: + """ + Close a SSH tunnel, or do nothing if something different is passed. + + :param tunnel: a SSH tunnel. + :return: + """ logger = prefect.context.get("logger") if isinstance(tunnel, SSHTunnelForwarder): tunnel.stop() logger.info("SSH tunnel closed successfully.") return - logger.info("No connection received.") + logger.warning(f"Instead of a SSH tunnel, a {type(tunnel)} was received.") + raise DeprecationWarning( + "This method will only accept the type 'SSHTunnelForwarder' in next major release.\n" + "Please, update your code accordingly." + ) From 56d33435dc8a833e285135a7cebea649d18a8359 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Tue, 24 Jan 2023 13:40:47 +0100 Subject: [PATCH 26/35] Cleaning. --- lolafect/connections.py | 46 ++++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/lolafect/connections.py b/lolafect/connections.py index caf3fdf..223c9f4 100644 --- a/lolafect/connections.py +++ b/lolafect/connections.py @@ -24,7 +24,7 @@ def connect_to_trino( :param trino_credentials: a dict with the host, port, user and password. :param http_schema: which http schema to use in the connection. - :return: + :return: the connection to trino. """ logger = prefect.context.get("logger") logger.info( @@ -96,7 +96,7 @@ def _temp_secret_file_from_s3( raise e finally: # Regardless of what happens in the context manager, we always delete the temp - # copy of the private key. + # copy of the secret file. os.remove(local_temp_file_path) @@ -187,6 +187,27 @@ def open_ssh_tunnel( return tunnel +@task(trigger=all_finished) +def close_ssh_tunnel(tunnel: SSHTunnelForwarder) -> None: + """ + Close a SSH tunnel, or do nothing if something different is passed. + + :param tunnel: a SSH tunnel. + :return: + """ + logger = prefect.context.get("logger") + + if isinstance(tunnel, SSHTunnelForwarder): + tunnel.stop() + logger.info("SSH tunnel closed successfully.") + return + logger.warning(f"Instead of a SSH tunnel, a {type(tunnel)} was received.") + raise DeprecationWarning( + "This method will only accept the type 'SSHTunnelForwarder' in next major release.\n" + "Please, update your code accordingly." + ) + + @task() def get_local_bind_address_from_ssh_tunnel( tunnel: SSHTunnelForwarder, @@ -270,24 +291,3 @@ def close_mysql_connection(connection: pymysql.Connection) -> None: "This method will only accept the type 'pymysql.Connection' in next major release.\n" "Please, update your code accordingly." ) - - -@task(trigger=all_finished) -def close_ssh_tunnel(tunnel: SSHTunnelForwarder) -> None: - """ - Close a SSH tunnel, or do nothing if something different is passed. - - :param tunnel: a SSH tunnel. - :return: - """ - logger = prefect.context.get("logger") - - if isinstance(tunnel, SSHTunnelForwarder): - tunnel.stop() - logger.info("SSH tunnel closed successfully.") - return - logger.warning(f"Instead of a SSH tunnel, a {type(tunnel)} was received.") - raise DeprecationWarning( - "This method will only accept the type 'SSHTunnelForwarder' in next major release.\n" - "Please, update your code accordingly." - ) From 025f2b7a5c5210bc78f60299a48bdbfcb2bdfd72 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Tue, 24 Jan 2023 13:42:20 +0100 Subject: [PATCH 27/35] Updated the changelog --- CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 32fb418..f44d7a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,9 @@ All notable changes to this project will be documented in this file. ### Added -- Added Trino connection capabilities in `connections` module. +- Added Trino connection capabilities in the `connections` module. +- Added MySQL connection capabilities in the `connections` module. +- Added SSH tunneling capabilities in the `connections` module. ## [0.2.0] - 2023-01-19 From 3797615b39bcdedad4a3901a1ca8ddf216b572c8 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Tue, 24 Jan 2023 14:28:04 +0100 Subject: [PATCH 28/35] Updated the readme --- README.md | 70 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 69 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 9160eb3..5d4b6f4 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,12 @@ Lolafect is a collection of Python bits that help us build our Prefect flows. You can find below examples of how to leverage `lolafect` in your flows. +**_Note: the code excerpts below are simplified for brevity and won't run +as-is. If you want to see perfect examples, you might want to check the tests +in this repository._** + +### Config + **Let the `LolaConfig` object do the boilerplate env stuff for you** ```python @@ -36,6 +42,8 @@ lolaconfig = build_lolaconfig( ) ``` +### Connections + **Connect to a Trino server** ```python @@ -43,7 +51,7 @@ from lolafect.connections import connect_to_trino, close_trino_connection with Flow(...) as flow: connection = connect_to_trino.run( - trino_credentials=my_trino_credentials #You can probably try to fetch this from lolaconfig.TRINO_CREDENTIALS + trino_credentials=my_trino_credentials # You can probably try to fetch this from lolaconfig.TRINO_CREDENTIALS ) task_result = some_trino_related_task(trino_connection=connection) close_trino_connection.run( @@ -52,6 +60,66 @@ with Flow(...) as flow: ) ``` +**Open an SSH tunnel** + +```python +from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel + +with Flow(...) as flow: + # You probably want to fetch these args from lolaconfig.SSH_CREDENTIALS and lolaconfig.DW_CREDENTIALS + tunnel = open_ssh_tunnel_with_s3_pkey( + s3_bucket_name="some-bucket", + ssh_tunnel_credentials={...}, + remote_target_host="some-host-probably-mysql", + remote_target_port=12345, + ) + # Tunnel is now alive. tunnel.is_active == True + close_ssh_tunnel(tunnel=tunnel) +``` + +**Connect to a MySQL instance** +```python +from lolafect.connections import connect_to_mysql, close_mysql_connection + +with Flow(...) as flow: + connection = connect_to_mysql.run( + mysql_credentials={...}, # You probably want to get this from TEST_LOLACONFIG.DW_CREDENTIALS + ) + connection.cursor().execute("SELECT 1") + close_mysql_connection.run(connection=connection) + +# Want to connect through an SSH tunnel? Open the tunnel normally and then +# override the host and port when connecting to MySQL. + +from lolafect.connections import ( + open_ssh_tunnel_with_s3_pkey, + get_local_bind_address_from_ssh_tunnel, + close_ssh_tunnel +) + +with Flow(...) as flow: + # You probably want to fetch these args from lolaconfig.SSH_CREDENTIALS and lolaconfig.DW_CREDENTIALS + tunnel = open_ssh_tunnel_with_s3_pkey( + s3_bucket_name="some-bucket", + ssh_tunnel_credentials={...}, + remote_target_host="the-mysql-host", + remote_target_port=3306, + ) + + connection = connect_to_mysql.run( + mysql_credentials={...}, # You probably want to get this from TEST_LOLACONFIG.DW_CREDENTIALS + overriding_host_and_port=get_local_bind_address_from_ssh_tunnel.run( + tunnel=tunnel # This will open the connection through the SSH tunnel instead of straight to MySQL + ), + ) + + connection.cursor().execute("SELECT 1") + + close_mysql_connection.run(connection=connection) + close_ssh_tunnel.run(tunnel=tunnel) +``` + +### Slack **Send a warning message to slack if your tasks fails** From 4c0fd0f179861b5de286f891fa63294193eaddfa Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Wed, 25 Jan 2023 10:05:03 +0100 Subject: [PATCH 29/35] Change version to dev --- lolafect/__version__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lolafect/__version__.py b/lolafect/__version__.py index 0590644..7e876b1 100644 --- a/lolafect/__version__.py +++ b/lolafect/__version__.py @@ -1 +1 @@ -__version__="0.2.0" \ No newline at end of file +__version__="dev" \ No newline at end of file From 28c066ad6fdfe96068d0ae7392855b5e98433ca5 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Fri, 27 Jan 2023 13:24:28 +0100 Subject: [PATCH 30/35] Bump version --- lolafect/__version__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lolafect/__version__.py b/lolafect/__version__.py index 7e876b1..4e3f3d3 100644 --- a/lolafect/__version__.py +++ b/lolafect/__version__.py @@ -1 +1 @@ -__version__="dev" \ No newline at end of file +__version__="0.3.0" \ No newline at end of file From b3123c24702e534c791c4088cb91fb93ea621fa0 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Fri, 27 Jan 2023 13:24:52 +0100 Subject: [PATCH 31/35] Update changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f44d7a9..008da48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ All notable changes to this project will be documented in this file. -## [Unreleased] +## [0.3.0] - 2023-01-27 ### Added From 6c8dc4f2074c3d1f34db6a3550b03fc5adee5188 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Fri, 27 Jan 2023 13:36:51 +0100 Subject: [PATCH 32/35] Minor improvements in readme --- README.md | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 5d4b6f4..8258e82 100644 --- a/README.md +++ b/README.md @@ -73,8 +73,9 @@ with Flow(...) as flow: remote_target_host="some-host-probably-mysql", remote_target_port=12345, ) + task_result = some_task_that_needs_the_tunnel(tunnel=tunnel) # Tunnel is now alive. tunnel.is_active == True - close_ssh_tunnel(tunnel=tunnel) + close_ssh_tunnel(tunnel=tunnel, upstream_tasks=[task_result]) ``` **Connect to a MySQL instance** @@ -82,11 +83,11 @@ with Flow(...) as flow: from lolafect.connections import connect_to_mysql, close_mysql_connection with Flow(...) as flow: - connection = connect_to_mysql.run( + connection = connect_to_mysql( mysql_credentials={...}, # You probably want to get this from TEST_LOLACONFIG.DW_CREDENTIALS ) - connection.cursor().execute("SELECT 1") - close_mysql_connection.run(connection=connection) + task_result = some_task_that_needs_mysql(connection=connection) + close_mysql_connection(connection=connection, upstream_tasks=[task_result]) # Want to connect through an SSH tunnel? Open the tunnel normally and then # override the host and port when connecting to MySQL. @@ -106,17 +107,17 @@ with Flow(...) as flow: remote_target_port=3306, ) - connection = connect_to_mysql.run( + connection = connect_to_mysql( mysql_credentials={...}, # You probably want to get this from TEST_LOLACONFIG.DW_CREDENTIALS overriding_host_and_port=get_local_bind_address_from_ssh_tunnel.run( tunnel=tunnel # This will open the connection through the SSH tunnel instead of straight to MySQL ), ) - connection.cursor().execute("SELECT 1") - - close_mysql_connection.run(connection=connection) - close_ssh_tunnel.run(tunnel=tunnel) + task_result = some_task_that_needs_mysql(connection=connection) + + mysql_closed = close_mysql_connection(connection=connection, upstream_tasks=[task_result]) + close_ssh_tunnel.run(tunnel=tunnel, upstream_tasks=[mysql_closed]) ``` ### Slack From 737bdcab7f3a151cc462c18711afab9f7cb0d57c Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Fri, 27 Jan 2023 15:21:49 +0100 Subject: [PATCH 33/35] Last minute fix, remove hardcoded database from mysql connection. --- lolafect/connections.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lolafect/connections.py b/lolafect/connections.py index 223c9f4..a333653 100644 --- a/lolafect/connections.py +++ b/lolafect/connections.py @@ -257,8 +257,7 @@ def connect_to_mysql( host=mysql_host, port=mysql_port, user=mysql_credentials["user"], - password=mysql_credentials["password"], - database="dw_xl", + password=mysql_credentials["password"] ) # Customizing this attributes to retrieve them later in the flow From d5bc0fb28b52203ea2c9648f1b6378b60c67b210 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Fri, 27 Jan 2023 15:22:47 +0100 Subject: [PATCH 34/35] Formatting. --- lolafect/__version__.py | 2 +- lolafect/connections.py | 2 +- lolafect/defaults.py | 10 ++++++---- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/lolafect/__version__.py b/lolafect/__version__.py index 4e3f3d3..493f741 100644 --- a/lolafect/__version__.py +++ b/lolafect/__version__.py @@ -1 +1 @@ -__version__="0.3.0" \ No newline at end of file +__version__ = "0.3.0" diff --git a/lolafect/connections.py b/lolafect/connections.py index a333653..12699ca 100644 --- a/lolafect/connections.py +++ b/lolafect/connections.py @@ -257,7 +257,7 @@ def connect_to_mysql( host=mysql_host, port=mysql_port, user=mysql_credentials["user"], - password=mysql_credentials["password"] + password=mysql_credentials["password"], ) # Customizing this attributes to retrieve them later in the flow diff --git a/lolafect/defaults.py b/lolafect/defaults.py index 1d4af84..5c6c15f 100644 --- a/lolafect/defaults.py +++ b/lolafect/defaults.py @@ -1,7 +1,9 @@ -DEFAULT_ENV_S3_BUCKET="pdo-prefect-flows" -DEFAULT_ENV_FILE_PATH="env/env_prd.json" +DEFAULT_ENV_S3_BUCKET = "pdo-prefect-flows" +DEFAULT_ENV_FILE_PATH = "env/env_prd.json" DEFAULT_PATH_TO_SLACK_WEBHOOKS_FILE = "env/slack_webhooks.json" -DEFAULT_KUBERNETES_IMAGE = "373245262072.dkr.ecr.eu-central-1.amazonaws.com/pdo-data-prefect:production" -DEFAULT_KUBERNETES_LABELS = ["k8s"] +DEFAULT_KUBERNETES_IMAGE = ( + "373245262072.dkr.ecr.eu-central-1.amazonaws.com/pdo-data-prefect:production" +) +DEFAULT_KUBERNETES_LABELS = ["k8s"] DEFAULT_FLOWS_PATH_IN_BUCKET = "flows/" DEFAULT_TRINO_HTTP_SCHEME = "https" From 143158a3fbbf67e331f3afb6f3ddba3b69475611 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Fri, 27 Jan 2023 15:29:32 +0100 Subject: [PATCH 35/35] Remove unnecessary trick for Great Expec. --- lolafect/connections.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lolafect/connections.py b/lolafect/connections.py index 12699ca..33e2e5f 100644 --- a/lolafect/connections.py +++ b/lolafect/connections.py @@ -260,10 +260,6 @@ def connect_to_mysql( password=mysql_credentials["password"], ) - # Customizing this attributes to retrieve them later in the flow - db_connection.raw_user = mysql_credentials["user"] - db_connection.raw_password = mysql_credentials["password"] - logger.info( f"Connected to MySQL at {mysql_credentials['host']}:{mysql_credentials['port']}." )