This commit is contained in:
Pablo Martin 2024-08-13 17:17:01 +02:00
parent 34496c3128
commit 74df79d380
2 changed files with 11 additions and 14 deletions

View file

@ -1,21 +1,12 @@
import datetime
import json
from psycopg2.sql import SQL
from anaxi.config import (
get_cosmos_database_config_from_file,
get_postgres_database_config_from_file,
get_stream_config_from_file,
)
from anaxi.constants import (
ASSUMED_PREHISTORICAL_DATETIME,
DESTINATION_COLUMN_NAME,
PATHS,
)
from anaxi.cosmos_tools import create_cosmos_client_from_config, get_container_client
from anaxi.cosmos_tools import create_cosmos_client_from_config
from anaxi.logging import get_anaxi_logger
from anaxi.postgres_tools import create_postgres_connection_from_config, simply_query
from anaxi.postgres_tools import simply_query
from anaxi.sync import SyncJob
logger = get_anaxi_logger(__name__)
@ -46,7 +37,7 @@ def run_postgres_healthcheck_process(postgres_database: str) -> None:
return
def run_sync_process(stream_id: str):
def run_sync_process(stream_id: str) -> None:
logger.info("Preparing sync job...")
sync_job = SyncJob(stream_id=stream_id)

View file

@ -28,7 +28,7 @@ class SyncJob:
self,
stream_id: str,
batch_size: int = DEFAULT_BATCH_SIZE,
) -> None:
) -> "SyncJob":
self.stream_config = get_stream_config_from_file(stream_id)
self.cosmos_db_config = get_cosmos_database_config_from_file(
@ -48,6 +48,9 @@ class SyncJob:
config=self.postgres_db_config
)
self.read_record_count = 0
self.committed_record_count = 0
@property
def current_checkpoint(self):
return self.checkpoint_manager.highest_synced_timestamp
@ -59,6 +62,7 @@ class SyncJob:
docs_batch = []
for doc in changed_feed_response:
self.read_record_count += 1
docs_batch.append(doc)
if len(docs_batch) < self._batch_size:
continue
@ -84,6 +88,8 @@ class SyncJob:
logger.info("Committed final batch.")
logger.info("Finished sync job.")
logger.info(f"Total documents read: {self.read_record_count}.")
logger.info(f"Total documents written: {self.committed_record_count}.")
def commit_batch(self, docs_batch, cursor):
self.checkpoint_manager.set_new_goal(
@ -111,12 +117,12 @@ class SyncJob:
raise e
else:
self.checkpoint_manager.commit_goal()
self.committed_record_count += len(docs_batch)
def _prepare_source_and_sink(self):
changed_feed_response = self.cosmos_container_client.query_items_change_feed(
start_time=self.checkpoint_manager.highest_synced_timestamp
)
cursor = self.postgres_connection.cursor()
cursor.execute(
f"""