2024-08-13 15:02:03 +02:00
import datetime
2024-08-09 12:41:23 +02:00
import pathlib
from typing import Dict
2024-08-13 15:02:03 +02:00
from anaxi . constants import PATHS
2024-08-09 12:41:23 +02:00
from anaxi . file_persistence import read_yaml
2024-08-09 14:45:10 +02:00
from anaxi . logging import get_anaxi_logger
2024-08-09 12:41:23 +02:00
2024-08-09 14:45:10 +02:00
# Module-level logger, obtained from the project's get_anaxi_logger helper and
# keyed by this module's name; shared by every class and function below.
logger = get_anaxi_logger ( __name__ )
2024-08-09 12:41:23 +02:00
class CosmosDBDatabaseConfig:
    """Settings for one Cosmos DB database entry of the config file.

    The values are stored exactly as received; no validation is performed.
    """

    # Top-level key of the YAML document under which the entries are looked up.
    config_root_key: str = "cosmos-databases"

    def __init__(self, host: str, database_id: str, master_key: str) -> None:
        """Store the given values as attributes of the same names.

        Args:
            host: Kept as ``self.host``.
            database_id: Kept as ``self.database_id``.
            master_key: Kept as ``self.master_key``.
        """
        self.host = host
        self.database_id = database_id
        self.master_key = master_key

    @classmethod
    def from_dict(cls, a_dict) -> "CosmosDBDatabaseConfig":
        """Build an instance from a mapping keyed by the ``__init__`` parameter names.

        Args:
            a_dict: Mapping unpacked as keyword arguments into the constructor.

        Returns:
            A new instance of ``cls`` (so subclasses get their own type).

        Raises:
            TypeError: If a required key is missing or an unexpected key is
                present, as raised by the constructor call.
        """
        return cls(**a_dict)

    @classmethod
    def from_yaml(cls, path: pathlib.Path) -> Dict[str, "CosmosDBDatabaseConfig"]:
        """Load every entry found under ``config_root_key`` in a YAML file.

        Args:
            path: Location of the YAML file, passed to ``read_yaml``.

        Returns:
            A dict mapping each entry's key to the config built from its value.

        Raises:
            KeyError: If ``config_root_key`` is absent from the file contents.
            TypeError: If the contents are not subscriptable or an entry's keys
                do not match the constructor parameters.
            AttributeError: If the section under the root key has no ``items``.

        Any of the above is logged as a malformed file and then re-raised
        unchanged.
        """
        yaml_contents = read_yaml(path)
        try:
            # The root-key lookup lives inside the try so that a missing
            # section is reported as a malformed file as well.
            cosmos_db_contents = yaml_contents[cls.config_root_key]
            return {
                db_id: cls.from_dict(db_config)
                for db_id, db_config in cosmos_db_contents.items()
            }
        except (KeyError, TypeError, AttributeError):
            logger.error(
                "Error reading Cosmos DB config yaml file. "
                "The file seems malformed. "
                "Review the example in the repository."
            )
            raise
2024-08-09 14:45:10 +02:00
class PostgresDatabaseConfig:
    """Settings for one Postgres database entry of the config file.

    The values are stored exactly as received; no validation is performed.
    """

    # Top-level key of the YAML document under which the entries are looked up.
    config_root_key: str = "postgres-databases"

    def __init__(self, host: str, port: int, dbname: str, user: str, password) -> None:
        """Store the given values as attributes of the same names.

        Args:
            host: Kept as ``self.host``.
            port: Kept as ``self.port``.
            dbname: Kept as ``self.dbname``.
            user: Kept as ``self.user``.
            password: Kept as ``self.password``.
        """
        self.host = host
        self.port = port
        self.dbname = dbname
        self.user = user
        self.password = password

    @classmethod
    def from_dict(cls, a_dict) -> "PostgresDatabaseConfig":
        """Build an instance from a mapping keyed by the ``__init__`` parameter names.

        Args:
            a_dict: Mapping unpacked as keyword arguments into the constructor.

        Returns:
            A new instance of ``cls`` (so subclasses get their own type).

        Raises:
            TypeError: If a required key is missing or an unexpected key is
                present, as raised by the constructor call.
        """
        return cls(**a_dict)

    @classmethod
    def from_yaml(cls, path: pathlib.Path) -> Dict[str, "PostgresDatabaseConfig"]:
        """Load every entry found under ``config_root_key`` in a YAML file.

        Args:
            path: Location of the YAML file, passed to ``read_yaml``.

        Returns:
            A dict mapping each entry's key to the config built from its value.

        Raises:
            KeyError: If ``config_root_key`` is absent from the file contents.
            TypeError: If the contents are not subscriptable or an entry's keys
                do not match the constructor parameters.
            AttributeError: If the section under the root key has no ``items``.

        Any of the above is logged as a malformed file and then re-raised
        unchanged.
        """
        yaml_contents = read_yaml(path)
        try:
            # The root-key lookup lives inside the try so that a missing
            # section is reported as a malformed file as well.
            postgres_contents = yaml_contents[cls.config_root_key]
            return {
                db_name: cls.from_dict(db_config)
                for db_name, db_config in postgres_contents.items()
            }
        except (KeyError, TypeError, AttributeError):
            logger.error(
                "Error reading Postgres config yaml file. "
                "The file seems malformed. "
                "Review the example in the repository."
            )
            raise
