Use when adding new error messages to React, or seeing "unknown error code" warnings.
npx skills add agentmc15/polymarket-trader --skill "trader-analysis"
Install specific skill from multi-skill repository
# Description
Analyze Polymarket traders, identify profitable traders to follow, and track their performance. Use when building copy trading features or trader discovery.
# SKILL.md
name: trader-analysis
description: Analyze Polymarket traders, identify profitable traders to follow, and track their performance. Use when building copy trading features or trader discovery.
Trader Analysis Skill
Tracking Trader Activity
On-Chain Data
from web3 import Web3
import httpx
from typing import AsyncIterator
CTF_EXCHANGE = "0x4bFb41d5B3570DeFd03C39a9A4D8dE6Bd8B8982E"
class TraderTracker:
def __init__(self, polygon_rpc: str):
self.w3 = Web3(Web3.HTTPProvider(polygon_rpc))
self.exchange = self.w3.eth.contract(
address=CTF_EXCHANGE,
abi=CTF_EXCHANGE_ABI
)
async def get_trader_trades(
self,
address: str,
from_block: int = None
) -> list[dict]:
"""Fetch all trades for an address."""
events = self.exchange.events.OrderFilled.get_logs(
fromBlock=from_block or "earliest",
argument_filters={"maker": address}
)
return [self._parse_trade_event(e) for e in events]
def _parse_trade_event(self, event: dict) -> dict:
"""Parse OrderFilled event into trade dict."""
return {
"tx_hash": event.transactionHash.hex(),
"block_number": event.blockNumber,
"maker": event.args.maker,
"taker": event.args.taker,
"token_id": str(event.args.tokenId),
"amount": event.args.amount / 1e6, # Assuming 6 decimals
"price": event.args.price / 1e18,
"side": "BUY" if event.args.side == 0 else "SELL",
"timestamp": self._get_block_timestamp(event.blockNumber)
}
Polymarket Data API
class PolymarketDataClient:
BASE_URL = "https://data-api.polymarket.com"
def __init__(self):
self.client = httpx.AsyncClient(
base_url=self.BASE_URL,
timeout=30.0
)
async def get_trader_profile(self, address: str) -> dict:
"""Fetch trader profile and stats."""
response = await self.client.get(f"/users/{address}")
response.raise_for_status()
return response.json()
async def get_trader_positions(self, address: str) -> list[dict]:
"""Get all positions for a trader."""
response = await self.client.get(
"/positions",
params={"user": address}
)
response.raise_for_status()
return response.json()
async def get_trader_activity(
self,
address: str,
limit: int = 100,
offset: int = 0
) -> list[dict]:
"""Get recent trading activity."""
response = await self.client.get(
"/activity",
params={
"user": address,
"limit": limit,
"offset": offset
}
)
response.raise_for_status()
return response.json()
async def get_leaderboard(
self,
period: str = "all",
limit: int = 100
) -> list[dict]:
"""Get top traders by P&L."""
response = await self.client.get(
"/leaderboard",
params={"period": period, "limit": limit}
)
response.raise_for_status()
return response.json()
Trader Scoring System
from dataclasses import dataclass
from datetime import datetime, timedelta
import numpy as np
from typing import Optional
@dataclass
class TraderMetrics:
address: str
total_pnl: float
realized_pnl: float
unrealized_pnl: float
win_rate: float
avg_return_per_trade: float
sharpe_ratio: float
total_trades: int
unique_markets: int
avg_position_size: float
avg_hold_time: timedelta
consistency_score: float
recency_score: float
largest_win: float
largest_loss: float
profit_factor: float # gross profit / gross loss
class TraderAnalyzer:
def __init__(self, data_client: PolymarketDataClient):
self.client = data_client
async def analyze_trader(
self,
address: str,
days: int = 90
) -> TraderMetrics:
"""Comprehensive trader analysis."""
activity = await self.client.get_trader_activity(
address, limit=1000
)
positions = await self.client.get_trader_positions(address)
# Filter to time period
cutoff = datetime.utcnow() - timedelta(days=days)
recent_trades = [
t for t in activity
if datetime.fromisoformat(t["timestamp"]) > cutoff
]
return TraderMetrics(
address=address,
total_pnl=self._calculate_total_pnl(positions, recent_trades),
realized_pnl=self._calculate_realized_pnl(recent_trades),
unrealized_pnl=self._calculate_unrealized_pnl(positions),
win_rate=self._calculate_win_rate(recent_trades),
avg_return_per_trade=self._calculate_avg_return(recent_trades),
sharpe_ratio=self._calculate_sharpe(recent_trades),
total_trades=len(recent_trades),
unique_markets=len(set(t["market_id"] for t in recent_trades)),
avg_position_size=self._calculate_avg_size(recent_trades),
avg_hold_time=self._calculate_avg_hold_time(recent_trades),
consistency_score=self._calculate_consistency(recent_trades),
recency_score=self._calculate_recency_score(recent_trades),
largest_win=max((t.get("pnl", 0) for t in recent_trades), default=0),
largest_loss=min((t.get("pnl", 0) for t in recent_trades), default=0),
profit_factor=self._calculate_profit_factor(recent_trades)
)
def _calculate_win_rate(self, trades: list[dict]) -> float:
"""Calculate percentage of profitable trades."""
if not trades:
return 0
winning = sum(1 for t in trades if t.get("pnl", 0) > 0)
return winning / len(trades)
def _calculate_sharpe(self, trades: list[dict]) -> float:
"""Calculate Sharpe ratio of returns."""
returns = [t.get("return_pct", 0) for t in trades if "return_pct" in t]
if len(returns) < 2:
return 0
mean_return = np.mean(returns)
std_return = np.std(returns)
if std_return == 0:
return 0
# Annualize assuming daily trades
return (mean_return * 365**0.5) / std_return
def _calculate_consistency(self, trades: list[dict]) -> float:
"""Score how consistent the trader's performance is."""
if len(trades) < 10:
return 0
# Group by week
weekly_pnl = {}
for trade in trades:
week = datetime.fromisoformat(trade["timestamp"]).isocalendar()[:2]
weekly_pnl[week] = weekly_pnl.get(week, 0) + trade.get("pnl", 0)
if len(weekly_pnl) < 4:
return 0
# Calculate consistency as % of profitable weeks
profitable_weeks = sum(1 for pnl in weekly_pnl.values() if pnl > 0)
return profitable_weeks / len(weekly_pnl)
def _calculate_recency_score(self, trades: list[dict]) -> float:
"""Score based on recent activity (more recent = higher)."""
if not trades:
return 0
latest = max(
datetime.fromisoformat(t["timestamp"]) for t in trades
)
days_since = (datetime.utcnow() - latest).days
# Decay score over 30 days
return max(0, 1 - (days_since / 30))
def _calculate_profit_factor(self, trades: list[dict]) -> float:
"""Gross profit / gross loss."""
gross_profit = sum(t.get("pnl", 0) for t in trades if t.get("pnl", 0) > 0)
gross_loss = abs(sum(t.get("pnl", 0) for t in trades if t.get("pnl", 0) < 0))
if gross_loss == 0:
return float('inf') if gross_profit > 0 else 0
return gross_profit / gross_loss
class TraderScorer:
def __init__(self, weights: dict = None):
self.weights = weights or {
"pnl": 0.20,
"win_rate": 0.15,
"sharpe": 0.15,
"consistency": 0.15,
"recency": 0.10,
"profit_factor": 0.10,
"experience": 0.10,
"diversity": 0.05
}
def calculate_score(self, metrics: TraderMetrics) -> float:
"""Calculate overall trader score (0-100)."""
scores = {
"pnl": self._normalize_pnl(metrics.total_pnl),
"win_rate": metrics.win_rate * 100,
"sharpe": self._normalize_sharpe(metrics.sharpe_ratio),
"consistency": metrics.consistency_score * 100,
"recency": metrics.recency_score * 100,
"profit_factor": self._normalize_profit_factor(metrics.profit_factor),
"experience": self._normalize_trades(metrics.total_trades),
"diversity": self._normalize_markets(metrics.unique_markets)
}
return sum(scores[k] * self.weights[k] for k in self.weights)
def _normalize_pnl(self, pnl: float) -> float:
"""Normalize P&L to 0-100 scale."""
if pnl <= 0:
return max(0, 50 + pnl / 1000)
return min(100, 50 + np.log1p(pnl) * 8)
def _normalize_sharpe(self, sharpe: float) -> float:
"""Normalize Sharpe ratio to 0-100."""
# Sharpe of 2+ is excellent
return min(100, max(0, sharpe * 33))
def _normalize_profit_factor(self, pf: float) -> float:
"""Normalize profit factor to 0-100."""
if pf == float('inf'):
return 100
# PF of 2+ is good
return min(100, pf * 40)
def _normalize_trades(self, trades: int) -> float:
"""Normalize trade count to 0-100."""
# 100+ trades shows experience
return min(100, trades)
def _normalize_markets(self, markets: int) -> float:
"""Normalize unique markets to 0-100."""
# Trading 10+ markets shows diversity
return min(100, markets * 10)
Finding Traders to Follow
class TraderDiscovery:
def __init__(
self,
data_client: PolymarketDataClient,
analyzer: TraderAnalyzer
):
self.client = data_client
self.analyzer = analyzer
self.scorer = TraderScorer()
async def find_top_traders(
self,
min_trades: int = 50,
min_pnl: float = 1000,
min_win_rate: float = 0.5,
days: int = 30
) -> list[tuple[str, float, TraderMetrics]]:
"""Discover top performing traders."""
leaderboard = await self.client.get_leaderboard(limit=500)
candidates = []
for trader in leaderboard:
try:
metrics = await self.analyzer.analyze_trader(
trader["address"],
days=days
)
# Apply filters
if (metrics.total_trades >= min_trades and
metrics.total_pnl >= min_pnl and
metrics.win_rate >= min_win_rate):
score = self.scorer.calculate_score(metrics)
candidates.append((trader["address"], score, metrics))
except Exception as e:
# Skip traders with errors
continue
return sorted(candidates, key=lambda x: x[1], reverse=True)
async def find_market_specialists(
self,
market_category: str,
min_trades_in_category: int = 20
) -> list[str]:
"""Find traders who specialize in specific market categories."""
# Implementation would query by category
pass
async def find_original_traders(
self,
min_originality_score: float = 0.7
) -> list[str]:
"""
Find traders who make original trades (not copy trading).
Originality is measured by:
- Trade timing (not consistently after other traders)
- Position uniqueness (not mirroring others)
- Contrarian indicators
"""
leaderboard = await self.client.get_leaderboard(limit=200)
original_traders = []
for trader in leaderboard:
activity = await self.client.get_trader_activity(
trader["address"],
limit=100
)
originality = await self._calculate_originality(activity)
if originality >= min_originality_score:
original_traders.append(trader["address"])
return original_traders
async def _calculate_originality(
self,
trades: list[dict]
) -> float:
"""Calculate how original a trader's trades are."""
# Compare trade timing with market average
# Check for unique position entries
# Measure contrarian behavior
return 0.5 # Placeholder
class CopyTradingManager:
def __init__(
self,
data_client: PolymarketDataClient,
trading_service, # Your trading service
config: dict
):
self.client = data_client
self.trading = trading_service
self.tracked_traders: dict[str, dict] = {}
self.copy_delay = config.get("copy_delay_seconds", 30)
self.size_multiplier = config.get("size_multiplier", 0.25)
self.max_position_pct = config.get("max_position_pct", 0.1)
def add_trader(
self,
address: str,
multiplier: float = None,
markets: list[str] = None
):
"""Add a trader to copy."""
self.tracked_traders[address] = {
"multiplier": multiplier or self.size_multiplier,
"markets": markets, # None = all markets
"last_trade": None
}
def remove_trader(self, address: str):
"""Stop copying a trader."""
self.tracked_traders.pop(address, None)
async def process_trade(self, trade: dict):
"""Process a trade from tracked trader."""
address = trade["trader_address"]
if address not in self.tracked_traders:
return
config = self.tracked_traders[address]
# Check market filter
if config["markets"] and trade["market_id"] not in config["markets"]:
return
# Wait for delay
await asyncio.sleep(self.copy_delay)
# Calculate size
size = trade["size"] * config["multiplier"]
# Apply max position limit
portfolio = await self.trading.get_portfolio()
max_size = portfolio["value"] * self.max_position_pct / trade["price"]
size = min(size, max_size)
# Execute copy trade
await self.trading.place_order(
token_id=trade["token_id"],
side=trade["side"],
price=trade["price"],
size=size,
metadata={"copy_source": address}
)
Real-Time Monitoring
import asyncio
from collections import defaultdict
from typing import Callable, Awaitable
class LiveTraderMonitor:
def __init__(self, tracked_addresses: list[str]):
self.tracked = set(tracked_addresses)
self.callbacks: dict[str, list[Callable]] = defaultdict(list)
self._running = False
def on_trade(self, callback: Callable[[dict], Awaitable[None]]):
"""Register callback for trade events."""
self.callbacks["trade"].append(callback)
return callback
def on_position_change(self, callback: Callable[[dict], Awaitable[None]]):
"""Register callback for position changes."""
self.callbacks["position"].append(callback)
return callback
async def start(self):
"""Start monitoring tracked traders."""
self._running = True
async for event in self._watch_events():
if not self._running:
break
if event.get("trader") in self.tracked:
event_type = event.get("type", "trade")
for callback in self.callbacks[event_type]:
try:
await callback(event)
except Exception as e:
print(f"Callback error: {e}")
def stop(self):
"""Stop monitoring."""
self._running = False
def add_trader(self, address: str):
"""Add trader to watch list."""
self.tracked.add(address)
def remove_trader(self, address: str):
"""Remove trader from watch list."""
self.tracked.discard(address)
async def _watch_events(self):
"""Watch for on-chain events."""
# Implementation would use WebSocket or polling
while self._running:
# Poll for new events
await asyncio.sleep(5)
yield {} # Placeholder
# Supported AI Coding Agents
This skill is compatible with the SKILL.md standard and works with all major AI coding agents:
Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.