Remote OpenClaw
Menu
SkillsMCPPluginsMarketplaceGuideAgentsAdvertise
Remote OpenClaw
SkillsMCPPluginsMarketplaceGuideAgentsAdvertise
Skills/aradotso/data-skills/retail-etl-pipeline-medallion

retail-etl-pipeline-medallion

aradotso/data-skills
590 installs1 stars

Installation

npx skills add https://github.com/aradotso/data-skills --skill retail-etl-pipeline-medallion

Summary

End-to-end retail ETL pipeline using PySpark, SQL Server, and Medallion Architecture (Bronze/Silver/Gold layers) for data warehousing

SKILL.md

Retail ETL Pipeline - Medallion Architecture Skill

Skill by ara.so — Data Skills collection

Overview

The Retail ETL Pipeline project implements a complete data engineering solution for retail operations using the Medallion Architecture pattern (Bronze → Silver → Gold layers). It handles complex retail scenarios including:

  • Inventory shrinkage resolution
  • Recipe conversions for meat/poultry products
  • Supplier rebate tier tracking
  • Multi-branch sales consolidation
  • Stock level management across locations

The pipeline processes raw CSV data from CRM/ERP systems through three progressive quality layers, ultimately delivering a "Single Version of Truth" for business intelligence.

Architecture Layers

Bronze Layer (Raw Ingestion)

  • Raw data ingestion from CSV files
  • Minimal transformation, preserving source format
  • Audit columns: _loaded_at, _source_file

Silver Layer (Cleaned & Standardized)

  • Data type enforcement
  • Deduplication
  • Standardization (dates, currencies, product codes)
  • Business rule validation

Gold Layer (Business-Ready Analytics)

  • Aggregated metrics
  • Calculated KPIs (inventory turnover, shrinkage %)
  • Dimensional models for BI tools

Installation & Setup

Prerequisites

# Required tools
- Docker & Docker Compose
- SQL Server 2019+
- Python 3.8+
- PySpark 3.x
- Apache Airflow (optional, for orchestration)

Infrastructure Setup

# Clone the repository
git clone https://github.com/EsraaSolimanMubarak/Retail-ETL-Pipeline.git
cd Retail-ETL-Pipeline

# Start SQL Server container
docker-compose up -d

# Wait for SQL Server to be ready
docker logs -f retail-sql-server

Database Initialization

# Connect to SQL Server and run schema setup
sqlcmd -S localhost,1433 -U sa -P ${SQL_SERVER_PASSWORD} \
  -i sql_scripts/00_create_database_and_schemas.sql

Key SQL Scripts Execution Order

The pipeline consists of 13+ SQL scripts that must be executed sequentially:

# 1. Create database and schemas
sql_scripts/00_create_database_and_schemas.sql

# 2. Bronze layer ingestion
sql_scripts/01_bronze_products.sql
sql_scripts/02_bronze_sales.sql
sql_scripts/03_bronze_stock.sql

# 3. Silver layer transformations
sql_scripts/04_silver_products.sql
sql_scripts/05_silver_sales.sql
sql_scripts/06_silver_stock.sql

# 4. Gold layer aggregations
sql_scripts/07_gold_sales_summary.sql
sql_scripts/08_gold_inventory_metrics.sql
sql_scripts/09_gold_product_performance.sql

# 5. Rebuild pipeline (if needed)
sql_scripts/12_rebuild_inventory_pipeline_final_fix.sql

Core ETL Patterns

Pattern 1: Bronze Layer Ingestion (Raw CSV → SQL)

-- Example: Bronze Products Table
CREATE TABLE bronze.products (
    product_id INT,
    product_name NVARCHAR(255),
    category NVARCHAR(100),
    unit_price DECIMAL(10,2),
    supplier_id INT,
    _loaded_at DATETIME2 DEFAULT GETDATE(),
    _source_file NVARCHAR(500)
);

-- Bulk insert from CSV
BULK INSERT bronze.products
FROM '/data_source/000.Hypermarket Products.csv'
WITH (
    FIELDTERMINATOR = ',',
    ROWTERMINATOR = '\n',
    FIRSTROW = 2,
    TABLOCK
);

Pattern 2: Silver Layer Cleansing

