Use when you have a written implementation plan to execute in a separate session with review checkpoints
npx skills add BAiSEDagent/openclaw-skills --skill "event-driven"
Install specific skill from multi-skill repository
# Description
Event-driven architecture. Message queues, pub/sub, event sourcing. Use when building systems that react to events or need async processing.
# SKILL.md
name: event-driven
description: "Event-driven architecture. Message queues, pub/sub, event sourcing. Use when building systems that react to events or need async processing."
metadata:
openclaw:
emoji: "π¨"
Event-Driven Architecture
Build systems that react to events asynchronously. Essential for agent coordination and blockchain event processing.
Patterns
| Pattern | Use Case | Complexity |
|---|---|---|
| Event Emitter | In-process pub/sub | Low |
| WebSocket | Real-time client updates | Medium |
| Redis Pub/Sub | Multi-process messaging | Medium |
| Message Queue | Reliable async processing | High |
| Event Sourcing | Full audit trail | High |
In-Process Event Bus
import { EventEmitter } from 'events';
const bus = new EventEmitter();
// Define event types
interface AgentEvents {
'payment:received': { from: string; amount: bigint; txHash: string };
'payment:settled': { txHash: string; attestationUid: string };
'agent:registered': { name: string; address: string };
'health:degraded': { service: string; responseMs: number };
}
// Subscribe
bus.on('payment:received', async (event) => {
console.log(`Payment from ${event.from}: ${formatUSDC(event.amount)}`);
await createEASAttestation(event);
bus.emit('payment:settled', { txHash: event.txHash, attestationUid: uid });
});
// Publish
bus.emit('payment:received', { from: '0x...', amount: 100000n, txHash: '0x...' });
WebSocket (Real-Time Updates)
import { WebSocketServer } from 'ws';
const wss = new WebSocketServer({ port: 8080 });
// Broadcast events to all connected clients
function broadcast(event: string, data: any) {
const message = JSON.stringify({ type: event, data, timestamp: Date.now() });
wss.clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
});
}
// Wire up: blockchain events β WebSocket broadcast
publicClient.watchEvent({
address: USDC_ADDRESS,
event: transferEvent,
args: { to: AGENT_ADDRESS },
onLogs: (logs) => {
for (const log of logs) {
broadcast('payment:received', {
from: log.args.from,
amount: log.args.value.toString(),
txHash: log.transactionHash,
});
}
}
});
Redis Pub/Sub (Multi-Process)
import Redis from 'ioredis';
const publisher = new Redis(process.env.REDIS_URL);
const subscriber = new Redis(process.env.REDIS_URL);
// Publish
await publisher.publish('agent:events', JSON.stringify({
type: 'payment:received',
data: { from: '0x...', amount: '100000' }
}));
// Subscribe
subscriber.subscribe('agent:events');
subscriber.on('message', (channel, message) => {
const event = JSON.parse(message);
handleEvent(event);
});
Event Sourcing Pattern
interface DomainEvent {
id: string;
type: string;
aggregateId: string; // e.g., agent address
data: Record<string, any>;
timestamp: number;
version: number;
}
// Store all events (append-only)
async function appendEvent(event: DomainEvent) {
await db.insert(events).values(event);
}
// Rebuild state from events
async function getAgentState(agentId: string) {
const history = await db.select().from(events)
.where(eq(events.aggregateId, agentId))
.orderBy(events.version);
return history.reduce((state, event) => {
switch (event.type) {
case 'agent:registered': return { ...state, active: true, ...event.data };
case 'payment:received': return { ...state, balance: state.balance + event.data.amount };
case 'payment:sent': return { ...state, balance: state.balance - event.data.amount };
default: return state;
}
}, { balance: 0n, active: false });
}
Blockchain Events β Application Events
// Pattern: Watch on-chain events, convert to application events
async function startBlockchainListener() {
// USDC transfers
publicClient.watchEvent({
address: USDC_ADDRESS,
event: parseAbiItem('event Transfer(address indexed, address indexed, uint256)'),
args: { to: AGENT_ADDRESS },
onLogs: (logs) => logs.forEach(log =>
bus.emit('payment:received', {
from: log.args.from,
amount: log.args.value,
txHash: log.transactionHash,
})
)
});
// EAS attestations
publicClient.watchEvent({
address: EAS_ADDRESS,
event: parseAbiItem('event Attested(address indexed, address indexed, bytes32, bytes32 indexed)'),
onLogs: (logs) => logs.forEach(log =>
bus.emit('attestation:created', {
attester: log.args[0],
recipient: log.args[1],
uid: log.args[2],
})
)
});
}
Cross-References
- webhooks: HTTP-based event delivery
- api-design: Event-driven API patterns
- monitoring-alerting: Alert on specific events
- database-patterns: Event storage
# Supported AI Coding Agents
This skill is compatible with the SKILL.md standard and works with all major AI coding agents:
Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.