data-anaxi/anaxi/checkpoint.py

57 lines
1.7 KiB
Python
Raw Normal View History

2024-08-13 15:02:03 +02:00
import datetime
import pathlib
from typing import Union
from anaxi.file_persistence import read_yaml, write_yaml
2024-08-13 17:44:51 +02:00
from anaxi.logging import get_anaxi_logger
# Module-level logger, obtained from the project's logging helper and keyed by
# this module's name.
logger = get_anaxi_logger(__name__)
2024-08-13 15:02:03 +02:00
class CheckpointManager:
    """Track a sync checkpoint timestamp and persist it to a YAML file.

    The checkpoint is a single timestamp stored under the
    ``highest_synced_timestamp`` key. A candidate value is staged with
    :meth:`set_new_goal` and only becomes the persisted checkpoint when
    :meth:`commit_goal` is called.
    """

    # Key under which the checkpoint timestamp is stored in the YAML file.
    highest_synced_timestamp_field_key = "highest_synced_timestamp"

    def __init__(
        self,
        file_path: pathlib.Path,
        highest_synced_timestamp: Union[None, datetime.datetime],
    ):
        """Create a manager bound to ``file_path``.

        Args:
            file_path: Location of the YAML checkpoint file.
            highest_synced_timestamp: The current checkpoint, or ``None``
                when no checkpoint is known yet.
        """
        self.file_path = file_path
        self.highest_synced_timestamp = highest_synced_timestamp
        # Staged checkpoint candidate; remains None until set_new_goal() runs.
        self.goal = None

    @classmethod
    def load_from_yml(cls, file_path: pathlib.Path) -> "CheckpointManager":
        """Build a manager from an existing checkpoint file.

        Args:
            file_path: Location of the YAML checkpoint file to read.

        Returns:
            An instance of ``cls`` whose checkpoint is the timestamp found
            in the file.

        Raises:
            ValueError: If the file contents do not contain the checkpoint
                key (including contents that are not a mapping at all).
        """
        contents = read_yaml(file_path)
        key = cls.highest_synced_timestamp_field_key

        # The return type of read_yaml is not visible here; an empty file may
        # plausibly come back as None. Treat anything that cannot be searched
        # for the key as invalid rather than failing with an unrelated error.
        try:
            has_checkpoint = key in contents
        except TypeError:
            has_checkpoint = False
        if not has_checkpoint:
            raise ValueError(f"Invalid checkpoint state contents in file: {file_path}.")

        raw_timestamp = contents[key]
        # Some YAML loaders (e.g. PyYAML) resolve unquoted ISO timestamps into
        # datetime objects; accept those as-is instead of re-parsing them.
        if isinstance(raw_timestamp, datetime.datetime):
            timestamp = raw_timestamp
        else:
            timestamp = datetime.datetime.fromisoformat(raw_timestamp)

        # Use cls so that subclasses get an instance of their own type.
        return cls(file_path=file_path, highest_synced_timestamp=timestamp)

    def update_yml(self):
        """Write the current checkpoint to the YAML file and log it.

        The timestamp is converted to UTC and stored in ISO 8601 format.
        Note that ``astimezone`` interprets a naive datetime as local time.

        Raises:
            ValueError: If there is no checkpoint timestamp to persist.
        """
        if self.highest_synced_timestamp is None:
            raise ValueError(
                "Cannot write checkpoint: highest_synced_timestamp is not set."
            )

        utc_timestamp = self.highest_synced_timestamp.astimezone(
            tz=datetime.timezone.utc
        )
        write_yaml(
            path=self.file_path,
            data={self.highest_synced_timestamp_field_key: utc_timestamp.isoformat()},
        )
        logger.info(f"New checkpoint at {utc_timestamp}")

    def set_new_goal(self, goal_timestamp: datetime.datetime) -> None:
        """Stage ``goal_timestamp`` as the next checkpoint without persisting it."""
        self.goal = goal_timestamp

    def commit_goal(self):
        """Promote the staged goal to the checkpoint and persist it.

        Raises:
            ValueError: If no goal has been staged. The existing checkpoint
                is left untouched in that case.
        """
        # Guard before mutating: committing a missing goal would otherwise
        # wipe the in-memory checkpoint and then fail while writing.
        if self.goal is None:
            raise ValueError(
                "Cannot commit checkpoint: no goal set, call set_new_goal() first."
            )
        self.highest_synced_timestamp = self.goal
        self.update_yml()