From e2850da78725cb5f5374db88c11cf8316f2a9087 Mon Sep 17 00:00:00 2001 From: counterweight Date: Sun, 15 Jun 2025 00:56:00 +0200 Subject: [PATCH] creation projection workinggg --- .../394542f24f6c_add_projection_table.py | 57 ++++++++++++++++++- create_events.py | 6 +- run_show.sh | 2 + 3 files changed, 61 insertions(+), 4 deletions(-) diff --git a/alembic/versions/394542f24f6c_add_projection_table.py b/alembic/versions/394542f24f6c_add_projection_table.py index 4bdc7b9..151ba22 100644 --- a/alembic/versions/394542f24f6c_add_projection_table.py +++ b/alembic/versions/394542f24f6c_add_projection_table.py @@ -30,7 +30,7 @@ def upgrade() -> None: ), sa.Column("created_at", sa.DateTime(timezone=True), nullable=False), sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), - sa.Column("deleted_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("deleted_at", sa.DateTime(timezone=True)), sa.Column("last_event_id", sa.UUID(as_uuid=True), nullable=False, unique=True), sa.Column( "last_event_sequence", @@ -45,6 +45,61 @@ def upgrade() -> None: sa.PrimaryKeyConstraint("id"), ) + op.execute( + """ + create or replace function fn_project_user_created(event_id uuid, event_sequence integer, event_timestamp timestamptz, event jsonb) returns uuid + security definer + language plpgsql as $$ + declare result uuid; + begin + insert into users( + id, + created_at, + updated_at, + last_event_id, + last_event_sequence, + name, + age, + hair_color + ) + values( + cast(event->'event_payload'->>'id' as UUID), + event_timestamp, + event_timestamp, + event_id, + event_sequence, + event->'event_payload'->>'name', + cast(event->'event_payload'->>'age' as INTEGER), + event->'event_payload'->>'hair_color' + ) + returning id into result; + return result; + end; + $$; + + create or replace function fn_trigger_user_created() returns trigger + security definer + language plpgsql + as $$ + begin + perform fn_project_user_created( + new.id, + new.sequence, + new.created_at, + new.event_payload + ); + return new; + end; + $$; + + create trigger event_insert_user_created + after insert on user_events + for each row + when ((new.event_payload->>'event_type') = 'user_created') + execute procedure fn_trigger_user_created(); + """ + ) + def downgrade() -> None: """Downgrade schema.""" diff --git a/create_events.py b/create_events.py index 7d04f8a..fb3314e 100644 --- a/create_events.py +++ b/create_events.py @@ -65,7 +65,7 @@ class CreateUserEvent: return json.dumps( { "event_type": "user_created", - "payload": { + "event_payload": { "id": self._user.id, "name": self._user.name, "age": self._user.age, @@ -84,7 +84,7 @@ class UpdateUserEvent: return json.dumps( { "event_type": "user_updated", - "payload": { + "event_payload": { "id": self._user.id, "name": self._user.name, "age": self._user.age, @@ -103,7 +103,7 @@ class DeleteUserEvent: return json.dumps( { "event_type": "user_deleted", - "payload": {"id": self._user.id}, + "event_payload": {"id": self._user.id}, }, default=str, ) diff --git a/run_show.sh b/run_show.sh index 402b275..aa56cdd 100644 --- a/run_show.sh +++ b/run_show.sh @@ -1,5 +1,7 @@ docker compose down docker compose up -d sleep 2s +echo "Running migrations" alembic upgrade head +echo "Creating events" python3 create_events.py