Building an E-commerce Analytics Pipeline with Bruin, PostgreSQL and DuckDB

.bruin.yml
default_environment: default
environments:
    default:
        connections:
            postgres:
                - name: pg-default
                  username: postgres
                  password: postgres
                  host: localhost
                  port: 5432
                  database: postgres
            duckdb:
                - name: duckdb-default
                  path: duckdb.db
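
The generator script used later in this tutorial takes a --dsn argument rather than reading .bruin.yml. As a minimal sketch, the DSN can be assembled from the same fields as the pg-default connection; build_dsn is a hypothetical helper, not part of Bruin or the project:

```python
from urllib.parse import quote

# Values copied from the pg-default entry in .bruin.yml.
pg_default = {
    "username": "postgres",
    "password": "postgres",
    "host": "localhost",
    "port": 5432,
    "database": "postgres",
}


def build_dsn(conn: dict) -> str:
    """Assemble a postgresql:// DSN (hypothetical helper for illustration)."""
    # Percent-encode credentials so special characters do not break the URL.
    user = quote(conn["username"], safe="")
    password = quote(conn["password"], safe="")
    return f"postgresql://{user}:{password}@{conn['host']}:{conn['port']}/{conn['database']}"


dsn = build_dsn(pg_default)
print(dsn)
```

Keeping the two in sync by hand works for a local demo; for anything shared, the credentials would normally come from the environment instead.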

What this hands-on tutorial covers

Ingestion Layer

Learn how to configure Bruin assets for incremental ingestion from Postgres into DuckDB.

Transformation Layer

Explore staging and mart patterns that turn raw e-commerce data into usable analytics.

Quality Checks

Implement column-level and business logic validations to ensure data integrity.

Analytics & Insights

Build daily revenue and customer lifetime value dashboards from validated datasets.

This tutorial is structured to give you a straightforward, engineering-first path to building a production-ready analytics pipeline. It’s about understanding how each component — ingestion, transformation, validation, and analysis — fits together under Bruin’s unified workflow.

1) Prerequisites & Setup

git clone https://github.com/danielhe4rt/bruin-ecommerce-pipeline
cd bruin-ecommerce-pipeline

Or use the bruin templates directly from the CLI:

bruin init ecommerce

cd bruin

Then, open the project in your favorite code editor and let's get started!

2) Understanding the Data Model (OLTP → Analytics)

In e-commerce — and most analytics-driven businesses — data pipelines are typically organized into layers, each serving a specific purpose. This layered structure keeps data reliable, interpretable, and adaptable as the business scales.

The ultimate goal is to make data analysis effortless and focused on answering real business questions across key domains, such as:

| Domain | Analysis Focus |
|---|---|
| Customers | Segmentation by age, geography, and behavior patterns |
| Products and Product Variants | Performance by category, profitability, and pricing insights |
| Orders and Order Items | Daily revenue, order volume, and purchasing trends |
| KPI layer | Metrics like AOV, units per order, top SKUs, and repeat purchase rate |

By structuring data this way, teams can move from raw information to actionable insights with clarity and confidence.

In this tutorial, it's important to understand the data relationships that form the backbone of most e-commerce systems. These relationships define how information flows across entities, from customers placing orders to products being sold and analyzed.

Here’s the simplified schema we’ll use throughout this tutorial:

| From | To | Relationship Type |
|---|---|---|
| customers(id) | orders(customer_id) | 1:N (One-to-Many) |
| orders(id) | order_items(order_id) | 1:N (One-to-Many) |
| products(id) | product_variants(product_id) | 1:N (One-to-Many) |
| product_variants(id) | order_items(variant_id) | 1:N (One-to-Many) |

customers.sql
CREATE TABLE IF NOT EXISTS customers (
    id          BIGSERIAL PRIMARY KEY,
    full_name   TEXT        NOT NULL,
    email       TEXT        NOT NULL UNIQUE,
    country     TEXT        NOT NULL,
    age         INTEGER     NOT NULL CHECK (age >= 0),
    created_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at  TIMESTAMPTZ NOT NULL DEFAULT now()
);

This model mirrors a real-world e-commerce workflow:

  • A customer makes one or more orders.
  • Each order contains one or more items.
  • Each item points to a specific product variant (like a red T-shirt, size M).
  • Each variant belongs to a broader product (e.g., “Classic T-Shirt”).

These foreign key relationships allow us to join datasets efficiently and enable downstream analytics like:

| Domain | Materialized Table | Depends On |
|---|---|---|
| Customers | mart.customers_by_age | stg.customers |
| Customers | mart.customers_by_country | stg.customers |
| Products | mart.revenue_by_product | raw.order_items, raw.product_variants, raw.products |
| Products | mart.revenue_by_category | raw.order_items, raw.product_variants, raw.products |
| Products | mart.revenue_by_product_variant | stg.product_variants, stg.order_items, stg.orders, stg.products |
| Sales | mart.sales_daily | stg.order_items, stg.orders |

By establishing these links early, every layer of the pipeline — raw, staging, and mart — can remain consistent and traceable across transformations.

3) Setting Up the Source Database (Docker + Postgres)

First, let's spin up our PostgreSQL instance:

docker run -d \
  --name bruin-postgres \
  -e POSTGRES_USER=postgres \
  -e POSTGRES_PASSWORD=postgres \
  -e POSTGRES_DB=postgres \
  -p 5432:5432 \
  postgres:18-alpine

Now, load the DDL into the PostgreSQL instance (the container name must match the --name used above):

cat sql/ddl.sql | docker exec -i bruin-postgres psql -U postgres -d postgres

If you're using the Makefile, run make help to see the available commands.

4) Populating Test Data

We'll use Python with uv, a fast Python package and project manager, to handle dependencies and execution. This approach installs only what's needed (no global dependencies, no virtualenv folders to manage), and the script can be rerun safely because it is idempotent.

Dependencies

Inside the project, you’ll find a script called generate_data.py. This script populates our five main tables (customers, products, product_variants, orders, and order_items) with realistic, time-distributed data that mimics an actual e-commerce platform.

To make this possible, we rely on two lightweight Python libraries:

+ faker==37     # For realistic names, emails, and categorical data
+ psycopg==3    # For direct PostgreSQL database access

These are automatically handled when running with uv.

generate_data.py
#!/usr/bin/env python3
"""
E-commerce demo data generator (Postgres).

Usage:
    uv run generate_data.py --dsn postgresql://user:pass@host:5432/db
"""

import argparse, random
from datetime import datetime, timedelta, timezone
from faker import Faker
import psycopg

# -------------------------------------------------------------------
# GLOBAL CONFIGURATION
# -------------------------------------------------------------------

fake = Faker()

# --- Static categorical constants ---
PRODUCT_CATEGORIES = ["t-shirts", "hoodies", "shoes", "accessories", "jackets"]
COLORS = ["Red", "Blue", "Black", "White", "Green"]
SIZES_APPAREL = ["S", "M", "L", "XL"]
SIZES_SHOES = [36, 38, 40, 42, 44]
ORDER_STATUSES = ["pending", "paid", "cancelled", "shipped"]
ORDER_STATUS_WEIGHTS = [0.2, 0.5, 0.1, 0.2]

# --- SQL Statements ---
SQL_INSERT_CUSTOMERS = """
                       INSERT INTO customers(full_name, email, country, age, created_at, updated_at)
                       VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT (email) DO
                       UPDATE
                           SET full_name=EXCLUDED.full_name, country=EXCLUDED.country,
                           age=EXCLUDED.age, updated_at=EXCLUDED.updated_at; \
                       """

SQL_INSERT_PRODUCTS = """
                      INSERT INTO products(name, category, sku, created_at, updated_at)
                      VALUES (%s, %s, %s, %s, %s) ON CONFLICT (sku) DO
                      UPDATE
                          SET name=EXCLUDED.name, category=EXCLUDED.category,
                          updated_at=EXCLUDED.updated_at; \
                      """

SQL_INSERT_VARIANTS = """
                      INSERT INTO product_variants
                      (product_id, variant_sku, color, size, manufacturing_price, selling_price,
                       stock_quantity, is_active, created_at, updated_at)
                      VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (variant_sku) DO
                      UPDATE
                          SET color=EXCLUDED.color, size =EXCLUDED.size,
                          manufacturing_price=EXCLUDED.manufacturing_price,
                          selling_price=EXCLUDED.selling_price,
                          stock_quantity=EXCLUDED.stock_quantity,
                          updated_at=EXCLUDED.updated_at; \
                      """

SQL_DELETE_ORDERS_IN_WINDOW = "DELETE FROM orders WHERE order_date BETWEEN %s AND %s;"

SQL_INSERT_ORDERS = """
                    INSERT INTO orders(customer_id, order_date, status, total_amount, created_at, updated_at)
                    VALUES (%s, %s, %s, %s, %s, %s); \
                    """

SQL_INSERT_ORDER_ITEMS = """
                         INSERT INTO order_items(order_id, variant_id, quantity, unit_price, created_at)
                         VALUES (%s, %s, %s, %s, %s); \
                         """

SQL_UPDATE_ORDER_TOTALS = """
                          UPDATE orders o
                          SET total_amount = COALESCE(oi.sum_total, 0) FROM (
  SELECT order_id, SUM(total_price) sum_total
  FROM order_items GROUP BY order_id
) oi
                          WHERE o.id = oi.order_id
                            AND o.order_date BETWEEN %s
                            AND %s; \
                          """

SQL_STATS_COUNTS = {
    "customers": "SELECT COUNT(*) FROM customers",
    "products": "SELECT COUNT(*) FROM products",
    "variants": "SELECT COUNT(*) FROM product_variants",
    "orders": "SELECT COUNT(*) FROM orders WHERE order_date BETWEEN %s AND %s",
    "items": """
             SELECT COUNT(*)
             FROM order_items oi
                      JOIN orders o ON o.id = oi.order_id
             WHERE o.order_date BETWEEN %s AND %s;
             """,
}

SQL_CHAOS_VALIDATION = """
                       SELECT p.category,
                              SUM(CASE
                                      WHEN (p.category = 'shoes' AND v.size !~ '^[0-9]+$')
                                          OR (p.category <> 'shoes' AND v.size NOT IN ('S', 'M', 'L', 'XL'))
                                          THEN 1
                                      ELSE 0 END) invalid
                       FROM product_variants v
                                JOIN products p ON p.id = v.product_id
                       GROUP BY 1
                       ORDER BY 1; \
                       """


def parse_args():
    now = datetime.now(timezone.utc)
    day_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    day_end = now.replace(hour=23, minute=59, second=59, microsecond=999999)

    p = argparse.ArgumentParser(description="Generate demo ecommerce data (idempotent).")
    p.add_argument("--dsn", required=True, help="PostgreSQL DSN")
    p.add_argument("--customers", type=int, default=100)
    p.add_argument("--products", type=int, default=50)
    p.add_argument("--orders", type=int, default=500)
    p.add_argument("--max-items-per-order", type=int, default=5)
    p.add_argument("--chaos-percent", type=float, default=0.0, help="%% of variants with invalid sizes")
    p.add_argument("--scale", type=int, default=1, help="Multiply base volumes")
    p.add_argument("--seed", type=int, default=42, help="Deterministic RNG seed")
    p.add_argument("--starting-at", type=str, help="ISO start (e.g. 2024-01-01)")
    p.add_argument("--ending-at", type=str, help="ISO end (e.g. 2024-06-30)")

    a = p.parse_args()
    a.starting_at = a.starting_at or day_start.isoformat()
    a.ending_at = a.ending_at or day_end.isoformat()
    return a


def iso(dt):
    return datetime.fromisoformat(dt).astimezone(timezone.utc)


def rdate(s, e):
    """Return random datetime between s and e."""
    return s + timedelta(seconds=random.randint(0, int((e - s).total_seconds())))


def execmany(conn, sql, rows):
    with conn.cursor() as cur:
        cur.executemany(sql, rows)
    conn.commit()

def gen_customers(n):
    for _ in range(n):
        yield (
            fake.name(),
            fake.unique.email(),
            fake.country(),
            random.randint(18, 70),
            datetime.now(timezone.utc),
            datetime.now(timezone.utc),
        )


def gen_products(n):
    for _ in range(n):
        cat = random.choice(PRODUCT_CATEGORIES)
        yield (
            f"{fake.word().capitalize()} {cat}",
            cat,
            fake.unique.bothify("SKU-####-??").upper(),
            datetime.now(timezone.utc),
            datetime.now(timezone.utc),
        )


def gen_variants(prod_rows, chaos):
    for pid, cat in prod_rows:
        for _ in range(random.randint(1, 4)):
            sku = fake.unique.bothify("VAR-####-??").upper()
            color = random.choice(COLORS)
            size = str(random.choice(SIZES_SHOES)) if cat == "shoes" else random.choice(SIZES_APPAREL)

            # Chaos mode
            if chaos > 0 and random.random() < (chaos / 100):
                if cat == "shoes":
                    size = random.choice(SIZES_APPAREL)
                else:
                    size = str(random.choice(SIZES_SHOES))

            manuf = round(random.uniform(10, 80), 2)
            sell = round(manuf * random.uniform(1.2, 2.0), 2)
            yield (
                pid, sku, color, size, manuf, sell,
                random.randint(0, 200), True,
                datetime.now(timezone.utc), datetime.now(timezone.utc),
            )


def gen_orders(cust_ids, n, s, e):
    for _ in range(n):
        yield (random.choice(cust_ids), rdate(s, e), random.choices(ORDER_STATUSES, ORDER_STATUS_WEIGHTS)[0], 0.0,
               datetime.now(timezone.utc), datetime.now(timezone.utc),)


def gen_items(order_ids, variant_ids, max_items):
    for oid in order_ids:
        for vid in random.sample(variant_ids, random.randint(1, max(1, max_items))):
            yield (oid, vid, random.randint(1, 3), round(random.uniform(20, 200), 2), datetime.now(timezone.utc))


def print_box_summary(S, E, c_cnt, p_cnt, v_cnt, o_cnt, i_cnt, max_items, bad_by_cat):
    print("\n✅ Data generation complete (idempotent window load)")
    print(f"Window: {S.isoformat()} → {E.isoformat()}")
    print(f"Counts — Customers: {c_cnt:,} | Products: {p_cnt:,} | Variants: {v_cnt:,}")
    print(f"Window Facts — Orders: {o_cnt:,} | Items: {i_cnt:,} | MaxItems/Order: {max_items}")
    print("Chaos check (invalid sizes per category):")
    for cat, bad in bad_by_cat:
        print(f"  • {cat}: {bad:,} invalid")


def main():
    args = parse_args()
    random.seed(args.seed)
    Faker.seed(args.seed)
    S, E = iso(args.starting_at), iso(args.ending_at)

    print("\n=== Generating E-commerce Demo Data ===")
    print(f"Window: {S.isoformat()} → {E.isoformat()}")
    print(f"Scale x{args.scale} | Seed {args.seed} | Chaos {args.chaos_percent:.1f}%\n")

    base_c, base_p, base_o = args.customers, args.products, args.orders
    n_c, n_p, n_o = base_c * args.scale, base_p * args.scale, base_o * args.scale

    with psycopg.connect(args.dsn) as conn:
        execmany(conn, SQL_INSERT_CUSTOMERS, gen_customers(n_c))
        execmany(conn, SQL_INSERT_PRODUCTS, gen_products(n_p))

        prod_rows = list(conn.execute("SELECT id, category FROM products"))
        execmany(conn, SQL_INSERT_VARIANTS, gen_variants(prod_rows, args.chaos_percent))

        with conn.cursor() as cur:
            cur.execute(SQL_DELETE_ORDERS_IN_WINDOW, (S, E))
        conn.commit()

        cust_ids = [r[0] for r in conn.execute("SELECT id FROM customers")]
        execmany(conn, SQL_INSERT_ORDERS, gen_orders(cust_ids, n_o, S, E))

        order_ids = [r[0] for r in conn.execute("SELECT id FROM orders WHERE order_date BETWEEN %s AND %s", (S, E))]
        variant_ids = [r[0] for r in conn.execute("SELECT id FROM product_variants")]
        execmany(conn, SQL_INSERT_ORDER_ITEMS, gen_items(order_ids, variant_ids, args.max_items_per_order))

        with conn.cursor() as cur:
            cur.execute(SQL_UPDATE_ORDER_TOTALS, (S, E))
        conn.commit()

        with conn.cursor() as cur:
            # Collect summary counts for the final report.
            cur.execute(SQL_STATS_COUNTS["customers"])
            c_cnt = cur.fetchone()[0]
            cur.execute(SQL_STATS_COUNTS["products"])
            p_cnt = cur.fetchone()[0]
            cur.execute(SQL_STATS_COUNTS["variants"])
            v_cnt = cur.fetchone()[0]
            cur.execute(SQL_STATS_COUNTS["orders"], (S, E))
            o_cnt = cur.fetchone()[0]
            cur.execute(SQL_STATS_COUNTS["items"], (S, E))
            i_cnt = cur.fetchone()[0]
            cur.execute(SQL_CHAOS_VALIDATION)
            bad_by_cat = cur.fetchall()

    print_box_summary(S, E, c_cnt, p_cnt, v_cnt, o_cnt, i_cnt, args.max_items_per_order, bad_by_cat)


if __name__ == "__main__":
    main()

After that, let's run the script and see if everything works as expected:

uv run generate_data.py --dsn postgresql://postgres:postgres@localhost:5432/postgres

The output should look like this:

Bash Output
=== Generating E-commerce Demo Data ===
Window: 2025-10-19T00:00:00+00:00 → 2025-10-19T23:59:59.999999+00:00
Scale x1 | Seed 42 | Chaos 0.0%


✅ Data generation complete
Window: 2025-10-19T00:00:00+00:00 → 2025-10-19T23:59:59.999999+00:00
Counts Customers: 100 | Products: 50 | Variants: 144
Window Facts Orders: 500 | Items: 1,487 | MaxItems/Order: 5
Chaos check (invalid sizes per category):
  • accessories: 0 invalid
  • hoodies: 0 invalid
  • jackets: 0 invalid
  • shoes: 0 invalid
  • t-shirts: 0 invalid

Cool! Now we're ready to start our Bruin Pipeline!
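
The generator describes itself as idempotent: it deletes the orders inside the requested window before inserting a fresh batch, so rerunning it should not pile up duplicates. A minimal sketch of that delete-then-insert pattern, using an in-memory SQLite table in place of Postgres (load_window, count_orders, and the sample dates are illustrative, not part of the project):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, order_date TEXT, status TEXT)")


def load_window(conn, start, end, order_dates):
    """Delete orders inside [start, end], then insert the fresh batch."""
    with conn:  # commits on success, rolls back on error
        conn.execute("DELETE FROM orders WHERE order_date BETWEEN ? AND ?", (start, end))
        conn.executemany(
            "INSERT INTO orders(order_date, status) VALUES (?, 'paid')",
            [(d,) for d in order_dates],
        )


def count_orders(conn):
    return conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]


# An order outside the window should survive every reload.
conn.execute("INSERT INTO orders(order_date, status) VALUES ('2024-01-01', 'paid')")

window = ("2024-06-01", "2024-06-30")
batch = ["2024-06-03", "2024-06-10", "2024-06-21"]

load_window(conn, *window, batch)
first = count_orders(conn)
load_window(conn, *window, batch)  # second run replaces, it does not append
second = count_orders(conn)
print(first, second)
```

Both counts stay at 4: the three in-window orders are replaced on each run, and the January order is never touched.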

Script argument list

| Argument | Type | Default | Description |
|---|---|---|---|
| --dsn | str | (required) | PostgreSQL connection string (DSN). Example: postgresql://user:pass@localhost:5432/ecommerce |
| --customers | int | 100 | Number of customers to generate. |
| --products | int | 50 | Number of products to generate. |
| --orders | int | 500 | Number of orders to generate. |
| --max-items-per-order | int | 5 | Maximum number of items per order. Each order will have 1–N items. |
| --chaos-percent | float | 0.0 | Percentage of product variants that should have invalid sizes (used for validation testing). |
| --scale | int | 1 | Multiplier applied to the base customer, product, and order volumes. |
| --seed | int | 42 | Deterministic RNG seed. |
| --starting-at | str (ISO date) | Today at 00:00 UTC | Beginning of the generated data window. Example: 2024-01-01. |
| --ending-at | str (ISO date) | Today at 23:59 UTC | End of the generated data window. Example: 2024-06-30. |

Note: You can also run the script with the --help flag to see the available arguments.
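
To see what --chaos-percent does before pointing it at a database, the size logic from gen_variants can be replayed in isolation. This is a sketch under a few assumptions: it uses a local random.Random instead of the module-level RNG, only two categories, and a Python stand-in for the SQL validation rule (pick_size, is_invalid, and count_invalid are illustrative names):

```python
import random

SIZES_APPAREL = ["S", "M", "L", "XL"]
SIZES_SHOES = [36, 38, 40, 42, 44]


def pick_size(category, chaos_percent, rng):
    """Mirror the size selection in gen_variants, using a local RNG."""
    size = str(rng.choice(SIZES_SHOES)) if category == "shoes" else rng.choice(SIZES_APPAREL)
    if chaos_percent > 0 and rng.random() < chaos_percent / 100:
        # Chaos mode: swap in a size from the wrong family.
        size = rng.choice(SIZES_APPAREL) if category == "shoes" else str(rng.choice(SIZES_SHOES))
    return size


def is_invalid(category, size):
    """Python stand-in for the rule in SQL_CHAOS_VALIDATION."""
    if category == "shoes":
        return not size.isdigit()  # shoes need a numeric size
    return size not in SIZES_APPAREL  # everything else needs S/M/L/XL


def count_invalid(chaos_percent, n=1000, seed=42):
    """Generate n variants and count how many break the size rule."""
    rng = random.Random(seed)
    invalid = 0
    for _ in range(n):
        category = rng.choice(["shoes", "hoodies"])
        invalid += is_invalid(category, pick_size(category, chaos_percent, rng))
    return invalid


print(count_invalid(0), count_invalid(100))
```

With chaos at 0 no variant is invalid, and at 100 every variant is; values in between give roughly that share of bad rows, which is handy for exercising the quality checks later on.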

5) Creating the Bruin Pipeline

We have everything structured to start ingesting data into our Bruin Pipeline. So, first, we must create the assets.

But before that, let's understand the structure of the Bruin Pipeline.

A pipeline can be created using the Bruin CLI tool:

Bruin CLI
# Example Command
bruin init default some-feature

and it will always generate the same structure, where:

  • .bruin.yml
    • The main configuration file for your Bruin environment.
    • Defines global settings like default connections, variables, and behavior for all pipelines.
  • some-feature/pipeline.yml
    • Defines a specific pipeline for a domain or project (in this example, ecommerce).
    • Describes the end-to-end data flow — which assets to run, their dependencies, and schedules.
    • Pipelines are modular, so you can maintain separate ones for different business domains.
  • some-feature/assets/*
    • Contains all the assets — the building blocks of your data pipelines.
    • Each asset handles a distinct task: ingesting raw data, transforming it, or generating analytical tables.
    • Since every asset is a file, it’s version-controlled, testable, and reusable — just like code.

The important thing to remember is that Bruin scans every file nested inside the assets folder and picks up those whose names follow the .asset.* convention, such as .asset.yml or .asset.sql.

So it doesn't matter how you organize your pipeline assets into nested folders. As long as you follow this naming convention, Bruin will find them automatically.
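
The naming convention can be illustrated with a short directory walk. This is only a sketch of the convention described here, not Bruin's actual discovery code; discover_assets and the sample file names are made up for the example:

```python
import tempfile
from pathlib import Path


def discover_assets(pipeline_dir: Path) -> list[str]:
    """Return files under assets/ whose names match *.asset.*, at any depth."""
    assets_dir = pipeline_dir / "assets"
    found = [p for p in assets_dir.rglob("*.asset.*") if p.is_file()]
    return sorted(p.relative_to(pipeline_dir).as_posix() for p in found)


with tempfile.TemporaryDirectory() as tmp:
    pipeline = Path(tmp) / "some-feature"
    for rel in [
        "assets/ingestion/raw.customers.asset.yml",
        "assets/staging/stg.customers.asset.sql",
        "assets/notes/README.md",  # ignored: name does not contain .asset.
    ]:
        path = pipeline / rel
        path.parent.mkdir(parents=True, exist_ok=True)
        path.touch()
    assets = discover_assets(pipeline)

print(assets)
```

The folder names (ingestion, staging, notes) play no role in the match; only the file name does.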

6) Raw Ingestion Assets

Using ingestr, we can retrieve data from a source database and ingest it into a destination database.

In our case, the goal is to ingest all the data generated in PostgreSQL into DuckDB, which will act as our analytical database.

Let's take a look at our assets and configuration.

orders-performance/assets/ingestion/raw.customers.asset.yml
name: raw.customers
type: ingestr
description: Ingest OLTP customers from Postgres into DuckDB raw layer.

parameters:
  source_connection: pg-default
  source_table: "public.customers"
  destination: duckdb

The pipeline could run everything concurrently, but we're going to run it sequentially, waiting for each asset's dependencies to be resolved before it starts.

flowchart TD
    A[customers] -->|feeds| D[orders]
    B[products] -->|defines| C[product_variants]
    D -->|requires| E[order_items]
    C -->|used by| E

In Airflow or Dagster, you write DAGs in Python — describing explicitly how tasks depend on one another. Bruin takes a declarative approach: dependencies are simply listed in YAML under the depends: key, and the system automatically resolves execution order and lineage.
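
Resolving an execution order from declared dependencies is a topological sort. As a rough illustration (this uses Python's standard graphlib, not Bruin's scheduler), the table relationships from the flowchart above can be ordered like this:

```python
from graphlib import TopologicalSorter

# asset -> the assets it depends on, following the flowchart
depends = {
    "customers": [],
    "products": [],
    "orders": ["customers"],
    "product_variants": ["products"],
    "order_items": ["orders", "product_variants"],
}

# static_order() yields every node after all of its dependencies.
order = list(TopologicalSorter(depends).static_order())
print(order)
```

Several valid orders exist (customers and products are independent, for instance), so the exact sequence may vary; what is guaranteed is that order_items comes after both orders and product_variants.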

Unlike Airflow (which focuses on scheduling tasks) or dbt (which is primarily SQL-centric), Bruin allows you to define transformations in both SQL and Python, directly tied to your ingestion assets.

| Pipeline Layer | Bruin Equivalent |
|---|---|
| Ingestion (Airbyte, Fivetran) | type: ingestr assets |
| Transformation (dbt) | type: transform assets (YML, SQL or Python) |
| Orchestration (Airflow, Dagster) | Implicit asset graph with depends: |

Now, let's check that our ingestion layer works as expected. Using the Bruin CLI, run the following command:

Bruin CLI
bruin run orders-performance --tag raw

The output should look something like this:

Analyzed the pipeline 'orders_performance' with 15 assets.

Pipeline: orders_performance (.)
  No issues found

 Successfully validated 15 assets across 1 pipeline, all good.

Interval: 2025-10-17T00:00:00Z - 2025-10-17T23:59:59Z
Starting the pipeline execution...

[20:24:23] Running:  raw.products
[20:24:23] Running:  raw.customers
[20:24:23] Running:  raw.orders
[20:24:23] Running:  raw.product_variants

...[logs]...

PASS raw.products 
PASS raw.customers 
PASS raw.orders 
PASS raw.order_items 
PASS raw.product_variants 

bruin run completed successfully in 16.704s

 Assets executed      5 succeeded

Note that the --tag option runs only the assets that carry the specified tag, which avoids executing unnecessary assets.

7) Implementing Quality Checks (Staging layer)

The staging layer is where raw data gets cleaned, validated, and prepared for analytics. Unlike the raw layer (which mirrors source systems exactly), staging applies business rules, data quality checks, and standardized transformations that make downstream analysis reliable and consistent.

In Bruin, the staging layer combines SQL transformations with declarative quality checks defined in your columns inside an asset.

To add column checks, declare each output column under columns: in the asset definition and list the checks that the materialized column must pass:

orders-performance/assets/staging/stg.customers.asset.sql
/* @bruin
name: stg.customers
type: duckdb.sql

materialization:
  type: table

depends:
  - raw.customers
owner: daniel@gmail.com

columns: 
  - name: customer_id 
    type: integer
    primary_key: true
    checks:
      - name: unique
      - name: not_null
@bruin */

SELECT 
  id::INT AS customer_id, 
  COALESCE(TRIM(email), '') AS email,
  COALESCE(TRIM(country), 'Unknown') AS country,
  COALESCE(age, 0) AS age,
  created_at,
  updated_at
FROM raw.customers
WHERE email IS NOT NULL;

Available Check Types

| Check Type | Purpose | Example Configuration |
|---|---|---|
| not_null | Ensures no NULL values | - name: not_null |
| unique | Ensures no duplicate values | - name: unique |
| range | Validates numeric ranges | - name: range, with min: 0 and max: 100 |
| accepted_values | Validates against allowed values | - name: accepted_values, with value: [pending, paid] |
| positive | Ensures positive numbers | - name: positive |
| regex | Pattern matching validation | - name: regex, with pattern: '^[A-Z]{3}-\d{4}$' |
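
As a rough mental model, each check is a predicate over all values of a column. The sketch below expresses the listed checks in plain Python; it illustrates the intent only, is not how Bruin implements them, and assumes the column holds no NULLs for every check except not_null:

```python
import re


def not_null(values):
    """Pass when no value is missing."""
    return all(v is not None for v in values)


def unique(values):
    """Pass when no value appears twice."""
    return len(values) == len(set(values))


def in_range(values, lo=None, hi=None):
    """Pass when every value lies within the optional inclusive bounds."""
    return all((lo is None or v >= lo) and (hi is None or v <= hi) for v in values)


def accepted_values(values, allowed):
    """Pass when every value is in the allowed set."""
    return all(v in allowed for v in values)


def positive(values):
    """Pass when every value is strictly greater than zero."""
    return all(v > 0 for v in values)


def matches_pattern(values, pattern):
    """Pass when every value matches the regular expression in full."""
    rx = re.compile(pattern)
    return all(rx.fullmatch(v) is not None for v in values)


ages = [18, 42, 70]
statuses = ["pending", "paid", "paid"]
codes = ["ABC-1234", "XYZ-0001"]

print(in_range(ages, lo=0, hi=100))
print(accepted_values(statuses, {"pending", "paid"}))
print(matches_pattern(codes, r"^[A-Z]{3}-\d{4}$"))
```

All three calls print True for this sample data; changing one status to "refunded", for example, would make the accepted_values check fail.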

Bruin's approach to data quality is declarative and reusable. Instead of writing custom validation SQL for every table, you define entities and attributes in glossary.yml, then reference them in your staging assets using the extends: keyword.

Here's how the column check system works:

Entity Definition in glossary.yml:

entities:
  Customer:
    description: Customer is an individual/business registered on our platform.
    domains:
      - customer-management
    attributes:
      Email:
        name: email
        type: string
        description: E-mail address used during registration.
        checks:
          - name: not_null
      Age:
        name: age
        type: integer
        description: Age of the customer.
        checks:
          - name: range
            min: 0

Then, in your asset, you can simply extend these attributes, which avoids duplicating validations across assets:

/* @bruin
columns:
  - extends: Customer.Email    # Inherits not_null check
  - extends: Customer.Age      # Inherits range check (min: 0)
@bruin */
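
Conceptually, extends: copies an attribute's definition from the glossary into the column. The following sketch shows one way such a lookup could work; resolve_column is hypothetical, and the rule that keys set directly on the column override the glossary is an assumption, not documented Bruin behavior:

```python
# Glossary content mirrors the glossary.yml example, as plain dictionaries.
glossary = {
    "Customer": {
        "Email": {"name": "email", "type": "string", "checks": [{"name": "not_null"}]},
        "Age": {"name": "age", "type": "integer", "checks": [{"name": "range", "min": 0}]},
    }
}


def resolve_column(column: dict, glossary: dict) -> dict:
    """Merge the referenced glossary attribute into the column definition."""
    ref = column.get("extends")
    if ref is None:
        return dict(column)
    entity, attribute = ref.split(".", 1)
    base = glossary[entity][attribute]
    # Assumption: values written on the column take precedence over the glossary.
    overrides = {k: v for k, v in column.items() if k != "extends"}
    return {**base, **overrides}


columns = [{"extends": "Customer.Email"}, {"extends": "Customer.Age"}]
resolved = [resolve_column(c, glossary) for c in columns]

for col in resolved:
    print(col["name"], [check["name"] for check in col["checks"]])
```

The benefit is the same as in the YAML version: the not_null and range rules live in one place and every asset that extends them stays in sync.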

For complex validations that go beyond column-level checks, Bruin supports custom checks with SQL queries:

custom_checks:
  - name: validate product variant sizes
    description: |
      Ensures that if the product category is 'shoes', the size must be numeric.
      Otherwise, the size must be one of ['S', 'M', 'L', 'XL'].
    value: 0  # Expected result (0 = no violations)
    query: |
      SELECT COUNT(*) AS invalid_count
      FROM raw.product_variants v
      JOIN stg.products p ON p.product_id = v.product_id
      WHERE
        (p.category = 'shoes' AND NOT (v.size ~ '^[0-9]+$'))
        OR (p.category != 'shoes' AND v.size NOT IN ('S', 'M', 'L', 'XL'))

Now that we understand the basics of Bruin's data quality system, let's run the staging pipeline:

# Run all staging assets
bruin run orders-performance --tag staging

The output should look like this:


Analyzed the pipeline 'orders_performance' with 15 assets.

Pipeline: orders_performance (.)
  No issues found

 Successfully validated 15 assets across 1 pipeline, all good.

Interval: 2025-10-17T00:00:00Z - 2025-10-17T23:59:59Z

Starting the pipeline execution...

[20:59:31] Running:  stg.products
[20:59:31] Running:  stg.order_items
[20:59:31] Running:  stg.orders
[20:59:31] Running:  stg.customers
[20:59:31] Finished: stg.products (28ms)
[20:59:31] Running:  stg.product_variants
[20:59:31] Finished: stg.orders (57ms)
[20:59:31] Finished: stg.order_items (78ms)
[20:59:31] Finished: stg.customers (101ms)
[20:59:31] Finished: stg.product_variants (89ms)
[20:59:31] Running:  stg.product_variants:custom-check:validate_product_variant_sizes
[20:59:31] Finished: stg.product_variants:custom-check:validate_product_variant_sizes (6ms)

==================================================

PASS stg.products 
PASS stg.orders 
PASS stg.order_items 
PASS stg.customers 
PASS stg.product_variants .


bruin run completed successfully in 125ms

 Assets executed      5 succeeded
 Quality checks       1 succeeded

Each asset runs its quality checks alongside transformations, keeping data accurate at every step. You can reuse validation rules across assets, add business context through glossaries, and rely on incremental checks that catch issues early.

8) Designing Your Analytics Layer (Mart)

The analytics (mart) layer turns curated staging tables into business-ready, well-structured datasets. Here we define models with a clear grain, apply final business rules, and expose KPIs for decision-making. These models typically power dashboards and self-service exploration.

As with the raw and staging layers above, the example below shows how a mart asset is structured in our orders-performance pipeline. Each model consumes stg.* tables and produces clean, documented outputs.

mart.sales_daily.asset.sql
/* @bruin

name: mart.sales_daily
type: duckdb.sql

materialization:
  type: table
  strategy: append

depends:
  - stg.order_items
  - stg.orders

columns:
  - name: sale_date
    type: date
    checks:
      - name: not_null
  - name: orders_count
    type: integer
    checks:
      - name: not_null
  - name: items_count
    type: integer
    checks:
      - name: not_null
  - name: revenue
    type: numeric
    checks:
      - name: not_null

custom_checks:
  - name: total revenue non-negative
    value: 0
    query: SELECT COUNT(*) FROM mart.sales_daily WHERE revenue < 0

@bruin */

WITH daily_orders AS (SELECT CAST(CAST(order_date AS TIMESTAMP) AS DATE)                               AS sale_date,
                             COUNT(*)                                                                  AS orders_count,
                             SUM(CASE WHEN status IN ('paid', 'shipped') THEN total_amount ELSE 0 END) AS revenue
                      FROM stg.orders
                      GROUP BY 1),
     daily_items AS (SELECT CAST(CAST(o.order_date AS TIMESTAMP) AS DATE) AS sale_date,
                            COUNT(*)                                      AS items_count
                     FROM stg.order_items oi
                            JOIN stg.orders o ON o.order_id = oi.order_id
                     GROUP BY 1)
SELECT d.sale_date,
       d.orders_count,
       COALESCE(i.items_count, 0) AS items_count,
       d.revenue
FROM daily_orders d
         LEFT JOIN daily_items i ON i.sale_date = d.sale_date
ORDER BY d.sale_date;
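
To sanity-check the aggregation logic without running the pipeline, the same shape of query can be tried on a handful of rows. This sketch uses an in-memory SQLite database rather than DuckDB, so date() stands in for the casts, and the sample orders are invented:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE orders (order_id INTEGER, order_date TEXT, status TEXT, total_amount REAL);
CREATE TABLE order_items (order_id INTEGER, quantity INTEGER);

INSERT INTO orders VALUES
    (1, '2025-10-17 09:00:00', 'paid',      50.0),
    (2, '2025-10-17 15:30:00', 'cancelled', 20.0),
    (3, '2025-10-18 11:00:00', 'shipped',   30.0),
    (4, '2025-10-18 12:00:00', 'pending',   10.0);
INSERT INTO order_items VALUES (1, 2), (1, 1), (2, 1), (3, 1);
""")

rows = conn.execute("""
    WITH daily_orders AS (
        SELECT date(order_date) AS sale_date,
               COUNT(*) AS orders_count,
               SUM(CASE WHEN status IN ('paid', 'shipped')
                        THEN total_amount ELSE 0 END) AS revenue
        FROM orders
        GROUP BY date(order_date)),
    daily_items AS (
        SELECT date(o.order_date) AS sale_date,
               COUNT(*) AS items_count
        FROM order_items oi
        JOIN orders o ON o.order_id = oi.order_id
        GROUP BY date(o.order_date))
    SELECT d.sale_date,
           d.orders_count,
           COALESCE(i.items_count, 0) AS items_count,
           d.revenue
    FROM daily_orders d
    LEFT JOIN daily_items i ON i.sale_date = d.sale_date
    ORDER BY d.sale_date
""").fetchall()

for row in rows:
    print(row)
```

Every order counts toward orders_count, but only paid and shipped orders contribute revenue, so the first day reports 50.0 and the second 30.0. The LEFT JOIN keeps days that have orders but no items, which is what the COALESCE on items_count is for.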
