Batch-merge generated art critiques into the LanceDB `matched_pairs` table. Use when checkpoint batch outputs need to be synced into LanceDB, with duplicate detection, field validation, incremental merge, and rollback support.
# Install this skill:
npx skills add yha9806/claude-skills-vulca --skill "lancedb-merger"
Install specific skill from multi-skill repository
# Description
将生成的评论批量合并到LanceDB数据库。当需要合并评论到数据库、更新matched_pairs、或同步LanceDB时使用。支持增量合并、重复检测、数据验证。
# SKILL.md
name: lancedb-merger
description: 将生成的评论批量合并到LanceDB数据库。当需要合并评论到数据库、更新matched_pairs、或同步LanceDB时使用。支持增量合并、重复检测、数据验证。
LanceDB Critique Merger
核心功能
- 将JSON批次文件合并到LanceDB matched_pairs表
- 自动检测重复记录
- 验证必需字段完整性
- 支持增量合并和回滚
快速合并
合并单个批次
import lancedb
import json
def merge_batch_to_lancedb(batch_id: int) -> int:
    """Merge one generated-critique batch file into the LanceDB matched_pairs table.

    Reads ``checkpoints/western_batch_{batch_id:03d}_output.json``, skips any
    record whose image path already exists for ``culture='western'``, and
    appends the remainder.

    Args:
        batch_id: Numeric batch index (zero-padded to 3 digits in the filename).

    Returns:
        Number of new records added (0 when every record was a duplicate).

    Raises:
        FileNotFoundError: When the batch output file does not exist.
    """
    # Read batch output produced by the critique generator.
    with open(f'checkpoints/western_batch_{batch_id:03d}_output.json') as f:
        critiques = json.load(f)

    # Connect to the database.
    db = lancedb.connect('/home/yhryzy/vulca_lancedb')
    pairs = db.open_table('matched_pairs')

    # Collect paths already stored for this culture (for deduplication).
    # NOTE(review): limit(10000) caps the scan — raise it if the western
    # partition ever grows past 10k rows, or duplicates will slip through.
    existing = pairs.search().where("culture = 'western'").limit(10000).to_list()
    # Drop None so a stored row missing both path keys cannot spuriously
    # match a new record (fix: the original kept None in the set).
    existing_paths = {
        path
        for path in (r.get('image_path') or r.get('filepath') for r in existing)
        if path is not None
    }

    # Build the records that are not present yet.
    new_records = []
    for c in critiques:
        if c['filepath'] in existing_paths:
            continue
        new_records.append({
            'critique_id': c['critique_id'],
            'culture': c['culture'],
            'image_path': c['filepath'],
            'optimized_path': c['filepath'],  # Western images are already optimized
            'artist': c['artist'],
            'title': c['title'],
            'critique_zh': c['critique_zh'],
            'critique_en': c['critique_en'],
            'covered_dimensions': c['covered_dimensions'],
            'quality_score': c.get('quality_score', 85),  # default when absent
        })

    # Append to the table in one call.
    if new_records:
        pairs.add(new_records)
        print(f"Added {len(new_records)} new records")
    return len(new_records)
批量合并多个批次
def merge_batches_range(start: int, end: int):
    """Merge every batch in the inclusive range [start, end].

    Batches whose output file is missing are skipped with a notice.

    Returns:
        Total number of records added across all merged batches.
    """
    total_added = 0
    for batch_id in range(start, end + 1):
        try:
            count = merge_batch_to_lancedb(batch_id)
        except FileNotFoundError:
            print(f"Batch {batch_id:03d}: not found, skipping")
        else:
            total_added += count
            print(f"Batch {batch_id:03d}: +{count} records")
    return total_added
数据验证
必需字段检查
# Fields every matched_pairs record must carry a truthy value for.
REQUIRED_FIELDS = [
    'critique_id', 'culture', 'image_path', 'artist', 'title',
    'critique_zh', 'critique_en', 'covered_dimensions',
]


def validate_record(record: dict) -> list:
    """Return the names of required fields missing from *record*.

    A field counts as missing when it is absent, None, or empty
    ('' / [] / 0) — i.e. falsy, mirroring the truthiness check used
    when merging.

    Args:
        record: A candidate matched_pairs record.

    Returns:
        List of missing field names, in REQUIRED_FIELDS order
        (empty when the record is complete).
    """
    return [field for field in REQUIRED_FIELDS if not record.get(field)]
质量阈值检查
def check_quality_thresholds(record: dict, *, min_zh_chars: int = 150,
                             min_en_words: int = 100,
                             min_dimensions: int = 18) -> dict:
    """Check a record against minimum quality thresholds.

    Thresholds are keyword-only parameters so other cultures/pipelines can
    tune them; the defaults reproduce the original fixed limits exactly.

    Args:
        record: A matched_pairs record (missing fields count as empty).
        min_zh_chars: Minimum length of ``critique_zh`` in characters.
        min_en_words: Minimum length of ``critique_en`` in words.
        min_dimensions: Minimum number of ``covered_dimensions`` entries.

    Returns:
        Mapping of issue name -> human-readable description; empty when the
        record passes every threshold.
    """
    issues = {}

    zh_len = len(record.get('critique_zh', ''))
    if zh_len < min_zh_chars:
        issues['critique_zh'] = f'too short: {zh_len} chars (need ≥{min_zh_chars})'

    # str.split() with no argument yields [] for '', so empty text counts as 0 words.
    en_words = len(record.get('critique_en', '').split())
    if en_words < min_en_words:
        issues['critique_en'] = f'too short: {en_words} words (need ≥{min_en_words})'

    dims = len(record.get('covered_dimensions', []))
    if dims < min_dimensions:
        issues['dimensions'] = f'too few: {dims} (need ≥{min_dimensions})'

    return issues
