Compare commits

...

2 commits

17 changed files with 676 additions and 8 deletions

4
.gitignore vendored
View file

@ -1,4 +1,6 @@
.env .env
.vscode .vscode
*/target */target
.venv

View file

@ -17,4 +17,9 @@ reset: stop
run-rust: run-rust:
cargo build --manifest-path rust/Cargo.toml cargo build --manifest-path rust/Cargo.toml
cargo run --manifest-path rust/Cargo.toml cargo run --manifest-path rust/Cargo.toml
setup-meltano-deps:
meltano init pizza_dw
cd pizza_dw && meltano add extractor tap-postgres && meltano add loader target-postgres

View file

@ -10,18 +10,40 @@
flake-utils.lib.eachDefaultSystem (system: flake-utils.lib.eachDefaultSystem (system:
let let
pkgs = import nixpkgs { inherit system; }; pkgs = import nixpkgs { inherit system; };
python-with-deps = pkgs.python3.withPackages (ps: with ps; [
pip
setuptools
wheel
psycopg2
]);
venvDir = ".venv";
in { in {
devShells.default = pkgs.mkShell { devShells.default = pkgs.mkShell {
buildInputs = with pkgs; [ packages = [
rustup pkgs.rustup
pkg-config pkgs.cargo
openssl pkgs.docker
postgresql pkgs.postgresql
sqlx-cli python-with-deps
]; ];
shellHook = '' shellHook = ''
echo " Rust + PostgreSQL dev env ready" echo " Rust + PostgreSQL dev env ready"
if [ ! -d ${venvDir} ]; then
echo "🐍 Creating Python virtualenv..."
python3 -m venv ${venvDir}
fi
source ${venvDir}/bin/activate
pip install --upgrade pip
pip install -r requirements.txt
echo "Python env ready"
''; '';
}; };
}); });

3
pizza_dw/.gitignore vendored Normal file
View file

@ -0,0 +1,3 @@
/venv
/.meltano
.env

0
pizza_dw/README.md Normal file
View file

View file

View file

0
pizza_dw/load/.gitkeep Normal file
View file

43
pizza_dw/meltano.yml Normal file
View file

@ -0,0 +1,43 @@
version: 1
default_environment: dev
project_id: 13249288-8bd5-4212-93e2-59af03383713
environments:
- name: dev
config:
plugins:
extractors:
- name: tap-app
config:
host: localhost
port: 5432
user: app
password: app123
database: app_db
ssl: false
select:
- public-customers.*
- public-pizza_orders.*
- name: staging
- name: prod
plugins:
extractors:
- name: tap-postgres
variant: meltanolabs
pip_url: meltanolabs-tap-postgres
- name: tap-app
inherit_from: tap-postgres
loaders:
- name: target-postgres
variant: meltanolabs
pip_url: meltanolabs-target-postgres
config:
host: localhost
port: 5444
user: dw
password: dw123
database: dw_db
ssl: false

View file

View file

2
pizza_dw/output/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
*
!.gitignore

View file

