E-commerce Showcase
Below is the .bruin.yml configuration used throughout this tutorial:
default_environment: default
environments:
default:
connections:
postgres:
- name: pg-default
username: postgres
password: postgres
host: localhost
port: 5432
database: postgres
duckdb:
- name: duckdb-default
path: duckdb.db
What this hands-on tutorial covers
- Ingestion Layer: Learn how to configure Bruin assets for incremental ingestion from Postgres into DuckDB.
- Transformation Layer: Explore staging and mart patterns that turn raw e-commerce data into usable analytics.
- Quality Checks: Implement column-level and business-logic validations to ensure data integrity.
- Analytics & Insights: Build daily revenue and customer lifetime value dashboards from validated datasets.
This tutorial is structured to give you a straightforward, engineering-first path to building a production-ready analytics pipeline. It’s about understanding how each component — ingestion, transformation, validation, and analysis — fits together under Bruin’s unified workflow.
1) Prerequisites & Setup
- Docker (for Postgres)
- Python 3.11+
- uv (fast Python toolchain manager) → https://docs.astral.sh/uv/
- Git, Bash
git clone https://github.com/danielhe4rt/bruin-ecommerce-pipeline
cd bruin-ecommerce-pipeline
Or use the bruin templates directly from the CLI:
bruin init ecommerce
cd bruin
Then, open the project in your favorite code editor and let's get started!
2) Understanding the Data Model (OLTP → Analytics)
In e-commerce — and most analytics-driven businesses — data pipelines are typically organized into layers, each serving a specific purpose. This layered structure keeps data reliable, interpretable, and adaptable as the business scales.
The ultimate goal is to make data analysis effortless and focused on answering real business questions across key domains, such as:
| Domain | Analysis Focus |
|---|---|
| Customers | Segmentation by age, geography, and behavior patterns |
| Products and Product Variants | Performance by category, profitability, and pricing insights |
| Orders and Order Items | Daily revenue, order volume, and purchasing trends |
| KPI layer | Metrics like AOV, units per order, top SKUs, and repeat purchase rate |
By structuring data this way, teams can move from raw information to actionable insights with clarity and confidence.
For this tutorial, it's important to understand the data relationships that form the backbone of most e-commerce systems. These relationships define how information flows across entities — from customers placing orders to products being sold and analyzed.
Here’s the simplified schema we’ll use throughout this tutorial:
| From | To | Relationship Type |
|---|---|---|
| customers(id) | orders(customer_id) | 1:N (One-to-Many) |
| orders(id) | order_items(order_id) | 1:N (One-to-Many) |
| products(id) | product_variants(product_id) | 1:N (One-to-Many) |
| product_variants(id) | order_items(variant_id) | 1:N (One-to-Many) |
For example, the customers table is defined as:
CREATE TABLE IF NOT EXISTS customers (
id BIGSERIAL PRIMARY KEY,
full_name TEXT NOT NULL,
email TEXT NOT NULL UNIQUE,
country TEXT NOT NULL,
age INTEGER NOT NULL CHECK (age >= 0),
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
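The remaining tables follow the same pattern. As an illustration, here is a sketch of the orders table, inferred from the columns the data generator inserts (the repository's sql/ddl.sql is the authoritative source):

```sql
CREATE TABLE IF NOT EXISTS orders (
    id           BIGSERIAL PRIMARY KEY,
    customer_id  BIGINT NOT NULL REFERENCES customers(id),
    order_date   TIMESTAMPTZ NOT NULL,
    status       TEXT NOT NULL,          -- pending | paid | cancelled | shipped
    total_amount NUMERIC(12,2) NOT NULL DEFAULT 0,
    created_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at   TIMESTAMPTZ NOT NULL DEFAULT now()
);
```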
This model mirrors a real-world e-commerce workflow:
- A customer makes one or more orders.
- Each order contains one or more items.
- Each item points to a specific product variant (like a red T-shirt, size M).
- Each variant belongs to a broader product (e.g., “Classic T-Shirt”).
These foreign key relationships allow us to join datasets efficiently and enable downstream analytics like:
| Domain | Materialized Table | Depends On |
|---|---|---|
| Customers | mart.customers_by_age | stg.customers |
| Customers | mart.customers_by_country | stg.customers |
| Products | mart.revenue_by_product | raw.order_items, raw.product_variants, raw.products |
| Products | mart.revenue_by_category | raw.order_items, raw.product_variants, raw.products |
| Products | mart.revenue_by_product_variant | stg.product_variants, stg.order_items, stg.orders, stg.products |
| Sales | mart.sales_daily | stg.order_items, stg.orders |
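These relationships are what later make marts like mart.revenue_by_product a simple join. A simplified sketch of such a query (column names follow the schema above; the pipeline's actual asset may differ):

```sql
SELECT p.name                           AS product_name,
       SUM(oi.quantity * oi.unit_price) AS revenue
FROM order_items oi
JOIN product_variants v ON v.id = oi.variant_id
JOIN products p         ON p.id = v.product_id
GROUP BY p.name
ORDER BY revenue DESC;
```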
By establishing these links early, every layer of the pipeline — raw, staging, and mart — can remain consistent and traceable across transformations.
3) Setting Up the Source Database (Docker + Postgres)
First, let's spin up our PostgreSQL instance:
docker run -d \
--name bruin-postgres \
-e POSTGRES_USER=postgres \
-e POSTGRES_PASSWORD=postgres \
-e POSTGRES_DB=postgres \
-p 5432:5432 \
postgres:18-alpine
Alternatively, use the repository's shortcuts, either of which is equivalent:
docker compose up -d
make docker-up
Now, we must load the DDL into the PostgreSQL instance:
cat sql/ddl.sql | docker exec -i bruin-postgres psql -U postgres -d postgres
Or simply: make db-migrate (run make help to see all available Makefile targets).
4) Populating Test Data
We’ll use Python with uv (a fast, lightweight Python environment manager) to handle dependencies and execution. This approach lets us install only what’s needed (no global dependencies, no virtualenv folders), and the data load itself is fully idempotent.
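The idempotency comes from two patterns in the generator: dimension rows are upserted with ON CONFLICT ... DO UPDATE, and fact rows for the target date window are deleted and re-inserted. Here is a minimal sketch of the delete-then-reload pattern, using an in-memory SQLite database purely for illustration (the real script talks to Postgres via psycopg):

```python
import sqlite3

def load_window(conn, start, end, rows):
    """Reload all orders whose order_date falls inside [start, end]."""
    # Deleting the window first makes the load idempotent: running it
    # twice with the same inputs leaves the table in the same state.
    conn.execute("DELETE FROM orders WHERE order_date BETWEEN ? AND ?", (start, end))
    conn.executemany("INSERT INTO orders(order_date, amount) VALUES (?, ?)", rows)
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, order_date TEXT, amount REAL)")

rows = [("2024-01-01", 10.0), ("2024-01-02", 25.0)]
load_window(conn, "2024-01-01", "2024-01-31", rows)
load_window(conn, "2024-01-01", "2024-01-31", rows)  # second run: no duplicates

count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(count)  # prints 2, not 4
```

This is the same reason the real script runs SQL_DELETE_ORDERS_IN_WINDOW before inserting new orders.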
Dependencies
Inside the project, you’ll find a script called generate_data.py.
This script populates our five main tables (customers, products, product_variants, orders, and order_items) with realistic, time-distributed data that mimics an actual e-commerce platform.
To make this possible, we rely on two lightweight Python libraries:
+ faker==37 # For realistic names, emails, and categorical data
+ psycopg==3 # For direct PostgreSQL database access
These are automatically handled when running with uv.
#!/usr/bin/env python3
"""
E-commerce demo data generator (Postgres).
Usage:
    uv run generate_data.py --dsn postgresql://user:pass@host:5432/db
"""
import argparse, random
from datetime import datetime, timedelta, timezone
from faker import Faker
import psycopg
# -------------------------------------------------------------------
# GLOBAL CONFIGURATION
# -------------------------------------------------------------------
fake = Faker()
# --- Static categorical constants ---
PRODUCT_CATEGORIES = ["t-shirts", "hoodies", "shoes", "accessories", "jackets"]
COLORS = ["Red", "Blue", "Black", "White", "Green"]
SIZES_APPAREL = ["S", "M", "L", "XL"]
SIZES_SHOES = [36, 38, 40, 42, 44]
ORDER_STATUSES = ["pending", "paid", "cancelled", "shipped"]
ORDER_STATUS_WEIGHTS = [0.2, 0.5, 0.1, 0.2]
# --- SQL Statements ---
SQL_INSERT_CUSTOMERS = """
INSERT INTO customers(full_name, email, country, age, created_at, updated_at)
VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT (email) DO
UPDATE
SET full_name=EXCLUDED.full_name, country=EXCLUDED.country,
age=EXCLUDED.age, updated_at=EXCLUDED.updated_at; \
"""
SQL_INSERT_PRODUCTS = """
INSERT INTO products(name, category, sku, created_at, updated_at)
VALUES (%s, %s, %s, %s, %s) ON CONFLICT (sku) DO
UPDATE
SET name=EXCLUDED.name, category=EXCLUDED.category,
updated_at=EXCLUDED.updated_at; \
"""
SQL_INSERT_VARIANTS = """
INSERT INTO product_variants
(product_id, variant_sku, color, size, manufacturing_price, selling_price,
stock_quantity, is_active, created_at, updated_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (variant_sku) DO
UPDATE
SET color=EXCLUDED.color, size =EXCLUDED.size,
manufacturing_price=EXCLUDED.manufacturing_price,
selling_price=EXCLUDED.selling_price,
stock_quantity=EXCLUDED.stock_quantity,
updated_at=EXCLUDED.updated_at; \
"""
SQL_DELETE_ORDERS_IN_WINDOW = "DELETE FROM orders WHERE order_date BETWEEN %s AND %s;"
SQL_INSERT_ORDERS = """
INSERT INTO orders(customer_id, order_date, status, total_amount, created_at, updated_at)
VALUES (%s, %s, %s, %s, %s, %s); \
"""
SQL_INSERT_ORDER_ITEMS = """
INSERT INTO order_items(order_id, variant_id, quantity, unit_price, created_at)
VALUES (%s, %s, %s, %s, %s); \
"""
SQL_UPDATE_ORDER_TOTALS = """
UPDATE orders o
SET total_amount = COALESCE(oi.sum_total, 0) FROM (
SELECT order_id, SUM(total_price) sum_total
FROM order_items GROUP BY order_id
) oi
WHERE o.id = oi.order_id
AND o.order_date BETWEEN %s
AND %s; \
"""
SQL_STATS_COUNTS = {
"customers": "SELECT COUNT(*) FROM customers",
"products": "SELECT COUNT(*) FROM products",
"variants": "SELECT COUNT(*) FROM product_variants",
"orders": "SELECT COUNT(*) FROM orders WHERE order_date BETWEEN %s AND %s",
"items": """
SELECT COUNT(*)
FROM order_items oi
JOIN orders o ON o.id = oi.order_id
WHERE o.order_date BETWEEN %s AND %s;
""",
}
SQL_CHAOS_VALIDATION = """
SELECT p.category,
SUM(CASE
WHEN (p.category = 'shoes' AND v.size !~ '^[0-9]+$')
OR (p.category <> 'shoes' AND v.size NOT IN ('S', 'M', 'L', 'XL'))
THEN 1
ELSE 0 END) invalid
FROM product_variants v
JOIN products p ON p.id = v.product_id
GROUP BY 1
ORDER BY 1; \
"""
def parse_args():
now = datetime.now(timezone.utc)
day_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
day_end = now.replace(hour=23, minute=59, second=59, microsecond=999999)
p = argparse.ArgumentParser(description="Generate demo ecommerce data (idempotent).")
p.add_argument("--dsn", required=True, help="PostgreSQL DSN")
p.add_argument("--customers", type=int, default=100)
p.add_argument("--products", type=int, default=50)
p.add_argument("--orders", type=int, default=500)
p.add_argument("--max-items-per-order", type=int, default=5)
p.add_argument("--chaos-percent", type=float, default=0.0, help="%% of variants with invalid sizes")
p.add_argument("--scale", type=int, default=1, help="Multiply base volumes")
p.add_argument("--seed", type=int, default=42, help="Deterministic RNG seed")
p.add_argument("--starting-at", type=str, help="ISO start (e.g. 2024-01-01)")
p.add_argument("--ending-at", type=str, help="ISO end (e.g. 2024-06-30)")
a = p.parse_args()
a.starting_at = a.starting_at or day_start.isoformat()
a.ending_at = a.ending_at or day_end.isoformat()
return a
def iso(dt):
return datetime.fromisoformat(dt).astimezone(timezone.utc)
def rdate(s, e):
"""Return random datetime between s and e."""
return s + timedelta(seconds=random.randint(0, int((e - s).total_seconds())))
def execmany(conn, sql, rows):
with conn.cursor() as cur:
cur.executemany(sql, rows)
conn.commit()
def gen_customers(n):
for _ in range(n):
yield (
fake.name(),
fake.unique.email(),
fake.country(),
random.randint(18, 70),
datetime.now(timezone.utc),
datetime.now(timezone.utc),
)
def gen_products(n):
for _ in range(n):
cat = random.choice(PRODUCT_CATEGORIES)
yield (
f"{fake.word().capitalize()} {cat}",
cat,
fake.unique.bothify("SKU-####-??").upper(),
datetime.now(timezone.utc),
datetime.now(timezone.utc),
)
def gen_variants(prod_rows, chaos):
for pid, cat in prod_rows:
for _ in range(random.randint(1, 4)):
sku = fake.unique.bothify("VAR-####-??").upper()
color = random.choice(COLORS)
size = str(random.choice(SIZES_SHOES)) if cat == "shoes" else random.choice(SIZES_APPAREL)
# Chaos mode
if chaos > 0 and random.random() < (chaos / 100):
if cat == "shoes":
size = random.choice(SIZES_APPAREL)
else:
size = str(random.choice(SIZES_SHOES))
manuf = round(random.uniform(10, 80), 2)
sell = round(manuf * random.uniform(1.2, 2.0), 2)
yield (
pid, sku, color, size, manuf, sell,
random.randint(0, 200), True,
datetime.now(timezone.utc), datetime.now(timezone.utc),
)
def gen_orders(cust_ids, n, s, e):
for _ in range(n):
yield (random.choice(cust_ids), rdate(s, e), random.choices(ORDER_STATUSES, ORDER_STATUS_WEIGHTS)[0], 0.0,
datetime.now(timezone.utc), datetime.now(timezone.utc),)
def gen_items(order_ids, variant_ids, max_items):
for oid in order_ids:
for vid in random.sample(variant_ids, random.randint(1, max(1, max_items))):
yield (oid, vid, random.randint(1, 3), round(random.uniform(20, 200), 2), datetime.now(timezone.utc))
def print_box_summary(S, E, c_cnt, p_cnt, v_cnt, o_cnt, i_cnt, max_items, bad_by_cat):
print("\n✅ Data generation complete (idempotent window load)")
print(f"Window: {S.isoformat()} → {E.isoformat()}")
print(f"Counts — Customers: {c_cnt:,} | Products: {p_cnt:,} | Variants: {v_cnt:,}")
print(f"Window Facts — Orders: {o_cnt:,} | Items: {i_cnt:,} | MaxItems/Order: {max_items}")
print("Chaos check (invalid sizes per category):")
for cat, bad in bad_by_cat:
print(f" • {cat}: {bad:,} invalid")
def main():
args = parse_args()
random.seed(args.seed)
Faker.seed(args.seed)
S, E = iso(args.starting_at), iso(args.ending_at)
print("\n=== Generating E-commerce Demo Data ===")
print(f"Window: {S.isoformat()} → {E.isoformat()}")
print(f"Scale x{args.scale} | Seed {args.seed} | Chaos {args.chaos_percent:.1f}%\n")
base_c, base_p, base_o = args.customers, args.products, args.orders
n_c, n_p, n_o = base_c * args.scale, base_p * args.scale, base_o * args.scale
with psycopg.connect(args.dsn) as conn:
execmany(conn, SQL_INSERT_CUSTOMERS, gen_customers(n_c))
execmany(conn, SQL_INSERT_PRODUCTS, gen_products(n_p))
prod_rows = list(conn.execute("SELECT id, category FROM products"))
execmany(conn, SQL_INSERT_VARIANTS, gen_variants(prod_rows, args.chaos_percent))
with conn.cursor() as cur:
cur.execute(SQL_DELETE_ORDERS_IN_WINDOW, (S, E))
conn.commit()
cust_ids = [r[0] for r in conn.execute("SELECT id FROM customers")]
execmany(conn, SQL_INSERT_ORDERS, gen_orders(cust_ids, n_o, S, E))
order_ids = [r[0] for r in conn.execute("SELECT id FROM orders WHERE order_date BETWEEN %s AND %s", (S, E))]
variant_ids = [r[0] for r in conn.execute("SELECT id FROM product_variants")]
execmany(conn, SQL_INSERT_ORDER_ITEMS, gen_items(order_ids, variant_ids, args.max_items_per_order))
with conn.cursor() as cur:
cur.execute(SQL_UPDATE_ORDER_TOTALS, (S, E))
conn.commit()
with conn.cursor() as cur:
cur.execute(SQL_STATS_COUNTS["customers"]);
c_cnt = cur.fetchone()[0]
cur.execute(SQL_STATS_COUNTS["products"]);
p_cnt = cur.fetchone()[0]
cur.execute(SQL_STATS_COUNTS["variants"]);
v_cnt = cur.fetchone()[0]
cur.execute(SQL_STATS_COUNTS["orders"], (S, E));
o_cnt = cur.fetchone()[0]
cur.execute(SQL_STATS_COUNTS["items"], (S, E));
i_cnt = cur.fetchone()[0]
cur.execute(SQL_CHAOS_VALIDATION);
bad_by_cat = cur.fetchall()
print_box_summary(S, E, c_cnt, p_cnt, v_cnt, o_cnt, i_cnt, args.max_items_per_order, bad_by_cat)
if __name__ == "__main__":
main()
After that, let's run the script and see if everything works as expected:
uv run generate_data.py --dsn postgresql://postgres:postgres@localhost:5432/postgres
Or simply: make generate-data
The output should look like this:
=== Generating E-commerce Demo Data ===
Window: 2025-10-19T00:00:00+00:00 → 2025-10-19T23:59:59.999999+00:00
Scale x1 | Seed 42 | Chaos 0.0%
✅ Data generation complete
Window: 2025-10-19T00:00:00+00:00 → 2025-10-19T23:59:59.999999+00:00
Counts — Customers: 100 | Products: 50 | Variants: 144
Window Facts — Orders: 500 | Items: 1,487 | MaxItems/Order: 5
Chaos check (invalid sizes per category):
• accessories: 0 invalid
• hoodies: 0 invalid
• jackets: 0 invalid
• shoes: 0 invalid
• t-shirts: 0 invalid
Cool! Now we're ready to start our Bruin Pipeline!
Script argument list:
| Argument | Type | Default | Description |
|---|---|---|---|
| --dsn | str | (required) | PostgreSQL connection string (DSN). Example: postgresql://user:pass@localhost:5432/ecommerce |
| --customers | int | 100 | Number of customers to generate. |
| --products | int | 50 | Number of products to generate. |
| --orders | int | 500 | Number of orders to generate. |
| --max-items-per-order | int | 5 | Maximum number of items per order. Each order will have 1–N items. |
| --chaos-percent | float | 0.0 | Percentage of product variants given invalid sizes (used for validation testing). |
| --scale | int | 1 | Multiplier applied to the customer/product/order volumes. |
| --seed | int | 42 | Deterministic RNG seed. |
| --starting-at | str (ISO date) | Today at 00:00 UTC | Beginning of the generated data window. Example: 2024-01-01. |
| --ending-at | str (ISO date) | Today at 23:59 UTC | End of the generated data window. Example: 2024-06-30. |
Note: You can also run the script with the --help flag to see the available arguments.
5) Creating the Bruin Pipeline
We have everything structured to start ingesting data into our Bruin Pipeline. So, first, we must create the assets.
But before that, let's understand the structure of the Bruin Pipeline.
A pipeline can be created using the Bruin CLI tool:
# Example Command
bruin init default some-feature
and it will always generate the same structure, where:
- .bruin.yml - The main configuration file for your Bruin environment.
  - Defines global settings like default connections, variables, and behavior for all pipelines.
- some-feature/pipeline.yml - Defines a specific pipeline for a domain or project (in this example, ecommerce).
  - Describes the end-to-end data flow — which assets to run, their dependencies, and schedules.
  - Pipelines are modular, so you can maintain separate ones for different business domains.
- some-feature/assets/* - Contains all the assets — the building blocks of your data pipelines.
  - Each asset handles a distinct task: ingesting raw data, transforming it, or generating analytical tables.
  - Since every asset is a file, it’s version-controlled, testable, and reusable — just like code.
The important thing to remember is that Bruin scans all files nested inside the assets folder for filenames with the .asset.yml suffix (and, more generally, any asset type matching .asset.*).
So it doesn't matter how you organize your pipeline assets or how deeply you nest folders: if you follow the proposed naming convention, Bruin will pick everything up automagically!
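For instance, a hypothetical layout like the following would be discovered without any extra configuration (the folder names here are ours; only the file-naming convention matters):

```text
assets/
├── raw/
│   ├── customers.asset.yml
│   └── orders.asset.yml
├── staging/
│   └── customers.asset.sql
└── mart/
    └── sales_daily.asset.sql
```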
6) Raw Ingestion Assets
Using ingestr, we're able to retrieve data from a source database and ingest it into a destination database.
In our case, the goal is to ingest all the data generated in PostgreSQL into DuckDB, which will act as our analytical database.
Let's take a look at our assets and configuration.
name: raw.customers
type: ingestr
description: Ingest OLTP customers from Postgres into DuckDB raw layer.
parameters:
source_connection: pg-default
source_table: "public.customers"
destination: duckdb
The pipeline could run everything concurrently, but we're going to run it sequentially, waiting for each asset's dependencies to be processed and resolved first.
flowchart TD
A[customers] -->|feeds| D[orders]
B[products] -->|defines| C[product_variants]
D -->|requires| E[order_items]
C -->|used by| E
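In asset terms, the graph above is expressed by listing upstream assets under the depends: key. A sketch for the orders ingestion asset (depends: is Bruin's key; the other fields mirror the customers asset shown earlier and are assumptions for this example):

```yaml
name: raw.orders
type: ingestr
parameters:
  source_connection: pg-default
  source_table: "public.orders"
  destination: duckdb
depends:
  - raw.customers
```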
In Airflow or Dagster, you write DAGs in Python — describing explicitly how tasks depend on one another. Bruin takes a declarative approach: dependencies are simply listed in YAML under the depends: key, and the system automatically resolves execution order and lineage.
Unlike Airflow (which focuses on orchestration rather than transformation) or dbt (which is primarily SQL-centric), Bruin allows you to define transformations in both SQL and Python — directly tied to your ingestion assets.
| Pipeline Layer | Bruin Equivalent |
|---|---|
| Ingestion (Airbyte, Fivetran) | type: ingestr assets |
| Transformation (dbt) | type: transform assets (YAML, SQL, or Python) |
| Orchestration (Airflow, Dagster) | Implicit asset graph with depends: |
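Resolving a depends: graph into a run order is, at its core, a topological sort. A minimal sketch using Python's standard library (asset names taken from this tutorial; this illustrates the idea, not Bruin's internals):

```python
from graphlib import TopologicalSorter

# Each asset maps to the set of upstream assets its `depends:` block declares.
graph = {
    "raw.customers": set(),
    "raw.orders": {"raw.customers"},
    "stg.orders": {"raw.orders"},
    "mart.sales_daily": {"stg.orders"},
}

# static_order() yields every node only after all of its dependencies.
run_order = list(TopologicalSorter(graph).static_order())
print(run_order)
# ['raw.customers', 'raw.orders', 'stg.orders', 'mart.sales_daily']
```

Independent branches (like products and product_variants) have no ordering constraint between them, which is what lets a runner execute them concurrently.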
Now, let's see if our ingestion layer is working as expected. Using the Bruin CLI, run the following command:
bruin run orders-performance --tag raw
The output should look something like this:
Analyzed the pipeline 'orders_performance' with 15 assets.
Pipeline: orders_performance (.)
No issues found
✓ Successfully validated 15 assets across 1 pipeline, all good.
Interval: 2025-10-17T00:00:00Z - 2025-10-17T23:59:59Z
Starting the pipeline execution...
[20:24:23] Running: raw.products
[20:24:23] Running: raw.customers
[20:24:23] Running: raw.orders
[20:24:23] Running: raw.product_variants
...[logs]...
PASS raw.products
PASS raw.customers
PASS raw.orders
PASS raw.order_items
PASS raw.product_variants
bruin run completed successfully in 16.704s
✓ Assets executed 5 succeeded
Note that with the --tag option we run only the assets that have the specified tag, avoiding the execution of unnecessary assets.
7) Implementing Quality Checks (Staging layer)
The staging layer is where raw data gets cleaned, validated, and prepared for analytics. Unlike the raw layer (which mirrors source systems exactly), staging applies business rules, data quality checks, and standardized transformations that make downstream analysis reliable and consistent.
In Bruin, the staging layer combines SQL transformations with declarative quality checks defined in your columns inside an asset.
You can add column checks by declaring, in the asset's columns block, the definitions and checks for each column you materialize:
/* @bruin
name: stg.customers
type: duckdb.sql
materialization:
type: table
depends:
- raw.customers
owner: daniel@gmail.com
columns:
- name: customer_id
type: integer
primary_key: true
checks:
- name: unique
- name: not_null
@bruin */
SELECT
id::INT AS customer_id,
COALESCE(TRIM(email), '') AS email,
COALESCE(TRIM(country), 'Unknown') AS country,
COALESCE(age, 0) AS age,
created_at,
updated_at
FROM raw.customers
WHERE email IS NOT NULL;
Available Check Types
| Check Type | Purpose | Example Configuration |
|---|---|---|
not_null | Ensures no NULL values | - name: not_null |
unique | Ensures no duplicate values | - name: unique |
range | Validates numeric ranges | - name: range (min: 0, max: 100) |
accepted_values | Validates against allowed values | - name: accepted_values (value: [pending, paid]) |
positive | Ensures positive numbers | - name: positive |
regex | Pattern matching validation | - name: regex pattern: '^[A-Z]{3}-\d{4}$' |
Bruin's approach to data quality is declarative and reusable. Instead of writing custom validation SQL for every table, you define entities and attributes in glossary.yml, then reference them in your staging assets using the extends: keyword.
Here's how the column check system works:
Entity Definition in glossary.yml:
entities:
Customer:
description: Customer is an individual/business registered on our platform.
domains:
- customer-management
attributes:
Email:
name: email
type: string
description: E-mail address used during registration.
checks:
- name: not_null
Age:
name: age
type: integer
description: Age of the customer.
checks:
- name: range
min: 0
Then, in your asset, you can simply extend it, avoiding duplicated validations across other assets:
/* @bruin
columns:
- extends: Customer.Email # Inherits not_null check
- extends: Customer.Age # Inherits range check (min: 0)
@bruin */
For complex validations that go beyond column-level checks, Bruin supports custom checks with SQL queries:
custom_checks:
- name: validate product variant sizes
description: |
Ensures that if the product category is 'shoes', the size must be numeric.
Otherwise, the size must be one of ['S', 'M', 'L'].
value: 0 # Expected result (0 = no violations)
query: |
SELECT COUNT(*) AS invalid_count
FROM raw.product_variants v
JOIN stg.products p ON p.product_id = v.product_id
WHERE
(p.category = 'shoes' AND NOT (v.size ~ '^[0-9]+$'))
OR (p.category != 'shoes' AND v.size NOT IN ('S', 'M', 'L'))
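The same invariant can be read as a plain predicate, which is handy for unit-testing the rule outside the warehouse. A sketch (the function name is ours; the rule comes from the check above):

```python
def is_valid_size(category: str, size: str) -> bool:
    """Mirror the custom check: shoes get numeric sizes, everything else S/M/L."""
    if category == "shoes":
        return size.isdigit()
    return size in {"S", "M", "L"}

print(is_valid_size("shoes", "42"))     # True
print(is_valid_size("shoes", "M"))      # False
print(is_valid_size("t-shirts", "44"))  # False
```

Note that the generator's apparel sizes include XL while this check accepts only S, M, and L; reconciling those two lists is exactly the kind of fix a failing check would prompt.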
Now that we understand the basics of Bruin's data quality system, let's run the staging pipeline:
# Run all staging assets
bruin run orders-performance --tag staging
The output should look like:
Analyzed the pipeline 'orders_performance' with 15 assets.
Pipeline: orders_performance (.)
No issues found
✓ Successfully validated 15 assets across 1 pipeline, all good.
Interval: 2025-10-17T00:00:00Z - 2025-10-17T23:59:59Z
Starting the pipeline execution...
[20:59:31] Running: stg.products
[20:59:31] Running: stg.order_items
[20:59:31] Running: stg.orders
[20:59:31] Running: stg.customers
[20:59:31] Finished: stg.products (28ms)
[20:59:31] Running: stg.product_variants
[20:59:31] Finished: stg.orders (57ms)
[20:59:31] Finished: stg.order_items (78ms)
[20:59:31] Finished: stg.customers (101ms)
[20:59:31] Finished: stg.product_variants (89ms)
[20:59:31] Running: stg.product_variants:custom-check:validate_product_variant_sizes
[20:59:31] Finished: stg.product_variants:custom-check:validate_product_variant_sizes (6ms)
==================================================
PASS stg.products
PASS stg.orders
PASS stg.order_items
PASS stg.customers
PASS stg.product_variants
bruin run completed successfully in 125ms
✓ Assets executed 5 succeeded
✓ Quality checks 1 succeeded
Each asset runs its quality checks alongside transformations, keeping data accurate at every step. You can reuse validation rules across assets, add business context through glossaries, and rely on incremental checks that catch issues early.
8) Designing Your Analytics Layer (Mart)
The analytics (mart) layer turns curated staging tables into business-ready, well-structured datasets. Here we define models with a clear grain, apply final business rules, and expose KPIs for decision-making. These models typically power dashboards and self-service exploration.
Just like in the Raw and Staging sections above, below is an example asset showing how the mart is structured in our orders-performance pipeline. Each model consumes stg.* tables and produces clean, documented outputs.
/* @bruin
name: mart.sales_daily
type: duckdb.sql
materialization:
type: table
strategy: append
depends:
- stg.order_items
- stg.orders
columns:
- name: sale_date
type: date
checks:
- name: not_null
- name: orders_count
type: integer
checks:
- name: not_null
- name: items_count
type: integer
checks:
- name: not_null
- name: revenue
type: numeric
checks:
- name: not_null
custom_checks:
- name: total revenue non-negative
value: 0
query: SELECT COUNT(*) FROM mart.sales_daily WHERE revenue < 0
@bruin */
WITH daily_orders AS (SELECT CAST(CAST(order_date AS TIMESTAMP) AS DATE) AS sale_date,
COUNT(*) AS orders_count,
SUM(CASE WHEN status IN ('paid', 'shipped') THEN total_amount ELSE 0 END) AS revenue
FROM stg.orders
GROUP BY 1),
daily_items AS (SELECT CAST(CAST(o.order_date AS TIMESTAMP) AS DATE) AS sale_date,
COUNT(*) AS items_count
FROM stg.order_items oi
JOIN stg.orders o ON o.order_id = oi.order_id
GROUP BY 1)
SELECT d.sale_date,
d.orders_count,
COALESCE(i.items_count, 0) AS items_count,
d.revenue
FROM daily_orders d
         LEFT JOIN daily_items i ON i.sale_date = d.sale_date
ORDER BY sale_date;
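With mart.sales_daily in place, KPI queries become one-liners. For example, average order value (AOV) per day — a sketch assuming only the columns defined above:

```sql
SELECT sale_date,
       revenue / NULLIF(orders_count, 0) AS avg_order_value
FROM mart.sales_daily
ORDER BY sale_date;
```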