-- Example: Silver Products with Data Quality Rules
CREATE PROCEDURE silver.usp_transform_products
AS
BEGIN
    TRUNCATE TABLE silver.products;
    
    INSERT INTO silver.products (
        product_id,
        product_name,
        category,
        unit_price,
        supplier_id,
        is_active,
        processed_at
    )
    SELECT DISTINCT
        product_id,
        UPPER(TRIM(product_name)) AS product_name,
        COALESCE(category, 'UNCATEGORIZED') AS category,
        CASE 
            WHEN unit_price < 0 THEN 0 
            ELSE unit_price 
        END AS unit_price,
        supplier_id,
        1 AS is_active,
        GETDATE() AS processed_at
    FROM bronze.products
    WHERE product_id IS NOT NULL
      AND product_name IS NOT NULL;
END;

Pattern 3: Gold Layer Aggregations

-- Example: Sales Summary by Branch and Product
CREATE PROCEDURE gold.usp_sales_summary
AS
BEGIN
    TRUNCATE TABLE gold.sales_summary;
    
    INSERT INTO gold.sales_summary (
        branch_name,
        product_category,
        total_quantity_sold,
        total_revenue,
        avg_unit_price,
        transaction_count,
        report_date
    )
    SELECT 
        s.branch_name,
        p.category AS product_category,
        SUM(s.quantity) AS total_quantity_sold,
        SUM(s.quantity * s.unit_price) AS total_revenue,
        AVG(s.unit_price) AS avg_unit_price,
        COUNT(DISTINCT s.transaction_id) AS transaction_count,
        CAST(GETDATE() AS DATE) AS report_date
    FROM silver.sales s
    INNER JOIN silver.products p ON s.product_id = p.product_id
    GROUP BY s.branch_name, p.category;
END;

Pattern 4: Inventory Shrinkage Calculation

-- Complex business logic: Detect inventory discrepancies
CREATE PROCEDURE gold.usp_inventory_shrinkage
AS
BEGIN
    WITH StockLevels AS (
        SELECT 
            product_id,
            branch_name,
            SUM(quantity_on_hand) AS current_stock
        FROM silver.stock
        GROUP BY product_id, branch_name
    ),
    ExpectedStock AS (
        SELECT 
            s.product_id,
            s.branch_name,
            sl.current_stock - COALESCE(SUM(s.quantity), 0) AS expected_stock
        FROM StockLevels sl
        LEFT JOIN silver.sales s 
            ON sl.product_id = s.product_id 
            AND sl.branch_name = s.branch_name
        GROUP BY s.product_id, s.branch_name, sl.current_stock
    )
    INSERT INTO gold.inventory_shrinkage (
        product_id,
        branch_name,
        current_stock,
        expected_stock,
        shrinkage_qty,
        shrinkage_percent,
        calculated_at
    )
    SELECT 
        sl.product_id,
        sl.branch_name,
        sl.current_stock,
        es.expected_stock,
        sl.current_stock - es.expected_stock AS shrinkage_qty,
        CASE 
            WHEN es.expected_stock > 0 
            THEN ((sl.current_stock - es.expected_stock) * 100.0 / es.expected_stock)
            ELSE 0 
        END AS shrinkage_percent,
        GETDATE() AS calculated_at
    FROM StockLevels sl
    INNER JOIN ExpectedStock es 
        ON sl.product_id = es.product_id 
        AND sl.branch_name = es.branch_name;
END;

PySpark Integration (Optional)

For large-scale data processing, integrate PySpark for Silver/Gold transformations:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, when

# Initialize Spark session
spark = SparkSession.builder \
    .appName("RetailETL-Silver") \
    .config("spark.sql.warehouse.dir", "/data/warehouse") \
    .getOrCreate()

# Read Bronze layer (Parquet or Delta)
bronze_sales_df = spark.read.parquet("/data/bronze/sales/")

# Silver transformations
silver_sales_df = bronze_sales_df \
    .dropDuplicates(["transaction_id"]) \
    .filter(col("quantity") > 0) \
    .withColumn("unit_price", 
                when(col("unit_price") < 0, 0).otherwise(col("unit_price"))) \
    .withColumn("total_amount", col("quantity") * col("unit_price"))

# Write to Silver layer
silver_sales_df.write \
    .mode("overwrite") \
    .partitionBy("branch_name", "sale_date") \
    .parquet("/data/silver/sales/")