合并报告
每次合并后生成报告:
def generate_merge_report(batch_ids: list) -> dict:
    """Build a post-merge summary: per-culture row counts plus a grand total.

    Args:
        batch_ids: Batch ids that were merged (recorded verbatim in the report).

    Returns:
        Dict with an ISO timestamp, the merged batch ids, per-culture counts,
        and the total matched_pairs count.
    """
    # Fix: `datetime` was used below but never imported anywhere in this
    # module; import it locally so the function is self-contained.
    from datetime import datetime

    db = lancedb.connect('/home/yhryzy/vulca_lancedb')
    pairs = db.open_table('matched_pairs')

    # Count rows per culture.
    # NOTE(review): limit(10000) caps each count at 10k — confirm every
    # culture partition stays below that, or counts will be understated.
    cultures = {}
    for culture in ['chinese', 'western', 'japanese', 'korean', 'islamic', 'indian', 'mural']:
        rows = pairs.search().where(f"culture = '{culture}'").limit(10000).to_list()
        cultures[culture] = len(rows)

    return {
        'timestamp': datetime.now().isoformat(),
        'batches_merged': batch_ids,
        'culture_counts': cultures,
        'total_matched_pairs': sum(cultures.values()),
    }
回滚支持
如果合并出错,可以回滚:
def rollback_batch(batch_id: int):
    """Undo a merge: delete every record that batch *batch_id* contributed.

    Recovers the critique_ids from the batch output file, then removes them
    from matched_pairs with a single SQL-style ``IN`` predicate — one table
    scan instead of one ``delete`` call (and scan) per id.

    Args:
        batch_id: Numeric batch index (zero-padded to 3 digits in the filename).

    Raises:
        FileNotFoundError: When the batch output file does not exist.
    """
    # Read the critique_ids this batch produced.
    with open(f'checkpoints/western_batch_{batch_id:03d}_output.json') as f:
        critiques = json.load(f)
    critique_ids = [c['critique_id'] for c in critiques]

    # Guard: an empty batch would build an invalid `IN ()` predicate.
    if not critique_ids:
        print(f"Rolled back 0 records from batch {batch_id}")
        return

    db = lancedb.connect('/home/yhryzy/vulca_lancedb')
    pairs = db.open_table('matched_pairs')
    # Ids are pipeline-generated (no embedded quotes), so plain single-quote
    # wrapping is safe here.
    id_list = ", ".join(f"'{cid}'" for cid in critique_ids)
    pairs.delete(f"critique_id IN ({id_list})")
    print(f"Rolled back {len(critique_ids)} records from batch {batch_id}")
增量同步
检查并同步未合并的批次:
def sync_all_pending():
    """Report which batch output files contain records not yet in LanceDB.

    Scans ``checkpoints/western_batch_*_output.json``, compares each file's
    critique_ids against the western rows already stored, and prints the
    pending count per batch. Reporting only — call merge_batch_to_lancedb
    to actually merge a batch.
    """
    import glob
    import re

    # Extract the numeric id from the filename itself. The original
    # `path.split('_')[2]` breaks as soon as the directory path contains
    # an underscore; a filename-anchored regex does not.
    pattern = re.compile(r'western_batch_(\d+)_output\.json$')
    batch_ids = sorted(
        int(m.group(1))
        for m in (pattern.search(p)
                  for p in glob.glob('checkpoints/western_batch_*_output.json'))
        if m
    )

    # Ids already merged for the western culture.
    # NOTE(review): limit(10000) caps the scan — confirm table size.
    db = lancedb.connect('/home/yhryzy/vulca_lancedb')
    pairs = db.open_table('matched_pairs')
    existing_ids = {
        r['critique_id']
        for r in pairs.search().where("culture = 'western'").limit(10000).to_list()
    }

    # Report batches with unsynced records.
    for batch_id in batch_ids:
        with open(f'checkpoints/western_batch_{batch_id:03d}_output.json') as f:
            critiques = json.load(f)
        pending = [c for c in critiques if c['critique_id'] not in existing_ids]
        if pending:
            print(f"Batch {batch_id}: {len(pending)} pending records")
            # merge intentionally not performed here; see merge_batch_to_lancedb
使用示例
# 合并Batch 001-004
python -c "
import sys
sys.path.insert(0, 'scripts')
from merge_to_lancedb import merge_batches_range
merge_batches_range(1, 4)
"
# 检查合并结果
python -c "
import lancedb
db = lancedb.connect('/home/yhryzy/vulca_lancedb')
pairs = db.open_table('matched_pairs')
western = pairs.search().where(\"culture = 'western'\").limit(10000).to_list()
print(f'Western total: {len(western)}')
"
# Supported AI Coding Agents
This skill is compatible with the SKILL.md standard and works with all major AI coding agents:
Amp
Antigravity
Claude Code
Clawdbot
Codex
Cursor
Droid
Gemini CLI
GitHub Copilot
Goose
Kilo Code
Kiro CLI
OpenCode
Roo Code
Trae
Windsurf
Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.