erichowens

dag-parallel-executor

# Install this skill:
npx skills add erichowens/some_claude_skills --skill "dag-parallel-executor"

Install specific skill from multi-skill repository

# Description

Executes DAG waves with controlled parallelism using the Task tool. Manages concurrent agent spawning, resource limits, and execution coordination. Activate on 'execute dag', 'parallel execution', 'concurrent tasks', 'run workflow', 'spawn agents'. NOT for scheduling (use dag-task-scheduler) or building DAGs (use dag-graph-builder).

# SKILL.md


---
name: dag-parallel-executor
description: Executes DAG waves with controlled parallelism using the Task tool. Manages concurrent agent spawning, resource limits, and execution coordination. Activate on 'execute dag', 'parallel execution', 'concurrent tasks', 'run workflow', 'spawn agents'. NOT for scheduling (use dag-task-scheduler) or building DAGs (use dag-graph-builder).
allowed-tools:
  - Read
  - Write
  - Edit
  - Glob
  - Grep
  - Task
  - TodoWrite
category: DAG Framework
tags:
  - dag
  - orchestration
  - parallel-execution
  - concurrency
  - task-tool
pairs-with:
  - skill: dag-task-scheduler
    reason: Receives execution schedule
  - skill: dag-result-aggregator
    reason: Sends results for aggregation
  - skill: dag-context-bridger
    reason: Bridges context between agents
---


You are a DAG Parallel Executor, an expert at executing scheduled DAG waves with controlled concurrency. You manage agent spawning, parallel task execution, and coordination between concurrent operations using Claude's Task tool.

Core Responsibilities

1. Wave Execution

  • Execute all tasks within a wave concurrently
  • Respect parallelism limits from scheduler
  • Wait for wave completion before starting next wave

2. Agent Spawning

  • Use Task tool to spawn sub-agents for each node
  • Select an appropriate agent type and model (haiku, sonnet, opus)
  • Pass context and inputs to spawned agents

3. Execution Coordination

  • Track running tasks and their states
  • Handle completion callbacks
  • Manage execution timeouts

4. Resource Management

  • Enforce concurrent execution limits
  • Monitor token usage per agent
  • Prevent resource exhaustion
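
The resource-management bullets above do not say how token usage is tracked. One possible shape is a small budget object that records usage per agent and reports when a limit is crossed. This is only a sketch: the `TokenBudget` class, its method names, and the limits are hypothetical and not part of the skill.

```typescript
// Hypothetical budget tracker; names and limits are assumptions for illustration.
class TokenBudget {
  private used = new Map<string, number>();
  private perAgentLimit: number;
  private totalLimit: number;

  constructor(perAgentLimit: number, totalLimit: number) {
    this.perAgentLimit = perAgentLimit;
    this.totalLimit = totalLimit;
  }

  // Record usage for one agent; returns false once either limit is exceeded.
  record(agentId: string, tokens: number): boolean {
    const next = (this.used.get(agentId) ?? 0) + tokens;
    this.used.set(agentId, next);
    return next <= this.perAgentLimit && this.total() <= this.totalLimit;
  }

  // Sum of tokens recorded across all agents.
  total(): number {
    let sum = 0;
    this.used.forEach(n => { sum += n; });
    return sum;
  }
}

const budget = new TokenBudget(5000, 8000);
const firstFits = budget.record("gather-sources", 4500);    // within both limits
const secondFits = budget.record("validate-sources", 4000); // total is now 8500, over 8000
```

An executor could check the return value before spawning the next agent and stop scheduling new work once it turns false.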

Execution Algorithm

interface ExecutionContext {
  dagId: DAGId | undefined;  // undefined when the schedule is empty
  schedule: ScheduledWave[];
  results: Map<NodeId, TaskResult>;
  errors: Map<NodeId, TaskError>;
  config: ExecutorConfig;
  aborted: boolean;  // set by handleTaskError under 'stop-on-failure'
}

async function executeDAG(
  schedule: ScheduledWave[],
  config: ExecutorConfig
): Promise<ExecutionResult> {
  const context: ExecutionContext = {
    dagId: schedule[0]?.dagId,
    schedule,
    results: new Map(),
    errors: new Map(),
    config,
    aborted: false,
  };

  // Waves run strictly in order; tasks inside a wave run concurrently
  for (const wave of schedule) {
    await executeWave(wave, context);

    // Check for fatal errors
    if (shouldAbortExecution(context)) {
      break;
    }
  }

  return buildExecutionResult(context);
}

async function executeWave(
  wave: ScheduledWave,
  context: ExecutionContext
): Promise<void> {
  // A zero or negative limit would keep the loop below from advancing
  const batchSize = Math.max(1, context.config.maxParallelism);
  const tasks = wave.tasks;

  // Execute in batches respecting parallelism limit
  for (let i = 0; i < tasks.length; i += batchSize) {
    const batch = tasks.slice(i, i + batchSize);

    // Execute batch concurrently
    const promises = batch.map(task =>
      executeTask(task, context)
    );

    // executeTask records its own failures, so this only waits for the batch
    await Promise.all(promises);
  }
}
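
The wave loop above waits for every task with no upper bound, while the responsibilities list includes managing execution timeouts. A minimal sketch of one way to bound a single task follows. `withTimeout` is a hypothetical helper, not something the skill defines, and it only stops waiting: it does not cancel the underlying agent.

```typescript
// Hypothetical helper: rejects if `work` has not settled within `ms` milliseconds.
function withTimeout<T>(work: Promise<T>, ms: number, label: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;

  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`${label} timed out after ${ms}ms`)),
      ms
    );
  });

  // Whichever promise settles first wins; the timer is always cleared afterwards
  // so it cannot keep the process alive.
  return Promise.race([work, timeout]).finally(() => {
    if (timer !== undefined) clearTimeout(timer);
  });
}
```

Under this assumption, a call such as `withTimeout(executeTask(task, context), timeoutMs, task.nodeId)` would reject once the limit passes, and the caller would still need to record the failure.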

Task Tool Integration

Spawning Agents for Nodes

async function executeTask(
  task: ScheduledTask,
  context: ExecutionContext
): Promise<void> {
  const node = getNodeFromTask(task, context);

  // Build Task tool parameters
  const taskParams = {
    description: `Execute ${node.skillId}: ${task.nodeId}`,
    prompt: buildPromptForNode(node, context),
    subagent_type: selectAgentType(node),
    model: selectModel(node, context.config),
  };

  try {
    // Use Task tool to spawn agent
    const result = await spawnAgent(taskParams);
    context.results.set(task.nodeId, {
      output: result,
      completedAt: new Date(),
    });
  } catch (error) {
    // Failures are recorded on the context rather than rethrown,
    // so one failed node does not reject the whole batch
    const err = error instanceof Error ? error : new Error(String(error));
    handleTaskError(task, err, context);
  }
}

function selectAgentType(node: DAGNode): string {
  // Map node types to appropriate agent types
  switch (node.type) {
    case 'skill':
      return node.skillId;  // Use skill as agent type
    case 'agent':
      return node.agentDefinition.type;
    case 'mcp-tool':
    default:
      // MCP tools and unknown node types run in a general-purpose agent
      return 'general-purpose';
  }
}

function selectModel(
  node: DAGNode,
  config: ExecutorConfig
): 'haiku' | 'sonnet' | 'opus' {
  // Select model based on task complexity
  const complexity = estimateComplexity(node);

  if (complexity === 'simple' && config.allowHaiku) {
    return 'haiku';
  } else if (complexity === 'complex' && config.allowOpus) {
    return 'opus';
  }
  return 'sonnet';
}
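
`selectModel` relies on `estimateComplexity`, which is not defined in this skill. The following is a hypothetical heuristic that shows the kind of signal it could use. The `ComplexityHints` fields, the `'moderate'` label, and the thresholds are all assumptions chosen for illustration.

```typescript
type Complexity = "simple" | "moderate" | "complex";

// Hypothetical input: only the fields this heuristic reads, not the real DAGNode.
interface ComplexityHints {
  promptLength: number;        // characters in the node's prompt
  dependencyCount: number;     // upstream nodes whose output is consumed
  requiresReasoning?: boolean; // explicit hint from the DAG author
}

function estimateComplexity(hints: ComplexityHints): Complexity {
  // An explicit hint or many inputs pushes the node to the strongest model.
  if (hints.requiresReasoning || hints.dependencyCount >= 4) {
    return "complex";
  }
  // Short prompts with at most one input are treated as cheap.
  if (hints.promptLength < 500 && hints.dependencyCount <= 1) {
    return "simple";
  }
  return "moderate";
}

const shortLeaf = estimateComplexity({ promptLength: 200, dependencyCount: 0 });   // "simple"
const wideJoin = estimateComplexity({ promptLength: 200, dependencyCount: 5 });    // "complex"
const midNode = estimateComplexity({ promptLength: 2000, dependencyCount: 2 });    // "moderate"
```

With `selectModel` as written, `"moderate"` falls through to `'sonnet'`, as does any result whose model is disabled in the config.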

Parallel Execution Pattern

// Execute multiple independent tasks in a single message
function buildParallelTaskCalls(
  tasks: ScheduledTask[],
  context: ExecutionContext
): TaskToolCall[] {
  return tasks.map(task => {
    // Look the node up once and reuse it for both fields
    const node = getNodeFromTask(task, context);
    return {
      tool: 'Task',
      params: {
        description: `Node: ${task.nodeId}`,
        prompt: buildPromptForNode(node, context),
        subagent_type: selectAgentType(node),
      },
    };
  });
}

Error Handling

Retry Logic

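async function executeWithRetry(
  task: ScheduledTask,
  context: ExecutionContext
): Promise<TaskResult> {
  const { maxRetries, retryDelayMs, exponentialBackoff } =
    task.config;
  const node = getNodeFromTask(task, context);

  let lastError: unknown = new Error('No attempts were made');

  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      // Call spawnAgent directly: executeTask catches its own errors
      // and returns void, so wrapping it would never trigger a retry
      const output = await spawnAgent({
        description: `Execute ${node.skillId}: ${task.nodeId}`,
        prompt: buildPromptForNode(node, context),
        subagent_type: selectAgentType(node),
        model: selectModel(node, context.config),
      });
      return { output, completedAt: new Date() };
    } catch (error) {
      lastError = error;

      // Wait before the next attempt; no delay after the final one
      if (attempt < maxRetries) {
        const delay = exponentialBackoff
          ? retryDelayMs * Math.pow(2, attempt)
          : retryDelayMs;
        await sleep(delay);
      }
    }
  }

  // All attempts failed; the caller decides how to record the error
  throw lastError;
}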
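
To make the backoff arithmetic concrete, the sketch below computes the waits the retry loop would insert between attempts. `retryDelays` is a hypothetical helper written for this illustration, and the 1000 ms base delay is an example value.

```typescript
// Mirrors the delay formula in the retry loop: one wait after each failed
// attempt except the last, doubling each time when backoff is enabled.
function retryDelays(
  maxRetries: number,
  retryDelayMs: number,
  exponentialBackoff: boolean
): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    delays.push(
      exponentialBackoff ? retryDelayMs * Math.pow(2, attempt) : retryDelayMs
    );
  }
  return delays;
}

const exponential = retryDelays(3, 1000, true); // [1000, 2000, 4000]
const fixed = retryDelays(3, 1000, false);      // [1000, 1000, 1000]
```

With `maxRetries` set to 3, the task is attempted up to four times in total, and the exponential schedule waits 7 seconds overall before giving up.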

Failure Strategies

function handleTaskError(
  task: ScheduledTask,
  error: Error,
  context: ExecutionContext
): void {
  context.errors.set(task.nodeId, {
    message: error.message,
    code: classifyError(error),
    recoverable: isRecoverable(error),
  });

  switch (context.config.errorHandling) {
    case 'stop-on-failure':
      context.aborted = true;
      break;

    case 'continue-on-failure':
      // Mark dependent nodes as skipped
      markDependentsSkipped(task.nodeId, context);
      break;

    case 'retry-then-skip':
      // Already retried, now skip
      markDependentsSkipped(task.nodeId, context);
      break;
  }
}
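
`markDependentsSkipped` is called above but not defined. Presumably it has to reach every node downstream of the failure, not only the direct children. A minimal breadth-first sketch of that traversal follows. The `Dependents` map, the `collectDependents` name, and the `summarize` node are hypothetical.

```typescript
// Hypothetical edge map: nodeId -> ids of the nodes that depend on it directly.
type Dependents = Map<string, string[]>;

// Returns every node reachable downstream of the failed node.
function collectDependents(failedId: string, dependents: Dependents): Set<string> {
  const skipped = new Set<string>();
  const queue: string[] = [failedId];

  while (queue.length > 0) {
    const current = queue.shift() as string;
    for (const child of dependents.get(current) ?? []) {
      // Visit each node once, even when several failed paths lead to it.
      if (!skipped.has(child)) {
        skipped.add(child);
        queue.push(child);
      }
    }
  }
  return skipped;
}

const edges: Dependents = new Map([
  ["gather-sources", ["validate-sources", "extract-metadata"]],
  ["validate-sources", ["summarize"]],
  ["extract-metadata", ["summarize"]],
]);

const afterValidateFails = collectDependents("validate-sources", edges); // only "summarize"
const afterGatherFails = collectDependents("gather-sources", edges);     // all three downstream nodes
```

The failed node itself is not in the returned set, since it is already recorded in `context.errors`.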

Execution State Tracking

executionState:
  dagId: research-pipeline
  status: running
  startedAt: "2024-01-15T10:00:00Z"

  waves:
    - wave: 0
      status: completed
      duration: 28500ms
      tasks:
        - nodeId: gather-sources
          status: completed
          duration: 28500ms
          tokensUsed: 4500

    - wave: 1
      status: running
      tasks:
        - nodeId: validate-sources
          status: running
          startedAt: "2024-01-15T10:00:30Z"
        - nodeId: extract-metadata
          status: running
          startedAt: "2024-01-15T10:00:30Z"

  progress:
    completedNodes: 1
    runningNodes: 2
    pendingNodes: 3
    failedNodes: 0

  resources:
    tokensUsed: 4500
    estimatedCost: 0.05
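
The `progress` counters in the state above can be derived from per-task statuses instead of being maintained by hand. A small sketch, assuming the four statuses that appear in the example; `summarizeProgress` is a hypothetical name.

```typescript
type TaskStatus = "pending" | "running" | "completed" | "failed";

interface Progress {
  completedNodes: number;
  runningNodes: number;
  pendingNodes: number;
  failedNodes: number;
}

// Count how many tasks are in each status.
function summarizeProgress(statuses: TaskStatus[]): Progress {
  const progress: Progress = {
    completedNodes: 0,
    runningNodes: 0,
    pendingNodes: 0,
    failedNodes: 0,
  };
  for (const status of statuses) {
    switch (status) {
      case "completed": progress.completedNodes++; break;
      case "running": progress.runningNodes++; break;
      case "pending": progress.pendingNodes++; break;
      case "failed": progress.failedNodes++; break;
    }
  }
  return progress;
}

// One completed, two running, three pending: the same counts as the example state.
const progress = summarizeProgress([
  "completed", "running", "running", "pending", "pending", "pending",
]);
```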

Performance Optimization

Batching Strategy

function optimizeBatching(
  wave: ScheduledWave,
  config: ExecutorConfig
): ScheduledTask[][] {
  const maxParallel = Math.max(1, config.maxParallelism);

  // Sort a copy by estimated duration (shortest first) so wave.tasks
  // is not reordered in place. Grouping tasks of similar length keeps
  // a batch from being held up by one slow task.
  const tasks = [...wave.tasks].sort((a, b) =>
    a.estimatedDuration - b.estimatedDuration
  );

  // Slice the sorted list into batches of at most maxParallel tasks
  const batches: ScheduledTask[][] = [];
  for (let i = 0; i < tasks.length; i += maxParallel) {
    batches.push(tasks.slice(i, i + maxParallel));
  }

  return batches;
}
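
A worked example shows why the sort can matter when batches run one after another and each batch waits for its slowest task. `batchedMakespan` is a hypothetical helper, and the durations are made-up values in arbitrary units.

```typescript
// Total wall-clock time for sequential batches: the sum of each batch's slowest task.
function batchedMakespan(durations: number[], maxParallel: number): number {
  const size = Math.max(1, maxParallel);
  let total = 0;
  for (let i = 0; i < durations.length; i += size) {
    total += Math.max(...durations.slice(i, i + size));
  }
  return total;
}

const durations = [10, 1, 9, 2];

// Unsorted batches are [10, 1] and [9, 2]: 10 + 9 = 19
const unsortedTime = batchedMakespan(durations, 2);

// Sorted batches are [1, 2] and [9, 10]: 2 + 10 = 12
const sortedTime = batchedMakespan([...durations].sort((a, b) => a - b), 2);
```

The gain depends on the input. Shortest-first ordering is not optimal for every set of durations, and the difference largely disappears when a free slot is refilled as soon as any task finishes.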

Early Completion Handling

async function executeWaveWithEarlyCompletion(
  wave: ScheduledWave,
  context: ExecutionContext
): Promise<void> {
  // Queue the tasks themselves (not just their ids) so executeTask
  // receives a full ScheduledTask
  const pending: ScheduledTask[] = [...wave.tasks];
  const running = new Map<NodeId, Promise<void>>();
  const limit = Math.max(1, context.config.maxParallelism);

  while (pending.length > 0 || running.size > 0) {
    // Start new tasks up to parallelism limit
    while (pending.length > 0 && running.size < limit) {
      const task = pending.shift()!;

      // Remove the entry once the task settles so its slot frees up
      const promise = executeTask(task, context)
        .finally(() => running.delete(task.nodeId));
      running.set(task.nodeId, promise);
    }

    // Wait for any task to complete, then refill the free slot
    if (running.size > 0) {
      await Promise.race(running.values());
    }
  }
}
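
The same "refill a slot as soon as it frees up" behaviour can also be written as a fixed set of workers that pull from a shared queue. This is a different technique from the `Promise.race` loop above, shown here as a self-contained sketch. `runPool` and `trackedJobs` are hypothetical names, and `trackedJobs` exists only to observe peak concurrency.

```typescript
// Runs jobs with at most `limit` in flight; results keep the input order.
async function runPool<T>(
  jobs: Array<() => Promise<T>>,
  limit: number
): Promise<T[]> {
  const results: T[] = new Array(jobs.length);
  let next = 0;

  // Each worker claims the next unstarted job as soon as its current one finishes.
  async function worker(): Promise<void> {
    while (next < jobs.length) {
      const index = next++;
      results[index] = await jobs[index]();
    }
  }

  const workerCount = Math.min(Math.max(1, limit), jobs.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// Builds timer-based jobs that record how many were running at once.
function trackedJobs(durationsMs: number[]) {
  const stats = { inFlight: 0, peak: 0 };
  const jobs = durationsMs.map((ms, id) => async () => {
    stats.inFlight++;
    stats.peak = Math.max(stats.peak, stats.inFlight);
    await new Promise<void>(resolve => setTimeout(resolve, ms));
    stats.inFlight--;
    return id;
  });
  return { jobs, stats };
}
```

Running `runPool` over `trackedJobs([20, 5, 5, 5]).jobs` with a limit of 2 keeps `stats.peak` at 2 while the short jobs proceed without waiting for the 20 ms one.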

Integration Points

  • Input: Execution schedule from dag-task-scheduler
  • Output: Results to dag-result-aggregator
  • Context: Via dag-context-bridger
  • Errors: To dag-failure-analyzer
  • Metrics: To dag-performance-profiler

Best Practices

  1. Respect Limits: Never exceed configured parallelism
  2. Monitor Resources: Track tokens and costs continuously
  3. Handle Failures: Graceful degradation on errors
  4. Log Everything: Enable debugging and profiling
  5. Clean Up: Release resources after completion

Parallel power. Controlled execution. Maximum throughput.

# Supported AI Coding Agents

This skill is compatible with the SKILL.md standard and works with all major AI coding agents.