# Gold aggregations
gold_sales_summary = silver_sales_df \
    .groupBy("branch_name", "product_category") \
    .agg(
        sum("quantity").alias("total_quantity"),
        sum("total_amount").alias("total_revenue"),
        avg("unit_price").alias("avg_unit_price"),
        count("transaction_id").alias("transaction_count")
    )

gold_sales_summary.write \
    .mode("overwrite") \
    .parquet("/data/gold/sales_summary/")

Airflow DAG Example

Orchestrate the entire pipeline with Apache Airflow:

from airflow import DAG
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'retail_etl_medallion',
    default_args=default_args,
    description='Daily Retail ETL Pipeline',
    schedule_interval='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:

    # Bronze ingestion
    ingest_products = MsSqlOperator(
        task_id='bronze_ingest_products',
        mssql_conn_id='retail_sql_server',
        sql='sql_scripts/01_bronze_products.sql',
    )
    
    ingest_sales = MsSqlOperator(
        task_id='bronze_ingest_sales',
        mssql_conn_id='retail_sql_server',
        sql='sql_scripts/02_bronze_sales.sql',
    )
    
    ingest_stock = MsSqlOperator(
        task_id='bronze_ingest_stock',
        mssql_conn_id='retail_sql_server',
        sql='sql_scripts/03_bronze_stock.sql',
    )
    
    # Silver transformations
    transform_products = MsSqlOperator(
        task_id='silver_transform_products',
        mssql_conn_id='retail_sql_server',
        sql='EXEC silver.usp_transform_products;',
    )
    
    transform_sales = MsSqlOperator(
        task_id='silver_transform_sales',
        mssql_conn_id='retail_sql_server',
        sql='EXEC silver.usp_transform_sales;',
    )
    
    # Gold aggregations
    aggregate_sales_summary = MsSqlOperator(
        task_id='gold_sales_summary',
        mssql_conn_id='retail_sql_server',
        sql='EXEC gold.usp_sales_summary;',
    )
    
    aggregate_inventory_shrinkage = MsSqlOperator(
        task_id='gold_inventory_shrinkage',
        mssql_conn_id='retail_sql_server',
        sql='EXEC gold.usp_inventory_shrinkage;',
    )
    
    # Define dependencies
    [ingest_products, ingest_sales, ingest_stock] >> transform_products
    [ingest_sales] >> transform_sales
    [transform_products, transform_sales] >> aggregate_sales_summary
    [transform_products, ingest_stock] >> aggregate_inventory_shrinkage

Configuration

Environment Variables

# SQL Server connection
export SQL_SERVER_HOST=localhost
export SQL_SERVER_PORT=1433
export SQL_SERVER_USER=sa
export SQL_SERVER_PASSWORD=${YOUR_SECURE_PASSWORD}
export SQL_SERVER_DATABASE=RetailDataWarehouse

# Data paths
export DATA_SOURCE_PATH=/data_source
export BRONZE_LAYER_PATH=/data/bronze
export SILVER_LAYER_PATH=/data/silver
export GOLD_LAYER_PATH=/data/gold

# Airflow (if used)
export AIRFLOW_CONN_RETAIL_SQL_SERVER="mssql://${SQL_SERVER_USER}:${SQL_SERVER_PASSWORD}@${SQL_SERVER_HOST}:${SQL_SERVER_PORT}/${SQL_SERVER_DATABASE}"

Docker Compose Configuration

# docker-compose.yml
version: '3.8'

services:
  sqlserver:
    image: mcr.microsoft.com/mssql/server:2019-latest
    container_name: retail-sql-server
    environment:
      - ACCEPT_EULA=Y
      - SA_PASSWORD=${SQL_SERVER_PASSWORD}
      - MSSQL_PID=Developer
    ports:
      - "1433:1433"
    volumes:
      - ./data_source:/data_source
      - ./sql_scripts:/sql_scripts
      - sqlserver_data:/var/opt/mssql

volumes:
  sqlserver_data:

Common Patterns & Use Cases

Use Case 1: Multi-Branch Sales Consolidation

-- Combine sales from Alex, Cairo, and Giza branches
CREATE VIEW gold.vw_consolidated_sales AS
SELECT 
    'Alexandria' AS branch_name,
    * 
