# ml-pipeline

by ramidamolis-alt

# Install this skill:
npx skills add ramidamolis-alt/agent-skills-workflows --skill "ml-pipeline"

Installs this specific skill from the multi-skill repository.

# Description

Advanced Machine Learning Pipeline skill - Data preprocessing, model selection, training workflows, real-time inference, and MLOps integration. Use when building ML models, analyzing data, or implementing predictive features.

# SKILL.md


---
name: ml-pipeline
description: Advanced Machine Learning Pipeline skill - Data preprocessing, model selection, training workflows, real-time inference, and MLOps integration. Use when building ML models, analyzing data, or implementing predictive features.
triggers: ["ml", "machine learning", "model", "train", "predict", "neural", "data science", "โมเดล", "เทรน"]
---


🤖 ML Pipeline Master Skill

Expert in building production-grade machine learning pipelines with best practices from Google, Meta, and OpenAI.


Capability Overview

capabilities:
  data:
    - preprocessing: "Cleaning, normalization, feature engineering"
    - validation: "Schema validation, drift detection"
    - versioning: "DVC, Delta Lake patterns"

  modeling:
    - selection: "Algorithm selection heuristics"
    - training: "Distributed training patterns"
    - tuning: "Hyperparameter optimization"
    - ensemble: "Model combination strategies"

  deployment:
    - serving: "Real-time and batch inference"
    - monitoring: "Performance tracking, drift detection"
    - versioning: "Model registry patterns"

  mlops:
    - ci_cd: "Continuous training pipelines"
    - experiment_tracking: "MLflow, W&B patterns"
    - governance: "Model cards, audit trails"
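
The drift detection mentioned above can be as simple as a per-feature two-sample statistical test between training data and live data. Below is a minimal, hypothetical sketch using scipy's Kolmogorov-Smirnov test; the p-value threshold and numeric-only column handling are assumptions for illustration, not part of the skill itself.

```python
import pandas as pd
from scipy.stats import ks_2samp

def detect_numeric_drift(train_df: pd.DataFrame, live_df: pd.DataFrame,
                         p_threshold: float = 0.01) -> dict:
    """Flag numeric columns whose live distribution differs from training (KS test)."""
    drifted = {}
    for col in train_df.select_dtypes(include="number").columns:
        if col in live_df.columns:
            stat, p_value = ks_2samp(train_df[col].dropna(), live_df[col].dropna())
            if p_value < p_threshold:
                drifted[col] = {"ks_stat": float(stat), "p_value": float(p_value)}
    return drifted
```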

Data Pipeline Patterns

Data Preprocessing with MCP

# Using MCP servers for ML data operations

async def preprocess_with_mcp(data_source):
    """
    ML data preprocessing with MCP integration
    """
    # 1. Load data description from Memory
    schema = await mcp_Memory_search_nodes("data_schema")

    # 2. Research best practices
    practices = await mcp_Context7_query_docs(
        libraryId="/pandas/pandas",
        query="data preprocessing best practices"
    )

    # 3. Design preprocessing with UltraThink
    pipeline = await mcp_UltraThink_ultrathink(
        thought=f"""
        Designing preprocessing pipeline:
        - Data schema: {schema}
        - Best practices: {practices}
        - Steps needed:
          1. Handle missing values
          2. Encode categoricals
          3. Normalize numerics
          4. Feature engineering
        """,
        total_thoughts=15
    )

    return pipeline
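
Outside the MCP layer, the preprocessing steps sketched in the prompt above (impute missing values, encode categoricals, normalize numerics) map directly onto a scikit-learn ColumnTransformer. A minimal sketch, assuming the numeric and categorical column lists are known ahead of time:

```python
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

def build_preprocessor(numeric_cols, categorical_cols):
    """Impute then scale numerics; impute then one-hot encode categoricals."""
    numeric = Pipeline([
        ("impute", SimpleImputer(strategy="median")),
        ("scale", StandardScaler()),
    ])
    categorical = Pipeline([
        ("impute", SimpleImputer(strategy="most_frequent")),
        ("encode", OneHotEncoder(handle_unknown="ignore")),
    ])
    return ColumnTransformer([
        ("num", numeric, numeric_cols),
        ("cat", categorical, categorical_cols),
    ])
```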

Feature Engineering Patterns

feature_patterns:
  numeric:
    - normalization: "MinMax, Standard, RobustScaler"
    - binning: "Equal width, quantile, custom"
    - transformations: "Log, Box-Cox, Yeo-Johnson"

  categorical:
    - encoding: "One-hot, Label, Target, Ordinal"
    - embedding: "Entity embeddings for high cardinality"
    - hashing: "Feature hashing for very high cardinality"

  temporal:
    - components: "Year, month, day, hour, minute"
    - cyclical: "Sin/cos encoding for periodic features"
    - aggregations: "Rolling windows, lag features"

  text:
    - vectorization: "TF-IDF, CountVectorizer"
    - embeddings: "Word2Vec, BERT, OpenAI embeddings"
    - extraction: "NER, keywords, sentiment"
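
The sin/cos encoding listed under temporal features is worth showing concretely, since naive integer encoding places hour 23 far from hour 0. A small illustrative snippet (the column name and period are assumptions):

```python
import numpy as np
import pandas as pd

def add_cyclical_hour(df: pd.DataFrame, col: str = "hour", period: int = 24) -> pd.DataFrame:
    """Encode a periodic feature so 23:00 and 00:00 end up close together."""
    df = df.copy()
    df[f"{col}_sin"] = np.sin(2 * np.pi * df[col] / period)
    df[f"{col}_cos"] = np.cos(2 * np.pi * df[col] / period)
    return df
```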

Model Selection Heuristics

Problem-Model Mapping

model_selection:
  classification:
    binary:
      quick_baseline: "LogisticRegression"
      structured_data: "XGBoost, LightGBM"
      unstructured: "Neural Network"
      imbalanced: "SMOTE + XGBoost"

    multiclass:
      few_classes: "RandomForest"
      many_classes: "XGBoost with softmax"
      hierarchical: "Hierarchical classifier"

  regression:
    linear_relationships: "Ridge, Lasso, ElasticNet"
    complex_patterns: "XGBoost, LightGBM"
    time_series: "ARIMA, Prophet, LSTM"

  clustering:
    known_k: "KMeans"
    unknown_k: "DBSCAN, HDBSCAN"
    high_dimensional: "SpectralClustering"

  recommendation:
    collaborative: "Matrix Factorization, NCF"
    content_based: "Embeddings + KNN"
    hybrid: "Two-tower architecture"
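
For the imbalanced binary row above, one common realization is SMOTE oversampling inside an imbalanced-learn pipeline feeding XGBoost, so resampling is applied only to the training split of each cross-validation fold. A sketch, assuming the imbalanced-learn and xgboost packages are installed:

```python
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline as ImbPipeline
from sklearn.model_selection import cross_val_score
from xgboost import XGBClassifier

def imbalanced_baseline(X, y):
    """SMOTE runs only on the training portion of each CV fold."""
    pipeline = ImbPipeline([
        ("smote", SMOTE(random_state=42)),
        ("model", XGBClassifier(n_estimators=200, eval_metric="logloss", random_state=42)),
    ])
    scores = cross_val_score(pipeline, X, y, cv=5, scoring="f1")
    return pipeline, scores
```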

Selection with UltraThink

async def select_model(problem_description, data_characteristics):
    """
    Use UltraThink for ML model selection
    """
    return await mcp_UltraThink_ultrathink(
        thought=f"""
        Selecting optimal model:

        Problem: {problem_description}
        Data size: {data_characteristics['rows']} rows
        Features: {data_characteristics['features']} columns
        Target: {data_characteristics['target_type']}

        Evaluation criteria:
        1. Accuracy vs interpretability tradeoff
        2. Training time constraints
        3. Inference latency requirements
        4. Data characteristics (missing, imbalanced, etc.)

        Candidate models:
        - Option A: [model] because [reason]
        - Option B: [model] because [reason]
        - Option C: [model] because [reason]

        Recommendation: ...
        """,
        total_thoughts=20,
        confidence=None  # Auto-calculate
    )

Training Pipeline

Standard Training Flow

from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from xgboost import XGBClassifier

def create_training_pipeline(X, y):
    """
    Standard ML training pipeline
    """
    # 1. Data Split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # 2. Create Pipeline
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('model', XGBClassifier(
            n_estimators=100,
            learning_rate=0.1,
            max_depth=5,
            random_state=42
        ))
    ])

    # 3. Cross-validation
    cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5)

    # 4. Train final model
    pipeline.fit(X_train, y_train)

    # 5. Evaluate
    test_score = pipeline.score(X_test, y_test)

    return pipeline, cv_scores, test_score

Hyperparameter Tuning

tuning_strategies:
  grid_search:
    use_when: "Small parameter space"
    example: |
      param_grid = {
          'model__n_estimators': [100, 200, 300],
          'model__max_depth': [3, 5, 7],
          'model__learning_rate': [0.01, 0.1, 0.3]
      }
      GridSearchCV(pipeline, param_grid, cv=5)

  random_search:
    use_when: "Large parameter space"
    example: |
      param_distributions = {
          'model__n_estimators': randint(100, 500),
          'model__max_depth': randint(3, 10)
      }
      RandomizedSearchCV(pipeline, param_distributions, n_iter=50)

  bayesian_optimization:
    use_when: "Expensive evaluations"
    tools: ["Optuna", "Hyperopt", "Ray Tune"]
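
For the Bayesian option, Optuna is the lightest-weight of the listed tools. A minimal sketch, assuming `X_train` and `y_train` are already defined and the objective is mean cross-validated accuracy:

```python
import optuna
from sklearn.model_selection import cross_val_score
from xgboost import XGBClassifier

def objective(trial):
    """Sample hyperparameters, score them with 5-fold CV."""
    model = XGBClassifier(
        n_estimators=trial.suggest_int("n_estimators", 100, 500),
        max_depth=trial.suggest_int("max_depth", 3, 10),
        learning_rate=trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
        random_state=42,
    )
    return cross_val_score(model, X_train, y_train, cv=5).mean()

study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=50)
print(study.best_params)
```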

Real-Time Inference Patterns

Production Serving

inference_patterns:
  batch:
    description: "Process large datasets offline"
    use_when: "Latency not critical"
    implementation: |
      predictions = model.predict(batch_data)
      save_to_database(predictions)

  real_time:
    description: "Single prediction on demand"
    use_when: "Low latency required"
    implementation: |
      @app.post("/predict")
      async def predict(request: PredictionRequest):
          features = preprocess(request.data)
          prediction = model.predict([features])[0]
          confidence = float(model.predict_proba([features])[0].max())
          return {"prediction": prediction, "confidence": confidence}

  streaming:
    description: "Continuous predictions"
    use_when: "Event-driven systems"
    implementation: |
      for event in kafka_consumer:
          prediction = model.predict(event.data)
          kafka_producer.send(prediction)
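
Expanding the real-time row into a self-contained endpoint: a minimal FastAPI sketch, where the model path, request schema, and feature order are all assumptions for illustration.

```python
import joblib
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
model = joblib.load("model.joblib")  # assumed artifact path

class PredictionRequest(BaseModel):
    features: list[float]  # assumed: already in training column order

@app.post("/predict")
async def predict(request: PredictionRequest):
    proba = model.predict_proba([request.features])[0]
    return {"prediction": int(proba.argmax()), "confidence": float(proba.max())}
```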

Model Serving Architecture

┌─────────────────────────────────────────────────────────────┐
│                    ML SERVING ARCHITECTURE                   │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────┐    ┌─────────────┐    ┌─────────────────────┐ │
│  │ Request │───>│ API Gateway │───>│ Feature Store       │ │
│  └─────────┘    └──────┬──────┘    │ (Redis/Feast)       │ │
│                        │           └──────────┬──────────┘ │
│                        v                      │            │
│              ┌─────────────────┐              │            │
│              │ Load Balancer   │<─────────────┘            │
│              └────────┬────────┘                           │
│                       │                                    │
│         ┌─────────────┼─────────────┐                      │
│         v             v             v                      │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                 │
│  │ Model    │  │ Model    │  │ Model    │                 │
│  │ Server 1 │  │ Server 2 │  │ Server N │                 │
│  └──────────┘  └──────────┘  └──────────┘                 │
│                                                             │
└─────────────────────────────────────────────────────────────┘
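
In the diagram above, the model servers enrich incoming requests with precomputed features before predicting. A minimal sketch of that lookup, assuming features are stored as Redis hashes keyed by entity id (key and feature names are illustrative):

```python
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def fetch_features(entity_id: str, feature_names: list[str]) -> list[float]:
    """Read precomputed features for one entity from the online store."""
    stored = r.hgetall(f"features:{entity_id}")  # e.g. {"age": "34", "avg_spend": "12.5"}
    return [float(stored.get(name, 0.0)) for name in feature_names]
```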

MLOps Integration

Experiment Tracking

import mlflow

def train_with_tracking():
    """
    MLflow experiment tracking pattern
    """
    mlflow.set_experiment("my_experiment")

    with mlflow.start_run():
        # Log parameters
        mlflow.log_params({
            "model": "XGBoost",
            "n_estimators": 100,
            "learning_rate": 0.1
        })

        # Train model (assumes a train_model() helper defined elsewhere)
        model = train_model()

        # Log metrics (accuracy, f1, auc computed on a held-out evaluation set)
        mlflow.log_metrics({
            "accuracy": accuracy,
            "f1_score": f1,
            "auc_roc": auc
        })

        # Log model
        mlflow.sklearn.log_model(model, "model")

        # Log artifacts
        mlflow.log_artifact("confusion_matrix.png")

Model Registry Pattern

model_registry:
  stages:
    - name: "Development"
      validation: "Unit tests pass"

    - name: "Staging"
      validation: "Integration tests, performance baseline"

    - name: "Production"
      validation: "A/B test results, approval"

    - name: "Archived"
      trigger: "New model promoted"

  versioning:
    format: "semantic: major.minor.patch"
    triggers:
      major: "Architecture change"
      minor: "Retraining with new data"
      patch: "Hyperparameter tuning"
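
The stage transitions above map onto MLflow's model registry API. A minimal sketch (the model name and run id are placeholders; newer MLflow versions favor aliases over stages, so treat `transition_model_version_stage` as illustrative):

```python
import mlflow
from mlflow.tracking import MlflowClient

# Register the model logged by a finished run (run id is a placeholder)
result = mlflow.register_model("runs:/<run_id>/model", "churn-classifier")

# Promote it once staging validation passes
client = MlflowClient()
client.transition_model_version_stage(
    name="churn-classifier",
    version=result.version,
    stage="Staging",
)
```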

MCP Integration Examples

Research with Context7

async def research_ml_technique(technique):
    """
    Research ML technique using Context7
    """
    # Get library ID
    lib = await mcp_Context7_resolve_library_id(
        libraryName="scikit-learn",
        query=f"How to implement {technique}"
    )

    # Query documentation
    docs = await mcp_Context7_query_docs(
        libraryId=lib.id,
        query=f"{technique} implementation example"
    )

    return docs

Persist Learnings to Memory

async def save_ml_learnings(experiment_results):
    """
    Save ML learnings to Memory MCP
    """
    await mcp_Memory_create_entities([{
        "name": f"MLExperiment_{experiment_results.id}",
        "entityType": "MLExperiment",
        "observations": [
            f"Problem: {experiment_results.problem_type}",
            f"Best model: {experiment_results.best_model}",
            f"Accuracy: {experiment_results.accuracy}",
            f"Key learnings: {experiment_results.learnings}",
            f"Gotchas: {experiment_results.gotchas}"
        ]
    }])

Quick Reference

When to Use This Skill

triggers:
  explicit:
    - "train a model"
    - "predict"
    - "machine learning"
    - "data science"

  implicit:
    - Dataset processing tasks
    - Prediction requirements
    - Pattern recognition needs
    - Recommendation systems

Common Workflows

  1. Quick Model: Load data → Auto-select model → Train → Evaluate
  2. Production Pipeline: EDA → Feature eng → Model selection → Tuning → Deploy
  3. Experiment: Research → Implement → Track → Compare → Document
  4. Debug: Load model → Analyze predictions → Feature importance → Fix

Related Skills

  • nlp-master: Text processing for NLP models
  • performance-optimizer: Model optimization
  • e2e-testing: Model testing
  • omega-agent: Complex ML orchestration

# Supported AI Coding Agents

This skill is compatible with the SKILL.md standard and works with all major AI coding agents.

Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.