Install this specific skill from the multi-skill repository:

```shell
npx skills add streamingfast/substreams-skills --skill "substreams-sink"
```
# Description
Expert knowledge for consuming Substreams data in applications. Use when building sinks, real-time data pipelines, or integrating Substreams outputs into Go, JavaScript, Python, or Rust applications.
# SKILL.md

```yaml
name: substreams-sink
description: Expert knowledge for consuming Substreams data in applications. Use when building sinks, real-time data pipelines, or integrating Substreams outputs into Go, JavaScript, Python, or Rust applications.
license: Apache-2.0
compatibility:
  platforms: [claude-code, cursor, vscode, windsurf]
metadata:
  version: 1.0.0
  author: StreamingFast
  documentation: https://substreams.streamingfast.io
```
# Substreams Sink Development Expert

Expert assistant for consuming Substreams data: building production-grade sinks and data pipelines.
## Core Concepts

### What is a Substreams Sink?
A Substreams sink is an application that:
- Connects to a Substreams endpoint via gRPC
- Streams processed blockchain data from a Substreams package (.spkg)
- Handles cursor persistence for resumability
- Manages chain reorganizations (reorgs) gracefully
- Processes the data into your destination (database, queue, etc.)
Note: Before building a custom sink, consider using an existing solution:
- substreams-sink-sql - For PostgreSQL and ClickHouse. Handles cursor management, reorgs, batching, and schema management out of the box.
- substreams-sink-kv - For key-value stores.
- substreams-sink-files - For file-based outputs (JSON, CSV, Parquet).

The examples in this guide use database code for illustration only. For production SQL database sinks, `substreams-sink-sql` is strongly recommended: it already solves cursor persistence, reorg handling, batching, and many edge cases.
### Key Components

- Endpoint: gRPC server providing Substreams data (e.g., `mainnet.eth.streamingfast.io:443`)
- Package (.spkg): Compiled Substreams with modules and protobuf schemas
- Module: The specific output module to stream from
- Cursor: Opaque string for resuming a stream at an exact position
- Block Range: Start and stop blocks for the stream
### Authentication

All Substreams endpoints require authentication:

```shell
# Set API key (recommended for CLI tools)
export SUBSTREAMS_API_KEY="your-api-key"

# Or set a bearer token directly
export SUBSTREAMS_API_TOKEN="your-jwt-token"
```
Get your API key from thegraph.market or pinax.network.
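In application code, the same environment variables can be turned into gRPC call metadata. A minimal sketch, assuming the common convention of a `Bearer` token in the `authorization` header; the `x-api-key` header name is an assumption for illustration, so check your provider's documentation for the exact header it expects:

```python
import os

def auth_metadata():
    """Build gRPC call metadata from the environment.

    Prefers a pre-minted bearer token (SUBSTREAMS_API_TOKEN), then falls
    back to the API key. The "x-api-key" header name is an assumption --
    verify it against your provider's docs.
    """
    token = os.environ.get("SUBSTREAMS_API_TOKEN")
    if token:
        return [("authorization", f"Bearer {token}")]
    api_key = os.environ.get("SUBSTREAMS_API_KEY")
    if api_key:
        return [("x-api-key", api_key)]
    raise RuntimeError("Set SUBSTREAMS_API_KEY or SUBSTREAMS_API_TOKEN")
```

The returned list can be passed as the `metadata` argument on gRPC calls, as shown in the Python Quick Start below.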
## Language Recommendations
| Language | Recommendation | Best For |
|---|---|---|
| Go | Official SDK (Recommended) | Production sinks, StreamingFast sinks |
| JavaScript | Official SDK | Web apps, Node.js services |
| Python | Reference implementation | Prototyping, data analysis |
| Rust | Reference implementation | High-performance custom sinks |
## Quick Start by Language

### Go (Recommended)

```go
package main

import (
	"context"
	"log"

	pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
	"github.com/streamingfast/substreams/sink"
)

func main() {
	ctx := context.Background()

	sinker, err := sink.New(
		sink.NewFromManifest("substreams.spkg", "map_events"),
		sink.WithBlockRange(":+1000"),
	)
	if err != nil {
		log.Fatalf("create sinker: %v", err)
	}

	sinker.Run(ctx, sink.NewSinker(
		handleBlockScopedData,
		handleBlockUndoSignal,
	))
}

func handleBlockScopedData(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *sink.Cursor) error {
	// Process the block data here, then persist the cursor only after
	// processing has succeeded.
	return nil
}

func handleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *sink.Cursor) error {
	// Handle the reorg: rewind stored data to undoSignal.LastValidBlock,
	// then persist undoSignal.LastValidCursor.
	return nil
}
```
See references/go-sink.md for complete guide.
### JavaScript (Node.js)

```javascript
import { createRegistry, createRequest } from "@substreams/core";
import { createGrpcTransport } from "@connectrpc/connect-node";

// `pkg` is the decoded .spkg package and `stream` is the streaming helper;
// see references/javascript-sink.md for how both are created.
const registry = createRegistry(pkg);

const transport = createGrpcTransport({
  baseUrl: "https://mainnet.eth.streamingfast.io:443",
  httpVersion: "2",
});

const request = createRequest({
  substreamPackage: pkg,
  outputModule: "map_events",
  startBlockNum: 17000000n,
  stopBlockNum: "+1000",
});

for await (const response of stream(request, registry, transport)) {
  if (response.message.case === "blockScopedData") {
    // Process block data, then persist the cursor
    await persistCursor(response.message.value.cursor);
  } else if (response.message.case === "blockUndoSignal") {
    // Handle reorg
    await handleUndo(response.message.value);
  }
}
```
See references/javascript-sink.md for complete guide.
### Python

```python
import grpc
from sf.substreams.rpc.v2 import service_pb2, service_pb2_grpc

# `token` is your bearer token and `package` is the parsed .spkg;
# see references/python-sink.md for loading both.
creds = grpc.ssl_channel_credentials()
with grpc.secure_channel("mainnet.eth.streamingfast.io:443", creds) as channel:
    stub = service_pb2_grpc.StreamStub(channel)
    metadata = [("authorization", f"Bearer {token}")]
    request = service_pb2.Request(
        start_block_num=17000000,
        stop_block_num=17001000,
        modules=package.modules,
        output_module="map_events",
        production_mode=True,
    )
    for response in stub.Blocks(request, metadata=metadata):
        if response.WhichOneof("message") == "block_scoped_data":
            # Process block data, then persist the cursor
            pass
        elif response.WhichOneof("message") == "block_undo_signal":
            # Handle reorg
            pass
```
See references/python-sink.md for complete guide.
### Rust

```rust
use substreams_stream::{BlockResponse, SubstreamsStream};

// `endpoint`, `cursor`, `package`, `modules`, `start_block`, and `stop_block`
// are set up as shown in references/rust-sink.md.
let mut stream = SubstreamsStream::new(
    endpoint,
    cursor,
    package,
    modules.clone(),
    "map_events".to_string(),
    start_block,
    stop_block,
);

while let Some(response) = stream.next().await {
    match response? {
        BlockResponse::New(data) => {
            // Process block data, then persist the cursor
            persist_cursor(&data.cursor);
        }
        BlockResponse::Undo(signal) => {
            // Handle reorg: rewind, then persist the last valid cursor
            rewind_to_block(signal.last_valid_block);
            persist_cursor(&signal.last_valid_cursor);
        }
    }
}
```
See references/rust-sink.md for complete guide.
## Critical Concepts

### Cursor Management

The cursor is the most critical piece of sink development.

- RULE #1: Persist the cursor AFTER successful processing, never before.
- RULE #2: On restart, load the persisted cursor and resume from there.
- RULE #3: A blank/empty cursor means start from the beginning.
The cursor is an opaque string that encodes:
- Block number and hash
- Module execution state
- Position within the block
Cursor persistence patterns:
| Storage | Use Case | Example |
|---|---|---|
| File | Development, single instance | cursor.txt |
| Database | Production, multi-instance | Cursors table with module key |
| Redis | High availability | Key-value with TTL |
See references/cursor-reorg.md for detailed patterns.
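The file-based pattern from the table above can be sketched as follows. This is a minimal illustration, not part of any SDK; the `cursor.txt` path is a hypothetical choice for a single-instance sink:

```python
import os
import tempfile

CURSOR_FILE = "cursor.txt"  # hypothetical path for a single-instance sink

def load_cursor(path=CURSOR_FILE):
    """Return the persisted cursor, or "" to start from the beginning (Rule #3)."""
    try:
        with open(path) as f:
            return f.read().strip()
    except FileNotFoundError:
        return ""

def persist_cursor(cursor, path=CURSOR_FILE):
    """Atomically write the cursor AFTER a block was processed (Rule #1).

    Write-to-temp-then-rename avoids a torn cursor file if the process
    dies mid-write; os.replace is atomic on POSIX and Windows.
    """
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    with os.fdopen(fd, "w") as f:
        f.write(cursor)
    os.replace(tmp, path)
```

On restart, pass the value returned by `load_cursor()` into the stream request (Rule #2); an empty string starts the stream from the beginning.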
### Chain Reorganization (Reorg) Handling

When a chain reorganizes, you receive a BlockUndoSignal:

```
BlockUndoSignal {
  last_valid_block: BlockRef { num: 17000100, id: "0xabc..." }
  last_valid_cursor: "opaque-cursor-string"
}
```
Required actions:
1. Delete/revert all data for blocks > last_valid_block.num
2. Persist the last_valid_cursor
3. Continue streaming (new blocks will follow automatically)
Final blocks only mode (recommended for most sinks):
- Set final_blocks_only: true in request
- Only receive blocks that cannot be reorganized
- Eliminates need for undo handling
- Trade-off: ~2-3 minutes delay from chain tip
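The two required undo actions can be sketched against a SQL store. This is a minimal illustration, assuming a hypothetical SQLite schema (`events(block_num, ...)` and `cursors(module, cursor)`); a production sink would use `substreams-sink-sql` or adapt this to its own store:

```python
import sqlite3

def handle_undo(conn, last_valid_block_num, last_valid_cursor, module="map_events"):
    """Apply a BlockUndoSignal: revert past the fork, then persist the cursor.

    Assumes a hypothetical schema: events(block_num INTEGER, ...) and
    cursors(module TEXT PRIMARY KEY, cursor TEXT).
    """
    with conn:  # one transaction: the revert and cursor update commit together
        # 1. Delete/revert all data for blocks > last_valid_block
        conn.execute(
            "DELETE FROM events WHERE block_num > ?",
            (last_valid_block_num,),
        )
        # 2. Persist last_valid_cursor so a restart resumes at the fork point
        conn.execute(
            "INSERT INTO cursors(module, cursor) VALUES(?, ?) "
            "ON CONFLICT(module) DO UPDATE SET cursor = excluded.cursor",
            (module, last_valid_cursor),
        )
    # 3. Continue streaming -- nothing to do here; new blocks follow automatically.
```

Doing the revert and the cursor write in one transaction matters: if they commit separately and the process crashes in between, the stored data and the cursor disagree about where the stream stands.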
### Error Handling & Retry
Fatal errors (do not retry):
- Unauthenticated - Invalid or expired token
- InvalidArgument - Bad request parameters
- Internal - Server-side bug
Retryable errors (implement exponential backoff):
- Unavailable - Server temporarily unavailable
- ResourceExhausted - Rate limited
- Connection timeouts
Exponential backoff pattern:

```
Base delay: 500ms
Max delay:  45-90 seconds
Jitter:     add a random 0-100ms
```
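The classification and backoff rules above can be sketched in a few lines. This is an illustrative helper, not an SDK API; the status names mirror the gRPC codes listed above:

```python
import random

# gRPC status code names, per the lists above
RETRYABLE = {"UNAVAILABLE", "RESOURCE_EXHAUSTED", "DEADLINE_EXCEEDED"}
FATAL = {"UNAUTHENTICATED", "INVALID_ARGUMENT", "INTERNAL"}

def should_retry(code):
    """Return True only for transient errors; fatal codes must not be retried."""
    return code in RETRYABLE and code not in FATAL

def backoff_delay(attempt, base=0.5, cap=60.0):
    """Delay in seconds for the given retry attempt (0-based).

    500ms base doubled each attempt, capped, plus 0-100ms of random jitter
    so many clients do not reconnect in lockstep.
    """
    return min(base * (2 ** attempt), cap) + random.uniform(0, 0.1)
```

A reconnect loop would then sleep `backoff_delay(attempt)` before retrying, and reset `attempt` to zero once data flows again.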
### Production Mode vs Development Mode
| Feature | Production Mode | Development Mode |
|---|---|---|
| Parallel execution | Yes | No |
| Output | Single module only | All modules |
| Performance | Optimized | Debug-friendly |
| Use case | Sinks | Testing, debugging |
Always use production mode for sinks:

```
sink.WithProductionMode()   // Go
production_mode=True        #  Python
productionMode: true        // JavaScript
```
## Common Endpoints
| Network | Endpoint |
|---|---|
| Ethereum Mainnet | mainnet.eth.streamingfast.io:443 |
| Ethereum Sepolia | sepolia.eth.streamingfast.io:443 |
| Polygon | polygon.streamingfast.io:443 |
| Arbitrum One | arb-one.streamingfast.io:443 |
| Optimism | optimism.streamingfast.io:443 |
| Base | base.streamingfast.io:443 |
| BSC | bsc.streamingfast.io:443 |
| Solana | mainnet.sol.streamingfast.io:443 |
| Near | mainnet.near.streamingfast.io:443 |
Full list: thegraph.market/supported-networks
## Block Range Syntax

```shell
# Explicit range
--start-block 17000000 --stop-block 17001000

# From manifest initialBlock
--start-block : --stop-block 17001000

# Relative stop (process 1000 blocks)
--start-block 17000000 --stop-block +1000

# To chain head (live streaming)
--start-block 17000000 --stop-block 0
```
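A sink that accepts this flag syntax has to resolve the stop spec itself. A minimal sketch of that resolution; `resolve_stop_block` is a hypothetical helper mirroring the syntax above, not part of any SDK:

```python
def resolve_stop_block(start_block, stop_spec):
    """Resolve a CLI-style stop-block spec relative to a start block.

    "+N" -> start_block + N  (relative stop)
    "0"  -> None             (stream to chain head, no stop)
    "N"  -> N                (explicit stop block)
    """
    spec = str(stop_spec)
    if spec.startswith("+"):
        return start_block + int(spec[1:])
    n = int(spec)
    return None if n == 0 else n
```

For example, `resolve_stop_block(17000000, "+1000")` yields the same range as the explicit `--stop-block 17001000` above.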
## Module Parameters

Pass runtime parameters to modules:

```shell
# Single parameter
-p "map_events=0xa0b86a33e6..."

# Multiple parameters
-p "map_events=0xa0b86a33..." -p "filter_module=type:transfer"

# JSON parameters
-p 'map_events={"contracts":["0x123","0x456"],"min_value":1000}'
```
## Protobuf Code Generation

Generate language bindings from .spkg files:

```shell
# Install buf
go install github.com/bufbuild/buf/cmd/buf@latest

# Generate from a local .spkg
buf generate --exclude-path="google" ./my-substreams.spkg#format=bin

# Generate from a URL
buf generate "https://spkg.io/streamingfast/substreams-eth-block-meta-v0.4.3.spkg#format=binpb"

# Generate from the buf registry
buf generate buf.build/streamingfast/substreams --include-imports
```
## Troubleshooting

### Connection Issues
"Unauthenticated" error:
- Verify API key/token is set correctly
- Check token hasn't expired
- Ensure correct environment variable name
"Connection refused" error:
- Verify endpoint URL and port
- Check TLS is enabled for https://
- Test network connectivity
### Empty Output
No data received:
- Verify initialBlock in manifest is before your start block
- Check the output module name is correct
- Ensure the block range contains relevant data
- Try a known-good block range first
### Performance Issues
Slow processing:
- Enable production mode
- Use gzip compression
- Increase connection timeout
- Consider final_blocks_only for non-realtime needs
## Resources
- Official Documentation
- Go Sink Reference
- JavaScript Sink Reference
- Python Sink Reference
- Rust Sink Reference
- Cursor & Reorg Handling
## Getting Help

# Supported AI Coding Agents

This skill is compatible with the SKILL.md standard and works with all major AI coding agents. Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.