147 lines
3.5 KiB
Python
147 lines
3.5 KiB
Python
import os
|
|
from dotenv import load_dotenv
|
|
import psycopg2
|
|
import uuid
|
|
import json
|
|
|
|
load_dotenv()
|
|
|
|
conn_params = {
|
|
"host": os.getenv("POSTGRES_HOST"),
|
|
"port": os.getenv("POSTGRES_PORT"),
|
|
"dbname": os.getenv("POSTGRES_DB"),
|
|
"user": os.getenv("POSTGRES_USER"),
|
|
"password": os.getenv("POSTGRES_PASSWORD"),
|
|
}
|
|
|
|
|
|
class User:
|
|
|
|
def __init__(self, cursor, name, age, hair_color):
|
|
self.cursor = cursor
|
|
self.id = uuid.uuid4()
|
|
self.name = name
|
|
self.age = age
|
|
self.hair_color = hair_color
|
|
|
|
self._fire_created_user_event()
|
|
|
|
def update_user_details(self, name=None, age=None, hair_color=None):
|
|
if name:
|
|
self.name = name
|
|
if age:
|
|
self.age = age
|
|
if hair_color:
|
|
self.hair_color = hair_color
|
|
|
|
self._fire_updated_user_event()
|
|
|
|
def delete_user(self):
|
|
self._fire_deleted_user_event()
|
|
|
|
def _fire_created_user_event(self):
|
|
event = CreateUserEvent(self)
|
|
self._fire_event(event)
|
|
|
|
def _fire_updated_user_event(self):
|
|
event = UpdateUserEvent(self)
|
|
self._fire_event(event)
|
|
|
|
def _fire_deleted_user_event(self):
|
|
event = DeleteUserEvent(self)
|
|
self._fire_event(event)
|
|
|
|
def _fire_event(self, event):
|
|
query = f"INSERT INTO user_events (event_payload) VALUES ('{event.to_json()}')"
|
|
self.cursor.execute(query)
|
|
|
|
|
|
class CreateUserEvent:
|
|
|
|
def __init__(self, user):
|
|
self._user = user
|
|
|
|
def to_json(self):
|
|
return json.dumps(
|
|
{
|
|
"event_type": "user_created",
|
|
"payload": {
|
|
"id": self._user.id,
|
|
"name": self._user.name,
|
|
"age": self._user.age,
|
|
"hair_color": self._user.hair_color,
|
|
},
|
|
},
|
|
default=str,
|
|
)
|
|
|
|
|
|
class UpdateUserEvent:
|
|
def __init__(self, user):
|
|
self._user = user
|
|
|
|
def to_json(self):
|
|
return json.dumps(
|
|
{
|
|
"event_type": "user_updated",
|
|
"payload": {
|
|
"id": self._user.id,
|
|
"name": self._user.name,
|
|
"age": self._user.age,
|
|
"hair_color": self._user.hair_color,
|
|
},
|
|
},
|
|
default=str,
|
|
)
|
|
|
|
|
|
class DeleteUserEvent:
|
|
def __init__(self, user):
|
|
self._user = user
|
|
|
|
def to_json(self):
|
|
return json.dumps(
|
|
{
|
|
"event_type": "user_deleted",
|
|
"payload": {"id": self._user.id},
|
|
},
|
|
default=str,
|
|
)
|
|
|
|
|
|
def main():
|
|
|
|
try:
|
|
# Connect to PostgreSQL
|
|
conn = psycopg2.connect(**conn_params)
|
|
cur = conn.cursor()
|
|
|
|
my_user = User(cursor=cur, name="John", age=95, hair_color="Gray")
|
|
my_user.update_user_details(age=96)
|
|
my_user.delete_user()
|
|
|
|
my_other_user = User(cursor=cur, name="Jane", age=3, hair_color="Fire Red")
|
|
my_other_user.update_user_details(age=15, hair_color="Golden Blonde")
|
|
my_other_user.update_user_details(age=35, hair_color="Touches of gray")
|
|
|
|
my_immutable_user = User(
|
|
cursor=cur, name="The Dude", age=33, hair_color="Muffin Orange"
|
|
)
|
|
|
|
# Commit the transaction
|
|
conn.commit()
|
|
|
|
print("Done")
|
|
|
|
except Exception as e:
|
|
print("Error:", e)
|
|
|
|
finally:
|
|
if cur:
|
|
cur.close()
|
|
if conn:
|
|
conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|