Compare commits
No commits in common. "e5d9eca47fc45f79e86dbcb4ac2be8361565cc79" and "aa526dd3f5c7a1e6808e5ff10e9f45594af9aa08" have entirely different histories.
e5d9eca47f
...
aa526dd3f5
8 changed files with 3 additions and 2196 deletions
3
.gitignore
vendored
3
.gitignore
vendored
|
|
@ -1,4 +1 @@
|
|||
.env
|
||||
.vscode
|
||||
|
||||
*/target
|
||||
4
Makefile
4
Makefile
|
|
@ -14,7 +14,3 @@ reset: stop
|
|||
@echo "Removing volumes..."
|
||||
docker volume rm -f $(APP_DB_VOLUME) $(DW_DB_VOLUME)
|
||||
@echo "Volumes removed. You can now start fresh with 'make start'"
|
||||
|
||||
run-rust:
|
||||
cargo build --manifest-path rust/Cargo.toml
|
||||
cargo run --manifest-path rust/Cargo.toml
|
||||
|
|
@ -21,6 +21,8 @@
|
|||
];
|
||||
|
||||
shellHook = ''
|
||||
export DATABASE_URL="postgres://${POSTGRES_APP_USER}:${POSTGRES_APP_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_APP_PORT}/${POSTGRES_APP_DB}"
|
||||
export DW_DATABASE_URL="postgres://${POSTGRES_DW_USER}:${POSTGRES_DW_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_DW_PORT}/${POSTGRES_DW_DB}"
|
||||
echo "✅ Rust + PostgreSQL dev env ready"
|
||||
'';
|
||||
};
|
||||
|
|
|
|||
2066
rust/Cargo.lock
generated
2066
rust/Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
|
@ -1,11 +0,0 @@
|
|||
[package]
|
||||
name = "meltano_playground"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
sqlx = { version = "0.7", features = ["runtime-tokio", "postgres"] }
|
||||
fake = { version = "4", features = ["derive"] }
|
||||
rand = { version = "0.9.2" }
|
||||
dotenvy = { version = "0.15.7" }
|
||||
|
|
@ -1,29 +0,0 @@
|
|||
use sqlx::PgPool;
|
||||
|
||||
pub async fn setup_schema(pool: &PgPool) -> Result<(), sqlx::Error> {
|
||||
sqlx::query(r#"
|
||||
CREATE TABLE IF NOT EXISTS customers (
|
||||
id SERIAL PRIMARY KEY,
|
||||
name TEXT NOT NULL,
|
||||
email TEXT NOT NULL UNIQUE,
|
||||
created_at TIMESTAMP WITH TIME ZONE DEFAULT now()
|
||||
);
|
||||
"#)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
sqlx::query(r#"
|
||||
CREATE TABLE IF NOT EXISTS pizza_orders (
|
||||
id SERIAL PRIMARY KEY,
|
||||
customer_id INTEGER REFERENCES customers(id),
|
||||
pizza_type TEXT NOT NULL,
|
||||
quantity INTEGER NOT NULL,
|
||||
order_time TIMESTAMP WITH TIME ZONE DEFAULT now()
|
||||
);
|
||||
"#)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -1,54 +0,0 @@
|
|||
use fake::faker::{internet::en::FreeEmail, name::en::Name};
|
||||
use fake::Fake;
|
||||
use rand::seq::IndexedRandom;
|
||||
use sqlx::PgPool;
|
||||
use tokio::time::{sleep, Duration};
|
||||
use rand::thread_rng;
|
||||
|
||||
pub async fn generate_data(pool: &PgPool) -> Result<(), sqlx::Error> {
|
||||
let pizza_types = vec!["Margherita", "Pepperoni", "Hawaiian", "Veggie", "BBQ Chicken"];
|
||||
let mut last_customer_insert = tokio::time::Instant::now();
|
||||
|
||||
loop {
|
||||
// Add a new customer every 30 seconds
|
||||
if last_customer_insert.elapsed() >= Duration::from_secs(30) {
|
||||
let name: String = Name().fake();
|
||||
let email: String = FreeEmail().fake();
|
||||
|
||||
sqlx::query("INSERT INTO customers (name, email) VALUES ($1, $2)")
|
||||
.bind(name)
|
||||
.bind(email)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
println!("✅ Created new customer");
|
||||
last_customer_insert = tokio::time::Instant::now();
|
||||
}
|
||||
|
||||
// Insert a pizza order every second, if there are customers
|
||||
let customer_opt: Option<(i32,)> = sqlx::query_as("SELECT id FROM customers ORDER BY RANDOM() LIMIT 1")
|
||||
.fetch_optional(pool)
|
||||
.await?;
|
||||
|
||||
if let Some((customer_id,)) = customer_opt {
|
||||
let mut rng = thread_rng();
|
||||
if let Some(pizza_type) = pizza_types.choose(&mut rng) {
|
||||
let quantity = 1;
|
||||
|
||||
sqlx::query("INSERT INTO pizza_orders (customer_id, pizza_type, quantity) VALUES ($1, $2, $3)")
|
||||
.bind(customer_id)
|
||||
.bind(*pizza_type)
|
||||
.bind(quantity)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
println!("🍕 Order placed for customer_id {}: {}", customer_id, pizza_type);
|
||||
}
|
||||
} else {
|
||||
println!("🛑 No customers yet, skipping order.");
|
||||
}
|
||||
|
||||
// Wait 1 second before doing the loop again
|
||||
sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
|
|
@ -1,28 +0,0 @@
|
|||
mod db;
|
||||
mod generate;
|
||||
|
||||
use dotenvy::dotenv;
|
||||
use std::env;
|
||||
use sqlx::PgPool;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), sqlx::Error> {
|
||||
dotenv().ok();
|
||||
|
||||
println!("{:?}", env::var("POSTGRES_APP_USER"));
|
||||
|
||||
let user = env::var("POSTGRES_APP_USER").expect("POSTGRES_APP_USER not set");
|
||||
let pass = env::var("POSTGRES_APP_PASSWORD").expect("POSTGRES_APP_PASSWORD not set");
|
||||
let host = env::var("POSTGRES_APP_HOST").expect("POSTGRES_APP_HOST not set");
|
||||
let port = env::var("POSTGRES_APP_PORT").expect("POSTGRES_POSTGRES_APP_PORTAPP_USER not set");
|
||||
let db = env::var("POSTGRES_APP_DB").expect("POSTGRES_APP_DB not set");
|
||||
|
||||
let database_url = format!("postgres://{}:{}@{}:{}/{}", user, pass, host, port, db);
|
||||
let pool = PgPool::connect(&database_url).await?;
|
||||
|
||||
db::setup_schema(&pool).await?;
|
||||
|
||||
generate::generate_data(&pool).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue