williamzujkowski

Data Engineering Pipeline Designer

# Install this skill:
npx skills add williamzujkowski/cognitive-toolworks --skill "Data Engineering Pipeline Designer"

Installs a specific skill from a multi-skill repository.

# Description

Design data pipelines with quality checks, orchestration, and governance using modern data stack patterns for robust ELT/ETL workflows.

# SKILL.md


name: Data Engineering Pipeline Designer
slug: data-pipeline-designer
description: Design data pipelines with quality checks, orchestration, and governance using modern data stack patterns for robust ELT/ETL workflows.
capabilities:
- Design DAGs for batch and streaming pipelines
- Embed data quality checks with Great Expectations
- Configure Airflow orchestration best practices
- Model transformations with dbt patterns
- Implement Kafka streaming architectures
- Define data lineage and governance controls
inputs:
- pipeline_type: batch | streaming | hybrid
- source_systems: array of data sources (databases, APIs, files, streams)
- transformation_requirements: business logic, aggregations, joins
- quality_requirements: data validation rules, SLAs, monitoring needs
- orchestration_platform: Airflow | Prefect | Dagster | custom
- target_systems: data warehouse, lake, lakehouse destinations
outputs:
- pipeline_architecture: JSON schema with components and flow
- dag_template: orchestration code (Airflow DAG, dbt project)
- quality_checks: Great Expectations suite configuration
- monitoring_config: alerts, SLAs, data lineage tracking
- implementation_guide: step-by-step deployment instructions
keywords:
- data-engineering
- airflow
- dbt
- great-expectations
- kafka
- data-quality
- orchestration
- ETL
- ELT
- data-pipeline
version: 1.0.0
owner: cognitive-toolworks
license: MIT
security: No secrets or PII in examples; use environment variables for credentials
links:
- https://airflow.apache.org/docs/
- https://docs.getdbt.com/
- https://greatexpectations.io/
- https://kafka.apache.org/documentation/


Purpose & When-To-Use

Trigger this skill when:

  • Designing a new data pipeline (batch, streaming, or hybrid)
  • Migrating from ETL to modern ELT patterns
  • Implementing data quality checks in existing pipelines
  • Troubleshooting data quality issues or pipeline failures
  • Establishing data orchestration best practices
  • Configuring real-time streaming data architectures
  • Setting up data lineage and governance controls

Do NOT use for:

  • Simple one-off data exports (use SQL directly)
  • BI tool configuration (separate concern)
  • ML model training pipelines (use mlops-lifecycle-manager skill)
  • Database schema design only (use database-optimization-analyzer)

Pre-Checks

Time normalization:

  • Compute NOW_ET = 2025-10-25T21:30:36-04:00 (NIST/time.gov, America/New_York)

Input validation:

  1. pipeline_type must be one of: batch, streaming, hybrid
  2. source_systems must contain at least one valid source
  3. transformation_requirements must specify business logic or be empty for raw ingestion
  4. quality_requirements must define at least one validation rule or SLA
  5. orchestration_platform must be specified (default: Airflow if omitted)
  6. target_systems must contain at least one destination

Abort conditions:

  • If source and target are identical (no transformation needed)
  • If pipeline_type=streaming but no stream source specified
  • If quality_requirements reference non-existent fields
  • If orchestration_platform is unsupported (emit TODO list)

Procedure

Tier 1 (≤2k tokens): Quick Pipeline Design

Use when: 80% of cases; standard batch pipeline with known patterns

Steps:

  1. Analyze inputs and classify the pipeline pattern:
     • Batch: scheduled ETL/ELT (daily, hourly)
     • Streaming: real-time event processing (Kafka, Kinesis)
     • Hybrid: batch + streaming (lambda architecture)

  2. Select the orchestration approach:
     • Airflow DAG for batch/hybrid (de facto standard, accessed 2025-10-25T21:30:36-04:00: https://www.astronomer.io/airflow/)
     • Kafka + ksqlDB for streaming (accessed 2025-10-25T21:30:36-04:00: https://kafka.apache.org/documentation/)
     • dbt for the transformation layer (accessed 2025-10-25T21:30:36-04:00: https://docs.getdbt.com/)

  3. Generate the pipeline architecture JSON:

     {
       "pipeline_id": "<slug>",
       "type": "batch|streaming|hybrid",
       "orchestration": "airflow|kafka",
       "layers": {
         "ingestion": {"sources": [], "method": "full|incremental"},
         "transformation": {"tool": "dbt", "models": []},
         "quality": {"framework": "great_expectations", "checkpoints": []},
         "storage": {"targets": [], "format": "parquet|delta"}
       },
       "schedule": "cron|event-driven"
     }

  4. Output a quick-start template (a DAG sketch follows these steps):
     • Airflow DAG skeleton with TaskGroups
     • dbt project structure (staging → intermediate → marts)
     • Great Expectations basic suite (nullity, uniqueness, ranges)

  5. Define monitoring:
     • SLA alerts (Airflow SLAs or custom)
     • Data quality thresholds (fail fast on critical checks)
     • Lineage tracking (dbt docs, OpenLineage)
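
A minimal sketch of the quick-start DAG skeleton from step 4, assuming Airflow 2.x with the TaskFlow API (2.4+ for the schedule argument). The schedule, source names, and task bodies are illustrative placeholders, not part of the skill contract.

from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.utils.task_group import TaskGroup


@dag(
    schedule="0 2 * * *",  # illustrative nightly batch window
    start_date=datetime(2025, 1, 1),
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
)
def quickstart_elt():
    @task
    def extract(source: str) -> str:
        # Placeholder: land an incremental slice of `source` in staging storage.
        return f"staging/{source}"

    @task
    def validate(staged_path: str) -> str:
        # Placeholder: run the Great Expectations checkpoint for this dataset.
        return staged_path

    @task
    def transform() -> None:
        # Placeholder: run the dbt models (staging -> intermediate -> marts).
        pass

    with TaskGroup(group_id="ingest") as ingest:
        for source in ("orders", "customers"):  # hypothetical sources
            validate(extract(source))

    ingest >> transform()


quickstart_elt()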

Token budget: T1 ≤ 2000 tokens

Tier 2 (≤6k tokens): Production-Ready Pipeline with Quality Gates

Use when: Production deployment, complex transformations, strict SLAs

Prerequisites: T1 completed OR inputs indicate production requirements

Steps:

  1. Deep-dive on data quality (accessed 2025-10-25T21:30:36-04:00: https://greatexpectations.io/); a validation sketch follows this step:
     • Profiling: auto-generate Expectations from sample data
     • Critical validations: PK uniqueness, FK integrity, business rules
     • Checkpoint strategy: pre-ingestion, post-transformation, pre-load
     • Action on failure: block pipeline, alert, quarantine bad records
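
A minimal sketch of a critical-validation gate for this step, assuming Great Expectations' fluent pandas API (roughly the 0.16–0.18 series; entry points differ across releases). The file path and column names are hypothetical.

import great_expectations as gx

context = gx.get_context()
validator = context.sources.pandas_default.read_csv("staging/orders.csv")

# Critical validations: PK uniqueness, FK presence, one business rule.
validator.expect_column_values_to_be_unique("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_be_between("order_total", min_value=0)

results = validator.validate()
if not results.success:
    # Action on failure: block the load; alerting or quarantine hooks in here.
    raise RuntimeError("Critical data quality checks failed; blocking the pipeline.")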

  2. Optimize Airflow DAG design (accessed 2025-10-25T21:30:36-04:00: https://medium.com/@datasmiles/mastering-apache-airflow-myessential-best-practices-for-robust-data-orchestration-095460505843); a connection-and-pool sketch follows this step:
     • Idempotent tasks: same input → same output (critical for backfills)
     • Atomic tasks: one task = one action (fine-grained retry)
     • Dynamic task generation: use TaskGroups for parallel sources
     • XCom for state passing: avoid large payloads (use an external state store)
     • Connection management: use Airflow Connections, never hardcode credentials
     • Resource optimization: pools, queues, executor type (LocalExecutor vs CeleryExecutor)
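
A hedged sketch of the connection and resource practices above, assuming Airflow 2.x: credentials come from an Airflow Connection rather than code, the task is pinned to a pool to bound concurrent warehouse load, and an explicit partition key keeps reruns idempotent. The connection ID and pool name are hypothetical.

from airflow.decorators import task
from airflow.hooks.base import BaseHook


@task(pool="warehouse_pool", retries=3)
def load_partition(partition_date: str) -> None:
    # Idempotent: (re)writes exactly the partition for `partition_date`,
    # so retries and backfills converge on the same result.
    conn = BaseHook.get_connection("analytics_warehouse")  # Airflow Connection ID
    # Placeholder: open a client with conn.host / conn.login / conn.password
    # and overwrite the target partition; never hardcode these values.
    print(f"Overwriting partition {partition_date} in {conn.host}/{conn.schema}")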

  3. Implement dbt best practices (accessed 2025-10-25T21:30:36-04:00: https://www.getdbt.com/blog/data-transformation-best-practices); a build-invocation sketch follows this step:
     • Layer pattern: staging (raw + minimal cleaning) → intermediate (business logic) → marts (denormalized for analytics)
     • One model = one logical transformation (no mega-models with 50 joins)
     • Materialization strategy:
        • Views: lightweight, always fresh, slower queries
        • Tables: fast queries, stale until rebuilt
        • Incremental: append-only or merge, handles late-arriving data
     • Testing: not_null, unique, accepted_values, relationships
     • Documentation: schema.yml with descriptions, dbt docs generate
     • Macros for DRY: reusable SQL snippets (date ranges, common filters)
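
A brief sketch of driving the layered build from Python, assuming dbt-core 1.5+ and its programmatic invocation API; the layer selectors are illustrative. In practice the same commands usually run from the orchestrator (BashOperator, Cosmos, or similar).

from dbt.cli.main import dbtRunner, dbtRunnerResult

dbt = dbtRunner()

# Build each layer in order; `build` runs the models plus their schema tests
# (not_null, unique, accepted_values, relationships).
for selector in ("staging", "intermediate", "marts"):  # hypothetical selectors
    res: dbtRunnerResult = dbt.invoke(["build", "--select", selector])
    if not res.success:
        raise RuntimeError(f"dbt build failed for layer: {selector}")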

  4. Configure streaming (if applicable); a producer sketch follows this step:
     • Kafka architecture: producers → topics (partitioned) → consumers (accessed 2025-10-25T21:30:36-04:00: https://kafka.apache.org/)
     • Partition strategy: by key (user_id, order_id) for ordering guarantees
     • Consumer groups: parallel processing, fault tolerance
     • Schema registry: Avro/Protobuf for schema evolution (accessed 2025-10-25T21:30:36-04:00: https://www.confluent.io/blog/streaming-data-pipeline-with-apache-kafka-and-ksqldb/)
     • ksqlDB for stream transformations: joins, aggregations, windowing
     • Exactly-once semantics: idempotent producers + transactional consumers
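
A minimal producer sketch for the partitioning and exactly-once notes above, assuming the confluent-kafka Python client; the broker address, topic, and event shape are hypothetical. Keying by order_id keeps all events for an order in one partition (ordering), and enable.idempotence guards against duplicate writes on retries.

import json

from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "broker:9092",  # hypothetical broker
    "enable.idempotence": True,          # idempotent producer for exactly-once writes
    "acks": "all",
})

event = {"order_id": "o-123", "order_total": 42.50}
producer.produce(
    topic="orders",
    key=event["order_id"],               # partition by order_id for ordering guarantees
    value=json.dumps(event).encode("utf-8"),
)
producer.flush()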

  5. Data lineage and governance:
     • OpenLineage integration: capture lineage from Airflow and dbt
     • Data catalog: tag PII, set retention policies
     • RBAC: column-level access controls in the warehouse
     • Audit logs: who accessed what data, and when

  6. Monitoring and alerting; an SLA-and-callback sketch follows this step:
     • Pipeline health: Airflow UI, metrics to Prometheus/Datadog
     • Data quality dashboards: Great Expectations Data Docs
     • SLA violations: PagerDuty/Slack integration
     • Cost tracking: warehouse query costs, Airflow compute
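
A short sketch of the Airflow-side alerting hooks mentioned above, assuming Airflow 2.x: a task-level SLA plus an on_failure_callback that would forward to Slack or PagerDuty; the notification itself is a placeholder.

from datetime import timedelta


def notify_on_failure(context) -> None:
    # Placeholder: forward to Slack/PagerDuty via your provider of choice.
    ti = context["task_instance"]
    print(f"ALERT: {ti.dag_id}.{ti.task_id} failed for run {context['run_id']}")


default_args = {
    "sla": timedelta(hours=1),           # Airflow records SLA misses past this window
    "on_failure_callback": notify_on_failure,
    "retries": 2,
}
# Pass default_args=default_args to the DAG so every task inherits these hooks.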

Token budget: T2 ≤ 6000 tokens

Tier 3 (≤12k tokens): Advanced Patterns and Optimization

Use when: Handling PB-scale data, multi-region, complex event-driven patterns

Note: This skill is scoped to T2. For T3 scenarios:

  • TODO: Consult mlops-lifecycle-manager for ML feature pipelines
  • TODO: Consult database-optimization-analyzer for warehouse tuning
  • TODO: Consult cloud-native-deployment-orchestrator for Kubernetes-based orchestration

Not implemented in v1.0.0.

Decision Rules

When to choose batch vs streaming:

  • Batch if: data arrives in bulk, latency tolerance >1 hour, simpler to maintain
  • Streaming if: sub-second latency required, event-driven triggers, real-time analytics
  • Hybrid if: both real-time dashboards AND overnight batch reporting needed

When to use incremental vs full refresh:

  • Full refresh: small tables (<10M rows), idempotent, no history tracking
  • Incremental: large tables (>100M rows), append-only or merge strategy, track watermarks
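
The watermark tracking above can be a small, explicit step; a hedged sketch follows, with the table, column, and stored watermark all hypothetical (the watermark itself might live in an Airflow Variable or a metadata table):

from datetime import datetime, timezone

def build_incremental_query(table: str, watermark_column: str, last_watermark: str) -> str:
    # Pull only rows changed since the last successful run (parameterize in real code).
    return f"SELECT * FROM {table} WHERE {watermark_column} > '{last_watermark}'"

last_watermark = "2025-10-24T02:00:00+00:00"            # read from the state store
query = build_incremental_query("orders", "updated_at", last_watermark)
new_watermark = datetime.now(timezone.utc).isoformat()  # persist only after a successful load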

When to fail vs warn on data quality issues:

  • Fail (block pipeline): critical business rules (revenue calculations, PK violations)
  • Warn (continue with alert): non-critical outliers, minor formatting issues
  • Quarantine: isolate bad records, process good records, manual review queue
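
A small pandas sketch of the quarantine option: rows that fail a critical rule are routed to a review location while clean rows continue; the rule and paths are illustrative.

import pandas as pd

def quarantine_bad_records(df: pd.DataFrame) -> pd.DataFrame:
    # Critical rule from the example pipeline: order totals must be positive.
    bad = df[(df["order_total"] <= 0) | (df["order_total"].isna())]
    good = df.drop(bad.index)

    if not bad.empty:
        # Isolate bad records for manual review instead of failing the whole load.
        bad.to_parquet("quarantine/orders_bad.parquet")  # hypothetical review path
        print(f"Quarantined {len(bad)} records; alert the data owner.")

    return good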

Orchestration platform selection:

  • Airflow: if need Python flexibility, complex dependencies, mature ecosystem
  • Prefect: if want modern UI, easier local development, Pythonic
  • Dagster: if want software-defined assets, testing-first approach
  • Cloud-native (Step Functions, Cloud Composer): if locked into cloud vendor

Abort conditions:

  • No clear transformation logic defined → emit TODO: "Specify business rules"
  • Source schema unknown → emit TODO: "Profile source data first"
  • Target warehouse not provisioned → emit TODO: "Setup destination infra"

Output Contract

Required fields:

{
  "pipeline_architecture": {
    "pipeline_id": "string (slug format)",
    "type": "batch|streaming|hybrid",
    "orchestration": {
      "platform": "airflow|prefect|dagster|kafka",
      "schedule": "cron expression | event-driven",
      "parallelism": "integer (max concurrent tasks)"
    },
    "layers": {
      "ingestion": {
        "sources": ["array of source configs"],
        "method": "full|incremental",
        "connector": "native|fivetran|airbyte|custom"
      },
      "transformation": {
        "tool": "dbt|spark|custom",
        "models": ["array of model names"],
        "materialization": "view|table|incremental"
      },
      "quality": {
        "framework": "great_expectations|dbt_tests|custom",
        "checkpoints": ["array of checkpoint configs"],
        "action_on_failure": "block|warn|quarantine"
      },
      "storage": {
        "targets": ["array of target configs"],
        "format": "parquet|delta|iceberg|avro"
      }
    },
    "monitoring": {
      "slas": ["array of SLA definitions"],
      "alerts": ["array of alert configs"],
      "lineage": "openlineage|datahub|custom"
    }
  },
  "dag_template": "string (executable code or path to resource)",
  "quality_checks": "string (Great Expectations suite YAML or dbt test SQL)",
  "monitoring_config": "string (alert rules, dashboard JSON)",
  "implementation_guide": "array of step-by-step instructions"
}

Optional fields:

  • cost_estimate: projected monthly cost (warehouse + orchestration + storage)
  • performance_benchmarks: expected throughput, latency targets
  • rollback_plan: how to revert if pipeline fails in production

Validation:

  • pipeline_id must be unique, slug format (lowercase, hyphens)
  • schedule must be valid cron OR event trigger definition
  • sources and targets must have valid connection info (no credentials in output)
  • All referenced models must exist in transformation layer

Examples

Example 1: Batch ELT Pipeline (E-commerce Orders)

# Input
pipeline_type: batch
source_systems: [{type: postgres, name: orders_db, tables: [orders, customers]}]
transformation_requirements: [Join orders+customers, Calculate daily revenue]
quality_requirements: [order_id unique, order_total > 0]
orchestration_platform: airflow
target_systems: [{type: snowflake, schema: analytics}]
schedule: 0 2 * * *

# Output (abbreviated)
pipeline_architecture:
  pipeline_id: ecommerce-orders-elt
  type: batch
  orchestration: {platform: airflow, schedule: "0 2 * * *"}
  layers:
    ingestion:
      sources: [orders_db.orders, orders_db.customers]
      method: incremental
    transformation:
      tool: dbt
      models: [stg_orders, int_order_metrics, fct_daily_revenue]
    quality:
      framework: great_expectations
      checkpoints: [staging_check, marts_check]

Quality Gates

Token budgets (enforced):

  • T1: ≤2000 tokens (quick design, standard patterns)
  • T2: ≤6000 tokens (production-ready, quality gates, monitoring)
  • T3: Not implemented in v1.0.0

Safety checks:

  • Never emit credentials or API keys in outputs
  • Always use environment variables or secret managers (Airflow Connections, AWS Secrets Manager)
  • Validate that quality checks don't reference PII columns without encryption

Auditability:

  • All architecture decisions logged in implementation_guide
  • Source links with access dates for claims (Airflow docs, dbt docs, etc.)
  • Version transformations with dbt git tags or Airflow DAG versions

Determinism:

  • Same inputs → same pipeline architecture JSON
  • Idempotent DAG designs (safe to re-run)
  • Incremental models handle late-arriving data gracefully

Resources

Official documentation (accessed 2025-10-25T21:30:36-04:00):

  • Apache Airflow: https://airflow.apache.org/docs/
  • dbt (data build tool): https://docs.getdbt.com/
  • Great Expectations: https://greatexpectations.io/
  • Apache Kafka: https://kafka.apache.org/documentation/

Best practices guides:

  • Airflow orchestration patterns: https://www.astronomer.io/airflow/
  • dbt transformation best practices: https://www.getdbt.com/blog/data-transformation-best-practices
  • Modern data stack architecture: https://www.getdbt.com/blog/data-integration

Templates and examples:

  • Located in /skills/data-pipeline-designer/resources/
  • airflow-dag-template.py: Production-ready DAG with TaskGroups and SLAs
  • dbt-project-structure.yml: Layered dbt project (staging → marts)
  • great-expectations-suite.yml: Common data quality checks
  • kafka-streaming-config.json: Schema registry + consumer group setup

Related skills:

  • database-optimization-analyzer: For warehouse query tuning and indexing
  • devops-pipeline-architect: For CI/CD of pipeline code
  • cloud-native-deployment-orchestrator: For Kubernetes-based Airflow deployments

# Supported AI Coding Agents

This skill is compatible with the SKILL.md standard and works with all major AI coding agents.

Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.