silly-event-rollup/alembic/versions/394542f24f6c_add_projection_table.py

106 lines
3.1 KiB
Python

"""add projection table
Revision ID: 394542f24f6c
Revises: abfbb3d96037
Create Date: 2025-06-14 23:57:39.301255
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "394542f24f6c"
down_revision: Union[str, None] = "abfbb3d96037"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
op.create_table(
"users",
sa.Column(
"id",
sa.UUID(as_uuid=True),
nullable=False,
),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("deleted_at", sa.DateTime(timezone=True)),
sa.Column("last_event_id", sa.UUID(as_uuid=True), nullable=False, unique=True),
sa.Column(
"last_event_sequence",
sa.Integer(),
nullable=False,
autoincrement=True,
unique=True,
),
sa.Column("name", sa.String(), nullable=False),
sa.Column("age", sa.Integer(), nullable=False),
sa.Column("hair_color", sa.String(), nullable=False),
sa.PrimaryKeyConstraint("id"),
)
op.execute(
"""
create or replace function fn_project_user_created(event_id uuid, event_sequence integer, event_timestamp timestamptz, event jsonb) returns uuid
security definer
language plpgsql as $$
declare result uuid;
begin
insert into users(
id,
created_at,
updated_at,
last_event_id,
last_event_sequence,
name,
age,
hair_color
)
values(
cast(event->'event_payload'->>'id' as UUID),
event_timestamp,
event_timestamp,
event_id,
event_sequence,
event->'event_payload'->>'name',
cast(event->'event_payload'->>'age' as INTEGER),
event->'event_payload'->>'hair_color'
)
returning id into result;
return result;
end;
$$;
create or replace function fn_trigger_user_created() returns trigger
security definer
language plpgsql
as $$
begin
perform fn_project_user_created(
new.id,
new.sequence,
new.created_at,
new.event_payload
);
return new;
end;
$$;
create trigger event_insert_user_created
after insert on user_events
for each row
when ((new.event_payload->>'event_type') = 'user_created')
execute procedure fn_trigger_user_created();
"""
)
def downgrade() -> None:
"""Downgrade schema."""
op.drop_table("users")