import datetime
import json

from psycopg2.extras import execute_batch
from psycopg2.sql import SQL

from anaxi.checkpoint import CheckpointManager
from anaxi.config import (
    get_cosmos_database_config_from_file,
    get_postgres_database_config_from_file,
    get_stream_config_from_file,
)
from anaxi.constants import (
    ASSUMED_PREHISTORICAL_DATETIME,
    DESTINATION_COLUMN_NAME,
    PATHS,
)
from anaxi.cosmos_tools import create_cosmos_client_from_config, get_container_client
from anaxi.logging import get_anaxi_logger
from anaxi.postgres_tools import create_postgres_connection_from_config, simply_query

logger = get_anaxi_logger(__name__)


def run_cosmos_db_healthcheck_process(cosmos_db_id: str) -> None:
    relevant_cosmos_db_config = get_cosmos_database_config_from_file(cosmos_db_id)

    logger.info("Creating client...")
    cosmos_client = create_cosmos_client_from_config(relevant_cosmos_db_config)
    logger.info("Client created.")

    logger.info("Sending a SELECT 1;")
    response = cosmos_client.query_databases(query="SELECT 1")
    logger.info(f"Response: {response.next()}")


def run_postgres_healthcheck_process(postgres_database: str) -> None:
    relevant_postgres_config = get_postgres_database_config_from_file(postgres_database)

    logger.info("Connecting and sending a SELECT 1...")
    query_result = simply_query(config=relevant_postgres_config, query=SQL("SELECT 1;"))
    logger.info(f"Response: {query_result}")


def run_sync_process(stream_id: str) -> None:
    stream_config = get_stream_config_from_file(stream_id)
    cosmos_db_config = get_cosmos_database_config_from_file(
        stream_config.cosmos_database_id
    )
    postgres_db_config = get_postgres_database_config_from_file(
        stream_config.postgres_database
    )

    logger.info("Looking for a previous checkpoint...")
    checkpoint_file_path = PATHS.checkpoints_folder_path / (
        stream_config.stream_name + ".yml"
    )
    try:
        checkpoint_manager = CheckpointManager.load_from_yml(
            file_path=checkpoint_file_path
        )
    except FileNotFoundError:
        checkpoint_manager = CheckpointManager(
            file_path=checkpoint_file_path, highest_synced_timestamp=None
        )

    if checkpoint_manager.highest_synced_timestamp is None:
        logger.info(
            "Couldn't find an existing checkpoint. Will run from the stream's configured cutoff."
        )
        if stream_config.cutoff_timestamp is None:
            logger.info(
                "No cutoff has been set. Loading since the start of the container's history."
            )
            checkpoint_manager.highest_synced_timestamp = ASSUMED_PREHISTORICAL_DATETIME
        else:
            checkpoint_manager.highest_synced_timestamp = stream_config.cutoff_timestamp

    logger.info("Starting sync.")

    # Connect to Cosmos.
    cosmos_container_client = get_container_client(
        config=cosmos_db_config, container_name=stream_config.cosmos_container_name
    )
    # Connect to Postgres.
    postgres_connection = create_postgres_connection_from_config(
        config=postgres_db_config
    )

    batch_size = 100

    # Query the Cosmos change feed, starting from the checkpointed timestamp.
    change_feed_response = cosmos_container_client.query_items_change_feed(
        start_time=checkpoint_manager.highest_synced_timestamp
    )

    docs_batch = []
    cursor = postgres_connection.cursor()
    # Schema and table names come from trusted configuration, so they are
    # interpolated directly into the DDL.
    cursor.execute(
        f"""
        CREATE TABLE IF NOT EXISTS {stream_config.postgres_schema_name}.{stream_config.cosmos_container_name.replace('-', '_')} (
            {DESTINATION_COLUMN_NAME} JSONB
        );
        """
    )
    postgres_connection.commit()

    for doc in change_feed_response:
        docs_batch.append(doc)
        if len(docs_batch) < batch_size:
            continue
        logger.info("Reached batch size, trying to commit to database...")
        commit_batch(
            stream_config, checkpoint_manager, postgres_connection, docs_batch, cursor
        )
        logger.info("Committed batch.")
        docs_batch = []

    # Commit whatever is left over; skip the call entirely when the feed ended
    # exactly on a batch boundary, since commit_batch assumes a non-empty batch.
    if docs_batch:
        logger.info(f"Committing dangling documents ({len(docs_batch)})")
        commit_batch(
            stream_config, checkpoint_manager, postgres_connection, docs_batch, cursor
        )
        logger.info("Committed final batch.")
    logger.info("Finished sync.")


def commit_batch(
    stream_config, checkpoint_manager, postgres_connection, docs_batch, cursor
):
    # Record the newest document's timestamp as the checkpoint goal; it is only
    # committed once the batch has landed in Postgres.
    checkpoint_manager.set_new_goal(
        goal_timestamp=datetime.datetime.fromtimestamp(docs_batch[-1]["_ts"])
    )
    json_data = [(json.dumps(d),) for d in docs_batch]
    insert_query = (
        f"INSERT INTO {stream_config.postgres_schema_name}.{stream_config.cosmos_container_name.replace('-', '_')}"
        f" ({DESTINATION_COLUMN_NAME})"
        " VALUES (%s)"
    )
    # Discard any half-finished transaction before inserting the new batch.
    postgres_connection.rollback()
    execute_batch(cursor, insert_query, json_data)
    try:
        postgres_connection.commit()
    except Exception:
        logger.error(
            "An error has happened while trying to commit a batch. Rolling back."
        )
        postgres_connection.rollback()
        raise
    else:
        checkpoint_manager.commit_goal()
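

# A minimal sketch of how these entry points might be invoked from the command
# line. The argparse flag names (--healthcheck-cosmos, --healthcheck-postgres,
# --sync) are assumptions for illustration and are not defined anywhere in the
# module above; the real project may wire these functions up differently.
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Cosmos-to-Postgres sync runner")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--healthcheck-cosmos", metavar="COSMOS_DB_ID")
    group.add_argument("--healthcheck-postgres", metavar="POSTGRES_DATABASE")
    group.add_argument("--sync", metavar="STREAM_ID")
    args = parser.parse_args()

    if args.healthcheck_cosmos:
        run_cosmos_db_healthcheck_process(args.healthcheck_cosmos)
    elif args.healthcheck_postgres:
        run_postgres_healthcheck_process(args.healthcheck_postgres)
    else:
        run_sync_process(args.sync)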