From 58e99a1869c78070ca3b3136c6b654d84f61ff95 Mon Sep 17 00:00:00 2001 From: Pablo Martin Date: Tue, 13 Aug 2024 17:27:52 +0200 Subject: [PATCH] some refactors --- anaxi/sync.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/anaxi/sync.py b/anaxi/sync.py index ed08b3a..f1d95df 100644 --- a/anaxi/sync.py +++ b/anaxi/sync.py @@ -37,7 +37,6 @@ class SyncJob: self.postgres_db_config = get_postgres_database_config_from_file( self.stream_config.postgres_database ) - self._batch_size = batch_size self.checkpoint_manager = self._create_checkpoint_manager() self.cosmos_container_client = get_container_client( @@ -50,6 +49,7 @@ class SyncJob: self.read_record_count = 0 self.committed_record_count = 0 + self._batch_size = batch_size @property def current_checkpoint(self): @@ -58,10 +58,11 @@ class SyncJob: def run_sync(self): logger.info("Starting sync.") - changed_feed_response, cursor = self._prepare_source_and_sink() + change_feed_response = self._prepare_cosmos_reader() + postgres_cursor = self._prepare_postgres_writer() docs_batch = [] - for doc in changed_feed_response: + for doc in change_feed_response: self.read_record_count += 1 docs_batch.append(doc) if len(docs_batch) < self._batch_size: @@ -70,7 +71,7 @@ class SyncJob: logger.info("Reached batch size, trying to commit to database...") self.commit_batch( docs_batch, - cursor, + postgres_cursor, ) logger.info("Committed batch.") logger.info( @@ -83,7 +84,7 @@ class SyncJob: logger.info(f"Committing dangling documents ({len(docs_batch)})") self.commit_batch( docs_batch, - cursor, + postgres_cursor, ) logger.info("Committed final batch.") @@ -119,10 +120,13 @@ class SyncJob: self.checkpoint_manager.commit_goal() self.committed_record_count += len(docs_batch) - def _prepare_source_and_sink(self): - changed_feed_response = self.cosmos_container_client.query_items_change_feed( + def _prepare_cosmos_reader(self): + change_feed_response = self.cosmos_container_client.query_items_change_feed( start_time=self.checkpoint_manager.highest_synced_timestamp ) + return change_feed_response + + def _prepare_postgres_writer(self): cursor = self.postgres_connection.cursor() cursor.execute( f""" @@ -132,7 +136,7 @@ class SyncJob: """ ) self.postgres_connection.commit() - return changed_feed_response, cursor + return cursor def _create_checkpoint_manager(self): checkpoint_file_path = PATHS.checkpoints_folder_path / (