FROM bronze.alex_sales
UNION ALL
SELECT 
    'Cairo' AS branch_name,
    * 
FROM bronze.cairo_sales
UNION ALL
SELECT 
    'Giza' AS branch_name,
    * 
FROM bronze.giza_sales;

Use Case 2: Recipe Yield Tracking (Meat/Poultry)

-- Track conversion rates for processed meat products
CREATE TABLE silver.recipe_conversions (
    recipe_id INT PRIMARY KEY,
    raw_product_id INT,
    finished_product_id INT,
    conversion_ratio DECIMAL(5,2), -- e.g., 0.85 (15% waste)
    effective_date DATE
);

-- Calculate actual yield vs. expected
SELECT 
    rc.recipe_id,
    SUM(s.quantity * rc.conversion_ratio) AS expected_yield,
    SUM(stock.quantity_on_hand) AS actual_yield,
    (SUM(stock.quantity_on_hand) - SUM(s.quantity * rc.conversion_ratio)) AS yield_variance
FROM silver.sales s
INNER JOIN silver.recipe_conversions rc ON s.product_id = rc.raw_product_id
INNER JOIN silver.stock stock ON rc.finished_product_id = stock.product_id
GROUP BY rc.recipe_id;

Use Case 3: Supplier Rebate Tier Tracking

-- Track purchase volume for supplier rebate calculations
CREATE TABLE gold.supplier_rebate_tiers (
    supplier_id INT,
    total_purchases DECIMAL(15,2),
    rebate_tier VARCHAR(20),
    rebate_percentage DECIMAL(5,2),
    calculated_at DATETIME2
);

INSERT INTO gold.supplier_rebate_tiers
SELECT 
    p.supplier_id,
    SUM(s.quantity * s.unit_price) AS total_purchases,
    CASE 
        WHEN SUM(s.quantity * s.unit_price) > 100000 THEN 'Platinum'
        WHEN SUM(s.quantity * s.unit_price) > 50000 THEN 'Gold'
        WHEN SUM(s.quantity * s.unit_price) > 25000 THEN 'Silver'
        ELSE 'Bronze'
    END AS rebate_tier,
    CASE 
        WHEN SUM(s.quantity * s.unit_price) > 100000 THEN 5.0
        WHEN SUM(s.quantity * s.unit_price) > 50000 THEN 3.5
        WHEN SUM(s.quantity * s.unit_price) > 25000 THEN 2.0
        ELSE 1.0
    END AS rebate_percentage,
    GETDATE() AS calculated_at
FROM silver.sales s
INNER JOIN silver.products p ON s.product_id = p.product_id
GROUP BY p.supplier_id;

Troubleshooting

Issue 1: CSV Bulk Insert Fails

-- Check file path permissions
EXEC xp_cmdshell 'dir C:\data_source\*.csv';

-- Use ERRORFILE to capture bad rows
BULK INSERT bronze.products
FROM '/data_source/000.Hypermarket Products.csv'
WITH (
    FIELDTERMINATOR = ',',
    ROWTERMINATOR = '\n',
    FIRSTROW = 2,
    ERRORFILE = '/data_source/errors/products_errors.csv',
    MAXERRORS = 10
);

Issue 2: Duplicate Records in Silver Layer

-- Add deduplication logic with ROW_NUMBER
WITH DeduplicatedSales AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY transaction_id 
            ORDER BY _loaded_at DESC
        ) AS rn
    FROM bronze.sales
)
INSERT INTO silver.sales
SELECT * 
FROM DeduplicatedSales
WHERE rn = 1;

Issue 3: Performance Optimization

-- Create indexes on frequently joined columns
CREATE NONCLUSTERED INDEX idx_sales_product_id 
    ON silver.sales(product_id) INCLUDE (quantity, unit_price);

CREATE NONCLUSTERED INDEX idx_stock_product_branch 
    ON silver.stock(product_id, branch_name) INCLUDE (quantity_on_hand);

-- Partition large tables by date
CREATE PARTITION FUNCTION pf_sales_date (DATE)
AS RANGE RIGHT FOR VALUES (
    '2025-01-01', '2025-02-01', '2025-03-01', '2025-04-01'
);

