Install this specific skill from the multi-skill repository:

```bash
npx skills add ramidamolis-alt/agent-skills-workflows --skill "ml-pipeline"
```
# Description
Advanced Machine Learning Pipeline skill - Data preprocessing, model selection, training workflows, real-time inference, and MLOps integration. Use when building ML models, analyzing data, or implementing predictive features.
# SKILL.md
```yaml
name: ml-pipeline
description: Advanced Machine Learning Pipeline skill - Data preprocessing, model selection, training workflows, real-time inference, and MLOps integration. Use when building ML models, analyzing data, or implementing predictive features.
triggers: ["ml", "machine learning", "model", "train", "predict", "neural", "data science", "โมเดล", "เทรน"]
```
## 🤖 ML Pipeline Master Skill

Expert in building production-grade machine learning pipelines with best practices from Google, Meta, and OpenAI.
## Capability Overview

```yaml
capabilities:
  data:
    - preprocessing: "Cleaning, normalization, feature engineering"
    - validation: "Schema validation, drift detection"
    - versioning: "DVC, Delta Lake patterns"
  modeling:
    - selection: "Algorithm selection heuristics"
    - training: "Distributed training patterns"
    - tuning: "Hyperparameter optimization"
    - ensemble: "Model combination strategies"
  deployment:
    - serving: "Real-time and batch inference"
    - monitoring: "Performance tracking, drift detection"
    - versioning: "Model registry patterns"
  mlops:
    - ci_cd: "Continuous training pipelines"
    - experiment_tracking: "MLflow, W&B patterns"
    - governance: "Model cards, audit trails"
```
## Data Pipeline Patterns

### Data Preprocessing with MCP

```python
# Using MCP servers for ML data operations
async def preprocess_with_mcp(data_source):
    """
    ML data preprocessing with MCP integration
    """
    # 1. Load data description from Memory
    schema = await mcp_Memory_search_nodes("data_schema")

    # 2. Research best practices
    practices = await mcp_Context7_query_docs(
        libraryId="/pandas/pandas",
        query="data preprocessing best practices"
    )

    # 3. Design preprocessing with UltraThink
    pipeline = await mcp_UltraThink_ultrathink(
        thought=f"""
        Designing preprocessing pipeline:
        - Data schema: {schema}
        - Best practices: {practices}
        - Steps needed:
          1. Handle missing values
          2. Encode categoricals
          3. Normalize numerics
          4. Feature engineering
        """,
        total_thoughts=15
    )
    return pipeline
```
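The four steps sketched in the thought above map onto a plain scikit-learn preprocessor. A minimal sketch, assuming the numeric and categorical column lists come from the resolved schema:

```python
# Minimal sketch of the preprocessing steps above with scikit-learn.
# numeric_cols / categorical_cols are placeholders for your own schema.
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

def build_preprocessor(numeric_cols, categorical_cols):
    numeric = Pipeline([
        ("impute", SimpleImputer(strategy="median")),   # 1. handle missing values
        ("scale", StandardScaler()),                    # 3. normalize numerics
    ])
    categorical = Pipeline([
        ("impute", SimpleImputer(strategy="most_frequent")),
        ("encode", OneHotEncoder(handle_unknown="ignore")),  # 2. encode categoricals
    ])
    # 4. Feature engineering (derived columns) would be added as extra transformers.
    return ColumnTransformer([
        ("num", numeric, numeric_cols),
        ("cat", categorical, categorical_cols),
    ])
```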
### Feature Engineering Patterns

```yaml
feature_patterns:
  numeric:
    - normalization: "MinMax, Standard, RobustScaler"
    - binning: "Equal width, quantile, custom"
    - transformations: "Log, Box-Cox, Yeo-Johnson"
  categorical:
    - encoding: "One-hot, Label, Target, Ordinal"
    - embedding: "Entity embeddings for high cardinality"
    - hashing: "Feature hashing for very high cardinality"
  temporal:
    - components: "Year, month, day, hour, minute"
    - cyclical: "Sin/cos encoding for periodic features"
    - aggregations: "Rolling windows, lag features"
  text:
    - vectorization: "TF-IDF, CountVectorizer"
    - embeddings: "Word2Vec, BERT, OpenAI embeddings"
    - extraction: "NER, keywords, sentiment"
```
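As a concrete example of one entry above, cyclical encoding of an hour-of-day feature might look like this (the `hour` column name is illustrative):

```python
import numpy as np
import pandas as pd

def add_cyclical_hour(df: pd.DataFrame) -> pd.DataFrame:
    # Sin/cos encoding keeps 23:00 and 00:00 close together, unlike the raw integer.
    df = df.copy()
    df["hour_sin"] = np.sin(2 * np.pi * df["hour"] / 24)
    df["hour_cos"] = np.cos(2 * np.pi * df["hour"] / 24)
    return df
```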
## Model Selection Heuristics

### Problem-Model Mapping

```yaml
model_selection:
  classification:
    binary:
      quick_baseline: "LogisticRegression"
      structured_data: "XGBoost, LightGBM"
      unstructured: "Neural Network"
      imbalanced: "SMOTE + XGBoost"
    multiclass:
      few_classes: "RandomForest"
      many_classes: "XGBoost with softmax"
      hierarchical: "Hierarchical classifier"
  regression:
    linear_relationships: "Ridge, Lasso, ElasticNet"
    complex_patterns: "XGBoost, LightGBM"
    time_series: "ARIMA, Prophet, LSTM"
  clustering:
    known_k: "KMeans"
    unknown_k: "DBSCAN, HDBSCAN"
    high_dimensional: "SpectralClustering"
  recommendation:
    collaborative: "Matrix Factorization, NCF"
    content_based: "Embeddings + KNN"
    hybrid: "Two-tower architecture"
```
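These heuristics can also be encoded as a small helper. The sketch below mirrors part of the table and is intentionally incomplete:

```python
def suggest_model(task: str, structured: bool = True, imbalanced: bool = False) -> str:
    """Illustrative lookup that mirrors the heuristics table above."""
    if task == "binary_classification":
        if imbalanced:
            return "SMOTE + XGBoost"
        return "XGBoost / LightGBM" if structured else "Neural Network"
    if task == "regression":
        return "XGBoost / LightGBM"
    if task == "clustering":
        return "KMeans (known k) or HDBSCAN (unknown k)"
    return "LogisticRegression (baseline)"
```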
### Selection with UltraThink

```python
async def select_model(problem_description, data_characteristics):
    """
    Use UltraThink for ML model selection
    """
    return await mcp_UltraThink_ultrathink(
        thought=f"""
        Selecting optimal model:
        Problem: {problem_description}
        Data size: {data_characteristics['rows']} rows
        Features: {data_characteristics['features']} columns
        Target: {data_characteristics['target_type']}

        Evaluation criteria:
        1. Accuracy vs interpretability tradeoff
        2. Training time constraints
        3. Inference latency requirements
        4. Data characteristics (missing, imbalanced, etc.)

        Candidate models:
        - Option A: [model] because [reason]
        - Option B: [model] because [reason]
        - Option C: [model] because [reason]

        Recommendation: ...
        """,
        total_thoughts=20,
        confidence=None  # Auto-calculate
    )
```
## Training Pipeline

### Standard Training Flow

```python
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from xgboost import XGBClassifier

def create_training_pipeline(X, y):
    """
    Standard ML training pipeline
    """
    # 1. Data split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # 2. Create pipeline
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('model', XGBClassifier(
            n_estimators=100,
            learning_rate=0.1,
            max_depth=5,
            random_state=42
        ))
    ])

    # 3. Cross-validation
    cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5)

    # 4. Train final model
    pipeline.fit(X_train, y_train)

    # 5. Evaluate
    test_score = pipeline.score(X_test, y_test)

    return pipeline, cv_scores, test_score
```
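Example invocation on synthetic data, just to show the call shape:

```python
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=1_000, n_features=20, random_state=42)
pipeline, cv_scores, test_score = create_training_pipeline(X, y)
print(f"CV accuracy:   {cv_scores.mean():.3f} +/- {cv_scores.std():.3f}")
print(f"Test accuracy: {test_score:.3f}")
```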
### Hyperparameter Tuning

```yaml
tuning_strategies:
  grid_search:
    use_when: "Small parameter space"
    example: |
      param_grid = {
          'model__n_estimators': [100, 200, 300],
          'model__max_depth': [3, 5, 7],
          'model__learning_rate': [0.01, 0.1, 0.3]
      }
      GridSearchCV(pipeline, param_grid, cv=5)
  random_search:
    use_when: "Large parameter space"
    example: |
      param_distributions = {
          'model__n_estimators': randint(100, 500),
          'model__max_depth': randint(3, 10)
      }
      RandomizedSearchCV(pipeline, param_distributions, n_iter=50)
  bayesian_optimization:
    use_when: "Expensive evaluations"
    tools: ["Optuna", "Hyperopt", "Ray Tune"]
```
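For the Bayesian option, a minimal Optuna sketch might look like the following, assuming the `pipeline` and training split from the flow above are in scope:

```python
import optuna
from sklearn.model_selection import cross_val_score

def objective(trial):
    # Search ranges mirror the grid/random examples above
    params = {
        "model__n_estimators": trial.suggest_int("n_estimators", 100, 500),
        "model__max_depth": trial.suggest_int("max_depth", 3, 10),
        "model__learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
    }
    pipeline.set_params(**params)
    return cross_val_score(pipeline, X_train, y_train, cv=5).mean()

study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=50)
print(study.best_params)
```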
## Real-Time Inference Patterns

### Production Serving

```yaml
inference_patterns:
  batch:
    description: "Process large datasets offline"
    use_when: "Latency not critical"
    implementation: |
      predictions = model.predict(batch_data)
      save_to_database(predictions)
  real_time:
    description: "Single prediction on demand"
    use_when: "Low latency required"
    implementation: |
      @app.post("/predict")
      async def predict(request: PredictionRequest):
          features = preprocess(request.data)
          prediction = model.predict([features])[0]
          return {"prediction": prediction, "confidence": confidence}
  streaming:
    description: "Continuous predictions"
    use_when: "Event-driven systems"
    implementation: |
      for event in kafka_consumer:
          prediction = model.predict(event.data)
          kafka_producer.send(prediction)
```
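A slightly fuller sketch of the real_time pattern with FastAPI, showing where the `confidence` value in the snippet above would come from; `load_model` and `preprocess` are placeholders for the project's own loading and feature code:

```python
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
model = load_model()  # placeholder: load from the model registry at startup

class PredictionRequest(BaseModel):
    data: dict

@app.post("/predict")
async def predict(request: PredictionRequest):
    features = preprocess(request.data)          # placeholder feature pipeline
    proba = model.predict_proba([features])[0]   # assumes a classifier with predict_proba
    return {"prediction": int(proba.argmax()), "confidence": float(proba.max())}
```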
### Model Serving Architecture

```
                 ML SERVING ARCHITECTURE

┌─────────┐    ┌─────────────┐    ┌─────────────────────┐
│ Request │───>│ API Gateway │───>│    Feature Store    │
└─────────┘    └──────┬──────┘    │    (Redis/Feast)    │
                      │           └──────────┬──────────┘
                      v                      │
             ┌─────────────────┐             │
             │  Load Balancer  │<────────────┘
             └────────┬────────┘
                      │
        ┌─────────────┼─────────────┐
        v             v             v
   ┌──────────┐  ┌──────────┐  ┌──────────┐
   │  Model   │  │  Model   │  │  Model   │
   │ Server 1 │  │ Server 2 │  │ Server N │
   └──────────┘  └──────────┘  └──────────┘
```
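In this layout the gateway (or model server) typically looks up precomputed features before predicting. A minimal sketch against Redis as the feature store; the host name and key layout are assumptions:

```python
import json
import redis

r = redis.Redis(host="feature-store", port=6379, decode_responses=True)

def fetch_features(user_id: str) -> dict:
    # Assumed key layout: one hash per entity, e.g. "features:user:<id>",
    # with each field stored as a JSON-encoded value.
    raw = r.hgetall(f"features:user:{user_id}")
    return {key: json.loads(value) for key, value in raw.items()}
```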
## MLOps Integration

### Experiment Tracking

```python
import mlflow

def train_with_tracking():
    """
    MLflow experiment tracking pattern
    """
    mlflow.set_experiment("my_experiment")

    with mlflow.start_run():
        # Log parameters
        mlflow.log_params({
            "model": "XGBoost",
            "n_estimators": 100,
            "learning_rate": 0.1
        })

        # Train and evaluate (train_model / evaluate_model are placeholders
        # for the project's own training and evaluation code)
        model = train_model()
        accuracy, f1, auc = evaluate_model(model)

        # Log metrics
        mlflow.log_metrics({
            "accuracy": accuracy,
            "f1_score": f1,
            "auc_roc": auc
        })

        # Log model
        mlflow.sklearn.log_model(model, "model")

        # Log artifacts
        mlflow.log_artifact("confusion_matrix.png")
```
### Model Registry Pattern

```yaml
model_registry:
  stages:
    - name: "Development"
      validation: "Unit tests pass"
    - name: "Staging"
      validation: "Integration tests, performance baseline"
    - name: "Production"
      validation: "A/B test results, approval"
    - name: "Archived"
      trigger: "New model promoted"
  versioning:
    format: "semantic: major.minor.patch"
    triggers:
      major: "Architecture change"
      minor: "Retraining with new data"
      patch: "Hyperparameter tuning"
```
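If MLflow backs the registry, stage transitions can be scripted roughly as below. The model name and run ID are illustrative, and newer MLflow versions favor aliases over stages, so treat this as a sketch:

```python
import mlflow
from mlflow.tracking import MlflowClient

# Register the model logged in the tracking example; "<run_id>" and the
# model name "churn-model" are illustrative placeholders.
result = mlflow.register_model("runs:/<run_id>/model", "churn-model")

# Promote the new version once its stage validation has passed
client = MlflowClient()
client.transition_model_version_stage(
    name="churn-model",
    version=result.version,
    stage="Staging",
)
```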
## MCP Integration Examples

### Research with Context7

```python
async def research_ml_technique(technique):
    """
    Research an ML technique using Context7
    """
    # Get library ID
    lib = await mcp_Context7_resolve_library_id(
        libraryName="scikit-learn",
        query=f"How to implement {technique}"
    )

    # Query documentation
    docs = await mcp_Context7_query_docs(
        libraryId=lib.id,
        query=f"{technique} implementation example"
    )
    return docs
```
### Persist Learnings to Memory

```python
async def save_ml_learnings(experiment_results):
    """
    Save ML learnings to the Memory MCP server
    """
    await mcp_Memory_create_entities([{
        "name": f"MLExperiment_{experiment_results.id}",
        "entityType": "MLExperiment",
        "observations": [
            f"Problem: {experiment_results.problem_type}",
            f"Best model: {experiment_results.best_model}",
            f"Accuracy: {experiment_results.accuracy}",
            f"Key learnings: {experiment_results.learnings}",
            f"Gotchas: {experiment_results.gotchas}"
        ]
    }])
```
## Quick Reference

### When to Use This Skill

```yaml
triggers:
  explicit:
    - "train a model"
    - "predict"
    - "machine learning"
    - "data science"
  implicit:
    - Dataset processing tasks
    - Prediction requirements
    - Pattern recognition needs
    - Recommendation systems
```
### Common Workflows

- Quick Model: Load data → Auto-select model → Train → Evaluate
- Production Pipeline: EDA → Feature engineering → Model selection → Tuning → Deploy
- Experiment: Research → Implement → Track → Compare → Document
- Debug: Load model → Analyze predictions → Feature importance → Fix
### Related Skills

- nlp-master: Text processing for NLP models
- performance-optimizer: Model optimization
- e2e-testing: Model testing
- omega-agent: Complex ML orchestration
# Supported AI Coding Agents
This skill is compatible with the SKILL.md standard and works with all major AI coding agents.
Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.