@ -0,0 +1,302 @@
{
"plugin_type": "extractors",
"name": "tap-postgres",
"namespace": "tap_postgres",
"variant": "meltanolabs",
"label": "Postgres",
"docs": "https://hub.meltano.com/extractors/tap-postgres--meltanolabs",
"repo": "https://github.com/MeltanoLabs/tap-postgres",
"pip_url": "meltanolabs-tap-postgres",
"description": "PostgreSQL database extractor",
"logo_url": "https://hub.meltano.com/assets/logos/extractors/postgres.png",
"capabilities": [
"about",
"activate-version",
"batch",
"catalog",
"discover",
"schema-flattening",
"state",
"stream-maps"
],
"settings_group_validation": [
[
"sqlalchemy_url"
]
],
"settings": [
{
"name": "batch_config.encoding.compression",
"kind": "options",
"label": "Batch Compression Format",
"description": "Compression format to use for batch files.",
"options": [
{
"label": "GZIP",
"value": "gzip"
},
{
"label": "None",
"value": "none"
}
]
},
{
"name": "batch_config.encoding.format",
"kind": "options",
"label": "Batch Encoding Format",
"description": "Format to use for batch files.",
"options": [
{
"label": "JSONL",
"value": "jsonl"
},
{
"label": "Parquet",
"value": "parquet"
}
]
},
{
"name": "batch_config.storage.prefix",
"kind": "string",
"label": "Batch Storage Prefix",
"description": "Prefix to use when writing batch files."
},
{
"name": "batch_config.storage.root",
"kind": "string",
"label": "Batch Storage Root",
"description": "Root path to use when writing batch files."
},
{
"name": "database",
"kind": "string",
"label": "Database",
"description": "Database name. Note if sqlalchemy_url is set this will be ignored."
},
{
"name": "dates_as_string",
"kind": "boolean",
"value": false,
"label": "Dates As String",
"description": "Defaults to false, if true, date, and timestamp fields will be Strings. If you see ValueError: Year is out of range, try setting this to True."
},
{
"name": "default_replication_method",
"kind": "options",
"value": "FULL_TABLE",
"label": "Default Replication Method",
"description": "Replication method to use if there is not a catalog entry to override this choice. One of `FULL_TABLE`, `INCREMENTAL`, or `LOG_BASED`.",
"options": [
{
"label": "Full Table",
"value": "FULL_TABLE"
},
{
"label": "Incremental",
"value": "INCREMENTAL"
},
{
"label": "Log Based",
"value": "LOG_BASED"
}
]
},
{
"name": "faker_config.locale",
"kind": "array",
"label": "Faker Locale",
"description": "One or more LCID locale strings to produce localized output for: https://faker.readthedocs.io/en/master/#localization"
},
{
"name": "faker_config.seed",
"kind": "string",
"label": "Faker Seed",
"description": "Value to seed the Faker generator for deterministic output: https://faker.readthedocs.io/en/master/#seeding-the-generator"
},
{
"name": "filter_schemas",
"kind": "array",
"label": "Filter Schemas",
"description": "If an array of schema names is provided, the tap will only process the specified Postgres schemas and ignore others. If left blank, the tap automatically determines ALL available Postgres schemas."
},
{
"name": "flattening_enabled",
"kind": "boolean",
"label": "Enable Schema Flattening",
"description": "'True' to enable schema flattening and automatically expand nested properties."
},
{
"name": "flattening_max_depth",
"kind": "integer",
"label": "Max Flattening Depth",
"description": "The max depth to flatten schemas."
},
{
"name": "host",
"kind": "string",
"label": "Host",
"description": "Hostname for postgres instance. Note if sqlalchemy_url is set this will be ignored."
},
{
"name": "json_as_object",
"kind": "boolean",
"value": false,
"label": "Json As Object",
"description": "Defaults to false, if true, json and jsonb fields will be Objects."
},
{
"name": "max_record_count",
"kind": "integer",
"label": "Max Record Count",
"description": "Optional. The maximum number of records to return in a single stream."
},
{
"name": "password",
"kind": "string",
"label": "Password",
"description": "Password used to authenticate. Note if sqlalchemy_url is set this will be ignored.",
"sensitive": true
},
{
"name": "port",
"kind": "integer",
"value": 5432,
"label": "Port",
"description": "The port on which postgres is awaiting connection. Note if sqlalchemy_url is set this will be ignored."
},
{
"name": "replication_slot_name",
"kind": "string",
"value": "tappostgres",
"label": "Replication Slot Name",
"description": "Name of the replication slot to use for logical replication. Must be unique for parallel extractions. Only applicable when replication_method is LOG_BASED. - Contain only letters, numbers, and underscores. - Be less than or equal to 63 characters. - Not start with 'pg_'."
},
{
"name": "sqlalchemy_url",
"kind": "string",
"label": "SQLAlchemy URL",
"description": "Example postgresql://[username]:[password]@localhost:5432/[db_name]"
},
{
"name": "ssh_tunnel.enable",
"kind": "boolean",
"value": false,
"label": "SSH Tunnel Enable",
"description": "Enable an ssh tunnel (also known as bastion server), see the other ssh_tunnel.* properties for more details"
},
{
"name": "ssh_tunnel.host",
"kind": "string",
"label": "SSH Tunnel Host",
"description": "Host of the bastion server, this is the host we'll connect to via ssh"
},
{
"name": "ssh_tunnel.port",
"kind": "integer",
"value": 22,
"label": "SSH Tunnel Port",
"description": "Port to connect to bastion server"
},
{
"name": "ssh_tunnel.private_key",
"kind": "string",
"label": "SSH Tunnel Private Key",
"description": "Private Key for authentication to the bastion server",
"sensitive": true
},
{
"name": "ssh_tunnel.private_key_password",
"kind": "string",
"label": "SSH Tunnel Private Key Password",
"description": "Private Key Password, leave None if no password is set",
"sensitive": true
},
{
"name": "ssh_tunnel.username",
"kind": "string",
"label": "SSH Tunnel Username",
"description": "Username to connect to bastion server"
},
{
"name": "ssl_certificate_authority",
"kind": "string",
"value": "~/.postgresql/root.crl",
"label": "SSL Certificate Authority",
"description": "The certificate authority that should be used to verify the server's identity. Can be provided either as the certificate itself (in .env) or as a filepath to the certificate. Note if sqlalchemy_url is set this will be ignored."
},
{
"name": "ssl_client_certificate",
"kind": "string",
"value": "~/.postgresql/postgresql.crt",
"label": "SSL Client Certificate",
"description": "The certificate that should be used to verify your identity to the server. Can be provided either as the certificate itself (in .env) or as a filepath to the certificate. Note if sqlalchemy_url is set this will be ignored."
},
{
"name": "ssl_client_certificate_enable",
"kind": "boolean",
"value": false,
"label": "SSL Client Certificate Enable",
"description": "Whether or not to provide client-side certificates as a method of authentication to the server. Use ssl_client_certificate and ssl_client_private_key for further customization. To use SSL to verify the server's identity, use ssl_enable instead. Note if sqlalchemy_url is set this will be ignored."
},
{
"name": "ssl_client_private_key",
"kind": "string",
"value": "~/.postgresql/postgresql.key",
"label": "SSL Client Private Key",
"description": "The private key for the certificate you provided. Can be provided either as the certificate itself (in .env) or as a filepath to the certificate. Note if sqlalchemy_url is set this will be ignored.",
"sensitive": true
},
{
"name": "ssl_enable",
"kind": "boolean",
"value": false,
"label": "SSL Enable",
"description": "Whether or not to use ssl to verify the server's identity. Use ssl_certificate_authority and ssl_mode for further customization. To use a client certificate to authenticate yourself to the server, use ssl_client_certificate_enable instead. Note if sqlalchemy_url is set this will be ignored."
},
{
"name": "ssl_mode",
"kind": "string",
"value": "verify-full",
"label": "SSL Mode",
"description": "SSL Protection method, see [postgres documentation](https://www.postgresql.org/docs/current/libpq-ssl.html#LIBPQ-SSL-PROTECTION) for more information. Must be one of disable, allow, prefer, require, verify-ca, or verify-full. Note if sqlalchemy_url is set this will be ignored."
},
{
"name": "ssl_storage_directory",
"kind": "string",
"value": ".secrets",
"label": "SSL Storage Directory",
"description": "The folder in which to store SSL certificates provided as raw values. When a certificate/key is provided as a raw value instead of as a filepath, it must be written to a file before it can be used. This configuration option determines where that file is created."
},
{
"name": "stream_map_config",
"kind": "object",
"label": "User Stream Map Configuration",
"description": "User-defined config values to be used within map expressions."
},
{
"name": "stream_maps",
"kind": "object",
"label": "Stream Maps",
"description": "Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html)."
},
{
"name": "stream_options",
"kind": "object",
"label": "Stream Options"
},
{
"name": "use_singer_decimal",
"kind": "boolean",
"label": "Use Singer Decimal",
"description": "Whether to use strings with `x-singer.decimal` format for decimals in the discovered schema. This is useful to avoid precision loss when working with large numbers."
},
{
"name": "user",
"kind": "string",
"label": "User",
"description": "User name used to authenticate. Note if sqlalchemy_url is set this will be ignored."
}
]
}

