silly-event-rollup/create_events.py
2025-06-14 23:42:25 +02:00

141 lines
3.1 KiB
Python

import os
from dotenv import load_dotenv
import psycopg2
import uuid
import json
# Load variables from a local .env file (python-dotenv) before the settings
# below are read from the environment.
load_dotenv()

# Keyword arguments passed to psycopg2.connect() in main(). Each value comes
# straight from the named environment variable and is None when it is unset.
conn_params = {
    keyword: os.getenv(variable)
    for keyword, variable in (
        ("host", "POSTGRES_HOST"),
        ("port", "POSTGRES_PORT"),
        ("dbname", "POSTGRES_DB"),
        ("user", "POSTGRES_USER"),
        ("password", "POSTGRES_PASSWORD"),
    )
}
class User:
    """A user whose lifecycle changes are recorded as rows in ``user_events``.

    Creating, updating and deleting a user each insert one JSON event payload
    through the cursor supplied at construction. This class never commits;
    the caller owns the transaction.
    """

    # Parameterized statement: the driver quotes the payload, so apostrophes
    # or other SQL metacharacters inside user fields cannot alter the query.
    _INSERT_EVENT_SQL = "INSERT INTO user_events (event_payload) VALUES (%s)"

    def __init__(self, cursor, name, age, hair_color):
        """Store the user's details and record a ``user_created`` event.

        Args:
            cursor: Database cursor used to insert event rows; it must accept
                ``%s`` placeholders (as psycopg2 cursors do).
            name: Display name of the user.
            age: Age of the user.
            hair_color: Hair color of the user.
        """
        self.cursor = cursor
        self.id = uuid.uuid4()
        self.name = name
        self.age = age
        self.hair_color = hair_color
        self._fire_created_user_event()

    def update_user_details(self, name=None, age=None, hair_color=None):
        """Change the given fields and record a ``user_updated`` event.

        A field is left untouched only when its argument is ``None``, so
        falsy values such as ``age=0`` are applied. The event is recorded
        on every call, even when no field was supplied.
        """
        if name is not None:
            self.name = name
        if age is not None:
            self.age = age
        if hair_color is not None:
            self.hair_color = hair_color
        self._fire_updated_user_event()

    def delete_user(self):
        """Record a ``user_deleted`` event; the instance itself is unchanged."""
        self._fire_deleted_user_event()

    def _fire_created_user_event(self):
        """Insert a ``CreateUserEvent`` built from the current state."""
        self._fire_event(CreateUserEvent(self))

    def _fire_updated_user_event(self):
        """Insert an ``UpdateUserEvent`` built from the current state."""
        self._fire_event(UpdateUserEvent(self))

    def _fire_deleted_user_event(self):
        """Insert a ``DeleteUserEvent`` for this user."""
        self._fire_event(DeleteUserEvent(self))

    def _fire_event(self, event):
        """Insert ``event.to_json()`` as one row of ``user_events``.

        Args:
            event: Any object exposing ``to_json()`` that returns a string.
        """
        # Pass the payload as a bound parameter instead of formatting it
        # into the SQL string.
        self.cursor.execute(self._INSERT_EVENT_SQL, (event.to_json(),))
class CreateUserEvent:
    """Event announcing that a user has been created."""

    def __init__(self, user):
        # Keep a reference, not a snapshot: fields are read when to_json() runs.
        self._user = user

    def to_json(self):
        """Return the ``user_created`` event as a JSON string.

        The payload carries the user's id, name, age and hair color. Values
        that JSON cannot encode natively (such as a UUID id) are converted
        with ``str``.
        """
        subject = self._user
        details = {
            field: getattr(subject, field)
            for field in ("id", "name", "age", "hair_color")
        }
        envelope = {"event_type": "user_created", "payload": details}
        return json.dumps(envelope, default=str)
class UpdateUserEvent:
    """Event announcing that a user's details have changed."""

    def __init__(self, user):
        # Keep a reference, not a snapshot: fields are read when to_json() runs.
        self._user = user

    def to_json(self):
        """Return the ``user_updated`` event as a JSON string.

        The payload carries the user's current id, name, age and hair color.
        Values that JSON cannot encode natively (such as a UUID id) are
        converted with ``str``.
        """
        subject = self._user
        details = {
            field: getattr(subject, field)
            for field in ("id", "name", "age", "hair_color")
        }
        envelope = {"event_type": "user_updated", "payload": details}
        return json.dumps(envelope, default=str)
class DeleteUserEvent:
    """Event announcing that a user has been deleted."""

    def __init__(self, user):
        # Keep a reference, not a snapshot: the id is read when to_json() runs.
        self._user = user

    def to_json(self):
        """Return the ``user_deleted`` event as a JSON string.

        Only the user's id is included in the payload; an id that JSON
        cannot encode natively (such as a UUID) is converted with ``str``.
        """
        identifier = {"id": self._user.id}
        envelope = {"event_type": "user_deleted", "payload": identifier}
        return json.dumps(envelope, default=str)
def main():
    """Write created, updated and deleted events for one sample user.

    Connects to PostgreSQL with ``conn_params``, performs the three user
    operations in a single transaction, and commits. Any failure is printed
    rather than raised, and the cursor and connection are closed whether or
    not the run succeeded.
    """
    # Bind both handles before the try block so the finally clause can test
    # them even when connect() or cursor() raises; otherwise the cleanup
    # would fail with UnboundLocalError and hide the original error.
    conn = None
    cur = None
    try:
        # Connect to PostgreSQL
        conn = psycopg2.connect(**conn_params)
        cur = conn.cursor()
        my_user = User(cursor=cur, name="John", age=95, hair_color="Gray")
        my_user.update_user_details(age=96)
        my_user.delete_user()
        # Commit the transaction so all three events are persisted together
        conn.commit()
        print("Done")
    except Exception as e:
        # Top-level boundary of the script: report the failure and fall
        # through to cleanup. Closing without commit discards pending work.
        print("Error:", e)
    finally:
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()