Spec Engine

The spec engine is the runtime layer that reads a decomposed plan file, executes each task in order, runs quality gates after each one, and tracks progress so a run can be paused and resumed.

Mental model

A spec plan is an XML file stored under .claude/plans/. When you trigger execution, the engine works through four concerns in sequence:

  1. Reading: read_spec(plan_path) parses the plan file and returns a list of DecomposedTask objects.
  2. Orchestrating: PipelineOrchestrator iterates those tasks, calls quality gates after each one via run_gates_for_task, and collects results into a PipelineResult.
  3. Gating: each task produces a TaskResult with fields like quality_gate_passed, tests_passed, gate_score, and severity. The orchestrator uses these to decide whether to continue, pause for approval, or surface an error.
  4. State tracking: SpecState records which task IDs are in completed and which is current. save_state writes this back into an HTML comment inside the plan file itself, so the file is the single source of truth. get_pending_tasks filters the full task list down to whatever hasn't finished yet, enabling resumption mid-run.
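
The four concerns above can be pictured as a single loop. The sketch below is a minimal stand-in, not the engine's code: Task, Progress, run_pipeline, and the execute and gate callables are hypothetical names. Only the idea of running tasks in order, gating each one, and recording completed IDs comes from the description above.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class Task:
    """Hypothetical stand-in for DecomposedTask."""
    id: str
    description: str = ""


@dataclass
class Progress:
    """Hypothetical stand-in for SpecState."""
    completed: list[str] = field(default_factory=list)
    current: Optional[str] = None


def run_pipeline(
    tasks: list[Task],
    execute: Callable[[Task], None],
    gate: Callable[[Task], bool],
    progress: Progress,
) -> bool:
    """Run tasks in order and stop at the first task whose gate fails."""
    for task in tasks:
        if task.id in progress.completed:
            continue  # finished in an earlier run, so skip it on resume
        progress.current = task.id
        execute(task)
        if not gate(task):
            return False  # progress.current still names the failed task
        progress.completed.append(task.id)
    progress.current = None
    return True
```

Calling run_pipeline a second time with the same Progress object picks up at the first task that is not yet in completed, which is the resumption behaviour described in step 4.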

Core data structures

Type | What it represents
TaskResult | The outcome of one task: whether it executed, whether quality_gate_passed and tests_passed are satisfied, the gate_score (float), and any error string. The severity property classifies the gate result for display.
PipelineResult | The rolled-up outcome across all tasks: spec_path, every TaskResult in tasks, total_cost, duration_ms, and success (true only when all tasks executed and passed gates).
SpecState | Durable progress record: plan_path, the list of completed task IDs, the current task ID, and an auto_run flag that controls whether the engine prompts for approval between tasks.
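
As a rough picture of these three types, the sketch below models them as dataclasses. Field names follow the descriptions above, but the field types, the task_id field, the defaults, and the severity labels are assumptions made for illustration; the real definitions may differ.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TaskResult:
    task_id: str  # assumed field; the identifier is not named above
    executed: bool
    quality_gate_passed: bool
    tests_passed: bool
    gate_score: float
    error: Optional[str] = None

    @property
    def severity(self) -> str:
        # Hypothetical classification; the real rules are not documented here.
        if self.error or not self.executed:
            return "error"
        if self.quality_gate_passed and self.tests_passed:
            return "ok"
        return "warning"


@dataclass
class PipelineResult:
    spec_path: str
    tasks: list[TaskResult] = field(default_factory=list)
    total_cost: float = 0.0
    duration_ms: int = 0

    @property
    def success(self) -> bool:
        # True only when every task executed and passed its gates.
        return all(t.executed and t.quality_gate_passed for t in self.tasks)


@dataclass
class SpecState:
    plan_path: str
    completed: list[str] = field(default_factory=list)
    current: Optional[str] = None
    auto_run: bool = False
```

Modelling success and severity as derived properties rather than stored fields keeps them consistent with the underlying flags; whether the engine does the same for success is not stated here.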

Execution entry points

Interactive execution with per-task approval

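# NOTE: the fact-check at the end of this document flags this import path as
# unresolved; adjust it to wherever the spec package lives in your project.
from spec import execute_with_approval

result = execute_with_approval(
    spec_path,            # path to a plan file under .claude/plans/
    on_task_complete,     # callback supplied by the caller
    skip_gates=False,
    skip_tests=False,
    skip_simplify=False,
)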

execute_with_approval pauses after each task so you can approve, redo with new instructions, or flip auto_run to let the rest complete unattended.
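
The approve / redo / auto-run choice can be pictured as a small decision loop. The sketch below is hypothetical throughout: Decision, run_with_approval, and the ask callable are invented for illustration, the real on_task_complete signature may differ, and passing new instructions on a redo is left out for brevity.

```python
from enum import Enum
from typing import Callable, Iterable


class Decision(Enum):
    APPROVE = "approve"
    REDO = "redo"
    AUTO_RUN = "auto_run"


def run_with_approval(
    task_ids: Iterable[str],
    run_task: Callable[[str], None],
    ask: Callable[[str], Decision],
) -> list[str]:
    """Run each task, asking for a decision after it unless auto-run is on."""
    auto_run = False
    log: list[str] = []  # every task execution, including redos
    for task_id in task_ids:
        while True:
            run_task(task_id)
            log.append(task_id)
            if auto_run:
                break  # unattended mode: no prompt
            decision = ask(task_id)
            if decision is Decision.REDO:
                continue  # run the same task again
            if decision is Decision.AUTO_RUN:
                auto_run = True  # stop prompting for the remaining tasks
            break
    return log
```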

Programmatic execution

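# NOTE: the fact-check at the end of this document flags this import path as
# unresolved; adjust it to wherever the pipeline module lives in your project.
from pipeline import PipelineOrchestrator

orchestrator = PipelineOrchestrator(
    spec_path,            # path to a plan file under .claude/plans/
    skip_gates=False,
    skip_tests=False,
    skip_simplify=False,
)
result = orchestrator.run_all(
    on_task_complete=callback,             # callback supplied by the caller
    skip_task_ids={"task-id-to-omit"},     # set of task IDs to leave out of this run
)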

Pass skip_task_ids to re-run a subset of tasks from an existing plan without reprocessing ones that already passed.
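
One way to build that set is to collect the IDs of tasks that passed in an earlier run. The helper below is a sketch under assumptions: ids_to_skip is a hypothetical name, and plain dicts with a task_id key stand in for the engine's TaskResult objects.

```python
def ids_to_skip(results: list[dict]) -> set[str]:
    """Collect IDs of tasks that already executed and passed gates and tests."""
    return {
        r["task_id"]
        for r in results
        if r["executed"] and r["quality_gate_passed"] and r["tests_passed"]
    }


# Hypothetical results from an earlier run.
previous = [
    {"task_id": "a", "executed": True, "quality_gate_passed": True, "tests_passed": True},
    {"task_id": "b", "executed": True, "quality_gate_passed": False, "tests_passed": True},
    {"task_id": "c", "executed": False, "quality_gate_passed": False, "tests_passed": False},
]
skip = ids_to_skip(previous)
```

The resulting set could then be passed as skip_task_ids so that only the tasks that failed or never ran are executed again.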

State lifecycle

load_state(plan_path)        # returns SpecState | None
    │
    ▼
get_pending_tasks(tasks, state)   # filters out completed IDs
    │
    ▼
[execute tasks, update state.completed after each]
    │
    ├─ save_state(state)     # persists progress into plan file
    │
    └─ clear_state(plan_path)  # removes state when run finishes

find_resumable_plans(plans_dir) scans .claude/plans/ for any plan file that still carries a SpecState comment, giving you a list of interrupted runs you can pick back up.
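
To make the lifecycle concrete, the sketch below stores progress as JSON inside a trailing comment in the plan file. It is an illustration under assumptions: the spec-state marker, the JSON payload, and the function signatures are invented here, and the engine's real comment format is not documented in this section.

```python
import json
import re
from pathlib import Path
from typing import Optional

# Hypothetical marker format for the embedded state comment.
_STATE_RE = re.compile(r"\n?<!-- spec-state: (.*?) -->", re.DOTALL)


def clear_state(plan_path: Path) -> None:
    """Remove the state comment, leaving the rest of the plan untouched."""
    text = plan_path.read_text(encoding="utf-8")
    plan_path.write_text(_STATE_RE.sub("", text), encoding="utf-8")


def save_state(plan_path: Path, completed: list[str], current: Optional[str]) -> None:
    """Replace any earlier state comment with the latest progress."""
    clear_state(plan_path)
    payload = json.dumps({"completed": completed, "current": current})
    with plan_path.open("a", encoding="utf-8") as fh:
        fh.write(f"\n<!-- spec-state: {payload} -->")


def load_state(plan_path: Path) -> Optional[dict]:
    """Return the stored progress, or None if the plan carries no state."""
    match = _STATE_RE.search(plan_path.read_text(encoding="utf-8"))
    return json.loads(match.group(1)) if match else None


def find_resumable_plans(plans_dir: Path) -> list[Path]:
    """List plan files that still carry a state comment."""
    return sorted(p for p in plans_dir.glob("*.xml") if load_state(p) is not None)
```

Keeping the state inside the plan file means there is no separate progress file to drift out of sync. This sketch does not guard against task IDs that themselves contain the comment terminator.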

Presentation layer

The spec package includes a set of formatting functions that render engine output for display.

When the engine matters

The spec engine is relevant whenever you need to understand why a run stopped, how to resume it, or how quality gate outcomes map to the severity and gate_score fields on TaskResult. If you're writing code that hooks into execution — for example, an on_task_complete callback or a custom presenter — these are the types and functions you'll work with directly.

Unresolved references

Auto-generated by attune-author fact-check. Review and either fix the source code, fix this doc, or add an override.

Location | Severity | Issue
Line 36 (code fence) | error | from spec import …: module not importable
Line 52 (code fence) | error | from pipeline import …: module not importable