import datetime
import json

from psycopg2.extras import execute_batch

from anaxi.checkpoint import CheckpointManager
from anaxi.config import (
    get_cosmos_database_config_from_file,
    get_postgres_database_config_from_file,
    get_stream_config_from_file,
)
from anaxi.constants import (
    ASSUMED_PREHISTORICAL_DATETIME,
    DEFAULT_BATCH_SIZE,
    DESTINATION_COLUMN_NAME,
    PATHS,
)
from anaxi.cosmos_tools import get_container_client
from anaxi.logging import get_anaxi_logger
from anaxi.postgres_tools import create_postgres_connection_from_config

logger = get_anaxi_logger(__name__)


class SyncJob:
    """Syncs a Cosmos DB container's change feed into a Postgres JSONB table,
    resuming from a per-stream checkpoint."""

    def __init__(
        self,
        stream_id: str,
        batch_size: int = DEFAULT_BATCH_SIZE,
    ) -> None:
        self.stream_config = get_stream_config_from_file(stream_id)
        self.cosmos_db_config = get_cosmos_database_config_from_file(
            self.stream_config.cosmos_database_id
        )
        self.postgres_db_config = get_postgres_database_config_from_file(
            self.stream_config.postgres_database
        )
        self.checkpoint_manager = self._create_checkpoint_manager()
        self.cosmos_container_client = get_container_client(
            config=self.cosmos_db_config,
            container_name=self.stream_config.cosmos_container_name,
        )
        self.postgres_connection = create_postgres_connection_from_config(
            config=self.postgres_db_config
        )
        self.read_record_count = 0
        self.committed_record_count = 0
        self._batch_size = batch_size

    @property
    def current_checkpoint(self):
        return self.checkpoint_manager.highest_synced_timestamp

    def run_sync(self):
        logger.info("Starting sync.")
        change_feed_response = self._prepare_cosmos_reader()
        postgres_cursor = self._prepare_postgres_writer()

        docs_batch = []
        for doc in change_feed_response:
            self.read_record_count += 1
            docs_batch.append(doc)
            if len(docs_batch) < self._batch_size:
                continue

            logger.info("Reached batch size, trying to commit to database...")
            self.commit_batch(docs_batch, postgres_cursor)
            logger.info("Committed batch.")
            logger.info(
                f"Checkpoint is now at: {self.checkpoint_manager.highest_synced_timestamp}"
            )
            docs_batch = []

        # Flush any remaining documents that never filled a full batch.
        if docs_batch:
            logger.info(f"Committing dangling documents ({len(docs_batch)})")
            self.commit_batch(docs_batch, postgres_cursor)
            logger.info("Committed final batch.")

        logger.info("Finished sync job.")
        logger.info(f"Total documents read: {self.read_record_count}.")
        logger.info(f"Total documents written: {self.committed_record_count}.")

    def commit_batch(self, docs_batch, cursor):
        # The change feed is ordered, so the last document carries the highest
        # `_ts`; record it as the new checkpoint goal before writing.
        self.checkpoint_manager.set_new_goal(
            goal_timestamp=datetime.datetime.fromtimestamp(docs_batch[-1]["_ts"])
        )
        json_data = [(json.dumps(d),) for d in docs_batch]
        insert_query = (
            f"INSERT INTO {self.stream_config.postgres_schema_name}"
            f".{self.stream_config.cosmos_container_name.replace('-', '_')}"
            f" ({DESTINATION_COLUMN_NAME})"
            " VALUES (%s)"
        )
        # Discard any state left over from a previously failed transaction.
        self.postgres_connection.rollback()
        try:
            execute_batch(cursor, insert_query, json_data)
            self.postgres_connection.commit()
        except Exception:
            logger.error(
                "An error happened while trying to commit a batch. Rolling back."
            )
            self.postgres_connection.rollback()
            raise
        else:
            # Only advance the checkpoint once the batch is safely in Postgres.
            self.checkpoint_manager.commit_goal()
            self.committed_record_count += len(docs_batch)

    def _prepare_cosmos_reader(self):
        change_feed_response = self.cosmos_container_client.query_items_change_feed(
            start_time=self.checkpoint_manager.highest_synced_timestamp
        )
        return change_feed_response

    def _prepare_postgres_writer(self):
        cursor = self.postgres_connection.cursor()
        cursor.execute(
            f"""
            CREATE TABLE IF NOT EXISTS {self.stream_config.postgres_schema_name}.{self.stream_config.cosmos_container_name.replace('-', '_')} (
                {DESTINATION_COLUMN_NAME} JSONB
            );
            """
        )
        self.postgres_connection.commit()
        return cursor

    def _create_checkpoint_manager(self):
        checkpoint_file_path = PATHS.checkpoints_folder_path / (
            self.stream_config.stream_name + ".yml"
        )
        try:
            checkpoint_manager = CheckpointManager.load_from_yml(
                file_path=checkpoint_file_path
            )
        except FileNotFoundError:
            checkpoint_manager = CheckpointManager(
                file_path=checkpoint_file_path, highest_synced_timestamp=None
            )

        if checkpoint_manager.highest_synced_timestamp is None:
            logger.info(
                "Couldn't find an existing checkpoint. Will run from the stream's configured cutoff."
            )
            if self.stream_config.cutoff_timestamp is None:
                logger.info(
                    "No cutoff has been set. Loading since the start of the container's history."
                )
                checkpoint_manager.highest_synced_timestamp = (
                    ASSUMED_PREHISTORICAL_DATETIME
                )
            else:
                checkpoint_manager.highest_synced_timestamp = (
                    self.stream_config.cutoff_timestamp
                )
        return checkpoint_manager
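

# A minimal usage sketch, not part of the original module: the stream id
# "orders-stream" and the batch size below are illustrative assumptions; real
# stream ids come from the configuration files that get_stream_config_from_file
# reads.
if __name__ == "__main__":
    job = SyncJob(stream_id="orders-stream", batch_size=500)
    job.run_sync()
    logger.info(f"Checkpoint after sync: {job.current_checkpoint}")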