npx skills add jgarrison929/openclaw-skills --skill "data-engineer"
Install a specific skill from a multi-skill repository
# Description
Use when building data pipelines, ETL/ELT workflows, data warehouses, data modeling (star/snowflake schemas), Spark jobs, Airflow DAGs, Kafka streaming, data quality checks, or SQL analytics.
# SKILL.md
name: data-engineer
version: 1.0.0
description: Use when building data pipelines, ETL/ELT workflows, data warehouses, data modeling (star/snowflake schemas), Spark jobs, Airflow DAGs, Kafka streaming, data quality checks, or SQL analytics.
triggers:
- data pipeline
- ETL
- ELT
- data warehouse
- data lake
- star schema
- snowflake schema
- Airflow
- Spark
- Kafka
- data modeling
- dbt
- BigQuery
- Redshift
- data quality
- data lineage
- streaming
- batch processing
- SQL analytics
- data ingestion
role: specialist
scope: implementation
output-format: code
Data Engineer
Senior data engineer specializing in scalable data pipelines, warehouse modeling, streaming architectures, data quality, and SQL analytics.
Role Definition
You are a senior data engineer building production-grade data infrastructure. You design and implement ETL/ELT pipelines, data warehouses, streaming systems, and analytics platforms. You prioritize data quality, reliability, cost optimization, and governance.
Core Principles
- Idempotent operations — every pipeline run produces the same result given the same inputs (see the sketch after this list)
- Incremental over full refreshes — process only what changed
- Schema-on-write for warehouses, schema-on-read for lakes — choose based on use case
- Data quality is not optional — validate at every pipeline stage
- Document data lineage — know where every field comes from
- Cost-aware design — partition, compress, and prune aggressively
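To make the first two principles concrete, here is a minimal sketch of an idempotent, incremental daily load against the fact_orders table defined in the next section. The staging.orders_enriched table and the :load_date_key bind parameter are illustrative placeholders, not part of the schemas below.
-- Idempotent daily load: re-running for the same date replaces, never duplicates
BEGIN;
-- 1. Clear anything previously loaded for this date (no-op on the first run)
DELETE FROM fact_orders
WHERE date_key = :load_date_key;
-- 2. Load only that day's slice from staging (staging.orders_enriched is hypothetical)
INSERT INTO fact_orders (order_id, customer_key, product_key, date_key, store_key,
                         quantity, unit_price, discount_amount, total_amount, tax_amount,
                         order_number, source_system)
SELECT order_id, customer_key, product_key, date_key, store_key,
       quantity, unit_price, discount_amount, total_amount, tax_amount,
       order_number, 'crm'
FROM staging.orders_enriched
WHERE date_key = :load_date_key;
COMMIT;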
Data Warehouse Modeling
Star Schema Design
-- Fact table: records business events (measures + foreign keys)
CREATE TABLE fact_orders (
    order_id BIGINT NOT NULL,
customer_key BIGINT REFERENCES dim_customer(customer_key),
product_key BIGINT REFERENCES dim_product(product_key),
date_key INT REFERENCES dim_date(date_key),
store_key BIGINT REFERENCES dim_store(store_key),
-- Measures (aggregatable)
quantity INT NOT NULL,
unit_price DECIMAL(10,2) NOT NULL,
discount_amount DECIMAL(10,2) DEFAULT 0,
total_amount DECIMAL(12,2) NOT NULL,
tax_amount DECIMAL(10,2) NOT NULL,
-- Degenerate dimensions
order_number VARCHAR(50) NOT NULL,
-- ETL metadata
loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    source_system VARCHAR(50) NOT NULL,
    -- The partition key must be part of any primary key on a partitioned table
    PRIMARY KEY (order_id, date_key)
)
PARTITION BY RANGE (date_key);
-- Create monthly partitions
CREATE TABLE fact_orders_2025_01 PARTITION OF fact_orders
FOR VALUES FROM (20250101) TO (20250201);
-- Dimension table: descriptive attributes (slowly changing)
CREATE TABLE dim_customer (
customer_key BIGSERIAL PRIMARY KEY, -- Surrogate key
customer_id VARCHAR(50) NOT NULL, -- Natural/business key
first_name VARCHAR(100),
last_name VARCHAR(100),
email VARCHAR(255),
segment VARCHAR(50),
city VARCHAR(100),
state VARCHAR(50),
country VARCHAR(50),
-- SCD Type 2 tracking
effective_from DATE NOT NULL,
effective_to DATE DEFAULT '9999-12-31',
is_current BOOLEAN DEFAULT TRUE,
-- ETL metadata
loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
source_system VARCHAR(50)
);
CREATE INDEX idx_customer_bk ON dim_customer(customer_id, is_current);
-- Date dimension (pre-populated calendar table)
CREATE TABLE dim_date (
date_key INT PRIMARY KEY, -- YYYYMMDD format
full_date DATE NOT NULL,
day_of_week SMALLINT,
day_name VARCHAR(10),
day_of_month SMALLINT,
day_of_year SMALLINT,
week_of_year SMALLINT,
month_number SMALLINT,
month_name VARCHAR(10),
quarter SMALLINT,
year SMALLINT,
is_weekend BOOLEAN,
is_holiday BOOLEAN DEFAULT FALSE,
fiscal_quarter SMALLINT,
fiscal_year SMALLINT
);
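A typical rollup over this schema joins the fact table to its dimensions and filters on date_key so partition pruning kicks in — a minimal sketch using only the columns defined above:
-- Monthly revenue by customer segment for January 2025
SELECT
    d.year,
    d.month_number,
    c.segment,
    COUNT(DISTINCT f.order_id) AS orders,
    SUM(f.total_amount)        AS revenue
FROM fact_orders f
JOIN dim_date d     ON f.date_key = d.date_key
JOIN dim_customer c ON f.customer_key = c.customer_key
WHERE f.date_key BETWEEN 20250101 AND 20250131  -- prunes to the fact_orders_2025_01 partition
GROUP BY d.year, d.month_number, c.segment
ORDER BY revenue DESC;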
SCD Type 2 Implementation
-- SCD Type 2 merge pattern (PostgreSQL). A CTE is scoped to a single statement,
-- so the UPDATE and the INSERT each carry their own WITH clause; run both in one
-- transaction. IS DISTINCT FROM catches NULL-to-value changes that != would miss.
-- Step 1: close current records whose tracked attributes changed
WITH changed AS (
    SELECT d.customer_key
    FROM staging.customers s
    JOIN dim_customer d
      ON d.customer_id = s.customer_id
     AND d.is_current = TRUE
    WHERE d.first_name IS DISTINCT FROM s.first_name
       OR d.email IS DISTINCT FROM s.email
       OR d.segment IS DISTINCT FROM s.segment
)
UPDATE dim_customer d
SET effective_to = CURRENT_DATE - 1,
    is_current = FALSE
FROM changed c
WHERE d.customer_key = c.customer_key;
-- Step 2: insert new customers and fresh versions of the customers closed above
WITH to_insert AS (
    SELECT s.*
    FROM staging.customers s
    LEFT JOIN dim_customer d
           ON d.customer_id = s.customer_id
          AND d.is_current = TRUE
    WHERE d.customer_key IS NULL  -- no current row: brand new, or just closed in step 1
)
INSERT INTO dim_customer (customer_id, first_name, last_name, email, segment,
                          city, state, country, effective_from, is_current, source_system)
SELECT customer_id, first_name, last_name, email, segment,
       city, state, country, CURRENT_DATE, TRUE, 'crm'
FROM to_insert;
Airflow DAG Patterns
# dags/daily_etl_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.utils.dates import days_ago
from datetime import timedelta
import logging
logger = logging.getLogger(__name__)
default_args = {
"owner": "data-team",
"depends_on_past": False,
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True,
"max_retry_delay": timedelta(minutes=30),
"email_on_failure": True,
"email": ["[email protected]"],
"execution_timeout": timedelta(hours=2),
}
dag = DAG(
"daily_orders_etl",
default_args=default_args,
description="Daily orders ETL: extract → validate → transform → load → quality check",
schedule_interval="0 6 * * *", # 6 AM daily
start_date=days_ago(1),
catchup=False,
max_active_runs=1,
tags=["etl", "orders", "daily"],
)
def extract_from_source(**context):
"""Extract incremental data from source system."""
from src.extractors.api_extractor import APIExtractor
execution_date = context["ds"]
extractor = APIExtractor(base_url="https://api.source-system.com")
data = extractor.extract_orders(
start_date=execution_date,
end_date=execution_date,
)
# Write to staging area
output_path = f"s3://data-lake/staging/orders/{execution_date}/orders.parquet"
data.to_parquet(output_path, index=False)
context["ti"].xcom_push(key="row_count", value=len(data))
context["ti"].xcom_push(key="output_path", value=output_path)
logger.info(f"Extracted {len(data)} orders for {execution_date}")
def validate_extracted_data(**context):
"""Run data quality checks on extracted data."""
import great_expectations as gx
ti = context["ti"]
output_path = ti.xcom_pull(task_ids="extract", key="output_path")
ge_context = gx.get_context()
result = ge_context.run_checkpoint(
checkpoint_name="staging_orders_check",
batch_request={"path": output_path},
)
if not result.success:
failed = [r for r in result.run_results.values() if not r.success]
raise ValueError(f"Data quality check failed: {len(failed)} expectations failed")
logger.info("Data quality validation passed")
def check_data_volume(**context):
"""Branch based on data volume — skip transform if no data."""
ti = context["ti"]
row_count = ti.xcom_pull(task_ids="extract", key="row_count")
if row_count == 0:
logger.info("No new data — skipping transform and load")
return "skip_notification"
return "transform"
def transform_orders(**context):
"""Apply business transformations."""
import pandas as pd
ti = context["ti"]
output_path = ti.xcom_pull(task_ids="extract", key="output_path")
df = pd.read_parquet(output_path)
# Business transformations
df["total_amount"] = df["quantity"] * df["unit_price"] - df["discount_amount"]
df["tax_amount"] = df["total_amount"] * 0.08
df["date_key"] = pd.to_datetime(df["order_date"]).dt.strftime("%Y%m%d").astype(int)
    # Lookup dimension keys (enrich_with_dimension_keys is assumed to be a project
    # helper that resolves surrogate keys from the warehouse dimension tables)
    df = enrich_with_dimension_keys(df)
transformed_path = output_path.replace("staging", "transformed")
df.to_parquet(transformed_path, index=False)
ti.xcom_push(key="transformed_path", value=transformed_path)
# DAG structure
wait_for_source = S3KeySensor(
task_id="wait_for_source",
bucket_key="s3://source-bucket/exports/{{ ds }}/orders_complete.flag",
timeout=3600,
poke_interval=300,
dag=dag,
)
extract = PythonOperator(task_id="extract", python_callable=extract_from_source, dag=dag)
validate = PythonOperator(task_id="validate", python_callable=validate_extracted_data, dag=dag)
branch = BranchPythonOperator(task_id="check_volume", python_callable=check_data_volume, dag=dag)
transform = PythonOperator(task_id="transform", python_callable=transform_orders, dag=dag)
load = PostgresOperator(
    task_id="load",
    postgres_conn_id="warehouse",
    sql="sql/load_fact_orders.sql",  # templated .sql file — use {{ ds }} / {{ ds_nodash }} macros directly inside it
    dag=dag,
)
quality_check = PostgresOperator(
    task_id="quality_check",
    postgres_conn_id="warehouse",
    sql="""
        DO $$
        DECLARE
            loaded_rows   BIGINT;
            negative_rows BIGINT;
        BEGIN
            SELECT COUNT(*), COUNT(*) FILTER (WHERE total_amount < 0)
            INTO loaded_rows, negative_rows
            FROM fact_orders
            WHERE date_key = {{ ds_nodash }};
            IF loaded_rows = 0 THEN
                RAISE EXCEPTION 'No rows loaded for {{ ds }}';
            END IF;
            IF negative_rows > 0 THEN
                RAISE EXCEPTION '% rows with negative totals for {{ ds }}', negative_rows;
            END IF;
        END $$;
    """,
    dag=dag,
)
skip_notification = EmptyOperator(task_id="skip_notification", dag=dag)  # branch target when no new data
wait_for_source >> extract >> validate >> branch
branch >> [transform, skip_notification]
transform >> load >> quality_check
dbt Models
-- models/marts/orders/fct_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
partition_by={
"field": "order_date",
"data_type": "date",
"granularity": "month"
},
cluster_by=["customer_id", "product_id"]
)
}}
WITH source_orders AS (
SELECT * FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
),
enriched AS (
SELECT
o.order_id,
o.order_date,
o.customer_id,
c.customer_segment,
c.customer_lifetime_value,
o.product_id,
p.product_category,
p.product_subcategory,
o.quantity,
o.unit_price,
o.discount_amount,
o.quantity * o.unit_price - o.discount_amount AS gross_amount,
(o.quantity * o.unit_price - o.discount_amount) * 0.08 AS tax_amount,
o.quantity * o.unit_price - o.discount_amount +
((o.quantity * o.unit_price - o.discount_amount) * 0.08) AS total_amount,
o.updated_at
FROM source_orders o
LEFT JOIN {{ ref('dim_customers') }} c ON o.customer_id = c.customer_id
LEFT JOIN {{ ref('dim_products') }} p ON o.product_id = p.product_id
)
SELECT * FROM enriched
-- models/marts/orders/schema.yml
version: 2
models:
- name: fct_orders
description: "Order fact table with enriched dimensions"
columns:
- name: order_id
description: "Unique order identifier"
tests:
- unique
- not_null
- name: total_amount
description: "Total order amount including tax"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
max_value: 1000000
- name: customer_id
tests:
- not_null
- relationships:
to: ref('dim_customers')
field: customer_id
Spark Job (PySpark)
# jobs/aggregate_daily_metrics.py
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import logging
logger = logging.getLogger(__name__)
def create_spark_session(app_name: str) -> SparkSession:
return (
SparkSession.builder
.appName(app_name)
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.shuffle.partitions", "200")
.getOrCreate()
)
def read_incremental(
spark: SparkSession,
path: str,
start_date: str,
end_date: str,
) -> DataFrame:
"""Read partitioned data for a date range."""
return (
spark.read.parquet(path)
.filter(
(F.col("event_date") >= start_date) &
(F.col("event_date") <= end_date)
)
)
def compute_daily_metrics(events: DataFrame) -> DataFrame:
"""Aggregate events into daily metrics per customer."""
daily = (
events
.groupBy("customer_id", "event_date")
.agg(
F.count("*").alias("event_count"),
F.sum("revenue").alias("daily_revenue"),
F.avg("session_duration").alias("avg_session_duration"),
F.countDistinct("product_id").alias("unique_products_viewed"),
F.max("event_timestamp").alias("last_activity"),
)
)
# Add rolling 7-day metrics
window_7d = (
Window
.partitionBy("customer_id")
.orderBy("event_date")
.rowsBetween(-6, 0)
)
daily = daily.withColumns({
"rolling_7d_revenue": F.sum("daily_revenue").over(window_7d),
"rolling_7d_events": F.sum("event_count").over(window_7d),
"rolling_7d_avg_session": F.avg("avg_session_duration").over(window_7d),
})
return daily
def deduplicate_events(events: DataFrame) -> DataFrame:
"""Remove duplicates keeping the latest version."""
window = Window.partitionBy("event_id").orderBy(F.col("event_timestamp").desc())
return (
events
.withColumn("row_num", F.row_number().over(window))
.filter(F.col("row_num") == 1)
.drop("row_num")
)
def write_with_quality_check(
df: DataFrame,
output_path: str,
partition_cols: list,
expected_min_rows: int = 100,
):
"""Write output with post-write validation."""
row_count = df.count()
null_key_count = df.filter(F.col("customer_id").isNull()).count()
if row_count < expected_min_rows:
raise ValueError(f"Output has only {row_count} rows (expected >= {expected_min_rows})")
if null_key_count > 0:
raise ValueError(f"Found {null_key_count} rows with null customer_id")
(
df.repartition(*[F.col(c) for c in partition_cols])
.write
.mode("overwrite")
.partitionBy(partition_cols)
.parquet(output_path)
)
logger.info(f"Wrote {row_count} rows to {output_path}")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--start-date", required=True)
parser.add_argument("--end-date", required=True)
parser.add_argument("--input-path", default="s3://data-lake/events/")
parser.add_argument("--output-path", default="s3://data-lake/metrics/daily/")
args = parser.parse_args()
spark = create_spark_session("daily_metrics_aggregation")
events = read_incremental(spark, args.input_path, args.start_date, args.end_date)
events = deduplicate_events(events)
metrics = compute_daily_metrics(events)
write_with_quality_check(metrics, args.output_path, ["event_date"])
spark.stop()
Kafka Streaming Pipeline
# streaming/order_events_processor.py
from confluent_kafka import Consumer, Producer, KafkaError
from datetime import datetime, timezone
import json
import logging
from typing import Callable, Dict, Optional
logger = logging.getLogger(__name__)
class StreamProcessor:
"""Kafka stream processor with exactly-once semantics."""
def __init__(
self,
bootstrap_servers: str,
group_id: str,
input_topic: str,
output_topic: str,
        schema_registry_url: str,  # reserved for Avro (de)serialization; this example uses JSON
):
self.consumer = Consumer({
"bootstrap.servers": bootstrap_servers,
"group.id": group_id,
"auto.offset.reset": "earliest",
"enable.auto.commit": False, # Manual commit for exactly-once
"max.poll.interval.ms": 300000,
})
self.producer = Producer({
"bootstrap.servers": bootstrap_servers,
"enable.idempotence": True,
"acks": "all",
"retries": 5,
})
self.input_topic = input_topic
self.output_topic = output_topic
self.running = True
def process(self, transform_fn: Callable[[Dict], Optional[Dict]]):
"""Main processing loop with error handling."""
self.consumer.subscribe([self.input_topic])
logger.info(f"Subscribed to {self.input_topic}")
batch_count = 0
error_count = 0
try:
while self.running:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
logger.error(f"Consumer error: {msg.error()}")
error_count += 1
continue
try:
value = json.loads(msg.value().decode("utf-8"))
result = transform_fn(value)
if result is not None:
self.producer.produce(
self.output_topic,
key=msg.key(),
value=json.dumps(result).encode("utf-8"),
callback=self._delivery_callback,
)
batch_count += 1
if batch_count % 100 == 0:
self.producer.flush()
self.consumer.commit()
logger.info(f"Processed {batch_count} messages, {error_count} errors")
except Exception as e:
error_count += 1
logger.error(f"Processing error for offset {msg.offset()}: {e}")
# Send to dead letter topic
self._send_to_dlq(msg, str(e))
finally:
self.consumer.close()
self.producer.flush()
def _delivery_callback(self, err, msg):
if err:
logger.error(f"Delivery failed: {err}")
def _send_to_dlq(self, original_msg, error_msg: str):
self.producer.produce(
f"{self.input_topic}.dlq",
key=original_msg.key(),
value=json.dumps({
"original": original_msg.value().decode("utf-8"),
"error": error_msg,
}).encode("utf-8"),
)
# Usage
def enrich_order_event(event: Dict) -> Optional[Dict]:
"""Transform and enrich order events."""
if event.get("status") == "cancelled":
return None # Filter out cancelled orders
return {
"order_id": event["order_id"],
"customer_id": event["customer_id"],
"total_amount": event["quantity"] * event["unit_price"],
"order_date": event["created_at"][:10],
"processed_at": "now",
}
if __name__ == "__main__":
processor = StreamProcessor(
bootstrap_servers="kafka:9092",
group_id="order-enrichment",
input_topic="raw-orders",
output_topic="enriched-orders",
schema_registry_url="http://schema-registry:8081",
)
processor.process(enrich_order_event)
SQL Analytics Patterns
-- Cohort retention analysis
WITH first_purchase AS (
SELECT
customer_id,
DATE_TRUNC('month', MIN(order_date)) AS cohort_month
FROM fact_orders
GROUP BY customer_id
),
monthly_activity AS (
SELECT
f.customer_id,
fp.cohort_month,
DATE_TRUNC('month', f.order_date) AS activity_month,
EXTRACT(YEAR FROM AGE(DATE_TRUNC('month', f.order_date), fp.cohort_month)) * 12 +
EXTRACT(MONTH FROM AGE(DATE_TRUNC('month', f.order_date), fp.cohort_month)) AS months_since_first
FROM fact_orders f
JOIN first_purchase fp ON f.customer_id = fp.customer_id
)
SELECT
cohort_month,
months_since_first,
COUNT(DISTINCT customer_id) AS active_customers,
ROUND(
COUNT(DISTINCT customer_id)::DECIMAL /
FIRST_VALUE(COUNT(DISTINCT customer_id)) OVER (
PARTITION BY cohort_month ORDER BY months_since_first
) * 100, 1
) AS retention_pct
FROM monthly_activity
GROUP BY cohort_month, months_since_first
ORDER BY cohort_month, months_since_first;
-- Running totals with window functions
SELECT
order_date,
daily_revenue,
SUM(daily_revenue) OVER (
ORDER BY order_date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS cumulative_revenue,
AVG(daily_revenue) OVER (
ORDER BY order_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) AS rolling_7d_avg,
daily_revenue - LAG(daily_revenue, 7) OVER (ORDER BY order_date) AS wow_change
FROM (
SELECT order_date, SUM(total_amount) AS daily_revenue
FROM fact_orders
GROUP BY order_date
) daily;
-- Funnel analysis
WITH funnel AS (
SELECT
session_id,
MAX(CASE WHEN event_name = 'page_view' THEN 1 ELSE 0 END) AS viewed,
MAX(CASE WHEN event_name = 'add_to_cart' THEN 1 ELSE 0 END) AS added,
MAX(CASE WHEN event_name = 'checkout_start' THEN 1 ELSE 0 END) AS checkout,
MAX(CASE WHEN event_name = 'purchase' THEN 1 ELSE 0 END) AS purchased
FROM events
WHERE event_date BETWEEN '2025-01-01' AND '2025-01-31'
GROUP BY session_id
)
SELECT
COUNT(*) AS total_sessions,
SUM(viewed) AS viewed,
SUM(added) AS added_to_cart,
SUM(checkout) AS started_checkout,
SUM(purchased) AS completed_purchase,
ROUND(SUM(added)::DECIMAL / NULLIF(SUM(viewed), 0) * 100, 1) AS view_to_cart_pct,
ROUND(SUM(purchased)::DECIMAL / NULLIF(SUM(added), 0) * 100, 1) AS cart_to_purchase_pct,
ROUND(SUM(purchased)::DECIMAL / NULLIF(SUM(viewed), 0) * 100, 1) AS overall_conversion_pct
FROM funnel;
Data Quality Framework
# src/quality/checks.py
from dataclasses import dataclass
from typing import Any, Callable, List, Optional
import pandas as pd
import logging
logger = logging.getLogger(__name__)
@dataclass
class QualityCheckResult:
check_name: str
passed: bool
metric_value: Any
threshold: Any
severity: str # "critical", "warning", "info"
message: str
class DataQualityChecker:
"""Reusable data quality framework."""
def __init__(self):
self.checks: List[Callable] = []
def add_check(self, check_fn: Callable):
self.checks.append(check_fn)
return self
def run_all(self, df: pd.DataFrame) -> List[QualityCheckResult]:
results = []
for check in self.checks:
result = check(df)
results.append(result)
level = "PASS" if result.passed else "FAIL"
logger.log(
logging.INFO if result.passed else logging.ERROR,
f"[{level}] {result.check_name}: {result.message}"
)
return results
def assert_all_pass(self, df: pd.DataFrame):
results = self.run_all(df)
critical_failures = [r for r in results if not r.passed and r.severity == "critical"]
if critical_failures:
messages = "\n".join(f" - {r.check_name}: {r.message}" for r in critical_failures)
raise ValueError(f"Critical data quality failures:\n{messages}")
# Reusable check factories
def null_check(column: str, max_null_pct: float = 0.0, severity: str = "critical"):
def check(df: pd.DataFrame) -> QualityCheckResult:
null_pct = df[column].isnull().mean() * 100
return QualityCheckResult(
check_name=f"null_check_{column}",
passed=null_pct <= max_null_pct,
metric_value=round(null_pct, 2),
threshold=max_null_pct,
severity=severity,
message=f"{column} null rate: {null_pct:.2f}% (threshold: {max_null_pct}%)",
)
return check
def uniqueness_check(column: str, severity: str = "critical"):
def check(df: pd.DataFrame) -> QualityCheckResult:
dup_count = df[column].duplicated().sum()
return QualityCheckResult(
check_name=f"unique_check_{column}",
passed=dup_count == 0,
metric_value=dup_count,
threshold=0,
severity=severity,
message=f"{column} has {dup_count} duplicate values",
)
return check
def range_check(column: str, min_val=None, max_val=None, severity: str = "warning"):
def check(df: pd.DataFrame) -> QualityCheckResult:
violations = 0
if min_val is not None:
violations += (df[column] < min_val).sum()
if max_val is not None:
violations += (df[column] > max_val).sum()
return QualityCheckResult(
check_name=f"range_check_{column}",
passed=violations == 0,
metric_value=violations,
threshold=f"[{min_val}, {max_val}]",
severity=severity,
message=f"{column} has {violations} out-of-range values",
)
return check
def row_count_check(min_rows: int, max_rows: Optional[int] = None, severity: str = "critical"):
def check(df: pd.DataFrame) -> QualityCheckResult:
count = len(df)
in_range = count >= min_rows and (max_rows is None or count <= max_rows)
return QualityCheckResult(
check_name="row_count_check",
passed=in_range,
metric_value=count,
threshold=f">= {min_rows}" + (f", <= {max_rows}" if max_rows else ""),
severity=severity,
message=f"Row count: {count}",
)
return check
# Usage
checker = DataQualityChecker()
checker.add_check(null_check("order_id", max_null_pct=0))
checker.add_check(null_check("customer_id", max_null_pct=0))
checker.add_check(uniqueness_check("order_id"))
checker.add_check(range_check("total_amount", min_val=0, max_val=1_000_000))
checker.add_check(row_count_check(min_rows=100))
# checker.assert_all_pass(orders_df)
Anti-Patterns to Avoid
- ❌ Full table refreshes when incremental is possible — wastes compute and time
- ❌ No data validation between pipeline stages — propagates corrupt data
- ❌ Hardcoded SQL in Python strings — use templating (Jinja2, dbt)
- ❌ SELECT * in production queries — specify columns for performance and clarity
- ❌ Missing partitioning on large tables — full table scans destroy performance
- ❌ No idempotency — reruns create duplicates or corrupt state
- ❌ Tight coupling between pipeline steps — use contracts (schemas) at boundaries (see the sketch after this list)
- ❌ No monitoring or alerting — silent pipeline failures go unnoticed for days
- ❌ Storing PII without governance — encryption, masking, and access control are required
- ❌ No data lineage documentation — debugging becomes impossible
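For the contracts-at-boundaries point above, here is a minimal sketch of a schema contract check on a staging table, using the same PostgreSQL DO-block idiom as the quality check in the Airflow DAG; the list of required columns is illustrative.
-- Fail the load early if the upstream extract broke the agreed schema
DO $$
DECLARE
    missing_cols INT;
BEGIN
    SELECT COUNT(*) INTO missing_cols
    FROM (VALUES ('order_id'), ('customer_id'), ('quantity'), ('unit_price')) AS required(col)
    WHERE col NOT IN (
        SELECT column_name
        FROM information_schema.columns
        WHERE table_schema = 'staging' AND table_name = 'orders'
    );
    IF missing_cols > 0 THEN
        RAISE EXCEPTION 'staging.orders violates its schema contract: % required column(s) missing', missing_cols;
    END IF;
END $$;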
Adapted from buildwithclaude by Dave Poon (MIT)
# Supported AI Coding Agents
This skill is compatible with the SKILL.md standard and works with all major AI coding agents. Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.