
batch-orchestrator

by @yha9806 in Tools
# Install this skill:
npx skills add yha9806/claude-skills-vulca --skill "batch-orchestrator"

Install a specific skill from a multi-skill repository

# Description

Master orchestration Skill for batch critique-generation tasks. Use it when you need to run batch jobs, check progress, resume interrupted tasks, or manage the western expansion workflow. It automatically coordinates the generator and merger and tracks global progress.

# SKILL.md


name: batch-orchestrator
description: Master orchestration Skill for batch critique-generation tasks. Use when you need to run batch jobs, check progress, resume interrupted tasks, or manage the western expansion workflow. Automatically coordinates the generator and merger and tracks global progress.


Batch Processing Orchestrator

Core Responsibilities

  • Manage the lifecycle of batch tasks
  • Coordinate culture-batch-generator and lancedb-merger
  • Track global progress and checkpoints
  • Handle error recovery and retries
  • Support multi-culture expansion: chinese, korean, indian, mural, western

Multi-Culture Expansion Support (v2.0)

Current Expansion Targets (2025-12-19)

| Culture | Current | Target | Increment Needed | Agent |
|---------|---------|--------|------------------|-------|
| Korean  | 64      | 150+   | +86              | korean-painting-critique-agent |
| Indian  | 98      | 150+   | +52              | indian-bilingual-agent-v3 |
| Mural   | 109     | 150+   | +41              | mural-bilingual-agent |
| Chinese | 1,854   | 2,100+ | +246             | chinese-painting-critique-agent |

Multi-Culture Status Query

import json
from pathlib import Path

def check_all_cultures():
    cultures = ['chinese', 'korean', 'indian', 'mural', 'western']
    for culture in cultures:
        progress_file = Path(f'checkpoints/{culture}_expansion_progress.json')
        if progress_file.exists():
            with open(progress_file) as f:
                p = json.load(f)
            print(f"{culture:12} {p['completed']:>4}/{p['total']:>4} ({p['completed']/p['total']*100:.1f}%)")
        else:
            print(f"{culture:12} 未配置")

check_all_cultures()

Execution Priority

Among the under-represented cultures, process the largest gap first; Chinese is deferred to the end because its corpus is already far larger than the others (see the sketch below):
1. Korean (86 missing) - process first
2. Indian (52 missing)
3. Mural (41 missing)
4. Chinese (246 missing) - process last
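
The ordering above can be reproduced directly from the table. The sketch below is illustrative only; the current counts and targets are copied from the expansion table, and the rule of deferring chinese to the end is an assumption, not part of the shipped scripts.

# Illustrative only: counts and targets are copied from the table above.
CULTURES = {
    'korean':  {'current': 64,   'target': 150},
    'indian':  {'current': 98,   'target': 150},
    'mural':   {'current': 109,  'target': 150},
    'chinese': {'current': 1854, 'target': 2100},
}

def expansion_order():
    """Sort cultures by remaining gap (largest first), deferring chinese to last."""
    gap = {c: v['target'] - v['current'] for c, v in CULTURES.items()}
    order = sorted((c for c in gap if c != 'chinese'), key=gap.get, reverse=True)
    order.append('chinese')
    return [(c, gap[c]) for c in order]

print(expansion_order())
# [('korean', 86), ('indian', 52), ('mural', 41), ('chinese', 246)]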

Quick Commands

Check Current Status

# View progress overview
python scripts/batch_status.py

# Example output:
# === Western Expansion Progress ===
# Total: 3,704 images
# Completed: 100 (2.7%)
# Next batch: 005
# Last artist: Maximilien Luce (100/117)
# Estimated remaining: 145 batches

Resume Task Execution

# Run the next batch
python scripts/run_next_batch.py

# Run N batches
python scripts/run_next_batch.py --count 5

Sync to Database

# Merge all pending batches
python scripts/sync_to_lancedb.py
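
sync_to_lancedb.py itself is not included in this skill. The sketch below is one plausible shape for it, assuming a local LanceDB database at data/lancedb and a table named critiques (both placeholder names): it appends every completed-but-unmerged batch output to the table and records the merge in the progress file.

import json
from pathlib import Path

import lancedb  # pip install lancedb

CHECKPOINT_DIR = Path('checkpoints')
PROGRESS_FILE = CHECKPOINT_DIR / 'western_expansion_progress.json'

def sync_to_lancedb(db_uri='data/lancedb', table_name='critiques'):
    """Append unmerged batch outputs to LanceDB and mark them as merged."""
    with open(PROGRESS_FILE) as f:
        progress = json.load(f)

    pending = [b for b in progress['completed_batches']
               if b not in progress['merged_batches']]
    if not pending:
        print("Nothing to merge.")
        return

    db = lancedb.connect(db_uri)
    table = db.open_table(table_name)

    for batch_id in pending:
        output_file = CHECKPOINT_DIR / f'western_batch_{batch_id:03d}_output.json'
        with open(output_file) as f:
            records = json.load(f)
        table.add(records)  # append critique rows to the existing table
        progress['merged_batches'].append(batch_id)
        print(f"Merged batch {batch_id:03d} ({len(records)} critiques)")

    with open(PROGRESS_FILE, 'w') as f:
        json.dump(progress, f, indent=2, ensure_ascii=False)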

Main Workflow

Complete Execution Loop

┌─────────────────────────────────────────────────────────────┐
│                    Batch Orchestrator                        │
├─────────────────────────────────────────────────────────────┤
│  1. Read progress → checkpoints/western_expansion_progress.json │
│  2. Prepare batch → western-batch-generator                  │
│  3. Generate critiques → Task(western-image-critique-agent)  │
│  4. Validate quality → check length / dimensions / scores    │
│  5. Save checkpoint → western_batch_{NNN}_output.json        │
│  6. Merge every 100 → lancedb-merger                         │
│  7. Update progress → back to Step 1                         │
└─────────────────────────────────────────────────────────────┘

Automation Script: run_batch_workflow.py

#!/usr/bin/env python3
"""
Western Art Critique Batch Workflow Orchestrator
Runs the complete pipeline: batch generation, validation, and merging.
"""

import json
import os
from datetime import datetime
from pathlib import Path

CHECKPOINT_DIR = Path('checkpoints')
PROGRESS_FILE = CHECKPOINT_DIR / 'western_expansion_progress.json'
CANDIDATES_FILE = CHECKPOINT_DIR / 'western_expansion_candidates.json'

def load_progress():
    """加载或初始化进度"""
    if PROGRESS_FILE.exists():
        with open(PROGRESS_FILE) as f:
            return json.load(f)
    return {
        'total': 3704,
        'completed': 0,
        'next_batch': 1,
        'last_updated': None,
        'completed_batches': [],
        'failed_batches': [],
        'merged_batches': []
    }

def save_progress(progress):
    """保存进度"""
    progress['last_updated'] = datetime.now().isoformat()
    with open(PROGRESS_FILE, 'w') as f:
        json.dump(progress, f, indent=2, ensure_ascii=False)

def get_pending_images(progress):
    """获取待处理图像"""
    with open(CANDIDATES_FILE) as f:
        data = json.load(f)

    # Collect filepaths already covered by completed batches
    completed_paths = set()
    for batch_id in progress['completed_batches']:
        output_file = CHECKPOINT_DIR / f'western_batch_{batch_id:03d}_output.json'
        if output_file.exists():
            with open(output_file) as f:
                for item in json.load(f):
                    completed_paths.add(item['filepath'])

    # Keep only images without a completed critique
    return [img for img in data['images'] if img['filepath'] not in completed_paths]

def prepare_next_batch(progress, size=10):
    """准备下一批次 (v2.1: 每批10张,避免context过载)"""
    pending = get_pending_images(progress)
    if not pending:
        return None

    batch_id = progress['next_batch']
    batch = pending[:size]

    input_file = CHECKPOINT_DIR / f'western_batch_{batch_id:03d}_input.json'
    with open(input_file, 'w') as f:
        json.dump(batch, f, indent=2, ensure_ascii=False)

    return batch_id, batch

def validate_batch_output(batch_id):
    """验证批次输出"""
    output_file = CHECKPOINT_DIR / f'western_batch_{batch_id:03d}_output.json'
    if not output_file.exists():
        return {'valid': False, 'error': 'output file not found'}

    with open(output_file) as f:
        critiques = json.load(f)

    issues = []
    for c in critiques:
        if len(c.get('critique_zh', '')) < 200:
            issues.append(f"{c['filepath']}: short_zh")
        if len(c.get('covered_dimensions', [])) < 18:
            issues.append(f"{c['filepath']}: low_dims")

    return {
        'valid': len(issues) == 0,
        'total': len(critiques),
        'issues': issues
    }

def run_single_batch(progress):
    """执行单个批次的完整流程"""
    # 1. 准备批次
    result = prepare_next_batch(progress)
    if not result:
        print("No more images to process!")
        return False

    batch_id, batch = result
    print(f"\n=== Processing Batch {batch_id:03d} ({len(batch)} images) ===")

    # 2. Generate critiques with the agent
    # This step is executed through the Claude Task tool
    print(f"Input file: checkpoints/western_batch_{batch_id:03d}_input.json")
    print("Please run: Task(western-image-critique-agent, ...)")

    return batch_id

# Main entry point
if __name__ == '__main__':
    progress = load_progress()
    print(f"Current progress: {progress['completed']}/{progress['total']}")
    run_single_batch(progress)

Progress Tracking

Progress File Format

{
  "total": 3704,
  "completed": 100,
  "next_batch": 5,
  "last_updated": "2025-12-18T10:30:00",
  "completed_batches": [1, 2, 3, 4],
  "merged_batches": [],
  "failed_batches": [],
  "artists_status": {
    "Maximilien Luce": {"total": 117, "completed": 100},
    "Michelangelo": {"total": 112, "completed": 0}
  }
}
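
The workflow script loads and saves this file but does not show the step that records a finished batch. A minimal sketch of that step follows; it reuses load_progress/save_progress from run_batch_workflow.py, and the per-artist bookkeeping assumes each critique record carries an 'artist' field (an assumption, not confirmed by the output schema shown here).

def mark_batch_complete(batch_id, critiques):
    """Record a validated batch in the progress file (illustrative sketch)."""
    progress = load_progress()
    progress['completed'] += len(critiques)
    progress['completed_batches'].append(batch_id)
    progress['next_batch'] = batch_id + 1

    # Assumed bookkeeping: each critique dict includes an 'artist' key.
    for c in critiques:
        artist = c.get('artist')
        if artist:
            status = progress.setdefault('artists_status', {}).setdefault(
                artist, {'total': 0, 'completed': 0})
            status['completed'] += 1

    save_progress(progress)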

Status Query

def print_status():
    """打印详细状态"""
    progress = load_progress()

    print("=" * 50)
    print("WESTERN EXPANSION STATUS")
    print("=" * 50)
    print(f"Total images:      {progress['total']}")
    print(f"Completed:         {progress['completed']} ({progress['completed']/progress['total']*100:.1f}%)")
    print(f"Next batch:        {progress['next_batch']:03d}")
    print(f"Last updated:      {progress['last_updated']}")
    print()
    print("Completed batches:", progress['completed_batches'])
    print("Merged batches:   ", progress['merged_batches'])
    print("Failed batches:   ", progress['failed_batches'])

Error Handling

Handling a Failed Batch

def handle_batch_failure(batch_id, error):
    """处理批次失败"""
    progress = load_progress()
    progress['failed_batches'].append({
        'batch_id': batch_id,
        'error': str(error),
        'timestamp': datetime.now().isoformat()
    })
    save_progress(progress)
    print(f"Batch {batch_id} failed: {error}")

def retry_failed_batches():
    """重试所有失败的批次"""
    progress = load_progress()
    for failed in progress['failed_batches'][:]:  # 复制列表
        batch_id = failed['batch_id']
        print(f"Retrying batch {batch_id}...")
        # 重新执行...

Resuming from a Checkpoint

def resume_from_checkpoint():
    """Resume execution from the last checkpoint."""
    progress = load_progress()

    # Find batches that were started but never produced an output file
    failed_ids = {f['batch_id'] for f in progress['failed_batches']}
    for batch_id in range(1, progress['next_batch']):
        output_file = CHECKPOINT_DIR / f'western_batch_{batch_id:03d}_output.json'
        if not output_file.exists() and batch_id not in failed_ids:
            print(f"Found incomplete batch: {batch_id}")
            return batch_id

    return progress['next_batch']

Parallel Execution

Launching Multiple Agents

def run_parallel_batches(count=4):
    """Run several batches in parallel (v2.1: at most 4 agents to avoid context blow-up).

    Key constraints:
    - Batch size: 10 images
    - Max parallel agents: 4
    - Return format: only a summary is returned; full data is written to checkpoints
    """
    progress = load_progress()
    pending = get_pending_images(progress)
    tasks = []

    # Hard-cap the parallelism at 4 agents
    max_parallel = min(count, 4)

    for i in range(max_parallel):
        batch = pending[i * 10:(i + 1) * 10]
        if not batch:
            break
        batch_id = progress['next_batch'] + i
        input_file = CHECKPOINT_DIR / f'western_batch_{batch_id:03d}_input.json'
        with open(input_file, 'w') as f:
            json.dump(batch, f, indent=2, ensure_ascii=False)
        tasks.append({
            'batch_id': batch_id,
            'input_file': str(input_file)
        })

    # Launch in parallel via the Task tool:
    # Task(western-image-critique-agent, batch_id=X, run_in_background=True)
    return tasks

Daily Run Recommendations

At the Start of a Session

  1. python scripts/batch_status.py - check progress
  2. Confirm whether the previous session's batches completed
  3. If there is unmerged data, sync it to LanceDB first (see the sketch below)
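
The unmerged-data check in step 3 can be read straight from the progress file. A minimal sketch, reusing load_progress from the workflow script above:

def find_unmerged_batches():
    """Return completed batch ids that have not yet been merged into LanceDB."""
    progress = load_progress()
    return [b for b in progress['completed_batches']
            if b not in progress['merged_batches']]

unmerged = find_unmerged_batches()
if unmerged:
    print(f"Unmerged batches: {unmerged} -> run scripts/sync_to_lancedb.py first")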

Running Batches

  1. Process 5-10 batches per session
  2. Merge to LanceDB after roughly every 100 completed critiques
  3. Review the quality report periodically

Before Ending a Session

  1. Save all output files
  2. Update the progress file
  3. Sync to LanceDB

Milestone Checkpoints

| Milestone | Batch Range | Estimated Images | Western Total |
|-----------|-------------|------------------|---------------|
| M1        | 001-010     | 250              | 2,188         |
| M2        | 011-030     | 750              | 2,688         |
| M3        | 031-060     | 1,500            | 3,438         |
| M4        | 061-100     | 2,500            | 4,438         |
| M5        | 101-149     | 3,704            | 5,642         |
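
For reference, the milestone rows imply a pre-expansion western corpus of 1,938 images (2,188 - 250). A small sketch, under that assumption, that maps a completed count onto these milestones:

MILESTONES = [('M1', 250), ('M2', 750), ('M3', 1500), ('M4', 2500), ('M5', 3704)]
WESTERN_BASE = 1938  # assumed pre-expansion western total, inferred from the table

def milestone_status(completed):
    """Return the next milestone, images remaining to it, and the implied western total."""
    for name, threshold in MILESTONES:
        if completed < threshold:
            return name, threshold - completed, WESTERN_BASE + completed
    return 'done', 0, WESTERN_BASE + completed

print(milestone_status(100))  # ('M1', 150, 2038)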

# Supported AI Coding Agents

This skill is compatible with the SKILL.md standard and works with all major AI coding agents.

Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.