yha9806

lancedb-merger

0
0
# Install this skill:
npx skills add yha9806/claude-skills-vulca --skill "lancedb-merger"

Install specific skill from multi-skill repository

# Description

将生成的评论批量合并到LanceDB数据库。当需要合并评论到数据库、更新matched_pairs、或同步LanceDB时使用。支持增量合并、重复检测、数据验证。

# SKILL.md


name: lancedb-merger
description: 将生成的评论批量合并到LanceDB数据库。当需要合并评论到数据库、更新matched_pairs、或同步LanceDB时使用。支持增量合并、重复检测、数据验证。


LanceDB Critique Merger

核心功能

  • 将JSON批次文件合并到LanceDB matched_pairs表
  • 自动检测重复记录
  • 验证必需字段完整性
  • 支持增量合并和回滚

快速合并

合并单个批次

import lancedb
import json

def merge_batch_to_lancedb(batch_id: int) -> int:
    """Merge one batch output file into the LanceDB ``matched_pairs`` table.

    Reads ``checkpoints/western_batch_{batch_id:03d}_output.json``, skips any
    critique whose filepath is already present in the table (or appears more
    than once within the same batch file), and appends the remainder.

    Args:
        batch_id: Numeric id of the batch file to merge.

    Returns:
        Number of new records actually added.

    Raises:
        FileNotFoundError: If the batch output file does not exist.
    """
    # Load the generated critiques for this batch.
    with open(f'checkpoints/western_batch_{batch_id:03d}_output.json') as f:
        critiques = json.load(f)

    # Connect to the database and open the target table.
    db = lancedb.connect('/home/yhryzy/vulca_lancedb')
    pairs = db.open_table('matched_pairs')

    # Collect already-stored paths for de-duplication.
    # NOTE(review): limit(10000) silently caps the scan — confirm the western
    # subset never exceeds 10000 rows, or raise the limit.
    existing = pairs.search().where("culture = 'western'").limit(10000).to_list()
    existing_paths = set(r.get('image_path') or r.get('filepath') for r in existing)

    # Build the list of records not yet present.
    new_records = []
    for c in critiques:
        if c['filepath'] not in existing_paths:
            # Record the path immediately so duplicates *within* the same
            # batch file are also skipped (the original only deduplicated
            # against rows already in the database).
            existing_paths.add(c['filepath'])
            record = {
                'critique_id': c['critique_id'],
                'culture': c['culture'],
                'image_path': c['filepath'],
                'optimized_path': c['filepath'],  # Western images are pre-optimized
                'artist': c['artist'],
                'title': c['title'],
                'critique_zh': c['critique_zh'],
                'critique_en': c['critique_en'],
                'covered_dimensions': c['covered_dimensions'],
                'quality_score': c.get('quality_score', 85)
            }
            new_records.append(record)

    # Append everything in one call; skip entirely when nothing is new.
    if new_records:
        pairs.add(new_records)
        print(f"Added {len(new_records)} new records")

    return len(new_records)

批量合并多个批次

def merge_batches_range(start: int, end: int) -> int:
    """Merge every batch in the inclusive range [start, end].

    Batches whose output file is missing are reported and skipped.

    Returns:
        Total number of records added across all merged batches.
    """
    total_added = 0
    for batch_id in range(start, end + 1):
        try:
            added = merge_batch_to_lancedb(batch_id)
        except FileNotFoundError:
            # Missing output file: report and move on to the next batch.
            print(f"Batch {batch_id:03d}: not found, skipping")
            continue
        total_added += added
        print(f"Batch {batch_id:03d}: +{added} records")
    return total_added

数据验证

必需字段检查

# Every merged record must carry a truthy value for each of these fields.
REQUIRED_FIELDS = [
    'critique_id', 'culture', 'image_path', 'artist', 'title',
    'critique_zh', 'critique_en', 'covered_dimensions'
]

def validate_record(record: dict) -> list:
    """Return the required fields that are missing or falsy in *record*.

    An empty list means the record is complete.
    """
    return [name for name in REQUIRED_FIELDS if not record.get(name)]

质量阈值检查

