Merged PR 3152: Deal Pipelines and Stages

# Description

This PR brings Hubspot deal pipelines and stages into staging.

It all comes from one same source table: stages have been normalized from the nested JSON array they come in through Airbyte.

This is a necessary step to make Deal data more consumable in `int_`.

I also had to slightly expand an existing macro to allow specifying table aliases besides column names. This is probably a pattern we might want to extend to other macros.

# Checklist

- [X] The edited models and dependants run properly with production data.
- [X] The edited models are sufficiently documented.
- [X] The edited models contain PK tests, and I've ran and passed them.
- [X] I have checked for DRY opportunities with other models and docs.
- [X] I've picked the right materialization for the affected models.

# Other

- [ ] Check if a full-refresh is required after this PR is merged.

Related work items: #22772
This commit is contained in:
Pablo Martín 2024-10-14 10:25:03 +00:00
commit a6191eba46
5 changed files with 155 additions and 2 deletions

View file

@ -9,6 +9,14 @@ It generates two output fields:
This macro is intended to be used within a SELECT statement
and ensures that the output is properly formatted for further analysis.
*/
{% macro unix_ms_timestamp_to_utc(column_name) %}
to_timestamp({{ adapter.quote(column_name) }} / 1000) at time zone 'UTC'
{% macro unix_ms_timestamp_to_utc(column_name, table_alias=None) %}
{%- if table_alias -%}
to_timestamp(
{{ adapter.quote(table_alias) }}.{{ adapter.quote(column_name) }} / 1000
) at time zone 'UTC'
{%- else -%}
to_timestamp({{ adapter.quote(column_name) }} / 1000) at time zone 'UTC'
{%- endif -%}
{% endmacro %}

View file

@ -10,3 +10,5 @@ sources:
identifier: deals
- name: form_submissions
identifier: form_submissions
- name: deal_pipelines
identifier: deal_pipelines

View file

@ -164,3 +164,98 @@ models:
description: "Timestamp of when data was extracted to DWH."
tests:
- not_null
- name: stg_hubspot__deal_pipelines
description: |
Details of the different deal pipelines. Deal pipelines are templates
for the stages a deal can go through. Typically, a deal can only be
in one pipeline. Pipelines have stages, which are stored in a different
table.
columns:
- name: id_deal_pipeline
data_type: character varying
description: "Unique id for each pipeline."
tests:
- not_null
- unique
- name: deal_pipeline_name
data_type: character varying
description: Name for the pipeline.
tests:
- not_null
- unique
- name: is_active
data_type: boolean
description: Flag indicating if the pipeline is currently active.
tests:
- not_null
- name: created_at_utc
data_type: timestamp
description: When was this record created.
tests:
- not_null
- name: updated_at_utc
data_type: timestamp
description: When was this record last updated.
tests:
- not_null
- name: dwh_extracted_at_utc
data_type: timestamp
description: Timestamp of when the record was read from source.
- name: stg_hubspot__deal_pipeline_stages
description: |
The different stages of deal pipelines.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- id_deal_pipeline
- stage_name
columns:
- name: id_deal_pipeline
data_type: character varying
description: ID of the deal pipeline this stage belongs to.
tests:
- not_null
- name: id_stage
data_type: character varying
description: Unique ID for this stage.
tests:
- not_null
- unique
- name: stage_name
data_type: character varying
description: |
The name of this stage. There might be name collisions across
pipelines.
tests:
- not_null
- name: is_active
data_type: boolean
description: Flag indicating wheter the stage is currently active.
tests:
- not_null
- name: created_at_utc
data_type: timestamp
description: When was this record created.
tests:
- not_null
- name: updated_at_utc
data_type: timestamp
description: When was this record last updated.
tests:
- not_null
- name: dwh_extracted_at_utc
data_type: timestamp
description: Timestamp of when the record was read from source.

View file

@ -0,0 +1,33 @@
with
raw_deal_pipelines as (select * from {{ source("hubspot", "deal_pipelines") }}),
stg_hubspot__deal_pipelines as (
select
{{ adapter.quote("pipelineId") }} as id_deal_pipeline,
stage."stageId" as id_stage,
stage.label as stage_name,
stage.active as is_active,
{{
unix_ms_timestamp_to_utc(
column_name="createdAt", table_alias="stage"
)
}} as created_at_utc,
{{
unix_ms_timestamp_to_utc(
column_name="updatedAt", table_alias="stage"
)
}} as updated_at_utc,
{{ adapter.quote("_airbyte_extracted_at") }} as dwh_extracted_at_utc
from
raw_deal_pipelines,
lateral jsonb_to_recordset({{ adapter.quote("stages") }}::jsonb) as stage(
label text,
active boolean,
"stageId" text,
metadata jsonb,
"createdAt" bigint,
"updatedAt" bigint,
"displayOrder" int
)
)
select *
from stg_hubspot__deal_pipelines

View file

@ -0,0 +1,15 @@
with
raw_deal_pipelines as (select * from {{ source("hubspot", "deal_pipelines") }}),
stg_hubspot__deal_pipelines as (
select
{{ adapter.quote("pipelineId") }} as id_deal_pipeline,
{{ adapter.quote("label") }} as deal_pipeline_name,
{{ adapter.quote("active") }} as is_active,
{{ adapter.quote("createdAt") }} as created_at_utc,
{{ adapter.quote("updatedAt") }} as updated_at_utc,
{{ adapter.quote("_airbyte_extracted_at") }} as dwh_extracted_at_utc
from raw_deal_pipelines
)
select *
from stg_hubspot__deal_pipelines