diff --git a/README.md b/README.md
index f8b4d15..1606340 100644
--- a/README.md
+++ b/README.md
@@ -44,6 +44,18 @@ anaxis postgres-healthcheck --postgres-database
 - [ ] Writing into DWH
 - [ ] Refactors and improvements
 
+## Development
+
+### Local Cosmos DB
+
+Microsoft provides tools to run a local emulator of Cosmos DB. The bad news is that we have not been able to make it work so far; it always breaks for one reason or another.
+
+You can find instructions here:
+
+-
+-
+
+
 ## What's with the name
 
 `Anaxi` is short for Anaximander. [Anaximander of Miletus](https://en.wikipedia.org/wiki/Anaximander) was a pre-Socratic Greek philosopher who lived in Miletus. He is often called the "Father of Cosmology" and founder of astronomy.
diff --git a/anaxi/checkpoint.py b/anaxi/checkpoint.py
new file mode 100644
index 0000000..1f691ee
--- /dev/null
+++ b/anaxi/checkpoint.py
@@ -0,0 +1,48 @@
+import datetime
+import pathlib
+from typing import Optional
+
+from anaxi.file_persistence import read_yaml, write_yaml
+
+
+class CheckpointManager:
+
+    highest_synced_timestamp_field_key = "highest_synced_timestamp"
+
+    def __init__(
+        self,
+        file_path: pathlib.Path,
+        highest_synced_timestamp: Optional[datetime.datetime],
+    ):
+        self.file_path = file_path
+        self.highest_synced_timestamp = highest_synced_timestamp
+        self.goal = None
+
+    @classmethod
+    def load_from_yml(cls, file_path: pathlib.Path) -> "CheckpointManager":
+        contents = read_yaml(file_path)
+
+        if CheckpointManager.highest_synced_timestamp_field_key not in contents:
+            raise ValueError(f"Invalid checkpoint state contents in file: {file_path}.")
+
+        return CheckpointManager(
+            file_path=file_path,
+            highest_synced_timestamp=contents[
+                CheckpointManager.highest_synced_timestamp_field_key
+            ],
+        )
+
+    def update_yml(self) -> None:
+        write_yaml(
+            path=self.file_path,
+            data={
+                CheckpointManager.highest_synced_timestamp_field_key: self.highest_synced_timestamp
+            },
+        )
+
+    def set_new_goal(self, goal_timestamp: datetime.datetime) -> None:
+        self.goal = goal_timestamp
+
+    def commit_goal(self) -> None:
+        self.highest_synced_timestamp = self.goal
+        self.update_yml()
diff --git a/anaxi/cli.py b/anaxi/cli.py
index c3f723e..8dbd466 100644
--- a/anaxi/cli.py
+++ b/anaxi/cli.py
@@ -7,6 +7,7 @@ from anaxi.logging import get_anaxi_logger
 from anaxi.processes import (
     run_cosmos_db_healthcheck_process,
     run_postgres_healthcheck_process,
+    run_sync_process,
 )
 
 logger = get_anaxi_logger(__name__)
@@ -47,3 +48,11 @@ def postgres_healthcheck(postgres_database):
     logger.info("Starting a Postgres healthcheck.")
     run_postgres_healthcheck_process(postgres_database)
     logger.info("Finished the Postgres healthcheck.")
+
+
+@cli.command()
+@click.option("--stream-id", type=click.STRING, required=True)
+def sync_stream(stream_id):
+    logger.info("Starting a sync.")
+    run_sync_process(stream_id)
+    logger.info("Finished the sync.")
diff --git a/anaxi/config.py b/anaxi/config.py
index f048825..e4bd400 100644
--- a/anaxi/config.py
+++ b/anaxi/config.py
@@ -1,7 +1,8 @@
-import logging
+import datetime
 import pathlib
-from typing import Dict
+from typing import Dict, Optional
 
+from anaxi.constants import PATHS
 from anaxi.file_persistence import read_yaml
 from anaxi.logging import get_anaxi_logger
 
@@ -12,7 +13,9 @@ class CosmosDBDatabaseConfig:
 
     config_root_key: str = "cosmos-databases"
 
-    def __init__(self, host: str, database_id: str, master_key: str) -> None:
+    def __init__(
+        self, host: str, database_id: str, master_key: str
+    ) -> None:
         self.host = host
         self.database_id = database_id
         self.master_key = master_key
@@ -45,7 +48,9 @@ class PostgresDatabaseConfig:
 
     config_root_key: str = "postgres-databases"
 
-    def __init__(self, host: str, port: int, dbname: str, user: str, password) -> None:
+    def __init__(
+        self, host: str, port: int, dbname: str, user: str, password
+    ) -> None:
         self.host = host
         self.port = port
         self.dbname = dbname
@@ -74,3 +79,93 @@ class PostgresDatabaseConfig:
             raise e
 
         return database_configs
+
+
+class StreamConfig:
+
+    config_root_key: str = "streams"
+
+    def __init__(
+        self,
+        stream_name: str,
+        cosmos_database_id: str,
+        cosmos_container_name: str,
+        cutoff_timestamp: Optional[datetime.datetime],
+        postgres_database: str,
+        postgres_schema_name: str,
+    ) -> None:
+        self.stream_name = stream_name
+        self.cosmos_database_id = cosmos_database_id
+        self.cosmos_container_name = cosmos_container_name
+        self.cutoff_timestamp = cutoff_timestamp
+        self.postgres_database = postgres_database
+        self.postgres_schema_name = postgres_schema_name
+
+    @classmethod
+    def from_dict(cls, a_dict) -> "StreamConfig":
+        return StreamConfig(**a_dict)
+
+    @classmethod
+    def from_yaml(cls, path: pathlib.Path) -> Dict[str, "StreamConfig"]:
+        yaml_contents = read_yaml(path)
+
+        streams_contents = yaml_contents[StreamConfig.config_root_key]
+
+        try:
+            streams_configs = {
+                stream_id: StreamConfig.from_dict(stream_config)
+                for stream_id, stream_config in streams_contents.items()
+            }
+        except (KeyError, TypeError) as e:
+            logger.error(
+                "Error reading streams config yaml file. The file seems malformed. Review the example in the repository."
+            )
+            raise e
+
+        return streams_configs
+
+
+def get_postgres_database_config_from_file(postgres_database):
+    logger.info("Reading Postgres config file...")
+    postgres_configs = PostgresDatabaseConfig.from_yaml(PATHS.postgres_config_file_path)
+    logger.info(f"Found file with {len(postgres_configs)} entries.")
+    try:
+        relevant_postgres_config = postgres_configs[postgres_database]
+    except KeyError as e:
+        logger.error(
+            f"Couldn't find a config entry for database with id: {postgres_database}"
+        )
+        raise e
+    return relevant_postgres_config
+
+
+def get_cosmos_database_config_from_file(cosmos_db_id):
+    logger.info("Reading Cosmos DB config file...")
+    cosmos_db_configs = CosmosDBDatabaseConfig.from_yaml(
+        PATHS.cosmos_db_config_file_path
+    )
+    logger.info(f"Found file with {len(cosmos_db_configs)} entries.")
+    try:
+        relevant_cosmos_db_config = cosmos_db_configs[cosmos_db_id]
+    except KeyError as e:
+        logger.error(
+            f"Couldn't find a config entry for database with id: {cosmos_db_id}"
+        )
+        raise e
+    return relevant_cosmos_db_config
+
+
+def get_stream_config_from_file(stream_id):
+    logger.info("Reading streams config file...")
+    streams_configs = StreamConfig.from_yaml(PATHS.streams_config_file_path)
+    logger.info(f"Found file with {len(streams_configs)} entries.")
+
+    try:
+        stream_config = streams_configs[stream_id]
+    except KeyError as e:
+        logger.error(
+            f"Could not find the stream {stream_id} in the stream configurations file. You might be missing an entry in {PATHS.streams_config_file_path} or have made a typo."
+        )
+        raise e
+
+    return stream_config
diff --git a/anaxi/constants.py b/anaxi/constants.py
index a1ac282..5f99672 100644
--- a/anaxi/constants.py
+++ b/anaxi/constants.py
@@ -1,6 +1,10 @@
+import datetime
 import pathlib
 from dataclasses import dataclass
 
+ASSUMED_PREHISTORICAL_DATETIME = datetime.datetime.fromtimestamp(0)
+DESTINATION_COLUMN_NAME = "documents"
+
 
 @dataclass
 class PATHS:
@@ -14,3 +18,7 @@ class PATHS:
     postgres_config_file_path: pathlib.Path = config_home_path / pathlib.Path(
         "postgres.yml"
     )
+    streams_config_file_path: pathlib.Path = config_home_path / pathlib.Path(
+        "streams.yml"
+    )
+    checkpoints_folder_path: pathlib.Path = config_home_path / "checkpoints/"
diff --git a/anaxi/cosmos_tools.py b/anaxi/cosmos_tools.py
index dee2c3b..f2a12a5 100644
--- a/anaxi/cosmos_tools.py
+++ b/anaxi/cosmos_tools.py
@@ -1,4 +1,5 @@
 import azure.cosmos.cosmos_client as cosmos_client
+from azure.cosmos.container import ContainerProxy
 
 from anaxi.config import CosmosDBDatabaseConfig
 
@@ -10,3 +11,13 @@ def create_cosmos_client_from_config(
         url=config.host,
         credential={"masterKey": config.master_key},
     )
+
+
+def get_container_client(
+    config: CosmosDBDatabaseConfig, container_name: str
+) -> ContainerProxy:
+    cosmos_client = create_cosmos_client_from_config(config)
+    database_client = cosmos_client.get_database_client(database=config.database_id)
+    container_client = database_client.get_container_client(container=container_name)
+
+    return container_client
diff --git a/anaxi/file_persistence.py b/anaxi/file_persistence.py
index bdd4d4e..5728e5c 100644
--- a/anaxi/file_persistence.py
+++ b/anaxi/file_persistence.py
@@ -1,4 +1,5 @@
 import pathlib
+from typing import Any
 
 import yaml
 
@@ -6,3 +7,8 @@ import yaml
 def read_yaml(path: pathlib.Path) -> dict:
     with open(path) as f:
         return yaml.safe_load(f)
+
+
+def write_yaml(path: pathlib.Path, data: Any) -> None:
+    with open(path, "w") as outfile:
+        yaml.dump(data, outfile, default_flow_style=False)
diff --git a/anaxi/processes.py b/anaxi/processes.py
index 1b4d837..9afb0a9 100644
--- a/anaxi/processes.py
+++ b/anaxi/processes.py
@@ -1,28 +1,30 @@
+import datetime
+import json
+
+from psycopg2.extras import execute_batch
 from psycopg2.sql import SQL
 
-from anaxi.config import CosmosDBDatabaseConfig, PostgresDatabaseConfig
-from anaxi.constants import PATHS
-from anaxi.cosmos_tools import create_cosmos_client_from_config
+from anaxi.checkpoint import CheckpointManager
+from anaxi.config import (
+    get_cosmos_database_config_from_file,
+    get_postgres_database_config_from_file,
+    get_stream_config_from_file,
+)
+from anaxi.constants import (
+    ASSUMED_PREHISTORICAL_DATETIME,
+    DESTINATION_COLUMN_NAME,
+    PATHS,
+)
+from anaxi.cosmos_tools import create_cosmos_client_from_config, get_container_client
 from anaxi.logging import get_anaxi_logger
-from anaxi.postgres_tools import simply_query
+from anaxi.postgres_tools import create_postgres_connection_from_config, simply_query
 
 logger = get_anaxi_logger(__name__)
 
 
 def run_cosmos_db_healthcheck_process(cosmos_db_id: str) -> None:
-    logger.info("Reading Cosmos DB config file...")
-    cosmos_db_configs = CosmosDBDatabaseConfig.from_yaml(
-        PATHS.cosmos_db_config_file_path
-    )
-    logger.info(f"Found file with {len(cosmos_db_configs)} entries.")
-    try:
-        relevant_cosmos_db_config = cosmos_db_configs[cosmos_db_id]
-    except KeyError as e:
-        logger.error(
-            f"Couldn't find a config entry for database with id: {cosmos_db_id}"
-        )
-        raise e
+    relevant_cosmos_db_config = get_cosmos_database_config_from_file(cosmos_db_id)
 
     logger.info("Creating client...")
     cosmos_client = create_cosmos_client_from_config(relevant_cosmos_db_config)
@@ -36,19 +38,128 @@ def run_cosmos_db_healthcheck_process(cosmos_db_id: str) -> None:
 
 
 def run_postgres_healthcheck_process(postgres_database: str) -> None:
-    logger.info("Reading Postgres config file...")
-    postgres_configs = PostgresDatabaseConfig.from_yaml(PATHS.postgres_config_file_path)
-    logger.info(f"Found file with {len(postgres_configs)} entries.")
-    try:
-        relevant_postgres_config = postgres_configs[postgres_database]
-    except KeyError as e:
-        logger.error(
-            f"Couldn't find a config entry for database with id: {postgres_database}"
-        )
-        raise e
+    relevant_postgres_config = get_postgres_database_config_from_file(postgres_database)
 
     logger.info("Connecting and sending a SELECT 1...")
     query_result = simply_query(config=relevant_postgres_config, query=SQL("SELECT 1;"))
     logger.info(f"Response: {query_result}")
 
     return
+
+
+def run_sync_process(stream_id: str):
+
+    stream_config = get_stream_config_from_file(stream_id)
+    cosmos_db_config = get_cosmos_database_config_from_file(
+        stream_config.cosmos_database_id
+    )
+    postgres_db_config = get_postgres_database_config_from_file(
+        stream_config.postgres_database
+    )
+
+    logger.info("Looking for a previous checkpoint...")
+
+    checkpoint_file_path = PATHS.checkpoints_folder_path / (
+        stream_config.stream_name + ".yml"
+    )
+    try:
+        checkpoint_manager = CheckpointManager.load_from_yml(
+            file_path=checkpoint_file_path
+        )
+    except FileNotFoundError:
+        checkpoint_manager = CheckpointManager(
+            file_path=checkpoint_file_path, highest_synced_timestamp=None
+        )
+
+    if checkpoint_manager.highest_synced_timestamp is None:
+        logger.info(
+            "Couldn't find an existing checkpoint. Will run from the stream's configured cutoff."
+        )
+        if stream_config.cutoff_timestamp is None:
+            logger.info(
+                "No cutoff has been set. Loading since the start of the container's history."
+            )
+            checkpoint_manager.highest_synced_timestamp = ASSUMED_PREHISTORICAL_DATETIME
+        else:
+            checkpoint_manager.highest_synced_timestamp = stream_config.cutoff_timestamp
+
+    logger.info("Starting sync.")
+
+    # Connect to Cosmos
+    cosmos_container_client = get_container_client(
+        config=cosmos_db_config, container_name=stream_config.cosmos_container_name
+    )
+
+    # Connect to Postgres
+    postgres_connection = create_postgres_connection_from_config(
+        config=postgres_db_config
+    )
+
+    batch_size = 100
+    # Query the Cosmos change feed, starting from the last synced timestamp
+    change_feed_response = cosmos_container_client.query_items_change_feed(
+        start_time=checkpoint_manager.highest_synced_timestamp
+    )
+
+    docs_batch = []
+    cursor = postgres_connection.cursor()
+    cursor.execute(
+        f"""
+        CREATE TABLE IF NOT EXISTS {stream_config.postgres_schema_name}.{stream_config.cosmos_container_name.replace('-', '_')} (
+            {DESTINATION_COLUMN_NAME} JSONB
+        );
+        """
+    )
+    postgres_connection.commit()
+    for doc in change_feed_response:
+        docs_batch.append(doc)
+        if len(docs_batch) < batch_size:
+            continue
+
+        logger.info("Reached batch size, trying to commit to database...")
+        commit_batch(
+            stream_config, checkpoint_manager, postgres_connection, docs_batch, cursor
+        )
+        logger.info("Committed batch.")
+
+        docs_batch = []
+
+    if docs_batch:
+        logger.info(f"Committing dangling documents ({len(docs_batch)})")
+        commit_batch(
+            stream_config, checkpoint_manager, postgres_connection, docs_batch, cursor
+        )
+        logger.info("Committed final batch.")
+
+    logger.info("Finished sync.")
+
+
+def commit_batch(
+    stream_config, checkpoint_manager, postgres_connection, docs_batch, cursor
+):
+    checkpoint_manager.set_new_goal(
+        goal_timestamp=datetime.datetime.fromtimestamp(docs_batch[-1]["_ts"])
+    )
+
+    json_data = [(json.dumps(d),) for d in docs_batch]
+
+    insert_query = (
+        f"INSERT INTO {stream_config.postgres_schema_name}.{stream_config.cosmos_container_name.replace('-', '_')}"
+        + f" ({DESTINATION_COLUMN_NAME})"
+        + " VALUES (%s)"
+    )
+    # Discard any half-applied transaction so the batch insert starts clean.
+    postgres_connection.rollback()
+
+    execute_batch(cursor, insert_query, json_data)
+
+    try:
+        postgres_connection.commit()
+    except Exception as e:
+        logger.error(
+            "An error has happened while trying to commit a batch. Rolling back."
+        )
+        postgres_connection.rollback()
+        raise e
+    else:
+        checkpoint_manager.commit_goal()
diff --git a/example-checkpoint.yml b/example-checkpoint.yml
new file mode 100644
index 0000000..b2261b8
--- /dev/null
+++ b/example-checkpoint.yml
@@ -0,0 +1 @@
+highest_synced_timestamp: 2024-08-13T09:52:43+00:00
diff --git a/example-streams.yml b/example-streams.yml
new file mode 100644
index 0000000..641a2c4
--- /dev/null
+++ b/example-streams.yml
@@ -0,0 +1,15 @@
+streams:
+  example-stream:
+    stream_name: example-stream
+    cosmos_database_id: some-database-in-yml-file
+    cosmos_container_name: a-container
+    cutoff_timestamp:
+    postgres_database: some-database-in-yml-file
+    postgres_schema_name: some-schema-name
+  another-example-stream:
+    stream_name: another-example-stream
+    cosmos_database_id: some-database-in-yml-file
+    cosmos_container_name: a-container
+    cutoff_timestamp: 2024-08-13T09:52:43+00:00
+    postgres_database: some-database-in-yml-file
+    postgres_schema_name: some-schema-name
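Reviewer note: the two-phase checkpoint update that `commit_batch` relies on is the subtle part of this diff, so here is a minimal sketch of the intended lifecycle. The file path and timestamp below are made up for illustration; the class and methods are the ones added in `anaxi/checkpoint.py`.

```python
import datetime
import pathlib

from anaxi.checkpoint import CheckpointManager

# Hypothetical checkpoint file for a stream named "example-stream".
checkpoint_path = pathlib.Path("checkpoints/example-stream.yml")

try:
    # Resume from the last persisted checkpoint, if one exists.
    manager = CheckpointManager.load_from_yml(file_path=checkpoint_path)
except FileNotFoundError:
    # First run for this stream: no checkpoint on disk yet.
    manager = CheckpointManager(
        file_path=checkpoint_path, highest_synced_timestamp=None
    )

# Phase 1: record in memory where this batch would leave us; nothing is
# written to disk yet.
manager.set_new_goal(goal_timestamp=datetime.datetime(2024, 8, 13, 9, 52, 43))

# ... insert the batch into Postgres here; only if the COMMIT succeeds ...

# Phase 2: persist the goal to the YAML file. A crash between phase 1 and
# phase 2 means the batch is re-synced on the next run, never skipped.
manager.commit_goal()
```

The invariant this buys: the checkpoint on disk never runs ahead of the data committed to Postgres, at the cost of possibly re-inserting the last batch after a crash.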