npx skills add jgarrison929/openclaw-skills --skill "ml-engineer"
Install a specific skill from a multi-skill repository
# Description
Use when building ML pipelines, training models, deploying to production, implementing feature engineering, MLOps workflows, experiment tracking, model monitoring, or any machine learning engineering task.
# SKILL.md
name: ml-engineer
version: 1.0.0
description: Use when building ML pipelines, training models, deploying to production, implementing feature engineering, MLOps workflows, experiment tracking, model monitoring, or any machine learning engineering task.
triggers:
- machine learning
- ML pipeline
- model training
- model serving
- feature engineering
- MLOps
- model deployment
- experiment tracking
- data drift
- model monitoring
- TensorFlow
- PyTorch
- scikit-learn
- MLflow
- Kubeflow
- feature store
- model registry
- inference
- A/B testing model
- retraining
role: specialist
scope: implementation
output-format: code
ML Engineer
Production machine learning engineer specializing in ML pipelines, model training, deployment, MLOps, and end-to-end ML system design.
Role Definition
You are a senior ML engineer building production-grade machine learning systems. You focus on the full lifecycle: data preparation, feature engineering, model training, evaluation, deployment, monitoring, and retraining. You bridge data science and software engineering.
Core Principles
- Measure before optimizing: establish baselines with simple models first
- Version everything: data, features, models, experiments, configs
- Reproducibility is non-negotiable: same inputs must produce same outputs (see the seed-pinning sketch after this list)
- Production != notebook: production ML needs error handling, monitoring, and scaling
- Monitor relentlessly: model performance degrades silently
- Automate the pipeline: manual steps are failure points
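As a concrete starting point for the versioning and reproducibility principles, here is a minimal sketch that pins random seeds and fingerprints the run config. `set_global_seed` and `config_fingerprint` are hypothetical helpers invented for illustration, not part of any specific library.

# Hypothetical reproducibility helpers -- names and config layout are illustrative.
import hashlib
import json
import os
import random

import numpy as np


def set_global_seed(seed: int = 42) -> None:
    """Pin every RNG we rely on so reruns with the same data produce the same model."""
    random.seed(seed)
    np.random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    try:
        import torch  # optional: only relevant when deep learning is in play
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    except ImportError:
        pass


def config_fingerprint(config: dict) -> str:
    """Stable hash of the run config, logged alongside the model for traceability."""
    payload = json.dumps(config, sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()[:12]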
Project Structure
ml-project/
├── data/
│   ├── raw/                      # Immutable raw data
│   ├── processed/                # Cleaned, transformed data
│   └── features/                 # Feature store snapshots
├── src/
│   ├── data/
│   │   ├── ingestion.py          # Data loading and validation
│   │   ├── preprocessing.py      # Cleaning, imputation
│   │   └── validation.py         # Data quality checks
│   ├── features/
│   │   ├── engineering.py        # Feature transformations
│   │   ├── selection.py          # Feature importance/selection
│   │   └── store.py              # Feature store interface
│   ├── models/
│   │   ├── train.py              # Training loop
│   │   ├── evaluate.py           # Metrics and evaluation
│   │   ├── predict.py            # Inference logic
│   │   └── registry.py           # Model versioning
│   ├── serving/
│   │   ├── api.py                # Model serving API
│   │   ├── batch.py              # Batch prediction
│   │   └── preprocessing.py      # Online feature transforms
│   └── monitoring/
│       ├── drift.py              # Data/model drift detection
│       ├── metrics.py            # Performance tracking
│       └── alerts.py             # Alert rules
├── pipelines/
│   ├── training_pipeline.py      # End-to-end training
│   ├── inference_pipeline.py     # Batch inference
│   └── retraining_pipeline.py    # Automated retraining
├── configs/
│   ├── model_config.yaml         # Hyperparameters
│   ├── feature_config.yaml       # Feature definitions
│   └── serving_config.yaml       # Deployment config
├── tests/
│   ├── test_data.py
│   ├── test_features.py
│   ├── test_model.py
│   └── test_serving.py
├── notebooks/                    # Exploration only, not production
├── Dockerfile
├── dvc.yaml                      # Data version control
└── mlflow.yaml                   # Experiment tracking config
Feature Engineering Pipeline
# src/features/engineering.py
import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from typing import List, Dict, Optional
import logging
logger = logging.getLogger(__name__)
class FeatureEngineer(BaseEstimator, TransformerMixin):
"""Production feature engineering with fit/transform pattern."""
def __init__(self, config: Dict):
self.config = config
self.numeric_features: List[str] = config.get("numeric_features", [])
self.categorical_features: List[str] = config.get("categorical_features", [])
self.date_features: List[str] = config.get("date_features", [])
self._fitted = False
def fit(self, X: pd.DataFrame, y=None):
"""Learn feature statistics from training data."""
# Store medians for imputation
self.numeric_medians_ = X[self.numeric_features].median().to_dict()
# Store modes for categorical imputation
self.categorical_modes_ = {
col: X[col].mode().iloc[0] for col in self.categorical_features
}
# Store feature ranges for clipping outliers
self.clip_bounds_ = {}
for col in self.numeric_features:
q1, q3 = X[col].quantile([0.01, 0.99])
iqr = q3 - q1
self.clip_bounds_[col] = (q1 - 3 * iqr, q3 + 3 * iqr)
self._fitted = True
return self
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
"""Apply learned transformations."""
if not self._fitted:
raise RuntimeError("FeatureEngineer must be fit before transform")
df = X.copy()
# Impute missing values
for col in self.numeric_features:
df[col] = df[col].fillna(self.numeric_medians_[col])
for col in self.categorical_features:
df[col] = df[col].fillna(self.categorical_modes_[col])
# Clip outliers using training bounds
for col, (lower, upper) in self.clip_bounds_.items():
df[col] = df[col].clip(lower, upper)
# Extract date features
for col in self.date_features:
df[col] = pd.to_datetime(df[col])
df[f"{col}_year"] = df[col].dt.year
df[f"{col}_month"] = df[col].dt.month
df[f"{col}_dayofweek"] = df[col].dt.dayofweek
df[f"{col}_hour"] = df[col].dt.hour
df[f"{col}_is_weekend"] = df[col].dt.dayofweek.isin([5, 6]).astype(int)
df = df.drop(columns=[col])
# Interaction features
for interaction in self.config.get("interactions", []):
col_a, col_b = interaction
df[f"{col_a}_x_{col_b}"] = df[col_a] * df[col_b]
# Log transforms for skewed features
for col in self.config.get("log_transform", []):
df[f"{col}_log"] = np.log1p(df[col].clip(lower=0))
logger.info(f"Transformed features: {df.shape[1]} columns")
return df
def build_preprocessing_pipeline(config: Dict) -> Pipeline:
"""Build sklearn pipeline for reproducible preprocessing."""
numeric_transformer = Pipeline([
("scaler", StandardScaler()),
])
categorical_transformer = Pipeline([
("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
])
preprocessor = ColumnTransformer([
("num", numeric_transformer, config["numeric_features"]),
("cat", categorical_transformer, config["categorical_features"]),
])
return Pipeline([
("feature_engineer", FeatureEngineer(config)),
("preprocessor", preprocessor),
])
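A short usage sketch for the pieces above. The column names and config values are invented for illustration; in the project layout they would come from configs/feature_config.yaml.

# Illustrative usage of build_preprocessing_pipeline -- column names are hypothetical.
import pandas as pd

config = {
    "numeric_features": ["age", "income"],
    "categorical_features": ["country"],
    "date_features": ["signup_date"],
    "interactions": [("age", "income")],
    "log_transform": ["income"],
}

train_df = pd.DataFrame({
    "age": [25, 31, None, 47],
    "income": [40_000, 52_000, 61_000, None],
    "country": ["US", "DE", None, "US"],
    "signup_date": ["2024-01-05", "2024-02-14", "2024-03-01", "2024-03-09"],
})

pipeline = build_preprocessing_pipeline(config)
X_train = pipeline.fit_transform(train_df)    # fit on training data only
# X_serving = pipeline.transform(serving_df)  # reuse the learned statistics at inference time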
Model Training with Experiment Tracking
# src/models/train.py
import mlflow
import mlflow.sklearn
import mlflow.pytorch
from sklearn.model_selection import cross_val_score, StratifiedKFold
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score,
roc_auc_score, confusion_matrix, classification_report
)
import numpy as np
import json
import logging
from pathlib import Path
from typing import Dict, Any, Optional, Tuple
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class TrainingResult:
model: Any
metrics: Dict[str, float]
run_id: str
model_version: Optional[str] = None
class ModelTrainer:
"""Production model training with MLflow tracking."""
def __init__(self, experiment_name: str, tracking_uri: str = "http://localhost:5000"):
mlflow.set_tracking_uri(tracking_uri)
mlflow.set_experiment(experiment_name)
self.experiment_name = experiment_name
def train_and_evaluate(
self,
model,
X_train: np.ndarray,
y_train: np.ndarray,
X_test: np.ndarray,
y_test: np.ndarray,
params: Dict[str, Any],
tags: Optional[Dict[str, str]] = None,
) -> TrainingResult:
"""Train model with full experiment tracking."""
with mlflow.start_run(tags=tags or {}) as run:
# Log parameters
mlflow.log_params(params)
mlflow.log_param("model_type", type(model).__name__)
mlflow.log_param("train_size", len(X_train))
mlflow.log_param("test_size", len(X_test))
# Cross-validation on training set
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_scores = cross_val_score(model, X_train, y_train, cv=cv, scoring="f1_weighted")
mlflow.log_metric("cv_f1_mean", cv_scores.mean())
mlflow.log_metric("cv_f1_std", cv_scores.std())
# Train on full training set
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
y_proba = model.predict_proba(X_test)[:, 1] if hasattr(model, "predict_proba") else None
# Calculate metrics
metrics = {
"accuracy": accuracy_score(y_test, y_pred),
"precision": precision_score(y_test, y_pred, average="weighted"),
"recall": recall_score(y_test, y_pred, average="weighted"),
"f1": f1_score(y_test, y_pred, average="weighted"),
}
if y_proba is not None:
metrics["auc_roc"] = roc_auc_score(y_test, y_proba)
# Log metrics
mlflow.log_metrics(metrics)
# Log confusion matrix as artifact
cm = confusion_matrix(y_test, y_pred)
cm_path = "/tmp/confusion_matrix.json"
with open(cm_path, "w") as f:
json.dump(cm.tolist(), f)
mlflow.log_artifact(cm_path)
# Log classification report
report = classification_report(y_test, y_pred, output_dict=True)
report_path = "/tmp/classification_report.json"
with open(report_path, "w") as f:
json.dump(report, f, indent=2)
mlflow.log_artifact(report_path)
# Log model
mlflow.sklearn.log_model(
model, "model",
registered_model_name=f"{self.experiment_name}_model",
)
logger.info(f"Run {run.info.run_id} β F1: {metrics['f1']:.4f}, AUC: {metrics.get('auc_roc', 'N/A')}")
return TrainingResult(
model=model,
metrics=metrics,
run_id=run.info.run_id,
)
def hyperparameter_search(
self,
model_class,
param_grid: Dict[str, list],
X_train, y_train, X_test, y_test,
n_trials: int = 50,
) -> TrainingResult:
"""Hyperparameter optimization with Optuna + MLflow."""
import optuna
def objective(trial):
params = {}
for name, values in param_grid.items():
if isinstance(values[0], int):
params[name] = trial.suggest_int(name, values[0], values[-1])
elif isinstance(values[0], float):
params[name] = trial.suggest_float(name, values[0], values[-1], log=True)
else:
params[name] = trial.suggest_categorical(name, values)
model = model_class(**params)
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(model, X_train, y_train, cv=cv, scoring="f1_weighted")
return scores.mean()
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=n_trials)
best_params = study.best_params
logger.info(f"Best params: {best_params}, Best F1: {study.best_value:.4f}")
return self.train_and_evaluate(
model_class(**best_params),
X_train, y_train, X_test, y_test,
params=best_params,
tags={"optimization": "optuna", "n_trials": str(n_trials)},
)
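A usage sketch for ModelTrainer, assuming an MLflow tracking server is reachable at the given URI; the experiment name, hyperparameters, and synthetic dataset are placeholders.

# Illustrative usage of ModelTrainer -- experiment name, URI, and data are placeholders.
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split

X, y = make_classification(n_samples=2_000, n_features=20, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

trainer = ModelTrainer("churn_prediction", tracking_uri="http://localhost:5000")
result = trainer.train_and_evaluate(
    GradientBoostingClassifier(n_estimators=200, max_depth=3),
    X_train, y_train, X_test, y_test,
    params={"n_estimators": 200, "max_depth": 3},
    tags={"dataset": "synthetic_demo"},
)
print(result.run_id, result.metrics["f1"])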
Model Serving API
# src/serving/api.py
from fastapi import FastAPI, HTTPException, BackgroundTasks, Response
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
import mlflow
import numpy as np
import time
import logging
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
logger = logging.getLogger(__name__)
app = FastAPI(title="ML Model Serving API", version="1.0.0")
# Prometheus metrics
PREDICTION_COUNT = Counter("predictions_total", "Total predictions", ["model", "status"])
PREDICTION_LATENCY = Histogram("prediction_latency_seconds", "Prediction latency", ["model"])
FEATURE_DRIFT = Counter("feature_drift_detected", "Feature drift events", ["feature"])
class PredictionRequest(BaseModel):
features: Dict[str, Any]
model_version: Optional[str] = "latest"
class PredictionResponse(BaseModel):
prediction: Any
probability: Optional[float] = None
model_version: str
latency_ms: float
class BatchPredictionRequest(BaseModel):
instances: List[Dict[str, Any]]
model_version: Optional[str] = "latest"
class ModelManager:
"""Manages model loading and version switching."""
def __init__(self, model_name: str, tracking_uri: str):
mlflow.set_tracking_uri(tracking_uri)
self.model_name = model_name
self.models: Dict[str, Any] = {}
self.active_version = "latest"
def load_model(self, version: str = "latest") -> Any:
if version not in self.models:
if version == "latest":
model_uri = f"models:/{self.model_name}/Production"
else:
model_uri = f"models:/{self.model_name}/{version}"
self.models[version] = mlflow.pyfunc.load_model(model_uri)
logger.info(f"Loaded model {self.model_name} version {version}")
return self.models[version]
def predict(self, features: Dict[str, Any], version: str = "latest") -> Dict:
model = self.load_model(version)
import pandas as pd
input_df = pd.DataFrame([features])
prediction = model.predict(input_df)
return {"prediction": prediction[0].item() if hasattr(prediction[0], "item") else prediction[0]}
model_manager = ModelManager(
model_name="production_model",
tracking_uri="http://localhost:5000",
)
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest, background_tasks: BackgroundTasks):
start_time = time.time()
try:
result = model_manager.predict(request.features, request.model_version or "latest")
latency = (time.time() - start_time) * 1000
PREDICTION_COUNT.labels(model=request.model_version, status="success").inc()
PREDICTION_LATENCY.labels(model=request.model_version).observe(latency / 1000)
# Log for monitoring in background
background_tasks.add_task(log_prediction, request.features, result, latency)
return PredictionResponse(
prediction=result["prediction"],
probability=result.get("probability"),
model_version=request.model_version or "latest",
latency_ms=round(latency, 2),
)
except Exception as e:
PREDICTION_COUNT.labels(model=request.model_version, status="error").inc()
logger.error(f"Prediction error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/predict/batch")
async def batch_predict(request: BatchPredictionRequest):
results = []
for instance in request.instances:
result = model_manager.predict(instance, request.model_version or "latest")
results.append(result)
return {"predictions": results, "count": len(results)}
@app.get("/health")
async def health():
return {"status": "healthy", "model": model_manager.model_name}
@app.get("/metrics")
async def metrics():
return generate_latest()
async def log_prediction(features, result, latency):
"""Background task to log predictions for monitoring."""
# Store in prediction log for drift detection
pass
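A minimal client-side sketch against the /predict endpoint, assuming the API is served locally (for example with uvicorn src.serving.api:app --port 8000); the feature names and values are placeholders.

# Hypothetical client call -- feature names and port are placeholders.
import requests

payload = {
    "features": {"age": 42, "income": 58_000, "country": "US"},
    "model_version": "latest",
}
resp = requests.post("http://localhost:8000/predict", json=payload, timeout=5)
resp.raise_for_status()
print(resp.json())  # {"prediction": ..., "probability": ..., "model_version": "latest", "latency_ms": ...}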
Data and Model Drift Detection
# src/monitoring/drift.py
import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
@dataclass
class DriftResult:
feature: str
drift_detected: bool
p_value: float
statistic: float
test_type: str
severity: str # low, medium, high
class DriftDetector:
"""Detect data and model drift in production."""
def __init__(self, reference_data: pd.DataFrame, threshold: float = 0.05):
self.reference = reference_data
self.threshold = threshold
self.reference_stats = self._compute_stats(reference_data)
def _compute_stats(self, df: pd.DataFrame) -> Dict:
stats_dict = {}
for col in df.select_dtypes(include=[np.number]).columns:
stats_dict[col] = {
"mean": df[col].mean(),
"std": df[col].std(),
"median": df[col].median(),
"q25": df[col].quantile(0.25),
"q75": df[col].quantile(0.75),
}
return stats_dict
def detect_feature_drift(self, current_data: pd.DataFrame) -> List[DriftResult]:
"""Run statistical tests for each feature."""
results = []
for col in self.reference.select_dtypes(include=[np.number]).columns:
if col not in current_data.columns:
continue
ref_values = self.reference[col].dropna()
cur_values = current_data[col].dropna()
# Kolmogorov-Smirnov test
ks_stat, ks_pvalue = stats.ks_2samp(ref_values, cur_values)
# Population Stability Index
psi = self._calculate_psi(ref_values, cur_values)
drift_detected = ks_pvalue < self.threshold or psi > 0.2
severity = "low"
if psi > 0.25:
severity = "high"
elif psi > 0.1:
severity = "medium"
results.append(DriftResult(
feature=col,
drift_detected=drift_detected,
p_value=ks_pvalue,
statistic=ks_stat,
test_type="KS + PSI",
severity=severity,
))
if drift_detected:
logger.warning(f"Drift detected in {col}: PSI={psi:.4f}, KS p={ks_pvalue:.4f}")
return results
def _calculate_psi(self, reference: pd.Series, current: pd.Series, bins: int = 10) -> float:
"""Population Stability Index β measures distribution shift."""
breakpoints = np.linspace(
min(reference.min(), current.min()),
max(reference.max(), current.max()),
bins + 1,
)
ref_counts = np.histogram(reference, bins=breakpoints)[0] / len(reference)
cur_counts = np.histogram(current, bins=breakpoints)[0] / len(current)
# Avoid division by zero
ref_counts = np.clip(ref_counts, 1e-6, None)
cur_counts = np.clip(cur_counts, 1e-6, None)
psi = np.sum((cur_counts - ref_counts) * np.log(cur_counts / ref_counts))
return psi
def detect_prediction_drift(
self,
reference_predictions: np.ndarray,
current_predictions: np.ndarray,
) -> DriftResult:
"""Detect drift in model outputs."""
ks_stat, ks_pvalue = stats.ks_2samp(reference_predictions, current_predictions)
psi = self._calculate_psi(
pd.Series(reference_predictions),
pd.Series(current_predictions),
)
return DriftResult(
feature="model_predictions",
drift_detected=ks_pvalue < self.threshold or psi > 0.2,
p_value=ks_pvalue,
statistic=psi,
test_type="KS + PSI",
severity="high" if psi > 0.25 else "medium" if psi > 0.1 else "low",
)
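A sketch of wiring DriftDetector into a scheduled monitoring job; the parquet paths and the alerting action are placeholders.

# Illustrative drift check -- reference/current data sources are placeholders.
import pandas as pd

reference = pd.read_parquet("data/features/training_snapshot.parquet")   # training-time features
current = pd.read_parquet("data/features/last_24h.parquet")              # recent serving features

detector = DriftDetector(reference_data=reference, threshold=0.05)
results = detector.detect_feature_drift(current)

for r in results:
    if r.drift_detected and r.severity in ("medium", "high"):
        # In production this would page the on-call or trigger the retraining pipeline
        print(f"ALERT: {r.feature} drifted (severity={r.severity}, p={r.p_value:.4f})")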
Training Pipeline (Airflow DAG)
# pipelines/training_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
default_args = {
"owner": "ml-team",
"depends_on_past": False,
"retries": 2,
"retry_delay": timedelta(minutes=5),
"email_on_failure": True,
"email": ["[email protected]"],
}
dag = DAG(
"model_training_pipeline",
default_args=default_args,
description="End-to-end model training pipeline",
schedule_interval="@weekly",
start_date=days_ago(1),
catchup=False,
tags=["ml", "training"],
)
def validate_data(**context):
"""Validate input data quality before training."""
import great_expectations as gx
context_gx = gx.get_context()
result = context_gx.run_checkpoint(checkpoint_name="training_data_check")
if not result.success:
raise ValueError("Data validation failed β aborting training")
return {"validation": "passed", "rows": result.statistics["evaluated_expectations"]}
def extract_features(**context):
"""Run feature engineering pipeline."""
from src.features.engineering import build_preprocessing_pipeline
import yaml
import joblib
with open("configs/feature_config.yaml") as f:
config = yaml.safe_load(f)
pipeline = build_preprocessing_pipeline(config)
# Load and transform data...
joblib.dump(pipeline, "/tmp/feature_pipeline.joblib")
return {"features_generated": True}
def train_model(**context):
"""Train model with hyperparameter optimization."""
from src.models.train import ModelTrainer
from sklearn.ensemble import GradientBoostingClassifier
trainer = ModelTrainer("production_model")
# Load data, train, evaluate...
return {"run_id": "abc123", "f1": 0.92}
def evaluate_champion(**context):
"""Compare new model against current production model."""
ti = context["ti"]
training_result = ti.xcom_pull(task_ids="train_model")
new_f1 = training_result["f1"]
# Get current production model metrics
current_f1 = 0.90 # From model registry
improvement = new_f1 - current_f1
if improvement < 0.01:
raise ValueError(f"New model F1 {new_f1:.4f} not significantly better than current {current_f1:.4f}")
return {"promote": True, "improvement": improvement}
def deploy_model(**context):
"""Promote new model to production."""
import mlflow
ti = context["ti"]
training_result = ti.xcom_pull(task_ids="train_model")
    client = mlflow.MlflowClient()
    # The registry expects a model version, not a run ID, so resolve the version from the run
    versions = client.search_model_versions(f"run_id = '{training_result['run_id']}'")
    client.transition_model_version_stage(
        name="production_model",
        version=versions[0].version,
        stage="Production",
    )
validate = PythonOperator(task_id="validate_data", python_callable=validate_data, dag=dag)
features = PythonOperator(task_id="extract_features", python_callable=extract_features, dag=dag)
train = PythonOperator(task_id="train_model", python_callable=train_model, dag=dag)
evaluate = PythonOperator(task_id="evaluate_champion", python_callable=evaluate_champion, dag=dag)
deploy = PythonOperator(task_id="deploy_model", python_callable=deploy_model, dag=dag)
validate >> features >> train >> evaluate >> deploy
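For local iteration the DAG can be smoke-tested without a scheduler. A minimal sketch, assuming Airflow 2.5+ where DAG.test() is available; it only passes end-to-end if the referenced modules and services are importable and reachable.

# Local smoke test for the DAG above -- assumes Airflow 2.5+ (DAG.test()).
if __name__ == "__main__":
    # Runs every task in-process with the dependencies declared above,
    # catching import errors and broken XCom wiring before deployment.
    dag.test()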
PyTorch Training Loop (Deep Learning)
# src/models/pytorch_trainer.py
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from typing import Dict, Optional
import mlflow
import logging
logger = logging.getLogger(__name__)
class EarlyStopping:
"""Stop training when validation loss stops improving."""
def __init__(self, patience: int = 5, min_delta: float = 1e-4):
self.patience = patience
self.min_delta = min_delta
self.counter = 0
self.best_loss = float("inf")
self.should_stop = False
def __call__(self, val_loss: float):
if val_loss < self.best_loss - self.min_delta:
self.best_loss = val_loss
self.counter = 0
else:
self.counter += 1
if self.counter >= self.patience:
self.should_stop = True
def train_pytorch_model(
model: nn.Module,
train_loader: DataLoader,
val_loader: DataLoader,
config: Dict,
) -> nn.Module:
"""Production PyTorch training with MLflow tracking."""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
optimizer = torch.optim.AdamW(
model.parameters(),
lr=config["learning_rate"],
weight_decay=config.get("weight_decay", 0.01),
)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
optimizer, T_max=config["epochs"]
)
criterion = nn.CrossEntropyLoss()
early_stopping = EarlyStopping(patience=config.get("patience", 5))
with mlflow.start_run():
mlflow.log_params(config)
for epoch in range(config["epochs"]):
# Training
model.train()
train_loss = 0.0
for batch_X, batch_y in train_loader:
batch_X, batch_y = batch_X.to(device), batch_y.to(device)
optimizer.zero_grad()
outputs = model(batch_X)
loss = criterion(outputs, batch_y)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
train_loss += loss.item()
# Validation
model.eval()
val_loss = 0.0
correct = 0
total = 0
with torch.no_grad():
for batch_X, batch_y in val_loader:
batch_X, batch_y = batch_X.to(device), batch_y.to(device)
outputs = model(batch_X)
val_loss += criterion(outputs, batch_y).item()
_, predicted = torch.max(outputs, 1)
total += batch_y.size(0)
correct += (predicted == batch_y).sum().item()
avg_train = train_loss / len(train_loader)
avg_val = val_loss / len(val_loader)
accuracy = correct / total
mlflow.log_metrics({
"train_loss": avg_train,
"val_loss": avg_val,
"val_accuracy": accuracy,
"learning_rate": scheduler.get_last_lr()[0],
}, step=epoch)
logger.info(f"Epoch {epoch+1}: train_loss={avg_train:.4f}, val_loss={avg_val:.4f}, acc={accuracy:.4f}")
scheduler.step()
early_stopping(avg_val)
if early_stopping.should_stop:
logger.info(f"Early stopping at epoch {epoch+1}")
break
mlflow.pytorch.log_model(model, "model")
return model
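A usage sketch for train_pytorch_model; the toy network, random tensors, and hyperparameters are placeholders.

# Illustrative call to train_pytorch_model -- model architecture and data are toy placeholders.
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

X = torch.randn(1_000, 32)
y = torch.randint(0, 2, (1_000,))
train_ds = TensorDataset(X[:800], y[:800])
val_ds = TensorDataset(X[800:], y[800:])

model = nn.Sequential(nn.Linear(32, 64), nn.ReLU(), nn.Linear(64, 2))
config = {"learning_rate": 1e-3, "epochs": 20, "patience": 3}

trained = train_pytorch_model(
    model,
    DataLoader(train_ds, batch_size=64, shuffle=True),
    DataLoader(val_ds, batch_size=64),
    config,
)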
Data Validation with Great Expectations
# src/data/validation.py
import great_expectations as gx
import pandas as pd
from typing import Dict, List
def build_training_expectations(context: gx.DataContext, suite_name: str = "training_data"):
"""Define data quality expectations for training data."""
suite = context.add_expectation_suite(suite_name)
# Schema expectations
suite.add_expectation(gx.expectations.ExpectTableColumnsToMatchSet(
column_set=["user_id", "feature_a", "feature_b", "target"],
))
# Completeness
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="target"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(
column="feature_a", mostly=0.95, # Allow 5% nulls
))
# Value ranges
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeBetween(
column="feature_a", min_value=0, max_value=1000,
))
# Distribution checks
suite.add_expectation(gx.expectations.ExpectColumnMeanToBeBetween(
column="feature_b", min_value=10, max_value=100,
))
# Target balance
suite.add_expectation(gx.expectations.ExpectColumnDistinctValuesToBeInSet(
column="target", value_set=[0, 1],
))
return suite
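Where Great Expectations is unavailable (or in fast unit tests), the same gate can be approximated with plain pandas checks. A minimal sketch mirroring the expectations above (not the Great Expectations API), using the same hypothetical column names.

# Lightweight pandas-based data gate -- a stand-in for the Great Expectations suite above.
import pandas as pd


def validate_training_frame(df: pd.DataFrame) -> None:
    """Raise if the training data violates the same basic expectations."""
    expected_cols = {"user_id", "feature_a", "feature_b", "target"}
    missing = expected_cols - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    if df["target"].isna().any():
        raise ValueError("Nulls found in target")
    if df["feature_a"].isna().mean() > 0.05:
        raise ValueError("feature_a exceeds the 5% null budget")
    if not df["feature_a"].dropna().between(0, 1000).all():
        raise ValueError("feature_a outside [0, 1000]")
    if not set(df["target"].dropna().unique()) <= {0, 1}:
        raise ValueError("Unexpected target labels")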
Anti-Patterns to Avoid
- ❌ Training on all data without a holdout: always keep a test set completely separate
- ❌ Using accuracy for imbalanced datasets: use F1, precision/recall, AUC-ROC instead
- ❌ Feature leakage: never use target-derived features or future data in training
- ❌ Fitting transformers on test data: fit only on training, transform both (see the split-then-fit sketch after this list)
- ❌ No data validation before training: garbage in, garbage out
- ❌ Deploying without monitoring: models degrade silently in production
- ❌ Notebook-to-production copy-paste: refactor into proper modules with tests
- ❌ Ignoring latency requirements: a perfect model that's too slow is useless
- ❌ Manual model promotion: automate with gated checks and champion/challenger
- ❌ Not versioning data alongside models: you can't reproduce without both
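To make the leakage items concrete, here is a minimal split-then-fit sketch: split first, then fit every transformer on the training partition only. The synthetic data is a placeholder.

# Split first, then fit transformers on the training partition only -- illustrative data.
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

X, y = make_classification(n_samples=1_000, n_features=10, random_state=0)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)  # statistics learned from training data only
X_test_scaled = scaler.transform(X_test)        # test set is transformed, never fit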
Key Metrics to Track
| Stage | Metrics |
|---|---|
| Data Quality | Null rates, schema violations, distribution shifts |
| Training | Loss curves, CV scores, training time |
| Evaluation | F1, AUC-ROC, precision/recall per class |
| Serving | P50/P95/P99 latency, throughput, error rate |
| Production | Prediction drift, feature drift, business KPIs |
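As one concrete example for the Serving row, tail latencies can be summarized offline from a prediction log; a sketch assuming a plain array of per-request latencies in milliseconds.

# Offline tail-latency summary from logged per-request latencies -- sample values are placeholders.
import numpy as np

latencies_ms = np.array([12.1, 15.4, 9.8, 230.0, 14.2, 18.9, 11.0])  # placeholder sample
p50, p95, p99 = np.percentile(latencies_ms, [50, 95, 99])
print(f"P50={p50:.1f}ms  P95={p95:.1f}ms  P99={p99:.1f}ms")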
Adapted from buildwithclaude by Dave Poon (MIT)
# Supported AI Coding Agents
This skill is compatible with the SKILL.md standard and works with all major AI coding agents.
Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.