2024-08-13 15:02:03 +02:00
class StreamConfig:
    """Settings for one stream entry of the streams config file.

    The values are stored exactly as received; no validation is performed.
    """

    # Top-level key of the YAML document under which the entries are looked up.
    config_root_key: str = "streams"

    def __init__(
        self,
        stream_name: str,
        cosmos_database_id: str,
        cosmos_container_name: str,
        cutoff_timestamp: datetime.datetime,
        postgres_database: str,
        postgres_schema_name: str,
    ) -> None:
        """Store the given values as attributes of the same names.

        Args:
            stream_name: Kept as ``self.stream_name``.
            cosmos_database_id: Kept as ``self.cosmos_database_id``.
            cosmos_container_name: Kept as ``self.cosmos_container_name``.
            cutoff_timestamp: Kept as ``self.cutoff_timestamp``.
            postgres_database: Kept as ``self.postgres_database``.
            postgres_schema_name: Kept as ``self.postgres_schema_name``.
        """
        self.stream_name = stream_name
        self.cosmos_database_id = cosmos_database_id
        self.cosmos_container_name = cosmos_container_name
        self.cutoff_timestamp = cutoff_timestamp
        self.postgres_database = postgres_database
        self.postgres_schema_name = postgres_schema_name

    @classmethod
    def from_dict(cls, a_dict) -> "StreamConfig":
        """Build an instance from a mapping keyed by the ``__init__`` parameter names.

        Args:
            a_dict: Mapping unpacked as keyword arguments into the constructor.

        Returns:
            A new instance of ``cls`` (so subclasses get their own type).

        Raises:
            TypeError: If a required key is missing or an unexpected key is
                present, as raised by the constructor call.
        """
        return cls(**a_dict)

    @classmethod
    def from_yaml(cls, path: pathlib.Path) -> Dict[str, "StreamConfig"]:
        """Load every entry found under ``config_root_key`` in a YAML file.

        Args:
            path: Location of the YAML file, passed to ``read_yaml``.

        Returns:
            A dict mapping each entry's key to the config built from its value.

        Raises:
            KeyError: If ``config_root_key`` is absent from the file contents.
            TypeError: If the contents are not subscriptable or an entry's keys
                do not match the constructor parameters.
            AttributeError: If the section under the root key has no ``items``.

        Any of the above is logged as a malformed file and then re-raised
        unchanged.
        """
        yaml_contents = read_yaml(path)
        try:
            # The root-key lookup lives inside the try so that a missing
            # section is reported as a malformed file as well.
            streams_contents = yaml_contents[cls.config_root_key]
            return {
                stream_id: cls.from_dict(stream_config)
                for stream_id, stream_config in streams_contents.items()
            }
        except (KeyError, TypeError, AttributeError):
            logger.error(
                "Error reading streams config yaml file. "
                "The file seems malformed. "
                "Review the example in the repository."
            )
            raise
def get_postgres_database_config_from_file(postgres_database):
    """Return the Postgres config stored under ``postgres_database``.

    Loads every entry from the file at ``PATHS.postgres_config_file_path``
    and picks the one whose key equals ``postgres_database``.

    Raises:
        KeyError: If no entry exists for ``postgres_database``; the miss is
            logged before the exception is re-raised.
    """
    logger.info("Reading Postgres config file...")
    config_path = PATHS.postgres_config_file_path
    configs_by_name = PostgresDatabaseConfig.from_yaml(config_path)
    logger.info(f"Found file with {len(configs_by_name)} entries.")

    try:
        return configs_by_name[postgres_database]
    except KeyError:
        logger.error(
            f"Couldn't find a config entry for database with id: {postgres_database}"
        )
        raise
def get_cosmos_database_config_from_file(cosmos_db_id):
    """Return the Cosmos DB config stored under ``cosmos_db_id``.

    Loads every entry from the file at ``PATHS.cosmos_db_config_file_path``
    and picks the one whose key equals ``cosmos_db_id``.

    Raises:
        KeyError: If no entry exists for ``cosmos_db_id``; the miss is logged
            before the exception is re-raised.
    """
    logger.info("Reading Cosmos DB config file...")
    config_path = PATHS.cosmos_db_config_file_path
    configs_by_id = CosmosDBDatabaseConfig.from_yaml(config_path)
    logger.info(f"Found file with {len(configs_by_id)} entries.")

    try:
        return configs_by_id[cosmos_db_id]
    except KeyError:
        logger.error(
            f"Couldn't find a config entry for database with id: {cosmos_db_id}"
        )
        raise
def get_stream_config_from_file(stream_id):
    """Return the stream config stored under ``stream_id``.

    Loads every entry from the file at ``PATHS.streams_config_file_path``
    and picks the one whose key equals ``stream_id``.

    Raises:
        KeyError: If no entry exists for ``stream_id``. The miss is logged and
            the lookup error is re-raised, matching the sibling database
            getters. Previously the handler fell through to the ``return``
            and failed with ``UnboundLocalError`` instead.
    """
    logger.info("Reading streams config file...")
    streams_configs = StreamConfig.from_yaml(PATHS.streams_config_file_path)
    logger.info(f"Found file with {len(streams_configs)} entries.")

    try:
        return streams_configs[stream_id]
    except KeyError:
        logger.error(
            f"Could not find the stream {stream_id} in the stream configurations file. You might be missing an entry in {PATHS.streams_config_file_path} or have made a typo."
        )
        # Re-raise so callers get the real lookup failure rather than an
        # unrelated error from returning an unassigned local.
        raise