Issue 4: Rebuild Entire Pipeline

# Use the rebuild script to reset and reprocess all layers
sqlcmd -S ${SQL_SERVER_HOST},${SQL_SERVER_PORT} \
  -U ${SQL_SERVER_USER} -P ${SQL_SERVER_PASSWORD} \
  -d RetailDataWarehouse \
  -i sql_scripts/12_rebuild_inventory_pipeline_final_fix.sql

Testing Data Quality

-- Data quality checks for Silver layer
SELECT 'Products' AS layer,
    COUNT(*) AS total_records,
    COUNT(DISTINCT product_id) AS unique_products,
    SUM(CASE WHEN unit_price < 0 THEN 1 ELSE 0 END) AS negative_prices,
    SUM(CASE WHEN product_name IS NULL THEN 1 ELSE 0 END) AS null_names
FROM silver.products

UNION ALL

SELECT 'Sales' AS layer,
    COUNT(*) AS total_records,
    COUNT(DISTINCT transaction_id) AS unique_transactions,
    SUM(CASE WHEN quantity <= 0 THEN 1 ELSE 0 END) AS invalid_quantity,
    SUM(CASE WHEN product_id NOT IN (SELECT product_id FROM silver.products) THEN 1 ELSE 0 END) AS orphaned_products
FROM silver.sales;

Best Practices

  1. Always process through layers sequentially: Bronze → Silver → Gold
  2. Use stored procedures for reusable transformations
  3. Add audit columns (_loaded_at, _source_file, processed_at)
  4. Implement idempotency: Truncate-and-load or upsert patterns
  5. Partition large tables by date or branch for performance
  6. Create comprehensive indexes on join and filter columns
  7. Use CTEs for complex business logic readability
  8. Test data quality at each layer transition
  9. Version control all SQL scripts and configurations
  10. Monitor pipeline execution with Airflow or equivalent orchestrator

Featured

QwikClaw — one-click deploy OpenClaw logoQwikClaw — one-click deploy OpenClaw

Your own always-on OpenClaw agent, live in 60 seconds. No server, no setup — pick a model, connect Telegram, done.

Deploy your agent →
MoltAwards - Agent internet for government contracts + jobs. logoMoltAwards - Agent internet for government contracts + jobs.

MoltAwards is an agent-native social layer for matchawards.com.

Learn more →
CLN.Work — Stop prompting, start hiring AI employees logoCLN.Work — Stop prompting, start hiring AI employees

Turn your Claude agents into a real team — onboard them, assign tasks, and manage them like staff.

Hire AI employees →
Deploy your own AI agent logoDeploy your own AI agent

Launch OpenClaw or Hermes on Hostinger in about 60 seconds, keep your agent live 24/7, earn 20%-40% on your next referral up to $25-$45, and give your friend 20% off.

Launch on Hostinger →
AdvertiseGet your AI tool in front of 67,000+ AI enthusiastsSee placements & pricing →

Categories

External DownloadsCommand ExecutionData ExfiltrationPrompt Injection
View on GitHub

Recommended skills

Browse all →

find-skills

vercel-labs/skills

2.2M installsInstall

frontend-design

anthropics/skills

601K installsInstall

vercel-react-best-practices

vercel-labs/agent-skills

509K installsInstall

agent-browser

vercel-labs/agent-browser

492K installsInstall

web-design-guidelines

vercel-labs/agent-skills

423K installsInstall

microsoft-foundry

microsoft/azure-skills

420K installsInstall

Browse

Skills by category

Frontend250Git198Data154Testing120Design105Docs103Security96Automation87Backend76Devops37Productivity29Mcp23

Advertise on Remote OpenClaw

Get your AI tool in front of 67,000+ AI enthusiasts a month

See placements & pricing →

Remote OpenClaw

AI agent skills directory, marketplace, and workflow hub for OpenClaw, Hermes Agent, Claude Code, Codex, and MCP-powered operator stacks.

Explore

  • Home
  • Skills Directory
  • Claude Code Skills
  • Codex Skills
  • Marketplace
  • Hermes Ecosystem
  • Agents
  • Guide
  • Learn
  • Blog

More

  • Playbook
  • Free Tools
  • Shipping
  • Contact
  • Terms
  • Privacy
© 2026 Remote OpenClaw