This commit is contained in:
Pablo Martin 2024-08-13 16:51:27 +02:00
parent 7eb697fecd
commit 34496c3128
4 changed files with 170 additions and 116 deletions

View file

@ -1,10 +1,8 @@
import datetime
import json
from psycopg2.extras import execute_batch
from psycopg2.sql import SQL
from anaxi.checkpoint import CheckpointManager
from anaxi.config import (
get_cosmos_database_config_from_file,
get_postgres_database_config_from_file,
@ -18,6 +16,7 @@ from anaxi.constants import (
from anaxi.cosmos_tools import create_cosmos_client_from_config, get_container_client
from anaxi.logging import get_anaxi_logger
from anaxi.postgres_tools import create_postgres_connection_from_config, simply_query
from anaxi.sync import SyncJob
# Module-level logger, named after this module via the project's logging helper.
logger = get_anaxi_logger(__name__)
@ -49,115 +48,10 @@ def run_postgres_healthcheck_process(postgres_database: str) -> None:
def run_sync_process(stream_id: str):
    """Run a full Cosmos -> Postgres sync for one configured stream.

    Loads the stream's Cosmos and Postgres configs, resumes from the last
    checkpoint on disk (falling back to the stream's cutoff, or to
    ASSUMED_PREHISTORICAL_DATETIME when no cutoff is set), then consumes the
    Cosmos change feed in batches of 100 and commits each batch to Postgres
    via ``commit_batch``.

    Args:
        stream_id: Identifier of the stream whose config file to load.
    """
    stream_config = get_stream_config_from_file(stream_id)
    cosmos_db_config = get_cosmos_database_config_from_file(
        stream_config.cosmos_database_id
    )
    postgres_db_config = get_postgres_database_config_from_file(
        stream_config.postgres_database
    )

    logger.info("Preparing sync job...")
    sync_job = SyncJob(stream_id=stream_id)
    logger.info("Sync job ready.")

    logger.info("Looking for a previous checkpoint...")
    logger.info(f"Checkpoint is at: {sync_job.current_checkpoint}")

    checkpoint_file_path = PATHS.checkpoints_folder_path / (
        stream_config.stream_name + ".yml"
    )
    try:
        checkpoint_manager = CheckpointManager.load_from_yml(
            file_path=checkpoint_file_path
        )
    except FileNotFoundError:
        # First run for this stream: start from an empty checkpoint.
        checkpoint_manager = CheckpointManager(
            file_path=checkpoint_file_path, highest_synced_timestamp=None
        )
    if checkpoint_manager.highest_synced_timestamp is None:
        logger.info(
            "Couldn't find an existing checkpoint. Will run from the stream's configured cutoff."
        )
        if stream_config.cutoff_timestamp is None:
            # FIX: message previously read "has ben set".
            logger.info(
                "No cutoff has been set. Loading since the start of the container's history."
            )
            checkpoint_manager.highest_synced_timestamp = ASSUMED_PREHISTORICAL_DATETIME
        else:
            checkpoint_manager.highest_synced_timestamp = stream_config.cutoff_timestamp

    logger.info("Starting sync.")
    # Connect to Cosmos
    cosmos_container_client = get_container_client(
        config=cosmos_db_config, container_name=stream_config.cosmos_container_name
    )
    # Connect to Postgres
    postgres_connection = create_postgres_connection_from_config(
        config=postgres_db_config
    )

    batch_size = 100
    # Iterate through cosmos change feed function call
    changed_feed_response = cosmos_container_client.query_items_change_feed(
        start_time=checkpoint_manager.highest_synced_timestamp
    )
    docs_batch = []
    cursor = postgres_connection.cursor()
    try:
        # NOTE(review): identifiers are interpolated from trusted config; if the
        # config can ever be user-supplied, switch to psycopg2.sql.Identifier.
        cursor.execute(
            f"""
            CREATE TABLE IF NOT EXISTS {stream_config.postgres_schema_name}.{stream_config.cosmos_container_name.replace('-', '_')} (
                {DESTINATION_COLUMN_NAME} JSONB
            );
            """
        )
        postgres_connection.commit()

        for doc in changed_feed_response:
            docs_batch.append(doc)
            if len(docs_batch) < batch_size:
                continue
            logger.info("Reached batch size, trying to commit to database...")
            commit_batch(
                stream_config, checkpoint_manager, postgres_connection, docs_batch, cursor
            )
            logger.info("Committed batch.")
            docs_batch = []

        logger.info(f"Committing dangling documents ({len(docs_batch)})")
        # FIX: only commit when there is something left; an empty batch made
        # commit_batch crash on docs_batch[-1] whenever the feed length was an
        # exact multiple of batch_size (or the feed was empty).
        if docs_batch:
            commit_batch(
                stream_config, checkpoint_manager, postgres_connection, docs_batch, cursor
            )
        logger.info("Committed final batch.")
    finally:
        # FIX: the cursor was previously leaked.
        cursor.close()
    logger.info("Finished sync.")
def commit_batch(
    stream_config, checkpoint_manager, postgres_connection, docs_batch, cursor
):
    """Insert a batch of Cosmos documents into Postgres and advance the checkpoint.

    Records the last document's ``_ts`` as the checkpoint goal, bulk-inserts the
    JSON-serialized documents, and only commits the checkpoint goal after the
    database commit succeeds. On commit failure the transaction is rolled back
    and the exception is re-raised, leaving the checkpoint goal uncommitted.

    Args:
        stream_config: Stream config providing schema and container names.
        checkpoint_manager: Tracks the highest successfully synced timestamp.
        postgres_connection: Open Postgres connection (committed/rolled back here).
        docs_batch: List of Cosmos documents (dicts with a ``_ts`` epoch field).
        cursor: Cursor on ``postgres_connection`` used for the inserts.

    Raises:
        Exception: whatever ``postgres_connection.commit()`` raised.
    """
    # FIX: an empty batch previously crashed on docs_batch[-1] (IndexError).
    if not docs_batch:
        return
    # NOTE(review): fromtimestamp() builds a naive local-time datetime from the
    # epoch-seconds _ts field — confirm whether this should be tz-aware UTC.
    checkpoint_manager.set_new_goal(
        goal_timestamp=datetime.datetime.fromtimestamp(docs_batch[-1]["_ts"])
    )
    json_data = [(json.dumps(d),) for d in docs_batch]
    # NOTE(review): identifiers come from trusted config; use
    # psycopg2.sql.Identifier if they can ever be user-supplied.
    insert_query = (
        f"INSERT INTO {stream_config.postgres_schema_name}.{stream_config.cosmos_container_name.replace('-', '_')}"
        + f" ({DESTINATION_COLUMN_NAME})"
        + " VALUES (%s)"
    )
    # Clear any half-open transaction state before starting this batch.
    postgres_connection.rollback()
    execute_batch(cursor, insert_query, json_data)
    try:
        postgres_connection.commit()
    except Exception:
        logger.error(
            "An error has happened while trying to commit a batch. Rolling back."
        )
        postgres_connection.rollback()
        # FIX: bare raise preserves the original traceback (was `raise e`).
        raise
    else:
        # Commit succeeded: the checkpoint goal is now durable.
        checkpoint_manager.commit_goal()
# NOTE(review): `sync_job` is not defined at module scope — this line appears to
# be diff-extraction residue (the new one-line body of run_sync_process in this
# commit); confirm its real indentation/placement against the repository.
sync_job.run_sync()