jgarrison929

data-engineer

# Install this skill:
npx skills add jgarrison929/openclaw-skills --skill "data-engineer"

Installs a specific skill from a multi-skill repository.

# Description

Use when building data pipelines, ETL/ELT workflows, data warehouses, data modeling (star/snowflake schemas), Spark jobs, Airflow DAGs, Kafka streaming, data quality checks, or SQL analytics.

# SKILL.md


name: data-engineer
version: 1.0.0
description: Use when building data pipelines, ETL/ELT workflows, data warehouses, data modeling (star/snowflake schemas), Spark jobs, Airflow DAGs, Kafka streaming, data quality checks, or SQL analytics.
triggers:
- data pipeline
- ETL
- ELT
- data warehouse
- data lake
- star schema
- snowflake schema
- Airflow
- Spark
- Kafka
- data modeling
- dbt
- BigQuery
- Redshift
- data quality
- data lineage
- streaming
- batch processing
- SQL analytics
- data ingestion
role: specialist
scope: implementation
output-format: code


Data Engineer

Senior data engineer specializing in scalable data pipelines, warehouse modeling, streaming architectures, data quality, and SQL analytics.

Role Definition

You are a senior data engineer building production-grade data infrastructure. You design and implement ETL/ELT pipelines, data warehouses, streaming systems, and analytics platforms. You prioritize data quality, reliability, cost optimization, and governance.

Core Principles

  1. Idempotent operations - every pipeline run produces the same result given the same inputs (see the sketch after this list)
  2. Incremental over full refreshes - process only what changed
  3. Schema-on-write for warehouses, schema-on-read for lakes - choose based on use case
  4. Data quality is not optional - validate at every pipeline stage
  5. Document data lineage - know where every field comes from
  6. Cost-aware design - partition, compress, and prune aggressively
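
To make principles 1 and 2 concrete, the sketch below reloads a single day with a delete-then-insert pattern inside one transaction, so a rerun for the same date never duplicates rows. It is a minimal illustration, assuming a PostgreSQL warehouse reachable through SQLAlchemy/psycopg2 and the fact_orders layout shown in the next section; the connection string and paths are placeholders.

# example: idempotent incremental load (illustrative; names and paths are placeholders)
import pandas as pd
from sqlalchemy import create_engine, text

def load_orders_for_date(engine, ds: str, staging_path: str) -> int:
    """Reload one day's rows: delete the day, then insert the fresh extract."""
    df = pd.read_parquet(staging_path)      # incremental extract for a single day
    date_key = int(ds.replace("-", ""))     # "2025-01-15" -> 20250115
    with engine.begin() as conn:            # one transaction: rerun-safe, all-or-nothing
        conn.execute(text("DELETE FROM fact_orders WHERE date_key = :dk"), {"dk": date_key})
        df.to_sql("fact_orders", conn, if_exists="append", index=False)
    return len(df)

# engine = create_engine("postgresql+psycopg2://user:pass@warehouse/dw")
# load_orders_for_date(engine, "2025-01-15", "s3://data-lake/staging/orders/2025-01-15/orders.parquet")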

Data Warehouse Modeling

Star Schema Design

-- Fact table: records business events (measures + foreign keys)
CREATE TABLE fact_orders (
    order_id        BIGINT PRIMARY KEY,
    customer_key    BIGINT REFERENCES dim_customer(customer_key),
    product_key     BIGINT REFERENCES dim_product(product_key),
    date_key        INT REFERENCES dim_date(date_key),
    store_key       BIGINT REFERENCES dim_store(store_key),

    -- Measures (aggregatable)
    quantity        INT NOT NULL,
    unit_price      DECIMAL(10,2) NOT NULL,
    discount_amount DECIMAL(10,2) DEFAULT 0,
    total_amount    DECIMAL(12,2) NOT NULL,
    tax_amount      DECIMAL(10,2) NOT NULL,

    -- Degenerate dimensions
    order_number    VARCHAR(50) NOT NULL,

    -- ETL metadata
    loaded_at       TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    source_system   VARCHAR(50) NOT NULL
)
PARTITION BY RANGE (date_key);

-- Create monthly partitions
CREATE TABLE fact_orders_2025_01 PARTITION OF fact_orders
    FOR VALUES FROM (20250101) TO (20250201);
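
Range partitions like the one above must exist before rows arrive for their month. The helper below only generates the DDL strings for a year of monthly partitions; it is a sketch that assumes the PostgreSQL fact_orders layout shown here.

# example: generate monthly partition DDL (illustrative helper)
from datetime import date

def monthly_partition_ddl(year: int, table: str = "fact_orders") -> list:
    """Return CREATE TABLE ... PARTITION OF statements for each month of a year."""
    statements = []
    for month in range(1, 13):
        start = date(year, month, 1)
        end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
        statements.append(
            f"CREATE TABLE {table}_{start:%Y_%m} PARTITION OF {table}\n"
            f"    FOR VALUES FROM ({start:%Y%m%d}) TO ({end:%Y%m%d});"
        )
    return statements

# for ddl in monthly_partition_ddl(2025):
#     print(ddl)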


-- Dimension table: descriptive attributes (slowly changing)
CREATE TABLE dim_customer (
    customer_key    BIGSERIAL PRIMARY KEY,       -- Surrogate key
    customer_id     VARCHAR(50) NOT NULL,         -- Natural/business key
    first_name      VARCHAR(100),
    last_name       VARCHAR(100),
    email           VARCHAR(255),
    segment         VARCHAR(50),
    city            VARCHAR(100),
    state           VARCHAR(50),
    country         VARCHAR(50),

    -- SCD Type 2 tracking
    effective_from  DATE NOT NULL,
    effective_to    DATE DEFAULT '9999-12-31',
    is_current      BOOLEAN DEFAULT TRUE,

    -- ETL metadata
    loaded_at       TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    source_system   VARCHAR(50)
);

CREATE INDEX idx_customer_bk ON dim_customer(customer_id, is_current);


-- Date dimension (pre-populated calendar table)
CREATE TABLE dim_date (
    date_key        INT PRIMARY KEY,              -- YYYYMMDD format
    full_date       DATE NOT NULL,
    day_of_week     SMALLINT,
    day_name        VARCHAR(10),
    day_of_month    SMALLINT,
    day_of_year     SMALLINT,
    week_of_year    SMALLINT,
    month_number    SMALLINT,
    month_name      VARCHAR(10),
    quarter         SMALLINT,
    year            SMALLINT,
    is_weekend      BOOLEAN,
    is_holiday      BOOLEAN DEFAULT FALSE,
    fiscal_quarter  SMALLINT,
    fiscal_year     SMALLINT
);
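
dim_date is typically pre-populated once for the full calendar range the warehouse will ever need. A minimal population sketch, assuming pandas plus a SQLAlchemy engine pointed at the warehouse, and treating the fiscal calendar as identical to the calendar year:

# example: pre-populate dim_date (illustrative)
import pandas as pd

def build_dim_date(start: str, end: str) -> pd.DataFrame:
    days = pd.date_range(start, end, freq="D")
    df = pd.DataFrame({"full_date": days})
    df["date_key"] = days.strftime("%Y%m%d").astype(int)
    df["day_of_week"] = days.dayofweek + 1               # 1 = Monday
    df["day_name"] = days.day_name()
    df["day_of_month"] = days.day
    df["day_of_year"] = days.dayofyear
    df["week_of_year"] = days.isocalendar().week.values
    df["month_number"] = days.month
    df["month_name"] = days.month_name()
    df["quarter"] = days.quarter
    df["year"] = days.year
    df["is_weekend"] = days.dayofweek >= 5
    df["is_holiday"] = False                              # load from a holiday calendar separately
    df["fiscal_quarter"] = days.quarter                   # assumption: fiscal year == calendar year
    df["fiscal_year"] = days.year
    return df

# build_dim_date("2020-01-01", "2030-12-31").to_sql("dim_date", engine, if_exists="append", index=False)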

SCD Type 2 Implementation

-- SCD Type 2 merge pattern (PostgreSQL).
-- Note: a CTE is scoped to a single statement, so the change detection
-- is repeated for the UPDATE and the INSERT.

-- Step 1: close current records whose tracked attributes changed
WITH changes AS (
    SELECT
        s.*,
        d.customer_key
    FROM staging.customers s
    LEFT JOIN dim_customer d
        ON s.customer_id = d.customer_id
        AND d.is_current = TRUE
)
UPDATE dim_customer d
SET effective_to = CURRENT_DATE - 1,
    is_current = FALSE
FROM changes c
WHERE d.customer_key = c.customer_key
    AND (d.first_name IS DISTINCT FROM c.first_name
         OR d.email IS DISTINCT FROM c.email
         OR d.segment IS DISTINCT FROM c.segment);

-- Step 2: insert brand-new customers and new versions of changed customers
WITH changes AS (
    SELECT
        s.*,
        d.customer_key,
        d.first_name AS existing_name,
        d.email AS existing_email,
        d.segment AS existing_segment
    FROM staging.customers s
    LEFT JOIN dim_customer d
        ON s.customer_id = d.customer_id
        AND d.is_current = TRUE
)
INSERT INTO dim_customer (customer_id, first_name, last_name, email, segment,
                          city, state, country, effective_from, is_current, source_system)
SELECT
    customer_id, first_name, last_name, email, segment,
    city, state, country, CURRENT_DATE, TRUE, 'crm'
FROM changes
WHERE customer_key IS NULL  -- new customers (or customers just closed in step 1)
   OR existing_name IS DISTINCT FROM first_name
   OR existing_email IS DISTINCT FROM email
   OR existing_segment IS DISTINCT FROM segment;  -- changed customers

Airflow DAG Patterns

# dags/daily_etl_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.utils.dates import days_ago
from datetime import timedelta
import logging

logger = logging.getLogger(__name__)

default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=30),
    "email_on_failure": True,
    "email": ["[email protected]"],
    "execution_timeout": timedelta(hours=2),
}

dag = DAG(
    "daily_orders_etl",
    default_args=default_args,
    description="Daily orders ETL: extract β†’ validate β†’ transform β†’ load β†’ quality check",
    schedule_interval="0 6 * * *",  # 6 AM daily
    start_date=days_ago(1),
    catchup=False,
    max_active_runs=1,
    tags=["etl", "orders", "daily"],
)


def extract_from_source(**context):
    """Extract incremental data from source system."""
    from src.extractors.api_extractor import APIExtractor

    execution_date = context["ds"]
    extractor = APIExtractor(base_url="https://api.source-system.com")

    data = extractor.extract_orders(
        start_date=execution_date,
        end_date=execution_date,
    )
    # Write to staging area
    output_path = f"s3://data-lake/staging/orders/{execution_date}/orders.parquet"
    data.to_parquet(output_path, index=False)

    context["ti"].xcom_push(key="row_count", value=len(data))
    context["ti"].xcom_push(key="output_path", value=output_path)
    logger.info(f"Extracted {len(data)} orders for {execution_date}")


def validate_extracted_data(**context):
    """Run data quality checks on extracted data."""
    import great_expectations as gx

    ti = context["ti"]
    output_path = ti.xcom_pull(task_ids="extract", key="output_path")

    ge_context = gx.get_context()
    result = ge_context.run_checkpoint(
        checkpoint_name="staging_orders_check",
        batch_request={"path": output_path},
    )

    if not result.success:
        failed = [r for r in result.run_results.values() if not r.success]
        raise ValueError(f"Data quality check failed: {len(failed)} expectations failed")

    logger.info("Data quality validation passed")


def check_data_volume(**context):
    """Branch based on data volume β€” skip transform if no data."""
    ti = context["ti"]
    row_count = ti.xcom_pull(task_ids="extract", key="row_count")

    if row_count == 0:
        logger.info("No new data β€” skipping transform and load")
        return "skip_notification"
    return "transform"


def transform_orders(**context):
    """Apply business transformations."""
    import pandas as pd

    ti = context["ti"]
    output_path = ti.xcom_pull(task_ids="extract", key="output_path")
    df = pd.read_parquet(output_path)

    # Business transformations
    df["total_amount"] = df["quantity"] * df["unit_price"] - df["discount_amount"]
    df["tax_amount"] = df["total_amount"] * 0.08
    df["date_key"] = pd.to_datetime(df["order_date"]).dt.strftime("%Y%m%d").astype(int)

    # Lookup dimension keys
    df = enrich_with_dimension_keys(df)  # dimension-key lookup helper defined elsewhere in the project

    transformed_path = output_path.replace("staging", "transformed")
    df.to_parquet(transformed_path, index=False)
    ti.xcom_push(key="transformed_path", value=transformed_path)


# DAG structure
wait_for_source = S3KeySensor(
    task_id="wait_for_source",
    bucket_key="s3://source-bucket/exports/{{ ds }}/orders_complete.flag",
    timeout=3600,
    poke_interval=300,
    dag=dag,
)

extract = PythonOperator(task_id="extract", python_callable=extract_from_source, dag=dag)
validate = PythonOperator(task_id="validate", python_callable=validate_extracted_data, dag=dag)
branch = BranchPythonOperator(task_id="check_volume", python_callable=check_data_volume, dag=dag)
transform = PythonOperator(task_id="transform", python_callable=transform_orders, dag=dag)

load = PostgresOperator(
    task_id="load",
    postgres_conn_id="warehouse",
    sql="sql/load_fact_orders.sql",  # .sql files are Jinja-rendered, so the file can reference {{ ds }} directly
    dag=dag,
)

quality_check = PostgresOperator(
    task_id="quality_check",
    postgres_conn_id="warehouse",
    sql="""
        DO $$
        DECLARE
            row_cnt BIGINT;
            neg_cnt BIGINT;
        BEGIN
            SELECT COUNT(*),
                   COALESCE(SUM(CASE WHEN total_amount < 0 THEN 1 ELSE 0 END), 0)
            INTO row_cnt, neg_cnt
            FROM fact_orders
            WHERE date_key = TO_CHAR(DATE '{{ ds }}', 'YYYYMMDD')::INT;
            IF row_cnt = 0 THEN
                RAISE EXCEPTION 'No rows loaded for {{ ds }}';
            END IF;
            IF neg_cnt > 0 THEN
                RAISE EXCEPTION '% negative totals found for {{ ds }}', neg_cnt;
            END IF;
        END $$;
    """,
    dag=dag,
)

skip_notification = EmptyOperator(task_id="skip_notification", dag=dag)

wait_for_source >> extract >> validate >> branch
branch >> transform >> load >> quality_check
branch >> skip_notification
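
A lightweight guard before deploying is a DAG integrity test that catches import errors and missing task ids; a minimal pytest-style sketch, assuming the DAG file above sits on the Airflow DAGs path (the test path is hypothetical):

# tests/test_daily_orders_etl.py  (hypothetical path)
from airflow.models import DagBag

def test_dag_imports_cleanly():
    dag_bag = DagBag(include_examples=False)
    assert dag_bag.import_errors == {}, f"DAG import errors: {dag_bag.import_errors}"

def test_dag_structure():
    dag = DagBag(include_examples=False).get_dag("daily_orders_etl")
    assert dag is not None
    for task_id in ["wait_for_source", "extract", "validate", "check_volume",
                    "transform", "load", "quality_check", "skip_notification"]:
        assert dag.has_task(task_id)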

dbt Models

-- models/marts/orders/fct_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        partition_by={
            "field": "order_date",
            "data_type": "date",
            "granularity": "month"
        },
        cluster_by=["customer_id", "product_id"]
    )
}}

WITH source_orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
    {% if is_incremental() %}
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
    {% endif %}
),

enriched AS (
    SELECT
        o.order_id,
        o.order_date,
        o.customer_id,
        c.customer_segment,
        c.customer_lifetime_value,
        o.product_id,
        p.product_category,
        p.product_subcategory,
        o.quantity,
        o.unit_price,
        o.discount_amount,
        o.quantity * o.unit_price - o.discount_amount AS gross_amount,
        (o.quantity * o.unit_price - o.discount_amount) * 0.08 AS tax_amount,
        o.quantity * o.unit_price - o.discount_amount +
            ((o.quantity * o.unit_price - o.discount_amount) * 0.08) AS total_amount,
        o.updated_at
    FROM source_orders o
    LEFT JOIN {{ ref('dim_customers') }} c ON o.customer_id = c.customer_id
    LEFT JOIN {{ ref('dim_products') }} p ON o.product_id = p.product_id
)

SELECT * FROM enriched


-- models/marts/orders/schema.yml
version: 2

models:
  - name: fct_orders
    description: "Order fact table with enriched dimensions"
    columns:
      - name: order_id
        description: "Unique order identifier"
        tests:
          - unique
          - not_null
      - name: total_amount
        description: "Total order amount including tax"
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 1000000
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
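
The model and its tests are normally run through the dbt CLI (for example `dbt build --select fct_orders`); dbt-core 1.5+ also exposes a programmatic runner, sketched here for orchestration code that prefers Python:

# example: programmatic dbt invocation (requires dbt-core >= 1.5)
from dbt.cli.main import dbtRunner

def build_orders_mart() -> bool:
    result = dbtRunner().invoke(["build", "--select", "fct_orders"])
    return result.success

# if not build_orders_mart():
#     raise RuntimeError("dbt build failed for fct_orders")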

Spark Job (PySpark)

# jobs/aggregate_daily_metrics.py
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import logging

logger = logging.getLogger(__name__)


def create_spark_session(app_name: str) -> SparkSession:
    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.shuffle.partitions", "200")
        .getOrCreate()
    )


def read_incremental(
    spark: SparkSession,
    path: str,
    start_date: str,
    end_date: str,
) -> DataFrame:
    """Read partitioned data for a date range."""
    return (
        spark.read.parquet(path)
        .filter(
            (F.col("event_date") >= start_date) &
            (F.col("event_date") <= end_date)
        )
    )


def compute_daily_metrics(events: DataFrame) -> DataFrame:
    """Aggregate events into daily metrics per customer."""
    daily = (
        events
        .groupBy("customer_id", "event_date")
        .agg(
            F.count("*").alias("event_count"),
            F.sum("revenue").alias("daily_revenue"),
            F.avg("session_duration").alias("avg_session_duration"),
            F.countDistinct("product_id").alias("unique_products_viewed"),
            F.max("event_timestamp").alias("last_activity"),
        )
    )

    # Add rolling 7-day metrics
    window_7d = (
        Window
        .partitionBy("customer_id")
        .orderBy("event_date")
        .rowsBetween(-6, 0)
    )

    daily = daily.withColumns({
        "rolling_7d_revenue": F.sum("daily_revenue").over(window_7d),
        "rolling_7d_events": F.sum("event_count").over(window_7d),
        "rolling_7d_avg_session": F.avg("avg_session_duration").over(window_7d),
    })

    return daily


def deduplicate_events(events: DataFrame) -> DataFrame:
    """Remove duplicates keeping the latest version."""
    window = Window.partitionBy("event_id").orderBy(F.col("event_timestamp").desc())
    return (
        events
        .withColumn("row_num", F.row_number().over(window))
        .filter(F.col("row_num") == 1)
        .drop("row_num")
    )


def write_with_quality_check(
    df: DataFrame,
    output_path: str,
    partition_cols: list,
    expected_min_rows: int = 100,
):
    """Write output with post-write validation."""
    row_count = df.count()
    null_key_count = df.filter(F.col("customer_id").isNull()).count()

    if row_count < expected_min_rows:
        raise ValueError(f"Output has only {row_count} rows (expected >= {expected_min_rows})")
    if null_key_count > 0:
        raise ValueError(f"Found {null_key_count} rows with null customer_id")

    (
        df.repartition(*[F.col(c) for c in partition_cols])
        .write
        .mode("overwrite")
        .partitionBy(partition_cols)
        .parquet(output_path)
    )
    logger.info(f"Wrote {row_count} rows to {output_path}")


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--start-date", required=True)
    parser.add_argument("--end-date", required=True)
    parser.add_argument("--input-path", default="s3://data-lake/events/")
    parser.add_argument("--output-path", default="s3://data-lake/metrics/daily/")
    args = parser.parse_args()

    spark = create_spark_session("daily_metrics_aggregation")

    events = read_incremental(spark, args.input_path, args.start_date, args.end_date)
    events = deduplicate_events(events)
    metrics = compute_daily_metrics(events)
    write_with_quality_check(metrics, args.output_path, ["event_date"])

    spark.stop()
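
The aggregation logic can be exercised locally on a tiny in-memory DataFrame before running on a cluster; a minimal pytest-style sketch, assuming the module above is importable as jobs.aggregate_daily_metrics:

# tests/test_daily_metrics.py  (hypothetical path)
from pyspark.sql import SparkSession
from jobs.aggregate_daily_metrics import compute_daily_metrics

def test_daily_metrics_basic_aggregation():
    spark = SparkSession.builder.master("local[2]").appName("test").getOrCreate()
    events = spark.createDataFrame(
        [
            ("c1", "2025-01-01", "e1", 10.0, 30.0, "p1", "2025-01-01 10:00:00"),
            ("c1", "2025-01-01", "e2", 20.0, 60.0, "p2", "2025-01-01 11:00:00"),
        ],
        ["customer_id", "event_date", "event_id", "revenue", "session_duration",
         "product_id", "event_timestamp"],
    )
    rows = compute_daily_metrics(events).collect()
    assert len(rows) == 1
    assert rows[0]["event_count"] == 2
    assert rows[0]["daily_revenue"] == 30.0
    spark.stop()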

Kafka Streaming Pipeline

# streaming/order_events_processor.py
from confluent_kafka import Consumer, Producer, KafkaError
from datetime import datetime, timezone
import json
import logging
from typing import Dict, Callable, Optional

logger = logging.getLogger(__name__)


class StreamProcessor:
    """Kafka stream processor with exactly-once semantics."""

    def __init__(
        self,
        bootstrap_servers: str,
        group_id: str,
        input_topic: str,
        output_topic: str,
        schema_registry_url: str,  # reserved for Avro/Schema Registry support; unused in this JSON example
    ):
        self.consumer = Consumer({
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,  # Manual commit for exactly-once
            "max.poll.interval.ms": 300000,
        })
        self.producer = Producer({
            "bootstrap.servers": bootstrap_servers,
            "enable.idempotence": True,
            "acks": "all",
            "retries": 5,
        })
        self.input_topic = input_topic
        self.output_topic = output_topic
        self.running = True

    def process(self, transform_fn: Callable[[Dict], Optional[Dict]]):
        """Main processing loop with error handling."""
        self.consumer.subscribe([self.input_topic])
        logger.info(f"Subscribed to {self.input_topic}")

        batch_count = 0
        error_count = 0

        try:
            while self.running:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    logger.error(f"Consumer error: {msg.error()}")
                    error_count += 1
                    continue

                try:
                    value = json.loads(msg.value().decode("utf-8"))
                    result = transform_fn(value)

                    if result is not None:
                        self.producer.produce(
                            self.output_topic,
                            key=msg.key(),
                            value=json.dumps(result).encode("utf-8"),
                            callback=self._delivery_callback,
                        )

                    batch_count += 1
                    if batch_count % 100 == 0:
                        self.producer.flush()
                        self.consumer.commit()
                        logger.info(f"Processed {batch_count} messages, {error_count} errors")

                except Exception as e:
                    error_count += 1
                    logger.error(f"Processing error for offset {msg.offset()}: {e}")
                    # Send to dead letter topic
                    self._send_to_dlq(msg, str(e))

        finally:
            self.consumer.close()
            self.producer.flush()

    def _delivery_callback(self, err, msg):
        if err:
            logger.error(f"Delivery failed: {err}")

    def _send_to_dlq(self, original_msg, error_msg: str):
        self.producer.produce(
            f"{self.input_topic}.dlq",
            key=original_msg.key(),
            value=json.dumps({
                "original": original_msg.value().decode("utf-8"),
                "error": error_msg,
            }).encode("utf-8"),
        )


# Usage
def enrich_order_event(event: Dict) -> Optional[Dict]:
    """Transform and enrich order events."""
    if event.get("status") == "cancelled":
        return None  # Filter out cancelled orders

    return {
        "order_id": event["order_id"],
        "customer_id": event["customer_id"],
        "total_amount": event["quantity"] * event["unit_price"],
        "order_date": event["created_at"][:10],
        "processed_at": "now",
    }


if __name__ == "__main__":
    processor = StreamProcessor(
        bootstrap_servers="kafka:9092",
        group_id="order-enrichment",
        input_topic="raw-orders",
        output_topic="enriched-orders",
        schema_registry_url="http://schema-registry:8081",
    )
    processor.process(enrich_order_event)
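
The running flag above is never flipped, so the loop only exits on an error. One way to wire a graceful shutdown, assuming a POSIX environment, is to register signal handlers before starting the loop:

# example: graceful shutdown for the processor above (hypothetical addition)
import signal

def install_shutdown_handlers(processor: StreamProcessor) -> None:
    def _stop(signum, frame):
        logger.info(f"Received signal {signum}, stopping after the current message")
        processor.running = False   # loop exits; consumer is closed and producer flushed in finally
    signal.signal(signal.SIGTERM, _stop)
    signal.signal(signal.SIGINT, _stop)

# install_shutdown_handlers(processor)  # call before processor.process(...)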

SQL Analytics Patterns

-- Cohort retention analysis
WITH first_purchase AS (
    SELECT
        customer_id,
        DATE_TRUNC('month', MIN(order_date)) AS cohort_month
    FROM fact_orders
    GROUP BY customer_id
),
monthly_activity AS (
    SELECT
        f.customer_id,
        fp.cohort_month,
        DATE_TRUNC('month', f.order_date) AS activity_month,
        EXTRACT(YEAR FROM AGE(DATE_TRUNC('month', f.order_date), fp.cohort_month)) * 12 +
        EXTRACT(MONTH FROM AGE(DATE_TRUNC('month', f.order_date), fp.cohort_month)) AS months_since_first
    FROM fact_orders f
    JOIN first_purchase fp ON f.customer_id = fp.customer_id
)
SELECT
    cohort_month,
    months_since_first,
    COUNT(DISTINCT customer_id) AS active_customers,
    ROUND(
        COUNT(DISTINCT customer_id)::DECIMAL /
        FIRST_VALUE(COUNT(DISTINCT customer_id)) OVER (
            PARTITION BY cohort_month ORDER BY months_since_first
        ) * 100, 1
    ) AS retention_pct
FROM monthly_activity
GROUP BY cohort_month, months_since_first
ORDER BY cohort_month, months_since_first;


-- Running totals with window functions
SELECT
    order_date,
    daily_revenue,
    SUM(daily_revenue) OVER (
        ORDER BY order_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS cumulative_revenue,
    AVG(daily_revenue) OVER (
        ORDER BY order_date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) AS rolling_7d_avg,
    daily_revenue - LAG(daily_revenue, 7) OVER (ORDER BY order_date) AS wow_change
FROM (
    SELECT order_date, SUM(total_amount) AS daily_revenue
    FROM fact_orders
    GROUP BY order_date
) daily;


-- Funnel analysis
WITH funnel AS (
    SELECT
        session_id,
        MAX(CASE WHEN event_name = 'page_view' THEN 1 ELSE 0 END) AS viewed,
        MAX(CASE WHEN event_name = 'add_to_cart' THEN 1 ELSE 0 END) AS added,
        MAX(CASE WHEN event_name = 'checkout_start' THEN 1 ELSE 0 END) AS checkout,
        MAX(CASE WHEN event_name = 'purchase' THEN 1 ELSE 0 END) AS purchased
    FROM events
    WHERE event_date BETWEEN '2025-01-01' AND '2025-01-31'
    GROUP BY session_id
)
SELECT
    COUNT(*) AS total_sessions,
    SUM(viewed) AS viewed,
    SUM(added) AS added_to_cart,
    SUM(checkout) AS started_checkout,
    SUM(purchased) AS completed_purchase,
    ROUND(SUM(added)::DECIMAL / NULLIF(SUM(viewed), 0) * 100, 1) AS view_to_cart_pct,
    ROUND(SUM(purchased)::DECIMAL / NULLIF(SUM(added), 0) * 100, 1) AS cart_to_purchase_pct,
    ROUND(SUM(purchased)::DECIMAL / NULLIF(SUM(viewed), 0) * 100, 1) AS overall_conversion_pct
FROM funnel;
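
For recurring reports, queries like these are better parameterized than rerun with hardcoded literals; a minimal sketch using pandas and SQLAlchemy bind parameters (connection string and table names are assumptions matching the examples above):

# example: parameterized analytics query from Python (illustrative)
import pandas as pd
from sqlalchemy import create_engine, text

FUNNEL_SQL = text("""
    SELECT session_id,
           MAX(CASE WHEN event_name = 'purchase' THEN 1 ELSE 0 END) AS purchased
    FROM events
    WHERE event_date BETWEEN :start_date AND :end_date
    GROUP BY session_id
""")

def load_funnel(engine, start_date: str, end_date: str) -> pd.DataFrame:
    with engine.connect() as conn:
        return pd.read_sql(FUNNEL_SQL, conn, params={"start_date": start_date, "end_date": end_date})

# engine = create_engine("postgresql+psycopg2://user:pass@warehouse/dw")
# funnel_df = load_funnel(engine, "2025-01-01", "2025-01-31")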

Data Quality Framework

# src/quality/checks.py
from dataclasses import dataclass
from typing import List, Callable, Any, Dict, Optional
import pandas as pd
import logging

logger = logging.getLogger(__name__)


@dataclass
class QualityCheckResult:
    check_name: str
    passed: bool
    metric_value: Any
    threshold: Any
    severity: str  # "critical", "warning", "info"
    message: str


class DataQualityChecker:
    """Reusable data quality framework."""

    def __init__(self):
        self.checks: List[Callable] = []

    def add_check(self, check_fn: Callable):
        self.checks.append(check_fn)
        return self

    def run_all(self, df: pd.DataFrame) -> List[QualityCheckResult]:
        results = []
        for check in self.checks:
            result = check(df)
            results.append(result)
            level = "PASS" if result.passed else "FAIL"
            logger.log(
                logging.INFO if result.passed else logging.ERROR,
                f"[{level}] {result.check_name}: {result.message}"
            )
        return results

    def assert_all_pass(self, df: pd.DataFrame):
        results = self.run_all(df)
        critical_failures = [r for r in results if not r.passed and r.severity == "critical"]
        if critical_failures:
            messages = "\n".join(f"  - {r.check_name}: {r.message}" for r in critical_failures)
            raise ValueError(f"Critical data quality failures:\n{messages}")


# Reusable check factories
def null_check(column: str, max_null_pct: float = 0.0, severity: str = "critical"):
    def check(df: pd.DataFrame) -> QualityCheckResult:
        null_pct = df[column].isnull().mean() * 100
        return QualityCheckResult(
            check_name=f"null_check_{column}",
            passed=null_pct <= max_null_pct,
            metric_value=round(null_pct, 2),
            threshold=max_null_pct,
            severity=severity,
            message=f"{column} null rate: {null_pct:.2f}% (threshold: {max_null_pct}%)",
        )
    return check


def uniqueness_check(column: str, severity: str = "critical"):
    def check(df: pd.DataFrame) -> QualityCheckResult:
        dup_count = df[column].duplicated().sum()
        return QualityCheckResult(
            check_name=f"unique_check_{column}",
            passed=dup_count == 0,
            metric_value=dup_count,
            threshold=0,
            severity=severity,
            message=f"{column} has {dup_count} duplicate values",
        )
    return check


def range_check(column: str, min_val=None, max_val=None, severity: str = "warning"):
    def check(df: pd.DataFrame) -> QualityCheckResult:
        violations = 0
        if min_val is not None:
            violations += (df[column] < min_val).sum()
        if max_val is not None:
            violations += (df[column] > max_val).sum()
        return QualityCheckResult(
            check_name=f"range_check_{column}",
            passed=violations == 0,
            metric_value=violations,
            threshold=f"[{min_val}, {max_val}]",
            severity=severity,
            message=f"{column} has {violations} out-of-range values",
        )
    return check


def row_count_check(min_rows: int, max_rows: Optional[int] = None, severity: str = "critical"):
    def check(df: pd.DataFrame) -> QualityCheckResult:
        count = len(df)
        in_range = count >= min_rows and (max_rows is None or count <= max_rows)
        return QualityCheckResult(
            check_name="row_count_check",
            passed=in_range,
            metric_value=count,
            threshold=f">= {min_rows}" + (f", <= {max_rows}" if max_rows else ""),
            severity=severity,
            message=f"Row count: {count}",
        )
    return check


# Usage
checker = DataQualityChecker()
checker.add_check(null_check("order_id", max_null_pct=0))
checker.add_check(null_check("customer_id", max_null_pct=0))
checker.add_check(uniqueness_check("order_id"))
checker.add_check(range_check("total_amount", min_val=0, max_val=1_000_000))
checker.add_check(row_count_check(min_rows=100))

# checker.assert_all_pass(orders_df)
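
The same factory pattern extends to other checks; a freshness check is a common addition (the column name and threshold below are assumptions):

def freshness_check(column: str, max_age_hours: float = 24.0, severity: str = "critical"):
    def check(df: pd.DataFrame) -> QualityCheckResult:
        newest = pd.to_datetime(df[column], utc=True).max()
        age_hours = (pd.Timestamp.now(tz="UTC") - newest).total_seconds() / 3600
        return QualityCheckResult(
            check_name=f"freshness_check_{column}",
            passed=age_hours <= max_age_hours,
            metric_value=round(age_hours, 1),
            threshold=max_age_hours,
            severity=severity,
            message=f"{column} is {age_hours:.1f}h old (threshold: {max_age_hours}h)",
        )
    return check

# checker.add_check(freshness_check("loaded_at", max_age_hours=24))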

Anti-Patterns to Avoid

  1. ❌ Full table refreshes when incremental is possible - wastes compute and time
  2. ❌ No data validation between pipeline stages - propagates corrupt data
  3. ❌ Hardcoded SQL in Python strings - use templating (Jinja2, dbt); see the sketch after this list
  4. ❌ SELECT * in production queries - specify columns for performance and clarity
  5. ❌ Missing partitioning on large tables - full table scans destroy performance
  6. ❌ No idempotency - reruns create duplicates or corrupt state
  7. ❌ Tight coupling between pipeline steps - use contracts (schemas) at boundaries
  8. ❌ No monitoring or alerting - silent pipeline failures go unnoticed for days
  9. ❌ Storing PII without governance - encryption, masking, and access control are required
  10. ❌ No data lineage documentation - debugging becomes impossible
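
As a counterexample to item 3, SQL can live in version-controlled template files and be rendered with Jinja2 rather than assembled by string concatenation; a minimal sketch (file names and parameters are placeholders):

# example: render SQL from a Jinja2 template (illustrative)
from jinja2 import Environment, FileSystemLoader

env = Environment(loader=FileSystemLoader("sql/"), autoescape=False)

def render_load_sql(date_key: int, source_system: str) -> str:
    # sql/load_fact_orders.sql.j2 might contain, e.g.:
    #   DELETE FROM fact_orders
    #   WHERE date_key = {{ date_key }} AND source_system = '{{ source_system }}';
    return env.get_template("load_fact_orders.sql.j2").render(
        date_key=date_key, source_system=source_system
    )

# print(render_load_sql(20250115, "crm"))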

Adapted from buildwithclaude by Dave Poon (MIT)

# Supported AI Coding Agents

This skill is compatible with the SKILL.md standard and works with all major AI coding agents.

Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.