# Install this skill (a specific skill from a multi-skill repository):
npx skills add cocoindex-io/cocoindex-claude --skill "cocoindex-v1"

# Description

This skill should be used when building data processing pipelines with CocoIndex v1, a Python library for incremental data transformation. Use when the task involves processing files/data into databases, creating vector embeddings, building knowledge graphs, ETL workflows, or any data pipeline requiring automatic change detection and incremental updates. CocoIndex v1 is Python-native (supports any Python types), has no DSL, and is currently in pre-release (version 1.0.0a1 or later).

# SKILL.md


---
name: cocoindex-v1
description: This skill should be used when building data processing pipelines with CocoIndex v1, a Python library for incremental data transformation. Use when the task involves processing files/data into databases, creating vector embeddings, building knowledge graphs, ETL workflows, or any data pipeline requiring automatic change detection and incremental updates. CocoIndex v1 is Python-native (supports any Python types), has no DSL, and is currently in pre-release (version 1.0.0a1 or later).
---


CocoIndex v1

CocoIndex v1 is a Python library for building incremental data processing pipelines with declarative target states. Think spreadsheets or React for data pipelines: declare what the output should look like based on current input, and CocoIndex automatically handles incremental updates, change detection, and syncing to external systems.

Overview

CocoIndex v1 enables building data pipelines that:

  • Automatically handle incremental updates: Only reprocess changed data
  • Use declarative target states: Declare what should exist, not how to update
  • Support any Python types: No custom DSL—use dataclasses, Pydantic, NamedTuple
  • Provide function memoization: Skip expensive operations when inputs/code unchanged
  • Sync to multiple targets: PostgreSQL, SQLite, LanceDB, Qdrant, file systems

Key principle: TargetState = Transform(SourceState)
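
To make the principle concrete, here is a toy reconciler in plain Python (stdlib only, not CocoIndex's API): given the declared target state and the current state of an external system, it computes which keys must be created, updated, or deleted. CocoIndex performs this kind of diffing for you against real targets.

```python
def reconcile(
    desired: dict[str, str], current: dict[str, str]
) -> tuple[set[str], set[str], set[str]]:
    """Diff a declared target state against the current state of a system."""
    creates = set(desired) - set(current)   # declared but absent
    deletes = set(current) - set(desired)   # present but no longer declared
    updates = {k for k in desired.keys() & current.keys()
               if desired[k] != current[k]}  # declared with changed content
    return creates, updates, deletes

creates, updates, deletes = reconcile(
    desired={"a.txt": "v2", "b.txt": "new"},
    current={"a.txt": "v1", "c.txt": "stale"},
)
# creates == {"b.txt"}, updates == {"a.txt"}, deletes == {"c.txt"}
```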

When to Use This Skill

Use this skill when building pipelines that involve:

  • Document processing: PDF/Markdown conversion, text extraction, chunking
  • Vector embeddings: Embedding documents/code for semantic search
  • Database transformations: ETL from source DB to target DB
  • Knowledge graphs: Extract entities and relationships from data
  • LLM-based extraction: Structured data extraction using LLMs
  • File-based pipelines: Transform files from one format to another
  • Incremental indexing: Keep search indexes up-to-date with source changes

Quick Start: Creating a New Project

Initialize Project

Use the built-in CLI to create a new project:

cocoindex init my-project
cd my-project

This creates:

  • main.py - Main app definition
  • pyproject.toml - Dependencies with pre-release config
  • .env - Environment configuration
  • README.md - Quick start guide

Add Dependencies for Specific Use Cases

Add dependencies to pyproject.toml based on your needs:

# For vector embeddings
dependencies = ["cocoindex>=1.0.0a1", "sentence-transformers", "asyncpg"]

# For PostgreSQL only
dependencies = ["cocoindex>=1.0.0a1", "asyncpg"]

# For LLM extraction
dependencies = ["cocoindex>=1.0.0a1", "litellm", "instructor", "pydantic>=2.0"]
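
Putting it together, a complete pyproject.toml for the embeddings case might look like the following (the project name, version, and requires-python value are placeholders; the [tool.uv] section is needed so the pre-release build can resolve, as noted under Troubleshooting):

```toml
[project]
name = "my-project"          # placeholder
version = "0.1.0"            # placeholder
requires-python = ">=3.11"   # placeholder; match your environment
dependencies = ["cocoindex>=1.0.0a1", "sentence-transformers", "asyncpg"]

[tool.uv]
prerelease = "explicit"
```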

See references/setup_project.md for complete examples.

Set Up Database (if using Postgres/Qdrant)

For PostgreSQL with Docker:

# Create docker-compose.yml with pgvector image
docker-compose up -d
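
As a sketch, such a docker-compose.yml might look like the following (the image tag, credentials, and port are assumptions to adjust for your environment):

```yaml
services:
  postgres:
    image: pgvector/pgvector:pg16   # Postgres image with pgvector preinstalled
    environment:
      POSTGRES_USER: cocoindex      # placeholder credentials
      POSTGRES_PASSWORD: cocoindex
      POSTGRES_DB: cocoindex
    ports:
      - "5432:5432"
```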

For Qdrant with Docker:

# Create docker-compose.yml with Qdrant image
docker-compose up -d

See references/setup_database.md for detailed setup instructions.

Run the Pipeline

pip install -e .
cocoindex update main.py

Core Concepts

1. Apps

An app is the top-level executable that binds a main function with parameters:

import pathlib

import cocoindex as coco

@coco.function
def app_main(sourcedir: pathlib.Path, outdir: pathlib.Path) -> None:
    # Processing logic here
    ...

app = coco.App(
    coco.AppConfig(name="MyApp"),
    app_main,
    sourcedir=pathlib.Path("./data"),
    outdir=pathlib.Path("./output"),
)

if __name__ == "__main__":
    app.update(report_to_stdout=True)

2. Processing Components

A processing component groups an item's processing with its target states. Mount components using coco.mount():

for file in files:
    coco.mount(
        coco.component_subpath("file", str(file.file_path.path)),
        process_file,
        file,
        outdir,
    )

For operations that return values, use coco.mount_run() to wait for the result:

table = coco.mount_run(
    coco.component_subpath("setup", "table"),
    db.declare_table_target,
    table_name="my_table",
    table_schema=schema,
).result()

Key points:

  • Each component runs independently
  • Use mount() for parallel processing, mount_run() when you need the result immediately
  • Use stable paths for proper memoization
  • Component path determines target state ownership

3. Function Memoization

Add memo=True to skip re-execution when inputs/code unchanged:

@coco.function(memo=True)
def expensive_operation(data: str) -> Result:
    # LLM call, embedding generation, heavy computation
    result = expensive_transform(data)
    return result
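
The underlying idea can be illustrated with a stdlib-only sketch (this is not CocoIndex's implementation): cache results keyed on both the arguments and the function's compiled bytecode, so a result is recomputed when either the inputs or the code change.

```python
import functools
import hashlib
import pickle

_cache: dict[str, object] = {}

def toy_memo(fn):
    """Cache fn's results keyed on (code version, arguments)."""
    # The compiled bytecode stands in for "the code changed": editing the
    # function body yields different bytecode, hence a different cache key.
    code_version = fn.__code__.co_code

    @functools.wraps(fn)
    def wrapper(*args):
        key = hashlib.sha256(pickle.dumps((code_version, args))).hexdigest()
        if key not in _cache:
            _cache[key] = fn(*args)
        return _cache[key]

    return wrapper

calls = []

@toy_memo
def double(x: int) -> int:
    calls.append(x)
    return 2 * x

double(3)
double(3)   # served from cache; the body runs only once
# calls == [3]
```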

4. Target States

Declare what should exist—CocoIndex handles creation/update/deletion:

# File target
localfs.declare_file(outdir / "output.txt", content)

# Database row target
table.declare_row(row=MyRecord(id=1, name="example"))

# Vector point target (Qdrant)
collection.declare_point(point=PointStruct(id="1", vector=[...]))

5. Context for Shared Resources

Use ContextKey to share expensive resources across components:

EMBEDDER = coco.ContextKey[SentenceTransformerEmbedder]("embedder")

@coco.lifespan
def coco_lifespan(builder: coco.EnvironmentBuilder):
    embedder = SentenceTransformerEmbedder("all-MiniLM-L6-v2")
    builder.provide(EMBEDDER, embedder)
    yield

@coco.function
def process_item(text: str) -> None:
    embedder = coco.use_context(EMBEDDER)
    embedding = embedder.embed(text)

Common Pipeline Patterns

Pattern 1: File Transformation

Transform files from input to output directory:

from pathlib import Path

import cocoindex as coco
from cocoindex.connectors import localfs
from cocoindex.resources.file import PatternFilePathMatcher

@coco.function(memo=True)
def process_file(file, outdir):
    content = file.read_text()
    transformed = transform_content(content)  # Your logic
    outname = file.file_path.path.stem + ".out"
    localfs.declare_file(outdir / outname, transformed, create_parent_dirs=True)

@coco.function
def app_main(sourcedir, outdir):
    files = localfs.walk_dir(
        sourcedir,
        recursive=True,
        path_matcher=PatternFilePathMatcher(
            included_patterns=["*.txt", "*.md"],
            excluded_patterns=[".*/**"],
        ),
    )
    for f in files:
        coco.mount(coco.component_subpath("file", str(f.file_path.path)), process_file, f, outdir)

app = coco.App(coco.AppConfig(name="Transform"), app_main, sourcedir=Path("./data"), outdir=Path("./out"))

Pattern 2: Vector Embedding Pipeline

Chunk and embed documents for semantic search:

import asyncio
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Annotated, AsyncIterator

import cocoindex as coco
import cocoindex.asyncio as coco_aio
from cocoindex.connectors import localfs, postgres
from cocoindex.ops.text import RecursiveSplitter
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
from numpy.typing import NDArray

DATABASE_URL = os.environ["DATABASE_URL"]  # connection string, e.g. loaded from .env
PG_DB = coco.ContextKey[postgres.PgDatabase]("pg_db")
_embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")
_splitter = RecursiveSplitter()

@dataclass
class Embedding:
    filename: str
    location: str
    text: str
    embedding: Annotated[NDArray, _embedder]  # Auto-infer dimensions
    start_line: int
    end_line: int

@coco_aio.lifespan
async def coco_lifespan(builder: coco_aio.EnvironmentBuilder) -> AsyncIterator[None]:
    async with await postgres.create_pool(DATABASE_URL) as pool:
        builder.provide(PG_DB, postgres.register_db("embedding_db", pool))
        yield

@coco.function(memo=True)
async def process_chunk(filename, chunk, table):
    location = f"{chunk.start.char_offset}-{chunk.end.char_offset}"
    table.declare_row(
        row=Embedding(
            filename=str(filename),
            location=location,
            text=chunk.text,
            embedding=await _embedder.embed_async(chunk.text),
            start_line=chunk.start.line,
            end_line=chunk.end.line,
        ),
    )

@coco.function(memo=True)
async def process_file(file, table):
    text = file.read_text()
    chunks = _splitter.split(text, chunk_size=1000, min_chunk_size=300, chunk_overlap=200)
    await asyncio.gather(*(process_chunk(file.file_path.path, chunk, table) for chunk in chunks))

@coco.function
def app_main(sourcedir):
    target_db = coco.use_context(PG_DB)
    target_table = coco.mount_run(
        coco.component_subpath("setup", "table"),
        target_db.declare_table_target,
        table_name="embeddings",
        table_schema=postgres.TableSchema(Embedding, primary_key=["filename", "location"]),
    ).result()

    files = localfs.walk_dir(sourcedir, recursive=True)
    for file in files:
        coco.mount(coco.component_subpath("file", str(file.file_path.path)), process_file, file, target_table)

app = coco_aio.App(coco_aio.AppConfig(name="Embedding"), app_main, sourcedir=Path("./data"))

Pattern 3: LLM-Based Extraction

Extract structured data using LLMs:

import instructor
from litellm import acompletion
from pydantic import BaseModel

import cocoindex as coco

_instructor_client = instructor.from_litellm(acompletion, mode=instructor.Mode.JSON)

class ExtractionResult(BaseModel):
    title: str
    topics: list[str]

@coco.function(memo=True)  # Memo avoids re-calling LLM
async def extract_and_store(content, message_id, table):
    result = await _instructor_client.chat.completions.create(
        model="gpt-4",
        response_model=ExtractionResult,
        messages=[{"role": "user", "content": f"Extract topics: {content}"}],
    )
    # Message is the target table's row dataclass, defined alongside the table schema
    table.declare_row(row=Message(id=message_id, title=result.title, content=content))

Connectors and Operations

CocoIndex v1 provides connectors for reading from and writing to various external systems including databases (SQL and vector), file systems, and more.

For detailed connector documentation, see:
- references/connectors.md - Complete connector reference with examples
- Pattern examples - Real-world usage in pipelines
- AI-optimized docs - Comprehensive online documentation

Text and Embedding Operations

Text Splitting

from cocoindex.ops.text import RecursiveSplitter, detect_code_language

splitter = RecursiveSplitter()
language = detect_code_language(filename="example.py")

chunks = splitter.split(
    text,
    chunk_size=1000,
    min_chunk_size=300,
    chunk_overlap=200,
    language=language,  # Syntax-aware splitting
)
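
To show what the parameters mean, here is a fixed-window sketch (not the library's recursive, syntax-aware algorithm): chunk_size bounds each chunk's length, and chunk_overlap is how much each chunk repeats from the end of the previous one.

```python
def sliding_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Split text into fixed windows where consecutive chunks share an overlap."""
    step = chunk_size - chunk_overlap
    # Stop starting new chunks once a window would add nothing beyond the overlap.
    return [text[i : i + chunk_size]
            for i in range(0, max(len(text) - chunk_overlap, 1), step)]

text = "".join(str(i % 10) for i in range(2500))
chunks = sliding_chunks(text, chunk_size=1000, chunk_overlap=200)
# Three chunks; each chunk's first 200 chars repeat the previous chunk's last 200.
```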

Embeddings

from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder

embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")

# Sync
embedding = embedder.embed(text)

# Async
embedding = await embedder.embed_async(text)

CLI Commands

Run Pipeline

cocoindex update main.py              # Run app in main.py
cocoindex update main.py:my_app       # Run specific app
cocoindex update my_module:my_app     # Run from module

Drop All State

cocoindex drop main.py [-f]           # Drop and reset

List Apps

cocoindex ls main.py                  # List apps in file
cocoindex ls --db ./cocoindex.db      # List apps in DB

Show Component Paths

cocoindex show main.py                # Show component tree

Best Practices

1. Use Stable Component Paths

# ✅ Good: Stable identifiers
coco.component_subpath("file", str(file.file_path.path))
coco.component_subpath("record", record.id)

# ❌ Bad: Unstable identifiers
coco.component_subpath("file", file)      # Object reference
coco.component_subpath("idx", idx)        # Index changes

2. Add Memoization for Expensive Operations

# ✅ Good: Memoize expensive ops
@coco.function(memo=True)
async def process_chunk(chunk, table):
    embedding = await embedder.embed_async(chunk.text)  # Expensive!
    table.declare_row(...)

# ❌ Bad: No memoization
@coco.function  # Re-embeds every run
async def process_chunk(chunk, table):
    embedding = await embedder.embed_async(chunk.text)

3. Use Context for Shared Resources

# ✅ Good: Load model once
@coco.lifespan
def coco_lifespan(builder):
    model = load_expensive_model()
    builder.provide(MODEL_KEY, model)
    yield

# ❌ Bad: Load model every time
@coco.function
def process(data):
    model = load_expensive_model()  # Loaded repeatedly!

4. Use Type Annotations

# ✅ Good: Type-safe
from dataclasses import dataclass
from typing import Annotated
from numpy.typing import NDArray

@dataclass
class Record:
    id: int
    name: str
    vector: Annotated[NDArray, embedder]  # Auto-infer dimensions

# ❌ Bad: No type safety
record = {"id": 1, "name": "example", "vector": [...]}

5. Organize with Setup Phase

with coco.component_subpath("setup"):
    table = coco.mount_run(...).result()

with coco.component_subpath("processing"):
    for item in items:
        coco.mount(coco.component_subpath(item.id), ...)

Troubleshooting

"Module not found" Error

Ensure pyproject.toml has pre-release config:

[tool.uv]
prerelease = "explicit"

PostgreSQL pgvector Not Found

Enable the pgvector extension:

# Connect to your database and enable the extension
psql "postgres://localhost/db" -c "CREATE EXTENSION IF NOT EXISTS vector;"

See references/setup_database.md for detailed setup instructions.

Memoization Not Working

Check component paths are stable:

# Use stable IDs, not object references
coco.component_subpath(file.stable_key)  # ✅
coco.component_subpath(file)             # ❌

Everything Reprocessing

Add memo=True to expensive functions:

@coco.function(memo=True)  # Add this
async def process_item(item):
    ...

Resources

references/

  • setup_project.md: Complete project setup examples
  • setup_database.md: Detailed database setup instructions
  • connectors.md: Complete connector reference with examples

assets/

  • simple-template/: Minimal project template structure

Additional Resources

For AI Agents:
- AI-Optimized Documentation - Comprehensive documentation optimized for LLM consumption

For Humans:
- CocoIndex Documentation - Full documentation site
- Programming Guide - Core concepts and patterns
- GitHub Examples - Real-world example projects
- CocoIndex on PyPI - Package repository (pre-release)

Version Note

This skill is for CocoIndex v1 (pre-release: >=1.0.0a1). It uses a completely different API from v0. Key differences:

  • Python-native (no DSL)
  • Any Python types supported
  • No flow definitions required
  • More flexible, seamless developer experience

# Supported AI Coding Agents

This skill is compatible with the SKILL.md standard and works with all major AI coding agents.

Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.