Unfinished work

This commit is contained in:
Pablo Martin 2023-04-20 15:36:16 +02:00
parent 5066fe4382
commit d9c131dcc6
2 changed files with 71 additions and 63 deletions

View file

@ -178,12 +178,14 @@ def _create_in_memory_data_context_for_trino(
class_name="Datasource", class_name="Datasource",
execution_engine={ execution_engine={
"class_name": "SqlAlchemyExecutionEngine", "class_name": "SqlAlchemyExecutionEngine",
"connection_string": f"trino://%s:%s@%s:%s/{trino_credentials['db']}/{'SCHEMA_GOES_HERE'}" "connection_string": f"trino://%s:%s@%s:%s/%s/%s"
% ( % (
trino_credentials["user"], trino_credentials["user"],
urlquote(trino_credentials["password"]), urlquote(trino_credentials["password"]),
trino_credentials["host"], trino_credentials["host"],
trino_credentials["port"], trino_credentials["port"],
trino_credentials["catalog"],
trino_credentials["schema"],
), ),
}, },
data_connectors={ data_connectors={

View file

@ -18,11 +18,13 @@ from lolafect.connections import open_ssh_tunnel_with_s3_pkey, close_ssh_tunnel
# are working properly. # are working properly.
TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite") TEST_LOLACONFIG = build_lolaconfig(flow_name="testing-suite")
#1 AS a_one,
# 'lol' AS a_string,
# NULL AS a_null
TEST_QUERY = """ TEST_QUERY = """
SELECT 1 AS a_one, SELECT *
'lol' AS a_string, from app_lm_mysql_pl.comprea.market
NULL AS a_null where id = 1
""" """
TEST_EXPECTATIONS_THAT_FIT_DATA = [ TEST_EXPECTATIONS_THAT_FIT_DATA = [
@ -55,62 +57,62 @@ TEST_EXPECTATIONS_THAT_DONT_FIT_DATA = [
), ),
] ]
#
def test_validation_on_mysql_succeeds(): # def test_validation_on_mysql_succeeds():
ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run( # ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run(
s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, # s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME,
ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, # ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], # remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"],
remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], # remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"],
) # )
#
validation_result = run_data_test_on_mysql.run( # validation_result = run_data_test_on_mysql.run(
name="lolafect-testing-test_validation_on_mysql_succeeds", # name="lolafect-testing-test_validation_on_mysql_succeeds",
mysql_credentials={ # mysql_credentials={
"host": ssh_tunnel.local_bind_address[0], # "host": ssh_tunnel.local_bind_address[0],
"port": ssh_tunnel.local_bind_address[1], # "port": ssh_tunnel.local_bind_address[1],
"user": TEST_LOLACONFIG.DW_CREDENTIALS["user"], # "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"],
"password": TEST_LOLACONFIG.DW_CREDENTIALS["password"], # "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"],
"db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"], # "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"],
}, # },
query=TEST_QUERY, # query=TEST_QUERY,
expectation_configurations=TEST_EXPECTATIONS_THAT_FIT_DATA, # expectation_configurations=TEST_EXPECTATIONS_THAT_FIT_DATA,
) # )
#
closed_tunnel = close_ssh_tunnel.run(ssh_tunnel) # closed_tunnel = close_ssh_tunnel.run(ssh_tunnel)
#
data_test_passed = validation_result["success"] == True # data_test_passed = validation_result["success"] == True
#
assert data_test_passed # assert data_test_passed
#
#
def test_validation_on_mysql_fails(): # def test_validation_on_mysql_fails():
ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run( # ssh_tunnel = open_ssh_tunnel_with_s3_pkey.run(
s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME, # s3_bucket_name=TEST_LOLACONFIG.S3_BUCKET_NAME,
ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS, # ssh_tunnel_credentials=TEST_LOLACONFIG.SSH_TUNNEL_CREDENTIALS,
remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"], # remote_target_host=TEST_LOLACONFIG.DW_CREDENTIALS["host"],
remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"], # remote_target_port=TEST_LOLACONFIG.DW_CREDENTIALS["port"],
) # )
#
validation_result = run_data_test_on_mysql.run( # validation_result = run_data_test_on_mysql.run(
name="lolafect-testing-test_validation_on_mysql_fails", # name="lolafect-testing-test_validation_on_mysql_fails",
mysql_credentials={ # mysql_credentials={
"host": ssh_tunnel.local_bind_address[0], # "host": ssh_tunnel.local_bind_address[0],
"port": ssh_tunnel.local_bind_address[1], # "port": ssh_tunnel.local_bind_address[1],
"user": TEST_LOLACONFIG.DW_CREDENTIALS["user"], # "user": TEST_LOLACONFIG.DW_CREDENTIALS["user"],
"password": TEST_LOLACONFIG.DW_CREDENTIALS["password"], # "password": TEST_LOLACONFIG.DW_CREDENTIALS["password"],
"db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"], # "db": TEST_LOLACONFIG.DW_CREDENTIALS["default_db"],
}, # },
query=TEST_QUERY, # query=TEST_QUERY,
expectation_configurations=TEST_EXPECTATIONS_THAT_DONT_FIT_DATA, # expectation_configurations=TEST_EXPECTATIONS_THAT_DONT_FIT_DATA,
) # )
#
closed_tunnel = close_ssh_tunnel.run(ssh_tunnel) # closed_tunnel = close_ssh_tunnel.run(ssh_tunnel)
#
data_test_failed = validation_result["success"] == False # data_test_failed = validation_result["success"] == False
#
assert data_test_failed # assert data_test_failed
#
def test_validation_on_trino_succeeds(): def test_validation_on_trino_succeeds():
validation_result = run_data_test_on_trino.run( validation_result = run_data_test_on_trino.run(
@ -120,11 +122,14 @@ def test_validation_on_trino_succeeds():
"port": TEST_LOLACONFIG.TRINO_CREDENTIALS["port"], "port": TEST_LOLACONFIG.TRINO_CREDENTIALS["port"],
"user": TEST_LOLACONFIG.TRINO_CREDENTIALS["user"], "user": TEST_LOLACONFIG.TRINO_CREDENTIALS["user"],
"password": TEST_LOLACONFIG.TRINO_CREDENTIALS["password"], "password": TEST_LOLACONFIG.TRINO_CREDENTIALS["password"],
"db": TEST_LOLACONFIG.TRINO_CREDENTIALS["default_db"], "catalog": "data_dw",
"schema": "sandbox"
}, },
query=TEST_QUERY, query=TEST_QUERY,
expectation_configurations=TEST_EXPECTATIONS_THAT_FIT_DATA, expectation_configurations=TEST_EXPECTATIONS_THAT_FIT_DATA,
) )
print("###############\n" * 20)
print(validation_result)
data_test_passed = validation_result["success"] == True data_test_passed = validation_result["success"] == True
@ -139,7 +144,8 @@ def test_validation_on_trino_fails():
"port": TEST_LOLACONFIG.TRINO_CREDENTIALS["port"], "port": TEST_LOLACONFIG.TRINO_CREDENTIALS["port"],
"user": TEST_LOLACONFIG.TRINO_CREDENTIALS["user"], "user": TEST_LOLACONFIG.TRINO_CREDENTIALS["user"],
"password": TEST_LOLACONFIG.TRINO_CREDENTIALS["password"], "password": TEST_LOLACONFIG.TRINO_CREDENTIALS["password"],
"db": TEST_LOLACONFIG.TRINO_CREDENTIALS["default_db"], "catalog": "data_dw",
"schema": "sandbox"
}, },
query=TEST_QUERY, query=TEST_QUERY,
expectation_configurations=TEST_EXPECTATIONS_THAT_DONT_FIT_DATA, expectation_configurations=TEST_EXPECTATIONS_THAT_DONT_FIT_DATA,