Merged PR 2918: Integrates screening API verifications into DWH staging

# Description

This PR integrates screening API verifications into staging.

There's 2 commits:
* The earliest one, it just copy-pastes the strategy followed by edeposit and adapts it to fit screening API case, which is simpler. Note here that the schema entries contain a low number of tests. This is because we only have 7 records in production - and these seem fake anyway :) - so it's complex to extrapolate. Those I could extrapolate (NoFlags/Flagged) I'm taking from the data transformation within the PBI report.
* The last commit, it's just a DRY. It handles the deduplication logic for cosmos db in a macro, and I applied it on both the screening API and the edeposit verifications. Works well from my POV.

Since you guys are far more knowledgeable on APIs scope, I'll let you take a close look in case I missed something.

# Checklist

- [X] The edited models and dependants run properly with production data.
- [ ] The edited models are sufficiently documented. **I guess it can be improved, waiting for your comments here**
- [X] The edited models contain PK tests, and I've ran and passed them.
- [X] I have checked for DRY opportunities with other models and docs.
- [ ] I've picked the right materialization for the affected models. **Used default coming from stg_edeposit__verifications**

# Other

- [ ] Check if a full-refresh is required after this PR is merged.

Related work items: #20127
This commit is contained in:
Oriol Roqué Paniagua 2024-09-25 10:52:50 +00:00 committed by Pablo Martín
commit 070d067709
5 changed files with 213 additions and 23 deletions

View file

@ -0,0 +1,31 @@
{% macro cosmos_db_record_deduplication(source_table, primary_key) %}
/*
This macro provides a deduplication logic for Cosmos DB entities.
If two or more records have a duplicated value in a field that acts as
the primary key, the most recent record will be returned. If the record
is not duplicated, it will also be returned.
Inputs:
- source_table: table that acts as source. Should be a sync model.
- primary_key: unique identifier on which the deduplication will be applied.
Output:
- Returns the set of records from the source_table that are unique according
to the primary_key.
*/
select *
from
-- Some thoughts for the future here:
-- The query below is awful performance wise - but data
-- size is tiny today. Let's tackle the problem as it comes.
(
select
*,
row_number() over (
partition by
{{ adapter.quote("documents") }} ->> '{{ primary_key }}'
order by ({{ adapter.quote("documents") }} ->> '_ts')::integer desc
) as rank
from {{ source_table }}
)
where rank = 1
{% endmacro %}

View file

