From 34496c3128adb5e81ddc407d686771a43af6ed7a Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Tue, 13 Aug 2024 16:51:27 +0200 Subject: [PATCH] works --- anaxi/checkpoint.py | 8 +-- anaxi/constants.py | 1 + anaxi/processes.py | 118 ++------------------------------ anaxi/sync.py | 159 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 170 insertions(+), 116 deletions(-) create mode 100644 anaxi/sync.py diff --git a/anaxi/checkpoint.py b/anaxi/checkpoint.py index 1f691ee..9c78792 100644 --- a/anaxi/checkpoint.py +++ b/anaxi/checkpoint.py @@ -27,16 +27,16 @@ class CheckpointManager: return CheckpointManager( file_path=file_path, - highest_synced_timestamp=contents[ - CheckpointManager.highest_synced_timestamp_field_key - ], + highest_synced_timestamp=datetime.datetime.fromisoformat( + contents[CheckpointManager.highest_synced_timestamp_field_key] + ), ) def update_yml(self): write_yaml( path=self.file_path, data={ - CheckpointManager.highest_synced_timestamp_field_key: self.highest_synced_timestamp + CheckpointManager.highest_synced_timestamp_field_key: self.highest_synced_timestamp.isoformat() }, ) diff --git a/anaxi/constants.py b/anaxi/constants.py index 5f99672..872dd91 100644 --- a/anaxi/constants.py +++ b/anaxi/constants.py @@ -4,6 +4,7 @@ from dataclasses import dataclass ASSUMED_PREHISTORICAL_DATETIME = datetime.datetime.fromtimestamp(0) DESTINATION_COLUMN_NAME = "documents" +DEFAULT_BATCH_SIZE = 100 @dataclass diff --git a/anaxi/processes.py b/anaxi/processes.py index 9afb0a9..5d11fd3 100644 --- a/anaxi/processes.py +++ b/anaxi/processes.py @@ -1,10 +1,8 @@ import datetime import json -from psycopg2.extras import execute_batch from psycopg2.sql import SQL -from anaxi.checkpoint import CheckpointManager from anaxi.config import ( get_cosmos_database_config_from_file, get_postgres_database_config_from_file, @@ -18,6 +16,7 @@ from anaxi.constants import ( from anaxi.cosmos_tools import create_cosmos_client_from_config, get_container_client from anaxi.logging import get_anaxi_logger from anaxi.postgres_tools import create_postgres_connection_from_config, simply_query +from anaxi.sync import SyncJob logger = get_anaxi_logger(__name__) @@ -49,115 +48,10 @@ def run_postgres_healthcheck_process(postgres_database: str) -> None: def run_sync_process(stream_id: str): - stream_config = get_stream_config_from_file(stream_id) - cosmos_db_config = get_cosmos_database_config_from_file( - stream_config.cosmos_database_id - ) - postgres_db_config = get_postgres_database_config_from_file( - stream_config.postgres_database - ) + logger.info("Preparing sync job...") + sync_job = SyncJob(stream_id=stream_id) + logger.info("Sync job ready.") - logger.info("Looking for a previous checkpoint...") + logger.info(f"Checkpoint is at: {sync_job.current_checkpoint}") - checkpoint_file_path = PATHS.checkpoints_folder_path / ( - stream_config.stream_name + ".yml" - ) - try: - checkpoint_manager = CheckpointManager.load_from_yml( - file_path=checkpoint_file_path - ) - except FileNotFoundError: - checkpoint_manager = CheckpointManager( - file_path=checkpoint_file_path, highest_synced_timestamp=None - ) - - if checkpoint_manager.highest_synced_timestamp is None: - logger.info( - "Couldn't find an existing checkpoint. Will run from the stream's configured cutoff." - ) - if stream_config.cutoff_timestamp is None: - logger.info( - "No cutoff has ben set. Loading since the start of the container's history." 
- ) - checkpoint_manager.highest_synced_timestamp = ASSUMED_PREHISTORICAL_DATETIME - else: - checkpoint_manager.highest_synced_timestamp = stream_config.cutoff_timestamp - - logger.info("Starting sync.") - - # Connect to Cosmos - cosmos_container_client = get_container_client( - config=cosmos_db_config, container_name=stream_config.cosmos_container_name - ) - - # Connect to Postgres - postgres_connection = create_postgres_connection_from_config( - config=postgres_db_config - ) - - batch_size = 100 - # Iterate through cosmos change feed function call - changed_feed_response = cosmos_container_client.query_items_change_feed( - start_time=checkpoint_manager.highest_synced_timestamp - ) - - docs_batch = [] - cursor = postgres_connection.cursor() - cursor.execute( - f""" - CREATE TABLE IF NOT EXISTS {stream_config.postgres_schema_name}.{stream_config.cosmos_container_name.replace('-', '_')} ( - {DESTINATION_COLUMN_NAME} JSONB - ); - """ - ) - postgres_connection.commit() - for doc in changed_feed_response: - docs_batch.append(doc) - if len(docs_batch) < batch_size: - continue - - logger.info("Reached batch size, trying to commit to database...") - commit_batch( - stream_config, checkpoint_manager, postgres_connection, docs_batch, cursor - ) - logger.info("Committed batch.") - - docs_batch = [] - - logger.info(f"Committing dangling documents ({len(docs_batch)})") - commit_batch( - stream_config, checkpoint_manager, postgres_connection, docs_batch, cursor - ) - logger.info("Committed final batch.") - - logger.info("Finished sync.") - - -def commit_batch( - stream_config, checkpoint_manager, postgres_connection, docs_batch, cursor -): - checkpoint_manager.set_new_goal( - goal_timestamp=datetime.datetime.fromtimestamp(docs_batch[-1]["_ts"]) - ) - - json_data = [(json.dumps(d),) for d in docs_batch] - - insert_query = ( - f"INSERT INTO {stream_config.postgres_schema_name}.{stream_config.cosmos_container_name.replace('-', '_')}" - + f" ({DESTINATION_COLUMN_NAME})" - + " VALUES (%s)" - ) - postgres_connection.rollback() - - execute_batch(cursor, insert_query, json_data) - - try: - postgres_connection.commit() - except Exception as e: - logger.error( - "An error has happened while trying to commit a batch. Rolling back." 
-        )
-        postgres_connection.rollback()
-        raise e
-    else:
-        checkpoint_manager.commit_goal()
+    sync_job.run_sync()
diff --git a/anaxi/sync.py b/anaxi/sync.py
new file mode 100644
index 0000000..f456e27
--- /dev/null
+++ b/anaxi/sync.py
@@ -0,0 +1,159 @@
+import datetime
+import json
+
+from psycopg2.extras import execute_batch
+
+from anaxi.checkpoint import CheckpointManager
+from anaxi.config import (
+    get_cosmos_database_config_from_file,
+    get_postgres_database_config_from_file,
+    get_stream_config_from_file,
+)
+from anaxi.constants import (
+    ASSUMED_PREHISTORICAL_DATETIME,
+    DEFAULT_BATCH_SIZE,
+    DESTINATION_COLUMN_NAME,
+    PATHS,
+)
+from anaxi.cosmos_tools import get_container_client
+from anaxi.logging import get_anaxi_logger
+from anaxi.postgres_tools import create_postgres_connection_from_config
+
+logger = get_anaxi_logger(__name__)
+
+
+class SyncJob:
+
+    def __init__(
+        self,
+        stream_id: str,
+        batch_size: int = DEFAULT_BATCH_SIZE,
+    ) -> None:
+
+        self.stream_config = get_stream_config_from_file(stream_id)
+        self.cosmos_db_config = get_cosmos_database_config_from_file(
+            self.stream_config.cosmos_database_id
+        )
+        self.postgres_db_config = get_postgres_database_config_from_file(
+            self.stream_config.postgres_database
+        )
+        self._batch_size = batch_size
+
+        self.checkpoint_manager = self._create_checkpoint_manager()
+        self.cosmos_container_client = get_container_client(
+            config=self.cosmos_db_config,
+            container_name=self.stream_config.cosmos_container_name,
+        )
+        self.postgres_connection = create_postgres_connection_from_config(
+            config=self.postgres_db_config
+        )
+
+    @property
+    def current_checkpoint(self):
+        return self.checkpoint_manager.highest_synced_timestamp
+
+    def run_sync(self):
+        logger.info("Starting sync.")
+
+        changed_feed_response, cursor = self._prepare_source_and_sink()
+
+        docs_batch = []
+        for doc in changed_feed_response:
+            docs_batch.append(doc)
+            if len(docs_batch) < self._batch_size:
+                continue
+
+            logger.info("Reached batch size, trying to commit to database...")
+            self.commit_batch(
+                docs_batch,
+                cursor,
+            )
+            logger.info("Committed batch.")
+            logger.info(
+                f"Checkpoint is now at: {self.checkpoint_manager.highest_synced_timestamp}"
+            )
+
+            docs_batch = []
+
+        if docs_batch:
+            logger.info(f"Committing dangling documents ({len(docs_batch)})")
+            self.commit_batch(
+                docs_batch,
+                cursor,
+            )
+            logger.info("Committed final batch.")
+
+        logger.info("Finished sync job.")
+
+    def commit_batch(self, docs_batch, cursor):
+        self.checkpoint_manager.set_new_goal(
+            goal_timestamp=datetime.datetime.fromtimestamp(docs_batch[-1]["_ts"])
+        )
+
+        json_data = [(json.dumps(d),) for d in docs_batch]
+
+        insert_query = (
+            f"INSERT INTO {self.stream_config.postgres_schema_name}.{self.stream_config.cosmos_container_name.replace('-', '_')}"
+            + f" ({DESTINATION_COLUMN_NAME})"
+            + " VALUES (%s)"
+        )
+        self.postgres_connection.rollback()
+
+        execute_batch(cursor, insert_query, json_data)
+
+        try:
+            self.postgres_connection.commit()
+        except Exception as e:
+            logger.error(
+                "An error has happened while trying to commit a batch. Rolling back."
+ ) + self.postgres_connection.rollback() + raise e + else: + self.checkpoint_manager.commit_goal() + + def _prepare_source_and_sink(self): + changed_feed_response = self.cosmos_container_client.query_items_change_feed( + start_time=self.checkpoint_manager.highest_synced_timestamp + ) + + cursor = self.postgres_connection.cursor() + cursor.execute( + f""" + CREATE TABLE IF NOT EXISTS {self.stream_config.postgres_schema_name}.{self.stream_config.cosmos_container_name.replace('-', '_')} ( + {DESTINATION_COLUMN_NAME} JSONB + ); + """ + ) + self.postgres_connection.commit() + return changed_feed_response, cursor + + def _create_checkpoint_manager(self): + checkpoint_file_path = PATHS.checkpoints_folder_path / ( + self.stream_config.stream_name + ".yml" + ) + try: + checkpoint_manager = CheckpointManager.load_from_yml( + file_path=checkpoint_file_path + ) + except FileNotFoundError: + checkpoint_manager = CheckpointManager( + file_path=checkpoint_file_path, highest_synced_timestamp=None + ) + + if checkpoint_manager.highest_synced_timestamp is None: + logger.info( + "Couldn't find an existing checkpoint. Will run from the stream's configured cutoff." + ) + if self.stream_config.cutoff_timestamp is None: + logger.info( + "No cutoff has ben set. Loading since the start of the container's history." + ) + checkpoint_manager.highest_synced_timestamp = ( + ASSUMED_PREHISTORICAL_DATETIME + ) + else: + checkpoint_manager.highest_synced_timestamp = ( + self.stream_config.cutoff_timestamp + ) + return checkpoint_manager