Install this specific skill from the multi-skill repository:

```bash
npx skills add yha9806/claude-skills-vulca --skill "batch-orchestrator"
```
# Description
The master orchestrator Skill for batch critique-generation tasks. Use it when you need to run batch jobs, check progress, resume interrupted tasks, or manage the western expansion workflow. It automatically coordinates the generator and merger and tracks global progress.
# SKILL.md
```yaml
name: batch-orchestrator
description: The master orchestrator Skill for batch critique-generation tasks. Use it when you need to run batch jobs, check progress, resume interrupted tasks, or manage the western expansion workflow. It automatically coordinates the generator and merger and tracks global progress.
```
## Batch Processing Orchestrator

### Core Responsibilities

- Manage the lifecycle of batch tasks
- Coordinate culture-batch-generator and lancedb-merger
- Track global progress and checkpoints
- Handle error recovery and retries
- Support multi-culture expansion: chinese, korean, indian, mural, western
### Multi-Culture Expansion Support (v2.0)

#### Current Expansion Targets (2025-12-19)

| Culture | Current | Target | Needed | Agent |
|---|---|---|---|---|
| Korean | 64 | 150+ | +86 | korean-painting-critique-agent |
| Indian | 98 | 150+ | +52 | indian-bilingual-agent-v3 |
| Mural | 109 | 150+ | +41 | mural-bilingual-agent |
| Chinese | 1,854 | 2,100+ | +246 | chinese-painting-critique-agent |
#### Multi-Culture Status Query

```python
import json
from pathlib import Path

def check_all_cultures():
    """Print completion status for every culture's expansion checkpoint."""
    cultures = ['chinese', 'korean', 'indian', 'mural', 'western']
    for culture in cultures:
        progress_file = Path(f'checkpoints/{culture}_expansion_progress.json')
        if progress_file.exists():
            with open(progress_file) as f:
                p = json.load(f)
            print(f"{culture:12} {p['completed']:>4}/{p['total']:>4} "
                  f"({p['completed']/p['total']*100:.1f}%)")
        else:
            print(f"{culture:12} not configured")

check_all_cultures()
```
#### Execution Priority

Ordered by relative sample gap, largest first (see the sketch after this list):

1. Korean (86 missing) - process first
2. Indian (52 missing)
3. Mural (41 missing)
4. Chinese (246 missing) - process last
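
Since the order tracks the relative gap rather than the absolute one, it can be derived instead of hard-coded. A minimal sketch, inlining the current/target counts from the table above:

```python
def expansion_priority():
    """Order cultures by completion ratio, lowest (largest relative gap) first."""
    targets = {'korean': 150, 'indian': 150, 'mural': 150, 'chinese': 2100}
    current = {'korean': 64, 'indian': 98, 'mural': 109, 'chinese': 1854}
    return sorted(targets, key=lambda c: current[c] / targets[c])

print(expansion_priority())  # ['korean', 'indian', 'mural', 'chinese']
```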
### Quick Commands

#### Check Current Status

```bash
# Show a progress overview
python scripts/batch_status.py

# Example output:
# === Western Expansion Progress ===
# Total: 3,704 images
# Completed: 100 (2.7%)
# Next batch: 005
# Last artist: Maximilien Luce (100/117)
# Estimated remaining: 145 batches
```
#### Continue a Task

```bash
# Run the next batch
python scripts/run_next_batch.py

# Run N batches
python scripts/run_next_batch.py --count 5
```
#### Sync to the Database

```bash
# Merge all pending batches
python scripts/sync_to_lancedb.py
```
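
The merge itself is handled by the lancedb-merger skill; scripts/sync_to_lancedb.py is not shown here. A minimal sketch of the bookkeeping it implies, assuming a hypothetical data/lancedb database with a critiques table whose schema matches the batch output rows:

```python
import json
from pathlib import Path

import lancedb  # pip install lancedb

CHECKPOINT_DIR = Path('checkpoints')
PROGRESS_FILE = CHECKPOINT_DIR / 'western_expansion_progress.json'

def merge_pending(db_path='data/lancedb', table_name='critiques'):
    """Append completed-but-unmerged batch outputs to LanceDB (sketch)."""
    progress = json.loads(PROGRESS_FILE.read_text())
    pending = [b for b in progress['completed_batches']
               if b not in progress['merged_batches']]
    db = lancedb.connect(db_path)
    table = db.open_table(table_name)
    for batch_id in pending:
        rows = json.loads(
            (CHECKPOINT_DIR / f'western_batch_{batch_id:03d}_output.json').read_text())
        table.add(rows)  # rows must match the table schema
        progress['merged_batches'].append(batch_id)
    PROGRESS_FILE.write_text(json.dumps(progress, indent=2, ensure_ascii=False))
    return pending
```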
### Main Workflow

#### Full Execution Loop

```
┌───────────────────────────────────────────────────────────────────┐
│ Batch Orchestrator                                                │
├───────────────────────────────────────────────────────────────────┤
│ 1. Read progress   → checkpoints/western_expansion_progress.json │
│ 2. Prepare batch   → western-batch-generator                     │
│ 3. Generate        → Task(western-image-critique-agent)          │
│ 4. Validate        → check length / dimensions / scores          │
│ 5. Save checkpoint → western_batch_{NNN}_output.json             │
│ 6. Merge per 100   → lancedb-merger                              │
│ 7. Update progress → back to Step 1                              │
└───────────────────────────────────────────────────────────────────┘
```
#### Automation Script: run_batch_workflow.py

```python
#!/usr/bin/env python3
"""
Western Art Critique Batch Workflow Orchestrator

Runs the full generate / validate / merge loop for batch production.
"""
import json
from datetime import datetime
from pathlib import Path

CHECKPOINT_DIR = Path('checkpoints')
PROGRESS_FILE = CHECKPOINT_DIR / 'western_expansion_progress.json'
CANDIDATES_FILE = CHECKPOINT_DIR / 'western_expansion_candidates.json'

def load_progress():
    """Load the progress file, or initialize a fresh one."""
    if PROGRESS_FILE.exists():
        with open(PROGRESS_FILE) as f:
            return json.load(f)
    return {
        'total': 3704,
        'completed': 0,
        'next_batch': 1,
        'last_updated': None,
        'completed_batches': [],
        'failed_batches': [],
        'merged_batches': []
    }

def save_progress(progress):
    """Persist progress with an updated timestamp."""
    progress['last_updated'] = datetime.now().isoformat()
    with open(PROGRESS_FILE, 'w') as f:
        json.dump(progress, f, indent=2, ensure_ascii=False)

def get_pending_images(progress):
    """Return candidate images that have not been processed yet."""
    with open(CANDIDATES_FILE) as f:
        data = json.load(f)
    # Collect filepaths already covered by completed batches
    completed_paths = set()
    for batch_id in progress['completed_batches']:
        output_file = CHECKPOINT_DIR / f'western_batch_{batch_id:03d}_output.json'
        if output_file.exists():
            with open(output_file) as f:
                for item in json.load(f):
                    completed_paths.add(item['filepath'])
    # Keep only images not yet completed
    return [img for img in data['images'] if img['filepath'] not in completed_paths]

def prepare_next_batch(progress, size=10):
    """Prepare the next batch (v2.1: 10 images per batch to avoid context overload)."""
    pending = get_pending_images(progress)
    if not pending:
        return None
    batch_id = progress['next_batch']
    batch = pending[:size]
    input_file = CHECKPOINT_DIR / f'western_batch_{batch_id:03d}_input.json'
    with open(input_file, 'w') as f:
        json.dump(batch, f, indent=2, ensure_ascii=False)
    return batch_id, batch

def validate_batch_output(batch_id):
    """Validate a batch's output file against the quality thresholds."""
    output_file = CHECKPOINT_DIR / f'western_batch_{batch_id:03d}_output.json'
    if not output_file.exists():
        return {'valid': False, 'error': 'output file not found'}
    with open(output_file) as f:
        critiques = json.load(f)
    issues = []
    for c in critiques:
        if len(c.get('critique_zh', '')) < 200:
            issues.append(f"{c['filepath']}: short_zh")
        if len(c.get('covered_dimensions', [])) < 18:
            issues.append(f"{c['filepath']}: low_dims")
    return {
        'valid': len(issues) == 0,
        'total': len(critiques),
        'issues': issues
    }

def run_single_batch(progress):
    """Run the full flow for a single batch."""
    # 1. Prepare the batch
    result = prepare_next_batch(progress)
    if not result:
        print("No more images to process!")
        return False
    batch_id, batch = result
    print(f"\n=== Processing Batch {batch_id:03d} ({len(batch)} images) ===")
    # 2. Generate critiques via the agent
    #    This step runs through Claude's Task tool
    print(f"Input file: checkpoints/western_batch_{batch_id:03d}_input.json")
    print("Please run: Task(western-image-critique-agent, ...)")
    return batch_id

# Entry point
if __name__ == '__main__':
    progress = load_progress()
    print(f"Current progress: {progress['completed']}/{progress['total']}")
    run_single_batch(progress)
```
### Progress Tracking

#### Progress File Format

```json
{
  "total": 3704,
  "completed": 100,
  "next_batch": 5,
  "last_updated": "2025-12-18T10:30:00",
  "completed_batches": [1, 2, 3, 4],
  "merged_batches": [],
  "failed_batches": [],
  "artists_status": {
    "Maximilien Luce": {"total": 117, "completed": 100},
    "Michelangelo": {"total": 112, "completed": 0}
  }
}
```
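
Step 7 of the execution loop ("update progress") is not spelled out in run_batch_workflow.py. A sketch of what it might look like, reusing load_progress/save_progress from the script and assuming each output row carries an `artist` field matching the keys of `artists_status`:

```python
def mark_batch_completed(batch_id):
    """Fold a validated batch into the progress file (sketch)."""
    progress = load_progress()
    output_file = CHECKPOINT_DIR / f'western_batch_{batch_id:03d}_output.json'
    with open(output_file) as f:
        critiques = json.load(f)
    progress['completed'] += len(critiques)
    progress['completed_batches'].append(batch_id)
    progress['next_batch'] = max(progress['next_batch'], batch_id + 1)
    for c in critiques:
        artist = c.get('artist')  # assumed field name; adjust to the real schema
        status = progress.get('artists_status', {}).get(artist)
        if status:
            status['completed'] += 1
    save_progress(progress)
```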
#### Status Query

```python
def print_status():
    """Print a detailed status report."""
    progress = load_progress()
    print("=" * 50)
    print("WESTERN EXPANSION STATUS")
    print("=" * 50)
    print(f"Total images: {progress['total']}")
    print(f"Completed: {progress['completed']} "
          f"({progress['completed']/progress['total']*100:.1f}%)")
    print(f"Next batch: {progress['next_batch']:03d}")
    print(f"Last updated: {progress['last_updated']}")
    print()
    print("Completed batches:", progress['completed_batches'])
    print("Merged batches:  ", progress['merged_batches'])
    print("Failed batches:  ", progress['failed_batches'])
```
### Error Handling

#### Handling Batch Failures

```python
def handle_batch_failure(batch_id, error):
    """Record a failed batch in the progress file."""
    progress = load_progress()
    progress['failed_batches'].append({
        'batch_id': batch_id,
        'error': str(error),
        'timestamp': datetime.now().isoformat()
    })
    save_progress(progress)
    print(f"Batch {batch_id} failed: {error}")

def retry_failed_batches():
    """Retry every failed batch."""
    progress = load_progress()
    for failed in progress['failed_batches'][:]:  # iterate over a copy
        batch_id = failed['batch_id']
        print(f"Retrying batch {batch_id}...")
        # re-run the batch ...
```
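
The re-run itself goes through the Task tool, so it is left open above. One way to close the loop afterwards, sketched with a hypothetical retry_and_revalidate that promotes batches whose output now validates:

```python
def retry_and_revalidate():
    """Re-check failed batches; promote any whose output now validates (sketch)."""
    progress = load_progress()
    still_failed = []
    for failed in progress['failed_batches']:
        batch_id = failed['batch_id']
        # Re-run via Task(western-image-critique-agent, ...) first, then:
        result = validate_batch_output(batch_id)
        if result['valid']:
            print(f"Batch {batch_id} now valid")
            progress['completed_batches'].append(batch_id)
        else:
            still_failed.append(failed)
    progress['failed_batches'] = still_failed
    save_progress(progress)
```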
#### Resuming from a Checkpoint

```python
def resume_from_checkpoint():
    """Resume execution from the last checkpoint."""
    progress = load_progress()
    # failed_batches holds dicts (see handle_batch_failure), so compare by id
    failed_ids = {f['batch_id'] for f in progress['failed_batches']}
    # Find the first batch with no output that is not marked as failed
    for batch_id in range(1, progress['next_batch']):
        output_file = CHECKPOINT_DIR / f'western_batch_{batch_id:03d}_output.json'
        if not output_file.exists() and batch_id not in failed_ids:
            print(f"Found incomplete batch: {batch_id}")
            return batch_id
    return progress['next_batch']
```
### Parallel Execution

#### Launching Multiple Agents

```python
def run_parallel_batches(count=4):
    """Run several batches in parallel (v2.1: at most 4 agents to avoid context blow-up).

    Hard constraints:
    - 10 images per batch
    - at most 4 agents in parallel
    - agents return a summary only; full data is written to the checkpoint
    """
    progress = load_progress()
    pending = get_pending_images(progress)
    tasks = []
    max_parallel = min(count, 4)  # hard cap of 4 parallel agents
    for i in range(max_parallel):
        batch = pending[i * 10:(i + 1) * 10]
        if not batch:
            break
        batch_id = progress['next_batch'] + i
        input_file = CHECKPOINT_DIR / f'western_batch_{batch_id:03d}_input.json'
        with open(input_file, 'w') as f:
            json.dump(batch, f, indent=2, ensure_ascii=False)
        tasks.append({
            'batch_id': batch_id,
            'input_file': str(input_file)
        })
    # Launch in parallel via the Task tool:
    # Task(western-image-critique-agent, batch_id=X, run_in_background=True)
    return tasks
```
### Daily Run Recommendations

#### At Session Start

- Run `python scripts/batch_status.py` to check progress
- Confirm whether the previous session's batches completed
- If there is unmerged data, sync it to LanceDB first

#### Executing Batches

- Process 5-10 batches per session
- Merge to LanceDB after every ~100 completed critiques
- Check quality reports regularly

#### Before Session End

- Save all output files
- Update the progress file
- Sync to LanceDB (see the sketch below)
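
The wrap-up checklist can be scripted with the two quick commands from above; a minimal sketch:

```python
import subprocess

def end_of_session():
    """Session wrap-up: print a status overview, then sync unmerged data to LanceDB."""
    subprocess.run(['python', 'scripts/batch_status.py'], check=True)
    subprocess.run(['python', 'scripts/sync_to_lancedb.py'], check=True)
```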
### Milestone Checkpoints

| Milestone | Batch Range | Cumulative Images | Western Total |
|---|---|---|---|
| M1 | 001-010 | 250 | 2,188 |
| M2 | 011-030 | 750 | 2,688 |
| M3 | 031-060 | 1,500 | 3,438 |
| M4 | 061-100 | 2,500 | 4,438 |
| M5 | 101-149 | 3,704 | 5,642 |
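
A quick way to report the current milestone from the progress file; a sketch with the table's cumulative counts inlined:

```python
MILESTONES = [('M1', 250), ('M2', 750), ('M3', 1500), ('M4', 2500), ('M5', 3704)]

def current_milestone(completed):
    """Return the first milestone not yet reached, or a done message."""
    for name, cumulative in MILESTONES:
        if completed < cumulative:
            return f"working toward {name} ({completed}/{cumulative})"
    return "all milestones reached"

print(current_milestone(100))  # working toward M1 (100/250)
```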
# Supported AI Coding Agents

This skill is compatible with the SKILL.md standard and works with all major AI coding agents. Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.