@ -1,29 +1,7 @@
with
raw_verifications as (select * from {{ source("edeposit", "verifications") }}),
deduped_verifications as (
select *
from
-- Some thoughts for the future here:
-- ··· The query below is awful performance wise, I know. But data
-- size is tiny today. Let's tackle the problem as it comes.
--
-- ··· The deduping logic below will be the same for all the Cosmos
-- DB entities that get brought into the DWH. The only changing
-- parameters will be what's the source table and the PK. I'm
-- not gonna do the macro now, but it would probably be a good
-- idea when we have a second container from Cosmos hitting the
-- DWH.
(
select
*,
row_number() over (
partition by {{ adapter.quote("documents") }} ->> 'id'
order by
({{ adapter.quote("documents") }} ->> '_ts')::integer desc
) as rank
from {{ source("edeposit", "verifications") }}
)
where rank = 1
{{ cosmos_db_record_deduplication("raw_verifications", "id") }}
),
stg_edeposit__verifications as (
select

View file

@ -0,0 +1,8 @@
version: 2
sources:
- name: screening
schema: sync_cdb_screening
tables:
- name: verifications
identifier: verifications

View file

@ -0,0 +1,130 @@
version: 2
models:
- name: stg_screening__verifications
description: |
Records of each transaction that happens in the Screening API. Records are
mutable and can get updated.
columns:
- name: id_verification
data_type: character varying
description: Unique id for the specific transaction.
tests:
- unique
- not_null
- name: id_seon
data_type: text
description: The identifier in Seon.
tests:
- not_null
- name: id_user_partner
data_type: text
description: The unique ID of the partner calling the API.
tests:
- not_null
- name: id_watch_list
data_type: text
description: Identification of the watch list.
- name: email_flag
data_type: text
description: |
noFlags if the email shows no issues, Flagged otherwise.
tests:
- accepted_values:
values:
- "Flagged"
- "noFlags"
- name: phone_flag
data_type: text
description: |
noFlags if the phone number shows no issues, Flagged otherwise.
tests:
- accepted_values:
values:
- "Flagged"
- "noFlags"
- name: watch_list
data_type: text
description: |
noFlags if not in the watch list, Flagged otherwise.
tests:
- accepted_values:
values:
- "Flagged"
- "noFlags"
- name: verification_type
data_type: text
description: |
Type of the verification.
- name: user_email
data_type: text
description: |
The email of the Screening API partner user.
- name: guest_email
data_type: text
description: ""
- name: guest_last_name
data_type: text
description: ""
- name: guest_first_name
data_type: text
description: ""
- name: guest_telephone
data_type: text
description: ""
- name: company_name
data_type: text
description: ""
- name: attachments
data_type: text
description: ""
- name: updated_at_utc
data_type: timestamp without time zone
description: |
Timestamp of the last edit of the record, as set by
Screening API.
tests:
- not_null
- name: updated_date_utc
data_type: timestamp without time zone
description: |
Date of the last edit of the record, as set by
Screening API.
tests:
- not_null
- name: created_at_utc
data_type: timestamp without time zone
description: |
The internal application timestamp of when this record was created.
tests:
- not_null
- name: created_date_utc
data_type: timestamp without time zone
description: |
The internal application date of when this record was created.
tests:
- not_null
- name: cosmos_db_timestamp_utc
data_type: timestamp with time zone
description: The internal Cosmos DB timestamp of the last record update.
tests:
- not_null

View file

@ -0,0 +1,43 @@
with
raw_verifications as (select * from {{ source("screening", "verifications") }}),
deduped_verifications as (
{{ cosmos_db_record_deduplication("raw_verifications", "id") }}
),
stg_screening__verifications as (
select
{{ adapter.quote("documents") }} ->> 'id' as id_verification,
{{ adapter.quote("documents") }} ->> 'SeonId' as id_seon,
{{ adapter.quote("documents") }} ->> 'userId' as id_user_partner,
{{ adapter.quote("documents") }} ->> 'WatchListId' as id_watch_list,
{{ adapter.quote("documents") }} ->> 'EmailFlag' as email_flag,
{{ adapter.quote("documents") }} ->> 'PhoneFlag' as phone_flag,
{{ adapter.quote("documents") }} ->> 'WatchList' as watch_list,
{{ adapter.quote("documents") }}
->> 'VerificationType' as verification_type,
{{ adapter.quote("documents") }} ->> 'UserEmail' as user_email,
{{ adapter.quote("documents") }} ->> 'GuestEmail' as guest_email,
{{ adapter.quote("documents") }} ->> 'GuestLastName' as guest_last_name,
{{ adapter.quote("documents") }} ->> 'GuestFirstName' as guest_first_name,
{{ adapter.quote("documents") }} ->> 'GuestTelephone' as guest_telephone,
{{ adapter.quote("documents") }} ->> 'CompanyName' as company_name,
{{ adapter.quote("documents") }} ->> '_attachments' as attachments,
({{ adapter.quote("documents") }} ->> 'UpdatedDate')::timestamp
as updated_at_utc,
({{ adapter.quote("documents") }} ->> 'UpdatedDate')::date
as updated_date_utc,
({{ adapter.quote("documents") }} ->> 'CreatedDate')::timestamp
as created_at_utc,
({{ adapter.quote("documents") }} ->> 'CreatedDate')::date
as created_date_utc,
to_timestamp(
(({{ adapter.quote("documents") }} ->> '_ts'))::integer
) as cosmos_db_timestamp_utc
from deduped_verifications
)
select *
from stg_screening__verifications