
Multi-Step Workflows

Patterns for chaining skills into complex, multi-step workflows with state management, branching logic, and recovery strategies.

Most real-world tasks need more than a single skill invocation. Deploying an application might mean running tests, building a container image, pushing to a registry, and updating a deployment manifest. Reviewing a pull request might require reading changed files, checking test coverage, running linters, and posting a summary. These are multi-step workflows: sequences of skill invocations that work together toward a bigger goal.

This article covers the core patterns for building reliable multi-step workflows, including how to chain skills, manage state between steps, handle branching logic, and recover when things go wrong.

Anatomy of a workflow

A workflow is a directed sequence of steps where each step invokes one or more skills and passes its results forward. The simplest version is a linear chain:

Step 1: Read file --> Step 2: Transform content --> Step 3: Write file

But real workflows branch, loop, and sometimes need to backtrack. Before getting into the advanced patterns, let’s cover the building blocks.

The workflow context object

Every multi-step workflow needs shared context, a place to accumulate results, track progress, and store decisions. Think of it as the workflow’s working memory.

interface WorkflowContext {
  // Input parameters
  input: Record<string, unknown>;
  // Results from each completed step
  stepResults: Map<string, StepResult>;
  // Current workflow state
  status: "running" | "paused" | "completed" | "failed";
  // Metadata for recovery
  currentStep: string;
  startedAt: Date;
  completedSteps: string[];
}

interface StepResult {
  stepName: string;
  output: unknown;
  duration: number;
  timestamp: Date;
}

In Python, the equivalent pattern uses a dataclass or typed dictionary:

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

@dataclass
class WorkflowContext:
    input: dict[str, Any]
    step_results: dict[str, Any] = field(default_factory=dict)
    status: str = "running"
    current_step: str = ""
    completed_steps: list[str] = field(default_factory=list)
    started_at: datetime = field(default_factory=datetime.now)
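A quick usage sketch of the dataclass above, recording one completed step (the test result shown is a stand-in value, not a real skill output):

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

@dataclass
class WorkflowContext:
    input: dict[str, Any]
    step_results: dict[str, Any] = field(default_factory=dict)
    status: str = "running"
    current_step: str = ""
    completed_steps: list[str] = field(default_factory=list)
    started_at: datetime = field(default_factory=datetime.now)

# Create a context and record one completed step
ctx = WorkflowContext(input={"project_path": "/src/app"})
ctx.current_step = "tests"
ctx.step_results["tests"] = {"passed": True, "fail_count": 0}
ctx.completed_steps.append("tests")

print(ctx.status)            # running
print(ctx.completed_steps)   # ['tests']
```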

Chaining skills for complex tasks

The most common workflow pattern is a linear chain where each step depends on the output of the previous one. The key principle here: each step should produce a well-defined output that the next step can consume without ambiguity.

Pattern: sequential pipeline

async function deployWorkflow(ctx: WorkflowContext): Promise<WorkflowContext> {
  // Step 1: Run tests
  const testResult = await invoke("run_tests", {
    path: ctx.input.projectPath,
  });
  ctx.stepResults.set("tests", testResult);

  if (!testResult.success) {
    ctx.status = "failed";
    return ctx;
  }

  // Step 2: Build container image
  const buildResult = await invoke("build_image", {
    path: ctx.input.projectPath,
    tag: ctx.input.version,
  });
  ctx.stepResults.set("build", buildResult);

  // Step 3: Push to registry
  const pushResult = await invoke("push_image", {
    image: buildResult.imageName,
    registry: ctx.input.registry,
  });
  ctx.stepResults.set("push", pushResult);

  // Step 4: Update deployment
  const deployResult = await invoke("update_deployment", {
    manifest: ctx.input.manifestPath,
    image: pushResult.fullImageRef,
  });
  ctx.stepResults.set("deploy", deployResult);

  ctx.status = "completed";
  return ctx;
}

Notice how each step extracts exactly what it needs from the previous step’s result. This is deliberate: passing entire result objects from step to step creates tight coupling, and tight coupling between steps makes workflows brittle.
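The invoke helper used throughout these examples is left abstract; its exact shape depends on your runtime. One minimal sketch, assuming a plain registry of async functions (the skill name and handler here are hypothetical stand-ins):

```python
import asyncio
from typing import Any, Awaitable, Callable

# Registry mapping skill names to async handlers
SKILLS: dict[str, Callable[..., Awaitable[Any]]] = {}

def skill(name: str):
    """Decorator that registers an async function as a named skill."""
    def register(fn):
        SKILLS[name] = fn
        return fn
    return register

async def invoke(name: str, **kwargs: Any) -> Any:
    """Look up a skill by name and await it with the given arguments."""
    if name not in SKILLS:
        raise KeyError(f"unknown skill: {name}")
    return await SKILLS[name](**kwargs)

@skill("run_tests")
async def run_tests(path: str) -> dict:
    # Stand-in implementation; a real skill would shell out to a test runner
    return {"success": True, "path": path}

result = asyncio.run(invoke("run_tests", path="/src/app"))
print(result["success"])  # True
```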

Pattern: accumulating pipeline

Sometimes each step adds to a growing result rather than transforming the previous output. This comes up often in research and analysis workflows.

async def code_review_workflow(ctx: WorkflowContext) -> WorkflowContext:
    """Review a pull request by accumulating findings from multiple analyses."""

    findings = []

    # Step 1: Check for style issues
    lint_result = await invoke("run_linter", path=ctx.input["pr_path"])
    findings.extend(lint_result.issues)
    ctx.step_results["lint"] = lint_result

    # Step 2: Check test coverage
    coverage_result = await invoke("check_coverage", path=ctx.input["pr_path"])
    if coverage_result.coverage_percent < 80:
        findings.append({
            "type": "coverage",
            "message": f"Coverage is {coverage_result.coverage_percent}%, below 80% threshold"
        })
    ctx.step_results["coverage"] = coverage_result

    # Step 3: Analyze complexity
    complexity_result = await invoke("analyze_complexity", path=ctx.input["pr_path"])
    findings.extend(complexity_result.warnings)
    ctx.step_results["complexity"] = complexity_result

    # Step 4: Generate summary from accumulated findings
    summary = await invoke("generate_review_summary", findings=findings)
    ctx.step_results["summary"] = summary

    ctx.status = "completed"
    return ctx

State management between steps

As workflows grow, managing state becomes the central challenge. Here are some proven strategies.

Keep state minimal and explicit

Only store what downstream steps actually need. A common mistake is dumping the entire result of every step into context, which bloats memory and makes it harder for the agent to figure out what matters.

Approach              Pros                    Cons
Full result storage   Nothing is lost         Context bloat, harder to reason about
Selective extraction  Clean, focused context  You need to know what downstream steps want
Summarized results    Compact, easy to scan   Loses detail you might need later

The best approach in practice is selective extraction with fallback access:

// After running tests, extract only what downstream steps need
const testResult = await invoke("run_tests", { path: projectPath });

ctx.stepResults.set("tests", {
  passed: testResult.passed,
  failCount: testResult.failures.length,
  // Keep full details available but separate from the main flow
  _raw: testResult,
});
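In Python the same idea looks like this (the test-runner result below is a stand-in dict):

```python
# Full result from a hypothetical test-runner skill
test_result = {
    "passed": False,
    "failures": [{"test": "test_login", "error": "timeout"}],
    "stdout": "...thousands of lines of runner output...",
}

# Store only what downstream steps need, keeping the full
# result reachable under a conventional "_raw" key
step_results = {}
step_results["tests"] = {
    "passed": test_result["passed"],
    "fail_count": len(test_result["failures"]),
    "_raw": test_result,
}

print(step_results["tests"]["fail_count"])  # 1
```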

Checkpointing for long workflows

For workflows that take minutes or longer, save checkpoints so you can resume after an interruption. This matters especially for agent skills that might hit context window limits mid-workflow. See Context Management for more on working within those constraints.

async function checkpointedWorkflow(
  ctx: WorkflowContext,
): Promise<WorkflowContext> {
  const steps = [
    { name: "fetch_data", fn: fetchData },
    { name: "transform", fn: transformData },
    { name: "validate", fn: validateResults },
    { name: "publish", fn: publishResults },
  ];

  for (const step of steps) {
    // Skip already-completed steps (supports resume)
    if (ctx.completedSteps.includes(step.name)) {
      continue;
    }

    ctx.currentStep = step.name;
    await saveCheckpoint(ctx);

    const result = await step.fn(ctx);
    ctx.stepResults.set(step.name, result);
    ctx.completedSteps.push(step.name);
    // Record completion immediately so a crash here doesn't redo the step
    await saveCheckpoint(ctx);
  }

  ctx.status = "completed";
  await saveCheckpoint(ctx);
  return ctx;
}

Branching and conditional logic

Real workflows rarely follow a single straight path. The agent needs to make decisions based on intermediate results.

Pattern: conditional branching

async def migration_workflow(ctx: WorkflowContext) -> WorkflowContext:
    # Step 1: Analyze the current schema
    analysis = await invoke("analyze_schema", db=ctx.input["database"])
    ctx.step_results["analysis"] = analysis

    # Branch based on migration complexity
    if analysis.breaking_changes:
        # Complex path: requires backup and staged rollout
        await invoke("create_backup", db=ctx.input["database"])
        result = await invoke("apply_migration_staged", migration=analysis.migration_plan)
        await invoke("verify_migration", db=ctx.input["database"])
        ctx.step_results["migration"] = result
    elif analysis.has_changes:
        # Simple path: direct migration
        result = await invoke("apply_migration", migration=analysis.migration_plan)
        ctx.step_results["migration"] = result
    else:
        # No changes needed
        ctx.step_results["migration"] = {"status": "no_changes"}
        ctx.status = "completed"
        return ctx

    ctx.status = "completed"
    return ctx

Pattern: fan-out and fan-in

When multiple independent tasks can run in parallel, use fan-out/fan-in. Skill Composition covers this in more depth, but the basic pattern works for workflows too.

async function analyzeRepository(
  ctx: WorkflowContext,
): Promise<WorkflowContext> {
  // Fan-out: run independent analyses in parallel
  const [dependencies, security, codeQuality, testCoverage] = await Promise.all(
    [
      invoke("analyze_dependencies", { path: ctx.input.repoPath }),
      invoke("security_scan", { path: ctx.input.repoPath }),
      invoke("code_quality_check", { path: ctx.input.repoPath }),
      invoke("coverage_report", { path: ctx.input.repoPath }),
    ],
  );

  // Fan-in: combine results into a unified report
  ctx.stepResults.set("analyses", {
    dependencies,
    security,
    codeQuality,
    testCoverage,
  });

  const report = await invoke("generate_report", {
    analyses: ctx.stepResults.get("analyses"),
  });
  ctx.stepResults.set("report", report);

  ctx.status = "completed";
  return ctx;
}
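In Python, asyncio.gather plays the role of Promise.all. A sketch with stand-in analysis coroutines (the function names and return values are hypothetical):

```python
import asyncio

async def analyze_dependencies(path: str) -> dict:
    return {"outdated": 3}

async def security_scan(path: str) -> dict:
    return {"critical": 0}

async def coverage_report(path: str) -> dict:
    return {"percent": 84}

async def analyze_repository(repo_path: str) -> dict:
    # Fan-out: independent analyses run concurrently
    deps, security, coverage = await asyncio.gather(
        analyze_dependencies(repo_path),
        security_scan(repo_path),
        coverage_report(repo_path),
    )
    # Fan-in: combine results into a unified report
    return {"dependencies": deps, "security": security, "coverage": coverage}

report = asyncio.run(analyze_repository("/src/repo"))
print(report["coverage"]["percent"])  # 84
```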

Rollback and recovery strategies

Things will fail. Network calls time out. APIs return errors. Files get locked. A solid workflow needs a plan for when that happens. For the foundational error handling patterns behind these recovery strategies, see Error Handling Patterns.

Pattern: compensating actions

For each step that changes state (writes a file, updates a database, deploys a service), define a compensating action that undoes the change.

interface WorkflowStep {
  name: string;
  execute: (ctx: WorkflowContext) => Promise<StepResult>;
  compensate?: (ctx: WorkflowContext) => Promise<void>;
}

async function executeWithRollback(
  steps: WorkflowStep[],
  ctx: WorkflowContext,
): Promise<WorkflowContext> {
  const completedSteps: WorkflowStep[] = [];

  for (const step of steps) {
    try {
      const result = await step.execute(ctx);
      ctx.stepResults.set(step.name, result);
      completedSteps.push(step);
    } catch (error) {
      // Roll back completed steps in reverse order, iterating over a copy
      // so the original list isn't mutated
      for (const completed of [...completedSteps].reverse()) {
        if (completed.compensate) {
          await completed.compensate(ctx);
        }
      }
      ctx.status = "failed";
      return ctx;
    }
  }

  ctx.status = "completed";
  return ctx;
}
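The same pattern in Python, with one concrete compensating pair: a step that writes a marker file, undone by deleting it when a later step fails. The step names and failure are contrived for illustration:

```python
import asyncio
import tempfile
from pathlib import Path

workdir = Path(tempfile.mkdtemp())
marker = workdir / "deploy.lock"

async def write_marker(ctx: dict) -> str:
    marker.write_text("deploying")
    return "marker written"

async def delete_marker(ctx: dict) -> None:
    # Compensating action: undo write_marker
    marker.unlink(missing_ok=True)

async def always_fails(ctx: dict) -> str:
    raise RuntimeError("downstream service unavailable")

steps = [
    {"name": "write_marker", "execute": write_marker, "compensate": delete_marker},
    {"name": "deploy", "execute": always_fails, "compensate": None},
]

async def execute_with_rollback(steps: list[dict], ctx: dict) -> dict:
    completed = []
    for step in steps:
        try:
            ctx[step["name"]] = await step["execute"](ctx)
            completed.append(step)
        except Exception:
            # Undo completed steps in reverse order
            for done in reversed(completed):
                if done["compensate"]:
                    await done["compensate"](ctx)
            ctx["status"] = "failed"
            return ctx
    ctx["status"] = "completed"
    return ctx

ctx = asyncio.run(execute_with_rollback(steps, {}))
print(ctx["status"])    # failed
print(marker.exists())  # False -- the write was rolled back
```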

Pattern: partial completion with resume

Sometimes full rollback isn’t what you want. You’ve done useful work and want to keep it. Instead, mark the workflow as partially complete and allow resumption.

async def resilient_workflow(ctx: WorkflowContext) -> WorkflowContext:
    steps = [
        ("download", download_dataset),
        ("clean", clean_data),
        ("analyze", run_analysis),
        ("report", generate_report),
    ]

    for step_name, step_fn in steps:
        if step_name in ctx.completed_steps:
            continue

        try:
            result = await step_fn(ctx)
            ctx.step_results[step_name] = result
            ctx.completed_steps.append(step_name)
        except Exception as e:
            ctx.status = "paused"
            ctx.step_results[f"{step_name}_error"] = str(e)
            # Return partial progress — can be resumed later
            return ctx

    ctx.status = "completed"
    return ctx

Guidelines for reliable workflows

Building multi-step workflows that hold up in production takes some discipline. Here are the principles that matter most:

  1. Make steps idempotent. If a step runs twice with the same input, it should produce the same result. This is what makes retry and resume logic work.

  2. Validate between steps. Don’t assume the output of step N is valid input for step N+1. Add assertions or validation checks at step boundaries.

  3. Keep steps focused. Each step should do one thing. If a step is doing three things, it should probably be three steps. This relates closely to the God Skill anti-pattern.

  4. Log step transitions. Record when each step starts, completes, or fails. This is invaluable when debugging workflows that fail intermittently.

  5. Set timeouts on every step. A single hung step can block an entire workflow. Always define a maximum duration and treat the timeout as an error.

  6. Design for human oversight. Long or high-impact workflows should include approval gates at critical decision points, especially before irreversible actions like deployments or data deletions.
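Guideline 5 can be enforced with a small wrapper; a sketch using asyncio.wait_for, where a timeout surfaces as an ordinary step failure rather than hanging the workflow (the slow step is simulated):

```python
import asyncio

async def slow_step() -> str:
    await asyncio.sleep(10)  # simulates a hung step
    return "done"

async def run_with_timeout(step_fn, timeout_s: float) -> dict:
    """Run one step, treating a timeout as a step failure."""
    try:
        output = await asyncio.wait_for(step_fn(), timeout_s)
        return {"ok": True, "output": output}
    except asyncio.TimeoutError:
        return {"ok": False, "error": f"step exceeded {timeout_s}s timeout"}

result = asyncio.run(run_with_timeout(slow_step, timeout_s=0.05))
print(result["ok"])  # False
```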

These patterns form the backbone of complex agent behavior. Start with simple linear chains, add branching as your workflows demand it, and always have a recovery strategy for when things go sideways.