View file

@ -0,0 +1,288 @@
{
"plugin_type": "loaders",
"name": "target-postgres",
"namespace": "target_postgres",
"variant": "meltanolabs",
"label": "Postgres",
"docs": "https://hub.meltano.com/loaders/target-postgres--meltanolabs",
"repo": "https://github.com/MeltanoLabs/target-postgres",
"pip_url": "meltanolabs-target-postgres",
"executable": "target-postgres",
"description": "PostgreSQL database loader",
"logo_url": "https://hub.meltano.com/assets/logos/loaders/postgres.png",
"capabilities": [
"about",
"activate-version",
"hard-delete",
"schema-flattening",
"stream-maps"
],
"settings_group_validation": [
[]
],
"settings": [
{
"name": "activate_version",
"kind": "boolean",
"value": true,
"label": "Activate Version",
"description": "If set to false, the target will ignore activate version messages. If set to true, add_record_metadata must be set to true as well."
},
{
"name": "add_record_metadata",
"kind": "boolean",
"value": true,
"label": "Add Record Metadata",
"description": "Note that this must be enabled for activate_version to work! This adds _sdc_extracted_at, _sdc_batched_at, and more to every table. See https://sdk.meltano.com/en/latest/implementation/record_metadata.html for more information."
},
{
"name": "batch_size_rows",
"kind": "integer",
"label": "Batch Size Rows",
"description": "Maximum number of rows in each batch."
},
{
"name": "database",
"kind": "string",
"label": "Database",
"description": "Database name."
},
{
"name": "default_target_schema",
"kind": "string",
"value": "$MELTANO_EXTRACT__LOAD_SCHEMA",
"label": "Default Target Schema",
"description": "Postgres schema to send data to, example: tap-clickup"
},
{
"name": "dialect+driver",
"kind": "string",
"value": "postgresql+psycopg",
"label": "Dialect+Driver",
"description": "DEPRECATED. Dialect+driver see https://docs.sqlalchemy.org/en/20/core/engines.html. Generally just leave this alone."
},
{
"name": "faker_config.locale",
"kind": "array",
"label": "Faker Locale",
"description": "One or more LCID locale strings to produce localized output for: https://faker.readthedocs.io/en/master/#localization"
},
{
"name": "faker_config.seed",
"kind": "string",
"label": "Faker Seed",
"description": "Value to seed the Faker generator for deterministic output: https://faker.readthedocs.io/en/master/#seeding-the-generator"
},
{
"name": "flattening_enabled",
"kind": "boolean",
"label": "Enable Schema Flattening",
"description": "'True' to enable schema flattening and automatically expand nested properties."
},
{
"name": "flattening_max_depth",
"kind": "integer",
"label": "Max Flattening Depth",
"description": "The max depth to flatten schemas."
},
{
"name": "hard_delete",
"kind": "boolean",
"value": false,
"label": "Hard Delete",
"description": "When activate version is sent from a tap this specifies if we should delete the records that don't match, or mark them with a date in the `_sdc_deleted_at` column. This config option is ignored if `activate_version` is set to false."
},
{
"name": "host",
"kind": "string",
"label": "Host",
"description": "Hostname for postgres instance."
},
{
"name": "interpret_content_encoding",
"kind": "boolean",
"value": false,
"label": "Interpret Content Encoding",
"description": "If set to true, the target will interpret the content encoding of the schema to determine how to store the data. Using this option may result in a more efficient storage of the data but may also result in an error if the data is not encoded as expected."
},
{
"name": "load_method",
"kind": "options",
"value": "append-only",
"label": "Load Method",
"description": "The method to use when loading data into the destination. `append-only` will always write all input records whether that records already exists or not. `upsert` will update existing records and insert new records. `overwrite` will delete all existing records and insert all input records.",
"options": [
{
"label": "Append Only",
"value": "append-only"
},
{
"label": "Upsert",
"value": "upsert"
},
{
"label": "Overwrite",
"value": "overwrite"
}
]
},
{
"name": "password",
"kind": "string",
"label": "Password",
"description": "Password used to authenticate.",
"sensitive": true
},
{
"name": "port",
"kind": "integer",
"value": 5432,
"label": "Port",
"description": "The port on which postgres is awaiting connections."
},
{
"name": "process_activate_version_messages",
"kind": "boolean",
"value": true,
"label": "Process `ACTIVATE_VERSION` messages",
"description": "Whether to process `ACTIVATE_VERSION` messages."
},
{
"name": "sanitize_null_text_characters",
"kind": "boolean",
"value": false,
"label": "Sanitize Null Text Characters",
"description": "If set to true, the target will sanitize null characters in char/text/varchar fields, as they are not supported by Postgres. See [postgres documentation](https://www.postgresql.org/docs/current/functions-string.html) for more information about chr(0) not being supported."
},
{
"name": "sqlalchemy_url",
"kind": "string",
"label": "SQLAlchemy URL",
"description": "DEPRECATED. SQLAlchemy connection string. This will override using host, user, password, port, dialect, and all ssl settings. Note that you must escape password special characters properly. See https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords"
},
{
"name": "ssh_tunnel.enable",
"kind": "boolean",
"value": false,
"label": "SSH Tunnel Enable",
"description": "Enable an ssh tunnel (also known as bastion host), see the other ssh_tunnel.* properties for more details"
},
{
"name": "ssh_tunnel.host",
"kind": "string",
"label": "SSH Tunnel Host",
"description": "Host of the bastion host, this is the host we'll connect to via ssh"
},
{
"name": "ssh_tunnel.port",
"kind": "integer",
"value": 22,
"label": "SSH Tunnel Port",
"description": "Port to connect to bastion host"
},
{
"name": "ssh_tunnel.private_key",
"kind": "string",
"label": "SSH Tunnel Private Key",
"description": "Private Key for authentication to the bastion host",
"sensitive": true
},
{
"name": "ssh_tunnel.private_key_password",
"kind": "string",
"label": "SSH Tunnel Private Key Password",
"description": "Private Key Password, leave None if no password is set",
"sensitive": true
},
{
"name": "ssh_tunnel.username",
"kind": "string",
"label": "SSH Tunnel Username",
"description": "Username to connect to bastion host"
},
{
"name": "ssl_certificate_authority",
"kind": "string",
"value": "~/.postgresql/root.crl",
"label": "SSL Certificate Authority",
"description": "The certificate authority that should be used to verify the server's identity. Can be provided either as the certificate itself (in .env) or as a filepath to the certificate."
},
{
"name": "ssl_client_certificate",
"kind": "string",
"value": "~/.postgresql/postgresql.crt",
"label": "SSL Client Certificate",
"description": "The certificate that should be used to verify your identity to the server. Can be provided either as the certificate itself (in .env) or as a filepath to the certificate."
},
{
"name": "ssl_client_certificate_enable",
"kind": "boolean",
"value": false,
"label": "SSL Client Certificate Enable",
"description": "Whether or not to provide client-side certificates as a method of authentication to the server. Use ssl_client_certificate and ssl_client_private_key for further customization. To use SSL to verify the server's identity, use ssl_enable instead."
},
{
"name": "ssl_client_private_key",
"kind": "string",
"value": "~/.postgresql/postgresql.key",
"label": "SSL Client Private Key",
"description": "The private key for the certificate you provided. Can be provided either as the certificate itself (in .env) or as a filepath to the certificate.",
"sensitive": true
},
{
"name": "ssl_enable",
"kind": "boolean",
"value": false,
"label": "SSL Enable",
"description": "Whether or not to use ssl to verify the server's identity. Use ssl_certificate_authority and ssl_mode for further customization. To use a client certificate to authenticate yourself to the server, use ssl_client_certificate_enable instead."
},
{
"name": "ssl_mode",
"kind": "string",
"value": "verify-full",
"label": "SSL Mode",
"description": "SSL Protection method, see [postgres documentation](https://www.postgresql.org/docs/current/libpq-ssl.html#LIBPQ-SSL-PROTECTION) for more information. Must be one of disable, allow, prefer, require, verify-ca, or verify-full."
},
{
"name": "ssl_storage_directory",
"kind": "string",
"value": ".secrets",
"label": "SSL Storage Directory",
"description": "The folder in which to store SSL certificates provided as raw values. When a certificate/key is provided as a raw value instead of as a filepath, it must be written to a file before it can be used. This configuration option determines where that file is created."
},
{
"name": "stream_map_config",
"kind": "object",
"label": "User Stream Map Configuration",
"description": "User-defined config values to be used within map expressions."
},
{
"name": "stream_maps",
"kind": "object",
"label": "Stream Maps",
"description": "Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html)."
},
{
"name": "use_copy",
"kind": "boolean",
"value": false,
"label": "Use COPY",
"description": "Use the COPY command to insert data. This is usually faster than INSERT statements. This option is only available for the postgresql+psycopg dialect+driver."
},
{
"name": "user",
"kind": "string",
"label": "User",
"description": "User name used to authenticate."
},
{
"name": "validate_records",
"kind": "boolean",
"value": true,
"label": "Validate Records",
"description": "Whether to validate the schema of the incoming streams."
}
],
"dialect": "postgres",
"target_schema": "$TARGET_POSTGRES_SCHEMA"
}

View file

View file

1
requirements.txt Normal file
View file

@ -0,0 +1 @@
meltano==3.8.0