jgarrison929

ml-engineer

# Install this skill:
npx skills add jgarrison929/openclaw-skills --skill "ml-engineer"

Installs this specific skill from the multi-skill repository.

# Description

Use when building ML pipelines, training models, deploying to production, implementing feature engineering, MLOps workflows, experiment tracking, model monitoring, or any machine learning engineering task.

# SKILL.md


---
name: ml-engineer
version: 1.0.0
description: Use when building ML pipelines, training models, deploying to production, implementing feature engineering, MLOps workflows, experiment tracking, model monitoring, or any machine learning engineering task.
triggers:
- machine learning
- ML pipeline
- model training
- model serving
- feature engineering
- MLOps
- model deployment
- experiment tracking
- data drift
- model monitoring
- TensorFlow
- PyTorch
- scikit-learn
- MLflow
- Kubeflow
- feature store
- model registry
- inference
- A/B testing model
- retraining
role: specialist
scope: implementation
output-format: code
---


# ML Engineer

Production machine learning engineer specializing in ML pipelines, model training, deployment, MLOps, and end-to-end ML system design.

## Role Definition

You are a senior ML engineer building production-grade machine learning systems. You focus on the full lifecycle: data preparation, feature engineering, model training, evaluation, deployment, monitoring, and retraining. You bridge data science and software engineering.

## Core Principles

  1. Measure before optimizing — establish baselines with simple models first
  2. Version everything — data, features, models, experiments, configs
  3. Reproducibility is non-negotiable — same inputs must produce same outputs (a seeding sketch follows this list)
  4. Production != notebook — production ML needs error handling, monitoring, and scaling
  5. Monitor relentlessly — model performance degrades silently
  6. Automate the pipeline — manual steps are failure points
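
Principles 2 and 3 are easiest to enforce by pinning every source of randomness in one place and recording the seed with each run. A minimal sketch, assuming a hypothetical `seed_everything` helper that is not part of this skill's codebase:

# Hypothetical reproducibility helper (illustrative, not part of the skill)
import os
import random

import numpy as np


def seed_everything(seed: int = 42) -> None:
    """Pin the common sources of randomness so reruns are comparable."""
    random.seed(seed)
    np.random.seed(seed)
    # Only affects subprocesses launched after this point
    os.environ["PYTHONHASHSEED"] = str(seed)
    try:
        import torch
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    except ImportError:
        pass  # PyTorch not installed; sklearn-only projects skip this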

## Project Structure

ml-project/
├── data/
│   ├── raw/                    # Immutable raw data
│   ├── processed/              # Cleaned, transformed data
│   └── features/               # Feature store snapshots
├── src/
│   ├── data/
│   │   ├── ingestion.py        # Data loading and validation
│   │   ├── preprocessing.py    # Cleaning, imputation
│   │   └── validation.py       # Data quality checks
│   ├── features/
│   │   ├── engineering.py      # Feature transformations
│   │   ├── selection.py        # Feature importance/selection
│   │   └── store.py            # Feature store interface
│   ├── models/
│   │   ├── train.py            # Training loop
│   │   ├── evaluate.py         # Metrics and evaluation
│   │   ├── predict.py          # Inference logic
│   │   └── registry.py         # Model versioning
│   ├── serving/
│   │   ├── api.py              # Model serving API
│   │   ├── batch.py            # Batch prediction
│   │   └── preprocessing.py    # Online feature transforms
│   └── monitoring/
│       ├── drift.py            # Data/model drift detection
│       ├── metrics.py          # Performance tracking
│       └── alerts.py           # Alert rules
├── pipelines/
│   ├── training_pipeline.py    # End-to-end training
│   ├── inference_pipeline.py   # Batch inference
│   └── retraining_pipeline.py  # Automated retraining
├── configs/
│   ├── model_config.yaml       # Hyperparameters
│   ├── feature_config.yaml     # Feature definitions
│   └── serving_config.yaml     # Deployment config
├── tests/
│   ├── test_data.py
│   ├── test_features.py
│   ├── test_model.py
│   └── test_serving.py
├── notebooks/                  # Exploration only, not production
├── Dockerfile
├── dvc.yaml                    # Data version control
└── mlflow.yaml                 # Experiment tracking config

## Feature Engineering Pipeline

# src/features/engineering.py
import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from typing import List, Dict, Optional
import logging

logger = logging.getLogger(__name__)


class FeatureEngineer(BaseEstimator, TransformerMixin):
    """Production feature engineering with fit/transform pattern."""

    def __init__(self, config: Dict):
        self.config = config
        self.numeric_features: List[str] = config.get("numeric_features", [])
        self.categorical_features: List[str] = config.get("categorical_features", [])
        self.date_features: List[str] = config.get("date_features", [])
        self._fitted = False

    def fit(self, X: pd.DataFrame, y=None):
        """Learn feature statistics from training data."""
        # Store medians for imputation
        self.numeric_medians_ = X[self.numeric_features].median().to_dict()
        # Store modes for categorical imputation
        self.categorical_modes_ = {
            col: X[col].mode().iloc[0] for col in self.categorical_features
        }
        # Store feature ranges for clipping outliers
        self.clip_bounds_ = {}
        for col in self.numeric_features:
            p01, p99 = X[col].quantile([0.01, 0.99])
            spread = p99 - p01
            self.clip_bounds_[col] = (p01 - 3 * spread, p99 + 3 * spread)

        self._fitted = True
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Apply learned transformations."""
        if not self._fitted:
            raise RuntimeError("FeatureEngineer must be fit before transform")

        df = X.copy()

        # Impute missing values
        for col in self.numeric_features:
            df[col] = df[col].fillna(self.numeric_medians_[col])
        for col in self.categorical_features:
            df[col] = df[col].fillna(self.categorical_modes_[col])

        # Clip outliers using training bounds
        for col, (lower, upper) in self.clip_bounds_.items():
            df[col] = df[col].clip(lower, upper)

        # Extract date features
        for col in self.date_features:
            df[col] = pd.to_datetime(df[col])
            df[f"{col}_year"] = df[col].dt.year
            df[f"{col}_month"] = df[col].dt.month
            df[f"{col}_dayofweek"] = df[col].dt.dayofweek
            df[f"{col}_hour"] = df[col].dt.hour
            df[f"{col}_is_weekend"] = df[col].dt.dayofweek.isin([5, 6]).astype(int)
            df = df.drop(columns=[col])

        # Interaction features
        for interaction in self.config.get("interactions", []):
            col_a, col_b = interaction
            df[f"{col_a}_x_{col_b}"] = df[col_a] * df[col_b]

        # Log transforms for skewed features
        for col in self.config.get("log_transform", []):
            df[f"{col}_log"] = np.log1p(df[col].clip(lower=0))

        logger.info(f"Transformed features: {df.shape[1]} columns")
        return df


def build_preprocessing_pipeline(config: Dict) -> Pipeline:
    """Build sklearn pipeline for reproducible preprocessing."""
    numeric_transformer = Pipeline([
        ("scaler", StandardScaler()),
    ])
    categorical_transformer = Pipeline([
        ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
    ])

    preprocessor = ColumnTransformer([
        ("num", numeric_transformer, config["numeric_features"]),
        ("cat", categorical_transformer, config["categorical_features"]),
    ], remainder="passthrough")  # keep engineered date/interaction/log columns

    return Pipeline([
        ("feature_engineer", FeatureEngineer(config)),
        ("preprocessor", preprocessor),
    ])
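
The pipeline above is intended to be fit on the training split only and then reused unchanged for validation, test, and serving data (see anti-pattern 4). A usage sketch with placeholder paths and column names:

# Usage sketch (paths and column names are placeholders)
import joblib
import pandas as pd

from src.features.engineering import build_preprocessing_pipeline

config = {
    "numeric_features": ["feature_a", "feature_b"],
    "categorical_features": ["segment"],
    "date_features": ["signup_date"],
}

train_df = pd.read_csv("data/processed/train.csv")
test_df = pd.read_csv("data/processed/test.csv")

pipeline = build_preprocessing_pipeline(config)
X_train = pipeline.fit_transform(train_df.drop(columns=["target"]))  # learn stats from train only
X_test = pipeline.transform(test_df.drop(columns=["target"]))        # reuse the same stats

joblib.dump(pipeline, "/tmp/feature_pipeline.joblib")  # version alongside the model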

## Model Training with Experiment Tracking

# src/models/train.py
import mlflow
import mlflow.sklearn
import mlflow.pytorch
from sklearn.model_selection import cross_val_score, StratifiedKFold
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, confusion_matrix, classification_report
)
import numpy as np
import json
import logging
from pathlib import Path
from typing import Dict, Any, Optional, Tuple
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class TrainingResult:
    model: Any
    metrics: Dict[str, float]
    run_id: str
    model_version: Optional[str] = None


class ModelTrainer:
    """Production model training with MLflow tracking."""

    def __init__(self, experiment_name: str, tracking_uri: str = "http://localhost:5000"):
        mlflow.set_tracking_uri(tracking_uri)
        mlflow.set_experiment(experiment_name)
        self.experiment_name = experiment_name

    def train_and_evaluate(
        self,
        model,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_test: np.ndarray,
        y_test: np.ndarray,
        params: Dict[str, Any],
        tags: Optional[Dict[str, str]] = None,
    ) -> TrainingResult:
        """Train model with full experiment tracking."""
        with mlflow.start_run(tags=tags or {}) as run:
            # Log parameters
            mlflow.log_params(params)
            mlflow.log_param("model_type", type(model).__name__)
            mlflow.log_param("train_size", len(X_train))
            mlflow.log_param("test_size", len(X_test))

            # Cross-validation on training set
            cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
            cv_scores = cross_val_score(model, X_train, y_train, cv=cv, scoring="f1_weighted")
            mlflow.log_metric("cv_f1_mean", cv_scores.mean())
            mlflow.log_metric("cv_f1_std", cv_scores.std())

            # Train on full training set
            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)
            # Positive-class probabilities (binary classification assumed for AUC)
            y_proba = model.predict_proba(X_test)[:, 1] if hasattr(model, "predict_proba") else None

            # Calculate metrics
            metrics = {
                "accuracy": accuracy_score(y_test, y_pred),
                "precision": precision_score(y_test, y_pred, average="weighted"),
                "recall": recall_score(y_test, y_pred, average="weighted"),
                "f1": f1_score(y_test, y_pred, average="weighted"),
            }
            if y_proba is not None:
                metrics["auc_roc"] = roc_auc_score(y_test, y_proba)

            # Log metrics
            mlflow.log_metrics(metrics)

            # Log confusion matrix as artifact
            cm = confusion_matrix(y_test, y_pred)
            cm_path = "/tmp/confusion_matrix.json"
            with open(cm_path, "w") as f:
                json.dump(cm.tolist(), f)
            mlflow.log_artifact(cm_path)

            # Log classification report
            report = classification_report(y_test, y_pred, output_dict=True)
            report_path = "/tmp/classification_report.json"
            with open(report_path, "w") as f:
                json.dump(report, f, indent=2)
            mlflow.log_artifact(report_path)

            # Log model
            mlflow.sklearn.log_model(
                model, "model",
                registered_model_name=f"{self.experiment_name}_model",
            )

            logger.info(f"Run {run.info.run_id} — F1: {metrics['f1']:.4f}, AUC: {metrics.get('auc_roc', 'N/A')}")

            return TrainingResult(
                model=model,
                metrics=metrics,
                run_id=run.info.run_id,
            )

    def hyperparameter_search(
        self,
        model_class,
        param_grid: Dict[str, list],
        X_train, y_train, X_test, y_test,
        n_trials: int = 50,
    ) -> TrainingResult:
        """Hyperparameter optimization with Optuna + MLflow."""
        import optuna

        def objective(trial):
            params = {}
            for name, values in param_grid.items():
                if isinstance(values[0], int):
                    params[name] = trial.suggest_int(name, values[0], values[-1])
                elif isinstance(values[0], float):
                    # log-uniform sampling: float bounds must be positive
                    params[name] = trial.suggest_float(name, values[0], values[-1], log=True)
                else:
                    params[name] = trial.suggest_categorical(name, values)

            model = model_class(**params)
            cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
            scores = cross_val_score(model, X_train, y_train, cv=cv, scoring="f1_weighted")
            return scores.mean()

        study = optuna.create_study(direction="maximize")
        study.optimize(objective, n_trials=n_trials)

        best_params = study.best_params
        logger.info(f"Best params: {best_params}, Best F1: {study.best_value:.4f}")

        return self.train_and_evaluate(
            model_class(**best_params),
            X_train, y_train, X_test, y_test,
            params=best_params,
            tags={"optimization": "optuna", "n_trials": str(n_trials)},
        )
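
A sketch of driving the trainer from a training script; the experiment name, data path, and hyperparameters below are placeholders, and MLflow is assumed to be reachable at the default tracking URI:

# Usage sketch (illustrative values)
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split

from src.models.train import ModelTrainer

df = pd.read_parquet("data/features/training_set.parquet")
X = df.drop(columns=["target"]).values
y = df["target"].values
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

trainer = ModelTrainer(experiment_name="churn_classifier")
result = trainer.train_and_evaluate(
    GradientBoostingClassifier(n_estimators=200, learning_rate=0.05),
    X_train, y_train, X_test, y_test,
    params={"n_estimators": 200, "learning_rate": 0.05},
    tags={"dataset_version": "v3"},
)
print(result.run_id, result.metrics["f1"])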

## Model Serving API

# src/serving/api.py
from fastapi import FastAPI, HTTPException, BackgroundTasks, Response
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
import mlflow
import numpy as np
import time
import logging
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST

logger = logging.getLogger(__name__)
app = FastAPI(title="ML Model Serving API", version="1.0.0")

# Prometheus metrics
PREDICTION_COUNT = Counter("predictions_total", "Total predictions", ["model", "status"])
PREDICTION_LATENCY = Histogram("prediction_latency_seconds", "Prediction latency", ["model"])
FEATURE_DRIFT = Counter("feature_drift_detected", "Feature drift events", ["feature"])


class PredictionRequest(BaseModel):
    features: Dict[str, Any]
    model_version: Optional[str] = "latest"

class PredictionResponse(BaseModel):
    prediction: Any
    probability: Optional[float] = None
    model_version: str
    latency_ms: float

class BatchPredictionRequest(BaseModel):
    instances: List[Dict[str, Any]]
    model_version: Optional[str] = "latest"


class ModelManager:
    """Manages model loading and version switching."""

    def __init__(self, model_name: str, tracking_uri: str):
        mlflow.set_tracking_uri(tracking_uri)
        self.model_name = model_name
        self.models: Dict[str, Any] = {}
        self.active_version = "latest"

    def load_model(self, version: str = "latest") -> Any:
        if version not in self.models:
            if version == "latest":
                model_uri = f"models:/{self.model_name}/Production"
            else:
                model_uri = f"models:/{self.model_name}/{version}"
            self.models[version] = mlflow.pyfunc.load_model(model_uri)
            logger.info(f"Loaded model {self.model_name} version {version}")
        return self.models[version]

    def predict(self, features: Dict[str, Any], version: str = "latest") -> Dict:
        model = self.load_model(version)
        import pandas as pd
        input_df = pd.DataFrame([features])
        prediction = model.predict(input_df)
        return {"prediction": prediction[0].item() if hasattr(prediction[0], "item") else prediction[0]}


model_manager = ModelManager(
    model_name="production_model",
    tracking_uri="http://localhost:5000",
)


@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest, background_tasks: BackgroundTasks):
    start_time = time.time()
    try:
        result = model_manager.predict(request.features, request.model_version or "latest")
        latency = (time.time() - start_time) * 1000

        PREDICTION_COUNT.labels(model=request.model_version, status="success").inc()
        PREDICTION_LATENCY.labels(model=request.model_version).observe(latency / 1000)

        # Log for monitoring in background
        background_tasks.add_task(log_prediction, request.features, result, latency)

        return PredictionResponse(
            prediction=result["prediction"],
            probability=result.get("probability"),
            model_version=request.model_version or "latest",
            latency_ms=round(latency, 2),
        )
    except Exception as e:
        PREDICTION_COUNT.labels(model=request.model_version, status="error").inc()
        logger.error(f"Prediction error: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/predict/batch")
async def batch_predict(request: BatchPredictionRequest):
    results = []
    for instance in request.instances:
        result = model_manager.predict(instance, request.model_version or "latest")
        results.append(result)
    return {"predictions": results, "count": len(results)}


@app.get("/health")
async def health():
    return {"status": "healthy", "model": model_manager.model_name}


@app.get("/metrics")
async def metrics():
    return generate_latest()


async def log_prediction(features, result, latency):
    """Background task to log predictions for monitoring."""
    # Store in prediction log for drift detection
    pass
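
Once the API is running (for example via `uvicorn src.serving.api:app --port 8000`), a client call might look like the sketch below; the feature names and port are placeholders:

# Client sketch (feature names and port are illustrative)
import requests

payload = {
    "features": {"feature_a": 42.0, "feature_b": 3.1, "segment": "premium"},
    "model_version": "latest",
}
resp = requests.post("http://localhost:8000/predict", json=payload, timeout=5)
resp.raise_for_status()
print(resp.json())  # {"prediction": ..., "model_version": "latest", "latency_ms": ...}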

## Data and Model Drift Detection

# src/monitoring/drift.py
import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)


@dataclass
class DriftResult:
    feature: str
    drift_detected: bool
    p_value: float
    statistic: float
    test_type: str
    severity: str  # low, medium, high


class DriftDetector:
    """Detect data and model drift in production."""

    def __init__(self, reference_data: pd.DataFrame, threshold: float = 0.05):
        self.reference = reference_data
        self.threshold = threshold
        self.reference_stats = self._compute_stats(reference_data)

    def _compute_stats(self, df: pd.DataFrame) -> Dict:
        stats_dict = {}
        for col in df.select_dtypes(include=[np.number]).columns:
            stats_dict[col] = {
                "mean": df[col].mean(),
                "std": df[col].std(),
                "median": df[col].median(),
                "q25": df[col].quantile(0.25),
                "q75": df[col].quantile(0.75),
            }
        return stats_dict

    def detect_feature_drift(self, current_data: pd.DataFrame) -> List[DriftResult]:
        """Run statistical tests for each feature."""
        results = []
        for col in self.reference.select_dtypes(include=[np.number]).columns:
            if col not in current_data.columns:
                continue

            ref_values = self.reference[col].dropna()
            cur_values = current_data[col].dropna()

            # Kolmogorov-Smirnov test
            ks_stat, ks_pvalue = stats.ks_2samp(ref_values, cur_values)

            # Population Stability Index
            psi = self._calculate_psi(ref_values, cur_values)

            drift_detected = ks_pvalue < self.threshold or psi > 0.2
            severity = "low"
            if psi > 0.25:
                severity = "high"
            elif psi > 0.1:
                severity = "medium"

            results.append(DriftResult(
                feature=col,
                drift_detected=drift_detected,
                p_value=ks_pvalue,
                statistic=ks_stat,
                test_type="KS + PSI",
                severity=severity,
            ))

            if drift_detected:
                logger.warning(f"Drift detected in {col}: PSI={psi:.4f}, KS p={ks_pvalue:.4f}")

        return results

    def _calculate_psi(self, reference: pd.Series, current: pd.Series, bins: int = 10) -> float:
        """Population Stability Index — measures distribution shift."""
        breakpoints = np.linspace(
            min(reference.min(), current.min()),
            max(reference.max(), current.max()),
            bins + 1,
        )
        ref_counts = np.histogram(reference, bins=breakpoints)[0] / len(reference)
        cur_counts = np.histogram(current, bins=breakpoints)[0] / len(current)

        # Avoid division by zero
        ref_counts = np.clip(ref_counts, 1e-6, None)
        cur_counts = np.clip(cur_counts, 1e-6, None)

        psi = np.sum((cur_counts - ref_counts) * np.log(cur_counts / ref_counts))
        return psi

    def detect_prediction_drift(
        self,
        reference_predictions: np.ndarray,
        current_predictions: np.ndarray,
    ) -> DriftResult:
        """Detect drift in model outputs."""
        ks_stat, ks_pvalue = stats.ks_2samp(reference_predictions, current_predictions)
        psi = self._calculate_psi(
            pd.Series(reference_predictions),
            pd.Series(current_predictions),
        )

        return DriftResult(
            feature="model_predictions",
            drift_detected=ks_pvalue < self.threshold or psi > 0.2,
            p_value=ks_pvalue,
            statistic=psi,
            test_type="KS + PSI",
            severity="high" if psi > 0.25 else "medium" if psi > 0.1 else "low",
        )
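
The detector is typically built from a snapshot of the training data and run against a recent window of serving traffic, e.g. on a daily schedule. A usage sketch with placeholder paths:

# Usage sketch (paths are placeholders)
import pandas as pd

from src.monitoring.drift import DriftDetector

reference = pd.read_parquet("data/features/training_snapshot.parquet")
current = pd.read_parquet("data/features/last_24h_requests.parquet")

detector = DriftDetector(reference_data=reference, threshold=0.05)
results = detector.detect_feature_drift(current)

high_drift = [r for r in results if r.drift_detected and r.severity == "high"]
if high_drift:
    # Hand off to alerting and/or the retraining pipeline
    print(f"High-severity drift in: {[r.feature for r in high_drift]}")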

## Training Pipeline (Airflow DAG)

# pipelines/training_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
    "owner": "ml-team",
    "depends_on_past": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["[email protected]"],
}

dag = DAG(
    "model_training_pipeline",
    default_args=default_args,
    description="End-to-end model training pipeline",
    schedule_interval="@weekly",
    start_date=days_ago(1),
    catchup=False,
    tags=["ml", "training"],
)


def validate_data(**context):
    """Validate input data quality before training."""
    import great_expectations as gx

    context_gx = gx.get_context()
    result = context_gx.run_checkpoint(checkpoint_name="training_data_check")
    if not result.success:
        raise ValueError("Data validation failed — aborting training")
    return {"validation": "passed", "rows": result.statistics["evaluated_expectations"]}


def extract_features(**context):
    """Run feature engineering pipeline."""
    from src.features.engineering import build_preprocessing_pipeline
    import yaml
    import joblib

    with open("configs/feature_config.yaml") as f:
        config = yaml.safe_load(f)

    pipeline = build_preprocessing_pipeline(config)
    # Load and transform data...
    joblib.dump(pipeline, "/tmp/feature_pipeline.joblib")
    return {"features_generated": True}


def train_model(**context):
    """Train model with hyperparameter optimization."""
    from src.models.train import ModelTrainer
    from sklearn.ensemble import GradientBoostingClassifier

    trainer = ModelTrainer("production_model")
    # Load data, train, evaluate...
    return {"run_id": "abc123", "f1": 0.92}


def evaluate_champion(**context):
    """Compare new model against current production model."""
    ti = context["ti"]
    training_result = ti.xcom_pull(task_ids="train_model")
    new_f1 = training_result["f1"]

    # Get current production model metrics
    current_f1 = 0.90  # From model registry
    improvement = new_f1 - current_f1

    if improvement < 0.01:
        raise ValueError(f"New model F1 {new_f1:.4f} not significantly better than current {current_f1:.4f}")
    return {"promote": True, "improvement": improvement}


def deploy_model(**context):
    """Promote new model to production."""
    import mlflow

    ti = context["ti"]
    training_result = ti.xcom_pull(task_ids="train_model")
    client = mlflow.MlflowClient()
    # A run ID is not a registry version number: look up the model version
    # registered by the training run, then promote it
    versions = client.search_model_versions(f"run_id = '{training_result['run_id']}'")
    client.transition_model_version_stage(
        name="production_model",
        version=versions[0].version,
        stage="Production",
    )


validate = PythonOperator(task_id="validate_data", python_callable=validate_data, dag=dag)
features = PythonOperator(task_id="extract_features", python_callable=extract_features, dag=dag)
train = PythonOperator(task_id="train_model", python_callable=train_model, dag=dag)
evaluate = PythonOperator(task_id="evaluate_champion", python_callable=evaluate_champion, dag=dag)
deploy = PythonOperator(task_id="deploy_model", python_callable=deploy_model, dag=dag)

validate >> features >> train >> evaluate >> deploy
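
The weekly schedule covers routine retraining; drift-triggered retraining can reuse the monitoring module. A sketch of a gate that a retraining pipeline might call before kicking off training (the thresholds and function name are illustrative):

# Drift-based retraining gate (thresholds are illustrative)
from typing import List

from src.monitoring.drift import DriftResult


def should_retrain(drift_results: List[DriftResult],
                   max_high: int = 0,
                   max_medium: int = 3) -> bool:
    """Trigger retraining when drift exceeds tolerances."""
    high = sum(1 for r in drift_results if r.drift_detected and r.severity == "high")
    medium = sum(1 for r in drift_results if r.drift_detected and r.severity == "medium")
    return high > max_high or medium > max_medium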

## PyTorch Training Loop (Deep Learning)

# src/models/pytorch_trainer.py
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from typing import Dict, Optional
import mlflow
import logging

logger = logging.getLogger(__name__)


class EarlyStopping:
    """Stop training when validation loss stops improving."""

    def __init__(self, patience: int = 5, min_delta: float = 1e-4):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = float("inf")
        self.should_stop = False

    def __call__(self, val_loss: float):
        if val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.should_stop = True


def train_pytorch_model(
    model: nn.Module,
    train_loader: DataLoader,
    val_loader: DataLoader,
    config: Dict,
) -> nn.Module:
    """Production PyTorch training with MLflow tracking."""

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=config["learning_rate"],
        weight_decay=config.get("weight_decay", 0.01),
    )
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
        optimizer, T_max=config["epochs"]
    )
    criterion = nn.CrossEntropyLoss()
    early_stopping = EarlyStopping(patience=config.get("patience", 5))

    with mlflow.start_run():
        mlflow.log_params(config)

        for epoch in range(config["epochs"]):
            # Training
            model.train()
            train_loss = 0.0
            for batch_X, batch_y in train_loader:
                batch_X, batch_y = batch_X.to(device), batch_y.to(device)
                optimizer.zero_grad()
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y)
                loss.backward()
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                optimizer.step()
                train_loss += loss.item()

            # Validation
            model.eval()
            val_loss = 0.0
            correct = 0
            total = 0
            with torch.no_grad():
                for batch_X, batch_y in val_loader:
                    batch_X, batch_y = batch_X.to(device), batch_y.to(device)
                    outputs = model(batch_X)
                    val_loss += criterion(outputs, batch_y).item()
                    _, predicted = torch.max(outputs, 1)
                    total += batch_y.size(0)
                    correct += (predicted == batch_y).sum().item()

            avg_train = train_loss / len(train_loader)
            avg_val = val_loss / len(val_loader)
            accuracy = correct / total

            mlflow.log_metrics({
                "train_loss": avg_train,
                "val_loss": avg_val,
                "val_accuracy": accuracy,
                "learning_rate": scheduler.get_last_lr()[0],
            }, step=epoch)

            logger.info(f"Epoch {epoch+1}: train_loss={avg_train:.4f}, val_loss={avg_val:.4f}, acc={accuracy:.4f}")

            scheduler.step()
            early_stopping(avg_val)
            if early_stopping.should_stop:
                logger.info(f"Early stopping at epoch {epoch+1}")
                break

        mlflow.pytorch.log_model(model, "model")

    return model
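
A sketch of invoking the loop on tensor data; the toy model, synthetic tensors, and hyperparameters are placeholders:

# Usage sketch (toy model and synthetic data)
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

from src.models.pytorch_trainer import train_pytorch_model

X = torch.randn(1000, 20)
y = torch.randint(0, 2, (1000,))
train_ds = TensorDataset(X[:800], y[:800])
val_ds = TensorDataset(X[800:], y[800:])

model = nn.Sequential(nn.Linear(20, 64), nn.ReLU(), nn.Linear(64, 2))
config = {"learning_rate": 1e-3, "epochs": 20, "patience": 5}

trained = train_pytorch_model(
    model,
    DataLoader(train_ds, batch_size=64, shuffle=True),
    DataLoader(val_ds, batch_size=64),
    config,
)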

## Data Validation with Great Expectations

# src/data/validation.py
import great_expectations as gx
import pandas as pd
from typing import Dict, List


def build_training_expectations(context, suite_name: str = "training_data"):
    """Define data quality expectations for training data (GX 1.x-style API)."""
    suite = context.suites.add(gx.ExpectationSuite(name=suite_name))

    # Schema expectations
    suite.add_expectation(gx.expectations.ExpectTableColumnsToMatchSet(
        column_set=["user_id", "feature_a", "feature_b", "target"],
    ))

    # Completeness
    suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="target"))
    suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(
        column="feature_a", mostly=0.95,  # Allow 5% nulls
    ))

    # Value ranges
    suite.add_expectation(gx.expectations.ExpectColumnValuesToBeBetween(
        column="feature_a", min_value=0, max_value=1000,
    ))

    # Distribution checks
    suite.add_expectation(gx.expectations.ExpectColumnMeanToBeBetween(
        column="feature_b", min_value=10, max_value=100,
    ))

    # Target balance
    suite.add_expectation(gx.expectations.ExpectColumnDistinctValuesToBeInSet(
        column="target", value_set=[0, 1],
    ))

    return suite
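
Great Expectations APIs shift between releases, so the calls above may need adjusting to the installed version. The same guardrails can also be spot-checked with plain pandas before training; this fallback sketch mirrors the suite above and is not the GX API:

# Plain-pandas fallback for the same checks (illustrative, not the GX API)
import pandas as pd


def quick_validate(df: pd.DataFrame) -> None:
    """Raise if the training data violates the basic expectations above."""
    required = {"user_id", "feature_a", "feature_b", "target"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    if df["target"].isna().any():
        raise ValueError("Nulls in target")
    if df["feature_a"].isna().mean() > 0.05:
        raise ValueError("More than 5% nulls in feature_a")
    if not df["feature_a"].dropna().between(0, 1000).all():
        raise ValueError("feature_a outside [0, 1000]")
    if not set(df["target"].dropna().unique()) <= {0, 1}:
        raise ValueError("Unexpected target labels")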

## Anti-Patterns to Avoid

  1. ❌ Training on all data without a holdout — always keep a test set completely separate
  2. ❌ Using accuracy for imbalanced datasets — use F1, precision/recall, AUC-ROC instead
  3. ❌ Feature leakage — never use target-derived features or future data in training (see the time-based split sketch after this list)
  4. ❌ Fitting transformers on test data — fit only on training, transform both
  5. ❌ No data validation before training — garbage in, garbage out
  6. ❌ Deploying without monitoring — models degrade silently in production
  7. ❌ Notebook-to-production copy-paste — refactor into proper modules with tests
  8. ❌ Ignoring latency requirements — a perfect model that's too slow is useless
  9. ❌ Manual model promotion — automate with gated checks and champion/challenger
  10. ❌ Not versioning data alongside models — you can't reproduce without both
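
For time-dependent data, the simplest guard against items 1 and 3 is to split by time rather than at random, so the training set never sees the future. A minimal sketch (the timestamp column name is a placeholder):

# Time-based split to avoid training on future data (column name is illustrative)
import pandas as pd


def time_split(df: pd.DataFrame, timestamp_col: str = "event_time", test_frac: float = 0.2):
    """Hold out the most recent slice of the data as the test set."""
    df = df.sort_values(timestamp_col)
    cutoff = int(len(df) * (1 - test_frac))
    return df.iloc[:cutoff], df.iloc[cutoff:]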

## Key Metrics to Track

| Stage | Metrics |
|-------|---------|
| Data quality | Null rates, schema violations, distribution shifts |
| Training | Loss curves, CV scores, training time |
| Evaluation | F1, AUC-ROC, precision/recall per class |
| Serving | P50/P95/P99 latency, throughput, error rate |
| Production | Prediction drift, feature drift, business KPIs |

Adapted from buildwithclaude by Dave Poon (MIT)

# Supported AI Coding Agents

This skill is compatible with the SKILL.md standard and works with all major AI coding agents.

Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.