Naive duplication of MySQL elements but for Trino. Not working yet.
This commit is contained in:
parent
5b97864e8d
commit
19211eee73
1 changed files with 87 additions and 1 deletions
|
|
@ -68,7 +68,43 @@ def run_data_test_on_trino(
|
||||||
expectation_configurations: List[ExpectationConfiguration],
|
expectation_configurations: List[ExpectationConfiguration],
|
||||||
great_expectations_s3_bucket: str = DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET,
|
great_expectations_s3_bucket: str = DEFAULT_GREAT_EXPECTATIONS_S3_BUCKET,
|
||||||
) -> dict:
|
) -> dict:
|
||||||
raise NotImplementedError("WIP")
|
"""
|
||||||
|
Validate a query and an expectation suite against a given Trino server.
|
||||||
|
|
||||||
|
:param name: a unique name for the data test.
|
||||||
|
:param trino_credentials: credentials for the Trino cluster.
|
||||||
|
:param query: the query to test against.
|
||||||
|
:param expectation_configurations: the expectations on the dataset.
|
||||||
|
:param great_expectations_s3_bucket: the bucket where Great Expectations
|
||||||
|
files live.
|
||||||
|
:return: the result of the data test.
|
||||||
|
"""
|
||||||
|
logger = prefect.context.get("logger")
|
||||||
|
|
||||||
|
logger.info("Creating data context.")
|
||||||
|
data_context = _create_in_memory_data_context_for_trino(
|
||||||
|
trino_credentials, great_expectations_s3_bucket
|
||||||
|
)
|
||||||
|
logger.info("Data context created.")
|
||||||
|
logger.info("Creating expectation suite.")
|
||||||
|
data_context = _create_expectation_suite(
|
||||||
|
data_context, name, expectation_configurations
|
||||||
|
)
|
||||||
|
logger.info("Expectation suite created.")
|
||||||
|
logger.info("Creating checkpoint.")
|
||||||
|
data_context = _create_checkpoint(
|
||||||
|
data_context,
|
||||||
|
f"{trino_credentials['host']}:{trino_credentials['port']}",
|
||||||
|
query,
|
||||||
|
name,
|
||||||
|
)
|
||||||
|
logger.info("Checkpoint created.")
|
||||||
|
logger.info("Running checkpoint.")
|
||||||
|
results = data_context.run_checkpoint(f"{name}_checkpoint")
|
||||||
|
logger.info("Checkpoint finished.")
|
||||||
|
logger.info(f"Validation result: {results['success']}")
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
def _create_in_memory_data_context_for_mysql(
|
def _create_in_memory_data_context_for_mysql(
|
||||||
mysql_credentials: dict,
|
mysql_credentials: dict,
|
||||||
|
|
@ -120,6 +156,56 @@ def _create_in_memory_data_context_for_mysql(
|
||||||
|
|
||||||
return data_context
|
return data_context
|
||||||
|
|
||||||
|
def _create_in_memory_data_context_for_trino(
|
||||||
|
trino_credentials: dict,
|
||||||
|
great_expectations_s3_bucket: str,
|
||||||
|
) -> AbstractDataContext:
|
||||||
|
"""
|
||||||
|
Create a DataContext without a YAML config file and specify a Trino
|
||||||
|
datasource.
|
||||||
|
|
||||||
|
:param trino_credentials: the creds to the mysql where the query will be
|
||||||
|
executed.
|
||||||
|
:param great_expectations_s3_bucket: the name of the bucket where Great
|
||||||
|
Exepctations files while be stored.
|
||||||
|
:return: the data context.
|
||||||
|
"""
|
||||||
|
|
||||||
|
data_context = BaseDataContext(
|
||||||
|
project_config=DataContextConfig(
|
||||||
|
datasources={
|
||||||
|
f"{trino_credentials['host']}:{trino_credentials['port']}": DatasourceConfig(
|
||||||
|
class_name="Datasource",
|
||||||
|
execution_engine={
|
||||||
|
"class_name": "SqlAlchemyExecutionEngine",
|
||||||
|
"connection_string": f"trino://%s:%s@%s:%s/{trino_credentials['db']}/{'SCHEMA_GOES_HERE'}"
|
||||||
|
% (
|
||||||
|
trino_credentials["user"],
|
||||||
|
urlquote(trino_credentials["password"]),
|
||||||
|
trino_credentials["host"],
|
||||||
|
trino_credentials["port"],
|
||||||
|
),
|
||||||
|
},
|
||||||
|
data_connectors={
|
||||||
|
"default_runtime_data_connector_name": {
|
||||||
|
"class_name": "RuntimeDataConnector",
|
||||||
|
"batch_identifiers": ["default_identifier_name"],
|
||||||
|
},
|
||||||
|
"default_inferred_data_connector_name": {
|
||||||
|
"class_name": "InferredAssetSqlDataConnector",
|
||||||
|
"name": "whole_table",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
},
|
||||||
|
store_backend_defaults=S3StoreBackendDefaults(
|
||||||
|
default_bucket_name=great_expectations_s3_bucket
|
||||||
|
),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return data_context
|
||||||
|
|
||||||
|
|
||||||
def _create_expectation_suite(
|
def _create_expectation_suite(
|
||||||
data_context: AbstractDataContext,
|
data_context: AbstractDataContext,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue