data-anaxi/anaxi/sync.py

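"""Sync job that copies documents from an Azure Cosmos DB change feed into a
Postgres table as JSONB rows, tracking progress with a per-stream checkpoint."""
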
import datetime
import json
from psycopg2.extras import execute_batch
from anaxi.checkpoint import CheckpointManager
from anaxi.config import (
    get_cosmos_database_config_from_file,
    get_postgres_database_config_from_file,
    get_stream_config_from_file,
)
from anaxi.constants import (
    ASSUMED_PREHISTORICAL_DATETIME,
    COSMOS_DB_TIMESTAMP_FIELD_KEY,
    DEFAULT_BATCH_SIZE,
    DESTINATION_COLUMN_NAME,
    PATHS,
)
from anaxi.cosmos_tools import get_container_client
from anaxi.logging import get_anaxi_logger
from anaxi.postgres_tools import create_postgres_connection_from_config

logger = get_anaxi_logger(__name__)


class SyncJob:
    def __init__(
        self,
        stream_id: str,
        batch_size: int = DEFAULT_BATCH_SIZE,
    ) -> None:
        self.stream_config = get_stream_config_from_file(stream_id)
        self.cosmos_db_config = get_cosmos_database_config_from_file(
            self.stream_config.cosmos_database_id
        )
        self.postgres_db_config = get_postgres_database_config_from_file(
            self.stream_config.postgres_database
        )
        self.checkpoint_manager = self._create_checkpoint_manager()
        self.cosmos_container_client = get_container_client(
            config=self.cosmos_db_config,
            container_name=self.stream_config.cosmos_container_name,
        )
        self.postgres_connection = create_postgres_connection_from_config(
            config=self.postgres_db_config
        )
        self.read_record_count = 0
        self.committed_record_count = 0
        self._batch_size = batch_size

    @property
    def current_checkpoint(self):
        return self.checkpoint_manager.highest_synced_timestamp

    def run_sync(self):
        logger.info("Starting sync.")
        change_feed_response = self._prepare_cosmos_reader()
        postgres_cursor = self._prepare_postgres_writer()
        docs_batch = []
        for doc in change_feed_response:
            self.read_record_count += 1
            docs_batch.append(doc)
            if len(docs_batch) < self._batch_size:
                continue
            logger.info("Reached batch size, trying to commit to database...")
            self.commit_batch(
                docs_batch,
                postgres_cursor,
            )
            logger.info("Committed batch.")
            docs_batch = []
        # Flush any documents left over after the change feed is exhausted.
        if docs_batch:
            logger.info(f"Committing dangling documents ({len(docs_batch)})")
            self.commit_batch(
                docs_batch,
                postgres_cursor,
            )
            logger.info("Committed final batch.")
        logger.info("Finished sync job.")
        logger.info(f"Total documents read: {self.read_record_count}.")
        logger.info(f"Total documents written: {self.committed_record_count}.")

    def commit_batch(self, docs_batch, cursor):
        # Record the timestamp of the last document in the batch as the new
        # checkpoint goal; it is only committed once Postgres accepts the batch.
        self.checkpoint_manager.set_new_goal(
            goal_timestamp=datetime.datetime.fromtimestamp(
                docs_batch[-1][COSMOS_DB_TIMESTAMP_FIELD_KEY]
            )
        )
        json_data = [(json.dumps(d),) for d in docs_batch]
        insert_query = (
            f"INSERT INTO {self.stream_config.postgres_schema_name}.{self.stream_config.cosmos_container_name.replace('-', '_')}"
            + f" ({DESTINATION_COLUMN_NAME})"
            + " VALUES (%s)"
        )
        # Start from a clean transaction before writing the new batch.
        self.postgres_connection.rollback()
        execute_batch(cursor, insert_query, json_data)
        try:
            self.postgres_connection.commit()
        except Exception as e:
            logger.error(
                "An error occurred while trying to commit a batch. Rolling back."
            )
            self.postgres_connection.rollback()
            raise e
        else:
            self.checkpoint_manager.commit_goal()
            self.committed_record_count += len(docs_batch)

    def _prepare_cosmos_reader(self):
        change_feed_response = self.cosmos_container_client.query_items_change_feed(
            start_time=self.checkpoint_manager.highest_synced_timestamp
        )
        return change_feed_response

    def _prepare_postgres_writer(self):
        cursor = self.postgres_connection.cursor()
        cursor.execute(
            f"""
            CREATE TABLE IF NOT EXISTS {self.stream_config.postgres_schema_name}.{self.stream_config.cosmos_container_name.replace('-', '_')} (
                {DESTINATION_COLUMN_NAME} JSONB
            );
            """
        )
        self.postgres_connection.commit()
        return cursor

    def _create_checkpoint_manager(self):
        checkpoint_file_path = PATHS.checkpoints_folder_path / (
            self.stream_config.stream_name + ".yml"
        )
        try:
            checkpoint_manager = CheckpointManager.load_from_yml(
                file_path=checkpoint_file_path
            )
        except FileNotFoundError:
            checkpoint_manager = CheckpointManager(
                file_path=checkpoint_file_path, highest_synced_timestamp=None
            )
        if checkpoint_manager.highest_synced_timestamp is None:
            logger.info(
                "Couldn't find an existing checkpoint. Will run from the stream's configured cutoff."
            )
            if self.stream_config.cutoff_timestamp is None:
                logger.info(
                    "No cutoff has been set. Loading since the start of the container's history."
                )
                checkpoint_manager.highest_synced_timestamp = (
                    ASSUMED_PREHISTORICAL_DATETIME
                )
            else:
                checkpoint_manager.highest_synced_timestamp = (
                    self.stream_config.cutoff_timestamp
                )
        return checkpoint_manager
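

if __name__ == "__main__":
    # Hypothetical usage sketch (not part of the original module): the stream id
    # below is a placeholder and must match a stream config resolvable by
    # get_stream_config_from_file(); run_sync() then copies new documents from
    # the Cosmos DB change feed into the configured Postgres table.
    job = SyncJob(stream_id="example-stream", batch_size=DEFAULT_BATCH_SIZE)
    job.run_sync()
    logger.info(f"Checkpoint after sync: {job.current_checkpoint}")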