diff --git a/alembic/versions/abfbb3d96037_create_user_event_table.py b/alembic/versions/abfbb3d96037_create_user_event_table.py index 949e916..26b7665 100644 --- a/alembic/versions/abfbb3d96037_create_user_event_table.py +++ b/alembic/versions/abfbb3d96037_create_user_event_table.py @@ -11,7 +11,6 @@ from typing import Sequence, Union from alembic import op import sqlalchemy as sa import sqlalchemy.dialects.postgresql as psql -import uuid # revision identifiers, used by Alembic. revision: str = "abfbb3d96037" @@ -22,16 +21,29 @@ depends_on: Union[str, Sequence[str], None] = None def upgrade() -> None: """Upgrade schema.""" + + op.execute("CREATE EXTENSION IF NOT EXISTS pgcrypto;") + op.create_table( - "user_event_table", - sa.Column("id", sa.UUID(as_uuid=True), nullable=False, default=uuid.uuid4), - sa.Column("sequence", sa.Integer(), nullable=False), + "user_events", + sa.Column( + "id", + sa.UUID(as_uuid=True), + nullable=False, + server_default=sa.text("gen_random_uuid()"), + ), + sa.Column("sequence", sa.Integer(), nullable=False, autoincrement=True), sa.Column("event_payload", psql.JSONB, nullable=False), - sa.Column("created_at", sa.DateTime(timezone=True), nullable=False), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("NOW()"), + ), sa.PrimaryKeyConstraint("id", "sequence"), ) def downgrade() -> None: """Downgrade schema.""" - op.drop_table("user_event_table") + op.drop_table("user_event") diff --git a/create_events.py b/create_events.py new file mode 100644 index 0000000..5a683b1 --- /dev/null +++ b/create_events.py @@ -0,0 +1,141 @@ +import os +from dotenv import load_dotenv +import psycopg2 +import uuid +import json + +load_dotenv() + +conn_params = { + "host": os.getenv("POSTGRES_HOST"), + "port": os.getenv("POSTGRES_PORT"), + "dbname": os.getenv("POSTGRES_DB"), + "user": os.getenv("POSTGRES_USER"), + "password": os.getenv("POSTGRES_PASSWORD"), +} + + +class User: + + def __init__(self, cursor, name, age, hair_color): + self.cursor = cursor + self.id = uuid.uuid4() + self.name = name + self.age = age + self.hair_color = hair_color + + self._fire_created_user_event() + + def update_user_details(self, name=None, age=None, hair_color=None): + if name: + self.name = name + if age: + self.age = age + if hair_color: + self.hair_color = hair_color + + self._fire_updated_user_event() + + def delete_user(self): + self._fire_deleted_user_event() + + def _fire_created_user_event(self): + event = CreateUserEvent(self) + self._fire_event(event) + + def _fire_updated_user_event(self): + event = UpdateUserEvent(self) + self._fire_event(event) + + def _fire_deleted_user_event(self): + event = DeleteUserEvent(self) + self._fire_event(event) + + def _fire_event(self, event): + query = f"INSERT INTO user_events (event_payload) VALUES ('{event.to_json()}')" + self.cursor.execute(query) + + +class CreateUserEvent: + + def __init__(self, user): + self._user = user + + def to_json(self): + return json.dumps( + { + "event_type": "user_created", + "payload": { + "id": self._user.id, + "name": self._user.name, + "age": self._user.age, + "hair_color": self._user.hair_color, + }, + }, + default=str, + ) + + +class UpdateUserEvent: + def __init__(self, user): + self._user = user + + def to_json(self): + return json.dumps( + { + "event_type": "user_updated", + "payload": { + "id": self._user.id, + "name": self._user.name, + "age": self._user.age, + "hair_color": self._user.hair_color, + }, + }, + default=str, + ) + + +class DeleteUserEvent: + def __init__(self, user): + self._user = user + + def to_json(self): + return json.dumps( + { + "event_type": "user_deleted", + "payload": {"id": self._user.id}, + }, + default=str, + ) + + +def main(): + + try: + # Connect to PostgreSQL + conn = psycopg2.connect(**conn_params) + cur = conn.cursor() + + my_user = User(cursor=cur, name="John", age=95, hair_color="Gray") + + my_user.update_user_details(age=96) + + my_user.delete_user() + + # Commit the transaction + conn.commit() + + print("Done") + + except Exception as e: + print("Error:", e) + + finally: + if cur: + cur.close() + if conn: + conn.close() + + +if __name__ == "__main__": + main() diff --git a/run_show.sh b/run_show.sh new file mode 100644 index 0000000..402b275 --- /dev/null +++ b/run_show.sh @@ -0,0 +1,5 @@ +docker compose down +docker compose up -d +sleep 2s +alembic upgrade head +python3 create_events.py