diff --git a/anaxi/processes.py b/anaxi/processes.py index 5d11fd3..4d3d1b3 100644 --- a/anaxi/processes.py +++ b/anaxi/processes.py @@ -1,21 +1,12 @@ -import datetime -import json - from psycopg2.sql import SQL from anaxi.config import ( get_cosmos_database_config_from_file, get_postgres_database_config_from_file, - get_stream_config_from_file, ) -from anaxi.constants import ( - ASSUMED_PREHISTORICAL_DATETIME, - DESTINATION_COLUMN_NAME, - PATHS, -) -from anaxi.cosmos_tools import create_cosmos_client_from_config, get_container_client +from anaxi.cosmos_tools import create_cosmos_client_from_config from anaxi.logging import get_anaxi_logger -from anaxi.postgres_tools import create_postgres_connection_from_config, simply_query +from anaxi.postgres_tools import simply_query from anaxi.sync import SyncJob logger = get_anaxi_logger(__name__) @@ -46,7 +37,7 @@ def run_postgres_healthcheck_process(postgres_database: str) -> None: return -def run_sync_process(stream_id: str): +def run_sync_process(stream_id: str) -> None: logger.info("Preparing sync job...") sync_job = SyncJob(stream_id=stream_id) diff --git a/anaxi/sync.py b/anaxi/sync.py index f456e27..ed08b3a 100644 --- a/anaxi/sync.py +++ b/anaxi/sync.py @@ -28,7 +28,7 @@ class SyncJob: self, stream_id: str, batch_size: int = DEFAULT_BATCH_SIZE, - ) -> None: + ) -> "SyncJob": self.stream_config = get_stream_config_from_file(stream_id) self.cosmos_db_config = get_cosmos_database_config_from_file( @@ -48,6 +48,9 @@ class SyncJob: config=self.postgres_db_config ) + self.read_record_count = 0 + self.committed_record_count = 0 + @property def current_checkpoint(self): return self.checkpoint_manager.highest_synced_timestamp @@ -59,6 +62,7 @@ class SyncJob: docs_batch = [] for doc in changed_feed_response: + self.read_record_count += 1 docs_batch.append(doc) if len(docs_batch) < self._batch_size: continue @@ -84,6 +88,8 @@ class SyncJob: logger.info("Committed final batch.") logger.info("Finished sync job.") + logger.info(f"Total documents read: {self.read_record_count}.") + logger.info(f"Total documents written: {self.committed_record_count}.") def commit_batch(self, docs_batch, cursor): self.checkpoint_manager.set_new_goal( @@ -111,12 +117,12 @@ class SyncJob: raise e else: self.checkpoint_manager.commit_goal() + self.committed_record_count += len(docs_batch) def _prepare_source_and_sink(self): changed_feed_response = self.cosmos_container_client.query_items_change_feed( start_time=self.checkpoint_manager.highest_synced_timestamp ) - cursor = self.postgres_connection.cursor() cursor.execute( f"""