def check_quality_thresholds(record: dict) -> dict:
    """Check minimum content-size thresholds for a critique record.

    Returns:
        Mapping of field name to a human-readable issue description.
        An empty dict means every threshold is met.
    """
    # Measure all three quantities up front; missing fields count as empty.
    zh_len = len(record.get('critique_zh', ''))
    en_words = len(record.get('critique_en', '').split())
    dims = len(record.get('covered_dimensions', []))

    issues = {}
    if zh_len < 150:
        # Chinese critique: at least 150 characters.
        issues['critique_zh'] = f'too short: {zh_len} chars (need ≥150)'
    if en_words < 100:
        # English critique: at least 100 whitespace-separated words.
        issues['critique_en'] = f'too short: {en_words} words (need ≥100)'
    if dims < 18:
        # Coverage: at least 18 critique dimensions.
        issues['dimensions'] = f'too few: {dims} (need ≥18)'
    return issues

合并报告

每次合并后生成报告:

def generate_merge_report(batch_ids: list) -> dict:
    """Build a post-merge summary report.

    Args:
        batch_ids: The batch ids merged in this run.

    Returns:
        dict with an ISO timestamp, the merged batch ids, per-culture row
        counts, and the grand total across all cultures.
    """
    # Imported locally so the snippet is self-contained: the original body
    # referenced `datetime` without importing it anywhere, which raises
    # NameError at runtime.
    from datetime import datetime

    db = lancedb.connect('/home/yhryzy/vulca_lancedb')
    pairs = db.open_table('matched_pairs')

    # Count rows per culture.
    # NOTE(review): limit(10000) caps each count — confirm no culture
    # exceeds 10000 rows, or switch to a true count query.
    cultures = {}
    for culture in ['chinese', 'western', 'japanese', 'korean', 'islamic', 'indian', 'mural']:
        count = len(pairs.search().where(f"culture = '{culture}'").limit(10000).to_list())
        cultures[culture] = count

    return {
        'timestamp': datetime.now().isoformat(),
        'batches_merged': batch_ids,
        'culture_counts': cultures,
        'total_matched_pairs': sum(cultures.values())
    }

回滚支持

如果合并出错,可以回滚:

def rollback_batch(batch_id: int):
    """Remove from matched_pairs every record produced by the given batch.

    Re-reads the batch output file to recover the critique_ids that were
    merged, then deletes each one from the table.
    """
    # Open the target table first; the two setup steps are independent.
    db = lancedb.connect('/home/yhryzy/vulca_lancedb')
    pairs = db.open_table('matched_pairs')

    # Recover the ids this batch produced from its output file.
    with open(f'checkpoints/western_batch_{batch_id:03d}_output.json') as f:
        critiques = json.load(f)
    critique_ids = [entry['critique_id'] for entry in critiques]

    # LanceDB deletes by SQL-style predicate; issue one delete per id.
    for cid in critique_ids:
        pairs.delete(f"critique_id = '{cid}'")

    print(f"Rolled back {len(critique_ids)} records from batch {batch_id}")

增量同步

检查并同步未合并的批次:

def sync_all_pending():
    """Report which batch output files still contain unmerged critiques.

    Scans ``checkpoints/`` for western batch outputs, compares their
    critique_ids against the western rows already stored, and prints a
    per-batch summary of pending records.  The actual merge step is
    intentionally left as a stub.
    """
    import glob

    # Discover every batch output file and extract its numeric id
    # (filenames look like checkpoints/western_batch_NNN_output.json).
    outputs = glob.glob('checkpoints/western_batch_*_output.json')
    batch_ids = sorted(int(name.split('_')[2]) for name in outputs)

    # Snapshot the critique_ids already merged for this culture.
    db = lancedb.connect('/home/yhryzy/vulca_lancedb')
    pairs = db.open_table('matched_pairs')
    rows = pairs.search().where("culture = 'western'").limit(10000).to_list()
    existing_ids = {row['critique_id'] for row in rows}

    # Walk the batches in order and report those with unmerged critiques.
    for batch_id in batch_ids:
        with open(f'checkpoints/western_batch_{batch_id:03d}_output.json') as f:
            critiques = json.load(f)

        pending = [c for c in critiques if c['critique_id'] not in existing_ids]
        if pending:
            print(f"Batch {batch_id}: {len(pending)} pending records")
            # merge...

使用示例

# Merge batches 001-004
python -c "
import sys
sys.path.insert(0, 'scripts')
from merge_to_lancedb import merge_batches_range
merge_batches_range(1, 4)
"

# Check the merge results
python -c "
import lancedb
db = lancedb.connect('/home/yhryzy/vulca_lancedb')
pairs = db.open_table('matched_pairs')
western = pairs.search().where(\"culture = 'western'\").limit(10000).to_list()
print(f'Western total: {len(western)}')
"

# Supported AI Coding Agents

This skill is compatible with the SKILL.md standard and works with all major AI coding agents:

Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.