Compare commits

..

No commits in common. "e5d9eca47fc45f79e86dbcb4ac2be8361565cc79" and "aa526dd3f5c7a1e6808e5ff10e9f45594af9aa08" have entirely different histories.

8 changed files with 3 additions and 2196 deletions

5
.gitignore vendored
View file

@ -1,4 +1 @@
.env
.vscode
*/target
.env

View file

@ -14,7 +14,3 @@ reset: stop
@echo "Removing volumes..."
docker volume rm -f $(APP_DB_VOLUME) $(DW_DB_VOLUME)
@echo "Volumes removed. You can now start fresh with 'make start'"
run-rust:
cargo build --manifest-path rust/Cargo.toml
cargo run --manifest-path rust/Cargo.toml

View file

@ -21,6 +21,8 @@
];
shellHook = ''
export DATABASE_URL="postgres://${POSTGRES_APP_USER}:${POSTGRES_APP_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_APP_PORT}/${POSTGRES_APP_DB}"
export DW_DATABASE_URL="postgres://${POSTGRES_DW_USER}:${POSTGRES_DW_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_DW_PORT}/${POSTGRES_DW_DB}"
echo " Rust + PostgreSQL dev env ready"
'';
};

2066
rust/Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,11 +0,0 @@
[package]
name = "meltano_playground"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1", features = ["full"] }
sqlx = { version = "0.7", features = ["runtime-tokio", "postgres"] }
fake = { version = "4", features = ["derive"] }
rand = { version = "0.9.2" }
dotenvy = { version = "0.15.7" }

View file

@ -1,29 +0,0 @@
use sqlx::PgPool;
pub async fn setup_schema(pool: &PgPool) -> Result<(), sqlx::Error> {
sqlx::query(r#"
CREATE TABLE IF NOT EXISTS customers (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT NOT NULL UNIQUE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT now()
);
"#)
.execute(pool)
.await?;
sqlx::query(r#"
CREATE TABLE IF NOT EXISTS pizza_orders (
id SERIAL PRIMARY KEY,
customer_id INTEGER REFERENCES customers(id),
pizza_type TEXT NOT NULL,
quantity INTEGER NOT NULL,
order_time TIMESTAMP WITH TIME ZONE DEFAULT now()
);
"#)
.execute(pool)
.await?;
Ok(())
}

View file

@ -1,54 +0,0 @@
use fake::faker::{internet::en::FreeEmail, name::en::Name};
use fake::Fake;
use rand::seq::IndexedRandom;
use sqlx::PgPool;
use tokio::time::{sleep, Duration};
use rand::thread_rng;
pub async fn generate_data(pool: &PgPool) -> Result<(), sqlx::Error> {
let pizza_types = vec!["Margherita", "Pepperoni", "Hawaiian", "Veggie", "BBQ Chicken"];
let mut last_customer_insert = tokio::time::Instant::now();
loop {
// Add a new customer every 30 seconds
if last_customer_insert.elapsed() >= Duration::from_secs(30) {
let name: String = Name().fake();
let email: String = FreeEmail().fake();
sqlx::query("INSERT INTO customers (name, email) VALUES ($1, $2)")
.bind(name)
.bind(email)
.execute(pool)
.await?;
println!("✅ Created new customer");
last_customer_insert = tokio::time::Instant::now();
}
// Insert a pizza order every second, if there are customers
let customer_opt: Option<(i32,)> = sqlx::query_as("SELECT id FROM customers ORDER BY RANDOM() LIMIT 1")
.fetch_optional(pool)
.await?;
if let Some((customer_id,)) = customer_opt {
let mut rng = thread_rng();
if let Some(pizza_type) = pizza_types.choose(&mut rng) {
let quantity = 1;
sqlx::query("INSERT INTO pizza_orders (customer_id, pizza_type, quantity) VALUES ($1, $2, $3)")
.bind(customer_id)
.bind(*pizza_type)
.bind(quantity)
.execute(pool)
.await?;
println!("🍕 Order placed for customer_id {}: {}", customer_id, pizza_type);
}
} else {
println!("🛑 No customers yet, skipping order.");
}
// Wait 1 second before doing the loop again
sleep(Duration::from_secs(1)).await;
}
}

View file

@ -1,28 +0,0 @@
mod db;
mod generate;
use dotenvy::dotenv;
use std::env;
use sqlx::PgPool;
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
dotenv().ok();
println!("{:?}", env::var("POSTGRES_APP_USER"));
let user = env::var("POSTGRES_APP_USER").expect("POSTGRES_APP_USER not set");
let pass = env::var("POSTGRES_APP_PASSWORD").expect("POSTGRES_APP_PASSWORD not set");
let host = env::var("POSTGRES_APP_HOST").expect("POSTGRES_APP_HOST not set");
let port = env::var("POSTGRES_APP_PORT").expect("POSTGRES_POSTGRES_APP_PORTAPP_USER not set");
let db = env::var("POSTGRES_APP_DB").expect("POSTGRES_APP_DB not set");
let database_url = format!("postgres://{}:{}@{}:{}/{}", user, pass, host, port, db);
let pool = PgPool::connect(&database_url).await?;
db::setup_schema(&pool).await?;
generate::generate_data(&pool).await?;
Ok(())
}