BAiSEDagent

event-driven

0
0
# Install this skill:
npx skills add BAiSEDagent/openclaw-skills --skill "event-driven"

Install specific skill from multi-skill repository

# Description

Event-driven architecture. Message queues, pub/sub, event sourcing. Use when building systems that react to events or need async processing.

# SKILL.md


name: event-driven
description: "Event-driven architecture. Message queues, pub/sub, event sourcing. Use when building systems that react to events or need async processing."
metadata:
openclaw:
emoji: "📨"


Event-Driven Architecture

Build systems that react to events asynchronously. Essential for agent coordination and blockchain event processing.

Patterns

Pattern Use Case Complexity
Event Emitter In-process pub/sub Low
WebSocket Real-time client updates Medium
Redis Pub/Sub Multi-process messaging Medium
Message Queue Reliable async processing High
Event Sourcing Full audit trail High

In-Process Event Bus

import { EventEmitter } from 'events';

const bus = new EventEmitter();

// Define event types
interface AgentEvents {
  'payment:received': { from: string; amount: bigint; txHash: string };
  'payment:settled': { txHash: string; attestationUid: string };
  'agent:registered': { name: string; address: string };
  'health:degraded': { service: string; responseMs: number };
}

// Subscribe
bus.on('payment:received', async (event) => {
  console.log(`Payment from ${event.from}: ${formatUSDC(event.amount)}`);
  await createEASAttestation(event);
  bus.emit('payment:settled', { txHash: event.txHash, attestationUid: uid });
});

// Publish
bus.emit('payment:received', { from: '0x...', amount: 100000n, txHash: '0x...' });

WebSocket (Real-Time Updates)

import { WebSocketServer } from 'ws';

const wss = new WebSocketServer({ port: 8080 });

// Broadcast events to all connected clients
function broadcast(event: string, data: any) {
  const message = JSON.stringify({ type: event, data, timestamp: Date.now() });
  wss.clients.forEach(client => {
    if (client.readyState === WebSocket.OPEN) {
      client.send(message);
    }
  });
}

// Wire up: blockchain events → WebSocket broadcast
publicClient.watchEvent({
  address: USDC_ADDRESS,
  event: transferEvent,
  args: { to: AGENT_ADDRESS },
  onLogs: (logs) => {
    for (const log of logs) {
      broadcast('payment:received', {
        from: log.args.from,
        amount: log.args.value.toString(),
        txHash: log.transactionHash,
      });
    }
  }
});

Redis Pub/Sub (Multi-Process)

import Redis from 'ioredis';

const publisher = new Redis(process.env.REDIS_URL);
const subscriber = new Redis(process.env.REDIS_URL);

// Publish
await publisher.publish('agent:events', JSON.stringify({
  type: 'payment:received',
  data: { from: '0x...', amount: '100000' }
}));

// Subscribe
subscriber.subscribe('agent:events');
subscriber.on('message', (channel, message) => {
  const event = JSON.parse(message);
  handleEvent(event);
});

Event Sourcing Pattern

interface DomainEvent {
  id: string;
  type: string;
  aggregateId: string;    // e.g., agent address
  data: Record<string, any>;
  timestamp: number;
  version: number;
}

// Store all events (append-only)
async function appendEvent(event: DomainEvent) {
  await db.insert(events).values(event);
}

// Rebuild state from events
async function getAgentState(agentId: string) {
  const history = await db.select().from(events)
    .where(eq(events.aggregateId, agentId))
    .orderBy(events.version);

  return history.reduce((state, event) => {
    switch (event.type) {
      case 'agent:registered': return { ...state, active: true, ...event.data };
      case 'payment:received': return { ...state, balance: state.balance + event.data.amount };
      case 'payment:sent': return { ...state, balance: state.balance - event.data.amount };
      default: return state;
    }
  }, { balance: 0n, active: false });
}

Blockchain Events → Application Events

// Pattern: Watch on-chain events, convert to application events
async function startBlockchainListener() {
  // USDC transfers
  publicClient.watchEvent({
    address: USDC_ADDRESS,
    event: parseAbiItem('event Transfer(address indexed, address indexed, uint256)'),
    args: { to: AGENT_ADDRESS },
    onLogs: (logs) => logs.forEach(log => 
      bus.emit('payment:received', {
        from: log.args.from,
        amount: log.args.value,
        txHash: log.transactionHash,
      })
    )
  });

  // EAS attestations
  publicClient.watchEvent({
    address: EAS_ADDRESS,
    event: parseAbiItem('event Attested(address indexed, address indexed, bytes32, bytes32 indexed)'),
    onLogs: (logs) => logs.forEach(log =>
      bus.emit('attestation:created', {
        attester: log.args[0],
        recipient: log.args[1],
        uid: log.args[2],
      })
    )
  });
}

Cross-References

  • webhooks: HTTP-based event delivery
  • api-design: Event-driven API patterns
  • monitoring-alerting: Alert on specific events
  • database-patterns: Event storage

# Supported AI Coding Agents

This skill is compatible with the SKILL.md standard and works with all major AI coding agents:

Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.