DRY on stg cosmos db models

This commit is contained in:
uri 2024-09-19 15:17:33 +02:00
parent 1c1f8ea82e
commit 3723cf0e57
3 changed files with 33 additions and 46 deletions

View file

@ -0,0 +1,31 @@
{% macro cosmos_db_record_deduplication(source_table, primary_key) %}
/*
This macro provides a deduplication logic for Cosmos DB entities.
If two or more records have a duplicated value in a field that acts as
the primary key, the most recent record will be returned. If the record
is not duplicated, it will also be returned.
Inputs:
- source_table: table that acts as source. Should be a sync model.
- primary_key: unique identifier on which the deduplication will be applied.
Output:
- Returns the set of records from the source_table that are unique according
to the primary_key.
*/
select *
from
-- Some thoughts for the future here:
-- The query below is awful performance wise - but data
-- size is tiny today. Let's tackle the problem as it comes.
(
select
*,
row_number() over (
partition by
{{ adapter.quote("documents") }} ->> '{{ primary_key }}'
order by ({{ adapter.quote("documents") }} ->> '_ts')::integer desc
) as rank
from {{ source_table }}
)
where rank = 1
{% endmacro %}

View file

@ -1,29 +1,7 @@
with
raw_verifications as (select * from {{ source("edeposit", "verifications") }}),
deduped_verifications as (
select *
from
-- Some thoughts for the future here:
-- ··· The query below is awful performance wise, I know. But data
-- size is tiny today. Let's tackle the problem as it comes.
--
-- ··· The deduping logic below will be the same for all the Cosmos
-- DB entities that get brought into the DWH. The only changing
-- parameters will be what's the source table and the PK. I'm
-- not gonna do the macro now, but it would probably be a good
-- idea when we have a second container from Cosmos hitting the
-- DWH.
(
select
*,
row_number() over (
partition by {{ adapter.quote("documents") }} ->> 'id'
order by
({{ adapter.quote("documents") }} ->> '_ts')::integer desc
) as rank
from {{ source("edeposit", "verifications") }}
)
where rank = 1
{{ cosmos_db_record_deduplication("raw_verifications", "id") }}
),
stg_edeposit__verifications as (
select

View file

@ -1,29 +1,7 @@
with
raw_verifications as (select * from {{ source("screening", "verifications") }}),
deduped_verifications as (
select *
from
-- Some thoughts for the future here:
-- ··· The query below is awful performance wise, I know. But data
-- size is tiny today. Let's tackle the problem as it comes.
--
-- ··· The deduping logic below will be the same for all the Cosmos
-- DB entities that get brought into the DWH. The only changing
-- parameters will be what's the source table and the PK. I'm
-- not gonna do the macro now, but it would probably be a good
-- idea when we have a second container from Cosmos hitting the
-- DWH.
(
select
*,
row_number() over (
partition by {{ adapter.quote("documents") }} ->> 'id'
order by
({{ adapter.quote("documents") }} ->> '_ts')::integer desc
) as rank
from raw_verifications
)
where rank = 1
{{ cosmos_db_record_deduplication("raw_verifications", "id") }}
),
stg_screening__verifications as (
select