npx skills add ramidamolis-alt/agent-skills-workflows --skill "state-machine"
Install specific skill from multi-skill repository
# Description
Expert in complex state management with Finite State Machines, XState patterns, state persistence, race condition handling, and distributed state synchronization. Use for managing complex application states.
# SKILL.md
name: state-machine
description: Expert in complex state management with Finite State Machines, XState patterns, state persistence, race condition handling, and distributed state synchronization. Use for managing complex application states.
triggers: ["state", "fsm", "state machine", "xstate", "transition", "สถานะ"]
⚙️ State Machine Master Skill
Expert in designing and implementing robust state management systems for complex applications.
State Machine Fundamentals
Finite State Machine (FSM)
fsm_components:
  states:
    description: "Discrete conditions the system can be in"
    examples: ["idle", "loading", "success", "error"]
  events:
    description: "Triggers that cause state transitions"
    examples: ["FETCH", "SUCCESS", "FAILURE", "RETRY"]
  transitions:
    description: "Rules for moving between states"
    format: "currentState + event → nextState"
  actions:
    description: "Side effects during transitions"
    types: ["entry", "exit", "transition"]
  guards:
    description: "Conditions that must be true for transition"
    examples: ["canRetry", "hasPermission", "isValidData"]
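The components listed above can be tied together in a few lines. The following is a minimal sketch in plain Python rather than XState; it reuses the example states, events, and the `canRetry` guard from the list, models only transition actions (not entry or exit actions), and the names `Transition`, `FSM`, `MAX_RETRIES`, and `make_fetch_machine` are hypothetical, introduced here for illustration.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Optional, Tuple

Context = Dict[str, int]


@dataclass
class Transition:
    target: str
    # Guard: must return True for the transition to fire
    guard: Optional[Callable[[Context], bool]] = None
    # Action: side effect executed while the transition is taken
    action: Optional[Callable[[Context], None]] = None


@dataclass
class FSM:
    state: str
    table: Dict[Tuple[str, str], Transition]
    context: Context = field(default_factory=dict)

    def send(self, event: str) -> str:
        """Apply currentState + event -> nextState; ignore events with no enabled transition."""
        transition = self.table.get((self.state, event))
        if transition is None:
            return self.state
        if transition.guard is not None and not transition.guard(self.context):
            return self.state
        if transition.action is not None:
            transition.action(self.context)
        self.state = transition.target
        return self.state


MAX_RETRIES = 2  # hypothetical limit, chosen only for this example


def can_retry(ctx: Context) -> bool:
    return ctx["retries"] < MAX_RETRIES


def count_retry(ctx: Context) -> None:
    ctx["retries"] += 1


def make_fetch_machine() -> FSM:
    table = {
        ("idle", "FETCH"): Transition("loading"),
        ("loading", "SUCCESS"): Transition("success"),
        ("loading", "FAILURE"): Transition("error"),
        ("error", "RETRY"): Transition("loading", guard=can_retry, action=count_retry),
    }
    return FSM(state="idle", table=table, context={"retries": 0})
```

Keeping the table keyed by `(state, event)` makes every allowed transition visible in one place; an event that has no entry, or whose guard fails, leaves the state unchanged.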
State Diagram Example
┌─────────────────────────────────────────────────────────────────┐
│                       ORDER STATE MACHINE                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌────────┐  SUBMIT   ┌─────────────┐   PAY   ┌──────────┐      │
│  │ DRAFT  │──────────>│   PENDING   │────────>│   PAID   │      │
│  └────────┘           └──────┬──────┘         └────┬─────┘      │
│      ^                       │                     │            │
│      │ EDIT               CANCEL                  SHIP           │
│      │                       │                     │            │
│      │                ┌──────v──────┐         ┌────v─────┐      │
│      └────────────────│  CANCELLED  │         │ SHIPPED  │      │
│                       └─────────────┘         └────┬─────┘      │
│                                                    │            │
│                                                 DELIVER         │
│                                                    │            │
│                                             ┌──────v──────┐     │
│                                             │  DELIVERED  │     │
│                                             └─────────────┘     │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
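Read as data, the diagram above is a lookup from a state and an event to the next state. The sketch below encodes it that way in plain Python. It assumes the `EDIT` arrow runs from `CANCELLED` back to `DRAFT`, which is one reading of the diagram; `ORDER_TRANSITIONS`, `next_state`, and `run` are hypothetical names.

```python
from typing import Dict, List

# Edges read off the order diagram: state -> {event: next state}.
# Assumption: EDIT leads from CANCELLED back to DRAFT.
ORDER_TRANSITIONS: Dict[str, Dict[str, str]] = {
    "DRAFT": {"SUBMIT": "PENDING"},
    "PENDING": {"PAY": "PAID", "CANCEL": "CANCELLED"},
    "PAID": {"SHIP": "SHIPPED"},
    "SHIPPED": {"DELIVER": "DELIVERED"},
    "CANCELLED": {"EDIT": "DRAFT"},
    "DELIVERED": {},  # no outgoing edges
}


def next_state(state: str, event: str) -> str:
    """Return the successor state, or raise if the event is not allowed here."""
    try:
        return ORDER_TRANSITIONS[state][event]
    except KeyError:
        raise ValueError(f"event {event!r} is not allowed in state {state!r}") from None


def run(events: List[str], start: str = "DRAFT") -> str:
    """Feed a sequence of events through the table and return the final state."""
    state = start
    for event in events:
        state = next_state(state, event)
    return state
```

Raising on an unknown event, instead of silently ignoring it, is a design choice for this sketch: it surfaces illegal sequences such as shipping a draft order during testing.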
XState Implementation Patterns
Basic Machine
import { createMachine, interpret } from 'xstate';

const orderMachine = createMachine({
  id: 'order',
  initial: 'draft',
  context: {
    orderId: null,
    items: [],
    total: 0,
    retries: 0
  },
  states: {
    draft: {
      on: {
        ADD_ITEM: {
          actions: 'addItem'
        },
        SUBMIT: {
          target: 'pending',
          guard: 'hasItems'
        }
      }
    },
    pending: {
      on: {
        PAY: 'processing',
        CANCEL: 'cancelled'
      }
    },
    processing: {
      invoke: {
        src: 'processPayment',
        onDone: {
          target: 'paid',
          actions: 'savePaymentInfo'
        },
        onError: {
          target: 'pending',
          actions: 'handlePaymentError'
        }
      }
    },
    paid: {
      on: { SHIP: 'shipped' }
    },
    shipped: {
      on: { DELIVER: 'delivered' }
    },
    delivered: {
      type: 'final'
    },
    cancelled: {
      type: 'final'
    }
  }
});
Parallel States
const checkoutMachine = createMachine({
  id: 'checkout',
  type: 'parallel',
  states: {
    payment: {
      initial: 'idle',
      states: {
        idle: { on: { ENTER_CARD: 'validating' } },
        validating: {
          on: {
            VALID: 'complete',
            INVALID: 'error'
          }
        },
        complete: { type: 'final' },
        error: { on: { RETRY: 'idle' } }
      }
    },
    shipping: {
      initial: 'idle',
      states: {
        idle: { on: { ENTER_ADDRESS: 'validating' } },
        validating: {
          on: {
            VALID: 'complete',
            INVALID: 'error'
          }
        },
        complete: { type: 'final' },
        error: { on: { RETRY: 'idle' } }
      }
    }
  }
});
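The behaviour of the two regions above can be illustrated without XState. The sketch below is a plain-Python approximation under simple assumptions: every event is offered to every region, a region ignores events it has no transition for, and the machine is done when all regions reach `complete`. `make_region` and `ParallelMachine` are hypothetical names.

```python
from typing import Dict

RegionTable = Dict[str, Dict[str, str]]


def make_region(enter_event: str) -> RegionTable:
    """Build one region with the same shape as the payment/shipping regions."""
    return {
        "idle": {enter_event: "validating"},
        "validating": {"VALID": "complete", "INVALID": "error"},
        "complete": {},  # final: no outgoing transitions
        "error": {"RETRY": "idle"},
    }


class ParallelMachine:
    def __init__(self, regions: Dict[str, RegionTable]) -> None:
        self.tables = regions
        # Each region tracks its own current state independently
        self.state: Dict[str, str] = {name: "idle" for name in regions}

    def send(self, event: str) -> Dict[str, str]:
        """Offer the event to every region; regions without a match stay put."""
        for name, table in self.tables.items():
            current = self.state[name]
            self.state[name] = table[current].get(event, current)
        return dict(self.state)

    def done(self) -> bool:
        return all(s == "complete" for s in self.state.values())
```

Because both regions in this sketch listen for `VALID`, `INVALID`, and `RETRY`, one such event sent while both regions are in the same state moves both of them. Region-specific event names (for example a separate event per form) avoid that coupling.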
State Persistence Patterns
Checkpoint System
interface StateCheckpoint {
  id: string;
  timestamp: Date;
  machineId: string;
  state: any;
  context: any;
  history: any; // XState historyValue, not a list of transitions
}

class StatePersistence {
  // The machine definition is needed again when a checkpoint is restored
  constructor(private definition: any) {}

  // `machine` is the running interpreter/actor whose snapshot is saved
  async saveCheckpoint(machine: any): Promise<string> {
    const snapshot = machine.getSnapshot();
    const checkpoint: StateCheckpoint = {
      id: generateId(),
      timestamp: new Date(),
      machineId: machine.id,
      state: snapshot.value,
      context: snapshot.context,
      history: snapshot.historyValue
    };
    // Save to Memory MCP
    await mcp_Memory_create_entities([{
      name: `Checkpoint_${checkpoint.id}`,
      entityType: 'StateCheckpoint',
      observations: [JSON.stringify(checkpoint)]
    }]);
    return checkpoint.id;
  }

  async restoreCheckpoint(checkpointId: string): Promise<any> {
    const result = await mcp_Memory_search_nodes(`Checkpoint_${checkpointId}`);
    // Assumes the search result exposes the stored observations directly
    const checkpoint = JSON.parse(result.observations[0]);
    // Restore machine to checkpoint state. start(state) is the XState v4 form;
    // in v5, pass a persisted snapshot to createActor instead.
    // The machine must handle RESTORE_CONTEXT itself.
    return interpret(this.definition)
      .start(checkpoint.state)
      .send({ type: 'RESTORE_CONTEXT', context: checkpoint.context });
  }
}
Database Persistence
from datetime import datetime
from typing import Optional

from sqlalchemy import Column, String, JSON, DateTime
from sqlalchemy.orm import Session, declarative_base  # SQLAlchemy 1.4+

Base = declarative_base()


class StateRecord(Base):
    __tablename__ = 'state_records'

    id = Column(String, primary_key=True)
    entity_type = Column(String, index=True)
    entity_id = Column(String, index=True)
    current_state = Column(String)
    context = Column(JSON)
    updated_at = Column(DateTime)


class StateMachineRepository:
    def __init__(self, session: Session):
        self.session = session

    def save_state(self, entity_type: str, entity_id: str, state: str, context: dict):
        record = StateRecord(
            id=f"{entity_type}_{entity_id}",
            entity_type=entity_type,
            entity_id=entity_id,
            current_state=state,
            context=context,
            updated_at=datetime.utcnow()
        )
        # merge() inserts the row or updates the existing one with the same primary key
        self.session.merge(record)
        self.session.commit()

    def load_state(self, entity_type: str, entity_id: str) -> Optional[StateRecord]:
        return self.session.query(StateRecord).filter_by(
            entity_type=entity_type,
            entity_id=entity_id
        ).first()
Race Condition Handling
Optimistic Locking
interface StateUpdate {
  entityId: string;
  expectedVersion: number;
  newState: string;
  newContext: any;
}

// Raised when the stored version no longer matches the caller's expectation
class ConcurrencyError extends Error {}

// Assumes state_records has an integer `version` column
async function updateStateOptimistic(update: StateUpdate): Promise<boolean> {
  const result = await db.query(`
    UPDATE state_records
    SET current_state = $1, context = $2, version = version + 1
    WHERE entity_id = $3 AND version = $4
    RETURNING *
  `, [update.newState, update.newContext, update.entityId, update.expectedVersion]);
  // No row matched: another writer changed the version first
  if (result.rowCount === 0) {
    throw new ConcurrencyError('State was modified by another process');
  }
  return true;
}
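The version check above can be exercised end to end with an in-memory database. The following is a small sketch using Python's standard `sqlite3` module; the reduced table layout, the seeded `order-1` row, and the names `make_db` and `update_state` are assumptions made for the example.

```python
import sqlite3


class ConcurrencyError(Exception):
    """Raised when the expected version no longer matches the stored row."""


def make_db() -> sqlite3.Connection:
    """Create an in-memory table with one order at version 1."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE state_records ("
        "entity_id TEXT PRIMARY KEY, "
        "current_state TEXT NOT NULL, "
        "version INTEGER NOT NULL)"
    )
    conn.execute("INSERT INTO state_records VALUES ('order-1', 'pending', 1)")
    conn.commit()
    return conn


def update_state(conn: sqlite3.Connection, entity_id: str,
                 expected_version: int, new_state: str) -> int:
    """Write new_state only if the row is still at expected_version."""
    cursor = conn.execute(
        "UPDATE state_records "
        "SET current_state = ?, version = version + 1 "
        "WHERE entity_id = ? AND version = ?",
        (new_state, entity_id, expected_version),
    )
    conn.commit()
    if cursor.rowcount == 0:
        # Either the row does not exist or another writer bumped the version
        raise ConcurrencyError("State was modified by another process")
    return expected_version + 1
```

A caller that receives `ConcurrencyError` would typically reload the row, re-evaluate the transition against the fresh state, and retry with the new version.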
Event Sourcing Pattern
interface StateEvent {
  id: string;
  aggregateId: string;
  eventType: string;
  payload: any;
  timestamp: Date;
  version: number;
}

class EventSourcedStateMachine {
  private events: StateEvent[] = [];
  private currentState: string = 'initial';
  private context: any = {};

  // The machine definition supplies the transition logic used by apply()
  constructor(private machine: any) {}

  apply(event: StateEvent): void {
    // Store event
    this.events.push(event);
    // Update state based on event
    const transition = this.machine.transition(this.currentState, event.eventType);
    this.currentState = transition.value;
    this.context = transition.context;
  }

  rebuild(events: StateEvent[]): void {
    // Rebuild state from event history
    this.events = [];
    this.currentState = 'initial';
    this.context = {};
    for (const event of events) {
      this.apply(event);
    }
  }

  getEventsSince(version: number): StateEvent[] {
    return this.events.filter(e => e.version > version);
  }
}
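The rebuild step above amounts to folding an ordered event log through the transition function. The sketch below shows that fold in plain Python with an added check that versions are contiguous; the small transition table and the names `StateEvent`, `TRANSITIONS`, and `replay` are hypothetical and only loosely mirror the order machine.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Tuple


@dataclass(frozen=True)
class StateEvent:
    aggregate_id: str
    event_type: str
    version: int


# Hypothetical transition table used only for this example
TRANSITIONS: Dict[Tuple[str, str], str] = {
    ("draft", "SUBMIT"): "pending",
    ("pending", "PAY"): "paid",
    ("pending", "CANCEL"): "cancelled",
}


def replay(events: Iterable[StateEvent], initial: str = "draft") -> Tuple[str, int]:
    """Rebuild (state, version) by applying events in version order."""
    state, version = initial, 0
    for event in sorted(events, key=lambda e: e.version):
        if event.version != version + 1:
            # A gap or duplicate means the log is incomplete or corrupted
            raise ValueError(f"unexpected version {event.version} after {version}")
        key = (state, event.event_type)
        if key not in TRANSITIONS:
            raise ValueError(f"{event.event_type!r} is not valid in state {state!r}")
        state = TRANSITIONS[key]
        version = event.version
    return state, version
```

Because the state is derived purely from the log, replaying a prefix of the events yields the state as of that version, which is what makes the pattern useful for audit trails.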
Distributed State Synchronization
Multi-Node State Sync
distributed_patterns:
  leader_election:
    description: "Single leader handles state mutations"
    implementation:
      - Use Redis SETNX for lock
      - Leader processes all transitions
      - Followers read replicas
  conflict_resolution:
    strategies:
      - last_write_wins: "Timestamp-based"
      - merge: "Custom merge function"
      - vector_clocks: "Causality tracking"
  eventual_consistency:
    implementation:
      - Local state machine
      - Async event publishing
      - Periodic reconciliation
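Of the conflict-resolution strategies listed above, vector clocks are the least self-explanatory. The following is a minimal sketch of the comparison and merge operations, assuming each clock is a mapping from node name to a counter and that a missing entry counts as zero; `compare` and `merge` are hypothetical names.

```python
from typing import Dict

VectorClock = Dict[str, int]


def compare(a: VectorClock, b: VectorClock) -> str:
    """Classify the causal relation between two clocks.

    Returns "equal", "before" (a happened before b), "after", or "concurrent".
    """
    nodes = set(a) | set(b)
    a_le_b = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
    b_le_a = all(b.get(n, 0) <= a.get(n, 0) for n in nodes)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"
    if b_le_a:
        return "after"
    return "concurrent"


def merge(a: VectorClock, b: VectorClock) -> VectorClock:
    """Element-wise maximum, used after reconciling two replicas."""
    return {n: max(a.get(n, 0), b.get(n, 0)) for n in set(a) | set(b)}
```

Only the `"concurrent"` result signals a genuine conflict that needs a merge function or a last-write-wins fallback; `"before"` and `"after"` can be resolved by keeping the causally later state.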
Redis-Based Distributed Lock
import uuid
from typing import Optional

import redis


class ConcurrencyError(Exception):
    """Raised when the lock cannot be acquired."""


class DistributedStateMachine:
    # self.machine, load_state() and save_state() are assumed to be provided elsewhere
    def __init__(self, redis_client: redis.Redis, machine_id: str):
        self.redis = redis_client
        self.machine_id = machine_id
        self.lock_key = f"lock:{machine_id}"

    def acquire_lock(self, timeout: int = 10) -> Optional[str]:
        """Acquire distributed lock; return the owner token, or None if already held"""
        identifier = str(uuid.uuid4())
        acquired = self.redis.set(
            self.lock_key,
            identifier,
            nx=True,
            ex=timeout
        )
        return identifier if acquired else None

    def release_lock(self, identifier: str) -> bool:
        """Release lock if we own it"""
        script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        """
        return bool(self.redis.eval(script, 1, self.lock_key, identifier))

    async def transition_with_lock(self, event: str) -> bool:
        """Execute state transition with distributed lock"""
        # Keep the token returned by acquire_lock(): releasing with a
        # different UUID would never match, and the lock would only expire
        identifier = self.acquire_lock()
        if identifier is None:
            raise ConcurrencyError("Could not acquire lock")
        try:
            # Load current state
            state_data = await self.load_state()
            # Apply transition
            new_state = self.machine.transition(state_data.state, event)
            # Save new state
            await self.save_state(new_state)
            return True
        finally:
            self.release_lock(identifier)
MCP Integration
State Analysis with UltraThink
async def analyze_state_design(requirements):
    """
    Design state machine using UltraThink
    """
    return await mcp_UltraThink_ultrathink(
        thought=f"""
        Designing state machine for:
        {requirements}
        Analysis steps:
        1. Identify all possible states
        2. Map events that trigger transitions
        3. Define guards for conditional transitions
        4. Plan entry/exit actions
        5. Consider parallel states
        6. Design error recovery paths
        State diagram:
        """,
        total_thoughts=25
    )
Persist to Memory MCP
async def save_state_machine_design(design):
    """
    Save state machine design to Memory
    """
    await mcp_Memory_create_entities([{
        "name": f"StateMachine_{design.name}",
        "entityType": "StateMachineDesign",
        "observations": [
            f"States: {design.states}",
            f"Events: {design.events}",
            f"Transitions: {design.transitions}",
            f"Guards: {design.guards}"
        ]
    }])
Quick Reference
State Design Checklist
checklist:
  states:
    - [ ] All states identified
    - [ ] Initial state defined
    - [ ] Final states marked
    - [ ] No orphan states
  transitions:
    - [ ] All events mapped
    - [ ] Guards defined where needed
    - [ ] Actions specified
    - [ ] Error paths covered
  implementation:
    - [ ] Persistence strategy chosen
    - [ ] Concurrency handled
    - [ ] Rollback capability
    - [ ] Logging/monitoring
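The "No orphan states" item in the checklist above can be checked mechanically. The sketch below assumes the machine is available as a mapping from state to `{event: target}` and reports every state that cannot be reached from the initial state; `find_orphans` is a hypothetical helper name.

```python
from collections import deque
from typing import Dict, Set


def find_orphans(transitions: Dict[str, Dict[str, str]], initial: str) -> Set[str]:
    """Return states that are unreachable from the initial state."""
    seen = {initial}
    queue = deque([initial])
    # Breadth-first search over the transition graph
    while queue:
        state = queue.popleft()
        for target in transitions.get(state, {}).values():
            if target not in seen:
                seen.add(target)
                queue.append(target)
    # Every state mentioned as a source or as a target
    all_states = set(transitions) | {
        target for edges in transitions.values() for target in edges.values()
    }
    return all_states - seen
```

For example, a `refunded` state that has outgoing transitions but no incoming ones would be reported, since nothing leads to it from the initial state.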
Common Patterns
| Pattern | Use Case | Implementation |
|---|---|---|
| Simple FSM | Linear workflows | XState basic |
| Hierarchical | Nested states | XState compound |
| Parallel | Independent processes | XState parallel |
| History | Resume from substate | XState history |
| Event Sourcing | Audit trail | Custom + persistence |
Related Skills
- rollback-engine: State rollback capabilities
- omega-agent: Complex state orchestration
- ml-pipeline: ML model state management
- langgraph: Graph-based agent states
# Supported AI Coding Agents
This skill is compatible with the SKILL.md standard and works with all major AI coding agents.